From 9a3997c8537d168f4b869c3912994c2dc6f476ba Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 30 Jan 2017 13:22:33 -0800 Subject: Add a test verifying number of writes per RPC is reasonable Mostly this is code copied from bm_fullstack and rephrased as a test. I'm resisting the urge to unify it however, as I expect this code will evolve differently over time. --- test/cpp/performance/writes_per_rpc_test.cc | 268 ++++++++++++++++++++++++++++ 1 file changed, 268 insertions(+) create mode 100644 test/cpp/performance/writes_per_rpc_test.cc (limited to 'test/cpp/performance') diff --git a/test/cpp/performance/writes_per_rpc_test.cc b/test/cpp/performance/writes_per_rpc_test.cc new file mode 100644 index 0000000000..7a914c1547 --- /dev/null +++ b/test/cpp/performance/writes_per_rpc_test.cc @@ -0,0 +1,268 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +extern "C" { +#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/iomgr/endpoint_pair.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/tcp_posix.h" +#include "src/core/lib/surface/channel.h" +#include "src/core/lib/surface/completion_queue.h" +#include "src/core/lib/surface/server.h" +#include "test/core/util/passthru_endpoint.h" +#include "test/core/util/port.h" +} +#include "src/cpp/client/create_channel_internal.h" +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/core/util/test_config.h" + +namespace grpc { +namespace testing { + +static void* tag(intptr_t x) { return reinterpret_cast(x); } + +static void ApplyCommonServerBuilderConfig(ServerBuilder* b) { + b->SetMaxReceiveMessageSize(INT_MAX); + b->SetMaxSendMessageSize(INT_MAX); +} + +static void ApplyCommonChannelArguments(ChannelArguments* c) { + c->SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, INT_MAX); + c->SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, INT_MAX); +} + +static class InitializeStuff { + public: + InitializeStuff() { + init_lib_.init(); + rq_ = grpc_resource_quota_create("bm"); + } + + ~InitializeStuff() { init_lib_.shutdown(); } + + grpc_resource_quota* rq() { return rq_; } + + private: + internal::GrpcLibrary init_lib_; + grpc_resource_quota* rq_; +} initialize_stuff; + +class EndpointPairFixture { + public: + EndpointPairFixture(Service* service, grpc_endpoint_pair endpoints) { + ServerBuilder b; + cq_ = b.AddCompletionQueue(true); + b.RegisterService(service); + ApplyCommonServerBuilderConfig(&b); + server_ = b.BuildAndStart(); + + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + + /* add server endpoint to server_ */ + { + const grpc_channel_args* server_args = + grpc_server_get_channel_args(server_->c_server()); + grpc_transport* transport = grpc_create_chttp2_transport( + &exec_ctx, server_args, endpoints.server, 0 /* is_client */); + + grpc_pollset** pollsets; + size_t num_pollsets = 0; + grpc_server_get_pollsets(server_->c_server(), &pollsets, &num_pollsets); + + for (size_t i = 0; i < num_pollsets; i++) { + grpc_endpoint_add_to_pollset(&exec_ctx, endpoints.server, pollsets[i]); + } + + grpc_server_setup_transport(&exec_ctx, server_->c_server(), transport, + NULL, server_args); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL); + } + + /* create channel */ + { + ChannelArguments args; + args.SetString(GRPC_ARG_DEFAULT_AUTHORITY, "test.authority"); + ApplyCommonChannelArguments(&args); + + grpc_channel_args c_args = args.c_channel_args(); + grpc_transport* transport = + grpc_create_chttp2_transport(&exec_ctx, &c_args, endpoints.client, 1); + GPR_ASSERT(transport); + grpc_channel* channel = grpc_channel_create( + &exec_ctx, "target", &c_args, GRPC_CLIENT_DIRECT_CHANNEL, transport); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL); + + channel_ = CreateChannelInternal("", channel); + } + + grpc_exec_ctx_finish(&exec_ctx); + } + + virtual ~EndpointPairFixture() { + server_->Shutdown(); + cq_->Shutdown(); + void* tag; + bool ok; + while (cq_->Next(&tag, &ok)) { + } + } + + ServerCompletionQueue* cq() { return cq_.get(); } + std::shared_ptr channel() { return channel_; } + + private: + std::unique_ptr server_; + std::unique_ptr cq_; + std::shared_ptr channel_; +}; + +class InProcessCHTTP2 : public EndpointPairFixture { + public: + InProcessCHTTP2(Service* service) + : EndpointPairFixture(service, MakeEndpoints()) {} + + int writes_performed() const { return stats_.num_writes; } + + private: + grpc_passthru_endpoint_stats stats_; + + grpc_endpoint_pair MakeEndpoints() { + grpc_endpoint_pair p; + grpc_passthru_endpoint_create(&p.client, &p.server, initialize_stuff.rq(), + &stats_); + return p; + } +}; + +static double UnaryPingPong(int request_size, int response_size) { + const int kIterations = 10000; + + EchoTestService::AsyncService service; + std::unique_ptr fixture(new InProcessCHTTP2(&service)); + EchoRequest send_request; + EchoResponse send_response; + EchoResponse recv_response; + if (request_size > 0) { + send_request.set_message(std::string(request_size, 'a')); + } + if (response_size > 0) { + send_response.set_message(std::string(response_size, 'a')); + } + Status recv_status; + struct ServerEnv { + ServerContext ctx; + EchoRequest recv_request; + grpc::ServerAsyncResponseWriter response_writer; + ServerEnv() : response_writer(&ctx) {} + }; + uint8_t server_env_buffer[2 * sizeof(ServerEnv)]; + ServerEnv* server_env[2] = { + reinterpret_cast(server_env_buffer), + reinterpret_cast(server_env_buffer + sizeof(ServerEnv))}; + new (server_env[0]) ServerEnv; + new (server_env[1]) ServerEnv; + service.RequestEcho(&server_env[0]->ctx, &server_env[0]->recv_request, + &server_env[0]->response_writer, fixture->cq(), + fixture->cq(), tag(0)); + service.RequestEcho(&server_env[1]->ctx, &server_env[1]->recv_request, + &server_env[1]->response_writer, fixture->cq(), + fixture->cq(), tag(1)); + std::unique_ptr stub( + EchoTestService::NewStub(fixture->channel())); + for (int iteration = 0; iteration < kIterations; iteration++) { + recv_response.Clear(); + ClientContext cli_ctx; + std::unique_ptr> response_reader( + stub->AsyncEcho(&cli_ctx, send_request, fixture->cq())); + void* t; + bool ok; + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + GPR_ASSERT(ok); + GPR_ASSERT(t == tag(0) || t == tag(1)); + intptr_t slot = reinterpret_cast(t); + ServerEnv* senv = server_env[slot]; + senv->response_writer.Finish(send_response, Status::OK, tag(3)); + response_reader->Finish(&recv_response, &recv_status, tag(4)); + for (int i = (1 << 3) | (1 << 4); i != 0;) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + GPR_ASSERT(ok); + int tagnum = (int)reinterpret_cast(t); + GPR_ASSERT(i & (1 << tagnum)); + i -= 1 << tagnum; + } + GPR_ASSERT(recv_status.ok()); + + senv->~ServerEnv(); + senv = new (senv) ServerEnv(); + service.RequestEcho(&senv->ctx, &senv->recv_request, &senv->response_writer, + fixture->cq(), fixture->cq(), tag(slot)); + } + + double writes_per_iteration = + (double)fixture->writes_performed() / (double)kIterations; + + fixture.reset(); + server_env[0]->~ServerEnv(); + server_env[1]->~ServerEnv(); + + return writes_per_iteration; +} + +TEST(WritesPerRpcTest, UnaryPingPong) { + EXPECT_LT(UnaryPingPong(0, 0), 2.05); + EXPECT_LT(UnaryPingPong(1, 0), 2.05); + EXPECT_LT(UnaryPingPong(0, 1), 2.05); + EXPECT_LT(UnaryPingPong(4096, 0), 2.2); + EXPECT_LT(UnaryPingPong(0, 4096), 2.2); +} + +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc_test_init(argc, argv); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} -- cgit v1.2.3