diff options
Diffstat (limited to 'test/cpp/qps/client_async.cc')
-rw-r--r-- | test/cpp/qps/client_async.cc | 62 |
1 files changed, 39 insertions, 23 deletions
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index fbbc19f58d..06bc3ee9c2 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -57,9 +57,11 @@ namespace grpc { namespace testing { +typedef std::forward_list<grpc_time> deadline_list; + class ClientRpcContext { public: - ClientRpcContext() {} + ClientRpcContext(int ch): channel_id_(ch) {} virtual ~ClientRpcContext() {} // next state, return false if done. Collect stats when appropriate virtual bool RunNextState(bool, Histogram* hist) = 0; @@ -72,6 +74,9 @@ class ClientRpcContext { deadline_list::iterator deadline_posn() const {return deadline_posn_;} void set_deadline_posn(deadline_list::iterator&& it) {deadline_posn_ = it;} virtual void Start() = 0; + int channel_id() const {return channel_id_;} + protected: + int channel_id_; private: deadline_list::iterator deadline_posn_; }; @@ -79,14 +84,14 @@ class ClientRpcContext { template <class RequestType, class ResponseType> class ClientRpcContextUnaryImpl : public ClientRpcContext { public: - ClientRpcContextUnaryImpl( + ClientRpcContextUnaryImpl(int channel_id, TestService::Stub* stub, const RequestType& req, std::function< std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( TestService::Stub*, grpc::ClientContext*, const RequestType&)> start_req, std::function<void(grpc::Status, ResponseType*)> on_done) - : context_(), + : ClientRpcContext(channel_id), context_(), stub_(stub), req_(req), response_(), @@ -109,7 +114,8 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { } ClientRpcContext* StartNewClone() GRPC_OVERRIDE { - return new ClientRpcContextUnaryImpl(stub_, req_, start_req_, callback_); + return new ClientRpcContextUnaryImpl(channel_id_, + stub_, req_, start_req_, callback_); } private: @@ -135,15 +141,14 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { response_reader_; }; -typedef std::forward_list<grpc_time> deadline_list; typedef std::forward_list<ClientRpcContext *> context_list; class AsyncClient : public Client { public: explicit AsyncClient(const ClientConfig& config, - std::function<ClientRpcContext*(CompletionQueue*, TestService::Stub*, + std::function<ClientRpcContext*(int, CompletionQueue*, TestService::Stub*, const SimpleRequest&)> setup_ctx) : - Client(config), channel_rpc_lock_(config.client_channels()), + Client(config), channel_lock_(config.client_channels()), max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()), channel_count_(config.client_channels()) { @@ -171,10 +176,10 @@ class AsyncClient : public Client { int t = 0; for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) { for (int ch = 0; ch < channel_count_; ch++) { - auto channel = channels_[ch]; + auto& channel = channels_[ch]; auto* cq = cli_cqs_[t].get(); t = (t + 1) % cli_cqs_.size(); - ClientRpcContext *ctx = setup_ctx(cq, channel->get_stub(), request_); + auto ctx = setup_ctx(ch, cq, channel.get_stub(), request_); if (closed_loop_) { // only relevant for closed_loop unary, but harmless for // closed_loop streaming @@ -245,7 +250,10 @@ class AsyncClient : public Client { delete ctx; if (!closed_loop_) { // Put this in the list of idle contexts for this channel - + // Under lock + int ch = clone_ctx->channel_id(); + std::lock_guard<std::mutex> g(channel_lock_[ch]); + contexts_[ch].push_front(ctx); } } issue_allowed_[thread_idx] = true; // may be ok now even if it hadn't been @@ -259,7 +267,7 @@ class AsyncClient : public Client { next_channel_[thread_idx] = (next_channel_[thread_idx]+1)%channel_count_) { std::lock_guard<std::mutex> - g(channel_rpc_lock_[next_channel_[thread_idx]]); + g(channel_lock_[next_channel_[thread_idx]]); if ((rpcs_outstanding_[next_channel_[thread_idx]] < max_outstanding_per_channel_) && !contexts_[next_channel_[thread_idx]].empty()) { @@ -267,7 +275,7 @@ class AsyncClient : public Client { auto ctx = contexts_[next_channel_[thread_idx]].begin(); contexts_[next_channel_[thread_idx]].pop_front(); // do the work to issue - ctx->Start(); + (*ctx)->Start(); rpcs_outstanding_[next_channel_[thread_idx]]++; issued = true; } @@ -286,7 +294,7 @@ class AsyncClient : public Client { std::vector<bool> issue_allowed_; // may this thread attempt to issue std::vector<grpc_time> next_issue_; // when should it issue? - std::vector<std::mutex> channel_rpc_lock_; + std::vector<std::mutex> channel_lock_; std::vector<int> rpcs_outstanding_; // per-channel vector std::vector<context_list> contexts_; // per-channel list of idle contexts int max_outstanding_per_channel_; @@ -301,15 +309,18 @@ class AsyncUnaryClient GRPC_FINAL : public AsyncClient { } ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); } private: - static ClientRpcContext *SetupCtx(CompletionQueue* cq, TestService::Stub* stub, - const SimpleRequest& req) { + static ClientRpcContext *SetupCtx(int channel_id, + CompletionQueue* cq, + TestService::Stub* stub, + const SimpleRequest& req) { auto check_done = [](grpc::Status s, SimpleResponse* response) {}; auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx, const SimpleRequest& request) { return stub->AsyncUnaryCall(ctx, request, cq); }; - return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( - stub, req, start_req, check_done); + return new ClientRpcContextUnaryImpl<SimpleRequest, + SimpleResponse>(channel_id, stub, req, + start_req, check_done); } }; @@ -317,13 +328,14 @@ private: template <class RequestType, class ResponseType> class ClientRpcContextStreamingImpl : public ClientRpcContext { public: - ClientRpcContextStreamingImpl( + ClientRpcContextStreamingImpl(int channel_id, TestService::Stub* stub, const RequestType& req, std::function<std::unique_ptr< grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>( TestService::Stub*, grpc::ClientContext*, void*)> start_req, std::function<void(grpc::Status, ResponseType*)> on_done) - : context_(), + : ClientRpcContext(channel_id), + context_(), stub_(stub), req_(req), response_(), @@ -337,7 +349,8 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { return (this->*next_state_)(ok, hist); } ClientRpcContext* StartNewClone() GRPC_OVERRIDE { - return new ClientRpcContextStreamingImpl(stub_, req_, start_req_, callback_); + return new ClientRpcContextStreamingImpl(channel_id_, + stub_, req_, start_req_, callback_); } void Start() GRPC_OVERRIDE {} private: @@ -387,7 +400,8 @@ class AsyncStreamingClient GRPC_FINAL : public AsyncClient { ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); } private: - static ClientRpcContext *SetupCtx(CompletionQueue* cq, TestService::Stub* stub, + static ClientRpcContext *SetupCtx(int channel_id, + CompletionQueue* cq, TestService::Stub* stub, const SimpleRequest& req) { auto check_done = [](grpc::Status s, SimpleResponse* response) {}; auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx, @@ -395,8 +409,10 @@ private: auto stream = stub->AsyncStreamingCall(ctx, cq, tag); return stream; }; - return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>( - stub, req, start_req, check_done); + return new ClientRpcContextStreamingImpl<SimpleRequest, + SimpleResponse>(channel_id, stub, + req, start_req, + check_done); } }; |