From f9e6adf998ed36479ccbb8eb3cdc58b02cc161dd Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 6 May 2015 11:45:59 -0700 Subject: Completion queue binding for new requests API change Move completion queue binding for new requests to the new request request time, not server instantiation time. --- src/cpp/server/async_generic_service.cc | 10 ++--- src/cpp/server/server.cc | 71 +++++++++++++++++++-------------- src/cpp/server/server_builder.cc | 9 +++++ 3 files changed, 55 insertions(+), 35 deletions(-) (limited to 'src/cpp') diff --git a/src/cpp/server/async_generic_service.cc b/src/cpp/server/async_generic_service.cc index 07cb933715..2e99afcb5f 100644 --- a/src/cpp/server/async_generic_service.cc +++ b/src/cpp/server/async_generic_service.cc @@ -39,12 +39,10 @@ namespace grpc { void AsyncGenericService::RequestCall( GenericServerContext* ctx, GenericServerAsyncReaderWriter* reader_writer, - CompletionQueue* cq, void* tag) { - server_->RequestAsyncGenericCall(ctx, reader_writer, cq, tag); -} - -CompletionQueue* AsyncGenericService::completion_queue() { - return &server_->cq_; + CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, + void* tag) { + server_->RequestAsyncGenericCall(ctx, reader_writer, call_cq, notification_cq, + tag); } } // namespace grpc diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 08c956601c..e9c4f4eaaf 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -78,7 +78,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { return mrd; } - void Request(grpc_server* server) { + void Request(grpc_server* server, grpc_completion_queue* notify_cq) { GPR_ASSERT(!in_flight_); in_flight_ = true; cq_ = grpc_completion_queue_create(); @@ -86,7 +86,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { grpc_server_request_registered_call( server, tag_, &call_, &deadline_, &request_metadata_, has_request_payload_ ? &request_payload_ : nullptr, cq_, - this)); + notify_cq, this)); } bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { @@ -179,16 +179,16 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { grpc_completion_queue* cq_; }; -grpc_server* CreateServer(grpc_completion_queue* cq, int max_message_size) { +static grpc_server* CreateServer(int max_message_size) { if (max_message_size > 0) { grpc_arg arg; arg.type = GRPC_ARG_INTEGER; arg.key = const_cast(GRPC_ARG_MAX_MESSAGE_LENGTH); arg.value.integer = max_message_size; grpc_channel_args args = {1, &arg}; - return grpc_server_create(cq, &args); + return grpc_server_create(&args); } else { - return grpc_server_create(cq, nullptr); + return grpc_server_create(nullptr); } } @@ -199,9 +199,11 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, shutdown_(false), num_running_cb_(0), sync_methods_(new std::list), - server_(CreateServer(cq_.cq(), max_message_size)), + server_(CreateServer(max_message_size)), thread_pool_(thread_pool), - thread_pool_owned_(thread_pool_owned) {} + thread_pool_owned_(thread_pool_owned) { + grpc_server_register_completion_queue(server_, cq_.cq()); +} Server::~Server() { { @@ -221,8 +223,7 @@ Server::~Server() { bool Server::RegisterService(RpcService* service) { for (int i = 0; i < service->GetMethodCount(); ++i) { RpcServiceMethod* method = service->GetMethod(i); - void* tag = - grpc_server_register_method(server_, method->name(), nullptr, cq_.cq()); + void* tag = grpc_server_register_method(server_, method->name(), nullptr); if (!tag) { gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", method->name()); @@ -240,9 +241,8 @@ bool Server::RegisterAsyncService(AsynchronousService* service) { service->dispatch_impl_ = this; service->request_args_ = new void*[service->method_count_]; for (size_t i = 0; i < service->method_count_; ++i) { - void* tag = - grpc_server_register_method(server_, service->method_names_[i], nullptr, - service->completion_queue()->cq()); + void* tag = grpc_server_register_method(server_, service->method_names_[i], + nullptr); if (!tag) { gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", service->method_names_[i]); @@ -273,7 +273,7 @@ bool Server::Start() { // Start processing rpcs. if (!sync_methods_->empty()) { for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) { - m->Request(server_); + m->Request(server_, cq_.cq()); } ScheduleCallback(); @@ -316,12 +316,13 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { public: AsyncRequest(Server* server, void* registered_method, ServerContext* ctx, grpc::protobuf::Message* request, - ServerAsyncStreamingInterface* stream, CompletionQueue* cq, - void* tag) + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) : tag_(tag), request_(request), stream_(stream), - cq_(cq), + call_cq_(call_cq), + notification_cq_(notification_cq), ctx_(ctx), generic_ctx_(nullptr), server_(server), @@ -329,18 +330,22 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { payload_(nullptr) { memset(&array_, 0, sizeof(array_)); grpc_call_details_init(&call_details_); + GPR_ASSERT(notification_cq); + GPR_ASSERT(call_cq); grpc_server_request_registered_call( server->server_, registered_method, &call_, &call_details_.deadline, - &array_, request ? &payload_ : nullptr, cq->cq(), this); + &array_, request ? &payload_ : nullptr, call_cq->cq(), + notification_cq->cq(), this); } AsyncRequest(Server* server, GenericServerContext* ctx, - ServerAsyncStreamingInterface* stream, CompletionQueue* cq, - void* tag) + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) : tag_(tag), request_(nullptr), stream_(stream), - cq_(cq), + call_cq_(call_cq), + notification_cq_(notification_cq), ctx_(nullptr), generic_ctx_(ctx), server_(server), @@ -348,8 +353,10 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { payload_(nullptr) { memset(&array_, 0, sizeof(array_)); grpc_call_details_init(&call_details_); + GPR_ASSERT(notification_cq); + GPR_ASSERT(call_cq); grpc_server_request_call(server->server_, &call_, &call_details_, &array_, - cq->cq(), this); + call_cq->cq(), notification_cq->cq(), this); } ~AsyncRequest() { @@ -392,8 +399,8 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { } } ctx->call_ = call_; - ctx->cq_ = cq_; - Call call(call_, server_, cq_, server_->max_message_size_); + ctx->cq_ = call_cq_; + Call call(call_, server_, call_cq_, server_->max_message_size_); if (orig_status && call_) { ctx->BeginCompletionOp(&call); } @@ -407,7 +414,8 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { void* const tag_; grpc::protobuf::Message* const request_; ServerAsyncStreamingInterface* const stream_; - CompletionQueue* const cq_; + CompletionQueue* const call_cq_; + ServerCompletionQueue* const notification_cq_; ServerContext* const ctx_; GenericServerContext* const generic_ctx_; Server* const server_; @@ -420,14 +428,19 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { void Server::RequestAsyncCall(void* registered_method, ServerContext* context, grpc::protobuf::Message* request, ServerAsyncStreamingInterface* stream, - CompletionQueue* cq, void* tag) { - new AsyncRequest(this, registered_method, context, request, stream, cq, tag); + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, + void* tag) { + new AsyncRequest(this, registered_method, context, request, stream, call_cq, + notification_cq, tag); } void Server::RequestAsyncGenericCall(GenericServerContext* context, ServerAsyncStreamingInterface* stream, - CompletionQueue* cq, void* tag) { - new AsyncRequest(this, context, stream, cq, tag); + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, + void* tag) { + new AsyncRequest(this, context, stream, call_cq, notification_cq, tag); } void Server::ScheduleCallback() { @@ -446,7 +459,7 @@ void Server::RunRpc() { ScheduleCallback(); if (ok) { SyncRequest::CallData cd(this, mrd); - mrd->Request(server_); + mrd->Request(server_, cq_.cq()); cd.Run(); } diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index e48d1eeb42..4bcbd82952 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -44,6 +44,12 @@ namespace grpc { ServerBuilder::ServerBuilder() : max_message_size_(-1), generic_service_(nullptr), thread_pool_(nullptr) {} +std::unique_ptr ServerBuilder::AddCompletionQueue() { + ServerCompletionQueue* cq = new ServerCompletionQueue(); + cqs_.push_back(cq); + return std::unique_ptr(cq); +} + void ServerBuilder::RegisterService(SynchronousService* service) { services_.push_back(service->service()); } @@ -88,6 +94,9 @@ std::unique_ptr ServerBuilder::BuildAndStart() { } std::unique_ptr server( new Server(thread_pool_, thread_pool_owned, max_message_size_)); + for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) { + grpc_server_register_completion_queue(server->server_, (*cq)->cq()); + } for (auto service = services_.begin(); service != services_.end(); service++) { if (!server->RegisterService(*service)) { -- cgit v1.2.3