diff options
author | Craig Tiller <ctiller@google.com> | 2015-05-06 11:45:59 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-05-06 11:45:59 -0700 |
commit | f9e6adf998ed36479ccbb8eb3cdc58b02cc161dd (patch) | |
tree | b1c9c0efd3bfc4984effb9747b0f09e208a1d768 | |
parent | 97c5559040204dcff338df79b16390014fbc82c9 (diff) |
Completion queue binding for new requests API change
Move completion queue binding for new requests to the new request
request time, not server instantiation time.
47 files changed, 366 insertions, 305 deletions
@@ -305,7 +305,7 @@ E = @echo Q = @ endif -VERSION = 0.7.0.0 +VERSION = 0.8.0.0 CPPFLAGS_NO_ARCH += $(addprefix -I, $(INCLUDES)) $(addprefix -D, $(DEFINES)) CPPFLAGS += $(CPPFLAGS_NO_ARCH) $(ARCH_FLAGS) diff --git a/build.json b/build.json index 10fd72d99e..217d84cdea 100644 --- a/build.json +++ b/build.json @@ -6,7 +6,7 @@ "#": "The public version number of the library.", "version": { "major": 0, - "minor": 7, + "minor": 8, "micro": 0, "build": 0 } diff --git a/include/grpc++/async_generic_service.h b/include/grpc++/async_generic_service.h index 911d31cb1f..b435c6e73d 100644 --- a/include/grpc++/async_generic_service.h +++ b/include/grpc++/async_generic_service.h @@ -65,10 +65,8 @@ class AsyncGenericService GRPC_FINAL { void RequestCall(GenericServerContext* ctx, GenericServerAsyncReaderWriter* reader_writer, - CompletionQueue* cq, void* tag); - - // The new rpc event should be obtained from this completion queue. - CompletionQueue* completion_queue(); + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag); private: friend class Server; diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h index 5c2b1cce93..e8429c8f41 100644 --- a/include/grpc++/completion_queue.h +++ b/include/grpc++/completion_queue.h @@ -58,6 +58,7 @@ class ServerReaderWriter; class CompletionQueue; class Server; +class ServerBuilder; class ServerContext; class CompletionQueueTag { @@ -137,6 +138,12 @@ class CompletionQueue : public GrpcLibrary { grpc_completion_queue* cq_; // owned }; +class ServerCompletionQueue : public CompletionQueue { + private: + friend class ServerBuilder; + ServerCompletionQueue() {} +}; + } // namespace grpc #endif // GRPCXX_COMPLETION_QUEUE_H diff --git a/include/grpc++/impl/service_type.h b/include/grpc++/impl/service_type.h index 7cd3ddad6b..bc39bb82ac 100644 --- a/include/grpc++/impl/service_type.h +++ b/include/grpc++/impl/service_type.h @@ -39,8 +39,10 @@ namespace grpc { class Call; +class CompletionQueue; class RpcService; class Server; +class ServerCompletionQueue; class ServerContext; class Status; @@ -70,52 +72,55 @@ class AsynchronousService { ServerContext* context, ::grpc::protobuf::Message* request, ServerAsyncStreamingInterface* stream, - CompletionQueue* cq, void* tag) = 0; + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, + void* tag) = 0; }; - AsynchronousService(CompletionQueue* cq, const char** method_names, - size_t method_count) - : cq_(cq), - dispatch_impl_(nullptr), + AsynchronousService(const char** method_names, size_t method_count) + : dispatch_impl_(nullptr), method_names_(method_names), method_count_(method_count), request_args_(nullptr) {} ~AsynchronousService() { delete[] request_args_; } - CompletionQueue* completion_queue() const { return cq_; } - protected: void RequestAsyncUnary(int index, ServerContext* context, grpc::protobuf::Message* request, ServerAsyncStreamingInterface* stream, - CompletionQueue* cq, void* tag) { + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) { dispatch_impl_->RequestAsyncCall(request_args_[index], context, request, - stream, cq, tag); + stream, call_cq, notification_cq, tag); } void RequestClientStreaming(int index, ServerContext* context, ServerAsyncStreamingInterface* stream, - CompletionQueue* cq, void* tag) { + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, + void* tag) { dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr, - stream, cq, tag); + stream, call_cq, notification_cq, tag); } void RequestServerStreaming(int index, ServerContext* context, grpc::protobuf::Message* request, ServerAsyncStreamingInterface* stream, - CompletionQueue* cq, void* tag) { + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, + void* tag) { dispatch_impl_->RequestAsyncCall(request_args_[index], context, request, - stream, cq, tag); + stream, call_cq, notification_cq, tag); } void RequestBidiStreaming(int index, ServerContext* context, ServerAsyncStreamingInterface* stream, - CompletionQueue* cq, void* tag) { + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) { dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr, - stream, cq, tag); + stream, call_cq, notification_cq, tag); } private: friend class Server; - CompletionQueue* const cq_; DispatchImpl* dispatch_impl_; const char** const method_names_; size_t method_count_; diff --git a/include/grpc++/server.h b/include/grpc++/server.h index b2b9044dca..50a2416321 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -101,11 +101,15 @@ class Server GRPC_FINAL : public GrpcLibrary, void RequestAsyncCall(void* registered_method, ServerContext* context, grpc::protobuf::Message* request, ServerAsyncStreamingInterface* stream, - CompletionQueue* cq, void* tag) GRPC_OVERRIDE; + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, + void* tag) GRPC_OVERRIDE; void RequestAsyncGenericCall(GenericServerContext* context, ServerAsyncStreamingInterface* stream, - CompletionQueue* cq, void* tag); + CompletionQueue* cq, + ServerCompletionQueue* notification_cq, + void* tag); const int max_message_size_; diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h index 7155c7fd46..ecee475e3e 100644 --- a/include/grpc++/server_builder.h +++ b/include/grpc++/server_builder.h @@ -46,6 +46,7 @@ class AsynchronousService; class CompletionQueue; class RpcService; class Server; +class ServerCompletionQueue; class ServerCredentials; class SynchronousService; class ThreadPoolInterface; @@ -82,6 +83,11 @@ class ServerBuilder { // Does not take ownership. void SetThreadPool(ThreadPoolInterface* thread_pool); + // Add a completion queue for handling asynchronous services + // Caller is required to keep this completion queue live until calling + // BuildAndStart() + std::unique_ptr<ServerCompletionQueue> AddCompletionQueue(); + // Return a running server which is ready for processing rpcs. std::unique_ptr<Server> BuildAndStart(); @@ -96,6 +102,7 @@ class ServerBuilder { std::vector<RpcService*> services_; std::vector<AsynchronousService*> async_services_; std::vector<Port> ports_; + std::vector<ServerCompletionQueue*> cqs_; std::shared_ptr<ServerCredentials> creds_; AsyncGenericService* generic_service_; ThreadPoolInterface* thread_pool_; diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 9bb826f323..be12356414 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -460,7 +460,8 @@ void grpc_call_destroy(grpc_call *call); grpc_call_error grpc_server_request_call( grpc_server *server, grpc_call **call, grpc_call_details *details, grpc_metadata_array *request_metadata, - grpc_completion_queue *cq_bound_to_call, void *tag_new); + grpc_completion_queue *cq_bound_to_call, + grpc_completion_queue *cq_for_notification, void *tag_new); /* Registers a method in the server. Methods to this (host, method) pair will not be reported by @@ -470,21 +471,26 @@ grpc_call_error grpc_server_request_call( Must be called before grpc_server_start. Returns NULL on failure. */ void *grpc_server_register_method(grpc_server *server, const char *method, - const char *host, - grpc_completion_queue *new_call_cq); + const char *host); /* Request notification of a new pre-registered call */ grpc_call_error grpc_server_request_registered_call( grpc_server *server, void *registered_method, grpc_call **call, gpr_timespec *deadline, grpc_metadata_array *request_metadata, grpc_byte_buffer **optional_payload, - grpc_completion_queue *cq_bound_to_call, void *tag_new); + grpc_completion_queue *cq_bound_to_call, + grpc_completion_queue *cq_for_notification, void *tag_new); /* Create a server. Additional configuration for each incoming channel can be specified with args. If no additional configuration is needed, args can be NULL. See grpc_channel_args for more. */ -grpc_server *grpc_server_create(grpc_completion_queue *cq, - const grpc_channel_args *args); +grpc_server *grpc_server_create(const grpc_channel_args *args); + +/* Register a completion queue with the server. Must be done for any completion + queue that is passed to grpc_server_request_* call. Must be performed prior + to grpc_server_start. */ +void grpc_server_register_completion_queue(grpc_server *server, + grpc_completion_queue *cq); /* Add a HTTP2 over plaintext over tcp listener. Returns bound port number on success, 0 on failure. diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index 735e7e58a8..46c842a7d6 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -162,6 +162,7 @@ grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file, "class CompletionQueue;\n" "class ChannelInterface;\n" "class RpcService;\n" + "class ServerCompletionQueue;\n" "class ServerContext;\n"; if (HasUnaryCalls(file)) { temp.append( @@ -260,7 +261,7 @@ void PrintHeaderClientMethod(grpc::protobuf::io::Printer *printer, "std::unique_ptr< ::grpc::ClientReaderWriter< $Request$, $Response$>> " "$Method$(::grpc::ClientContext* context);\n"); printer->Print(*vars, - "std::unique_ptr< ::grpc::ClientAsyncReaderWriter< " + "std::unique_ptr< ::grpc::ClientAsyncReaderWriter< " "$Request$, $Response$>> " "Async$Method$(::grpc::ClientContext* context, " "::grpc::CompletionQueue* cq, void* tag);\n"); @@ -318,30 +319,37 @@ void PrintHeaderServerMethodAsync( (*vars)["Response"] = grpc_cpp_generator::ClassName(method->output_type(), true); if (NoStreaming(method)) { - printer->Print(*vars, - "void Request$Method$(" - "::grpc::ServerContext* context, $Request$* request, " - "::grpc::ServerAsyncResponseWriter< $Response$>* response, " - "::grpc::CompletionQueue* cq, void *tag);\n"); + printer->Print( + *vars, + "void Request$Method$(" + "::grpc::ServerContext* context, $Request$* request, " + "::grpc::ServerAsyncResponseWriter< $Response$>* response, " + "::grpc::CompletionQueue* new_call_cq, " + "::grpc::ServerCompletionQueue* notification_cq, void *tag);\n"); } else if (ClientOnlyStreaming(method)) { - printer->Print(*vars, - "void Request$Method$(" - "::grpc::ServerContext* context, " - "::grpc::ServerAsyncReader< $Response$, $Request$>* reader, " - "::grpc::CompletionQueue* cq, void *tag);\n"); + printer->Print( + *vars, + "void Request$Method$(" + "::grpc::ServerContext* context, " + "::grpc::ServerAsyncReader< $Response$, $Request$>* reader, " + "::grpc::CompletionQueue* new_call_cq, " + "::grpc::ServerCompletionQueue* notification_cq, void *tag);\n"); } else if (ServerOnlyStreaming(method)) { - printer->Print(*vars, - "void Request$Method$(" - "::grpc::ServerContext* context, $Request$* request, " - "::grpc::ServerAsyncWriter< $Response$>* writer, " - "::grpc::CompletionQueue* cq, void *tag);\n"); + printer->Print( + *vars, + "void Request$Method$(" + "::grpc::ServerContext* context, $Request$* request, " + "::grpc::ServerAsyncWriter< $Response$>* writer, " + "::grpc::CompletionQueue* new_call_cq, " + "::grpc::ServerCompletionQueue* notification_cq, void *tag);\n"); } else if (BidiStreaming(method)) { printer->Print( *vars, "void Request$Method$(" "::grpc::ServerContext* context, " "::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, " - "::grpc::CompletionQueue* cq, void *tag);\n"); + "::grpc::CompletionQueue* new_call_cq, " + "::grpc::ServerCompletionQueue* notification_cq, void *tag);\n"); } } @@ -403,7 +411,7 @@ void PrintHeaderService(grpc::protobuf::io::Printer *printer, " public:\n"); printer->Indent(); (*vars)["MethodCount"] = as_string(service->method_count()); - printer->Print("explicit AsyncService(::grpc::CompletionQueue* cq);\n"); + printer->Print("explicit AsyncService();\n"); printer->Print("~AsyncService() {};\n"); for (int i = 0; i < service->method_count(); ++i) { PrintHeaderServerMethodAsync(printer, service->method(i), vars); @@ -686,36 +694,43 @@ void PrintSourceServerAsyncMethod( (*vars)["Response"] = grpc_cpp_generator::ClassName(method->output_type(), true); if (NoStreaming(method)) { - printer->Print(*vars, - "void $ns$$Service$::AsyncService::Request$Method$(" - "::grpc::ServerContext* context, " - "$Request$* request, " - "::grpc::ServerAsyncResponseWriter< $Response$>* response, " - "::grpc::CompletionQueue* cq, void* tag) {\n"); + printer->Print( + *vars, + "void $ns$$Service$::AsyncService::Request$Method$(" + "::grpc::ServerContext* context, " + "$Request$* request, " + "::grpc::ServerAsyncResponseWriter< $Response$>* response, " + "::grpc::CompletionQueue* new_call_cq, " + "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n"); printer->Print(*vars, " AsynchronousService::RequestAsyncUnary($Idx$, context, " - "request, response, cq, tag);\n"); + "request, response, new_call_cq, notification_cq, tag);\n"); printer->Print("}\n\n"); } else if (ClientOnlyStreaming(method)) { - printer->Print(*vars, - "void $ns$$Service$::AsyncService::Request$Method$(" - "::grpc::ServerContext* context, " - "::grpc::ServerAsyncReader< $Response$, $Request$>* reader, " - "::grpc::CompletionQueue* cq, void* tag) {\n"); + printer->Print( + *vars, + "void $ns$$Service$::AsyncService::Request$Method$(" + "::grpc::ServerContext* context, " + "::grpc::ServerAsyncReader< $Response$, $Request$>* reader, " + "::grpc::CompletionQueue* new_call_cq, " + "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n"); printer->Print(*vars, " AsynchronousService::RequestClientStreaming($Idx$, " - "context, reader, cq, tag);\n"); + "context, reader, new_call_cq, notification_cq, tag);\n"); printer->Print("}\n\n"); } else if (ServerOnlyStreaming(method)) { - printer->Print(*vars, - "void $ns$$Service$::AsyncService::Request$Method$(" - "::grpc::ServerContext* context, " - "$Request$* request, " - "::grpc::ServerAsyncWriter< $Response$>* writer, " - "::grpc::CompletionQueue* cq, void* tag) {\n"); - printer->Print(*vars, - " AsynchronousService::RequestServerStreaming($Idx$, " - "context, request, writer, cq, tag);\n"); + printer->Print( + *vars, + "void $ns$$Service$::AsyncService::Request$Method$(" + "::grpc::ServerContext* context, " + "$Request$* request, " + "::grpc::ServerAsyncWriter< $Response$>* writer, " + "::grpc::CompletionQueue* new_call_cq, " + "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n"); + printer->Print( + *vars, + " AsynchronousService::RequestServerStreaming($Idx$, " + "context, request, writer, new_call_cq, notification_cq, tag);\n"); printer->Print("}\n\n"); } else if (BidiStreaming(method)) { printer->Print( @@ -723,10 +738,11 @@ void PrintSourceServerAsyncMethod( "void $ns$$Service$::AsyncService::Request$Method$(" "::grpc::ServerContext* context, " "::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, " - "::grpc::CompletionQueue* cq, void *tag) {\n"); + "::grpc::CompletionQueue* new_call_cq, " + "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n"); printer->Print(*vars, " AsynchronousService::RequestBidiStreaming($Idx$, " - "context, stream, cq, tag);\n"); + "context, stream, new_call_cq, notification_cq, tag);\n"); printer->Print("}\n\n"); } } @@ -788,9 +804,8 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer, (*vars)["MethodCount"] = as_string(service->method_count()); printer->Print(*vars, - "$ns$$Service$::AsyncService::AsyncService(::grpc::" - "CompletionQueue* cq) : " - "::grpc::AsynchronousService(cq, " + "$ns$$Service$::AsyncService::AsyncService() : " + "::grpc::AsynchronousService(" "$prefix$$Service$_method_names, $MethodCount$) " "{}\n\n"); diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 01644b4471..96c1b7c3eb 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -74,16 +74,15 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type; typedef struct { requested_call_type type; void *tag; + grpc_completion_queue *cq_bound_to_call; + grpc_completion_queue *cq_for_notification; + grpc_call **call; union { struct { - grpc_completion_queue *cq_bind; - grpc_call **call; grpc_call_details *details; grpc_metadata_array *initial_metadata; } batch; struct { - grpc_completion_queue *cq_bind; - grpc_call **call; registered_method *registered_method; gpr_timespec *deadline; grpc_metadata_array *initial_metadata; @@ -103,7 +102,6 @@ struct registered_method { char *host; call_data *pending; requested_call_array requested; - grpc_completion_queue *cq; registered_method *next; }; @@ -130,7 +128,6 @@ struct grpc_server { size_t channel_filter_count; const grpc_channel_filter **channel_filters; grpc_channel_args *channel_args; - grpc_completion_queue *unregistered_cq; grpc_completion_queue **cqs; grpc_pollset **pollsets; @@ -602,7 +599,8 @@ static const grpc_channel_filter server_surface_filter = { destroy_channel_elem, "server", }; -static void addcq(grpc_server *server, grpc_completion_queue *cq) { +void grpc_server_register_completion_queue(grpc_server *server, + grpc_completion_queue *cq) { size_t i, n; for (i = 0; i < server->cq_count; i++) { if (server->cqs[i] == cq) return; @@ -614,8 +612,7 @@ static void addcq(grpc_server *server, grpc_completion_queue *cq) { server->cqs[n] = cq; } -grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, - grpc_channel_filter **filters, +grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters, size_t filter_count, const grpc_channel_args *args) { size_t i; @@ -626,12 +623,10 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, GPR_ASSERT(grpc_is_initialized() && "call grpc_init()"); memset(server, 0, sizeof(grpc_server)); - if (cq) addcq(server, cq); gpr_mu_init(&server->mu); gpr_cv_init(&server->cv); - server->unregistered_cq = cq; /* decremented by grpc_server_destroy */ gpr_ref_init(&server->internal_refcount, 1); server->root_channel_data.next = server->root_channel_data.prev = @@ -667,8 +662,7 @@ static int streq(const char *a, const char *b) { } void *grpc_server_register_method(grpc_server *server, const char *method, - const char *host, - grpc_completion_queue *cq_new_rpc) { + const char *host) { registered_method *m; if (!method) { gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__); @@ -681,13 +675,11 @@ void *grpc_server_register_method(grpc_server *server, const char *method, return NULL; } } - addcq(server, cq_new_rpc); m = gpr_malloc(sizeof(registered_method)); memset(m, 0, sizeof(*m)); m->method = gpr_strdup(method); m->host = gpr_strdup(host); m->next = server->registered_methods; - m->cq = cq_new_rpc; server->registered_methods = m; return m; } @@ -1012,17 +1004,18 @@ static grpc_call_error queue_call_request(grpc_server *server, } } -grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, - grpc_call_details *details, - grpc_metadata_array *initial_metadata, - grpc_completion_queue *cq_bind, - void *tag) { +grpc_call_error grpc_server_request_call( + grpc_server *server, grpc_call **call, grpc_call_details *details, + grpc_metadata_array *initial_metadata, + grpc_completion_queue *cq_bound_to_call, + grpc_completion_queue *cq_for_notification, void *tag) { requested_call rc; - grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_OP_COMPLETE); + grpc_cq_begin_op(cq_for_notification, NULL, GRPC_OP_COMPLETE); rc.type = BATCH_CALL; rc.tag = tag; - rc.data.batch.cq_bind = cq_bind; - rc.data.batch.call = call; + rc.cq_bound_to_call = cq_bound_to_call; + rc.cq_for_notification = cq_for_notification; + rc.call = call; rc.data.batch.details = details; rc.data.batch.initial_metadata = initial_metadata; return queue_call_request(server, &rc); @@ -1031,14 +1024,16 @@ grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, grpc_call_error grpc_server_request_registered_call( grpc_server *server, void *rm, grpc_call **call, gpr_timespec *deadline, grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload, - grpc_completion_queue *cq_bind, void *tag) { + grpc_completion_queue *cq_bound_to_call, + grpc_completion_queue *cq_for_notification, void *tag) { requested_call rc; registered_method *registered_method = rm; - grpc_cq_begin_op(registered_method->cq, NULL, GRPC_OP_COMPLETE); + grpc_cq_begin_op(cq_for_notification, NULL, GRPC_OP_COMPLETE); rc.type = REGISTERED_CALL; rc.tag = tag; - rc.data.registered.cq_bind = cq_bind; - rc.data.registered.call = call; + rc.cq_bound_to_call = cq_bound_to_call; + rc.cq_for_notification = cq_for_notification; + rc.call = call; rc.data.registered.registered_method = registered_method; rc.data.registered.deadline = deadline; rc.data.registered.initial_metadata = initial_metadata; @@ -1076,6 +1071,9 @@ static void begin_call(grpc_server *server, call_data *calld, fill in the metadata array passed by the client, we need to perform an ioreq op, that should complete immediately. */ + grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call); + *rc->call = calld->call; + calld->cq_new = rc->cq_for_notification; switch (rc->type) { case BATCH_CALL: cpstr(&rc->data.batch.details->host, @@ -1083,18 +1081,13 @@ static void begin_call(grpc_server *server, call_data *calld, cpstr(&rc->data.batch.details->method, &rc->data.batch.details->method_capacity, calld->path); rc->data.batch.details->deadline = calld->deadline; - grpc_call_set_completion_queue(calld->call, rc->data.batch.cq_bind); - *rc->data.batch.call = calld->call; r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; r->data.recv_metadata = rc->data.batch.initial_metadata; r++; - calld->cq_new = server->unregistered_cq; publish = publish_registered_or_batch; break; case REGISTERED_CALL: *rc->data.registered.deadline = calld->deadline; - grpc_call_set_completion_queue(calld->call, rc->data.registered.cq_bind); - *rc->data.registered.call = calld->call; r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; r->data.recv_metadata = rc->data.registered.initial_metadata; r++; @@ -1103,7 +1096,6 @@ static void begin_call(grpc_server *server, call_data *calld, r->data.recv_message = rc->data.registered.optional_payload; r++; } - calld->cq_new = rc->data.registered.registered_method->cq; publish = publish_registered_or_batch; break; } @@ -1114,20 +1106,17 @@ static void begin_call(grpc_server *server, call_data *calld, } static void fail_call(grpc_server *server, requested_call *rc) { + *rc->call = NULL; switch (rc->type) { case BATCH_CALL: - *rc->data.batch.call = NULL; rc->data.batch.initial_metadata->count = 0; - grpc_cq_end_op(server->unregistered_cq, rc->tag, NULL, do_nothing, NULL, - GRPC_OP_ERROR); break; case REGISTERED_CALL: - *rc->data.registered.call = NULL; rc->data.registered.initial_metadata->count = 0; - grpc_cq_end_op(rc->data.registered.registered_method->cq, rc->tag, NULL, - do_nothing, NULL, GRPC_OP_ERROR); break; } + grpc_cq_end_op(rc->cq_for_notification, rc->tag, NULL, do_nothing, NULL, + GRPC_OP_ERROR); } static void publish_registered_or_batch(grpc_call *call, grpc_op_error status, diff --git a/src/core/surface/server.h b/src/core/surface/server.h index 2cfa38fa43..c6331033e0 100644 --- a/src/core/surface/server.h +++ b/src/core/surface/server.h @@ -39,8 +39,7 @@ #include "src/core/transport/transport.h" /* Create a server */ -grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, - grpc_channel_filter **filters, +grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters, size_t filter_count, const grpc_channel_args *args); diff --git a/src/core/surface/server_create.c b/src/core/surface/server_create.c index f629c7c72d..b7390675ad 100644 --- a/src/core/surface/server_create.c +++ b/src/core/surface/server_create.c @@ -35,7 +35,6 @@ #include "src/core/surface/completion_queue.h" #include "src/core/surface/server.h" -grpc_server *grpc_server_create(grpc_completion_queue *cq, - const grpc_channel_args *args) { - return grpc_server_create_from_filters(cq, NULL, 0, args); +grpc_server *grpc_server_create(const grpc_channel_args *args) { + return grpc_server_create_from_filters(NULL, 0, args); } diff --git a/src/cpp/server/async_generic_service.cc b/src/cpp/server/async_generic_service.cc index 07cb933715..2e99afcb5f 100644 --- a/src/cpp/server/async_generic_service.cc +++ b/src/cpp/server/async_generic_service.cc @@ -39,12 +39,10 @@ namespace grpc { void AsyncGenericService::RequestCall( GenericServerContext* ctx, GenericServerAsyncReaderWriter* reader_writer, - CompletionQueue* cq, void* tag) { - server_->RequestAsyncGenericCall(ctx, reader_writer, cq, tag); -} - -CompletionQueue* AsyncGenericService::completion_queue() { - return &server_->cq_; + CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, + void* tag) { + server_->RequestAsyncGenericCall(ctx, reader_writer, call_cq, notification_cq, + tag); } } // namespace grpc diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 08c956601c..e9c4f4eaaf 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -78,7 +78,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { return mrd; } - void Request(grpc_server* server) { + void Request(grpc_server* server, grpc_completion_queue* notify_cq) { GPR_ASSERT(!in_flight_); in_flight_ = true; cq_ = grpc_completion_queue_create(); @@ -86,7 +86,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { grpc_server_request_registered_call( server, tag_, &call_, &deadline_, &request_metadata_, has_request_payload_ ? &request_payload_ : nullptr, cq_, - this)); + notify_cq, this)); } bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { @@ -179,16 +179,16 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { grpc_completion_queue* cq_; }; -grpc_server* CreateServer(grpc_completion_queue* cq, int max_message_size) { +static grpc_server* CreateServer(int max_message_size) { if (max_message_size > 0) { grpc_arg arg; arg.type = GRPC_ARG_INTEGER; arg.key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH); arg.value.integer = max_message_size; grpc_channel_args args = {1, &arg}; - return grpc_server_create(cq, &args); + return grpc_server_create(&args); } else { - return grpc_server_create(cq, nullptr); + return grpc_server_create(nullptr); } } @@ -199,9 +199,11 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, shutdown_(false), num_running_cb_(0), sync_methods_(new std::list<SyncRequest>), - server_(CreateServer(cq_.cq(), max_message_size)), + server_(CreateServer(max_message_size)), thread_pool_(thread_pool), - thread_pool_owned_(thread_pool_owned) {} + thread_pool_owned_(thread_pool_owned) { + grpc_server_register_completion_queue(server_, cq_.cq()); +} Server::~Server() { { @@ -221,8 +223,7 @@ Server::~Server() { bool Server::RegisterService(RpcService* service) { for (int i = 0; i < service->GetMethodCount(); ++i) { RpcServiceMethod* method = service->GetMethod(i); - void* tag = - grpc_server_register_method(server_, method->name(), nullptr, cq_.cq()); + void* tag = grpc_server_register_method(server_, method->name(), nullptr); if (!tag) { gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", method->name()); @@ -240,9 +241,8 @@ bool Server::RegisterAsyncService(AsynchronousService* service) { service->dispatch_impl_ = this; service->request_args_ = new void*[service->method_count_]; for (size_t i = 0; i < service->method_count_; ++i) { - void* tag = - grpc_server_register_method(server_, service->method_names_[i], nullptr, - service->completion_queue()->cq()); + void* tag = grpc_server_register_method(server_, service->method_names_[i], + nullptr); if (!tag) { gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", service->method_names_[i]); @@ -273,7 +273,7 @@ bool Server::Start() { // Start processing rpcs. if (!sync_methods_->empty()) { for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) { - m->Request(server_); + m->Request(server_, cq_.cq()); } ScheduleCallback(); @@ -316,12 +316,13 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { public: AsyncRequest(Server* server, void* registered_method, ServerContext* ctx, grpc::protobuf::Message* request, - ServerAsyncStreamingInterface* stream, CompletionQueue* cq, - void* tag) + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) : tag_(tag), request_(request), stream_(stream), - cq_(cq), + call_cq_(call_cq), + notification_cq_(notification_cq), ctx_(ctx), generic_ctx_(nullptr), server_(server), @@ -329,18 +330,22 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { payload_(nullptr) { memset(&array_, 0, sizeof(array_)); grpc_call_details_init(&call_details_); + GPR_ASSERT(notification_cq); + GPR_ASSERT(call_cq); grpc_server_request_registered_call( server->server_, registered_method, &call_, &call_details_.deadline, - &array_, request ? &payload_ : nullptr, cq->cq(), this); + &array_, request ? &payload_ : nullptr, call_cq->cq(), + notification_cq->cq(), this); } AsyncRequest(Server* server, GenericServerContext* ctx, - ServerAsyncStreamingInterface* stream, CompletionQueue* cq, - void* tag) + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) : tag_(tag), request_(nullptr), stream_(stream), - cq_(cq), + call_cq_(call_cq), + notification_cq_(notification_cq), ctx_(nullptr), generic_ctx_(ctx), server_(server), @@ -348,8 +353,10 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { payload_(nullptr) { memset(&array_, 0, sizeof(array_)); grpc_call_details_init(&call_details_); + GPR_ASSERT(notification_cq); + GPR_ASSERT(call_cq); grpc_server_request_call(server->server_, &call_, &call_details_, &array_, - cq->cq(), this); + call_cq->cq(), notification_cq->cq(), this); } ~AsyncRequest() { @@ -392,8 +399,8 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { } } ctx->call_ = call_; - ctx->cq_ = cq_; - Call call(call_, server_, cq_, server_->max_message_size_); + ctx->cq_ = call_cq_; + Call call(call_, server_, call_cq_, server_->max_message_size_); if (orig_status && call_) { ctx->BeginCompletionOp(&call); } @@ -407,7 +414,8 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { void* const tag_; grpc::protobuf::Message* const request_; ServerAsyncStreamingInterface* const stream_; - CompletionQueue* const cq_; + CompletionQueue* const call_cq_; + ServerCompletionQueue* const notification_cq_; ServerContext* const ctx_; GenericServerContext* const generic_ctx_; Server* const server_; @@ -420,14 +428,19 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { void Server::RequestAsyncCall(void* registered_method, ServerContext* context, grpc::protobuf::Message* request, ServerAsyncStreamingInterface* stream, - CompletionQueue* cq, void* tag) { - new AsyncRequest(this, registered_method, context, request, stream, cq, tag); + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, + void* tag) { + new AsyncRequest(this, registered_method, context, request, stream, call_cq, + notification_cq, tag); } void Server::RequestAsyncGenericCall(GenericServerContext* context, ServerAsyncStreamingInterface* stream, - CompletionQueue* cq, void* tag) { - new AsyncRequest(this, context, stream, cq, tag); + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, + void* tag) { + new AsyncRequest(this, context, stream, call_cq, notification_cq, tag); } void Server::ScheduleCallback() { @@ -446,7 +459,7 @@ void Server::RunRpc() { ScheduleCallback(); if (ok) { SyncRequest::CallData cd(this, mrd); - mrd->Request(server_); + mrd->Request(server_, cq_.cq()); cd.Run(); } diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index e48d1eeb42..4bcbd82952 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -44,6 +44,12 @@ namespace grpc { ServerBuilder::ServerBuilder() : max_message_size_(-1), generic_service_(nullptr), thread_pool_(nullptr) {} +std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() { + ServerCompletionQueue* cq = new ServerCompletionQueue(); + cqs_.push_back(cq); + return std::unique_ptr<ServerCompletionQueue>(cq); +} + void ServerBuilder::RegisterService(SynchronousService* service) { services_.push_back(service->service()); } @@ -88,6 +94,9 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { } std::unique_ptr<Server> server( new Server(thread_pool_, thread_pool_owned, max_message_size_)); + for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) { + grpc_server_register_completion_queue(server->server_, (*cq)->cq()); + } for (auto service = services_.begin(); service != services_.end(); service++) { if (!server->RegisterService(*service)) { diff --git a/test/core/end2end/dualstack_socket_test.c b/test/core/end2end/dualstack_socket_test.c index 5e278ca66c..ad97084502 100644 --- a/test/core/end2end/dualstack_socket_test.c +++ b/test/core/end2end/dualstack_socket_test.c @@ -99,7 +99,8 @@ void test_connect(const char *server_host, const char *client_host, int port, /* Create server. */ server_cq = grpc_completion_queue_create(); - server = grpc_server_create(server_cq, NULL); + server = grpc_server_create(NULL); + grpc_server_register_completion_queue(server, server_cq); GPR_ASSERT((got_port = grpc_server_add_http2_port(server, server_hostport)) > 0); if (port == 0) { @@ -155,10 +156,10 @@ void test_connect(const char *server_host, const char *client_host, int port, if (expect_ok) { /* Check for a successful request. */ - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(server, &s, - &call_details, - &request_metadata_recv, - server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(server, &s, &call_details, + &request_metadata_recv, server_cq, + server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/fixtures/chttp2_fake_security.c b/test/core/end2end/fixtures/chttp2_fake_security.c index 929f1f50db..c94ee94d53 100644 --- a/test/core/end2end/fixtures/chttp2_fake_security.c +++ b/test/core/end2end/fixtures/chttp2_fake_security.c @@ -82,8 +82,8 @@ static void chttp2_init_server_secure_fullstack( if (f->server) { grpc_server_destroy(f->server); } - f->server = - grpc_server_create(f->server_cq, server_args); + f->server = grpc_server_create(server_args); + grpc_server_register_completion_queue(f->server, f->server_cq); GPR_ASSERT(grpc_server_add_secure_http2_port(f->server, ffd->localaddr, server_creds)); grpc_server_credentials_release(server_creds); grpc_server_start(f->server); diff --git a/test/core/end2end/fixtures/chttp2_fullstack.c b/test/core/end2end/fixtures/chttp2_fullstack.c index d7de5e5434..f92b40efeb 100644 --- a/test/core/end2end/fixtures/chttp2_fullstack.c +++ b/test/core/end2end/fixtures/chttp2_fullstack.c @@ -83,7 +83,8 @@ void chttp2_init_server_fullstack(grpc_end2end_test_fixture *f, if (f->server) { grpc_server_destroy(f->server); } - f->server = grpc_server_create(f->server_cq, server_args); + f->server = grpc_server_create(server_args); + grpc_server_register_completion_queue(f->server, f->server_cq); GPR_ASSERT(grpc_server_add_http2_port(f->server, ffd->localaddr)); grpc_server_start(f->server); } diff --git a/test/core/end2end/fixtures/chttp2_fullstack_uds.c b/test/core/end2end/fixtures/chttp2_fullstack_uds.c index 53803b0f1d..876782df84 100644 --- a/test/core/end2end/fixtures/chttp2_fullstack_uds.c +++ b/test/core/end2end/fixtures/chttp2_fullstack_uds.c @@ -88,7 +88,8 @@ void chttp2_init_server_fullstack(grpc_end2end_test_fixture *f, if (f->server) { grpc_server_destroy(f->server); } - f->server = grpc_server_create(f->server_cq, server_args); + f->server = grpc_server_create(server_args); + grpc_server_register_completion_queue(f->server, f->server_cq); GPR_ASSERT(grpc_server_add_http2_port(f->server, ffd->localaddr)); grpc_server_start(f->server); } diff --git a/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c b/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c index 9c4086d79d..36ac4e46a3 100644 --- a/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c +++ b/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c @@ -85,8 +85,8 @@ static void chttp2_init_server_secure_fullstack( if (f->server) { grpc_server_destroy(f->server); } - f->server = - grpc_server_create(f->server_cq, server_args); + f->server = grpc_server_create(server_args); + grpc_server_register_completion_queue(f->server, f->server_cq); GPR_ASSERT(grpc_server_add_secure_http2_port(f->server, ffd->localaddr, server_creds)); grpc_server_credentials_release(server_creds); grpc_server_start(f->server); diff --git a/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c b/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c index e9e1c5f838..4bfd923e83 100644 --- a/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c +++ b/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c @@ -83,8 +83,8 @@ static void chttp2_init_server_secure_fullstack( if (f->server) { grpc_server_destroy(f->server); } - f->server = - grpc_server_create(f->server_cq, server_args); + f->server = grpc_server_create(server_args); + grpc_server_register_completion_queue(f->server, f->server_cq); GPR_ASSERT(grpc_server_add_secure_http2_port(f->server, ffd->localaddr, server_creds)); grpc_server_credentials_release(server_creds); grpc_server_start(f->server); diff --git a/test/core/end2end/fixtures/chttp2_socket_pair.c b/test/core/end2end/fixtures/chttp2_socket_pair.c index d19ceb178b..43ebf7eed5 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair.c @@ -117,8 +117,8 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f, grpc_channel_args *server_args) { grpc_endpoint_pair *sfd = f->fixture_data; GPR_ASSERT(!f->server); - f->server = - grpc_server_create_from_filters(f->server_cq, NULL, 0, server_args); + f->server = grpc_server_create_from_filters(NULL, 0, server_args); + grpc_server_register_completion_queue(f->server, f->server_cq); grpc_server_start(f->server); grpc_create_chttp2_transport(server_setup_transport, f, server_args, sfd->server, NULL, 0, grpc_mdctx_create(), 0); diff --git a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c index ddde585b83..385d5a4e81 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c @@ -117,8 +117,8 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f, grpc_channel_args *server_args) { grpc_endpoint_pair *sfd = f->fixture_data; GPR_ASSERT(!f->server); - f->server = - grpc_server_create_from_filters(f->server_cq, NULL, 0, server_args); + f->server = grpc_server_create_from_filters(NULL, 0, server_args); + grpc_server_register_completion_queue(f->server, f->server_cq); grpc_server_start(f->server); grpc_create_chttp2_transport(server_setup_transport, f, server_args, sfd->server, NULL, 0, grpc_mdctx_create(), 0); diff --git a/test/core/end2end/tests/cancel_after_accept.c b/test/core/end2end/tests/cancel_after_accept.c index 21057969d9..0adc437db0 100644 --- a/test/core/end2end/tests/cancel_after_accept.c +++ b/test/core/end2end/tests/cancel_after_accept.c @@ -161,9 +161,10 @@ static void test_cancel_after_accept(grpc_end2end_test_config config, op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call( - f.server, &s, &call_details, - &request_metadata_recv, f.server_cq, tag(2))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(2))); cq_expect_completion(v_server, tag(2), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c b/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c index f8733ef444..0b20a97559 100644 --- a/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c +++ b/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c @@ -163,9 +163,10 @@ static void test_cancel_after_accept_and_writes_closed( op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call( - f.server, &s, &call_details, - &request_metadata_recv, f.server_cq, tag(2))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(2))); cq_expect_completion(v_server, tag(2), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/census_simple_request.c b/test/core/end2end/tests/census_simple_request.c index 67c769c08b..13bf31584d 100644 --- a/test/core/end2end/tests/census_simple_request.c +++ b/test/core/end2end/tests/census_simple_request.c @@ -142,10 +142,10 @@ static void test_body(grpc_end2end_test_fixture f) { op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, - &call_details, - &request_metadata_recv, - f.server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/disappearing_server.c b/test/core/end2end/tests/disappearing_server.c index c8e22ce11c..29c023c72a 100644 --- a/test/core/end2end/tests/disappearing_server.c +++ b/test/core/end2end/tests/disappearing_server.c @@ -133,10 +133,10 @@ static void do_request_and_shutdown_server(grpc_end2end_test_fixture *f, op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f->server, &s, - &call_details, - &request_metadata_recv, - f->server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f->server, &s, &call_details, + &request_metadata_recv, f->server_cq, + f->server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c b/test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c index 2c2d2e895b..c293551663 100644 --- a/test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c +++ b/test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c @@ -148,10 +148,10 @@ static void test_early_server_shutdown_finishes_inflight_calls( op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, - &call_details, - &request_metadata_recv, - f.server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/early_server_shutdown_finishes_tags.c b/test/core/end2end/tests/early_server_shutdown_finishes_tags.c index 96978a8cb9..8801dae98a 100644 --- a/test/core/end2end/tests/early_server_shutdown_finishes_tags.c +++ b/test/core/end2end/tests/early_server_shutdown_finishes_tags.c @@ -115,10 +115,10 @@ static void test_early_server_shutdown_finishes_tags( /* upon shutdown, the server should finish all requested calls indicating no new call */ - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, - &call_details, - &request_metadata_recv, - f.server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); grpc_server_shutdown(f.server); cq_expect_completion(v_server, tag(101), GRPC_OP_ERROR); cq_verify(v_server); diff --git a/test/core/end2end/tests/graceful_server_shutdown.c b/test/core/end2end/tests/graceful_server_shutdown.c index d084530a9c..2a8cf098eb 100644 --- a/test/core/end2end/tests/graceful_server_shutdown.c +++ b/test/core/end2end/tests/graceful_server_shutdown.c @@ -147,10 +147,10 @@ static void test_early_server_shutdown_finishes_inflight_calls( op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, - &call_details, - &request_metadata_recv, - f.server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/invoke_large_request.c b/test/core/end2end/tests/invoke_large_request.c index d9d9e934cb..98bcf9ada9 100644 --- a/test/core/end2end/tests/invoke_large_request.c +++ b/test/core/end2end/tests/invoke_large_request.c @@ -165,10 +165,10 @@ static void test_invoke_large_request(grpc_end2end_test_config config) { op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, - &call_details, - &request_metadata_recv, - f.server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/max_concurrent_streams.c b/test/core/end2end/tests/max_concurrent_streams.c index 6e95a6c5f8..e25b115d33 100644 --- a/test/core/end2end/tests/max_concurrent_streams.c +++ b/test/core/end2end/tests/max_concurrent_streams.c @@ -145,10 +145,10 @@ static void simple_request_body(grpc_end2end_test_fixture f) { op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, - &call_details, - &request_metadata_recv, - f.server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); @@ -254,10 +254,10 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) { "foo.test.google.fr:1234", deadline); GPR_ASSERT(c2); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s1, - &call_details, - &request_metadata_recv, - f.server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s1, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; @@ -342,10 +342,10 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) { cq_expect_completion(v_client, tag(live_call + 1), GRPC_OP_OK); cq_verify(v_client); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s2, - &call_details, - &request_metadata_recv, - f.server_cq, tag(201))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s2, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(201))); cq_expect_completion(v_server, tag(201), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/max_message_length.c b/test/core/end2end/tests/max_message_length.c index 6291f773b3..3f8112d341 100644 --- a/test/core/end2end/tests/max_message_length.c +++ b/test/core/end2end/tests/max_message_length.c @@ -164,10 +164,10 @@ static void test_max_message_length(grpc_end2end_test_config config) { op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, - &call_details, - &request_metadata_recv, - f.server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/ping_pong_streaming.c b/test/core/end2end/tests/ping_pong_streaming.c index fe02f25875..c125664115 100644 --- a/test/core/end2end/tests/ping_pong_streaming.c +++ b/test/core/end2end/tests/ping_pong_streaming.c @@ -153,10 +153,10 @@ static void test_pingpong_streaming(grpc_end2end_test_config config, op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, - &call_details, - &request_metadata_recv, - f.server_cq, tag(100))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(100))); cq_expect_completion(v_server, tag(100), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/registered_call.c b/test/core/end2end/tests/registered_call.c index 05b7a1dad0..04c3d5293c 100644 --- a/test/core/end2end/tests/registered_call.c +++ b/test/core/end2end/tests/registered_call.c @@ -146,10 +146,10 @@ static void simple_request_body(grpc_end2end_test_fixture f, void *rc) { op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, - &call_details, - &request_metadata_recv, - f.server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c index 0169d52059..281091cdf9 100644 --- a/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c +++ b/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c @@ -181,10 +181,10 @@ static void test_request_response_with_metadata_and_payload( op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, - &call_details, - &request_metadata_recv, - f.server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/request_response_with_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_metadata_and_payload.c index dc49242d39..1590aa23fa 100644 --- a/test/core/end2end/tests/request_response_with_metadata_and_payload.c +++ b/test/core/end2end/tests/request_response_with_metadata_and_payload.c @@ -167,10 +167,10 @@ static void test_request_response_with_metadata_and_payload( op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, - &call_details, - &request_metadata_recv, - f.server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/request_response_with_payload.c b/test/core/end2end/tests/request_response_with_payload.c index 92036590a7..b94b6761eb 100644 --- a/test/core/end2end/tests/request_response_with_payload.c +++ b/test/core/end2end/tests/request_response_with_payload.c @@ -159,10 +159,10 @@ static void request_response_with_payload(grpc_end2end_test_fixture f) { op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, - &call_details, - &request_metadata_recv, - f.server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/request_with_large_metadata.c b/test/core/end2end/tests/request_with_large_metadata.c index c5b4e0c57e..bf8309914e 100644 --- a/test/core/end2end/tests/request_with_large_metadata.c +++ b/test/core/end2end/tests/request_with_large_metadata.c @@ -163,10 +163,10 @@ static void test_request_with_large_metadata(grpc_end2end_test_config config) { op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, - &call_details, - &request_metadata_recv, - f.server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/request_with_payload.c b/test/core/end2end/tests/request_with_payload.c index 63b7c5ee40..5fe69e9109 100644 --- a/test/core/end2end/tests/request_with_payload.c +++ b/test/core/end2end/tests/request_with_payload.c @@ -154,10 +154,10 @@ static void test_invoke_request_with_payload(grpc_end2end_test_config config) { op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, - &call_details, - &request_metadata_recv, - f.server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/simple_delayed_request.c b/test/core/end2end/tests/simple_delayed_request.c index 0dbb35d454..e025fd1a1e 100644 --- a/test/core/end2end/tests/simple_delayed_request.c +++ b/test/core/end2end/tests/simple_delayed_request.c @@ -141,10 +141,10 @@ static void simple_delayed_request_body(grpc_end2end_test_config config, config.init_server(f, server_args); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f->server, &s, - &call_details, - &request_metadata_recv, - f->server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f->server, &s, &call_details, + &request_metadata_recv, f->server_cq, + f->server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/simple_request.c b/test/core/end2end/tests/simple_request.c index 4d4d48a211..271bdc56ca 100644 --- a/test/core/end2end/tests/simple_request.c +++ b/test/core/end2end/tests/simple_request.c @@ -147,10 +147,10 @@ static void simple_request_body(grpc_end2end_test_fixture f) { op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, - &call_details, - &request_metadata_recv, - f.server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c b/test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c index 538291a5f2..3b5393f660 100644 --- a/test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c +++ b/test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c @@ -147,10 +147,10 @@ static void simple_request_body(grpc_end2end_test_fixture f) { op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, - &call_details, - &request_metadata_recv, - f.server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/fling/server.c b/test/core/fling/server.c index 63c7bd7f88..8eab534177 100644 --- a/test/core/fling/server.c +++ b/test/core/fling/server.c @@ -89,7 +89,7 @@ typedef struct { static void request_call(void) { grpc_metadata_array_init(&request_metadata_recv); grpc_server_request_call(server, &call, &call_details, &request_metadata_recv, - cq, tag(FLING_SERVER_NEW_REQUEST)); + cq, cq, tag(FLING_SERVER_NEW_REQUEST)); } static void handle_unary_method(void) { @@ -206,13 +206,14 @@ int main(int argc, char **argv) { test_server1_cert}; grpc_server_credentials *ssl_creds = grpc_ssl_server_credentials_create(NULL, &pem_key_cert_pair, 1); - server = grpc_server_create(cq, NULL); + server = grpc_server_create(NULL); GPR_ASSERT(grpc_server_add_secure_http2_port(server, addr, ssl_creds)); grpc_server_credentials_release(ssl_creds); } else { - server = grpc_server_create(cq, NULL); + server = grpc_server_create(NULL); GPR_ASSERT(grpc_server_add_http2_port(server, addr)); } + grpc_server_register_completion_queue(server, cq); grpc_server_start(server); gpr_free(addr_buf); diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 6c0dfadbb9..d7c190dade 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -91,7 +91,7 @@ void verify_timed_ok( class AsyncEnd2endTest : public ::testing::Test { protected: - AsyncEnd2endTest() : service_(&srv_cq_) {} + AsyncEnd2endTest() {} void SetUp() GRPC_OVERRIDE { int port = grpc_pick_unused_port_or_die(); @@ -100,6 +100,7 @@ class AsyncEnd2endTest : public ::testing::Test { ServerBuilder builder; builder.AddListeningPort(server_address_.str(), grpc::InsecureServerCredentials()); builder.RegisterAsyncService(&service_); + srv_cq_ = builder.AddCompletionQueue(); server_ = builder.BuildAndStart(); } @@ -108,10 +109,10 @@ class AsyncEnd2endTest : public ::testing::Test { void* ignored_tag; bool ignored_ok; cli_cq_.Shutdown(); - srv_cq_.Shutdown(); + srv_cq_->Shutdown(); while (cli_cq_.Next(&ignored_tag, &ignored_ok)) ; - while (srv_cq_.Next(&ignored_tag, &ignored_ok)) + while (srv_cq_->Next(&ignored_tag, &ignored_ok)) ; } @@ -121,9 +122,9 @@ class AsyncEnd2endTest : public ::testing::Test { stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel)); } - void server_ok(int i) { verify_ok(&srv_cq_, i, true); } + void server_ok(int i) { verify_ok(srv_cq_.get(), i, true); } void client_ok(int i) { verify_ok(&cli_cq_, i, true); } - void server_fail(int i) { verify_ok(&srv_cq_, i, false); } + void server_fail(int i) { verify_ok(srv_cq_.get(), i, false); } void client_fail(int i) { verify_ok(&cli_cq_, i, false); } void SendRpc(int num_rpcs) { @@ -142,8 +143,8 @@ class AsyncEnd2endTest : public ::testing::Test { std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1))); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_, - tag(2)); + service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, + srv_cq_.get(), srv_cq_.get(), tag(2)); server_ok(2); EXPECT_EQ(send_request.message(), recv_request.message()); @@ -162,7 +163,7 @@ class AsyncEnd2endTest : public ::testing::Test { } CompletionQueue cli_cq_; - CompletionQueue srv_cq_; + std::unique_ptr<ServerCompletionQueue> srv_cq_; std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub_; std::unique_ptr<Server> server_; grpc::cpp::test::util::TestService::AsyncService service_; @@ -200,19 +201,19 @@ TEST_F(AsyncEnd2endTest, AsyncNextRpc) { std::chrono::system_clock::time_point time_now( std::chrono::system_clock::now()), time_limit(std::chrono::system_clock::now() + std::chrono::seconds(5)); - verify_timed_ok(&srv_cq_, -1, true, time_now, CompletionQueue::TIMEOUT); + verify_timed_ok(srv_cq_.get(), -1, true, time_now, CompletionQueue::TIMEOUT); verify_timed_ok(&cli_cq_, -1, true, time_now, CompletionQueue::TIMEOUT); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_, - tag(2)); + service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(), + srv_cq_.get(), tag(2)); - verify_timed_ok(&srv_cq_, 2, true, time_limit); + verify_timed_ok(srv_cq_.get(), 2, true, time_limit); EXPECT_EQ(send_request.message(), recv_request.message()); verify_timed_ok(&cli_cq_, 1, true, time_limit); send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - verify_timed_ok(&srv_cq_, 3, true); + verify_timed_ok(srv_cq_.get(), 3, true); response_reader->Finish(&recv_response, &recv_status, tag(4)); verify_timed_ok(&cli_cq_, 4, true); @@ -238,7 +239,8 @@ TEST_F(AsyncEnd2endTest, SimpleClientStreaming) { std::unique_ptr<ClientAsyncWriter<EchoRequest> > cli_stream( stub_->AsyncRequestStream(&cli_ctx, &recv_response, &cli_cq_, tag(1))); - service_.RequestRequestStream(&srv_ctx, &srv_stream, &srv_cq_, tag(2)); + service_.RequestRequestStream(&srv_ctx, &srv_stream, srv_cq_.get(), + srv_cq_.get(), tag(2)); server_ok(2); client_ok(1); @@ -291,8 +293,8 @@ TEST_F(AsyncEnd2endTest, SimpleServerStreaming) { std::unique_ptr<ClientAsyncReader<EchoResponse> > cli_stream( stub_->AsyncResponseStream(&cli_ctx, send_request, &cli_cq_, tag(1))); - service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, &srv_cq_, - tag(2)); + service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, + srv_cq_.get(), srv_cq_.get(), tag(2)); server_ok(2); client_ok(1); @@ -342,7 +344,8 @@ TEST_F(AsyncEnd2endTest, SimpleBidiStreaming) { std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse> > cli_stream(stub_->AsyncBidiStream(&cli_ctx, &cli_cq_, tag(1))); - service_.RequestBidiStream(&srv_ctx, &srv_stream, &srv_cq_, tag(2)); + service_.RequestBidiStream(&srv_ctx, &srv_stream, srv_cq_.get(), + srv_cq_.get(), tag(2)); server_ok(2); client_ok(1); @@ -400,8 +403,8 @@ TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) { std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1))); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_, - tag(2)); + service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(), + srv_cq_.get(), tag(2)); server_ok(2); EXPECT_EQ(send_request.message(), recv_request.message()); auto client_initial_metadata = srv_ctx.client_metadata(); @@ -442,8 +445,8 @@ TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) { std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1))); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_, - tag(2)); + service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(), + srv_cq_.get(), tag(2)); server_ok(2); EXPECT_EQ(send_request.message(), recv_request.message()); srv_ctx.AddInitialMetadata(meta1.first, meta1.second); @@ -490,8 +493,8 @@ TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) { std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1))); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_, - tag(2)); + service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(), + srv_cq_.get(), tag(2)); server_ok(2); EXPECT_EQ(send_request.message(), recv_request.message()); response_writer.SendInitialMetadata(tag(3)); @@ -551,8 +554,8 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) { std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1))); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_, - tag(2)); + service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(), + srv_cq_.get(), tag(2)); server_ok(2); EXPECT_EQ(send_request.message(), recv_request.message()); auto client_initial_metadata = srv_ctx.client_metadata(); diff --git a/test/cpp/end2end/generic_end2end_test.cc b/test/cpp/end2end/generic_end2end_test.cc index 103f613f70..80e43fd854 100644 --- a/test/cpp/end2end/generic_end2end_test.cc +++ b/test/cpp/end2end/generic_end2end_test.cc @@ -109,6 +109,7 @@ class GenericEnd2endTest : public ::testing::Test { ServerBuilder builder; builder.AddListeningPort(server_address_.str(), InsecureServerCredentials()); builder.RegisterAsyncGenericService(&generic_service_); + srv_cq_ = builder.AddCompletionQueue(); server_ = builder.BuildAndStart(); } @@ -117,10 +118,10 @@ class GenericEnd2endTest : public ::testing::Test { void* ignored_tag; bool ignored_ok; cli_cq_.Shutdown(); - srv_cq_.Shutdown(); + srv_cq_->Shutdown(); while (cli_cq_.Next(&ignored_tag, &ignored_ok)) ; - while (srv_cq_.Next(&ignored_tag, &ignored_ok)) + while (srv_cq_->Next(&ignored_tag, &ignored_ok)) ; } @@ -130,9 +131,9 @@ class GenericEnd2endTest : public ::testing::Test { generic_stub_.reset(new GenericStub(channel)); } - void server_ok(int i) { verify_ok(&srv_cq_, i, true); } + void server_ok(int i) { verify_ok(srv_cq_.get(), i, true); } void client_ok(int i) { verify_ok(&cli_cq_, i, true); } - void server_fail(int i) { verify_ok(&srv_cq_, i, false); } + void server_fail(int i) { verify_ok(srv_cq_.get(), i, false); } void client_fail(int i) { verify_ok(&cli_cq_, i, false); } void SendRpc(int num_rpcs) { @@ -160,9 +161,10 @@ class GenericEnd2endTest : public ::testing::Test { call->WritesDone(tag(3)); client_ok(3); - generic_service_.RequestCall(&srv_ctx, &stream, &srv_cq_, tag(4)); + generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(), + srv_cq_.get(), tag(4)); - verify_ok(generic_service_.completion_queue(), 4, true); + verify_ok(srv_cq_.get(), 4, true); EXPECT_EQ(server_address_.str(), srv_ctx.host()); EXPECT_EQ(kMethodName, srv_ctx.method()); ByteBuffer recv_buffer; @@ -193,7 +195,7 @@ class GenericEnd2endTest : public ::testing::Test { } CompletionQueue cli_cq_; - CompletionQueue srv_cq_; + std::unique_ptr<ServerCompletionQueue> srv_cq_; std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub_; std::unique_ptr<grpc::GenericStub> generic_stub_; std::unique_ptr<Server> server_; @@ -230,9 +232,10 @@ TEST_F(GenericEnd2endTest, SimpleBidiStreaming) { generic_stub_->Call(&cli_ctx, kMethodName, &cli_cq_, tag(1)); client_ok(1); - generic_service_.RequestCall(&srv_ctx, &srv_stream, &srv_cq_, tag(2)); + generic_service_.RequestCall(&srv_ctx, &srv_stream, srv_cq_.get(), + srv_cq_.get(), tag(2)); - verify_ok(generic_service_.completion_queue(), 2, true); + verify_ok(srv_cq_.get(), 2, true); EXPECT_EQ(server_address_.str(), srv_ctx.host()); EXPECT_EQ(kMethodName, srv_ctx.method()); diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index b19c443c82..6cb3192908 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -63,9 +63,7 @@ namespace testing { class AsyncQpsServerTest : public Server { public: - AsyncQpsServerTest(const ServerConfig& config, int port) - : srv_cq_(), async_service_(&srv_cq_), server_(nullptr), - shutdown_(false) { + AsyncQpsServerTest(const ServerConfig &config, int port) : shutdown_(false) { char* server_address = NULL; gpr_join_host_port(&server_address, "::", port); @@ -74,15 +72,17 @@ class AsyncQpsServerTest : public Server { gpr_free(server_address); builder.RegisterAsyncService(&async_service_); + srv_cq_ = builder.AddCompletionQueue(); server_ = builder.BuildAndStart(); using namespace std::placeholders; - request_unary_ = std::bind(&TestService::AsyncService::RequestUnaryCall, - &async_service_, _1, _2, _3, &srv_cq_, _4); + request_unary_ = + std::bind(&TestService::AsyncService::RequestUnaryCall, &async_service_, + _1, _2, _3, srv_cq_.get(), srv_cq_.get(), _4); request_streaming_ = - std::bind(&TestService::AsyncService::RequestStreamingCall, - &async_service_, _1, _2, &srv_cq_, _3); + std::bind(&TestService::AsyncService::RequestStreamingCall, + &async_service_, _1, _2, srv_cq_.get(), srv_cq_.get(), _3); for (int i = 0; i < 100; i++) { contexts_.push_front( new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( @@ -96,7 +96,7 @@ class AsyncQpsServerTest : public Server { // Wait until work is available or we are shutting down bool ok; void* got_tag; - while (srv_cq_.Next(&got_tag, &ok)) { + while (srv_cq_->Next(&got_tag, &ok)) { ServerRpcContext* ctx = detag(got_tag); // The tag is a pointer to an RPC context to invoke if (ctx->RunNextState(ok) == false) { @@ -116,7 +116,7 @@ class AsyncQpsServerTest : public Server { { std::lock_guard<std::mutex> g(shutdown_mutex_); shutdown_ = true; - srv_cq_.Shutdown(); + srv_cq_->Shutdown(); } for (auto thr = threads_.begin(); thr != threads_.end(); thr++) { thr->join(); @@ -290,10 +290,10 @@ class AsyncQpsServerTest : public Server { } return Status::OK; } - CompletionQueue srv_cq_; - TestService::AsyncService async_service_; std::vector<std::thread> threads_; std::unique_ptr<grpc::Server> server_; + std::unique_ptr<grpc::ServerCompletionQueue> srv_cq_; + TestService::AsyncService async_service_; std::function<void(ServerContext*, SimpleRequest*, grpc::ServerAsyncResponseWriter<SimpleResponse>*, void*)> request_unary_; |