aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-05-06 11:45:59 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-05-06 11:45:59 -0700
commitf9e6adf998ed36479ccbb8eb3cdc58b02cc161dd (patch)
treeb1c9c0efd3bfc4984effb9747b0f09e208a1d768 /src/cpp/server
parent97c5559040204dcff338df79b16390014fbc82c9 (diff)
Completion queue binding for new requests API change
Move completion queue binding for new requests to the new request request time, not server instantiation time.
Diffstat (limited to 'src/cpp/server')
-rw-r--r--src/cpp/server/async_generic_service.cc10
-rw-r--r--src/cpp/server/server.cc71
-rw-r--r--src/cpp/server/server_builder.cc9
3 files changed, 55 insertions, 35 deletions
diff --git a/src/cpp/server/async_generic_service.cc b/src/cpp/server/async_generic_service.cc
index 07cb933715..2e99afcb5f 100644
--- a/src/cpp/server/async_generic_service.cc
+++ b/src/cpp/server/async_generic_service.cc
@@ -39,12 +39,10 @@ namespace grpc {
void AsyncGenericService::RequestCall(
GenericServerContext* ctx, GenericServerAsyncReaderWriter* reader_writer,
- CompletionQueue* cq, void* tag) {
- server_->RequestAsyncGenericCall(ctx, reader_writer, cq, tag);
-}
-
-CompletionQueue* AsyncGenericService::completion_queue() {
- return &server_->cq_;
+ CompletionQueue* call_cq, ServerCompletionQueue* notification_cq,
+ void* tag) {
+ server_->RequestAsyncGenericCall(ctx, reader_writer, call_cq, notification_cq,
+ tag);
}
} // namespace grpc
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 08c956601c..e9c4f4eaaf 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -78,7 +78,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
return mrd;
}
- void Request(grpc_server* server) {
+ void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
GPR_ASSERT(!in_flight_);
in_flight_ = true;
cq_ = grpc_completion_queue_create();
@@ -86,7 +86,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
grpc_server_request_registered_call(
server, tag_, &call_, &deadline_, &request_metadata_,
has_request_payload_ ? &request_payload_ : nullptr, cq_,
- this));
+ notify_cq, this));
}
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
@@ -179,16 +179,16 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
grpc_completion_queue* cq_;
};
-grpc_server* CreateServer(grpc_completion_queue* cq, int max_message_size) {
+static grpc_server* CreateServer(int max_message_size) {
if (max_message_size > 0) {
grpc_arg arg;
arg.type = GRPC_ARG_INTEGER;
arg.key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH);
arg.value.integer = max_message_size;
grpc_channel_args args = {1, &arg};
- return grpc_server_create(cq, &args);
+ return grpc_server_create(&args);
} else {
- return grpc_server_create(cq, nullptr);
+ return grpc_server_create(nullptr);
}
}
@@ -199,9 +199,11 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
shutdown_(false),
num_running_cb_(0),
sync_methods_(new std::list<SyncRequest>),
- server_(CreateServer(cq_.cq(), max_message_size)),
+ server_(CreateServer(max_message_size)),
thread_pool_(thread_pool),
- thread_pool_owned_(thread_pool_owned) {}
+ thread_pool_owned_(thread_pool_owned) {
+ grpc_server_register_completion_queue(server_, cq_.cq());
+}
Server::~Server() {
{
@@ -221,8 +223,7 @@ Server::~Server() {
bool Server::RegisterService(RpcService* service) {
for (int i = 0; i < service->GetMethodCount(); ++i) {
RpcServiceMethod* method = service->GetMethod(i);
- void* tag =
- grpc_server_register_method(server_, method->name(), nullptr, cq_.cq());
+ void* tag = grpc_server_register_method(server_, method->name(), nullptr);
if (!tag) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
method->name());
@@ -240,9 +241,8 @@ bool Server::RegisterAsyncService(AsynchronousService* service) {
service->dispatch_impl_ = this;
service->request_args_ = new void*[service->method_count_];
for (size_t i = 0; i < service->method_count_; ++i) {
- void* tag =
- grpc_server_register_method(server_, service->method_names_[i], nullptr,
- service->completion_queue()->cq());
+ void* tag = grpc_server_register_method(server_, service->method_names_[i],
+ nullptr);
if (!tag) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
service->method_names_[i]);
@@ -273,7 +273,7 @@ bool Server::Start() {
// Start processing rpcs.
if (!sync_methods_->empty()) {
for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) {
- m->Request(server_);
+ m->Request(server_, cq_.cq());
}
ScheduleCallback();
@@ -316,12 +316,13 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
public:
AsyncRequest(Server* server, void* registered_method, ServerContext* ctx,
grpc::protobuf::Message* request,
- ServerAsyncStreamingInterface* stream, CompletionQueue* cq,
- void* tag)
+ ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag)
: tag_(tag),
request_(request),
stream_(stream),
- cq_(cq),
+ call_cq_(call_cq),
+ notification_cq_(notification_cq),
ctx_(ctx),
generic_ctx_(nullptr),
server_(server),
@@ -329,18 +330,22 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
payload_(nullptr) {
memset(&array_, 0, sizeof(array_));
grpc_call_details_init(&call_details_);
+ GPR_ASSERT(notification_cq);
+ GPR_ASSERT(call_cq);
grpc_server_request_registered_call(
server->server_, registered_method, &call_, &call_details_.deadline,
- &array_, request ? &payload_ : nullptr, cq->cq(), this);
+ &array_, request ? &payload_ : nullptr, call_cq->cq(),
+ notification_cq->cq(), this);
}
AsyncRequest(Server* server, GenericServerContext* ctx,
- ServerAsyncStreamingInterface* stream, CompletionQueue* cq,
- void* tag)
+ ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag)
: tag_(tag),
request_(nullptr),
stream_(stream),
- cq_(cq),
+ call_cq_(call_cq),
+ notification_cq_(notification_cq),
ctx_(nullptr),
generic_ctx_(ctx),
server_(server),
@@ -348,8 +353,10 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
payload_(nullptr) {
memset(&array_, 0, sizeof(array_));
grpc_call_details_init(&call_details_);
+ GPR_ASSERT(notification_cq);
+ GPR_ASSERT(call_cq);
grpc_server_request_call(server->server_, &call_, &call_details_, &array_,
- cq->cq(), this);
+ call_cq->cq(), notification_cq->cq(), this);
}
~AsyncRequest() {
@@ -392,8 +399,8 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
}
}
ctx->call_ = call_;
- ctx->cq_ = cq_;
- Call call(call_, server_, cq_, server_->max_message_size_);
+ ctx->cq_ = call_cq_;
+ Call call(call_, server_, call_cq_, server_->max_message_size_);
if (orig_status && call_) {
ctx->BeginCompletionOp(&call);
}
@@ -407,7 +414,8 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
void* const tag_;
grpc::protobuf::Message* const request_;
ServerAsyncStreamingInterface* const stream_;
- CompletionQueue* const cq_;
+ CompletionQueue* const call_cq_;
+ ServerCompletionQueue* const notification_cq_;
ServerContext* const ctx_;
GenericServerContext* const generic_ctx_;
Server* const server_;
@@ -420,14 +428,19 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
void Server::RequestAsyncCall(void* registered_method, ServerContext* context,
grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq, void* tag) {
- new AsyncRequest(this, registered_method, context, request, stream, cq, tag);
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq,
+ void* tag) {
+ new AsyncRequest(this, registered_method, context, request, stream, call_cq,
+ notification_cq, tag);
}
void Server::RequestAsyncGenericCall(GenericServerContext* context,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq, void* tag) {
- new AsyncRequest(this, context, stream, cq, tag);
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq,
+ void* tag) {
+ new AsyncRequest(this, context, stream, call_cq, notification_cq, tag);
}
void Server::ScheduleCallback() {
@@ -446,7 +459,7 @@ void Server::RunRpc() {
ScheduleCallback();
if (ok) {
SyncRequest::CallData cd(this, mrd);
- mrd->Request(server_);
+ mrd->Request(server_, cq_.cq());
cd.Run();
}
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index e48d1eeb42..4bcbd82952 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -44,6 +44,12 @@ namespace grpc {
ServerBuilder::ServerBuilder()
: max_message_size_(-1), generic_service_(nullptr), thread_pool_(nullptr) {}
+std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() {
+ ServerCompletionQueue* cq = new ServerCompletionQueue();
+ cqs_.push_back(cq);
+ return std::unique_ptr<ServerCompletionQueue>(cq);
+}
+
void ServerBuilder::RegisterService(SynchronousService* service) {
services_.push_back(service->service());
}
@@ -88,6 +94,9 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
}
std::unique_ptr<Server> server(
new Server(thread_pool_, thread_pool_owned, max_message_size_));
+ for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
+ grpc_server_register_completion_queue(server->server_, (*cq)->cq());
+ }
for (auto service = services_.begin(); service != services_.end();
service++) {
if (!server->RegisterService(*service)) {