aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps/qps_worker.cc
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2016-02-01 08:17:27 -0800
committerGravatar Vijay Pai <vpai@google.com>2016-02-01 08:17:27 -0800
commit03ba4d746a34855c2d6498f23b06f8a0ae6592eb (patch)
tree9035598a15c07b1afaceb0ebaa8663104393760f /test/cpp/qps/qps_worker.cc
parentf524844da919716bc5563bd60f33836d7ba78877 (diff)
parentc1fdfec641b2b27c553a8b0bb00b47e56e23bfa1 (diff)
Merge branch 'master' into corelimit2
Diffstat (limited to 'test/cpp/qps/qps_worker.cc')
-rw-r--r--test/cpp/qps/qps_worker.cc39
1 files changed, 36 insertions, 3 deletions
diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc
index d259c20fb7..5cb5850fd4 100644
--- a/test/cpp/qps/qps_worker.cc
+++ b/test/cpp/qps/qps_worker.cc
@@ -61,6 +61,11 @@ namespace grpc {
namespace testing {
static std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
+ gpr_log(GPR_INFO, "Starting client of type %s %s %d",
+ ClientType_Name(config.client_type()).c_str(),
+ RpcType_Name(config.rpc_type()).c_str(),
+ config.payload_config().has_bytebuf_params());
+
switch (config.client_type()) {
case ClientType::SYNC_CLIENT:
return (config.rpc_type() == RpcType::UNARY)
@@ -79,11 +84,20 @@ static std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
}
static std::unique_ptr<Server> CreateServer(const ServerConfig& config) {
+ gpr_log(GPR_INFO, "Starting server of type %s",
+ ServerType_Name(config.server_type()).c_str());
+
+ if (config.core_limit() > 0) {
+ LimitCores(config.core_limit());
+ }
+
switch (config.server_type()) {
case ServerType::SYNC_SERVER:
return CreateSynchronousServer(config);
case ServerType::ASYNC_SERVER:
return CreateAsyncServer(config);
+ case ServerType::ASYNC_GENERIC_SERVER:
+ return CreateAsyncGenericServer(config);
default:
abort();
}
@@ -92,7 +106,8 @@ static std::unique_ptr<Server> CreateServer(const ServerConfig& config) {
class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
public:
- explicit WorkerServiceImpl() : acquired_(false) {}
+ explicit WorkerServiceImpl(int server_port)
+ : acquired_(false), server_port_(server_port) {}
Status RunClient(ServerContext* ctx,
ServerReaderWriter<ClientStatus, ClientArgs>* stream)
@@ -163,22 +178,29 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
if (!args.has_setup()) {
return Status(StatusCode::INVALID_ARGUMENT, "");
}
+ gpr_log(GPR_INFO, "RunClientBody: about to create client");
auto client = CreateClient(args.setup());
if (!client) {
return Status(StatusCode::INVALID_ARGUMENT, "");
}
+ gpr_log(GPR_INFO, "RunClientBody: client created");
ClientStatus status;
if (!stream->Write(status)) {
return Status(StatusCode::UNKNOWN, "");
}
+ gpr_log(GPR_INFO, "RunClientBody: creation status reported");
while (stream->Read(&args)) {
+ gpr_log(GPR_INFO, "RunClientBody: Message read");
if (!args.has_mark()) {
+ gpr_log(GPR_INFO, "RunClientBody: Message is not a mark!");
return Status(StatusCode::INVALID_ARGUMENT, "");
}
*status.mutable_stats() = client->Mark(args.mark().reset());
stream->Write(status);
+ gpr_log(GPR_INFO, "RunClientBody: Mark response given");
}
+ gpr_log(GPR_INFO, "RunClientBody: Returning");
return Status::OK;
}
@@ -191,33 +213,44 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
if (!args.has_setup()) {
return Status(StatusCode::INVALID_ARGUMENT, "");
}
+ if (server_port_ != 0) {
+ args.mutable_setup()->set_port(server_port_);
+ }
+ gpr_log(GPR_INFO, "RunServerBody: about to create server");
auto server = CreateServer(args.setup());
if (!server) {
return Status(StatusCode::INVALID_ARGUMENT, "");
}
+ gpr_log(GPR_INFO, "RunServerBody: server created");
ServerStatus status;
status.set_port(server->port());
status.set_cores(server->cores());
if (!stream->Write(status)) {
return Status(StatusCode::UNKNOWN, "");
}
+ gpr_log(GPR_INFO, "RunServerBody: creation status reported");
while (stream->Read(&args)) {
+ gpr_log(GPR_INFO, "RunServerBody: Message read");
if (!args.has_mark()) {
+ gpr_log(GPR_INFO, "RunServerBody: Message not a mark!");
return Status(StatusCode::INVALID_ARGUMENT, "");
}
*status.mutable_stats() = server->Mark(args.mark().reset());
stream->Write(status);
+ gpr_log(GPR_INFO, "RunServerBody: Mark response given");
}
+ gpr_log(GPR_INFO, "RunServerBody: Returning");
return Status::OK;
}
std::mutex mu_;
bool acquired_;
+ int server_port_;
};
-QpsWorker::QpsWorker(int driver_port) {
- impl_.reset(new WorkerServiceImpl());
+QpsWorker::QpsWorker(int driver_port, int server_port) {
+ impl_.reset(new WorkerServiceImpl(server_port));
char* server_address = NULL;
gpr_join_host_port(&server_address, "::", driver_port);