diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/compiler/cpp_generator.cc | 105 | ||||
-rw-r--r-- | src/core/surface/server.c | 67 | ||||
-rw-r--r-- | src/core/surface/server.h | 3 | ||||
-rw-r--r-- | src/core/surface/server_create.c | 5 | ||||
-rw-r--r-- | src/cpp/server/async_generic_service.cc | 10 | ||||
-rw-r--r-- | src/cpp/server/server.cc | 71 | ||||
-rw-r--r-- | src/cpp/server/server_builder.cc | 9 |
7 files changed, 146 insertions, 124 deletions
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index 735e7e58a8..46c842a7d6 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -162,6 +162,7 @@ grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file, "class CompletionQueue;\n" "class ChannelInterface;\n" "class RpcService;\n" + "class ServerCompletionQueue;\n" "class ServerContext;\n"; if (HasUnaryCalls(file)) { temp.append( @@ -260,7 +261,7 @@ void PrintHeaderClientMethod(grpc::protobuf::io::Printer *printer, "std::unique_ptr< ::grpc::ClientReaderWriter< $Request$, $Response$>> " "$Method$(::grpc::ClientContext* context);\n"); printer->Print(*vars, - "std::unique_ptr< ::grpc::ClientAsyncReaderWriter< " + "std::unique_ptr< ::grpc::ClientAsyncReaderWriter< " "$Request$, $Response$>> " "Async$Method$(::grpc::ClientContext* context, " "::grpc::CompletionQueue* cq, void* tag);\n"); @@ -318,30 +319,37 @@ void PrintHeaderServerMethodAsync( (*vars)["Response"] = grpc_cpp_generator::ClassName(method->output_type(), true); if (NoStreaming(method)) { - printer->Print(*vars, - "void Request$Method$(" - "::grpc::ServerContext* context, $Request$* request, " - "::grpc::ServerAsyncResponseWriter< $Response$>* response, " - "::grpc::CompletionQueue* cq, void *tag);\n"); + printer->Print( + *vars, + "void Request$Method$(" + "::grpc::ServerContext* context, $Request$* request, " + "::grpc::ServerAsyncResponseWriter< $Response$>* response, " + "::grpc::CompletionQueue* new_call_cq, " + "::grpc::ServerCompletionQueue* notification_cq, void *tag);\n"); } else if (ClientOnlyStreaming(method)) { - printer->Print(*vars, - "void Request$Method$(" - "::grpc::ServerContext* context, " - "::grpc::ServerAsyncReader< $Response$, $Request$>* reader, " - "::grpc::CompletionQueue* cq, void *tag);\n"); + printer->Print( + *vars, + "void Request$Method$(" + "::grpc::ServerContext* context, " + "::grpc::ServerAsyncReader< $Response$, $Request$>* reader, " + "::grpc::CompletionQueue* new_call_cq, " + "::grpc::ServerCompletionQueue* notification_cq, void *tag);\n"); } else if (ServerOnlyStreaming(method)) { - printer->Print(*vars, - "void Request$Method$(" - "::grpc::ServerContext* context, $Request$* request, " - "::grpc::ServerAsyncWriter< $Response$>* writer, " - "::grpc::CompletionQueue* cq, void *tag);\n"); + printer->Print( + *vars, + "void Request$Method$(" + "::grpc::ServerContext* context, $Request$* request, " + "::grpc::ServerAsyncWriter< $Response$>* writer, " + "::grpc::CompletionQueue* new_call_cq, " + "::grpc::ServerCompletionQueue* notification_cq, void *tag);\n"); } else if (BidiStreaming(method)) { printer->Print( *vars, "void Request$Method$(" "::grpc::ServerContext* context, " "::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, " - "::grpc::CompletionQueue* cq, void *tag);\n"); + "::grpc::CompletionQueue* new_call_cq, " + "::grpc::ServerCompletionQueue* notification_cq, void *tag);\n"); } } @@ -403,7 +411,7 @@ void PrintHeaderService(grpc::protobuf::io::Printer *printer, " public:\n"); printer->Indent(); (*vars)["MethodCount"] = as_string(service->method_count()); - printer->Print("explicit AsyncService(::grpc::CompletionQueue* cq);\n"); + printer->Print("explicit AsyncService();\n"); printer->Print("~AsyncService() {};\n"); for (int i = 0; i < service->method_count(); ++i) { PrintHeaderServerMethodAsync(printer, service->method(i), vars); @@ -686,36 +694,43 @@ void PrintSourceServerAsyncMethod( (*vars)["Response"] = grpc_cpp_generator::ClassName(method->output_type(), true); if (NoStreaming(method)) { - printer->Print(*vars, - "void $ns$$Service$::AsyncService::Request$Method$(" - "::grpc::ServerContext* context, " - "$Request$* request, " - "::grpc::ServerAsyncResponseWriter< $Response$>* response, " - "::grpc::CompletionQueue* cq, void* tag) {\n"); + printer->Print( + *vars, + "void $ns$$Service$::AsyncService::Request$Method$(" + "::grpc::ServerContext* context, " + "$Request$* request, " + "::grpc::ServerAsyncResponseWriter< $Response$>* response, " + "::grpc::CompletionQueue* new_call_cq, " + "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n"); printer->Print(*vars, " AsynchronousService::RequestAsyncUnary($Idx$, context, " - "request, response, cq, tag);\n"); + "request, response, new_call_cq, notification_cq, tag);\n"); printer->Print("}\n\n"); } else if (ClientOnlyStreaming(method)) { - printer->Print(*vars, - "void $ns$$Service$::AsyncService::Request$Method$(" - "::grpc::ServerContext* context, " - "::grpc::ServerAsyncReader< $Response$, $Request$>* reader, " - "::grpc::CompletionQueue* cq, void* tag) {\n"); + printer->Print( + *vars, + "void $ns$$Service$::AsyncService::Request$Method$(" + "::grpc::ServerContext* context, " + "::grpc::ServerAsyncReader< $Response$, $Request$>* reader, " + "::grpc::CompletionQueue* new_call_cq, " + "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n"); printer->Print(*vars, " AsynchronousService::RequestClientStreaming($Idx$, " - "context, reader, cq, tag);\n"); + "context, reader, new_call_cq, notification_cq, tag);\n"); printer->Print("}\n\n"); } else if (ServerOnlyStreaming(method)) { - printer->Print(*vars, - "void $ns$$Service$::AsyncService::Request$Method$(" - "::grpc::ServerContext* context, " - "$Request$* request, " - "::grpc::ServerAsyncWriter< $Response$>* writer, " - "::grpc::CompletionQueue* cq, void* tag) {\n"); - printer->Print(*vars, - " AsynchronousService::RequestServerStreaming($Idx$, " - "context, request, writer, cq, tag);\n"); + printer->Print( + *vars, + "void $ns$$Service$::AsyncService::Request$Method$(" + "::grpc::ServerContext* context, " + "$Request$* request, " + "::grpc::ServerAsyncWriter< $Response$>* writer, " + "::grpc::CompletionQueue* new_call_cq, " + "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n"); + printer->Print( + *vars, + " AsynchronousService::RequestServerStreaming($Idx$, " + "context, request, writer, new_call_cq, notification_cq, tag);\n"); printer->Print("}\n\n"); } else if (BidiStreaming(method)) { printer->Print( @@ -723,10 +738,11 @@ void PrintSourceServerAsyncMethod( "void $ns$$Service$::AsyncService::Request$Method$(" "::grpc::ServerContext* context, " "::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, " - "::grpc::CompletionQueue* cq, void *tag) {\n"); + "::grpc::CompletionQueue* new_call_cq, " + "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n"); printer->Print(*vars, " AsynchronousService::RequestBidiStreaming($Idx$, " - "context, stream, cq, tag);\n"); + "context, stream, new_call_cq, notification_cq, tag);\n"); printer->Print("}\n\n"); } } @@ -788,9 +804,8 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer, (*vars)["MethodCount"] = as_string(service->method_count()); printer->Print(*vars, - "$ns$$Service$::AsyncService::AsyncService(::grpc::" - "CompletionQueue* cq) : " - "::grpc::AsynchronousService(cq, " + "$ns$$Service$::AsyncService::AsyncService() : " + "::grpc::AsynchronousService(" "$prefix$$Service$_method_names, $MethodCount$) " "{}\n\n"); diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 01644b4471..96c1b7c3eb 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -74,16 +74,15 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type; typedef struct { requested_call_type type; void *tag; + grpc_completion_queue *cq_bound_to_call; + grpc_completion_queue *cq_for_notification; + grpc_call **call; union { struct { - grpc_completion_queue *cq_bind; - grpc_call **call; grpc_call_details *details; grpc_metadata_array *initial_metadata; } batch; struct { - grpc_completion_queue *cq_bind; - grpc_call **call; registered_method *registered_method; gpr_timespec *deadline; grpc_metadata_array *initial_metadata; @@ -103,7 +102,6 @@ struct registered_method { char *host; call_data *pending; requested_call_array requested; - grpc_completion_queue *cq; registered_method *next; }; @@ -130,7 +128,6 @@ struct grpc_server { size_t channel_filter_count; const grpc_channel_filter **channel_filters; grpc_channel_args *channel_args; - grpc_completion_queue *unregistered_cq; grpc_completion_queue **cqs; grpc_pollset **pollsets; @@ -602,7 +599,8 @@ static const grpc_channel_filter server_surface_filter = { destroy_channel_elem, "server", }; -static void addcq(grpc_server *server, grpc_completion_queue *cq) { +void grpc_server_register_completion_queue(grpc_server *server, + grpc_completion_queue *cq) { size_t i, n; for (i = 0; i < server->cq_count; i++) { if (server->cqs[i] == cq) return; @@ -614,8 +612,7 @@ static void addcq(grpc_server *server, grpc_completion_queue *cq) { server->cqs[n] = cq; } -grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, - grpc_channel_filter **filters, +grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters, size_t filter_count, const grpc_channel_args *args) { size_t i; @@ -626,12 +623,10 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, GPR_ASSERT(grpc_is_initialized() && "call grpc_init()"); memset(server, 0, sizeof(grpc_server)); - if (cq) addcq(server, cq); gpr_mu_init(&server->mu); gpr_cv_init(&server->cv); - server->unregistered_cq = cq; /* decremented by grpc_server_destroy */ gpr_ref_init(&server->internal_refcount, 1); server->root_channel_data.next = server->root_channel_data.prev = @@ -667,8 +662,7 @@ static int streq(const char *a, const char *b) { } void *grpc_server_register_method(grpc_server *server, const char *method, - const char *host, - grpc_completion_queue *cq_new_rpc) { + const char *host) { registered_method *m; if (!method) { gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__); @@ -681,13 +675,11 @@ void *grpc_server_register_method(grpc_server *server, const char *method, return NULL; } } - addcq(server, cq_new_rpc); m = gpr_malloc(sizeof(registered_method)); memset(m, 0, sizeof(*m)); m->method = gpr_strdup(method); m->host = gpr_strdup(host); m->next = server->registered_methods; - m->cq = cq_new_rpc; server->registered_methods = m; return m; } @@ -1012,17 +1004,18 @@ static grpc_call_error queue_call_request(grpc_server *server, } } -grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, - grpc_call_details *details, - grpc_metadata_array *initial_metadata, - grpc_completion_queue *cq_bind, - void *tag) { +grpc_call_error grpc_server_request_call( + grpc_server *server, grpc_call **call, grpc_call_details *details, + grpc_metadata_array *initial_metadata, + grpc_completion_queue *cq_bound_to_call, + grpc_completion_queue *cq_for_notification, void *tag) { requested_call rc; - grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_OP_COMPLETE); + grpc_cq_begin_op(cq_for_notification, NULL, GRPC_OP_COMPLETE); rc.type = BATCH_CALL; rc.tag = tag; - rc.data.batch.cq_bind = cq_bind; - rc.data.batch.call = call; + rc.cq_bound_to_call = cq_bound_to_call; + rc.cq_for_notification = cq_for_notification; + rc.call = call; rc.data.batch.details = details; rc.data.batch.initial_metadata = initial_metadata; return queue_call_request(server, &rc); @@ -1031,14 +1024,16 @@ grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, grpc_call_error grpc_server_request_registered_call( grpc_server *server, void *rm, grpc_call **call, gpr_timespec *deadline, grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload, - grpc_completion_queue *cq_bind, void *tag) { + grpc_completion_queue *cq_bound_to_call, + grpc_completion_queue *cq_for_notification, void *tag) { requested_call rc; registered_method *registered_method = rm; - grpc_cq_begin_op(registered_method->cq, NULL, GRPC_OP_COMPLETE); + grpc_cq_begin_op(cq_for_notification, NULL, GRPC_OP_COMPLETE); rc.type = REGISTERED_CALL; rc.tag = tag; - rc.data.registered.cq_bind = cq_bind; - rc.data.registered.call = call; + rc.cq_bound_to_call = cq_bound_to_call; + rc.cq_for_notification = cq_for_notification; + rc.call = call; rc.data.registered.registered_method = registered_method; rc.data.registered.deadline = deadline; rc.data.registered.initial_metadata = initial_metadata; @@ -1076,6 +1071,9 @@ static void begin_call(grpc_server *server, call_data *calld, fill in the metadata array passed by the client, we need to perform an ioreq op, that should complete immediately. */ + grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call); + *rc->call = calld->call; + calld->cq_new = rc->cq_for_notification; switch (rc->type) { case BATCH_CALL: cpstr(&rc->data.batch.details->host, @@ -1083,18 +1081,13 @@ static void begin_call(grpc_server *server, call_data *calld, cpstr(&rc->data.batch.details->method, &rc->data.batch.details->method_capacity, calld->path); rc->data.batch.details->deadline = calld->deadline; - grpc_call_set_completion_queue(calld->call, rc->data.batch.cq_bind); - *rc->data.batch.call = calld->call; r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; r->data.recv_metadata = rc->data.batch.initial_metadata; r++; - calld->cq_new = server->unregistered_cq; publish = publish_registered_or_batch; break; case REGISTERED_CALL: *rc->data.registered.deadline = calld->deadline; - grpc_call_set_completion_queue(calld->call, rc->data.registered.cq_bind); - *rc->data.registered.call = calld->call; r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; r->data.recv_metadata = rc->data.registered.initial_metadata; r++; @@ -1103,7 +1096,6 @@ static void begin_call(grpc_server *server, call_data *calld, r->data.recv_message = rc->data.registered.optional_payload; r++; } - calld->cq_new = rc->data.registered.registered_method->cq; publish = publish_registered_or_batch; break; } @@ -1114,20 +1106,17 @@ static void begin_call(grpc_server *server, call_data *calld, } static void fail_call(grpc_server *server, requested_call *rc) { + *rc->call = NULL; switch (rc->type) { case BATCH_CALL: - *rc->data.batch.call = NULL; rc->data.batch.initial_metadata->count = 0; - grpc_cq_end_op(server->unregistered_cq, rc->tag, NULL, do_nothing, NULL, - GRPC_OP_ERROR); break; case REGISTERED_CALL: - *rc->data.registered.call = NULL; rc->data.registered.initial_metadata->count = 0; - grpc_cq_end_op(rc->data.registered.registered_method->cq, rc->tag, NULL, - do_nothing, NULL, GRPC_OP_ERROR); break; } + grpc_cq_end_op(rc->cq_for_notification, rc->tag, NULL, do_nothing, NULL, + GRPC_OP_ERROR); } static void publish_registered_or_batch(grpc_call *call, grpc_op_error status, diff --git a/src/core/surface/server.h b/src/core/surface/server.h index 2cfa38fa43..c6331033e0 100644 --- a/src/core/surface/server.h +++ b/src/core/surface/server.h @@ -39,8 +39,7 @@ #include "src/core/transport/transport.h" /* Create a server */ -grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, - grpc_channel_filter **filters, +grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters, size_t filter_count, const grpc_channel_args *args); diff --git a/src/core/surface/server_create.c b/src/core/surface/server_create.c index f629c7c72d..b7390675ad 100644 --- a/src/core/surface/server_create.c +++ b/src/core/surface/server_create.c @@ -35,7 +35,6 @@ #include "src/core/surface/completion_queue.h" #include "src/core/surface/server.h" -grpc_server *grpc_server_create(grpc_completion_queue *cq, - const grpc_channel_args *args) { - return grpc_server_create_from_filters(cq, NULL, 0, args); +grpc_server *grpc_server_create(const grpc_channel_args *args) { + return grpc_server_create_from_filters(NULL, 0, args); } diff --git a/src/cpp/server/async_generic_service.cc b/src/cpp/server/async_generic_service.cc index 07cb933715..2e99afcb5f 100644 --- a/src/cpp/server/async_generic_service.cc +++ b/src/cpp/server/async_generic_service.cc @@ -39,12 +39,10 @@ namespace grpc { void AsyncGenericService::RequestCall( GenericServerContext* ctx, GenericServerAsyncReaderWriter* reader_writer, - CompletionQueue* cq, void* tag) { - server_->RequestAsyncGenericCall(ctx, reader_writer, cq, tag); -} - -CompletionQueue* AsyncGenericService::completion_queue() { - return &server_->cq_; + CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, + void* tag) { + server_->RequestAsyncGenericCall(ctx, reader_writer, call_cq, notification_cq, + tag); } } // namespace grpc diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 08c956601c..e9c4f4eaaf 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -78,7 +78,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { return mrd; } - void Request(grpc_server* server) { + void Request(grpc_server* server, grpc_completion_queue* notify_cq) { GPR_ASSERT(!in_flight_); in_flight_ = true; cq_ = grpc_completion_queue_create(); @@ -86,7 +86,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { grpc_server_request_registered_call( server, tag_, &call_, &deadline_, &request_metadata_, has_request_payload_ ? &request_payload_ : nullptr, cq_, - this)); + notify_cq, this)); } bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { @@ -179,16 +179,16 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { grpc_completion_queue* cq_; }; -grpc_server* CreateServer(grpc_completion_queue* cq, int max_message_size) { +static grpc_server* CreateServer(int max_message_size) { if (max_message_size > 0) { grpc_arg arg; arg.type = GRPC_ARG_INTEGER; arg.key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH); arg.value.integer = max_message_size; grpc_channel_args args = {1, &arg}; - return grpc_server_create(cq, &args); + return grpc_server_create(&args); } else { - return grpc_server_create(cq, nullptr); + return grpc_server_create(nullptr); } } @@ -199,9 +199,11 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, shutdown_(false), num_running_cb_(0), sync_methods_(new std::list<SyncRequest>), - server_(CreateServer(cq_.cq(), max_message_size)), + server_(CreateServer(max_message_size)), thread_pool_(thread_pool), - thread_pool_owned_(thread_pool_owned) {} + thread_pool_owned_(thread_pool_owned) { + grpc_server_register_completion_queue(server_, cq_.cq()); +} Server::~Server() { { @@ -221,8 +223,7 @@ Server::~Server() { bool Server::RegisterService(RpcService* service) { for (int i = 0; i < service->GetMethodCount(); ++i) { RpcServiceMethod* method = service->GetMethod(i); - void* tag = - grpc_server_register_method(server_, method->name(), nullptr, cq_.cq()); + void* tag = grpc_server_register_method(server_, method->name(), nullptr); if (!tag) { gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", method->name()); @@ -240,9 +241,8 @@ bool Server::RegisterAsyncService(AsynchronousService* service) { service->dispatch_impl_ = this; service->request_args_ = new void*[service->method_count_]; for (size_t i = 0; i < service->method_count_; ++i) { - void* tag = - grpc_server_register_method(server_, service->method_names_[i], nullptr, - service->completion_queue()->cq()); + void* tag = grpc_server_register_method(server_, service->method_names_[i], + nullptr); if (!tag) { gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", service->method_names_[i]); @@ -273,7 +273,7 @@ bool Server::Start() { // Start processing rpcs. if (!sync_methods_->empty()) { for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) { - m->Request(server_); + m->Request(server_, cq_.cq()); } ScheduleCallback(); @@ -316,12 +316,13 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { public: AsyncRequest(Server* server, void* registered_method, ServerContext* ctx, grpc::protobuf::Message* request, - ServerAsyncStreamingInterface* stream, CompletionQueue* cq, - void* tag) + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) : tag_(tag), request_(request), stream_(stream), - cq_(cq), + call_cq_(call_cq), + notification_cq_(notification_cq), ctx_(ctx), generic_ctx_(nullptr), server_(server), @@ -329,18 +330,22 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { payload_(nullptr) { memset(&array_, 0, sizeof(array_)); grpc_call_details_init(&call_details_); + GPR_ASSERT(notification_cq); + GPR_ASSERT(call_cq); grpc_server_request_registered_call( server->server_, registered_method, &call_, &call_details_.deadline, - &array_, request ? &payload_ : nullptr, cq->cq(), this); + &array_, request ? &payload_ : nullptr, call_cq->cq(), + notification_cq->cq(), this); } AsyncRequest(Server* server, GenericServerContext* ctx, - ServerAsyncStreamingInterface* stream, CompletionQueue* cq, - void* tag) + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) : tag_(tag), request_(nullptr), stream_(stream), - cq_(cq), + call_cq_(call_cq), + notification_cq_(notification_cq), ctx_(nullptr), generic_ctx_(ctx), server_(server), @@ -348,8 +353,10 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { payload_(nullptr) { memset(&array_, 0, sizeof(array_)); grpc_call_details_init(&call_details_); + GPR_ASSERT(notification_cq); + GPR_ASSERT(call_cq); grpc_server_request_call(server->server_, &call_, &call_details_, &array_, - cq->cq(), this); + call_cq->cq(), notification_cq->cq(), this); } ~AsyncRequest() { @@ -392,8 +399,8 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { } } ctx->call_ = call_; - ctx->cq_ = cq_; - Call call(call_, server_, cq_, server_->max_message_size_); + ctx->cq_ = call_cq_; + Call call(call_, server_, call_cq_, server_->max_message_size_); if (orig_status && call_) { ctx->BeginCompletionOp(&call); } @@ -407,7 +414,8 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { void* const tag_; grpc::protobuf::Message* const request_; ServerAsyncStreamingInterface* const stream_; - CompletionQueue* const cq_; + CompletionQueue* const call_cq_; + ServerCompletionQueue* const notification_cq_; ServerContext* const ctx_; GenericServerContext* const generic_ctx_; Server* const server_; @@ -420,14 +428,19 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { void Server::RequestAsyncCall(void* registered_method, ServerContext* context, grpc::protobuf::Message* request, ServerAsyncStreamingInterface* stream, - CompletionQueue* cq, void* tag) { - new AsyncRequest(this, registered_method, context, request, stream, cq, tag); + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, + void* tag) { + new AsyncRequest(this, registered_method, context, request, stream, call_cq, + notification_cq, tag); } void Server::RequestAsyncGenericCall(GenericServerContext* context, ServerAsyncStreamingInterface* stream, - CompletionQueue* cq, void* tag) { - new AsyncRequest(this, context, stream, cq, tag); + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, + void* tag) { + new AsyncRequest(this, context, stream, call_cq, notification_cq, tag); } void Server::ScheduleCallback() { @@ -446,7 +459,7 @@ void Server::RunRpc() { ScheduleCallback(); if (ok) { SyncRequest::CallData cd(this, mrd); - mrd->Request(server_); + mrd->Request(server_, cq_.cq()); cd.Run(); } diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index e48d1eeb42..4bcbd82952 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -44,6 +44,12 @@ namespace grpc { ServerBuilder::ServerBuilder() : max_message_size_(-1), generic_service_(nullptr), thread_pool_(nullptr) {} +std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() { + ServerCompletionQueue* cq = new ServerCompletionQueue(); + cqs_.push_back(cq); + return std::unique_ptr<ServerCompletionQueue>(cq); +} + void ServerBuilder::RegisterService(SynchronousService* service) { services_.push_back(service->service()); } @@ -88,6 +94,9 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { } std::unique_ptr<Server> server( new Server(thread_pool_, thread_pool_owned, max_message_size_)); + for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) { + grpc_server_register_completion_queue(server->server_, (*cq)->cq()); + } for (auto service = services_.begin(); service != services_.end(); service++) { if (!server->RegisterService(*service)) { |