diff options
author | Vijay Pai <vpai@google.com> | 2016-01-07 14:08:09 -0800 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2016-01-07 14:08:09 -0800 |
commit | 9f991e252dd1a5e58bc7d5be35d8bada883cab5a (patch) | |
tree | 953f256f30e0974f67ff789b4af4593d72ba53d5 /test/cpp | |
parent | 93beeb889531fde79e14f65e270a595bfb8dc8d4 (diff) |
More changes needed for generic support
Diffstat (limited to 'test/cpp')
-rw-r--r-- | test/cpp/qps/client.h | 5 | ||||
-rw-r--r-- | test/cpp/qps/client_async.cc | 18 | ||||
-rw-r--r-- | test/cpp/qps/server_async.cc | 27 |
3 files changed, 29 insertions, 21 deletions
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 1a0a53d23b..1b5a3d4a07 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -118,8 +118,9 @@ class ClientRequestCreator<ByteBuffer> { gpr_slice s = gpr_slice_from_copied_buffer( buf.get(), payload_config.bytebuf_params().req_size()); Slice slice(s, Slice::STEAL_REF); - std::unique_ptr<ByteBuffer> bbuf(new ByteBuffer(&slice, 1)); - req->MoveFrom(bbuf.get()); + *req = ByteBuffer(&slice, 1); + // std::unique_ptr<ByteBuffer> bbuf(new ByteBuffer(&slice, 1)); + // req->MoveFrom(bbuf.get()); } else { GPR_ASSERT(false); // not appropriate for this specialization } diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 087ea75bf4..553c97fd68 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -488,9 +488,9 @@ class AsyncStreamingClient GRPC_FINAL } }; -class ClientGenericRpcContextStreamingImpl : public ClientRpcContext { +class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { public: - ClientGenericRpcContextStreamingImpl( + ClientRpcContextGenericStreamingImpl( int channel_id, grpc::GenericStub* stub, const ByteBuffer& req, std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>( grpc::GenericStub*, grpc::ClientContext*, @@ -502,16 +502,16 @@ class ClientGenericRpcContextStreamingImpl : public ClientRpcContext { stub_(stub), req_(req), response_(), - next_state_(&ClientGenericRpcContextStreamingImpl::ReqSent), + next_state_(&ClientRpcContextGenericStreamingImpl::ReqSent), callback_(on_done), start_req_(start_req), start_(Timer::Now()) {} - ~ClientGenericRpcContextStreamingImpl() GRPC_OVERRIDE {} + ~ClientRpcContextGenericStreamingImpl() GRPC_OVERRIDE {} bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { return (this->*next_state_)(ok, hist); } ClientRpcContext* StartNewClone() GRPC_OVERRIDE { - return new ClientGenericRpcContextStreamingImpl(channel_id_, stub_, req_, + return new ClientRpcContextGenericStreamingImpl(channel_id_, stub_, req_, start_req_, callback_); } void Start(CompletionQueue* cq) GRPC_OVERRIDE { @@ -528,7 +528,7 @@ class ClientGenericRpcContextStreamingImpl : public ClientRpcContext { return (false); } start_ = Timer::Now(); - next_state_ = &ClientGenericRpcContextStreamingImpl::WriteDone; + next_state_ = &ClientRpcContextGenericStreamingImpl::WriteDone; stream_->Write(req_, ClientRpcContext::tag(this)); return true; } @@ -536,7 +536,7 @@ class ClientGenericRpcContextStreamingImpl : public ClientRpcContext { if (!ok) { return (false); } - next_state_ = &ClientGenericRpcContextStreamingImpl::ReadDone; + next_state_ = &ClientRpcContextGenericStreamingImpl::ReadDone; stream_->Read(&response_, ClientRpcContext::tag(this)); return true; } @@ -548,7 +548,7 @@ class ClientGenericRpcContextStreamingImpl : public ClientRpcContext { grpc::GenericStub* stub_; ByteBuffer req_; ByteBuffer response_; - bool (ClientGenericRpcContextStreamingImpl::*next_state_)(bool, Histogram*); + bool (ClientRpcContextGenericStreamingImpl::*next_state_)(bool, Histogram*); std::function<void(grpc::Status, ByteBuffer*)> callback_; std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>( grpc::GenericStub*, grpc::ClientContext*, const grpc::string&, @@ -587,7 +587,7 @@ class GenericAsyncStreamingClient GRPC_FINAL }; static ClientRpcContext* SetupCtx(int channel_id, grpc::GenericStub* stub, const ByteBuffer& req) { - return new ClientGenericRpcContextStreamingImpl( + return new ClientRpcContextGenericStreamingImpl( channel_id, stub, req, GenericAsyncStreamingClient::StartReq, GenericAsyncStreamingClient::CheckDone); } diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index b1e393dd40..4932271273 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -71,7 +71,7 @@ class AsyncQpsServerTest : public Server { ServerAsyncReaderWriter<ResponseType, RequestType> *, CompletionQueue *, ServerCompletionQueue *, void *)> request_streaming_function, - std::function<grpc::Status(const ServerConfig &, const RequestType *, + std::function<grpc::Status(const PayloadConfig &, const RequestType *, ResponseType *)> process_rpc) : Server(config) { @@ -94,7 +94,8 @@ class AsyncQpsServerTest : public Server { using namespace std::placeholders; - auto process_rpc_bound = std::bind(process_rpc, config, _1, _2); + auto process_rpc_bound = std::bind(process_rpc, config.payload_config(), + _1, _2); for (int i = 0; i < 10000 / config.async_server_threads(); i++) { for (int j = 0; j < config.async_server_threads(); j++) { @@ -358,9 +359,10 @@ static void RegisterGenericService(ServerBuilder *builder, builder->RegisterAsyncGenericService(service); } -template <class RequestType, class ResponseType> -Status ProcessRPC(const ServerConfig &config, const RequestType *request, - ResponseType *response) { + +static Status ProcessSimpleRPC(const PayloadConfig&, + const SimpleRequest *request, + SimpleResponse *response) { if (request->response_size() > 0) { if (!Server::SetPayload(request->response_type(), request->response_size(), response->mutable_payload())) { @@ -370,9 +372,14 @@ Status ProcessRPC(const ServerConfig &config, const RequestType *request, return Status::OK; } -template <> -Status ProcessRPC(const ServerConfig &config, const ByteBuffer *request, - ByteBuffer *response) { +static Status ProcessGenericRPC(const PayloadConfig& payload_config, + const ByteBuffer *request, + ByteBuffer *response) { + int resp_size = payload_config.bytebuf_params().resp_size(); + std::unique_ptr<char> buf(new char[resp_size]); + gpr_slice s = gpr_slice_from_copied_buffer(buf.get(), resp_size); + Slice slice(s, Slice::STEAL_REF); + *response = ByteBuffer(&slice, 1); return Status::OK; } @@ -384,7 +391,7 @@ std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config) { config, RegisterBenchmarkService, &BenchmarkService::AsyncService::RequestUnaryCall, &BenchmarkService::AsyncService::RequestStreamingCall, - ProcessRPC<SimpleRequest, SimpleResponse>)); + ProcessSimpleRPC)); } std::unique_ptr<Server> CreateAsyncGenericServer(const ServerConfig &config) { return std::unique_ptr<Server>( @@ -392,7 +399,7 @@ std::unique_ptr<Server> CreateAsyncGenericServer(const ServerConfig &config) { grpc::GenericServerContext>( config, RegisterGenericService, nullptr, &grpc::AsyncGenericService::RequestCall, - ProcessRPC<ByteBuffer, ByteBuffer>)); + ProcessGenericRPC)); } } // namespace testing |