diff options
Diffstat (limited to 'test/cpp/qps/client_async.cc')
-rw-r--r-- | test/cpp/qps/client_async.cc | 258 |
1 files changed, 54 insertions, 204 deletions
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index a238b60a9c..00bbd8a8a0 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -32,11 +32,8 @@ */ #include <cassert> -#include <forward_list> #include <functional> -#include <list> #include <memory> -#include <mutex> #include <string> #include <thread> #include <vector> @@ -58,55 +55,38 @@ namespace grpc { namespace testing { -typedef std::list<grpc_time> deadline_list; - class ClientRpcContext { public: - ClientRpcContext(int ch) : channel_id_(ch) {} + ClientRpcContext() {} virtual ~ClientRpcContext() {} // next state, return false if done. Collect stats when appropriate virtual bool RunNextState(bool, Histogram* hist) = 0; - virtual ClientRpcContext* StartNewClone() = 0; + virtual void StartNewClone() = 0; static void* tag(ClientRpcContext* c) { return reinterpret_cast<void*>(c); } static ClientRpcContext* detag(void* t) { return reinterpret_cast<ClientRpcContext*>(t); } - - deadline_list::iterator deadline_posn() const { return deadline_posn_; } - void set_deadline_posn(const deadline_list::iterator& it) { - deadline_posn_ = it; - } - virtual void Start(CompletionQueue* cq) = 0; - int channel_id() const { return channel_id_; } - - protected: - int channel_id_; - - private: - deadline_list::iterator deadline_posn_; }; template <class RequestType, class ResponseType> class ClientRpcContextUnaryImpl : public ClientRpcContext { public: ClientRpcContextUnaryImpl( - int channel_id, TestService::Stub* stub, const RequestType& req, + TestService::Stub* stub, const RequestType& req, std::function< std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( - TestService::Stub*, grpc::ClientContext*, const RequestType&, - CompletionQueue*)> start_req, + TestService::Stub*, grpc::ClientContext*, const RequestType&)> + start_req, std::function<void(grpc::Status, ResponseType*)> on_done) - : ClientRpcContext(channel_id), - context_(), + : context_(), stub_(stub), req_(req), response_(), next_state_(&ClientRpcContextUnaryImpl::RespDone), callback_(on_done), - start_req_(start_req) {} - void Start(CompletionQueue* cq) GRPC_OVERRIDE { - start_ = Timer::Now(); - response_reader_ = start_req_(stub_, &context_, req_, cq); + start_req_(start_req), + start_(Timer::Now()), + response_reader_(start_req(stub_, &context_, req_)) { response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this)); } ~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {} @@ -118,9 +98,8 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { return ret; } - ClientRpcContext* StartNewClone() GRPC_OVERRIDE { - return new ClientRpcContextUnaryImpl(channel_id_, stub_, req_, start_req_, - callback_); + void StartNewClone() GRPC_OVERRIDE { + new ClientRpcContextUnaryImpl(stub_, req_, start_req_, callback_); } private: @@ -130,7 +109,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { } bool DoCallBack(bool) { callback_(status_, &response_); - return true; // we're done, this'll be ignored + return false; } grpc::ClientContext context_; TestService::Stub* stub_; @@ -139,54 +118,29 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { bool (ClientRpcContextUnaryImpl::*next_state_)(bool); std::function<void(grpc::Status, ResponseType*)> callback_; std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( - TestService::Stub*, grpc::ClientContext*, const RequestType&, - CompletionQueue*)> start_req_; + TestService::Stub*, grpc::ClientContext*, const RequestType&)> start_req_; grpc::Status status_; double start_; std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>> response_reader_; }; -typedef std::forward_list<ClientRpcContext*> context_list; - class AsyncClient : public Client { public: - explicit AsyncClient( - const ClientConfig& config, - std::function<ClientRpcContext*(int, TestService::Stub*, - const SimpleRequest&)> setup_ctx) - : Client(config), - channel_lock_(config.client_channels()), - contexts_(config.client_channels()), - max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()), - channel_count_(config.client_channels()), - pref_channel_inc_(config.async_client_threads()) { - SetupLoadTest(config, config.async_client_threads()); - + explicit AsyncClient(const ClientConfig& config, + std::function<void(CompletionQueue*, TestService::Stub*, + const SimpleRequest&)> setup_ctx) + : Client(config) { for (int i = 0; i < config.async_client_threads(); i++) { cli_cqs_.emplace_back(new CompletionQueue); - if (!closed_loop_) { - rpc_deadlines_.emplace_back(); - next_channel_.push_back(i % channel_count_); - issue_allowed_.push_back(true); - - grpc_time next_issue; - NextIssueTime(i, &next_issue); - next_issue_.push_back(next_issue); - } } - int t = 0; for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) { - for (int ch = 0; ch < channel_count_; ch++) { + for (auto channel = channels_.begin(); channel != channels_.end(); + channel++) { auto* cq = cli_cqs_[t].get(); t = (t + 1) % cli_cqs_.size(); - auto ctx = setup_ctx(ch, channels_[ch].get_stub(), request_); - if (closed_loop_) { - ctx->Start(cq); - } else { - contexts_[ch].push_front(ctx); - } + setup_ctx(cq, channel->get_stub(), request_); } } } @@ -205,126 +159,30 @@ class AsyncClient : public Client { size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL { void* got_tag; bool ok; - grpc_time deadline, short_deadline; - if (closed_loop_) { - deadline = grpc_time_source::now() + std::chrono::seconds(1); - short_deadline = deadline; - } else { - if (rpc_deadlines_[thread_idx].empty()) { - deadline = grpc_time_source::now() + std::chrono::seconds(1); - } else { - deadline = *(rpc_deadlines_[thread_idx].begin()); - } - short_deadline = - issue_allowed_[thread_idx] ? next_issue_[thread_idx] : deadline; - } - - bool got_event; - - switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, short_deadline)) { + switch (cli_cqs_[thread_idx]->AsyncNext( + &got_tag, &ok, + std::chrono::system_clock::now() + std::chrono::seconds(1))) { case CompletionQueue::SHUTDOWN: return false; case CompletionQueue::TIMEOUT: - got_event = false; - break; + return true; case CompletionQueue::GOT_EVENT: - got_event = true; - break; - default: - GPR_ASSERT(false); break; } - if ((closed_loop_ || !rpc_deadlines_[thread_idx].empty()) && - grpc_time_source::now() > deadline) { - // we have missed some 1-second deadline, which is too much - gpr_log(GPR_INFO, "Missed an RPC deadline, giving up"); - return false; - } - if (got_event) { - ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); - if (ctx->RunNextState(ok, histogram) == false) { - // call the callback and then clone the ctx - ctx->RunNextState(ok, histogram); - ClientRpcContext* clone_ctx = ctx->StartNewClone(); - if (closed_loop_) { - clone_ctx->Start(cli_cqs_[thread_idx].get()); - } else { - // Remove the entry from the rpc deadlines list - rpc_deadlines_[thread_idx].erase(ctx->deadline_posn()); - // Put the clone_ctx in the list of idle contexts for this channel - // Under lock - int ch = clone_ctx->channel_id(); - std::lock_guard<std::mutex> g(channel_lock_[ch]); - contexts_[ch].push_front(clone_ctx); - } - // delete the old version - delete ctx; - } - if (!closed_loop_) - issue_allowed_[thread_idx] = - true; // may be ok now even if it hadn't been - } - if (!closed_loop_ && issue_allowed_[thread_idx] && - grpc_time_source::now() >= next_issue_[thread_idx]) { - // Attempt to issue - bool issued = false; - for (int num_attempts = 0, channel_attempt = next_channel_[thread_idx]; - num_attempts < channel_count_ && !issued; num_attempts++) { - bool can_issue = false; - ClientRpcContext* ctx = nullptr; - { - std::lock_guard<std::mutex> g(channel_lock_[channel_attempt]); - if (!contexts_[channel_attempt].empty()) { - // Get an idle context from the front of the list - ctx = *(contexts_[channel_attempt].begin()); - contexts_[channel_attempt].pop_front(); - can_issue = true; - } - } - if (can_issue) { - // do the work to issue - rpc_deadlines_[thread_idx].emplace_back(grpc_time_source::now() + - std::chrono::seconds(1)); - auto it = rpc_deadlines_[thread_idx].end(); - --it; - ctx->set_deadline_posn(it); - ctx->Start(cli_cqs_[thread_idx].get()); - issued = true; - // If we did issue, then next time, try our thread's next - // preferred channel - next_channel_[thread_idx] += pref_channel_inc_; - if (next_channel_[thread_idx] >= channel_count_) - next_channel_[thread_idx] = (thread_idx % channel_count_); - } else { - // Do a modular increment of channel attempt if we couldn't issue - channel_attempt = (channel_attempt + 1) % channel_count_; - } - } - if (issued) { - // We issued one; see when we can issue the next - grpc_time next_issue; - NextIssueTime(thread_idx, &next_issue); - next_issue_[thread_idx] = next_issue; - } else { - issue_allowed_[thread_idx] = false; - } + + ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); + if (ctx->RunNextState(ok, histogram) == false) { + // call the callback and then delete it + ctx->RunNextState(ok, histogram); + ctx->StartNewClone(); + delete ctx; } + return true; } private: std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_; - - std::vector<deadline_list> rpc_deadlines_; // per thread deadlines - std::vector<int> next_channel_; // per thread round-robin channel ctr - std::vector<bool> issue_allowed_; // may this thread attempt to issue - std::vector<grpc_time> next_issue_; // when should it issue? - - std::vector<std::mutex> channel_lock_; - std::vector<context_list> contexts_; // per-channel list of idle contexts - int max_outstanding_per_channel_; - int channel_count_; - int pref_channel_inc_; }; class AsyncUnaryClient GRPC_FINAL : public AsyncClient { @@ -336,15 +194,15 @@ class AsyncUnaryClient GRPC_FINAL : public AsyncClient { ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); } private: - static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub, - const SimpleRequest& req) { + static void SetupCtx(CompletionQueue* cq, TestService::Stub* stub, + const SimpleRequest& req) { auto check_done = [](grpc::Status s, SimpleResponse* response) {}; - auto start_req = [](TestService::Stub* stub, grpc::ClientContext* ctx, - const SimpleRequest& request, CompletionQueue* cq) { + auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx, + const SimpleRequest& request) { return stub->AsyncUnaryCall(ctx, request, cq); }; - return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( - channel_id, stub, req, start_req, check_done); + new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( + stub, req, start_req, check_done); } }; @@ -352,30 +210,26 @@ template <class RequestType, class ResponseType> class ClientRpcContextStreamingImpl : public ClientRpcContext { public: ClientRpcContextStreamingImpl( - int channel_id, TestService::Stub* stub, const RequestType& req, - std::function<std::unique_ptr<grpc::ClientAsyncReaderWriter< - RequestType, ResponseType>>(TestService::Stub*, grpc::ClientContext*, - CompletionQueue*, void*)> start_req, + TestService::Stub* stub, const RequestType& req, + std::function<std::unique_ptr< + grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>( + TestService::Stub*, grpc::ClientContext*, void*)> start_req, std::function<void(grpc::Status, ResponseType*)> on_done) - : ClientRpcContext(channel_id), - context_(), + : context_(), stub_(stub), req_(req), response_(), next_state_(&ClientRpcContextStreamingImpl::ReqSent), callback_(on_done), start_req_(start_req), - start_(Timer::Now()) {} + start_(Timer::Now()), + stream_(start_req_(stub_, &context_, ClientRpcContext::tag(this))) {} ~ClientRpcContextStreamingImpl() GRPC_OVERRIDE {} bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { return (this->*next_state_)(ok, hist); } - ClientRpcContext* StartNewClone() GRPC_OVERRIDE { - return new ClientRpcContextStreamingImpl(channel_id_, stub_, req_, - start_req_, callback_); - } - void Start(CompletionQueue* cq) GRPC_OVERRIDE { - stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this)); + void StartNewClone() GRPC_OVERRIDE { + new ClientRpcContextStreamingImpl(stub_, req_, start_req_, callback_); } private: @@ -409,8 +263,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { std::function<void(grpc::Status, ResponseType*)> callback_; std::function< std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>( - TestService::Stub*, grpc::ClientContext*, CompletionQueue*, void*)> - start_req_; + TestService::Stub*, grpc::ClientContext*, void*)> start_req_; grpc::Status status_; double start_; std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>> @@ -421,25 +274,22 @@ class AsyncStreamingClient GRPC_FINAL : public AsyncClient { public: explicit AsyncStreamingClient(const ClientConfig& config) : AsyncClient(config, SetupCtx) { - // async streaming currently only supported closed loop - GPR_ASSERT(config.load_type() == CLOSED_LOOP); - StartThreads(config.async_client_threads()); } ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); } private: - static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub, - const SimpleRequest& req) { + static void SetupCtx(CompletionQueue* cq, TestService::Stub* stub, + const SimpleRequest& req) { auto check_done = [](grpc::Status s, SimpleResponse* response) {}; - auto start_req = [](TestService::Stub* stub, grpc::ClientContext* ctx, - CompletionQueue* cq, void* tag) { + auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx, + void* tag) { auto stream = stub->AsyncStreamingCall(ctx, cq, tag); return stream; }; - return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>( - channel_id, stub, req, start_req, check_done); + new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>( + stub, req, start_req, check_done); } }; |