diff options
author | 2015-02-26 16:35:35 -0800 | |
---|---|---|
committer | 2015-02-26 16:35:35 -0800 | |
commit | dea740f3297ea78af9d5b76f89aedee869a74963 (patch) | |
tree | 0a40ed8e4612f889b8aa47c7d97b5eca9d85d187 /test/cpp/qps/client_async.cc | |
parent | edfd1023aebfe79bc97d4386db6e55940c686028 (diff) |
New multithreaded async C++ tests. The server is architected the way
it should be: multiple threads wait on a single
completion queue.
The client currently uses a separate completion
queue per-thread, as trying to do a single unified queue was leading
to crashes for me. I need to figure that out.
Diffstat (limited to 'test/cpp/qps/client_async.cc')
-rw-r--r-- | test/cpp/qps/client_async.cc | 340 |
1 files changed, 340 insertions, 0 deletions
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc new file mode 100644 index 0000000000..13db4febae --- /dev/null +++ b/test/cpp/qps/client_async.cc @@ -0,0 +1,340 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ * + */ + +#include <cassert> +#include <functional> +#include <memory> +#include <string> +#include <thread> +#include <vector> +#include <sstream> + +#include <grpc/grpc.h> +#include <grpc/support/histogram.h> +#include <grpc/support/log.h> +#include <gflags/gflags.h> +#include <grpc++/async_unary_call.h> +#include <grpc++/client_context.h> +#include <grpc++/status.h> +#include "test/core/util/grpc_profiler.h" +#include "test/cpp/util/create_test_channel.h" +#include "test/cpp/qps/qpstest.pb.h" + +DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls."); +DEFINE_int32(server_port, 0, "Server port."); +DEFINE_string(server_host, "127.0.0.1", "Server host."); +DEFINE_int32(client_threads, 4, "Number of client threads."); + +// We have a configurable number of channels for sending RPCs. +// RPCs are sent round-robin on the available channels by the +// various threads. Interesting cases are 1 global channel or +// 1 per-thread channel, but we can support any number. +// The channels are assigned round-robin on an RPC by RPC basis +// rather than just at initialization time in order to also measure the +// impact of cache thrashing caused by channel changes. This is an issue +// if you are not in one of the above "interesting cases" +DEFINE_int32(client_channels, 4, "Number of client channels."); + +DEFINE_int32(num_rpcs, 1000, "Number of RPCs per thread."); +DEFINE_int32(payload_size, 1, "Payload size in bytes"); + +// Alternatively, specify parameters for test as a workload so that multiple +// tests are initiated back-to-back. This is convenient for keeping a borg +// allocation consistent. 
This is a space-separated list of +// [threads channels num_rpcs payload_size ]* +DEFINE_string(workload, "", "Workload parameters"); + +using grpc::ChannelInterface; +using grpc::CreateTestChannel; +using grpc::testing::ServerStats; +using grpc::testing::SimpleRequest; +using grpc::testing::SimpleResponse; +using grpc::testing::StatsRequest; +using grpc::testing::TestService; + +// In some distros, gflags is in the namespace google, and in some others, +// in gflags. This hack is enabling us to find both. +namespace google { } +namespace gflags { } +using namespace google; +using namespace gflags; + +static double now() { + gpr_timespec tv = gpr_now(); + return 1e9 * tv.tv_sec + tv.tv_nsec; +} + + class ClientRpcContext { + public: + ClientRpcContext() {} + virtual ~ClientRpcContext() {} + virtual bool operator()() = 0; // do next state, return false if steps done + static void *tag(ClientRpcContext *c) {return reinterpret_cast<void *>(c);} + static ClientRpcContext *detag(void *t) { + return reinterpret_cast<ClientRpcContext *>(t); + } + virtual void report_stats(gpr_histogram *hist) = 0; + }; + template <class RequestType, class ResponseType> + class ClientRpcContextUnaryImpl : public ClientRpcContext { + public: + ClientRpcContextUnaryImpl(const RequestType& req, + std::function<grpc::ClientAsyncResponseReader< + ResponseType> *(grpc::ClientContext *, + const RequestType&, void *)> start_req, + std::function<void(grpc::Status, ResponseType *)> on_done): + context_(), req_(req), response_(), + next_state_(&ClientRpcContextUnaryImpl::ReqSent), + callback_(on_done), + start_(now()), + response_reader_(start_req(&context_, req_, + ClientRpcContext::tag(this))) { + } + ~ClientRpcContextUnaryImpl() override {} + bool operator()() override {return (this->*next_state_)();} + void report_stats(gpr_histogram *hist) override { + gpr_histogram_add(hist, now()-start_); + } + private: + bool ReqSent() { + next_state_ = &ClientRpcContextUnaryImpl::RespDone; + 
response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this)); + return true; + } + bool RespDone() { + next_state_ = &ClientRpcContextUnaryImpl::DoCallBack; + return false; + } + bool DoCallBack() { + callback_(status_, &response_); + return false; + } + grpc::ClientContext context_; + RequestType req_; + ResponseType response_; + bool (ClientRpcContextUnaryImpl::*next_state_)(); + std::function<void(grpc::Status, ResponseType *)> callback_; + grpc::Status status_; + double start_; + std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>> response_reader_; + }; + +static void RunTest(const int client_threads, const int client_channels, + const int num_rpcs, const int payload_size) { + gpr_log(GPR_INFO, + "QPS test with parameters\n" + "enable_ssl = %d\n" + "client_channels = %d\n" + "client_threads = %d\n" + "num_rpcs = %d\n" + "payload_size = %d\n" + "server_host:server_port = %s:%d\n\n", + FLAGS_enable_ssl, client_channels, client_threads, num_rpcs, + payload_size, FLAGS_server_host.c_str(), FLAGS_server_port); + + std::ostringstream oss; + oss << FLAGS_server_host << ":" << FLAGS_server_port; + + class ClientChannelInfo { + public: + explicit ClientChannelInfo(const grpc::string &server) + : channel_(CreateTestChannel(server, FLAGS_enable_ssl)), + stub_(TestService::NewStub(channel_)) {} + ChannelInterface *get_channel() { return channel_.get(); } + TestService::Stub *get_stub() { return stub_.get(); } + + private: + std::shared_ptr<ChannelInterface> channel_; + std::unique_ptr<TestService::Stub> stub_; + }; + + std::vector<ClientChannelInfo> channels; + for (int i = 0; i < client_channels; i++) { + channels.push_back(ClientChannelInfo(oss.str())); + } + + std::vector<std::thread> threads; // Will add threads when ready to execute + std::vector<::gpr_histogram *> thread_stats(client_threads); + + TestService::Stub *stub_stats = channels[0].get_stub(); + grpc::ClientContext context_stats_begin; + StatsRequest stats_request; + ServerStats 
server_stats_begin; + stats_request.set_test_num(0); + grpc::Status status_beg = stub_stats->CollectServerStats( + &context_stats_begin, stats_request, &server_stats_begin); + + grpc_profiler_start("qps_client_async.prof"); + + auto CheckDone = [=](grpc::Status s, SimpleResponse *response) { + GPR_ASSERT(s.IsOk() && + (response->payload().type() == + grpc::testing::PayloadType::COMPRESSABLE) && + (response->payload().body().length() == + static_cast<size_t>(payload_size))); + }; + + for (int i = 0; i < client_threads; i++) { + gpr_histogram *hist = gpr_histogram_create(0.01, 60e9); + GPR_ASSERT(hist != NULL); + thread_stats[i] = hist; + + threads.push_back( + std::thread([hist, client_threads, client_channels, num_rpcs, + payload_size, &channels, &CheckDone](int channel_num) { + using namespace std::placeholders; + SimpleRequest request; + request.set_response_type( + grpc::testing::PayloadType::COMPRESSABLE); + request.set_response_size(payload_size); + + grpc::CompletionQueue cli_cq; + + int rpcs_sent=0; + while (rpcs_sent < num_rpcs) { + rpcs_sent++; + TestService::Stub *stub = + channels[channel_num].get_stub(); + grpc::ClientContext context; + auto start_req = std::bind(static_cast<grpc::ClientAsyncResponseReader<SimpleResponse>*(TestService::Stub::*)(grpc::ClientContext *,const SimpleRequest &,grpc::CompletionQueue *,void *)> + (&TestService::Stub::UnaryCall), + stub, _1, _2, &cli_cq, _3); + new ClientRpcContextUnaryImpl<SimpleRequest, + SimpleResponse>(request, + start_req, + CheckDone); + void *got_tag; + bool ok; + + // Need to call 2 next for every 1 RPC (1 for req done, 1 for resp done) + cli_cq.Next(&got_tag,&ok); + if (!ok) + break; + ClientRpcContext *ctx = ClientRpcContext::detag(got_tag); + if ((*ctx)() == false) { + // call the callback and then delete it + (*ctx)(); + delete ctx; + } + cli_cq.Next(&got_tag,&ok); + if (!ok) + break; + ctx = ClientRpcContext::detag(got_tag); + if ((*ctx)() == false) { + // call the callback and then delete it + 
ctx->report_stats(hist); + (*ctx)(); + delete ctx; + } + // Now do runtime round-robin assignment of the next + // channel number + channel_num += client_threads; + channel_num %= client_channels; + } + }, + i % client_channels)); + } + + gpr_histogram *hist = gpr_histogram_create(0.01, 60e9); + GPR_ASSERT(hist != NULL); + for (auto &t : threads) { + t.join(); + } + + grpc_profiler_stop(); + + for (int i = 0; i < client_threads; i++) { + gpr_histogram *h = thread_stats[i]; + gpr_log(GPR_INFO, "latency at thread %d (50/90/95/99/99.9): %f/%f/%f/%f/%f", + i, gpr_histogram_percentile(h, 50), gpr_histogram_percentile(h, 90), + gpr_histogram_percentile(h, 95), gpr_histogram_percentile(h, 99), + gpr_histogram_percentile(h, 99.9)); + gpr_histogram_merge(hist, h); + gpr_histogram_destroy(h); + } + + gpr_log( + GPR_INFO, + "latency across %d threads with %d channels and %d payload " + "(50/90/95/99/99.9): %f / %f / %f / %f / %f", + client_threads, client_channels, payload_size, + gpr_histogram_percentile(hist, 50), gpr_histogram_percentile(hist, 90), + gpr_histogram_percentile(hist, 95), gpr_histogram_percentile(hist, 99), + gpr_histogram_percentile(hist, 99.9)); + gpr_histogram_destroy(hist); + + grpc::ClientContext context_stats_end; + ServerStats server_stats_end; + grpc::Status status_end = stub_stats->CollectServerStats( + &context_stats_end, stats_request, &server_stats_end); + + double elapsed = server_stats_end.time_now() - server_stats_begin.time_now(); + int total_rpcs = client_threads * num_rpcs; + double utime = server_stats_end.time_user() - server_stats_begin.time_user(); + double stime = + server_stats_end.time_system() - server_stats_begin.time_system(); + gpr_log(GPR_INFO, + "Elapsed time: %.3f\n" + "RPC Count: %d\n" + "QPS: %.3f\n" + "System time: %.3f\n" + "User time: %.3f\n" + "Resource usage: %.1f%%\n", + elapsed, total_rpcs, total_rpcs / elapsed, stime, utime, + (stime + utime) / elapsed * 100.0); +} + +int main(int argc, char **argv) { + grpc_init(); + 
ParseCommandLineFlags(&argc, &argv, true); + + GPR_ASSERT(FLAGS_server_port); + + if (FLAGS_workload.length() == 0) { + RunTest(FLAGS_client_threads, FLAGS_client_channels, FLAGS_num_rpcs, + FLAGS_payload_size); + } else { + std::istringstream workload(FLAGS_workload); + int client_threads, client_channels, num_rpcs, payload_size; + workload >> client_threads; + while (!workload.eof()) { + workload >> client_channels >> num_rpcs >> payload_size; + RunTest(client_threads, client_channels, num_rpcs, payload_size); + workload >> client_threads; + } + gpr_log(GPR_INFO, "Done with specified workload."); + } + + grpc_shutdown(); + return 0; +} |