aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2016-01-07 14:08:09 -0800
committerGravatar Vijay Pai <vpai@google.com>2016-01-07 14:08:09 -0800
commit9f991e252dd1a5e58bc7d5be35d8bada883cab5a (patch)
tree953f256f30e0974f67ff789b4af4593d72ba53d5 /test/cpp
parent93beeb889531fde79e14f65e270a595bfb8dc8d4 (diff)
More changes needed for generic support
Diffstat (limited to 'test/cpp')
-rw-r--r--test/cpp/qps/client.h5
-rw-r--r--test/cpp/qps/client_async.cc18
-rw-r--r--test/cpp/qps/server_async.cc27
3 files changed, 29 insertions, 21 deletions
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 1a0a53d23b..1b5a3d4a07 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -118,8 +118,9 @@ class ClientRequestCreator<ByteBuffer> {
gpr_slice s = gpr_slice_from_copied_buffer(
buf.get(), payload_config.bytebuf_params().req_size());
Slice slice(s, Slice::STEAL_REF);
- std::unique_ptr<ByteBuffer> bbuf(new ByteBuffer(&slice, 1));
- req->MoveFrom(bbuf.get());
+ *req = ByteBuffer(&slice, 1);
+ // std::unique_ptr<ByteBuffer> bbuf(new ByteBuffer(&slice, 1));
+ // req->MoveFrom(bbuf.get());
} else {
GPR_ASSERT(false); // not appropriate for this specialization
}
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 087ea75bf4..553c97fd68 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -488,9 +488,9 @@ class AsyncStreamingClient GRPC_FINAL
}
};
-class ClientGenericRpcContextStreamingImpl : public ClientRpcContext {
+class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
public:
- ClientGenericRpcContextStreamingImpl(
+ ClientRpcContextGenericStreamingImpl(
int channel_id, grpc::GenericStub* stub, const ByteBuffer& req,
std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
grpc::GenericStub*, grpc::ClientContext*,
@@ -502,16 +502,16 @@ class ClientGenericRpcContextStreamingImpl : public ClientRpcContext {
stub_(stub),
req_(req),
response_(),
- next_state_(&ClientGenericRpcContextStreamingImpl::ReqSent),
+ next_state_(&ClientRpcContextGenericStreamingImpl::ReqSent),
callback_(on_done),
start_req_(start_req),
start_(Timer::Now()) {}
- ~ClientGenericRpcContextStreamingImpl() GRPC_OVERRIDE {}
+ ~ClientRpcContextGenericStreamingImpl() GRPC_OVERRIDE {}
bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
return (this->*next_state_)(ok, hist);
}
ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
- return new ClientGenericRpcContextStreamingImpl(channel_id_, stub_, req_,
+ return new ClientRpcContextGenericStreamingImpl(channel_id_, stub_, req_,
start_req_, callback_);
}
void Start(CompletionQueue* cq) GRPC_OVERRIDE {
@@ -528,7 +528,7 @@ class ClientGenericRpcContextStreamingImpl : public ClientRpcContext {
return (false);
}
start_ = Timer::Now();
- next_state_ = &ClientGenericRpcContextStreamingImpl::WriteDone;
+ next_state_ = &ClientRpcContextGenericStreamingImpl::WriteDone;
stream_->Write(req_, ClientRpcContext::tag(this));
return true;
}
@@ -536,7 +536,7 @@ class ClientGenericRpcContextStreamingImpl : public ClientRpcContext {
if (!ok) {
return (false);
}
- next_state_ = &ClientGenericRpcContextStreamingImpl::ReadDone;
+ next_state_ = &ClientRpcContextGenericStreamingImpl::ReadDone;
stream_->Read(&response_, ClientRpcContext::tag(this));
return true;
}
@@ -548,7 +548,7 @@ class ClientGenericRpcContextStreamingImpl : public ClientRpcContext {
grpc::GenericStub* stub_;
ByteBuffer req_;
ByteBuffer response_;
- bool (ClientGenericRpcContextStreamingImpl::*next_state_)(bool, Histogram*);
+ bool (ClientRpcContextGenericStreamingImpl::*next_state_)(bool, Histogram*);
std::function<void(grpc::Status, ByteBuffer*)> callback_;
std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
grpc::GenericStub*, grpc::ClientContext*, const grpc::string&,
@@ -587,7 +587,7 @@ class GenericAsyncStreamingClient GRPC_FINAL
};
static ClientRpcContext* SetupCtx(int channel_id, grpc::GenericStub* stub,
const ByteBuffer& req) {
- return new ClientGenericRpcContextStreamingImpl(
+ return new ClientRpcContextGenericStreamingImpl(
channel_id, stub, req, GenericAsyncStreamingClient::StartReq,
GenericAsyncStreamingClient::CheckDone);
}
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index b1e393dd40..4932271273 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -71,7 +71,7 @@ class AsyncQpsServerTest : public Server {
ServerAsyncReaderWriter<ResponseType, RequestType> *,
CompletionQueue *, ServerCompletionQueue *, void *)>
request_streaming_function,
- std::function<grpc::Status(const ServerConfig &, const RequestType *,
+ std::function<grpc::Status(const PayloadConfig &, const RequestType *,
ResponseType *)>
process_rpc)
: Server(config) {
@@ -94,7 +94,8 @@ class AsyncQpsServerTest : public Server {
using namespace std::placeholders;
- auto process_rpc_bound = std::bind(process_rpc, config, _1, _2);
+ auto process_rpc_bound = std::bind(process_rpc, config.payload_config(),
+ _1, _2);
for (int i = 0; i < 10000 / config.async_server_threads(); i++) {
for (int j = 0; j < config.async_server_threads(); j++) {
@@ -358,9 +359,10 @@ static void RegisterGenericService(ServerBuilder *builder,
builder->RegisterAsyncGenericService(service);
}
-template <class RequestType, class ResponseType>
-Status ProcessRPC(const ServerConfig &config, const RequestType *request,
- ResponseType *response) {
+
+static Status ProcessSimpleRPC(const PayloadConfig&,
+ const SimpleRequest *request,
+ SimpleResponse *response) {
if (request->response_size() > 0) {
if (!Server::SetPayload(request->response_type(), request->response_size(),
response->mutable_payload())) {
@@ -370,9 +372,14 @@ Status ProcessRPC(const ServerConfig &config, const RequestType *request,
return Status::OK;
}
-template <>
-Status ProcessRPC(const ServerConfig &config, const ByteBuffer *request,
- ByteBuffer *response) {
+static Status ProcessGenericRPC(const PayloadConfig& payload_config,
+ const ByteBuffer *request,
+ ByteBuffer *response) {
+ int resp_size = payload_config.bytebuf_params().resp_size();
+ std::unique_ptr<char> buf(new char[resp_size]);
+ gpr_slice s = gpr_slice_from_copied_buffer(buf.get(), resp_size);
+ Slice slice(s, Slice::STEAL_REF);
+ *response = ByteBuffer(&slice, 1);
return Status::OK;
}
@@ -384,7 +391,7 @@ std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config) {
config, RegisterBenchmarkService,
&BenchmarkService::AsyncService::RequestUnaryCall,
&BenchmarkService::AsyncService::RequestStreamingCall,
- ProcessRPC<SimpleRequest, SimpleResponse>));
+ ProcessSimpleRPC));
}
std::unique_ptr<Server> CreateAsyncGenericServer(const ServerConfig &config) {
return std::unique_ptr<Server>(
@@ -392,7 +399,7 @@ std::unique_ptr<Server> CreateAsyncGenericServer(const ServerConfig &config) {
grpc::GenericServerContext>(
config, RegisterGenericService, nullptr,
&grpc::AsyncGenericService::RequestCall,
- ProcessRPC<ByteBuffer, ByteBuffer>));
+ ProcessGenericRPC));
}
} // namespace testing