diff options
author | Moiz Haidry <42048362+mhaidrygoog@users.noreply.github.com> | 2018-10-12 10:17:24 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-10-12 10:17:24 -0700 |
commit | 2478ea4289687b7cdfd25a3f836be08f65647896 (patch) | |
tree | 437bdc20980beeeb10408e01cdcb885dadd52e99 /test | |
parent | 5b7c0933867f05ce94b0f373224b4fb3ed15d3c2 (diff) | |
parent | 145a3bf91fe1e7d0aa40f6e1f80f621d26473b1f (diff) |
Merge pull request #16807 from mhaidrygoog/callback_unary_benchmark
Benchmark test for unary callback based RPCs in gRPC
Diffstat (limited to 'test')
-rw-r--r-- | test/cpp/qps/BUILD | 1 | ||||
-rw-r--r-- | test/cpp/qps/client.h | 1 | ||||
-rw-r--r-- | test/cpp/qps/client_callback.cc | 219 | ||||
-rw-r--r-- | test/cpp/qps/qps_worker.cc | 2 |
4 files changed, 223 insertions, 0 deletions
diff --git a/test/cpp/qps/BUILD b/test/cpp/qps/BUILD index 483b29b1b3..2ef7441371 100644 --- a/test/cpp/qps/BUILD +++ b/test/cpp/qps/BUILD @@ -31,6 +31,7 @@ grpc_cc_library( name = "qps_worker_impl", srcs = [ "client_async.cc", + "client_callback.cc", "client_sync.cc", "qps_server_builder.cc", "qps_worker.cc", diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 0b4b2ff0a9..4ed34e0405 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -533,6 +533,7 @@ class ClientImpl : public Client { std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& args); std::unique_ptr<Client> CreateAsyncClient(const ClientConfig& args); +std::unique_ptr<Client> CreateCallbackClient(const ClientConfig& args); std::unique_ptr<Client> CreateGenericAsyncStreamingClient( const ClientConfig& args); diff --git a/test/cpp/qps/client_callback.cc b/test/cpp/qps/client_callback.cc new file mode 100644 index 0000000000..87889e36dc --- /dev/null +++ b/test/cpp/qps/client_callback.cc @@ -0,0 +1,219 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <list> +#include <memory> +#include <mutex> +#include <sstream> +#include <string> +#include <thread> +#include <utility> +#include <vector> + +#include <grpc/grpc.h> +#include <grpc/support/cpu.h> +#include <grpc/support/log.h> +#include <grpcpp/alarm.h> +#include <grpcpp/channel.h> +#include <grpcpp/client_context.h> + +#include "src/proto/grpc/testing/benchmark_service.grpc.pb.h" +#include "test/cpp/qps/client.h" +#include "test/cpp/qps/usage_timer.h" + +namespace grpc { +namespace testing { + +/** + * Maintains context info per RPC + */ +struct CallbackClientRpcContext { + CallbackClientRpcContext(BenchmarkService::Stub* stub) : stub_(stub) {} + + ~CallbackClientRpcContext() {} + + SimpleResponse response_; + ClientContext context_; + Alarm alarm_; + BenchmarkService::Stub* stub_; +}; + +static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator( + const std::shared_ptr<Channel>& ch) { + return BenchmarkService::NewStub(ch); +} + +class CallbackClient + : public ClientImpl<BenchmarkService::Stub, SimpleRequest> { + public: + CallbackClient(const ClientConfig& config) + : ClientImpl<BenchmarkService::Stub, SimpleRequest>( + config, BenchmarkStubCreator) { + num_threads_ = NumThreads(config); + rpcs_done_ = 0; + SetupLoadTest(config, num_threads_); + total_outstanding_rpcs_ = + config.client_channels() * config.outstanding_rpcs_per_channel(); + } + + virtual ~CallbackClient() {} + + protected: + size_t num_threads_; + size_t total_outstanding_rpcs_; + // The below mutex and condition variable is used by main benchmark thread to + // wait on completion of all RPCs before shutdown + std::mutex shutdown_mu_; + std::condition_variable shutdown_cv_; + // Number of rpcs done after thread completion + size_t rpcs_done_; + // Vector of Context data pointers for running a RPC + std::vector<std::unique_ptr<CallbackClientRpcContext>> ctx_; + + virtual void InitThreadFuncImpl(size_t thread_idx) = 0; + virtual bool ThreadFuncImpl(Thread* t, size_t thread_idx) = 0; + + void ThreadFunc(size_t thread_idx, Thread* t) override { + InitThreadFuncImpl(thread_idx); + ThreadFuncImpl(t, thread_idx); + } + + virtual void ScheduleRpc(Thread* t, size_t thread_idx, + size_t ctx_vector_idx) = 0; + + /** + * The main thread of the benchmark will be waiting on DestroyMultithreading. + * Increment the rpcs_done_ variable to signify that the Callback RPC + * after thread completion is done. When the last outstanding rpc increments + * the counter it should also signal the main thread's conditional variable. + */ + void NotifyMainThreadOfThreadCompletion() { + std::lock_guard<std::mutex> l(shutdown_mu_); + rpcs_done_++; + if (rpcs_done_ == total_outstanding_rpcs_) { + shutdown_cv_.notify_one(); + } + } + + private: + int NumThreads(const ClientConfig& config) { + int num_threads = config.async_client_threads(); + if (num_threads <= 0) { // Use dynamic sizing + num_threads = cores_; + gpr_log(GPR_INFO, "Sizing callback client to %d threads", num_threads); + } + return num_threads; + } + + /** + * Wait until all outstanding Callback RPCs are done + */ + void DestroyMultithreading() final { + std::unique_lock<std::mutex> l(shutdown_mu_); + while (rpcs_done_ != total_outstanding_rpcs_) { + shutdown_cv_.wait(l); + } + EndThreads(); + } +}; + +class CallbackUnaryClient final : public CallbackClient { + public: + CallbackUnaryClient(const ClientConfig& config) : CallbackClient(config) { + for (int ch = 0; ch < config.client_channels(); ch++) { + for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) { + ctx_.emplace_back( + new CallbackClientRpcContext(channels_[ch].get_stub())); + } + } + StartThreads(num_threads_); + } + ~CallbackUnaryClient() {} + + protected: + bool ThreadFuncImpl(Thread* t, size_t thread_idx) override { + for (size_t vector_idx = thread_idx; vector_idx < total_outstanding_rpcs_; + vector_idx += num_threads_) { + ScheduleRpc(t, thread_idx, vector_idx); + } + return true; + } + + void InitThreadFuncImpl(size_t thread_idx) override { return; } + + private: + void ScheduleRpc(Thread* t, size_t thread_idx, size_t vector_idx) override { + if (!closed_loop_) { + gpr_timespec next_issue_time = NextIssueTime(thread_idx); + // Start an alarm callback to run the internal callback after + // next_issue_time + ctx_[vector_idx]->alarm_.experimental().Set( + next_issue_time, [this, t, thread_idx, vector_idx](bool ok) { + IssueUnaryCallbackRpc(t, thread_idx, vector_idx); + }); + } else { + IssueUnaryCallbackRpc(t, thread_idx, vector_idx); + } + } + + void IssueUnaryCallbackRpc(Thread* t, size_t thread_idx, size_t vector_idx) { + GPR_TIMER_SCOPE("CallbackUnaryClient::ThreadFunc", 0); + double start = UsageTimer::Now(); + ctx_[vector_idx]->stub_->experimental_async()->UnaryCall( + (&ctx_[vector_idx]->context_), &request_, &ctx_[vector_idx]->response_, + [this, t, thread_idx, start, vector_idx](grpc::Status s) { + // Update Histogram with data from the callback run + HistogramEntry entry; + if (s.ok()) { + entry.set_value((UsageTimer::Now() - start) * 1e9); + } + entry.set_status(s.error_code()); + t->UpdateHistogram(&entry); + + if (ThreadCompleted() || !s.ok()) { + // Notify thread of completion + NotifyMainThreadOfThreadCompletion(); + } else { + // Reallocate ctx for next RPC + ctx_[vector_idx].reset( + new CallbackClientRpcContext(ctx_[vector_idx]->stub_)); + // Schedule a new RPC + ScheduleRpc(t, thread_idx, vector_idx); + } + }); + } +}; + +std::unique_ptr<Client> CreateCallbackClient(const ClientConfig& config) { + switch (config.rpc_type()) { + case UNARY: + return std::unique_ptr<Client>(new CallbackUnaryClient(config)); + case STREAMING: + case STREAMING_FROM_CLIENT: + case STREAMING_FROM_SERVER: + case STREAMING_BOTH_WAYS: + assert(false); + return nullptr; + default: + assert(false); + return nullptr; + } +} + +} // namespace testing +} // namespace grpc diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc index 7ddf3c1cf3..d97d95d8f3 100644 --- a/test/cpp/qps/qps_worker.cc +++ b/test/cpp/qps/qps_worker.cc @@ -60,6 +60,8 @@ static std::unique_ptr<Client> CreateClient(const ClientConfig& config) { return config.payload_config().has_bytebuf_params() ? CreateGenericAsyncStreamingClient(config) : CreateAsyncClient(config); + case ClientType::CALLBACK_CLIENT: + return CreateCallbackClient(config); default: abort(); } |