diff options
Diffstat (limited to 'test/cpp')
22 files changed, 1369 insertions, 142 deletions
diff --git a/test/cpp/common/channel_arguments_test.cc b/test/cpp/common/channel_arguments_test.cc index 190d32ce06..9bcc9f99f6 100644 --- a/test/cpp/common/channel_arguments_test.cc +++ b/test/cpp/common/channel_arguments_test.cc @@ -230,13 +230,6 @@ TEST_F(ChannelArgumentsTest, SetSocketMutator) { EXPECT_TRUE(HasArg(arg1)); // arg0 is replaced by arg1 EXPECT_FALSE(HasArg(arg0)); - - // arg0 is destroyed by grpc_socket_mutator_to_arg(mutator1) - { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - arg1.value.pointer.vtable->destroy(&exec_ctx, arg1.value.pointer.p); - grpc_exec_ctx_finish(&exec_ctx); - } } TEST_F(ChannelArgumentsTest, SetUserAgentPrefix) { diff --git a/test/cpp/interop/http2_client.cc b/test/cpp/interop/http2_client.cc index 01c07823cf..38a437f39f 100644 --- a/test/cpp/interop/http2_client.cc +++ b/test/cpp/interop/http2_client.cc @@ -41,7 +41,7 @@ #include <grpc/support/useful.h> #include "src/core/lib/transport/byte_stream.h" -#include "src/proto/grpc/testing/messages.grpc.pb.h" +#include "src/proto/grpc/testing/messages.pb.h" #include "src/proto/grpc/testing/test.grpc.pb.h" #include "test/cpp/interop/http2_client.h" diff --git a/test/cpp/interop/http2_client.h b/test/cpp/interop/http2_client.h index 12df5d26bc..e57d695205 100644 --- a/test/cpp/interop/http2_client.h +++ b/test/cpp/interop/http2_client.h @@ -38,7 +38,7 @@ #include <grpc++/channel.h> #include <grpc/grpc.h> -#include "src/proto/grpc/testing/messages.grpc.pb.h" +#include "src/proto/grpc/testing/messages.pb.h" #include "src/proto/grpc/testing/test.grpc.pb.h" namespace grpc { diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc index b7f2723c39..55ba324cc7 100644 --- a/test/cpp/interop/interop_client.cc +++ b/test/cpp/interop/interop_client.cc @@ -46,8 +46,8 @@ #include <grpc/support/useful.h> #include "src/core/lib/transport/byte_stream.h" -#include "src/proto/grpc/testing/empty.grpc.pb.h" -#include "src/proto/grpc/testing/messages.grpc.pb.h" +#include "src/proto/grpc/testing/empty.pb.h" +#include "src/proto/grpc/testing/messages.pb.h" #include "src/proto/grpc/testing/test.grpc.pb.h" #include "test/cpp/interop/client_helper.h" #include "test/cpp/interop/interop_client.h" diff --git a/test/cpp/interop/interop_client.h b/test/cpp/interop/interop_client.h index 74f4db6b78..efcb7d2860 100644 --- a/test/cpp/interop/interop_client.h +++ b/test/cpp/interop/interop_client.h @@ -38,7 +38,7 @@ #include <grpc++/channel.h> #include <grpc/grpc.h> -#include "src/proto/grpc/testing/messages.grpc.pb.h" +#include "src/proto/grpc/testing/messages.pb.h" #include "src/proto/grpc/testing/test.grpc.pb.h" namespace grpc { diff --git a/test/cpp/interop/interop_server.cc b/test/cpp/interop/interop_server.cc index 5a810b45ef..1cbca17928 100644 --- a/test/cpp/interop/interop_server.cc +++ b/test/cpp/interop/interop_server.cc @@ -48,8 +48,8 @@ #include "src/core/lib/support/string.h" #include "src/core/lib/transport/byte_stream.h" -#include "src/proto/grpc/testing/empty.grpc.pb.h" -#include "src/proto/grpc/testing/messages.grpc.pb.h" +#include "src/proto/grpc/testing/empty.pb.h" +#include "src/proto/grpc/testing/messages.pb.h" #include "src/proto/grpc/testing/test.grpc.pb.h" #include "test/cpp/interop/server_helper.h" #include "test/cpp/util/test_config.h" diff --git a/test/cpp/interop/reconnect_interop_client.cc b/test/cpp/interop/reconnect_interop_client.cc index 1c2f606637..01d985068d 100644 --- a/test/cpp/interop/reconnect_interop_client.cc +++ b/test/cpp/interop/reconnect_interop_client.cc @@ -40,8 +40,8 @@ #include <grpc++/support/channel_arguments.h> #include <grpc/grpc.h> #include <grpc/support/log.h> -#include "src/proto/grpc/testing/empty.grpc.pb.h" -#include "src/proto/grpc/testing/messages.grpc.pb.h" +#include "src/proto/grpc/testing/empty.pb.h" +#include "src/proto/grpc/testing/messages.pb.h" #include "src/proto/grpc/testing/test.grpc.pb.h" #include "test/cpp/util/create_test_channel.h" #include "test/cpp/util/test_config.h" diff --git a/test/cpp/interop/reconnect_interop_server.cc b/test/cpp/interop/reconnect_interop_server.cc index 634d0a90fc..8d1b884af9 100644 --- a/test/cpp/interop/reconnect_interop_server.cc +++ b/test/cpp/interop/reconnect_interop_server.cc @@ -47,8 +47,8 @@ #include <grpc/grpc.h> #include <grpc/support/log.h> -#include "src/proto/grpc/testing/empty.grpc.pb.h" -#include "src/proto/grpc/testing/messages.grpc.pb.h" +#include "src/proto/grpc/testing/empty.pb.h" +#include "src/proto/grpc/testing/messages.pb.h" #include "src/proto/grpc/testing/test.grpc.pb.h" #include "test/core/util/reconnect_server.h" #include "test/cpp/util/test_config.h" diff --git a/test/cpp/microbenchmarks/bm_arena.cc b/test/cpp/microbenchmarks/bm_arena.cc new file mode 100644 index 0000000000..770c0b6d47 --- /dev/null +++ b/test/cpp/microbenchmarks/bm_arena.cc @@ -0,0 +1,76 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* Benchmark arenas */ + +extern "C" { +#include "src/core/lib/support/arena.h" +} +#include "test/cpp/microbenchmarks/helpers.h" +#include "third_party/benchmark/include/benchmark/benchmark.h" + +static void BM_Arena_NoOp(benchmark::State& state) { + while (state.KeepRunning()) { + gpr_arena_destroy(gpr_arena_create(state.range(0))); + } +} +BENCHMARK(BM_Arena_NoOp)->Range(1, 1024 * 1024); + +static void BM_Arena_ManyAlloc(benchmark::State& state) { + gpr_arena* a = gpr_arena_create(state.range(0)); + const size_t realloc_after = + 1024 * 1024 * 1024 / ((state.range(1) + 15) & 0xffffff0u); + while (state.KeepRunning()) { + gpr_arena_alloc(a, state.range(1)); + // periodically recreate arena to avoid OOM + if (state.iterations() % realloc_after == 0) { + gpr_arena_destroy(a); + a = gpr_arena_create(state.range(0)); + } + } + gpr_arena_destroy(a); +} +BENCHMARK(BM_Arena_ManyAlloc)->Ranges({{1, 1024 * 1024}, {1, 32 * 1024}}); + +static void BM_Arena_Batch(benchmark::State& state) { + while (state.KeepRunning()) { + gpr_arena* a = gpr_arena_create(state.range(0)); + for (int i = 0; i < state.range(1); i++) { + gpr_arena_alloc(a, state.range(2)); + } + gpr_arena_destroy(a); + } +} +BENCHMARK(BM_Arena_Batch)->Ranges({{1, 64 * 1024}, {1, 64}, {1, 1024}}); + +BENCHMARK_MAIN(); diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc index 014e2b96b5..1ef8caa690 100644 --- a/test/cpp/microbenchmarks/bm_call_create.cc +++ b/test/cpp/microbenchmarks/bm_call_create.cc @@ -53,6 +53,7 @@ extern "C" { #include "src/core/lib/channel/http_client_filter.h" #include "src/core/lib/channel/http_server_filter.h" #include "src/core/lib/channel/message_size_filter.h" +#include "src/core/lib/surface/channel.h" #include "src/core/lib/transport/transport_impl.h" } @@ -85,6 +86,9 @@ BENCHMARK(BM_Zalloc) ->Arg(6144) ->Arg(7168); +//////////////////////////////////////////////////////////////////////////////// +// Benchmarks creating full stacks + class BaseChannelFixture { public: BaseChannelFixture(grpc_channel *channel) : channel_(channel) {} @@ -130,6 +134,9 @@ static void BM_CallCreateDestroy(benchmark::State &state) { BENCHMARK_TEMPLATE(BM_CallCreateDestroy, InsecureChannel); BENCHMARK_TEMPLATE(BM_CallCreateDestroy, LameChannel); +//////////////////////////////////////////////////////////////////////////////// +// Benchmarks isolating individual filters + static void *tag(int i) { return reinterpret_cast<void *>(static_cast<intptr_t>(i)); } @@ -232,7 +239,7 @@ static void SetPollsetOrPollsetSet(grpc_exec_ctx *exec_ctx, static void DestroyCallElem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *and_free_memory) {} + grpc_closure *then_sched_closure) {} grpc_error *InitChannelElem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_channel_element_args *args) { @@ -275,7 +282,7 @@ const char *name; /* implementation of grpc_transport_init_stream */ int InitStream(grpc_exec_ctx *exec_ctx, grpc_transport *self, grpc_stream *stream, grpc_stream_refcount *refcount, - const void *server_data) { + const void *server_data, gpr_arena *arena) { return 0; } @@ -299,7 +306,7 @@ void PerformOp(grpc_exec_ctx *exec_ctx, grpc_transport *self, /* implementation of grpc_transport_destroy_stream */ void DestroyStream(grpc_exec_ctx *exec_ctx, grpc_transport *self, - grpc_stream *stream, void *and_free_memory) {} + grpc_stream *stream, grpc_closure *then_sched_closure) {} /* implementation of grpc_transport_destroy */ void Destroy(grpc_exec_ctx *exec_ctx, grpc_transport *self) {} @@ -394,7 +401,7 @@ static void BM_IsolatedFilter(benchmark::State &state) { grpc_channel_stack *channel_stack = static_cast<grpc_channel_stack *>(gpr_zalloc(channel_size)); GPR_ASSERT(GRPC_LOG_IF_ERROR( - "call_stack_init", + "channel_stack_init", grpc_channel_stack_init(&exec_ctx, 1, FilterDestroy, channel_stack, &filters[0], filters.size(), &channel_args, fixture.flags & REQUIRES_TRANSPORT @@ -409,15 +416,29 @@ static void BM_IsolatedFilter(benchmark::State &state) { grpc_slice method = grpc_slice_from_static_string("/foo/bar"); grpc_call_final_info final_info; TestOp test_op_data; + grpc_call_element_args call_args; + call_args.call_stack = call_stack; + call_args.server_transport_data = NULL; + call_args.context = NULL; + call_args.path = method; + call_args.start_time = start_time; + call_args.deadline = deadline; + const int kArenaSize = 4096; + call_args.arena = gpr_arena_create(kArenaSize); while (state.KeepRunning()) { GRPC_ERROR_UNREF(grpc_call_stack_init(&exec_ctx, channel_stack, 1, - DoNothing, NULL, NULL, NULL, method, - start_time, deadline, call_stack)); + DoNothing, NULL, &call_args)); typename TestOp::Op op(&exec_ctx, &test_op_data, call_stack); grpc_call_stack_destroy(&exec_ctx, call_stack, &final_info, NULL); op.Finish(&exec_ctx); grpc_exec_ctx_flush(&exec_ctx); + // recreate arena every 64k iterations to avoid oom + if (0 == (state.iterations() & 0xffff)) { + gpr_arena_destroy(call_args.arena); + call_args.arena = gpr_arena_create(kArenaSize); + } } + gpr_arena_destroy(call_args.arena); grpc_channel_stack_destroy(&exec_ctx, channel_stack); grpc_exec_ctx_finish(&exec_ctx); gpr_free(channel_stack); @@ -460,4 +481,205 @@ typedef Fixture<&grpc_load_reporting_filter, CHECKS_NOT_LAST> BENCHMARK_TEMPLATE(BM_IsolatedFilter, LoadReportingFilter, NoOp); BENCHMARK_TEMPLATE(BM_IsolatedFilter, LoadReportingFilter, SendEmptyMetadata); +//////////////////////////////////////////////////////////////////////////////// +// Benchmarks isolating grpc_call + +namespace isolated_call_filter { + +static void StartTransportStreamOp(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op) { + if (op->recv_initial_metadata) { + grpc_closure_sched(exec_ctx, op->recv_initial_metadata_ready, + GRPC_ERROR_NONE); + } + if (op->recv_message) { + grpc_closure_sched(exec_ctx, op->recv_message_ready, GRPC_ERROR_NONE); + } + grpc_closure_sched(exec_ctx, op->on_complete, GRPC_ERROR_NONE); +} + +static void StartTransportOp(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_transport_op *op) { + if (op->disconnect_with_error != GRPC_ERROR_NONE) { + GRPC_ERROR_UNREF(op->disconnect_with_error); + } + grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); +} + +static grpc_error *InitCallElem(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + const grpc_call_element_args *args) { + return GRPC_ERROR_NONE; +} + +static void SetPollsetOrPollsetSet(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_polling_entity *pollent) {} + +static void DestroyCallElem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + const grpc_call_final_info *final_info, + grpc_closure *then_sched_closure) { + grpc_closure_sched(exec_ctx, then_sched_closure, GRPC_ERROR_NONE); +} + +grpc_error *InitChannelElem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_channel_element_args *args) { + return GRPC_ERROR_NONE; +} + +void DestroyChannelElem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) {} + +char *GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { + return gpr_strdup("peer"); +} + +void GetChannelInfo(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + const grpc_channel_info *channel_info) {} + +static const grpc_channel_filter isolated_call_filter = { + StartTransportStreamOp, + StartTransportOp, + 0, + InitCallElem, + SetPollsetOrPollsetSet, + DestroyCallElem, + 0, + InitChannelElem, + DestroyChannelElem, + GetPeer, + GetChannelInfo, + "isolated_call_filter"}; +} + +class IsolatedCallFixture : public TrackCounters { + public: + IsolatedCallFixture() { + grpc_channel_stack_builder *builder = grpc_channel_stack_builder_create(); + grpc_channel_stack_builder_set_name(builder, "dummy"); + grpc_channel_stack_builder_set_target(builder, "dummy_target"); + GPR_ASSERT(grpc_channel_stack_builder_append_filter( + builder, &isolated_call_filter::isolated_call_filter, NULL, NULL)); + { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + channel_ = grpc_channel_create_with_builder(&exec_ctx, builder, + GRPC_CLIENT_CHANNEL); + grpc_exec_ctx_finish(&exec_ctx); + } + cq_ = grpc_completion_queue_create(NULL); + } + + void Finish(benchmark::State &state) { + grpc_completion_queue_destroy(cq_); + grpc_channel_destroy(channel_); + TrackCounters::Finish(state); + } + + grpc_channel *channel() const { return channel_; } + grpc_completion_queue *cq() const { return cq_; } + + private: + grpc_completion_queue *cq_; + grpc_channel *channel_; +}; + +static void BM_IsolatedCall_NoOp(benchmark::State &state) { + IsolatedCallFixture fixture; + gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + void *method_hdl = + grpc_channel_register_call(fixture.channel(), "/foo/bar", NULL, NULL); + while (state.KeepRunning()) { + grpc_call_destroy(grpc_channel_create_registered_call( + fixture.channel(), nullptr, GRPC_PROPAGATE_DEFAULTS, fixture.cq(), + method_hdl, deadline, NULL)); + } + fixture.Finish(state); +} +BENCHMARK(BM_IsolatedCall_NoOp); + +static void BM_IsolatedCall_Unary(benchmark::State &state) { + IsolatedCallFixture fixture; + gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + void *method_hdl = + grpc_channel_register_call(fixture.channel(), "/foo/bar", NULL, NULL); + grpc_slice slice = grpc_slice_from_static_string("hello world"); + grpc_byte_buffer *send_message = grpc_raw_byte_buffer_create(&slice, 1); + grpc_byte_buffer *recv_message = NULL; + grpc_status_code status_code; + grpc_slice status_details = grpc_empty_slice(); + grpc_metadata_array recv_initial_metadata; + grpc_metadata_array_init(&recv_initial_metadata); + grpc_metadata_array recv_trailing_metadata; + grpc_metadata_array_init(&recv_trailing_metadata); + grpc_op ops[6]; + memset(ops, 0, sizeof(ops)); + ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; + ops[1].op = GRPC_OP_SEND_MESSAGE; + ops[1].data.send_message.send_message = send_message; + ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + ops[3].op = GRPC_OP_RECV_INITIAL_METADATA; + ops[3].data.recv_initial_metadata.recv_initial_metadata = + &recv_initial_metadata; + ops[4].op = GRPC_OP_RECV_MESSAGE; + ops[4].data.recv_message.recv_message = &recv_message; + ops[5].op = GRPC_OP_RECV_STATUS_ON_CLIENT; + ops[5].data.recv_status_on_client.status = &status_code; + ops[5].data.recv_status_on_client.status_details = &status_details; + ops[5].data.recv_status_on_client.trailing_metadata = &recv_trailing_metadata; + while (state.KeepRunning()) { + grpc_call *call = grpc_channel_create_registered_call( + fixture.channel(), nullptr, GRPC_PROPAGATE_DEFAULTS, fixture.cq(), + method_hdl, deadline, NULL); + grpc_call_start_batch(call, ops, 6, tag(1), NULL); + grpc_completion_queue_next(fixture.cq(), + gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL); + grpc_call_destroy(call); + } + fixture.Finish(state); + grpc_metadata_array_destroy(&recv_initial_metadata); + grpc_metadata_array_destroy(&recv_trailing_metadata); + grpc_byte_buffer_destroy(send_message); +} +BENCHMARK(BM_IsolatedCall_Unary); + +static void BM_IsolatedCall_StreamingSend(benchmark::State &state) { + IsolatedCallFixture fixture; + gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + void *method_hdl = + grpc_channel_register_call(fixture.channel(), "/foo/bar", NULL, NULL); + grpc_slice slice = grpc_slice_from_static_string("hello world"); + grpc_byte_buffer *send_message = grpc_raw_byte_buffer_create(&slice, 1); + grpc_metadata_array recv_initial_metadata; + grpc_metadata_array_init(&recv_initial_metadata); + grpc_metadata_array recv_trailing_metadata; + grpc_metadata_array_init(&recv_trailing_metadata); + grpc_op ops[2]; + memset(ops, 0, sizeof(ops)); + ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; + ops[1].op = GRPC_OP_RECV_INITIAL_METADATA; + ops[1].data.recv_initial_metadata.recv_initial_metadata = + &recv_initial_metadata; + grpc_call *call = grpc_channel_create_registered_call( + fixture.channel(), nullptr, GRPC_PROPAGATE_DEFAULTS, fixture.cq(), + method_hdl, deadline, NULL); + grpc_call_start_batch(call, ops, 2, tag(1), NULL); + grpc_completion_queue_next(fixture.cq(), gpr_inf_future(GPR_CLOCK_MONOTONIC), + NULL); + memset(ops, 0, sizeof(ops)); + ops[0].op = GRPC_OP_SEND_MESSAGE; + ops[0].data.send_message.send_message = send_message; + while (state.KeepRunning()) { + grpc_call_start_batch(call, ops, 1, tag(2), NULL); + grpc_completion_queue_next(fixture.cq(), + gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL); + } + grpc_call_destroy(call); + fixture.Finish(state); + grpc_metadata_array_destroy(&recv_initial_metadata); + grpc_metadata_array_destroy(&recv_trailing_metadata); + grpc_byte_buffer_destroy(send_message); +} +BENCHMARK(BM_IsolatedCall_StreamingSend); + BENCHMARK_MAIN(); diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc new file mode 100644 index 0000000000..254d57de20 --- /dev/null +++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc @@ -0,0 +1,587 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* Microbenchmarks around CHTTP2 transport operations */ + +#include <grpc++/support/channel_arguments.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <string.h> +#include <memory> +#include <queue> +#include <sstream> +extern "C" { +#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/ext/transport/chttp2/transport/internal.h" +#include "src/core/lib/iomgr/resource_quota.h" +#include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/transport/static_metadata.h" +} +#include "test/cpp/microbenchmarks/helpers.h" +#include "third_party/benchmark/include/benchmark/benchmark.h" + +auto &force_library_initialization = Library::get(); + +//////////////////////////////////////////////////////////////////////////////// +// Helper classes +// + +class DummyEndpoint : public grpc_endpoint { + public: + DummyEndpoint() { + static const grpc_endpoint_vtable my_vtable = {read, + write, + get_workqueue, + add_to_pollset, + add_to_pollset_set, + shutdown, + destroy, + get_resource_user, + get_peer, + get_fd}; + grpc_endpoint::vtable = &my_vtable; + ru_ = grpc_resource_user_create(Library::get().rq(), "dummy_endpoint"); + } + + void PushInput(grpc_exec_ctx *exec_ctx, grpc_slice slice) { + if (read_cb_ == nullptr) { + GPR_ASSERT(!have_slice_); + buffered_slice_ = slice; + have_slice_ = true; + return; + } + grpc_slice_buffer_add(slices_, slice); + grpc_closure_sched(exec_ctx, read_cb_, GRPC_ERROR_NONE); + read_cb_ = nullptr; + } + + private: + grpc_resource_user *ru_; + grpc_closure *read_cb_ = nullptr; + grpc_slice_buffer *slices_ = nullptr; + bool have_slice_ = false; + grpc_slice buffered_slice_; + + void QueueRead(grpc_exec_ctx *exec_ctx, grpc_slice_buffer *slices, + grpc_closure *cb) { + GPR_ASSERT(read_cb_ == nullptr); + if (have_slice_) { + have_slice_ = false; + grpc_slice_buffer_add(slices, buffered_slice_); + grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_NONE); + return; + } + read_cb_ = cb; + slices_ = slices; + } + + static void read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_slice_buffer *slices, grpc_closure *cb) { + static_cast<DummyEndpoint *>(ep)->QueueRead(exec_ctx, slices, cb); + } + + static void write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_slice_buffer *slices, grpc_closure *cb) { + grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_NONE); + } + + static grpc_workqueue *get_workqueue(grpc_endpoint *ep) { return NULL; } + + static void add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_pollset *pollset) {} + + static void add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_pollset_set *pollset) {} + + static void shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_error *why) { + grpc_resource_user_shutdown(exec_ctx, + static_cast<DummyEndpoint *>(ep)->ru_); + grpc_closure_sched(exec_ctx, static_cast<DummyEndpoint *>(ep)->read_cb_, + why); + } + + static void destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { + grpc_resource_user_unref(exec_ctx, static_cast<DummyEndpoint *>(ep)->ru_); + delete static_cast<DummyEndpoint *>(ep); + } + + static grpc_resource_user *get_resource_user(grpc_endpoint *ep) { + return static_cast<DummyEndpoint *>(ep)->ru_; + } + static char *get_peer(grpc_endpoint *ep) { return gpr_strdup("test"); } + static int get_fd(grpc_endpoint *ep) { return 0; } +}; + +class Fixture { + public: + Fixture(const grpc::ChannelArguments &args, bool client) { + grpc_channel_args c_args = args.c_channel_args(); + ep_ = new DummyEndpoint; + t_ = grpc_create_chttp2_transport(exec_ctx(), &c_args, ep_, client); + grpc_chttp2_transport_start_reading(exec_ctx(), t_, NULL); + FlushExecCtx(); + } + + void FlushExecCtx() { grpc_exec_ctx_flush(&exec_ctx_); } + + ~Fixture() { + grpc_transport_destroy(&exec_ctx_, t_); + grpc_exec_ctx_finish(&exec_ctx_); + } + + grpc_chttp2_transport *chttp2_transport() { + return reinterpret_cast<grpc_chttp2_transport *>(t_); + } + grpc_transport *transport() { return t_; } + grpc_exec_ctx *exec_ctx() { return &exec_ctx_; } + + void PushInput(grpc_slice slice) { ep_->PushInput(exec_ctx(), slice); } + + private: + DummyEndpoint *ep_; + grpc_exec_ctx exec_ctx_ = GRPC_EXEC_CTX_INIT; + grpc_transport *t_; +}; + +static void DoNothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} + +class Stream { + public: + Stream(Fixture *f) : f_(f) { + GRPC_STREAM_REF_INIT(&refcount_, 1, DoNothing, nullptr, "test_stream"); + stream_size_ = grpc_transport_stream_size(f->transport()); + stream_ = gpr_malloc(stream_size_); + arena_ = gpr_arena_create(4096); + } + + ~Stream() { + gpr_free(stream_); + gpr_arena_destroy(arena_); + } + + void Init(benchmark::State &state) { + memset(stream_, 0, stream_size_); + if ((state.iterations() & 0xffff) == 0) { + gpr_arena_destroy(arena_); + arena_ = gpr_arena_create(4096); + } + grpc_transport_init_stream(f_->exec_ctx(), f_->transport(), + static_cast<grpc_stream *>(stream_), &refcount_, + NULL, arena_); + } + + void DestroyThen(grpc_closure *closure) { + grpc_transport_destroy_stream(f_->exec_ctx(), f_->transport(), + static_cast<grpc_stream *>(stream_), closure); + } + + void Op(grpc_transport_stream_op *op) { + grpc_transport_perform_stream_op(f_->exec_ctx(), f_->transport(), + static_cast<grpc_stream *>(stream_), op); + } + + grpc_chttp2_stream *chttp2_stream() { + return static_cast<grpc_chttp2_stream *>(stream_); + } + + private: + Fixture *f_; + grpc_stream_refcount refcount_; + gpr_arena *arena_; + size_t stream_size_; + void *stream_; +}; + +class Closure : public grpc_closure { + public: + virtual ~Closure() {} +}; + +template <class F> +std::unique_ptr<Closure> MakeClosure( + F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) { + struct C : public Closure { + C(const F &f, grpc_closure_scheduler *sched) : f_(f) { + grpc_closure_init(this, Execute, this, sched); + } + F f_; + static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + static_cast<C *>(arg)->f_(exec_ctx, error); + } + }; + return std::unique_ptr<Closure>(new C(f, sched)); +} + +template <class F> +grpc_closure *MakeOnceClosure( + F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) { + struct C : public grpc_closure { + C(const F &f) : f_(f) {} + F f_; + static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + static_cast<C *>(arg)->f_(exec_ctx, error); + delete static_cast<C *>(arg); + } + }; + auto *c = new C{f}; + return grpc_closure_init(c, C::Execute, c, sched); +} + +//////////////////////////////////////////////////////////////////////////////// +// Benchmarks +// + +static void BM_StreamCreateDestroy(benchmark::State &state) { + TrackCounters track_counters; + Fixture f(grpc::ChannelArguments(), true); + Stream s(&f); + std::unique_ptr<Closure> next = + MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { + if (!state.KeepRunning()) return; + s.Init(state); + s.DestroyThen(next.get()); + }); + grpc_closure_run(f.exec_ctx(), next.get(), GRPC_ERROR_NONE); + f.FlushExecCtx(); + track_counters.Finish(state); +} +BENCHMARK(BM_StreamCreateDestroy); + +class RepresentativeClientInitialMetadata { + public: + static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx *exec_ctx) { + return { + GRPC_MDELEM_SCHEME_HTTP, GRPC_MDELEM_METHOD_POST, + grpc_mdelem_from_slices( + exec_ctx, GRPC_MDSTR_PATH, + grpc_slice_intern(grpc_slice_from_static_string("/foo/bar"))), + grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_AUTHORITY, + grpc_slice_intern(grpc_slice_from_static_string( + "foo.test.google.fr:1234"))), + GRPC_MDELEM_GRPC_ACCEPT_ENCODING_IDENTITY_COMMA_DEFLATE_COMMA_GZIP, + GRPC_MDELEM_TE_TRAILERS, + GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC, + grpc_mdelem_from_slices( + exec_ctx, GRPC_MDSTR_USER_AGENT, + grpc_slice_intern(grpc_slice_from_static_string( + "grpc-c/3.0.0-dev (linux; chttp2; green)")))}; + } +}; + +template <class Metadata> +static void BM_StreamCreateSendInitialMetadataDestroy(benchmark::State &state) { + TrackCounters track_counters; + Fixture f(grpc::ChannelArguments(), true); + Stream s(&f); + grpc_transport_stream_op op; + std::unique_ptr<Closure> start; + std::unique_ptr<Closure> done; + + grpc_metadata_batch b; + grpc_metadata_batch_init(&b); + b.deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + std::vector<grpc_mdelem> elems = Metadata::GetElems(f.exec_ctx()); + std::vector<grpc_linked_mdelem> storage(elems.size()); + for (size_t i = 0; i < elems.size(); i++) { + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "addmd", + grpc_metadata_batch_add_tail(f.exec_ctx(), &b, &storage[i], elems[i]))); + } + + f.FlushExecCtx(); + start = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { + if (!state.KeepRunning()) return; + s.Init(state); + memset(&op, 0, sizeof(op)); + op.on_complete = done.get(); + op.send_initial_metadata = &b; + s.Op(&op); + }); + done = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { + memset(&op, 0, sizeof(op)); + op.cancel_error = GRPC_ERROR_CANCELLED; + s.Op(&op); + s.DestroyThen(start.get()); + }); + grpc_closure_sched(f.exec_ctx(), start.get(), GRPC_ERROR_NONE); + f.FlushExecCtx(); + grpc_metadata_batch_destroy(f.exec_ctx(), &b); + track_counters.Finish(state); +} +BENCHMARK_TEMPLATE(BM_StreamCreateSendInitialMetadataDestroy, + RepresentativeClientInitialMetadata); + +static void BM_TransportEmptyOp(benchmark::State &state) { + TrackCounters track_counters; + Fixture f(grpc::ChannelArguments(), true); + Stream s(&f); + s.Init(state); + grpc_transport_stream_op op; + std::unique_ptr<Closure> c = + MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { + if (!state.KeepRunning()) return; + memset(&op, 0, sizeof(op)); + op.on_complete = c.get(); + s.Op(&op); + }); + grpc_closure_sched(f.exec_ctx(), c.get(), GRPC_ERROR_NONE); + f.FlushExecCtx(); + s.DestroyThen( + MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {})); + f.FlushExecCtx(); + track_counters.Finish(state); +} +BENCHMARK(BM_TransportEmptyOp); + +static void BM_TransportStreamSend(benchmark::State &state) { + TrackCounters track_counters; + Fixture f(grpc::ChannelArguments(), true); + Stream s(&f); + s.Init(state); + grpc_transport_stream_op op; + grpc_slice_buffer_stream send_stream; + grpc_slice_buffer send_buffer; + grpc_slice_buffer_init(&send_buffer); + grpc_slice_buffer_add(&send_buffer, gpr_slice_malloc(state.range(0))); + memset(GRPC_SLICE_START_PTR(send_buffer.slices[0]), 0, + GRPC_SLICE_LENGTH(send_buffer.slices[0])); + + grpc_metadata_batch b; + grpc_metadata_batch_init(&b); + b.deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + std::vector<grpc_mdelem> elems = + RepresentativeClientInitialMetadata::GetElems(f.exec_ctx()); + std::vector<grpc_linked_mdelem> storage(elems.size()); + for (size_t i = 0; i < elems.size(); i++) { + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "addmd", + grpc_metadata_batch_add_tail(f.exec_ctx(), &b, &storage[i], elems[i]))); + } + + std::unique_ptr<Closure> c = + MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { + if (!state.KeepRunning()) return; + // force outgoing window to be yuge + s.chttp2_stream()->outgoing_window_delta = 1024 * 1024 * 1024; + f.chttp2_transport()->outgoing_window = 1024 * 1024 * 1024; + grpc_slice_buffer_stream_init(&send_stream, &send_buffer, 0); + memset(&op, 0, sizeof(op)); + op.on_complete = c.get(); + op.send_message = &send_stream.base; + s.Op(&op); + }); + + memset(&op, 0, sizeof(op)); + op.send_initial_metadata = &b; + op.on_complete = c.get(); + s.Op(&op); + + f.FlushExecCtx(); + memset(&op, 0, sizeof(op)); + op.cancel_error = GRPC_ERROR_CANCELLED; + s.Op(&op); + s.DestroyThen( + MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {})); + f.FlushExecCtx(); + track_counters.Finish(state); + grpc_metadata_batch_destroy(f.exec_ctx(), &b); + grpc_slice_buffer_destroy(&send_buffer); +} +BENCHMARK(BM_TransportStreamSend)->Range(0, 128 * 1024 * 1024); + +#define SLICE_FROM_BUFFER(s) grpc_slice_from_static_buffer(s, sizeof(s) - 1) + +static grpc_slice CreateIncomingDataSlice(size_t length, size_t frame_size) { + std::queue<char> unframed; + + unframed.push(static_cast<uint8_t>(0)); + unframed.push(static_cast<uint8_t>(length >> 24)); + unframed.push(static_cast<uint8_t>(length >> 16)); + unframed.push(static_cast<uint8_t>(length >> 8)); + unframed.push(static_cast<uint8_t>(length)); + for (size_t i = 0; i < length; i++) { + unframed.push('a'); + } + + std::vector<char> framed; + while (unframed.size() > frame_size) { + // frame size + framed.push_back(static_cast<uint8_t>(frame_size >> 16)); + framed.push_back(static_cast<uint8_t>(frame_size >> 8)); + framed.push_back(static_cast<uint8_t>(frame_size)); + // data frame + framed.push_back(0); + // no flags + framed.push_back(0); + // stream id + framed.push_back(0); + framed.push_back(0); + framed.push_back(0); + framed.push_back(1); + // frame data + for (size_t i = 0; i < frame_size; i++) { + framed.push_back(unframed.front()); + unframed.pop(); + } + } + + // frame size + framed.push_back(static_cast<uint8_t>(unframed.size() >> 16)); + framed.push_back(static_cast<uint8_t>(unframed.size() >> 8)); + framed.push_back(static_cast<uint8_t>(unframed.size())); + // data frame + framed.push_back(0); + // no flags + framed.push_back(0); + // stream id + framed.push_back(0); + framed.push_back(0); + framed.push_back(0); + framed.push_back(1); + while (!unframed.empty()) { + framed.push_back(unframed.front()); + unframed.pop(); + } + + return grpc_slice_from_copied_buffer(framed.data(), framed.size()); +} + +static void BM_TransportStreamRecv(benchmark::State &state) { + TrackCounters track_counters; + Fixture f(grpc::ChannelArguments(), true); + Stream s(&f); + s.Init(state); + grpc_transport_stream_op op; + grpc_byte_stream *recv_stream; + grpc_slice incoming_data = CreateIncomingDataSlice(state.range(0), 16384); + + grpc_metadata_batch b; + grpc_metadata_batch_init(&b); + grpc_metadata_batch b_recv; + grpc_metadata_batch_init(&b_recv); + b.deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + std::vector<grpc_mdelem> elems = + RepresentativeClientInitialMetadata::GetElems(f.exec_ctx()); + std::vector<grpc_linked_mdelem> storage(elems.size()); + for (size_t i = 0; i < elems.size(); i++) { + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "addmd", + grpc_metadata_batch_add_tail(f.exec_ctx(), &b, &storage[i], elems[i]))); + } + + std::unique_ptr<Closure> do_nothing = + MakeClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {}); + + uint32_t received; + + std::unique_ptr<Closure> drain_start; + std::unique_ptr<Closure> drain; + std::unique_ptr<Closure> drain_continue; + grpc_slice recv_slice; + + std::unique_ptr<Closure> c = + MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { + if (!state.KeepRunning()) return; + // force outgoing window to be yuge + s.chttp2_stream()->incoming_window_delta = 1024 * 1024 * 1024; + f.chttp2_transport()->incoming_window = 1024 * 1024 * 1024; + received = 0; + memset(&op, 0, sizeof(op)); + op.on_complete = do_nothing.get(); + op.recv_message = &recv_stream; + op.recv_message_ready = drain_start.get(); + s.Op(&op); + f.PushInput(grpc_slice_ref(incoming_data)); + }); + + drain_start = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { + if (recv_stream == NULL) { + GPR_ASSERT(!state.KeepRunning()); + return; + } + grpc_closure_run(exec_ctx, drain.get(), GRPC_ERROR_NONE); + }); + + drain = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { + do { + if (received == recv_stream->length) { + grpc_byte_stream_destroy(exec_ctx, recv_stream); + grpc_closure_sched(exec_ctx, c.get(), GRPC_ERROR_NONE); + return; + } + } while (grpc_byte_stream_next(exec_ctx, recv_stream, &recv_slice, + recv_stream->length - received, + drain_continue.get())); + }); + + drain_continue = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { + received += GRPC_SLICE_LENGTH(recv_slice); + grpc_slice_unref_internal(exec_ctx, recv_slice); + grpc_closure_run(exec_ctx, drain.get(), GRPC_ERROR_NONE); + }); + + memset(&op, 0, sizeof(op)); + op.send_initial_metadata = &b; + op.recv_initial_metadata = &b_recv; + op.on_complete = c.get(); + s.Op(&op); + f.PushInput(SLICE_FROM_BUFFER( + "\x00\x00\x00\x04\x00\x00\x00\x00\x00" + // Generated using: + // tools/codegen/core/gen_header_frame.py < + // test/cpp/microbenchmarks/representative_server_initial_metadata.headers + "\x00\x00X\x01\x04\x00\x00\x00\x01" + "\x10\x07:status\x03" + "200" + "\x10\x0c" + "content-type\x10" + "application/grpc" + "\x10\x14grpc-accept-encoding\x15identity,deflate,gzip")); + + f.FlushExecCtx(); + memset(&op, 0, sizeof(op)); + op.cancel_error = GRPC_ERROR_CANCELLED; + s.Op(&op); + s.DestroyThen( + MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {})); + f.FlushExecCtx(); + track_counters.Finish(state); + grpc_metadata_batch_destroy(f.exec_ctx(), &b); + grpc_metadata_batch_destroy(f.exec_ctx(), &b_recv); + grpc_slice_unref(incoming_data); +} +BENCHMARK(BM_TransportStreamRecv)->Range(0, 128 * 1024 * 1024); + +BENCHMARK_MAIN(); diff --git a/test/cpp/microbenchmarks/bm_error.cc b/test/cpp/microbenchmarks/bm_error.cc index c4f6aa19d5..00e1a08cab 100644 --- a/test/cpp/microbenchmarks/bm_error.cc +++ b/test/cpp/microbenchmarks/bm_error.cc @@ -51,21 +51,30 @@ class ErrorDeleter { }; typedef std::unique_ptr<grpc_error, ErrorDeleter> ErrorPtr; -static void BM_ErrorCreate(benchmark::State& state) { +static void BM_ErrorCreateFromStatic(benchmark::State& state) { TrackCounters track_counters; while (state.KeepRunning()) { - GRPC_ERROR_UNREF(GRPC_ERROR_CREATE("Error")); + GRPC_ERROR_UNREF(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Error")); } track_counters.Finish(state); } -BENCHMARK(BM_ErrorCreate); +BENCHMARK(BM_ErrorCreateFromStatic); + +static void BM_ErrorCreateFromCopied(benchmark::State& state) { + TrackCounters track_counters; + while (state.KeepRunning()) { + GRPC_ERROR_UNREF(GRPC_ERROR_CREATE_FROM_COPIED_STRING("Error not inline")); + } + track_counters.Finish(state); +} +BENCHMARK(BM_ErrorCreateFromCopied); static void BM_ErrorCreateAndSetStatus(benchmark::State& state) { TrackCounters track_counters; while (state.KeepRunning()) { - GRPC_ERROR_UNREF(grpc_error_set_int(GRPC_ERROR_CREATE("Error"), - GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_ABORTED)); + GRPC_ERROR_UNREF( + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Error"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_ABORTED)); } track_counters.Finish(state); } @@ -75,9 +84,10 @@ static void BM_ErrorCreateAndSetIntAndStr(benchmark::State& state) { TrackCounters track_counters; while (state.KeepRunning()) { GRPC_ERROR_UNREF(grpc_error_set_str( - grpc_error_set_int(GRPC_ERROR_CREATE("GOAWAY received"), - GRPC_ERROR_INT_HTTP2_ERROR, (intptr_t)0), - GRPC_ERROR_STR_RAW_BYTES, "raw bytes")); + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("GOAWAY received"), + GRPC_ERROR_INT_HTTP2_ERROR, (intptr_t)0), + GRPC_ERROR_STR_RAW_BYTES, grpc_slice_from_static_string("raw bytes"))); } track_counters.Finish(state); } @@ -85,7 +95,7 @@ BENCHMARK(BM_ErrorCreateAndSetIntAndStr); static void BM_ErrorCreateAndSetIntLoop(benchmark::State& state) { TrackCounters track_counters; - grpc_error* error = GRPC_ERROR_CREATE("Error"); + grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Error"); int n = 0; while (state.KeepRunning()) { error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, n++); @@ -97,10 +107,11 @@ BENCHMARK(BM_ErrorCreateAndSetIntLoop); static void BM_ErrorCreateAndSetStrLoop(benchmark::State& state) { TrackCounters track_counters; - grpc_error* error = GRPC_ERROR_CREATE("Error"); + grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Error"); const char* str = "hello"; while (state.KeepRunning()) { - error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE, str); + error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE, + grpc_slice_from_static_string(str)); } GRPC_ERROR_UNREF(error); track_counters.Finish(state); @@ -109,7 +120,7 @@ BENCHMARK(BM_ErrorCreateAndSetStrLoop); static void BM_ErrorRefUnref(benchmark::State& state) { TrackCounters track_counters; - grpc_error* error = GRPC_ERROR_CREATE("Error"); + grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Error"); while (state.KeepRunning()) { GRPC_ERROR_UNREF(GRPC_ERROR_REF(error)); } @@ -138,8 +149,8 @@ BENCHMARK(BM_ErrorGetIntFromNoError); static void BM_ErrorGetMissingInt(benchmark::State& state) { TrackCounters track_counters; - ErrorPtr error( - grpc_error_set_int(GRPC_ERROR_CREATE("Error"), GRPC_ERROR_INT_INDEX, 1)); + ErrorPtr error(grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Error"), GRPC_ERROR_INT_INDEX, 1)); while (state.KeepRunning()) { intptr_t value; grpc_error_get_int(error.get(), GRPC_ERROR_INT_OFFSET, &value); @@ -150,8 +161,8 @@ BENCHMARK(BM_ErrorGetMissingInt); static void BM_ErrorGetPresentInt(benchmark::State& state) { TrackCounters track_counters; - ErrorPtr error( - grpc_error_set_int(GRPC_ERROR_CREATE("Error"), GRPC_ERROR_INT_OFFSET, 1)); + ErrorPtr error(grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Error"), GRPC_ERROR_INT_OFFSET, 1)); while (state.KeepRunning()) { intptr_t value; grpc_error_get_int(error.get(), GRPC_ERROR_INT_OFFSET, &value); @@ -186,7 +197,7 @@ class SimpleError { private: const gpr_timespec deadline_ = gpr_inf_future(GPR_CLOCK_MONOTONIC); - ErrorPtr error_{GRPC_ERROR_CREATE("Error")}; + ErrorPtr error_{GRPC_ERROR_CREATE_FROM_STATIC_STRING("Error")}; }; class ErrorWithGrpcStatus { @@ -196,9 +207,9 @@ class ErrorWithGrpcStatus { private: const gpr_timespec deadline_ = gpr_inf_future(GPR_CLOCK_MONOTONIC); - ErrorPtr error_{grpc_error_set_int(GRPC_ERROR_CREATE("Error"), - GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_UNIMPLEMENTED)}; + ErrorPtr error_{grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Error"), GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNIMPLEMENTED)}; }; class ErrorWithHttpError { @@ -208,9 +219,9 @@ class ErrorWithHttpError { private: const gpr_timespec deadline_ = gpr_inf_future(GPR_CLOCK_MONOTONIC); - ErrorPtr error_{grpc_error_set_int(GRPC_ERROR_CREATE("Error"), - GRPC_ERROR_INT_HTTP2_ERROR, - GRPC_HTTP2_COMPRESSION_ERROR)}; + ErrorPtr error_{grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Error"), GRPC_ERROR_INT_HTTP2_ERROR, + GRPC_HTTP2_COMPRESSION_ERROR)}; }; class ErrorWithNestedGrpcStatus { @@ -220,11 +231,12 @@ class ErrorWithNestedGrpcStatus { private: const gpr_timespec deadline_ = gpr_inf_future(GPR_CLOCK_MONOTONIC); - ErrorPtr nested_error_{grpc_error_set_int(GRPC_ERROR_CREATE("Error"), - GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_UNIMPLEMENTED)}; + ErrorPtr nested_error_{grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Error"), GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNIMPLEMENTED)}; grpc_error* nested_errors_[1] = {nested_error_.get()}; - ErrorPtr error_{GRPC_ERROR_CREATE_REFERENCING("Error", nested_errors_, 1)}; + ErrorPtr error_{GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Error", nested_errors_, 1)}; }; template <class Fixture> @@ -253,8 +265,8 @@ static void BM_ErrorGetStatus(benchmark::State& state) { Fixture fixture; while (state.KeepRunning()) { grpc_status_code status; - const char* msg; - grpc_error_get_status(fixture.error(), fixture.deadline(), &status, &msg, + grpc_slice slice; + grpc_error_get_status(fixture.error(), fixture.deadline(), &status, &slice, NULL); } track_counters.Finish(state); diff --git a/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc b/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc index dc0e7d769a..00e37f7912 100644 --- a/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc +++ b/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc @@ -54,86 +54,141 @@ auto& force_library_initialization = Library::get(); static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); } -template <class Fixture> -static void BM_PumpStreamClientToServer(benchmark::State& state) { +// Repeatedly makes Streaming Bidi calls (exchanging a configurable number of +// messages in each call) in a loop on a single channel +// +// First parmeter (i.e state.range(0)): Message size (in bytes) to use +// Second parameter (i.e state.range(1)): Number of ping pong messages. +// Note: One ping-pong means two messages (one from client to server and +// the other from server to client): +template <class Fixture, class ClientContextMutator, class ServerContextMutator> +static void BM_StreamingPingPong(benchmark::State& state) { + const int msg_size = state.range(0); + const int max_ping_pongs = state.range(1); + EchoTestService::AsyncService service; std::unique_ptr<Fixture> fixture(new Fixture(&service)); { + EchoResponse send_response; + EchoResponse recv_response; EchoRequest send_request; EchoRequest recv_request; - if (state.range(0) > 0) { - send_request.set_message(std::string(state.range(0), 'a')); + + if (msg_size > 0) { + send_request.set_message(std::string(msg_size, 'a')); + send_response.set_message(std::string(msg_size, 'b')); } - Status recv_status; - ServerContext svr_ctx; - ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); - service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), - fixture->cq(), tag(0)); + std::unique_ptr<EchoTestService::Stub> stub( EchoTestService::NewStub(fixture->channel())); - ClientContext cli_ctx; - auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); - int need_tags = (1 << 0) | (1 << 1); - void* t; - bool ok; - while (need_tags) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - GPR_ASSERT(ok); - int i = (int)(intptr_t)t; - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); - } - response_rw.Read(&recv_request, tag(0)); + while (state.KeepRunning()) { - GPR_TIMER_SCOPE("BenchmarkCycle", 0); - request_rw->Write(send_request, tag(1)); - while (true) { + ServerContext svr_ctx; + ServerContextMutator svr_ctx_mut(&svr_ctx); + ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); + service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), + fixture->cq(), tag(0)); + + ClientContext cli_ctx; + ClientContextMutator cli_ctx_mut(&cli_ctx); + auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); + + // Establish async stream between client side and server side + void* t; + bool ok; + int need_tags = (1 << 0) | (1 << 1); + while (need_tags) { GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - if (t == tag(0)) { - response_rw.Read(&recv_request, tag(0)); - } else if (t == tag(1)) { - break; - } else { - GPR_ASSERT(false); + GPR_ASSERT(ok); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + + // Send 'max_ping_pongs' number of ping pong messages + int ping_pong_cnt = 0; + while (ping_pong_cnt < max_ping_pongs) { + request_rw->Write(send_request, tag(0)); // Start client send + response_rw.Read(&recv_request, tag(1)); // Start server recv + request_rw->Read(&recv_response, tag(2)); // Start client recv + + need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3); + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + GPR_ASSERT(ok); + int i = (int)(intptr_t)t; + + // If server recv is complete, start the server send operation + if (i == 1) { + response_rw.Write(send_response, tag(3)); + } + + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); } + + ping_pong_cnt++; } - } - request_rw->WritesDone(tag(1)); - need_tags = (1 << 0) | (1 << 1); - while (need_tags) { - GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - int i = (int)(intptr_t)t; - GPR_ASSERT(need_tags & (1 << i)); - need_tags &= ~(1 << i); + + request_rw->WritesDone(tag(0)); + response_rw.Finish(Status::OK, tag(1)); + + Status recv_status; + request_rw->Finish(&recv_status, tag(2)); + + need_tags = (1 << 0) | (1 << 1) | (1 << 2); + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + + GPR_ASSERT(recv_status.ok()); } } + fixture->Finish(state); fixture.reset(); - state.SetBytesProcessed(state.range(0) * state.iterations()); + state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2); } -template <class Fixture> -static void BM_PumpStreamServerToClient(benchmark::State& state) { +// Repeatedly sends ping pong messages in a single streaming Bidi call in a loop +// First parmeter (i.e state.range(0)): Message size (in bytes) to use +template <class Fixture, class ClientContextMutator, class ServerContextMutator> +static void BM_StreamingPingPongMsgs(benchmark::State& state) { + const int msg_size = state.range(0); + EchoTestService::AsyncService service; std::unique_ptr<Fixture> fixture(new Fixture(&service)); { EchoResponse send_response; EchoResponse recv_response; - if (state.range(0) > 0) { - send_response.set_message(std::string(state.range(0), 'a')); + EchoRequest send_request; + EchoRequest recv_request; + + if (msg_size > 0) { + send_request.set_message(std::string(msg_size, 'a')); + send_response.set_message(std::string(msg_size, 'b')); } - Status recv_status; + + std::unique_ptr<EchoTestService::Stub> stub( + EchoTestService::NewStub(fixture->channel())); + ServerContext svr_ctx; + ServerContextMutator svr_ctx_mut(&svr_ctx); ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), fixture->cq(), tag(0)); - std::unique_ptr<EchoTestService::Stub> stub( - EchoTestService::NewStub(fixture->channel())); + ClientContext cli_ctx; + ClientContextMutator cli_ctx_mut(&cli_ctx); auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); - int need_tags = (1 << 0) | (1 << 1); + + // Establish async stream between client side and server side void* t; bool ok; + int need_tags = (1 << 0) | (1 << 1); while (need_tags) { GPR_ASSERT(fixture->cq()->Next(&t, &ok)); GPR_ASSERT(ok); @@ -141,54 +196,78 @@ static void BM_PumpStreamServerToClient(benchmark::State& state) { GPR_ASSERT(need_tags & (1 << i)); need_tags &= ~(1 << i); } - request_rw->Read(&recv_response, tag(0)); + while (state.KeepRunning()) { GPR_TIMER_SCOPE("BenchmarkCycle", 0); - response_rw.Write(send_response, tag(1)); - while (true) { + request_rw->Write(send_request, tag(0)); // Start client send + response_rw.Read(&recv_request, tag(1)); // Start server recv + request_rw->Read(&recv_response, tag(2)); // Start client recv + + need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3); + while (need_tags) { GPR_ASSERT(fixture->cq()->Next(&t, &ok)); - if (t == tag(0)) { - request_rw->Read(&recv_response, tag(0)); - } else if (t == tag(1)) { - break; - } else { - GPR_ASSERT(false); + GPR_ASSERT(ok); + int i = (int)(intptr_t)t; + + // If server recv is complete, start the server send operation + if (i == 1) { + response_rw.Write(send_response, tag(3)); } + + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); } } + + request_rw->WritesDone(tag(0)); response_rw.Finish(Status::OK, tag(1)); - need_tags = (1 << 0) | (1 << 1); + Status recv_status; + request_rw->Finish(&recv_status, tag(2)); + + need_tags = (1 << 0) | (1 << 1) | (1 << 2); while (need_tags) { GPR_ASSERT(fixture->cq()->Next(&t, &ok)); int i = (int)(intptr_t)t; GPR_ASSERT(need_tags & (1 << i)); need_tags &= ~(1 << i); } + + GPR_ASSERT(recv_status.ok()); } + fixture->Finish(state); fixture.reset(); - state.SetBytesProcessed(state.range(0) * state.iterations()); + state.SetBytesProcessed(msg_size * state.iterations() * 2); } /******************************************************************************* * CONFIGURATIONS */ -BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, TCP) - ->Range(0, 128 * 1024 * 1024); -BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, UDS) - ->Range(0, 128 * 1024 * 1024); -BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, SockPair) - ->Range(0, 128 * 1024 * 1024); -BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, InProcessCHTTP2) - ->Range(0, 128 * 1024 * 1024); -BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, TCP) - ->Range(0, 128 * 1024 * 1024); -BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, UDS) - ->Range(0, 128 * 1024 * 1024); -BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, SockPair) +// Generate Args for StreamingPingPong benchmarks. Currently generates args for +// only "small streams" (i.e streams with 0, 1 or 2 messages) +static void StreamingPingPongArgs(benchmark::internal::Benchmark* b) { + int msg_size = 0; + + b->Args({0, 0}); // spl case: 0 ping-pong msgs (msg_size doesn't matter here) + + for (msg_size = 0; msg_size <= 128 * 1024 * 1024; + msg_size == 0 ? msg_size++ : msg_size *= 8) { + b->Args({msg_size, 1}); + b->Args({msg_size, 2}); + } +} + +BENCHMARK_TEMPLATE(BM_StreamingPingPong, InProcessCHTTP2, NoOpMutator, + NoOpMutator) + ->Apply(StreamingPingPongArgs); +BENCHMARK_TEMPLATE(BM_StreamingPingPong, TCP, NoOpMutator, NoOpMutator) + ->Apply(StreamingPingPongArgs); + +BENCHMARK_TEMPLATE(BM_StreamingPingPongMsgs, InProcessCHTTP2, NoOpMutator, + NoOpMutator) ->Range(0, 128 * 1024 * 1024); -BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, InProcessCHTTP2) +BENCHMARK_TEMPLATE(BM_StreamingPingPongMsgs, TCP, NoOpMutator, NoOpMutator) ->Range(0, 128 * 1024 * 1024); } // namespace testing diff --git a/test/cpp/microbenchmarks/bm_metadata.cc b/test/cpp/microbenchmarks/bm_metadata.cc index ee3dec2bce..34874b57f5 100644 --- a/test/cpp/microbenchmarks/bm_metadata.cc +++ b/test/cpp/microbenchmarks/bm_metadata.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2017, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/test/cpp/microbenchmarks/bm_pollset.cc b/test/cpp/microbenchmarks/bm_pollset.cc new file mode 100644 index 0000000000..0f3d3cef66 --- /dev/null +++ b/test/cpp/microbenchmarks/bm_pollset.cc @@ -0,0 +1,254 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* Test out pollset latencies */ + +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/useful.h> + +extern "C" { +#include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/pollset.h" +#include "src/core/lib/iomgr/port.h" +#include "src/core/lib/iomgr/wakeup_fd_posix.h" +} + +#include "test/cpp/microbenchmarks/helpers.h" +#include "third_party/benchmark/include/benchmark/benchmark.h" + +#include <string.h> + +#ifdef GRPC_LINUX_MULTIPOLL_WITH_EPOLL +#include <sys/epoll.h> +#include <sys/eventfd.h> +#include <unistd.h> +#endif + +auto& force_library_initialization = Library::get(); + +static void shutdown_ps(grpc_exec_ctx* exec_ctx, void* ps, grpc_error* error) { + grpc_pollset_destroy(static_cast<grpc_pollset*>(ps)); +} + +static void BM_CreateDestroyPollset(benchmark::State& state) { + TrackCounters track_counters; + size_t ps_sz = grpc_pollset_size(); + grpc_pollset* ps = static_cast<grpc_pollset*>(gpr_malloc(ps_sz)); + gpr_mu* mu; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_closure shutdown_ps_closure; + grpc_closure_init(&shutdown_ps_closure, shutdown_ps, ps, + grpc_schedule_on_exec_ctx); + while (state.KeepRunning()) { + memset(ps, 0, ps_sz); + grpc_pollset_init(ps, &mu); + gpr_mu_lock(mu); + grpc_pollset_shutdown(&exec_ctx, ps, &shutdown_ps_closure); + gpr_mu_unlock(mu); + grpc_exec_ctx_flush(&exec_ctx); + } + grpc_exec_ctx_finish(&exec_ctx); + gpr_free(ps); + track_counters.Finish(state); +} +BENCHMARK(BM_CreateDestroyPollset); + +#ifdef GRPC_LINUX_MULTIPOLL_WITH_EPOLL +static void BM_PollEmptyPollset_SpeedOfLight(benchmark::State& state) { + // equivalent to BM_PollEmptyPollset, but just use the OS primitives to guage + // what the speed of light would be if we abstracted perfectly + TrackCounters track_counters; + int epfd = epoll_create1(0); + GPR_ASSERT(epfd != -1); + size_t nev = state.range(0); + size_t nfd = state.range(1); + epoll_event* ev = new epoll_event[nev]; + std::vector<int> fds; + for (size_t i = 0; i < nfd; i++) { + fds.push_back(eventfd(0, 0)); + epoll_event ev; + ev.events = EPOLLIN; + epoll_ctl(epfd, EPOLL_CTL_ADD, fds.back(), &ev); + } + while (state.KeepRunning()) { + epoll_wait(epfd, ev, nev, 0); + } + for (auto fd : fds) { + close(fd); + } + close(epfd); + delete[] ev; + track_counters.Finish(state); +} +BENCHMARK(BM_PollEmptyPollset_SpeedOfLight) + ->Args({1, 0}) + ->Args({1, 1}) + ->Args({1, 10}) + ->Args({1, 100}) + ->Args({1, 1000}) + ->Args({1, 10000}) + ->Args({1, 100000}) + ->Args({10, 1}) + ->Args({100, 1}) + ->Args({1000, 1}); +#endif + +static void BM_PollEmptyPollset(benchmark::State& state) { + TrackCounters track_counters; + size_t ps_sz = grpc_pollset_size(); + grpc_pollset* ps = static_cast<grpc_pollset*>(gpr_zalloc(ps_sz)); + gpr_mu* mu; + grpc_pollset_init(ps, &mu); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + gpr_timespec now = gpr_time_0(GPR_CLOCK_MONOTONIC); + gpr_timespec deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC); + gpr_mu_lock(mu); + while (state.KeepRunning()) { + grpc_pollset_worker* worker; + GRPC_ERROR_UNREF(grpc_pollset_work(&exec_ctx, ps, &worker, now, deadline)); + } + grpc_closure shutdown_ps_closure; + grpc_closure_init(&shutdown_ps_closure, shutdown_ps, ps, + grpc_schedule_on_exec_ctx); + grpc_pollset_shutdown(&exec_ctx, ps, &shutdown_ps_closure); + gpr_mu_unlock(mu); + grpc_exec_ctx_finish(&exec_ctx); + gpr_free(ps); + track_counters.Finish(state); +} +BENCHMARK(BM_PollEmptyPollset); + +class Closure : public grpc_closure { + public: + virtual ~Closure() {} +}; + +template <class F> +Closure* MakeClosure(F f, grpc_closure_scheduler* scheduler) { + struct C : public Closure { + C(F f, grpc_closure_scheduler* scheduler) : f_(f) { + grpc_closure_init(this, C::cbfn, this, scheduler); + } + static void cbfn(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { + C* p = static_cast<C*>(arg); + p->f_(); + } + F f_; + }; + return new C(f, scheduler); +} + +#ifdef GRPC_LINUX_MULTIPOLL_WITH_EPOLL +static void BM_SingleThreadPollOneFd_SpeedOfLight(benchmark::State& state) { + // equivalent to BM_PollEmptyPollset, but just use the OS primitives to guage + // what the speed of light would be if we abstracted perfectly + TrackCounters track_counters; + int epfd = epoll_create1(0); + GPR_ASSERT(epfd != -1); + epoll_event ev[100]; + int fd = eventfd(0, EFD_NONBLOCK); + ev[0].events = EPOLLIN; + epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev[0]); + while (state.KeepRunning()) { + int err; + do { + err = eventfd_write(fd, 1); + } while (err < 0 && errno == EINTR); + GPR_ASSERT(err == 0); + do { + err = epoll_wait(epfd, ev, GPR_ARRAY_SIZE(ev), 0); + } while (err < 0 && errno == EINTR); + GPR_ASSERT(err == 1); + eventfd_t value; + do { + err = eventfd_read(fd, &value); + } while (err < 0 && errno == EINTR); + GPR_ASSERT(err == 0); + } + close(fd); + close(epfd); + track_counters.Finish(state); +} +BENCHMARK(BM_SingleThreadPollOneFd_SpeedOfLight); +#endif + +static void BM_SingleThreadPollOneFd(benchmark::State& state) { + TrackCounters track_counters; + size_t ps_sz = grpc_pollset_size(); + grpc_pollset* ps = static_cast<grpc_pollset*>(gpr_zalloc(ps_sz)); + gpr_mu* mu; + grpc_pollset_init(ps, &mu); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + gpr_timespec now = gpr_time_0(GPR_CLOCK_MONOTONIC); + gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + grpc_wakeup_fd wakeup_fd; + GRPC_ERROR_UNREF(grpc_wakeup_fd_init(&wakeup_fd)); + grpc_fd* wakeup = grpc_fd_create(wakeup_fd.read_fd, "wakeup_read"); + grpc_pollset_add_fd(&exec_ctx, ps, wakeup); + bool done = false; + Closure* continue_closure = MakeClosure( + [&]() { + GRPC_ERROR_UNREF(grpc_wakeup_fd_consume_wakeup(&wakeup_fd)); + if (!state.KeepRunning()) { + done = true; + return; + } + GRPC_ERROR_UNREF(grpc_wakeup_fd_wakeup(&wakeup_fd)); + grpc_fd_notify_on_read(&exec_ctx, wakeup, continue_closure); + }, + grpc_schedule_on_exec_ctx); + GRPC_ERROR_UNREF(grpc_wakeup_fd_wakeup(&wakeup_fd)); + grpc_fd_notify_on_read(&exec_ctx, wakeup, continue_closure); + gpr_mu_lock(mu); + while (!done) { + grpc_pollset_worker* worker; + GRPC_ERROR_UNREF(grpc_pollset_work(&exec_ctx, ps, &worker, now, deadline)); + } + grpc_fd_orphan(&exec_ctx, wakeup, NULL, NULL, "done"); + wakeup_fd.read_fd = 0; + grpc_closure shutdown_ps_closure; + grpc_closure_init(&shutdown_ps_closure, shutdown_ps, ps, + grpc_schedule_on_exec_ctx); + grpc_pollset_shutdown(&exec_ctx, ps, &shutdown_ps_closure); + gpr_mu_unlock(mu); + grpc_exec_ctx_finish(&exec_ctx); + grpc_wakeup_fd_destroy(&wakeup_fd); + gpr_free(ps); + track_counters.Finish(state); + delete continue_closure; +} +BENCHMARK(BM_SingleThreadPollOneFd); + +BENCHMARK_MAIN(); diff --git a/test/cpp/microbenchmarks/helpers.h b/test/cpp/microbenchmarks/helpers.h index 2829a46e5a..f44b7cf83a 100644 --- a/test/cpp/microbenchmarks/helpers.h +++ b/test/cpp/microbenchmarks/helpers.h @@ -55,7 +55,9 @@ class Library { private: Library() { +#ifdef GPR_LOW_LEVEL_COUNTERS grpc_memory_counters_init(); +#endif init_lib_.init(); rq_ = grpc_resource_quota_create("bm"); } diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index baa9304cc2..25a19a5a74 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -46,7 +46,7 @@ #include <grpc/support/log.h> #include <grpc/support/time.h> -#include "src/proto/grpc/testing/payloads.grpc.pb.h" +#include "src/proto/grpc/testing/payloads.pb.h" #include "src/proto/grpc/testing/services.grpc.pb.h" #include "test/cpp/qps/histogram.h" diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h index e72d30a4ef..dd32a16c87 100644 --- a/test/cpp/qps/driver.h +++ b/test/cpp/qps/driver.h @@ -36,7 +36,7 @@ #include <memory> -#include "src/proto/grpc/testing/control.grpc.pb.h" +#include "src/proto/grpc/testing/control.pb.h" #include "test/cpp/qps/histogram.h" namespace grpc { diff --git a/test/cpp/qps/histogram.h b/test/cpp/qps/histogram.h index acb415f0a1..470a394301 100644 --- a/test/cpp/qps/histogram.h +++ b/test/cpp/qps/histogram.h @@ -35,7 +35,7 @@ #define TEST_QPS_HISTOGRAM_H #include <grpc/support/histogram.h> -#include "src/proto/grpc/testing/stats.grpc.pb.h" +#include "src/proto/grpc/testing/stats.pb.h" namespace grpc { namespace testing { diff --git a/test/cpp/qps/qps_json_driver.cc b/test/cpp/qps/qps_json_driver.cc index ddaaa7ca75..bd2c1f0ec6 100644 --- a/test/cpp/qps/qps_json_driver.cc +++ b/test/cpp/qps/qps_json_driver.cc @@ -31,6 +31,7 @@ * */ +#include <iostream> #include <memory> #include <set> diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h index 821d5935be..8fbf37a095 100644 --- a/test/cpp/qps/server.h +++ b/test/cpp/qps/server.h @@ -38,8 +38,8 @@ #include <grpc/support/cpu.h> #include <vector> -#include "src/proto/grpc/testing/control.grpc.pb.h" -#include "src/proto/grpc/testing/messages.grpc.pb.h" +#include "src/proto/grpc/testing/control.pb.h" +#include "src/proto/grpc/testing/messages.pb.h" #include "test/core/end2end/data/ssl_test_data.h" #include "test/core/util/port.h" #include "test/cpp/qps/usage_timer.h" diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index b3a06aeaf5..b58d91eea6 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -103,24 +103,25 @@ class AsyncQpsServerTest final : public grpc::testing::Server { server_ = builder.BuildAndStart(); - using namespace std::placeholders; - auto process_rpc_bound = - std::bind(process_rpc, config.payload_config(), _1, _2); + std::bind(process_rpc, config.payload_config(), std::placeholders::_1, + std::placeholders::_2); for (int i = 0; i < 15000; i++) { for (int j = 0; j < num_threads; j++) { if (request_unary_function) { - auto request_unary = - std::bind(request_unary_function, &async_service_, _1, _2, _3, - srv_cqs_[j].get(), srv_cqs_[j].get(), _4); + auto request_unary = std::bind( + request_unary_function, &async_service_, std::placeholders::_1, + std::placeholders::_2, std::placeholders::_3, srv_cqs_[j].get(), + srv_cqs_[j].get(), std::placeholders::_4); contexts_.emplace_back( new ServerRpcContextUnaryImpl(request_unary, process_rpc_bound)); } if (request_streaming_function) { - auto request_streaming = - std::bind(request_streaming_function, &async_service_, _1, _2, - srv_cqs_[j].get(), srv_cqs_[j].get(), _3); + auto request_streaming = std::bind( + request_streaming_function, &async_service_, + std::placeholders::_1, std::placeholders::_2, srv_cqs_[j].get(), + srv_cqs_[j].get(), std::placeholders::_3); contexts_.emplace_back(new ServerRpcContextStreamingImpl( request_streaming, process_rpc_bound)); } |