diff options
author | 2017-09-11 12:11:40 -0700 | |
---|---|---|
committer | 2017-09-11 12:11:40 -0700 | |
commit | bbe282f600ae030659c97bc5fea0d8ff5107ac17 (patch) | |
tree | d9eca449f8ee5883b7a8f610362addd8d855f5c0 /test/cpp/microbenchmarks/bm_chttp2_transport.cc | |
parent | b61a26cff93b0ef97c98dad0c338d539ff44e7e4 (diff) | |
parent | 55c4b31389d5557b88d39bde6d783d68aa747de7 (diff) |
Merge github.com:grpc/grpc into grpc_millis
Diffstat (limited to 'test/cpp/microbenchmarks/bm_chttp2_transport.cc')
-rw-r--r-- | test/cpp/microbenchmarks/bm_chttp2_transport.cc | 166 |
1 files changed, 103 insertions, 63 deletions
diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc index a08837f0a1..6f9dee7822 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc @@ -29,6 +29,7 @@ extern "C" { #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/ext/transport/chttp2/transport/internal.h" +#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/resource_quota.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/transport/static_metadata.h" @@ -154,23 +155,59 @@ class Fixture { grpc_transport *t_; }; -static void DoNothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} +class Closure : public grpc_closure { + public: + virtual ~Closure() {} +}; + +template <class F> +std::unique_ptr<Closure> MakeClosure( + F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) { + struct C : public Closure { + C(const F &f, grpc_closure_scheduler *sched) : f_(f) { + GRPC_CLOSURE_INIT(this, Execute, this, sched); + } + F f_; + static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + static_cast<C *>(arg)->f_(exec_ctx, error); + } + }; + return std::unique_ptr<Closure>(new C(f, sched)); +} + +template <class F> +grpc_closure *MakeOnceClosure( + F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) { + struct C : public grpc_closure { + C(const F &f) : f_(f) {} + F f_; + static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + static_cast<C *>(arg)->f_(exec_ctx, error); + delete static_cast<C *>(arg); + } + }; + auto *c = new C{f}; + return GRPC_CLOSURE_INIT(c, C::Execute, c, sched); +} class Stream { public: Stream(Fixture *f) : f_(f) { - GRPC_STREAM_REF_INIT(&refcount_, 1, DoNothing, nullptr, "test_stream"); stream_size_ = grpc_transport_stream_size(f->transport()); stream_ = gpr_malloc(stream_size_); arena_ = gpr_arena_create(4096); } ~Stream() { + gpr_event_wait(&done_, gpr_inf_future(GPR_CLOCK_REALTIME)); gpr_free(stream_); gpr_arena_destroy(arena_); } void Init(benchmark::State &state) { + GRPC_STREAM_REF_INIT(&refcount_, 1, &Stream::FinishDestroy, this, + "test_stream"); + gpr_event_init(&done_); memset(stream_, 0, stream_size_); if ((state.iterations() & 0xffff) == 0) { gpr_arena_destroy(arena_); @@ -181,13 +218,17 @@ class Stream { NULL, arena_); } - void DestroyThen(grpc_closure *closure) { - grpc_transport_destroy_stream(f_->exec_ctx(), f_->transport(), - static_cast<grpc_stream *>(stream_), closure); + void DestroyThen(grpc_exec_ctx *exec_ctx, grpc_closure *closure) { + destroy_closure_ = closure; +#ifndef NDEBUG + grpc_stream_unref(exec_ctx, &refcount_, "DestroyThen"); +#else + grpc_stream_unref(exec_ctx, &refcount_); +#endif } - void Op(grpc_transport_stream_op_batch *op) { - grpc_transport_perform_stream_op(f_->exec_ctx(), f_->transport(), + void Op(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op) { + grpc_transport_perform_stream_op(exec_ctx, f_->transport(), static_cast<grpc_stream *>(stream_), op); } @@ -196,48 +237,24 @@ class Stream { } private: + static void FinishDestroy(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + auto stream = static_cast<Stream *>(arg); + grpc_transport_destroy_stream(exec_ctx, stream->f_->transport(), + static_cast<grpc_stream *>(stream->stream_), + stream->destroy_closure_); + gpr_event_set(&stream->done_, (void *)1); + } + Fixture *f_; grpc_stream_refcount refcount_; gpr_arena *arena_; size_t stream_size_; void *stream_; + grpc_closure *destroy_closure_ = nullptr; + gpr_event done_; }; -class Closure : public grpc_closure { - public: - virtual ~Closure() {} -}; - -template <class F> -std::unique_ptr<Closure> MakeClosure( - F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) { - struct C : public Closure { - C(const F &f, grpc_closure_scheduler *sched) : f_(f) { - GRPC_CLOSURE_INIT(this, Execute, this, sched); - } - F f_; - static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - static_cast<C *>(arg)->f_(exec_ctx, error); - } - }; - return std::unique_ptr<Closure>(new C(f, sched)); -} - -template <class F> -grpc_closure *MakeOnceClosure( - F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) { - struct C : public grpc_closure { - C(const F &f) : f_(f) {} - F f_; - static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - static_cast<C *>(arg)->f_(exec_ctx, error); - delete static_cast<C *>(arg); - } - }; - auto *c = new C{f}; - return GRPC_CLOSURE_INIT(c, C::Execute, c, sched); -} - //////////////////////////////////////////////////////////////////////////////// // Benchmarks // @@ -246,11 +263,18 @@ static void BM_StreamCreateDestroy(benchmark::State &state) { TrackCounters track_counters; Fixture f(grpc::ChannelArguments(), true); Stream s(&f); + grpc_transport_stream_op_batch op; + grpc_transport_stream_op_batch_payload op_payload; + memset(&op, 0, sizeof(op)); + op.cancel_stream = true; + op.payload = &op_payload; + op_payload.cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; std::unique_ptr<Closure> next = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { if (!state.KeepRunning()) return; s.Init(state); - s.DestroyThen(next.get()); + s.Op(exec_ctx, &op); + s.DestroyThen(exec_ctx, next.get()); }); GRPC_CLOSURE_RUN(f.exec_ctx(), next.get(), GRPC_ERROR_NONE); f.FlushExecCtx(); @@ -314,14 +338,14 @@ static void BM_StreamCreateSendInitialMetadataDestroy(benchmark::State &state) { op.on_complete = done.get(); op.send_initial_metadata = true; op.payload->send_initial_metadata.send_initial_metadata = &b; - s.Op(&op); + s.Op(exec_ctx, &op); }); done = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { reset_op(); op.cancel_stream = true; op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; - s.Op(&op); - s.DestroyThen(start.get()); + s.Op(exec_ctx, &op); + s.DestroyThen(exec_ctx, start.get()); }); GRPC_CLOSURE_SCHED(f.exec_ctx(), start.get(), GRPC_ERROR_NONE); f.FlushExecCtx(); @@ -348,22 +372,28 @@ static void BM_TransportEmptyOp(benchmark::State &state) { if (!state.KeepRunning()) return; reset_op(); op.on_complete = c.get(); - s.Op(&op); + s.Op(exec_ctx, &op); }); GRPC_CLOSURE_SCHED(f.exec_ctx(), c.get(), GRPC_ERROR_NONE); f.FlushExecCtx(); - s.DestroyThen( - MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {})); + reset_op(); + op.cancel_stream = true; + op_payload.cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; + s.Op(f.exec_ctx(), &op); + s.DestroyThen(f.exec_ctx(), MakeOnceClosure([](grpc_exec_ctx *exec_ctx, + grpc_error *error) {})); f.FlushExecCtx(); track_counters.Finish(state); } BENCHMARK(BM_TransportEmptyOp); +std::vector<std::unique_ptr<gpr_event>> done_events; + static void BM_TransportStreamSend(benchmark::State &state) { TrackCounters track_counters; Fixture f(grpc::ChannelArguments(), true); - Stream s(&f); - s.Init(state); + auto s = std::unique_ptr<Stream>(new Stream(&f)); + s->Init(state); grpc_transport_stream_op_batch op; grpc_transport_stream_op_batch_payload op_payload; memset(&op_payload, 0, sizeof(op_payload)); @@ -390,11 +420,17 @@ static void BM_TransportStreamSend(benchmark::State &state) { grpc_metadata_batch_add_tail(f.exec_ctx(), &b, &storage[i], elems[i]))); } + gpr_event *bm_done = new gpr_event; + gpr_event_init(bm_done); + std::unique_ptr<Closure> c = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { - if (!state.KeepRunning()) return; + if (!state.KeepRunning()) { + gpr_event_set(bm_done, (void *)1); + return; + } // force outgoing window to be yuge - s.chttp2_stream()->flow_control.remote_window_delta = + s->chttp2_stream()->flow_control.remote_window_delta = 1024 * 1024 * 1024; f.chttp2_transport()->flow_control.remote_window = 1024 * 1024 * 1024; grpc_slice_buffer_stream_init(&send_stream, &send_buffer, 0); @@ -402,23 +438,27 @@ static void BM_TransportStreamSend(benchmark::State &state) { op.on_complete = c.get(); op.send_message = true; op.payload->send_message.send_message = &send_stream.base; - s.Op(&op); + s->Op(exec_ctx, &op); }); reset_op(); op.send_initial_metadata = true; op.payload->send_initial_metadata.send_initial_metadata = &b; op.on_complete = c.get(); - s.Op(&op); + s->Op(f.exec_ctx(), &op); f.FlushExecCtx(); + gpr_event_wait(bm_done, gpr_inf_future(GPR_CLOCK_REALTIME)); + done_events.emplace_back(bm_done); + reset_op(); op.cancel_stream = true; op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; - s.Op(&op); - s.DestroyThen( - MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {})); + s->Op(f.exec_ctx(), &op); + s->DestroyThen(f.exec_ctx(), MakeOnceClosure([](grpc_exec_ctx *exec_ctx, + grpc_error *error) {})); f.FlushExecCtx(); + s.reset(); track_counters.Finish(state); grpc_metadata_batch_destroy(f.exec_ctx(), &b); grpc_slice_buffer_destroy(&send_buffer); @@ -535,7 +575,7 @@ static void BM_TransportStreamRecv(benchmark::State &state) { op.recv_message = true; op.payload->recv_message.recv_message = &recv_stream; op.payload->recv_message.recv_message_ready = drain_start.get(); - s.Op(&op); + s.Op(exec_ctx, &op); f.PushInput(grpc_slice_ref(incoming_data)); }); @@ -578,7 +618,7 @@ static void BM_TransportStreamRecv(benchmark::State &state) { op.payload->recv_initial_metadata.recv_initial_metadata_ready = do_nothing.get(); op.on_complete = c.get(); - s.Op(&op); + s.Op(f.exec_ctx(), &op); f.PushInput(SLICE_FROM_BUFFER( "\x00\x00\x00\x04\x00\x00\x00\x00\x00" // Generated using: @@ -596,9 +636,9 @@ static void BM_TransportStreamRecv(benchmark::State &state) { reset_op(); op.cancel_stream = true; op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; - s.Op(&op); - s.DestroyThen( - MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {})); + s.Op(f.exec_ctx(), &op); + s.DestroyThen(f.exec_ctx(), MakeOnceClosure([](grpc_exec_ctx *exec_ctx, + grpc_error *error) {})); f.FlushExecCtx(); track_counters.Finish(state); grpc_metadata_batch_destroy(f.exec_ctx(), &b); |