/* * * Copyright 2016 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ /* Benchmark gRPC end2end in various configurations */ #ifndef TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PUMP_H #define TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PUMP_H #include #include #include "src/core/lib/profiling/timers.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "test/cpp/microbenchmarks/fullstack_context_mutators.h" #include "test/cpp/microbenchmarks/fullstack_fixtures.h" namespace grpc { namespace testing { /******************************************************************************* * BENCHMARKING KERNELS */ static void* tag(intptr_t x) { return reinterpret_cast(x); } template static void BM_PumpStreamClientToServer(benchmark::State& state) { EchoTestService::AsyncService service; std::unique_ptr fixture(new Fixture(&service)); { EchoRequest send_request; EchoRequest recv_request; if (state.range(0) > 0) { send_request.set_message(std::string(state.range(0), 'a')); } Status recv_status; ServerContext svr_ctx; ServerAsyncReaderWriter response_rw(&svr_ctx); service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), fixture->cq(), tag(0)); std::unique_ptr stub( EchoTestService::NewStub(fixture->channel())); ClientContext cli_ctx; auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); int need_tags = (1 << 0) | (1 << 1); void* t; bool ok; while (need_tags) { GPR_ASSERT(fixture->cq()->Next(&t, &ok)); GPR_ASSERT(ok); int i = static_cast((intptr_t)t); GPR_ASSERT(need_tags & (1 << i)); need_tags &= ~(1 << i); } response_rw.Read(&recv_request, tag(0)); while (state.KeepRunning()) { GPR_TIMER_SCOPE("BenchmarkCycle", 0); request_rw->Write(send_request, tag(1)); while (true) { GPR_ASSERT(fixture->cq()->Next(&t, &ok)); if (t == tag(0)) { response_rw.Read(&recv_request, tag(0)); } else if (t == tag(1)) { break; } else { GPR_ASSERT(false); } } } request_rw->WritesDone(tag(1)); need_tags = (1 << 0) | (1 << 1); while (need_tags) { GPR_ASSERT(fixture->cq()->Next(&t, &ok)); int i = static_cast((intptr_t)t); GPR_ASSERT(need_tags & (1 << i)); need_tags &= ~(1 << i); } response_rw.Finish(Status::OK, tag(0)); Status final_status; request_rw->Finish(&final_status, tag(1)); need_tags = (1 << 0) | (1 << 1); while (need_tags) { GPR_ASSERT(fixture->cq()->Next(&t, &ok)); int i = static_cast((intptr_t)t); GPR_ASSERT(need_tags & (1 << i)); need_tags &= ~(1 << i); } GPR_ASSERT(final_status.ok()); } fixture->Finish(state); fixture.reset(); state.SetBytesProcessed(state.range(0) * state.iterations()); } template static void BM_PumpStreamServerToClient(benchmark::State& state) { EchoTestService::AsyncService service; std::unique_ptr fixture(new Fixture(&service)); { EchoResponse send_response; EchoResponse recv_response; if (state.range(0) > 0) { send_response.set_message(std::string(state.range(0), 'a')); } Status recv_status; ServerContext svr_ctx; ServerAsyncReaderWriter response_rw(&svr_ctx); service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), fixture->cq(), tag(0)); std::unique_ptr stub( EchoTestService::NewStub(fixture->channel())); ClientContext cli_ctx; auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); int need_tags = (1 << 0) | (1 << 1); void* t; bool ok; while (need_tags) { GPR_ASSERT(fixture->cq()->Next(&t, &ok)); GPR_ASSERT(ok); int i = static_cast((intptr_t)t); GPR_ASSERT(need_tags & (1 << i)); need_tags &= ~(1 << i); } request_rw->Read(&recv_response, tag(0)); while (state.KeepRunning()) { GPR_TIMER_SCOPE("BenchmarkCycle", 0); response_rw.Write(send_response, tag(1)); while (true) { GPR_ASSERT(fixture->cq()->Next(&t, &ok)); if (t == tag(0)) { request_rw->Read(&recv_response, tag(0)); } else if (t == tag(1)) { break; } else { GPR_ASSERT(false); } } } response_rw.Finish(Status::OK, tag(1)); need_tags = (1 << 0) | (1 << 1); while (need_tags) { GPR_ASSERT(fixture->cq()->Next(&t, &ok)); int i = static_cast((intptr_t)t); GPR_ASSERT(need_tags & (1 << i)); need_tags &= ~(1 << i); } } fixture->Finish(state); fixture.reset(); state.SetBytesProcessed(state.range(0) * state.iterations()); } } // namespace testing } // namespace grpc #endif // TEST_CPP_MICROBENCHMARKS_FULLSTACK_FIXTURES_H