diff options
Diffstat (limited to 'test/cpp/qps/client_async.cc')
-rw-r--r-- | test/cpp/qps/client_async.cc | 258 |
1 files changed, 204 insertions, 54 deletions
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 00bbd8a8a0..921836e201 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -32,8 +32,11 @@ */ #include <cassert> +#include <forward_list> #include <functional> +#include <list> #include <memory> +#include <mutex> #include <string> #include <thread> #include <vector> @@ -55,38 +58,55 @@ namespace grpc { namespace testing { +typedef std::list<grpc_time> deadline_list; + class ClientRpcContext { public: - ClientRpcContext() {} + ClientRpcContext(int ch) : channel_id_(ch) {} virtual ~ClientRpcContext() {} // next state, return false if done. Collect stats when appropriate virtual bool RunNextState(bool, Histogram* hist) = 0; - virtual void StartNewClone() = 0; + virtual ClientRpcContext* StartNewClone() = 0; static void* tag(ClientRpcContext* c) { return reinterpret_cast<void*>(c); } static ClientRpcContext* detag(void* t) { return reinterpret_cast<ClientRpcContext*>(t); } + + deadline_list::iterator deadline_posn() const { return deadline_posn_; } + void set_deadline_posn(const deadline_list::iterator& it) { + deadline_posn_ = it; + } + virtual void Start(CompletionQueue* cq) = 0; + int channel_id() const { return channel_id_; } + + protected: + int channel_id_; + + private: + deadline_list::iterator deadline_posn_; }; template <class RequestType, class ResponseType> class ClientRpcContextUnaryImpl : public ClientRpcContext { public: ClientRpcContextUnaryImpl( - TestService::Stub* stub, const RequestType& req, + int channel_id, TestService::Stub* stub, const RequestType& req, std::function< std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( - TestService::Stub*, grpc::ClientContext*, const RequestType&)> - start_req, + TestService::Stub*, grpc::ClientContext*, const RequestType&, + CompletionQueue*)> start_req, std::function<void(grpc::Status, ResponseType*)> on_done) - : context_(), + : ClientRpcContext(channel_id), + context_(), stub_(stub), req_(req), response_(), next_state_(&ClientRpcContextUnaryImpl::RespDone), callback_(on_done), - start_req_(start_req), - start_(Timer::Now()), - response_reader_(start_req(stub_, &context_, req_)) { + start_req_(start_req) {} + void Start(CompletionQueue* cq) GRPC_OVERRIDE { + start_ = Timer::Now(); + response_reader_ = start_req_(stub_, &context_, req_, cq); response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this)); } ~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {} @@ -98,8 +118,9 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { return ret; } - void StartNewClone() GRPC_OVERRIDE { - new ClientRpcContextUnaryImpl(stub_, req_, start_req_, callback_); + ClientRpcContext* StartNewClone() GRPC_OVERRIDE { + return new ClientRpcContextUnaryImpl(channel_id_, stub_, req_, start_req_, + callback_); } private: @@ -109,7 +130,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { } bool DoCallBack(bool) { callback_(status_, &response_); - return false; + return true; // we're done, this'll be ignored } grpc::ClientContext context_; TestService::Stub* stub_; @@ -118,29 +139,54 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { bool (ClientRpcContextUnaryImpl::*next_state_)(bool); std::function<void(grpc::Status, ResponseType*)> callback_; std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( - TestService::Stub*, grpc::ClientContext*, const RequestType&)> start_req_; + TestService::Stub*, grpc::ClientContext*, const RequestType&, + CompletionQueue*)> start_req_; grpc::Status status_; double start_; std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>> response_reader_; }; +typedef std::forward_list<ClientRpcContext*> context_list; + class AsyncClient : public Client { public: - explicit AsyncClient(const ClientConfig& config, - std::function<void(CompletionQueue*, TestService::Stub*, - const SimpleRequest&)> setup_ctx) - : Client(config) { + explicit AsyncClient( + const ClientConfig& config, + std::function<ClientRpcContext*(int, TestService::Stub*, + const SimpleRequest&)> setup_ctx) + : Client(config), + channel_lock_(config.client_channels()), + contexts_(config.client_channels()), + max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()), + channel_count_(config.client_channels()), + pref_channel_inc_(config.async_client_threads()) { + SetupLoadTest(config, config.async_client_threads()); + for (int i = 0; i < config.async_client_threads(); i++) { cli_cqs_.emplace_back(new CompletionQueue); + if (!closed_loop_) { + rpc_deadlines_.emplace_back(); + next_channel_.push_back(i % channel_count_); + issue_allowed_.push_back(true); + + grpc_time next_issue; + NextIssueTime(i, &next_issue); + next_issue_.push_back(next_issue); + } } + int t = 0; for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) { - for (auto channel = channels_.begin(); channel != channels_.end(); - channel++) { + for (int ch = 0; ch < channel_count_; ch++) { auto* cq = cli_cqs_[t].get(); t = (t + 1) % cli_cqs_.size(); - setup_ctx(cq, channel->get_stub(), request_); + auto ctx = setup_ctx(ch, channels_[ch].get_stub(), request_); + if (closed_loop_) { + ctx->Start(cq); + } else { + contexts_[ch].push_front(ctx); + } } } } @@ -159,30 +205,126 @@ class AsyncClient : public Client { size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL { void* got_tag; bool ok; - switch (cli_cqs_[thread_idx]->AsyncNext( - &got_tag, &ok, - std::chrono::system_clock::now() + std::chrono::seconds(1))) { + grpc_time deadline, short_deadline; + if (closed_loop_) { + deadline = grpc_time_source::now() + std::chrono::seconds(1); + short_deadline = deadline; + } else { + if (rpc_deadlines_[thread_idx].empty()) { + deadline = grpc_time_source::now() + std::chrono::seconds(1); + } else { + deadline = *(rpc_deadlines_[thread_idx].begin()); + } + short_deadline = + issue_allowed_[thread_idx] ? next_issue_[thread_idx] : deadline; + } + + bool got_event; + + switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, short_deadline)) { case CompletionQueue::SHUTDOWN: return false; case CompletionQueue::TIMEOUT: - return true; + got_event = false; + break; case CompletionQueue::GOT_EVENT: + got_event = true; + break; + default: + GPR_ASSERT(false); break; } - - ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); - if (ctx->RunNextState(ok, histogram) == false) { - // call the callback and then delete it - ctx->RunNextState(ok, histogram); - ctx->StartNewClone(); - delete ctx; + if ((closed_loop_ || !rpc_deadlines_[thread_idx].empty()) && + grpc_time_source::now() > deadline) { + // we have missed some 1-second deadline, which is worth noting + gpr_log(GPR_INFO, "Missed an RPC deadline"); + // Don't give up, as there might be some truly heavy tails + } + if (got_event) { + ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); + if (ctx->RunNextState(ok, histogram) == false) { + // call the callback and then clone the ctx + ctx->RunNextState(ok, histogram); + ClientRpcContext* clone_ctx = ctx->StartNewClone(); + if (closed_loop_) { + clone_ctx->Start(cli_cqs_[thread_idx].get()); + } else { + // Remove the entry from the rpc deadlines list + rpc_deadlines_[thread_idx].erase(ctx->deadline_posn()); + // Put the clone_ctx in the list of idle contexts for this channel + // Under lock + int ch = clone_ctx->channel_id(); + std::lock_guard<std::mutex> g(channel_lock_[ch]); + contexts_[ch].push_front(clone_ctx); + } + // delete the old version + delete ctx; + } + if (!closed_loop_) + issue_allowed_[thread_idx] = + true; // may be ok now even if it hadn't been + } + if (!closed_loop_ && issue_allowed_[thread_idx] && + grpc_time_source::now() >= next_issue_[thread_idx]) { + // Attempt to issue + bool issued = false; + for (int num_attempts = 0, channel_attempt = next_channel_[thread_idx]; + num_attempts < channel_count_ && !issued; num_attempts++) { + bool can_issue = false; + ClientRpcContext* ctx = nullptr; + { + std::lock_guard<std::mutex> g(channel_lock_[channel_attempt]); + if (!contexts_[channel_attempt].empty()) { + // Get an idle context from the front of the list + ctx = *(contexts_[channel_attempt].begin()); + contexts_[channel_attempt].pop_front(); + can_issue = true; + } + } + if (can_issue) { + // do the work to issue + rpc_deadlines_[thread_idx].emplace_back(grpc_time_source::now() + + std::chrono::seconds(1)); + auto it = rpc_deadlines_[thread_idx].end(); + --it; + ctx->set_deadline_posn(it); + ctx->Start(cli_cqs_[thread_idx].get()); + issued = true; + // If we did issue, then next time, try our thread's next + // preferred channel + next_channel_[thread_idx] += pref_channel_inc_; + if (next_channel_[thread_idx] >= channel_count_) + next_channel_[thread_idx] = (thread_idx % channel_count_); + } else { + // Do a modular increment of channel attempt if we couldn't issue + channel_attempt = (channel_attempt + 1) % channel_count_; + } + } + if (issued) { + // We issued one; see when we can issue the next + grpc_time next_issue; + NextIssueTime(thread_idx, &next_issue); + next_issue_[thread_idx] = next_issue; + } else { + issue_allowed_[thread_idx] = false; + } } - return true; } private: std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_; + + std::vector<deadline_list> rpc_deadlines_; // per thread deadlines + std::vector<int> next_channel_; // per thread round-robin channel ctr + std::vector<bool> issue_allowed_; // may this thread attempt to issue + std::vector<grpc_time> next_issue_; // when should it issue? + + std::vector<std::mutex> channel_lock_; + std::vector<context_list> contexts_; // per-channel list of idle contexts + int max_outstanding_per_channel_; + int channel_count_; + int pref_channel_inc_; }; class AsyncUnaryClient GRPC_FINAL : public AsyncClient { @@ -194,15 +336,15 @@ class AsyncUnaryClient GRPC_FINAL : public AsyncClient { ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); } private: - static void SetupCtx(CompletionQueue* cq, TestService::Stub* stub, - const SimpleRequest& req) { + static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub, + const SimpleRequest& req) { auto check_done = [](grpc::Status s, SimpleResponse* response) {}; - auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx, - const SimpleRequest& request) { + auto start_req = [](TestService::Stub* stub, grpc::ClientContext* ctx, + const SimpleRequest& request, CompletionQueue* cq) { return stub->AsyncUnaryCall(ctx, request, cq); }; - new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( - stub, req, start_req, check_done); + return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( + channel_id, stub, req, start_req, check_done); } }; @@ -210,26 +352,30 @@ template <class RequestType, class ResponseType> class ClientRpcContextStreamingImpl : public ClientRpcContext { public: ClientRpcContextStreamingImpl( - TestService::Stub* stub, const RequestType& req, - std::function<std::unique_ptr< - grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>( - TestService::Stub*, grpc::ClientContext*, void*)> start_req, + int channel_id, TestService::Stub* stub, const RequestType& req, + std::function<std::unique_ptr<grpc::ClientAsyncReaderWriter< + RequestType, ResponseType>>(TestService::Stub*, grpc::ClientContext*, + CompletionQueue*, void*)> start_req, std::function<void(grpc::Status, ResponseType*)> on_done) - : context_(), + : ClientRpcContext(channel_id), + context_(), stub_(stub), req_(req), response_(), next_state_(&ClientRpcContextStreamingImpl::ReqSent), callback_(on_done), start_req_(start_req), - start_(Timer::Now()), - stream_(start_req_(stub_, &context_, ClientRpcContext::tag(this))) {} + start_(Timer::Now()) {} ~ClientRpcContextStreamingImpl() GRPC_OVERRIDE {} bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { return (this->*next_state_)(ok, hist); } - void StartNewClone() GRPC_OVERRIDE { - new ClientRpcContextStreamingImpl(stub_, req_, start_req_, callback_); + ClientRpcContext* StartNewClone() GRPC_OVERRIDE { + return new ClientRpcContextStreamingImpl(channel_id_, stub_, req_, + start_req_, callback_); + } + void Start(CompletionQueue* cq) GRPC_OVERRIDE { + stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this)); } private: @@ -263,7 +409,8 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { std::function<void(grpc::Status, ResponseType*)> callback_; std::function< std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>( - TestService::Stub*, grpc::ClientContext*, void*)> start_req_; + TestService::Stub*, grpc::ClientContext*, CompletionQueue*, void*)> + start_req_; grpc::Status status_; double start_; std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>> @@ -274,22 +421,25 @@ class AsyncStreamingClient GRPC_FINAL : public AsyncClient { public: explicit AsyncStreamingClient(const ClientConfig& config) : AsyncClient(config, SetupCtx) { + // async streaming currently only supported closed loop + GPR_ASSERT(config.load_type() == CLOSED_LOOP); + StartThreads(config.async_client_threads()); } ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); } private: - static void SetupCtx(CompletionQueue* cq, TestService::Stub* stub, - const SimpleRequest& req) { + static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub, + const SimpleRequest& req) { auto check_done = [](grpc::Status s, SimpleResponse* response) {}; - auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx, - void* tag) { + auto start_req = [](TestService::Stub* stub, grpc::ClientContext* ctx, + CompletionQueue* cq, void* tag) { auto stream = stub->AsyncStreamingCall(ctx, cq, tag); return stream; }; - new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>( - stub, req, start_req, check_done); + return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>( + channel_id, stub, req, start_req, check_done); } }; |