aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-06-10 10:12:02 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-06-10 10:12:02 -0700
commita603a450a979a54ec3e5d8106be0ffe64a7e256c (patch)
tree8c431dc1c265107c82b8a3b30c15792195d0cfd5
parentef8bdbf4e2a8289fc8bbc51af5ca2465f9ec27c2 (diff)
parent4191ea7d9421cfb2b6da08b9e4f0abfd654bd542 (diff)
Merge github.com:grpc/grpc into we-dont-need-no-backup
-rw-r--r--src/core/transport/stream_op.h6
-rw-r--r--test/cpp/qps/client_async.cc2
-rw-r--r--test/cpp/qps/server_async.cc55
3 files changed, 30 insertions, 33 deletions
diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h
index 95497a3cc8..5496504229 100644
--- a/src/core/transport/stream_op.h
+++ b/src/core/transport/stream_op.h
@@ -126,10 +126,8 @@ typedef struct grpc_stream_op {
} data;
} grpc_stream_op;
-/* A stream op buffer is a wrapper around stream operations that is dynamically
- extendable.
- TODO(ctiller): inline a few elements into the struct, to avoid common case
- per-call allocations. */
+/** A stream op buffer is a wrapper around stream operations that is
+ * dynamically extendable. */
typedef struct grpc_stream_op_buffer {
grpc_stream_op *ops;
size_t nops;
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 921836e201..1b7a8d26b2 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -62,7 +62,7 @@ typedef std::list<grpc_time> deadline_list;
class ClientRpcContext {
public:
- ClientRpcContext(int ch) : channel_id_(ch) {}
+ explicit ClientRpcContext(int ch) : channel_id_(ch) {}
virtual ~ClientRpcContext() {}
// next state, return false if done. Collect stats when appropriate
virtual bool RunNextState(bool, Histogram* hist) = 0;
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 4b0678bb2c..210aef4fd6 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -73,31 +73,35 @@ class AsyncQpsServerTest : public Server {
gpr_free(server_address);
builder.RegisterAsyncService(&async_service_);
- srv_cq_ = builder.AddCompletionQueue();
+ for (int i = 0; i < config.threads(); i++) {
+ srv_cqs_.emplace_back(std::move(builder.AddCompletionQueue()));
+ }
server_ = builder.BuildAndStart();
using namespace std::placeholders;
- request_unary_ =
- std::bind(&TestService::AsyncService::RequestUnaryCall, &async_service_,
- _1, _2, _3, srv_cq_.get(), srv_cq_.get(), _4);
- request_streaming_ =
- std::bind(&TestService::AsyncService::RequestStreamingCall,
- &async_service_, _1, _2, srv_cq_.get(), srv_cq_.get(), _3);
- for (int i = 0; i < 100; i++) {
- contexts_.push_front(
- new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
- request_unary_, ProcessRPC));
- contexts_.push_front(
- new ServerRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
- request_streaming_, ProcessRPC));
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < config.threads(); j++) {
+ auto request_unary = std::bind(
+ &TestService::AsyncService::RequestUnaryCall, &async_service_, _1,
+ _2, _3, srv_cqs_[j].get(), srv_cqs_[j].get(), _4);
+ auto request_streaming = std::bind(
+ &TestService::AsyncService::RequestStreamingCall, &async_service_,
+ _1, _2, srv_cqs_[j].get(), srv_cqs_[j].get(), _3);
+ contexts_.push_front(
+ new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
+ request_unary, ProcessRPC));
+ contexts_.push_front(
+ new ServerRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
+ request_streaming, ProcessRPC));
+ }
}
for (int i = 0; i < config.threads(); i++) {
threads_.push_back(std::thread([=]() {
// Wait until work is available or we are shutting down
bool ok;
void *got_tag;
- while (srv_cq_->Next(&got_tag, &ok)) {
+ while (srv_cqs_[i]->Next(&got_tag, &ok)) {
ServerRpcContext *ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
bool still_going = ctx->RunNextState(ok);
@@ -125,11 +129,13 @@ class AsyncQpsServerTest : public Server {
for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
thr->join();
}
- srv_cq_->Shutdown();
- bool ok;
- void *got_tag;
- while (srv_cq_->Next(&got_tag, &ok))
- ;
+ for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
+ (*cq)->Shutdown();
+ bool ok;
+ void *got_tag;
+ while ((*cq)->Next(&got_tag, &ok))
+ ;
+ }
while (!contexts_.empty()) {
delete contexts_.front();
contexts_.pop_front();
@@ -306,15 +312,8 @@ class AsyncQpsServerTest : public Server {
}
std::vector<std::thread> threads_;
std::unique_ptr<grpc::Server> server_;
- std::unique_ptr<grpc::ServerCompletionQueue> srv_cq_;
+ std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_;
TestService::AsyncService async_service_;
- std::function<void(ServerContext *, SimpleRequest *,
- grpc::ServerAsyncResponseWriter<SimpleResponse> *, void *)>
- request_unary_;
- std::function<void(
- ServerContext *,
- grpc::ServerAsyncReaderWriter<SimpleResponse, SimpleRequest> *, void *)>
- request_streaming_;
std::forward_list<ServerRpcContext *> contexts_;
std::mutex shutdown_mutex_;