author     Craig Tiller <ctiller@google.com>  2017-06-19 11:21:02 -0700
committer  Craig Tiller <ctiller@google.com>  2017-06-19 11:21:02 -0700
commit     e7183b77a69ba6f28ed1e9aad2639a7bcaaa66a2 (patch)
tree       69656ab72706a1348b64296a1efce14369ae74c5 /test/cpp
parent     6a23105aa93454b3fdddff092dfe26c96eda0d6e (diff)
parent     070a8eeb281a2659501a60b1bbc86798fcb652c4 (diff)
Merge github.com:grpc/grpc into cq-drop
Diffstat (limited to 'test/cpp')
-rw-r--r--  test/cpp/end2end/async_end2end_test.cc             |  2
-rw-r--r--  test/cpp/microbenchmarks/bm_call_create.cc         | 14
-rw-r--r--  test/cpp/microbenchmarks/bm_chttp2_transport.cc    | 24
-rw-r--r--  test/cpp/microbenchmarks/bm_closure.cc             | 92
-rw-r--r--  test/cpp/microbenchmarks/bm_cq_multiple_threads.cc |  2
-rw-r--r--  test/cpp/microbenchmarks/bm_pollset.cc             | 10
-rw-r--r--  test/cpp/qps/client_async.cc                       | 47
-rw-r--r--  test/cpp/qps/qps_json_driver.cc                    | 10
-rw-r--r--  test/cpp/qps/server_async.cc                       | 18
9 files changed, 137 insertions(+), 82 deletions(-)
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 4d61fc620c..a2a6e36709 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -196,7 +196,7 @@ bool plugin_has_sync_methods(std::unique_ptr<ServerBuilderPlugin>& plugin) {
// This class disables the server builder plugins that may add sync services to
// the server. If there are sync services, UnimplementedRpc test will triger
-// the sync unkown rpc routine on the server side, rather than the async one
+// the sync unknown rpc routine on the server side, rather than the async one
// that needs to be tested here.
class ServerBuilderSyncPluginDisabler : public ::grpc::ServerBuilderOption {
public:
diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc
index 0838680f27..508f7f94d6 100644
--- a/test/cpp/microbenchmarks/bm_call_create.cc
+++ b/test/cpp/microbenchmarks/bm_call_create.cc
@@ -445,7 +445,7 @@ void SetPollsetSet(grpc_exec_ctx *exec_ctx, grpc_transport *self,
/* implementation of grpc_transport_perform_stream_op */
void PerformStreamOp(grpc_exec_ctx *exec_ctx, grpc_transport *self,
grpc_stream *stream, grpc_transport_stream_op_batch *op) {
- grpc_closure_sched(exec_ctx, op->on_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, GRPC_ERROR_NONE);
}
/* implementation of grpc_transport_perform_op */
@@ -492,7 +492,7 @@ class SendEmptyMetadata {
public:
SendEmptyMetadata() {
memset(&op_, 0, sizeof(op_));
- op_.on_complete = grpc_closure_init(&closure_, DoNothing, nullptr,
+ op_.on_complete = GRPC_CLOSURE_INIT(&closure_, DoNothing, nullptr,
grpc_schedule_on_exec_ctx);
op_.send_initial_metadata = true;
op_.payload = &op_payload_;
@@ -643,16 +643,16 @@ static void StartTransportStreamOp(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op_batch *op) {
if (op->recv_initial_metadata) {
- grpc_closure_sched(
+ GRPC_CLOSURE_SCHED(
exec_ctx,
op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_NONE);
}
if (op->recv_message) {
- grpc_closure_sched(exec_ctx, op->payload->recv_message.recv_message_ready,
+ GRPC_CLOSURE_SCHED(exec_ctx, op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
}
- grpc_closure_sched(exec_ctx, op->on_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, GRPC_ERROR_NONE);
}
static void StartTransportOp(grpc_exec_ctx *exec_ctx,
@@ -661,7 +661,7 @@ static void StartTransportOp(grpc_exec_ctx *exec_ctx,
if (op->disconnect_with_error != GRPC_ERROR_NONE) {
GRPC_ERROR_UNREF(op->disconnect_with_error);
}
- grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
}
static grpc_error *InitCallElem(grpc_exec_ctx *exec_ctx,
@@ -677,7 +677,7 @@ static void SetPollsetOrPollsetSet(grpc_exec_ctx *exec_ctx,
static void DestroyCallElem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info,
grpc_closure *then_sched_closure) {
- grpc_closure_sched(exec_ctx, then_sched_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, then_sched_closure, GRPC_ERROR_NONE);
}
grpc_error *InitChannelElem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
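
Note on the benchmark churn above and in the files that follow: the merge picks up the rename of the closure helpers from lowercase functions (grpc_closure_init/create/run/sched) to their uppercase macro counterparts (GRPC_CLOSURE_INIT/CREATE/RUN/SCHED); the call sites are otherwise unchanged. The following is a minimal sketch of the pattern these call sites exercise, assuming the 2017-era C-core iomgr headers and a build inside the gRPC source tree; it is illustrative and not part of the diff.

extern "C" {
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/exec_ctx.h"
}

static void DoNothing(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {}

static void ClosureSmokeTest() {
  grpc_closure c;
  // Bind a callback and argument to a closure on the exec_ctx scheduler.
  GRPC_CLOSURE_INIT(&c, DoNothing, nullptr, grpc_schedule_on_exec_ctx);

  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
  // SCHED queues the closure; it runs when the exec_ctx is flushed.
  GRPC_CLOSURE_SCHED(&exec_ctx, &c, GRPC_ERROR_NONE);
  grpc_exec_ctx_flush(&exec_ctx);

  // CREATE heap-allocates a closure that cleans itself up after running;
  // RUN dispatches it through the closure's scheduler.
  GRPC_CLOSURE_RUN(
      &exec_ctx,
      GRPC_CLOSURE_CREATE(DoNothing, nullptr, grpc_schedule_on_exec_ctx),
      GRPC_ERROR_NONE);
  grpc_exec_ctx_finish(&exec_ctx);
}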
diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
index 42843632c5..567ef1cf24 100644
--- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc
+++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
@@ -61,7 +61,7 @@ class DummyEndpoint : public grpc_endpoint {
return;
}
grpc_slice_buffer_add(slices_, slice);
- grpc_closure_sched(exec_ctx, read_cb_, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, read_cb_, GRPC_ERROR_NONE);
read_cb_ = nullptr;
}
@@ -78,7 +78,7 @@ class DummyEndpoint : public grpc_endpoint {
if (have_slice_) {
have_slice_ = false;
grpc_slice_buffer_add(slices, buffered_slice_);
- grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, cb, GRPC_ERROR_NONE);
return;
}
read_cb_ = cb;
@@ -92,7 +92,7 @@ class DummyEndpoint : public grpc_endpoint {
static void write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_slice_buffer *slices, grpc_closure *cb) {
- grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, cb, GRPC_ERROR_NONE);
}
static grpc_workqueue *get_workqueue(grpc_endpoint *ep) { return NULL; }
@@ -107,7 +107,7 @@ class DummyEndpoint : public grpc_endpoint {
grpc_error *why) {
grpc_resource_user_shutdown(exec_ctx,
static_cast<DummyEndpoint *>(ep)->ru_);
- grpc_closure_sched(exec_ctx, static_cast<DummyEndpoint *>(ep)->read_cb_,
+ GRPC_CLOSURE_SCHED(exec_ctx, static_cast<DummyEndpoint *>(ep)->read_cb_,
why);
}
@@ -213,7 +213,7 @@ std::unique_ptr<Closure> MakeClosure(
F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) {
struct C : public Closure {
C(const F &f, grpc_closure_scheduler *sched) : f_(f) {
- grpc_closure_init(this, Execute, this, sched);
+ GRPC_CLOSURE_INIT(this, Execute, this, sched);
}
F f_;
static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
@@ -235,7 +235,7 @@ grpc_closure *MakeOnceClosure(
}
};
auto *c = new C{f};
- return grpc_closure_init(c, C::Execute, c, sched);
+ return GRPC_CLOSURE_INIT(c, C::Execute, c, sched);
}
////////////////////////////////////////////////////////////////////////////////
@@ -252,7 +252,7 @@ static void BM_StreamCreateDestroy(benchmark::State &state) {
s.Init(state);
s.DestroyThen(next.get());
});
- grpc_closure_run(f.exec_ctx(), next.get(), GRPC_ERROR_NONE);
+ GRPC_CLOSURE_RUN(f.exec_ctx(), next.get(), GRPC_ERROR_NONE);
f.FlushExecCtx();
track_counters.Finish(state);
}
@@ -322,7 +322,7 @@ static void BM_StreamCreateSendInitialMetadataDestroy(benchmark::State &state) {
s.Op(&op);
s.DestroyThen(start.get());
});
- grpc_closure_sched(f.exec_ctx(), start.get(), GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(f.exec_ctx(), start.get(), GRPC_ERROR_NONE);
f.FlushExecCtx();
grpc_metadata_batch_destroy(f.exec_ctx(), &b);
track_counters.Finish(state);
@@ -348,7 +348,7 @@ static void BM_TransportEmptyOp(benchmark::State &state) {
op.on_complete = c.get();
s.Op(&op);
});
- grpc_closure_sched(f.exec_ctx(), c.get(), GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(f.exec_ctx(), c.get(), GRPC_ERROR_NONE);
f.FlushExecCtx();
s.DestroyThen(
MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {}));
@@ -538,14 +538,14 @@ static void BM_TransportStreamRecv(benchmark::State &state) {
GPR_ASSERT(!state.KeepRunning());
return;
}
- grpc_closure_run(exec_ctx, drain.get(), GRPC_ERROR_NONE);
+ GRPC_CLOSURE_RUN(exec_ctx, drain.get(), GRPC_ERROR_NONE);
});
drain = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) {
do {
if (received == recv_stream->length) {
grpc_byte_stream_destroy(exec_ctx, recv_stream);
- grpc_closure_sched(exec_ctx, c.get(), GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, c.get(), GRPC_ERROR_NONE);
return;
}
} while (grpc_byte_stream_next(exec_ctx, recv_stream,
@@ -561,7 +561,7 @@ static void BM_TransportStreamRecv(benchmark::State &state) {
grpc_byte_stream_pull(exec_ctx, recv_stream, &recv_slice);
received += GRPC_SLICE_LENGTH(recv_slice);
grpc_slice_unref_internal(exec_ctx, recv_slice);
- grpc_closure_run(exec_ctx, drain.get(), GRPC_ERROR_NONE);
+ GRPC_CLOSURE_RUN(exec_ctx, drain.get(), GRPC_ERROR_NONE);
});
reset_op();
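
The MakeClosure/MakeOnceClosure helpers touched above wrap a C++ lambda in a grpc_closure by deriving from it and installing a static trampoline as the callback. A condensed sketch of that adapter follows; the class and member names are illustrative, not the benchmark's exact helpers.

#include <functional>
#include <utility>
extern "C" {
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/exec_ctx.h"
}

struct LambdaClosure : public grpc_closure {
  LambdaClosure(std::function<void(grpc_exec_ctx*, grpc_error*)> f,
                grpc_closure_scheduler* sched)
      : f_(std::move(f)) {
    // 'this' is both the grpc_closure and the callback argument.
    GRPC_CLOSURE_INIT(this, Trampoline, this, sched);
  }
  static void Trampoline(grpc_exec_ctx* exec_ctx, void* arg,
                         grpc_error* error) {
    static_cast<LambdaClosure*>(arg)->f_(exec_ctx, error);
  }
  std::function<void(grpc_exec_ctx*, grpc_error*)> f_;
};

// Usage: LambdaClosure done([](grpc_exec_ctx* ctx, grpc_error* err) {},
//                           grpc_schedule_on_exec_ctx);
//        GRPC_CLOSURE_SCHED(&exec_ctx, &done, GRPC_ERROR_NONE);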
diff --git a/test/cpp/microbenchmarks/bm_closure.cc b/test/cpp/microbenchmarks/bm_closure.cc
index e6cc21ecaf..41649b8a73 100644
--- a/test/cpp/microbenchmarks/bm_closure.cc
+++ b/test/cpp/microbenchmarks/bm_closure.cc
@@ -61,7 +61,7 @@ static void BM_ClosureInitAgainstExecCtx(benchmark::State& state) {
grpc_closure c;
while (state.KeepRunning()) {
benchmark::DoNotOptimize(
- grpc_closure_init(&c, DoNothing, NULL, grpc_schedule_on_exec_ctx));
+ GRPC_CLOSURE_INIT(&c, DoNothing, NULL, grpc_schedule_on_exec_ctx));
}
track_counters.Finish(state);
}
@@ -73,7 +73,7 @@ static void BM_ClosureInitAgainstCombiner(benchmark::State& state) {
grpc_closure c;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
while (state.KeepRunning()) {
- benchmark::DoNotOptimize(grpc_closure_init(
+ benchmark::DoNotOptimize(GRPC_CLOSURE_INIT(
&c, DoNothing, NULL, grpc_combiner_scheduler(combiner)));
}
GRPC_COMBINER_UNREF(&exec_ctx, combiner, "finished");
@@ -85,10 +85,10 @@ BENCHMARK(BM_ClosureInitAgainstCombiner);
static void BM_ClosureRunOnExecCtx(benchmark::State& state) {
TrackCounters track_counters;
grpc_closure c;
- grpc_closure_init(&c, DoNothing, NULL, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&c, DoNothing, NULL, grpc_schedule_on_exec_ctx);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
while (state.KeepRunning()) {
- grpc_closure_run(&exec_ctx, &c, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_RUN(&exec_ctx, &c, GRPC_ERROR_NONE);
grpc_exec_ctx_flush(&exec_ctx);
}
grpc_exec_ctx_finish(&exec_ctx);
@@ -100,7 +100,7 @@ static void BM_ClosureCreateAndRun(benchmark::State& state) {
TrackCounters track_counters;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
while (state.KeepRunning()) {
- grpc_closure_run(&exec_ctx, grpc_closure_create(DoNothing, NULL,
+ GRPC_CLOSURE_RUN(&exec_ctx, GRPC_CLOSURE_CREATE(DoNothing, NULL,
grpc_schedule_on_exec_ctx),
GRPC_ERROR_NONE);
}
@@ -114,7 +114,7 @@ static void BM_ClosureInitAndRun(benchmark::State& state) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_closure c;
while (state.KeepRunning()) {
- grpc_closure_run(&exec_ctx, grpc_closure_init(&c, DoNothing, NULL,
+ GRPC_CLOSURE_RUN(&exec_ctx, GRPC_CLOSURE_INIT(&c, DoNothing, NULL,
grpc_schedule_on_exec_ctx),
GRPC_ERROR_NONE);
}
@@ -126,10 +126,10 @@ BENCHMARK(BM_ClosureInitAndRun);
static void BM_ClosureSchedOnExecCtx(benchmark::State& state) {
TrackCounters track_counters;
grpc_closure c;
- grpc_closure_init(&c, DoNothing, NULL, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&c, DoNothing, NULL, grpc_schedule_on_exec_ctx);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
while (state.KeepRunning()) {
- grpc_closure_sched(&exec_ctx, &c, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&exec_ctx, &c, GRPC_ERROR_NONE);
grpc_exec_ctx_flush(&exec_ctx);
}
grpc_exec_ctx_finish(&exec_ctx);
@@ -141,12 +141,12 @@ static void BM_ClosureSched2OnExecCtx(benchmark::State& state) {
TrackCounters track_counters;
grpc_closure c1;
grpc_closure c2;
- grpc_closure_init(&c1, DoNothing, NULL, grpc_schedule_on_exec_ctx);
- grpc_closure_init(&c2, DoNothing, NULL, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&c1, DoNothing, NULL, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&c2, DoNothing, NULL, grpc_schedule_on_exec_ctx);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
while (state.KeepRunning()) {
- grpc_closure_sched(&exec_ctx, &c1, GRPC_ERROR_NONE);
- grpc_closure_sched(&exec_ctx, &c2, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&exec_ctx, &c1, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&exec_ctx, &c2, GRPC_ERROR_NONE);
grpc_exec_ctx_flush(&exec_ctx);
}
grpc_exec_ctx_finish(&exec_ctx);
@@ -159,14 +159,14 @@ static void BM_ClosureSched3OnExecCtx(benchmark::State& state) {
grpc_closure c1;
grpc_closure c2;
grpc_closure c3;
- grpc_closure_init(&c1, DoNothing, NULL, grpc_schedule_on_exec_ctx);
- grpc_closure_init(&c2, DoNothing, NULL, grpc_schedule_on_exec_ctx);
- grpc_closure_init(&c3, DoNothing, NULL, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&c1, DoNothing, NULL, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&c2, DoNothing, NULL, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&c3, DoNothing, NULL, grpc_schedule_on_exec_ctx);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
while (state.KeepRunning()) {
- grpc_closure_sched(&exec_ctx, &c1, GRPC_ERROR_NONE);
- grpc_closure_sched(&exec_ctx, &c2, GRPC_ERROR_NONE);
- grpc_closure_sched(&exec_ctx, &c3, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&exec_ctx, &c1, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&exec_ctx, &c2, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&exec_ctx, &c3, GRPC_ERROR_NONE);
grpc_exec_ctx_flush(&exec_ctx);
}
grpc_exec_ctx_finish(&exec_ctx);
@@ -246,10 +246,10 @@ static void BM_ClosureSchedOnCombiner(benchmark::State& state) {
TrackCounters track_counters;
grpc_combiner* combiner = grpc_combiner_create();
grpc_closure c;
- grpc_closure_init(&c, DoNothing, NULL, grpc_combiner_scheduler(combiner));
+ GRPC_CLOSURE_INIT(&c, DoNothing, NULL, grpc_combiner_scheduler(combiner));
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
while (state.KeepRunning()) {
- grpc_closure_sched(&exec_ctx, &c, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&exec_ctx, &c, GRPC_ERROR_NONE);
grpc_exec_ctx_flush(&exec_ctx);
}
GRPC_COMBINER_UNREF(&exec_ctx, combiner, "finished");
@@ -263,12 +263,12 @@ static void BM_ClosureSched2OnCombiner(benchmark::State& state) {
grpc_combiner* combiner = grpc_combiner_create();
grpc_closure c1;
grpc_closure c2;
- grpc_closure_init(&c1, DoNothing, NULL, grpc_combiner_scheduler(combiner));
- grpc_closure_init(&c2, DoNothing, NULL, grpc_combiner_scheduler(combiner));
+ GRPC_CLOSURE_INIT(&c1, DoNothing, NULL, grpc_combiner_scheduler(combiner));
+ GRPC_CLOSURE_INIT(&c2, DoNothing, NULL, grpc_combiner_scheduler(combiner));
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
while (state.KeepRunning()) {
- grpc_closure_sched(&exec_ctx, &c1, GRPC_ERROR_NONE);
- grpc_closure_sched(&exec_ctx, &c2, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&exec_ctx, &c1, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&exec_ctx, &c2, GRPC_ERROR_NONE);
grpc_exec_ctx_flush(&exec_ctx);
}
GRPC_COMBINER_UNREF(&exec_ctx, combiner, "finished");
@@ -283,14 +283,14 @@ static void BM_ClosureSched3OnCombiner(benchmark::State& state) {
grpc_closure c1;
grpc_closure c2;
grpc_closure c3;
- grpc_closure_init(&c1, DoNothing, NULL, grpc_combiner_scheduler(combiner));
- grpc_closure_init(&c2, DoNothing, NULL, grpc_combiner_scheduler(combiner));
- grpc_closure_init(&c3, DoNothing, NULL, grpc_combiner_scheduler(combiner));
+ GRPC_CLOSURE_INIT(&c1, DoNothing, NULL, grpc_combiner_scheduler(combiner));
+ GRPC_CLOSURE_INIT(&c2, DoNothing, NULL, grpc_combiner_scheduler(combiner));
+ GRPC_CLOSURE_INIT(&c3, DoNothing, NULL, grpc_combiner_scheduler(combiner));
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
while (state.KeepRunning()) {
- grpc_closure_sched(&exec_ctx, &c1, GRPC_ERROR_NONE);
- grpc_closure_sched(&exec_ctx, &c2, GRPC_ERROR_NONE);
- grpc_closure_sched(&exec_ctx, &c3, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&exec_ctx, &c1, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&exec_ctx, &c2, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&exec_ctx, &c3, GRPC_ERROR_NONE);
grpc_exec_ctx_flush(&exec_ctx);
}
GRPC_COMBINER_UNREF(&exec_ctx, combiner, "finished");
@@ -305,12 +305,12 @@ static void BM_ClosureSched2OnTwoCombiners(benchmark::State& state) {
grpc_combiner* combiner2 = grpc_combiner_create();
grpc_closure c1;
grpc_closure c2;
- grpc_closure_init(&c1, DoNothing, NULL, grpc_combiner_scheduler(combiner1));
- grpc_closure_init(&c2, DoNothing, NULL, grpc_combiner_scheduler(combiner2));
+ GRPC_CLOSURE_INIT(&c1, DoNothing, NULL, grpc_combiner_scheduler(combiner1));
+ GRPC_CLOSURE_INIT(&c2, DoNothing, NULL, grpc_combiner_scheduler(combiner2));
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
while (state.KeepRunning()) {
- grpc_closure_sched(&exec_ctx, &c1, GRPC_ERROR_NONE);
- grpc_closure_sched(&exec_ctx, &c2, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&exec_ctx, &c1, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&exec_ctx, &c2, GRPC_ERROR_NONE);
grpc_exec_ctx_flush(&exec_ctx);
}
GRPC_COMBINER_UNREF(&exec_ctx, combiner1, "finished");
@@ -328,16 +328,16 @@ static void BM_ClosureSched4OnTwoCombiners(benchmark::State& state) {
grpc_closure c2;
grpc_closure c3;
grpc_closure c4;
- grpc_closure_init(&c1, DoNothing, NULL, grpc_combiner_scheduler(combiner1));
- grpc_closure_init(&c2, DoNothing, NULL, grpc_combiner_scheduler(combiner2));
- grpc_closure_init(&c3, DoNothing, NULL, grpc_combiner_scheduler(combiner1));
- grpc_closure_init(&c4, DoNothing, NULL, grpc_combiner_scheduler(combiner2));
+ GRPC_CLOSURE_INIT(&c1, DoNothing, NULL, grpc_combiner_scheduler(combiner1));
+ GRPC_CLOSURE_INIT(&c2, DoNothing, NULL, grpc_combiner_scheduler(combiner2));
+ GRPC_CLOSURE_INIT(&c3, DoNothing, NULL, grpc_combiner_scheduler(combiner1));
+ GRPC_CLOSURE_INIT(&c4, DoNothing, NULL, grpc_combiner_scheduler(combiner2));
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
while (state.KeepRunning()) {
- grpc_closure_sched(&exec_ctx, &c1, GRPC_ERROR_NONE);
- grpc_closure_sched(&exec_ctx, &c2, GRPC_ERROR_NONE);
- grpc_closure_sched(&exec_ctx, &c3, GRPC_ERROR_NONE);
- grpc_closure_sched(&exec_ctx, &c4, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&exec_ctx, &c1, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&exec_ctx, &c2, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&exec_ctx, &c3, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&exec_ctx, &c4, GRPC_ERROR_NONE);
grpc_exec_ctx_flush(&exec_ctx);
}
GRPC_COMBINER_UNREF(&exec_ctx, combiner1, "finished");
@@ -353,16 +353,16 @@ class Rescheduler {
public:
Rescheduler(benchmark::State& state, grpc_closure_scheduler* scheduler)
: state_(state) {
- grpc_closure_init(&closure_, Step, this, scheduler);
+ GRPC_CLOSURE_INIT(&closure_, Step, this, scheduler);
}
void ScheduleFirst(grpc_exec_ctx* exec_ctx) {
- grpc_closure_sched(exec_ctx, &closure_, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, &closure_, GRPC_ERROR_NONE);
}
void ScheduleFirstAgainstDifferentScheduler(
grpc_exec_ctx* exec_ctx, grpc_closure_scheduler* scheduler) {
- grpc_closure_sched(exec_ctx, grpc_closure_create(Step, this, scheduler),
+ GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_CREATE(Step, this, scheduler),
GRPC_ERROR_NONE);
}
@@ -373,7 +373,7 @@ class Rescheduler {
static void Step(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
Rescheduler* self = static_cast<Rescheduler*>(arg);
if (self->state_.KeepRunning()) {
- grpc_closure_sched(exec_ctx, &self->closure_, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, &self->closure_, GRPC_ERROR_NONE);
}
}
};
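
The Rescheduler above drives each benchmark iteration from inside the closure callback itself: the callback re-schedules the same closure until the benchmark loop says stop, so every flush of the exec_ctx measures one scheduling hop. A condensed, self-contained sketch of that shape (the class name and iteration counter are illustrative):

struct SelfResched {
  explicit SelfResched(int iterations) : remaining_(iterations) {
    GRPC_CLOSURE_INIT(&closure_, Step, this, grpc_schedule_on_exec_ctx);
  }
  static void Step(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
    SelfResched* self = static_cast<SelfResched*>(arg);
    if (--self->remaining_ > 0) {
      // Re-queue ourselves; the surrounding flush keeps draining the queue.
      GRPC_CLOSURE_SCHED(exec_ctx, &self->closure_, GRPC_ERROR_NONE);
    }
  }
  grpc_closure closure_;
  int remaining_;
};

// Kick it off once, then flush:
//   SelfResched r(1000);
//   GRPC_CLOSURE_SCHED(&exec_ctx, &r.closure_, GRPC_ERROR_NONE);
//   grpc_exec_ctx_flush(&exec_ctx);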
diff --git a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
index 3c41f18266..1e3830a556 100644
--- a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
+++ b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
@@ -44,7 +44,7 @@ static grpc_event_engine_vtable g_vtable;
static void pollset_shutdown(grpc_exec_ctx* exec_ctx, grpc_pollset* ps,
grpc_closure* closure) {
- grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE);
}
static void pollset_init(grpc_pollset* ps, gpr_mu** mu) {
diff --git a/test/cpp/microbenchmarks/bm_pollset.cc b/test/cpp/microbenchmarks/bm_pollset.cc
index 543b24b77a..683f4703c2 100644
--- a/test/cpp/microbenchmarks/bm_pollset.cc
+++ b/test/cpp/microbenchmarks/bm_pollset.cc
@@ -54,7 +54,7 @@ static void BM_CreateDestroyPollset(benchmark::State& state) {
gpr_mu* mu;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_closure shutdown_ps_closure;
- grpc_closure_init(&shutdown_ps_closure, shutdown_ps, ps,
+ GRPC_CLOSURE_INIT(&shutdown_ps_closure, shutdown_ps, ps,
grpc_schedule_on_exec_ctx);
while (state.KeepRunning()) {
memset(ps, 0, ps_sz);
@@ -124,7 +124,7 @@ static void BM_PollEmptyPollset(benchmark::State& state) {
GRPC_ERROR_UNREF(grpc_pollset_work(&exec_ctx, ps, NULL, now, deadline));
}
grpc_closure shutdown_ps_closure;
- grpc_closure_init(&shutdown_ps_closure, shutdown_ps, ps,
+ GRPC_CLOSURE_INIT(&shutdown_ps_closure, shutdown_ps, ps,
grpc_schedule_on_exec_ctx);
grpc_pollset_shutdown(&exec_ctx, ps, &shutdown_ps_closure);
gpr_mu_unlock(mu);
@@ -151,7 +151,7 @@ static void BM_PollAddFd(benchmark::State& state) {
}
grpc_fd_orphan(&exec_ctx, fd, NULL, NULL, "xxx");
grpc_closure shutdown_ps_closure;
- grpc_closure_init(&shutdown_ps_closure, shutdown_ps, ps,
+ GRPC_CLOSURE_INIT(&shutdown_ps_closure, shutdown_ps, ps,
grpc_schedule_on_exec_ctx);
gpr_mu_lock(mu);
grpc_pollset_shutdown(&exec_ctx, ps, &shutdown_ps_closure);
@@ -171,7 +171,7 @@ template <class F>
Closure* MakeClosure(F f, grpc_closure_scheduler* scheduler) {
struct C : public Closure {
C(F f, grpc_closure_scheduler* scheduler) : f_(f) {
- grpc_closure_init(this, C::cbfn, this, scheduler);
+ GRPC_CLOSURE_INIT(this, C::cbfn, this, scheduler);
}
static void cbfn(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
C* p = static_cast<C*>(arg);
@@ -250,7 +250,7 @@ static void BM_SingleThreadPollOneFd(benchmark::State& state) {
grpc_fd_orphan(&exec_ctx, wakeup, NULL, NULL, "done");
wakeup_fd.read_fd = 0;
grpc_closure shutdown_ps_closure;
- grpc_closure_init(&shutdown_ps_closure, shutdown_ps, ps,
+ GRPC_CLOSURE_INIT(&shutdown_ps_closure, shutdown_ps, ps,
grpc_schedule_on_exec_ctx);
grpc_pollset_shutdown(&exec_ctx, ps, &shutdown_ps_closure);
gpr_mu_unlock(mu);
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 03e116e26c..f7dda0f758 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -55,6 +55,11 @@ class ClientRpcContext {
}
virtual void Start(CompletionQueue* cq, const ClientConfig& config) = 0;
+ void lock() { mu_.lock(); }
+ void unlock() { mu_.unlock(); }
+
+ private:
+ std::mutex mu_;
};
template <class RequestType, class ResponseType>
@@ -106,6 +111,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
void StartNewClone(CompletionQueue* cq) override {
auto* clone = new ClientRpcContextUnaryImpl(stub_, req_, next_issue_,
start_req_, callback_);
+ std::lock_guard<ClientRpcContext> lclone(*clone);
clone->StartInternal(cq);
}
@@ -163,8 +169,14 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
num_async_threads_(NumThreads(config)) {
SetupLoadTest(config, num_async_threads_);
- for (int i = 0; i < num_async_threads_; i++) {
+ int tpc = std::max(1, config.threads_per_cq()); // 1 if unspecified
+ int num_cqs = (num_async_threads_ + tpc - 1) / tpc; // ceiling operator
+ for (int i = 0; i < num_cqs; i++) {
cli_cqs_.emplace_back(new CompletionQueue);
+ }
+
+ for (int i = 0; i < num_async_threads_; i++) {
+ cq_.emplace_back(i % cli_cqs_.size());
next_issuers_.emplace_back(NextIssuer(i));
shutdown_state_.emplace_back(new PerThreadShutdownState());
}
@@ -231,20 +243,36 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
void* got_tag;
bool ok;
- if (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) {
+ if (cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
// Got a regular event, so process it
ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
// Proceed while holding a lock to make sure that
// this thread isn't supposed to shut down
std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
if (shutdown_state_[thread_idx]->shutdown) {
+ // We want to delete the context. However, it is possible that
+ // another thread that just initiated an action on this
+ // context still has its lock even though the action on the
+ // context has completed. To delay for that, just grab the
+ // lock for serialization. Take a new scope.
+ { std::lock_guard<ClientRpcContext> lctx(*ctx); }
delete ctx;
return true;
- } else if (!ctx->RunNextState(ok, entry)) {
- // The RPC and callback are done, so clone the ctx
- // and kickstart the new one
- ctx->StartNewClone(cli_cqs_[thread_idx].get());
- // delete the old version
+ }
+ bool del = false;
+
+ // Create a new scope for a lock_guard'ed region
+ {
+ std::lock_guard<ClientRpcContext> lctx(*ctx);
+ if (!ctx->RunNextState(ok, entry)) {
+ // The RPC and callback are done, so clone the ctx
+ // and kickstart the new one
+ ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get());
+ // set the old version to delete
+ del = true;
+ }
+ }
+ if (del) {
delete ctx;
}
return true;
@@ -255,6 +283,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
}
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
+ std::vector<int> cq_;
std::vector<std::function<gpr_timespec()>> next_issuers_;
std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
};
@@ -377,6 +406,7 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
void StartNewClone(CompletionQueue* cq) override {
auto* clone = new ClientRpcContextStreamingPingPongImpl(
stub_, req_, next_issue_, start_req_, callback_);
+ std::lock_guard<ClientRpcContext> lclone(*clone);
clone->StartInternal(cq, messages_per_stream_);
}
@@ -515,6 +545,7 @@ class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext {
void StartNewClone(CompletionQueue* cq) override {
auto* clone = new ClientRpcContextStreamingFromClientImpl(
stub_, req_, next_issue_, start_req_, callback_);
+ std::lock_guard<ClientRpcContext> lclone(*clone);
clone->StartInternal(cq);
}
@@ -632,6 +663,7 @@ class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext {
void StartNewClone(CompletionQueue* cq) override {
auto* clone = new ClientRpcContextStreamingFromServerImpl(
stub_, req_, next_issue_, start_req_, callback_);
+ std::lock_guard<ClientRpcContext> lclone(*clone);
clone->StartInternal(cq);
}
@@ -774,6 +806,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
void StartNewClone(CompletionQueue* cq) override {
auto* clone = new ClientRpcContextGenericStreamingImpl(
stub_, req_, next_issue_, start_req_, callback_);
+ std::lock_guard<ClientRpcContext> lclone(*clone);
clone->StartInternal(cq, messages_per_stream_);
}
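
The client changes above give ClientRpcContext lock()/unlock() members, which makes it satisfy the standard BasicLockable requirements so std::lock_guard can be used on the context itself: the poller holds the context's own lock while advancing its state machine or starting a clone, and the shutdown path takes and immediately releases that lock before deleting a context so it cannot be freed out from under a thread still working inside it. A self-contained sketch of the idiom, using an illustrative stand-in class rather than the real context types:

#include <mutex>

class Context {
 public:
  void lock() { mu_.lock(); }
  void unlock() { mu_.unlock(); }
  bool RunNextState() { return false; }  // stand-in for the real state machine

 private:
  std::mutex mu_;
};

void PollOnce(Context* ctx) {
  bool del = false;
  {
    // Hold the context's own lock while advancing its state machine.
    std::lock_guard<Context> guard(*ctx);
    if (!ctx->RunNextState()) {
      del = true;  // finished: mark for deletion outside the locked scope
    }
  }
  if (del) {
    // Same trick as the shutdown path above: an empty lock/unlock round trip
    // waits out any thread still inside the context before deleting it.
    { std::lock_guard<Context> drain(*ctx); }
    delete ctx;
  }
}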
diff --git a/test/cpp/qps/qps_json_driver.cc b/test/cpp/qps/qps_json_driver.cc
index a946992100..590c22ec29 100644
--- a/test/cpp/qps/qps_json_driver.cc
+++ b/test/cpp/qps/qps_json_driver.cc
@@ -16,6 +16,7 @@
*
*/
+#include <fstream>
#include <iostream>
#include <memory>
#include <set>
@@ -57,6 +58,8 @@ DEFINE_string(qps_server_target_override, "",
"Override QPS server target to configure in client configs."
"Only applicable if there is a single benchmark server.");
+DEFINE_string(json_file_out, "", "File to write the JSON output to.");
+
namespace grpc {
namespace testing {
@@ -88,6 +91,13 @@ static std::unique_ptr<ScenarioResult> RunAndReport(const Scenario& scenario,
*success = result->server_success(i);
}
+ if (FLAGS_json_file_out != "") {
+ std::ofstream json_outfile;
+ json_outfile.open(FLAGS_json_file_out);
+ json_outfile << "{\"qps\": " << result->summary().qps() << "}\n";
+ json_outfile.close();
+ }
+
return result;
}
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 9cb281740f..96d7e5ef74 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -16,6 +16,7 @@
*
*/
+#include <algorithm>
#include <forward_list>
#include <functional>
#include <memory>
@@ -89,9 +90,14 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
gpr_log(GPR_INFO, "Sizing async server to %d threads", num_threads);
}
- for (int i = 0; i < num_threads; i++) {
+ int tpc = std::max(1, config.threads_per_cq()); // 1 if unspecified
+ int num_cqs = (num_threads + tpc - 1) / tpc; // ceiling operator
+ for (int i = 0; i < num_cqs; i++) {
srv_cqs_.emplace_back(builder.AddCompletionQueue());
}
+ for (int i = 0; i < num_threads; i++) {
+ cq_.emplace_back(i % srv_cqs_.size());
+ }
if (config.resource_quota_size() > 0) {
builder.SetResourceQuota(ResourceQuota("AsyncQpsServerTest")
@@ -105,7 +111,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
std::placeholders::_2);
for (int i = 0; i < 5000; i++) {
- for (int j = 0; j < num_threads; j++) {
+ for (int j = 0; j < num_cqs; j++) {
if (request_unary_function) {
auto request_unary = std::bind(
request_unary_function, &async_service_, std::placeholders::_1,
@@ -190,7 +196,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
// Wait until work is available or we are shutting down
bool ok;
void *got_tag;
- while (srv_cqs_[thread_idx]->Next(&got_tag, &ok)) {
+ while (srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
ServerRpcContext *ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
// Proceed while holding a lock to make sure that
@@ -199,6 +205,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
if (shutdown_state_[thread_idx]->shutdown) {
return;
}
+ std::lock_guard<ServerRpcContext> l2(*ctx);
const bool still_going = ctx->RunNextState(ok);
// if this RPC context is done, refresh it
if (!still_going) {
@@ -211,9 +218,13 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
class ServerRpcContext {
public:
ServerRpcContext() {}
+ void lock() { mu_.lock(); }
+ void unlock() { mu_.unlock(); }
virtual ~ServerRpcContext(){};
virtual bool RunNextState(bool) = 0; // next state, return false if done
virtual void Reset() = 0; // start this back at a clean state
+ private:
+ std::mutex mu_;
};
static void *tag(ServerRpcContext *func) {
return reinterpret_cast<void *>(func);
@@ -503,6 +514,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
std::vector<std::thread> threads_;
std::unique_ptr<grpc::Server> server_;
std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_;
+ std::vector<int> cq_;
ServiceType async_service_;
std::vector<std::unique_ptr<ServerRpcContext>> contexts_;
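
Both the async client and server above now size their completion queues from the threads_per_cq scenario setting: the number of CQs is the ceiling of num_threads / threads_per_cq, and each worker thread is mapped round-robin onto a CQ through the new cq_ index vector. A small stand-alone sketch of that sizing logic, with illustrative names:

#include <algorithm>
#include <vector>

struct CqPlan {
  int num_cqs;
  std::vector<int> thread_to_cq;  // thread index -> completion queue index
};

CqPlan PlanCompletionQueues(int num_threads, int threads_per_cq_flag) {
  int tpc = std::max(1, threads_per_cq_flag);    // 1 if unspecified
  int num_cqs = (num_threads + tpc - 1) / tpc;   // ceiling division
  CqPlan plan{num_cqs, {}};
  for (int i = 0; i < num_threads; i++) {
    plan.thread_to_cq.push_back(i % num_cqs);    // round-robin assignment
  }
  return plan;
}

// Example: 8 threads with threads_per_cq=3 yields 3 CQs, and threads
// 0..7 map to queues {0, 1, 2, 0, 1, 2, 0, 1}.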