aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--Makefile2
-rw-r--r--build.json2
-rw-r--r--include/grpc++/async_generic_service.h6
-rw-r--r--include/grpc++/completion_queue.h7
-rw-r--r--include/grpc++/impl/service_type.h37
-rw-r--r--include/grpc++/server.h8
-rw-r--r--include/grpc++/server_builder.h7
-rw-r--r--include/grpc/grpc.h18
-rw-r--r--src/compiler/cpp_generator.cc105
-rw-r--r--src/core/surface/server.c67
-rw-r--r--src/core/surface/server.h3
-rw-r--r--src/core/surface/server_create.c5
-rw-r--r--src/cpp/server/async_generic_service.cc10
-rw-r--r--src/cpp/server/server.cc71
-rw-r--r--src/cpp/server/server_builder.cc9
-rw-r--r--test/core/end2end/dualstack_socket_test.c11
-rw-r--r--test/core/end2end/fixtures/chttp2_fake_security.c4
-rw-r--r--test/core/end2end/fixtures/chttp2_fullstack.c3
-rw-r--r--test/core/end2end/fixtures/chttp2_fullstack_uds.c3
-rw-r--r--test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c4
-rw-r--r--test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c4
-rw-r--r--test/core/end2end/fixtures/chttp2_socket_pair.c4
-rw-r--r--test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c4
-rw-r--r--test/core/end2end/tests/cancel_after_accept.c7
-rw-r--r--test/core/end2end/tests/cancel_after_accept_and_writes_closed.c7
-rw-r--r--test/core/end2end/tests/census_simple_request.c8
-rw-r--r--test/core/end2end/tests/disappearing_server.c8
-rw-r--r--test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c8
-rw-r--r--test/core/end2end/tests/early_server_shutdown_finishes_tags.c8
-rw-r--r--test/core/end2end/tests/graceful_server_shutdown.c8
-rw-r--r--test/core/end2end/tests/invoke_large_request.c8
-rw-r--r--test/core/end2end/tests/max_concurrent_streams.c24
-rw-r--r--test/core/end2end/tests/max_message_length.c8
-rw-r--r--test/core/end2end/tests/ping_pong_streaming.c8
-rw-r--r--test/core/end2end/tests/registered_call.c8
-rw-r--r--test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c8
-rw-r--r--test/core/end2end/tests/request_response_with_metadata_and_payload.c8
-rw-r--r--test/core/end2end/tests/request_response_with_payload.c8
-rw-r--r--test/core/end2end/tests/request_with_large_metadata.c8
-rw-r--r--test/core/end2end/tests/request_with_payload.c8
-rw-r--r--test/core/end2end/tests/simple_delayed_request.c8
-rw-r--r--test/core/end2end/tests/simple_request.c8
-rw-r--r--test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c8
-rw-r--r--test/core/fling/server.c7
-rw-r--r--test/cpp/end2end/async_end2end_test.cc53
-rw-r--r--test/cpp/end2end/generic_end2end_test.cc21
-rw-r--r--test/cpp/qps/server_async.cc22
47 files changed, 366 insertions, 305 deletions
diff --git a/Makefile b/Makefile
index 244a211652..5f95aabb92 100644
--- a/Makefile
+++ b/Makefile
@@ -305,7 +305,7 @@ E = @echo
Q = @
endif
-VERSION = 0.7.0.0
+VERSION = 0.8.0.0
CPPFLAGS_NO_ARCH += $(addprefix -I, $(INCLUDES)) $(addprefix -D, $(DEFINES))
CPPFLAGS += $(CPPFLAGS_NO_ARCH) $(ARCH_FLAGS)
diff --git a/build.json b/build.json
index 10fd72d99e..217d84cdea 100644
--- a/build.json
+++ b/build.json
@@ -6,7 +6,7 @@
"#": "The public version number of the library.",
"version": {
"major": 0,
- "minor": 7,
+ "minor": 8,
"micro": 0,
"build": 0
}
diff --git a/include/grpc++/async_generic_service.h b/include/grpc++/async_generic_service.h
index 911d31cb1f..b435c6e73d 100644
--- a/include/grpc++/async_generic_service.h
+++ b/include/grpc++/async_generic_service.h
@@ -65,10 +65,8 @@ class AsyncGenericService GRPC_FINAL {
void RequestCall(GenericServerContext* ctx,
GenericServerAsyncReaderWriter* reader_writer,
- CompletionQueue* cq, void* tag);
-
- // The new rpc event should be obtained from this completion queue.
- CompletionQueue* completion_queue();
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag);
private:
friend class Server;
diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h
index 5c2b1cce93..e8429c8f41 100644
--- a/include/grpc++/completion_queue.h
+++ b/include/grpc++/completion_queue.h
@@ -58,6 +58,7 @@ class ServerReaderWriter;
class CompletionQueue;
class Server;
+class ServerBuilder;
class ServerContext;
class CompletionQueueTag {
@@ -137,6 +138,12 @@ class CompletionQueue : public GrpcLibrary {
grpc_completion_queue* cq_; // owned
};
+class ServerCompletionQueue : public CompletionQueue {
+ private:
+ friend class ServerBuilder;
+ ServerCompletionQueue() {}
+};
+
} // namespace grpc
#endif // GRPCXX_COMPLETION_QUEUE_H
diff --git a/include/grpc++/impl/service_type.h b/include/grpc++/impl/service_type.h
index 7cd3ddad6b..bc39bb82ac 100644
--- a/include/grpc++/impl/service_type.h
+++ b/include/grpc++/impl/service_type.h
@@ -39,8 +39,10 @@
namespace grpc {
class Call;
+class CompletionQueue;
class RpcService;
class Server;
+class ServerCompletionQueue;
class ServerContext;
class Status;
@@ -70,52 +72,55 @@ class AsynchronousService {
ServerContext* context,
::grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq, void* tag) = 0;
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq,
+ void* tag) = 0;
};
- AsynchronousService(CompletionQueue* cq, const char** method_names,
- size_t method_count)
- : cq_(cq),
- dispatch_impl_(nullptr),
+ AsynchronousService(const char** method_names, size_t method_count)
+ : dispatch_impl_(nullptr),
method_names_(method_names),
method_count_(method_count),
request_args_(nullptr) {}
~AsynchronousService() { delete[] request_args_; }
- CompletionQueue* completion_queue() const { return cq_; }
-
protected:
void RequestAsyncUnary(int index, ServerContext* context,
grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq, void* tag) {
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag) {
dispatch_impl_->RequestAsyncCall(request_args_[index], context, request,
- stream, cq, tag);
+ stream, call_cq, notification_cq, tag);
}
void RequestClientStreaming(int index, ServerContext* context,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq, void* tag) {
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq,
+ void* tag) {
dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr,
- stream, cq, tag);
+ stream, call_cq, notification_cq, tag);
}
void RequestServerStreaming(int index, ServerContext* context,
grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq, void* tag) {
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq,
+ void* tag) {
dispatch_impl_->RequestAsyncCall(request_args_[index], context, request,
- stream, cq, tag);
+ stream, call_cq, notification_cq, tag);
}
void RequestBidiStreaming(int index, ServerContext* context,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq, void* tag) {
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag) {
dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr,
- stream, cq, tag);
+ stream, call_cq, notification_cq, tag);
}
private:
friend class Server;
- CompletionQueue* const cq_;
DispatchImpl* dispatch_impl_;
const char** const method_names_;
size_t method_count_;
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index b2b9044dca..50a2416321 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -101,11 +101,15 @@ class Server GRPC_FINAL : public GrpcLibrary,
void RequestAsyncCall(void* registered_method, ServerContext* context,
grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq, void* tag) GRPC_OVERRIDE;
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq,
+ void* tag) GRPC_OVERRIDE;
void RequestAsyncGenericCall(GenericServerContext* context,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq, void* tag);
+ CompletionQueue* cq,
+ ServerCompletionQueue* notification_cq,
+ void* tag);
const int max_message_size_;
diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h
index 7155c7fd46..ecee475e3e 100644
--- a/include/grpc++/server_builder.h
+++ b/include/grpc++/server_builder.h
@@ -46,6 +46,7 @@ class AsynchronousService;
class CompletionQueue;
class RpcService;
class Server;
+class ServerCompletionQueue;
class ServerCredentials;
class SynchronousService;
class ThreadPoolInterface;
@@ -82,6 +83,11 @@ class ServerBuilder {
// Does not take ownership.
void SetThreadPool(ThreadPoolInterface* thread_pool);
+ // Add a completion queue for handling asynchronous services
+ // Caller is required to keep this completion queue live until calling
+ // BuildAndStart()
+ std::unique_ptr<ServerCompletionQueue> AddCompletionQueue();
+
// Return a running server which is ready for processing rpcs.
std::unique_ptr<Server> BuildAndStart();
@@ -96,6 +102,7 @@ class ServerBuilder {
std::vector<RpcService*> services_;
std::vector<AsynchronousService*> async_services_;
std::vector<Port> ports_;
+ std::vector<ServerCompletionQueue*> cqs_;
std::shared_ptr<ServerCredentials> creds_;
AsyncGenericService* generic_service_;
ThreadPoolInterface* thread_pool_;
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index 9bb826f323..be12356414 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -460,7 +460,8 @@ void grpc_call_destroy(grpc_call *call);
grpc_call_error grpc_server_request_call(
grpc_server *server, grpc_call **call, grpc_call_details *details,
grpc_metadata_array *request_metadata,
- grpc_completion_queue *cq_bound_to_call, void *tag_new);
+ grpc_completion_queue *cq_bound_to_call,
+ grpc_completion_queue *cq_for_notification, void *tag_new);
/* Registers a method in the server.
Methods to this (host, method) pair will not be reported by
@@ -470,21 +471,26 @@ grpc_call_error grpc_server_request_call(
Must be called before grpc_server_start.
Returns NULL on failure. */
void *grpc_server_register_method(grpc_server *server, const char *method,
- const char *host,
- grpc_completion_queue *new_call_cq);
+ const char *host);
/* Request notification of a new pre-registered call */
grpc_call_error grpc_server_request_registered_call(
grpc_server *server, void *registered_method, grpc_call **call,
gpr_timespec *deadline, grpc_metadata_array *request_metadata,
grpc_byte_buffer **optional_payload,
- grpc_completion_queue *cq_bound_to_call, void *tag_new);
+ grpc_completion_queue *cq_bound_to_call,
+ grpc_completion_queue *cq_for_notification, void *tag_new);
/* Create a server. Additional configuration for each incoming channel can
be specified with args. If no additional configuration is needed, args can
be NULL. See grpc_channel_args for more. */
-grpc_server *grpc_server_create(grpc_completion_queue *cq,
- const grpc_channel_args *args);
+grpc_server *grpc_server_create(const grpc_channel_args *args);
+
+/* Register a completion queue with the server. Must be done for any completion
+ queue that is passed to grpc_server_request_* call. Must be performed prior
+ to grpc_server_start. */
+void grpc_server_register_completion_queue(grpc_server *server,
+ grpc_completion_queue *cq);
/* Add a HTTP2 over plaintext over tcp listener.
Returns bound port number on success, 0 on failure.
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc
index 735e7e58a8..46c842a7d6 100644
--- a/src/compiler/cpp_generator.cc
+++ b/src/compiler/cpp_generator.cc
@@ -162,6 +162,7 @@ grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file,
"class CompletionQueue;\n"
"class ChannelInterface;\n"
"class RpcService;\n"
+ "class ServerCompletionQueue;\n"
"class ServerContext;\n";
if (HasUnaryCalls(file)) {
temp.append(
@@ -260,7 +261,7 @@ void PrintHeaderClientMethod(grpc::protobuf::io::Printer *printer,
"std::unique_ptr< ::grpc::ClientReaderWriter< $Request$, $Response$>> "
"$Method$(::grpc::ClientContext* context);\n");
printer->Print(*vars,
- "std::unique_ptr< ::grpc::ClientAsyncReaderWriter< "
+ "std::unique_ptr< ::grpc::ClientAsyncReaderWriter< "
"$Request$, $Response$>> "
"Async$Method$(::grpc::ClientContext* context, "
"::grpc::CompletionQueue* cq, void* tag);\n");
@@ -318,30 +319,37 @@ void PrintHeaderServerMethodAsync(
(*vars)["Response"] =
grpc_cpp_generator::ClassName(method->output_type(), true);
if (NoStreaming(method)) {
- printer->Print(*vars,
- "void Request$Method$("
- "::grpc::ServerContext* context, $Request$* request, "
- "::grpc::ServerAsyncResponseWriter< $Response$>* response, "
- "::grpc::CompletionQueue* cq, void *tag);\n");
+ printer->Print(
+ *vars,
+ "void Request$Method$("
+ "::grpc::ServerContext* context, $Request$* request, "
+ "::grpc::ServerAsyncResponseWriter< $Response$>* response, "
+ "::grpc::CompletionQueue* new_call_cq, "
+ "::grpc::ServerCompletionQueue* notification_cq, void *tag);\n");
} else if (ClientOnlyStreaming(method)) {
- printer->Print(*vars,
- "void Request$Method$("
- "::grpc::ServerContext* context, "
- "::grpc::ServerAsyncReader< $Response$, $Request$>* reader, "
- "::grpc::CompletionQueue* cq, void *tag);\n");
+ printer->Print(
+ *vars,
+ "void Request$Method$("
+ "::grpc::ServerContext* context, "
+ "::grpc::ServerAsyncReader< $Response$, $Request$>* reader, "
+ "::grpc::CompletionQueue* new_call_cq, "
+ "::grpc::ServerCompletionQueue* notification_cq, void *tag);\n");
} else if (ServerOnlyStreaming(method)) {
- printer->Print(*vars,
- "void Request$Method$("
- "::grpc::ServerContext* context, $Request$* request, "
- "::grpc::ServerAsyncWriter< $Response$>* writer, "
- "::grpc::CompletionQueue* cq, void *tag);\n");
+ printer->Print(
+ *vars,
+ "void Request$Method$("
+ "::grpc::ServerContext* context, $Request$* request, "
+ "::grpc::ServerAsyncWriter< $Response$>* writer, "
+ "::grpc::CompletionQueue* new_call_cq, "
+ "::grpc::ServerCompletionQueue* notification_cq, void *tag);\n");
} else if (BidiStreaming(method)) {
printer->Print(
*vars,
"void Request$Method$("
"::grpc::ServerContext* context, "
"::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, "
- "::grpc::CompletionQueue* cq, void *tag);\n");
+ "::grpc::CompletionQueue* new_call_cq, "
+ "::grpc::ServerCompletionQueue* notification_cq, void *tag);\n");
}
}
@@ -403,7 +411,7 @@ void PrintHeaderService(grpc::protobuf::io::Printer *printer,
" public:\n");
printer->Indent();
(*vars)["MethodCount"] = as_string(service->method_count());
- printer->Print("explicit AsyncService(::grpc::CompletionQueue* cq);\n");
+ printer->Print("explicit AsyncService();\n");
printer->Print("~AsyncService() {};\n");
for (int i = 0; i < service->method_count(); ++i) {
PrintHeaderServerMethodAsync(printer, service->method(i), vars);
@@ -686,36 +694,43 @@ void PrintSourceServerAsyncMethod(
(*vars)["Response"] =
grpc_cpp_generator::ClassName(method->output_type(), true);
if (NoStreaming(method)) {
- printer->Print(*vars,
- "void $ns$$Service$::AsyncService::Request$Method$("
- "::grpc::ServerContext* context, "
- "$Request$* request, "
- "::grpc::ServerAsyncResponseWriter< $Response$>* response, "
- "::grpc::CompletionQueue* cq, void* tag) {\n");
+ printer->Print(
+ *vars,
+ "void $ns$$Service$::AsyncService::Request$Method$("
+ "::grpc::ServerContext* context, "
+ "$Request$* request, "
+ "::grpc::ServerAsyncResponseWriter< $Response$>* response, "
+ "::grpc::CompletionQueue* new_call_cq, "
+ "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
printer->Print(*vars,
" AsynchronousService::RequestAsyncUnary($Idx$, context, "
- "request, response, cq, tag);\n");
+ "request, response, new_call_cq, notification_cq, tag);\n");
printer->Print("}\n\n");
} else if (ClientOnlyStreaming(method)) {
- printer->Print(*vars,
- "void $ns$$Service$::AsyncService::Request$Method$("
- "::grpc::ServerContext* context, "
- "::grpc::ServerAsyncReader< $Response$, $Request$>* reader, "
- "::grpc::CompletionQueue* cq, void* tag) {\n");
+ printer->Print(
+ *vars,
+ "void $ns$$Service$::AsyncService::Request$Method$("
+ "::grpc::ServerContext* context, "
+ "::grpc::ServerAsyncReader< $Response$, $Request$>* reader, "
+ "::grpc::CompletionQueue* new_call_cq, "
+ "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
printer->Print(*vars,
" AsynchronousService::RequestClientStreaming($Idx$, "
- "context, reader, cq, tag);\n");
+ "context, reader, new_call_cq, notification_cq, tag);\n");
printer->Print("}\n\n");
} else if (ServerOnlyStreaming(method)) {
- printer->Print(*vars,
- "void $ns$$Service$::AsyncService::Request$Method$("
- "::grpc::ServerContext* context, "
- "$Request$* request, "
- "::grpc::ServerAsyncWriter< $Response$>* writer, "
- "::grpc::CompletionQueue* cq, void* tag) {\n");
- printer->Print(*vars,
- " AsynchronousService::RequestServerStreaming($Idx$, "
- "context, request, writer, cq, tag);\n");
+ printer->Print(
+ *vars,
+ "void $ns$$Service$::AsyncService::Request$Method$("
+ "::grpc::ServerContext* context, "
+ "$Request$* request, "
+ "::grpc::ServerAsyncWriter< $Response$>* writer, "
+ "::grpc::CompletionQueue* new_call_cq, "
+ "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
+ printer->Print(
+ *vars,
+ " AsynchronousService::RequestServerStreaming($Idx$, "
+ "context, request, writer, new_call_cq, notification_cq, tag);\n");
printer->Print("}\n\n");
} else if (BidiStreaming(method)) {
printer->Print(
@@ -723,10 +738,11 @@ void PrintSourceServerAsyncMethod(
"void $ns$$Service$::AsyncService::Request$Method$("
"::grpc::ServerContext* context, "
"::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, "
- "::grpc::CompletionQueue* cq, void *tag) {\n");
+ "::grpc::CompletionQueue* new_call_cq, "
+ "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
printer->Print(*vars,
" AsynchronousService::RequestBidiStreaming($Idx$, "
- "context, stream, cq, tag);\n");
+ "context, stream, new_call_cq, notification_cq, tag);\n");
printer->Print("}\n\n");
}
}
@@ -788,9 +804,8 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
(*vars)["MethodCount"] = as_string(service->method_count());
printer->Print(*vars,
- "$ns$$Service$::AsyncService::AsyncService(::grpc::"
- "CompletionQueue* cq) : "
- "::grpc::AsynchronousService(cq, "
+ "$ns$$Service$::AsyncService::AsyncService() : "
+ "::grpc::AsynchronousService("
"$prefix$$Service$_method_names, $MethodCount$) "
"{}\n\n");
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 01644b4471..96c1b7c3eb 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -74,16 +74,15 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
typedef struct {
requested_call_type type;
void *tag;
+ grpc_completion_queue *cq_bound_to_call;
+ grpc_completion_queue *cq_for_notification;
+ grpc_call **call;
union {
struct {
- grpc_completion_queue *cq_bind;
- grpc_call **call;
grpc_call_details *details;
grpc_metadata_array *initial_metadata;
} batch;
struct {
- grpc_completion_queue *cq_bind;
- grpc_call **call;
registered_method *registered_method;
gpr_timespec *deadline;
grpc_metadata_array *initial_metadata;
@@ -103,7 +102,6 @@ struct registered_method {
char *host;
call_data *pending;
requested_call_array requested;
- grpc_completion_queue *cq;
registered_method *next;
};
@@ -130,7 +128,6 @@ struct grpc_server {
size_t channel_filter_count;
const grpc_channel_filter **channel_filters;
grpc_channel_args *channel_args;
- grpc_completion_queue *unregistered_cq;
grpc_completion_queue **cqs;
grpc_pollset **pollsets;
@@ -602,7 +599,8 @@ static const grpc_channel_filter server_surface_filter = {
destroy_channel_elem, "server",
};
-static void addcq(grpc_server *server, grpc_completion_queue *cq) {
+void grpc_server_register_completion_queue(grpc_server *server,
+ grpc_completion_queue *cq) {
size_t i, n;
for (i = 0; i < server->cq_count; i++) {
if (server->cqs[i] == cq) return;
@@ -614,8 +612,7 @@ static void addcq(grpc_server *server, grpc_completion_queue *cq) {
server->cqs[n] = cq;
}
-grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
- grpc_channel_filter **filters,
+grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters,
size_t filter_count,
const grpc_channel_args *args) {
size_t i;
@@ -626,12 +623,10 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
memset(server, 0, sizeof(grpc_server));
- if (cq) addcq(server, cq);
gpr_mu_init(&server->mu);
gpr_cv_init(&server->cv);
- server->unregistered_cq = cq;
/* decremented by grpc_server_destroy */
gpr_ref_init(&server->internal_refcount, 1);
server->root_channel_data.next = server->root_channel_data.prev =
@@ -667,8 +662,7 @@ static int streq(const char *a, const char *b) {
}
void *grpc_server_register_method(grpc_server *server, const char *method,
- const char *host,
- grpc_completion_queue *cq_new_rpc) {
+ const char *host) {
registered_method *m;
if (!method) {
gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__);
@@ -681,13 +675,11 @@ void *grpc_server_register_method(grpc_server *server, const char *method,
return NULL;
}
}
- addcq(server, cq_new_rpc);
m = gpr_malloc(sizeof(registered_method));
memset(m, 0, sizeof(*m));
m->method = gpr_strdup(method);
m->host = gpr_strdup(host);
m->next = server->registered_methods;
- m->cq = cq_new_rpc;
server->registered_methods = m;
return m;
}
@@ -1012,17 +1004,18 @@ static grpc_call_error queue_call_request(grpc_server *server,
}
}
-grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
- grpc_call_details *details,
- grpc_metadata_array *initial_metadata,
- grpc_completion_queue *cq_bind,
- void *tag) {
+grpc_call_error grpc_server_request_call(
+ grpc_server *server, grpc_call **call, grpc_call_details *details,
+ grpc_metadata_array *initial_metadata,
+ grpc_completion_queue *cq_bound_to_call,
+ grpc_completion_queue *cq_for_notification, void *tag) {
requested_call rc;
- grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_OP_COMPLETE);
+ grpc_cq_begin_op(cq_for_notification, NULL, GRPC_OP_COMPLETE);
rc.type = BATCH_CALL;
rc.tag = tag;
- rc.data.batch.cq_bind = cq_bind;
- rc.data.batch.call = call;
+ rc.cq_bound_to_call = cq_bound_to_call;
+ rc.cq_for_notification = cq_for_notification;
+ rc.call = call;
rc.data.batch.details = details;
rc.data.batch.initial_metadata = initial_metadata;
return queue_call_request(server, &rc);
@@ -1031,14 +1024,16 @@ grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
grpc_call_error grpc_server_request_registered_call(
grpc_server *server, void *rm, grpc_call **call, gpr_timespec *deadline,
grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload,
- grpc_completion_queue *cq_bind, void *tag) {
+ grpc_completion_queue *cq_bound_to_call,
+ grpc_completion_queue *cq_for_notification, void *tag) {
requested_call rc;
registered_method *registered_method = rm;
- grpc_cq_begin_op(registered_method->cq, NULL, GRPC_OP_COMPLETE);
+ grpc_cq_begin_op(cq_for_notification, NULL, GRPC_OP_COMPLETE);
rc.type = REGISTERED_CALL;
rc.tag = tag;
- rc.data.registered.cq_bind = cq_bind;
- rc.data.registered.call = call;
+ rc.cq_bound_to_call = cq_bound_to_call;
+ rc.cq_for_notification = cq_for_notification;
+ rc.call = call;
rc.data.registered.registered_method = registered_method;
rc.data.registered.deadline = deadline;
rc.data.registered.initial_metadata = initial_metadata;
@@ -1076,6 +1071,9 @@ static void begin_call(grpc_server *server, call_data *calld,
fill in the metadata array passed by the client, we need to perform
an ioreq op, that should complete immediately. */
+ grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call);
+ *rc->call = calld->call;
+ calld->cq_new = rc->cq_for_notification;
switch (rc->type) {
case BATCH_CALL:
cpstr(&rc->data.batch.details->host,
@@ -1083,18 +1081,13 @@ static void begin_call(grpc_server *server, call_data *calld,
cpstr(&rc->data.batch.details->method,
&rc->data.batch.details->method_capacity, calld->path);
rc->data.batch.details->deadline = calld->deadline;
- grpc_call_set_completion_queue(calld->call, rc->data.batch.cq_bind);
- *rc->data.batch.call = calld->call;
r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
r->data.recv_metadata = rc->data.batch.initial_metadata;
r++;
- calld->cq_new = server->unregistered_cq;
publish = publish_registered_or_batch;
break;
case REGISTERED_CALL:
*rc->data.registered.deadline = calld->deadline;
- grpc_call_set_completion_queue(calld->call, rc->data.registered.cq_bind);
- *rc->data.registered.call = calld->call;
r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
r->data.recv_metadata = rc->data.registered.initial_metadata;
r++;
@@ -1103,7 +1096,6 @@ static void begin_call(grpc_server *server, call_data *calld,
r->data.recv_message = rc->data.registered.optional_payload;
r++;
}
- calld->cq_new = rc->data.registered.registered_method->cq;
publish = publish_registered_or_batch;
break;
}
@@ -1114,20 +1106,17 @@ static void begin_call(grpc_server *server, call_data *calld,
}
static void fail_call(grpc_server *server, requested_call *rc) {
+ *rc->call = NULL;
switch (rc->type) {
case BATCH_CALL:
- *rc->data.batch.call = NULL;
rc->data.batch.initial_metadata->count = 0;
- grpc_cq_end_op(server->unregistered_cq, rc->tag, NULL, do_nothing, NULL,
- GRPC_OP_ERROR);
break;
case REGISTERED_CALL:
- *rc->data.registered.call = NULL;
rc->data.registered.initial_metadata->count = 0;
- grpc_cq_end_op(rc->data.registered.registered_method->cq, rc->tag, NULL,
- do_nothing, NULL, GRPC_OP_ERROR);
break;
}
+ grpc_cq_end_op(rc->cq_for_notification, rc->tag, NULL, do_nothing, NULL,
+ GRPC_OP_ERROR);
}
static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
diff --git a/src/core/surface/server.h b/src/core/surface/server.h
index 2cfa38fa43..c6331033e0 100644
--- a/src/core/surface/server.h
+++ b/src/core/surface/server.h
@@ -39,8 +39,7 @@
#include "src/core/transport/transport.h"
/* Create a server */
-grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
- grpc_channel_filter **filters,
+grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters,
size_t filter_count,
const grpc_channel_args *args);
diff --git a/src/core/surface/server_create.c b/src/core/surface/server_create.c
index f629c7c72d..b7390675ad 100644
--- a/src/core/surface/server_create.c
+++ b/src/core/surface/server_create.c
@@ -35,7 +35,6 @@
#include "src/core/surface/completion_queue.h"
#include "src/core/surface/server.h"
-grpc_server *grpc_server_create(grpc_completion_queue *cq,
- const grpc_channel_args *args) {
- return grpc_server_create_from_filters(cq, NULL, 0, args);
+grpc_server *grpc_server_create(const grpc_channel_args *args) {
+ return grpc_server_create_from_filters(NULL, 0, args);
}
diff --git a/src/cpp/server/async_generic_service.cc b/src/cpp/server/async_generic_service.cc
index 07cb933715..2e99afcb5f 100644
--- a/src/cpp/server/async_generic_service.cc
+++ b/src/cpp/server/async_generic_service.cc
@@ -39,12 +39,10 @@ namespace grpc {
void AsyncGenericService::RequestCall(
GenericServerContext* ctx, GenericServerAsyncReaderWriter* reader_writer,
- CompletionQueue* cq, void* tag) {
- server_->RequestAsyncGenericCall(ctx, reader_writer, cq, tag);
-}
-
-CompletionQueue* AsyncGenericService::completion_queue() {
- return &server_->cq_;
+ CompletionQueue* call_cq, ServerCompletionQueue* notification_cq,
+ void* tag) {
+ server_->RequestAsyncGenericCall(ctx, reader_writer, call_cq, notification_cq,
+ tag);
}
} // namespace grpc
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 08c956601c..e9c4f4eaaf 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -78,7 +78,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
return mrd;
}
- void Request(grpc_server* server) {
+ void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
GPR_ASSERT(!in_flight_);
in_flight_ = true;
cq_ = grpc_completion_queue_create();
@@ -86,7 +86,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
grpc_server_request_registered_call(
server, tag_, &call_, &deadline_, &request_metadata_,
has_request_payload_ ? &request_payload_ : nullptr, cq_,
- this));
+ notify_cq, this));
}
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
@@ -179,16 +179,16 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
grpc_completion_queue* cq_;
};
-grpc_server* CreateServer(grpc_completion_queue* cq, int max_message_size) {
+static grpc_server* CreateServer(int max_message_size) {
if (max_message_size > 0) {
grpc_arg arg;
arg.type = GRPC_ARG_INTEGER;
arg.key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH);
arg.value.integer = max_message_size;
grpc_channel_args args = {1, &arg};
- return grpc_server_create(cq, &args);
+ return grpc_server_create(&args);
} else {
- return grpc_server_create(cq, nullptr);
+ return grpc_server_create(nullptr);
}
}
@@ -199,9 +199,11 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
shutdown_(false),
num_running_cb_(0),
sync_methods_(new std::list<SyncRequest>),
- server_(CreateServer(cq_.cq(), max_message_size)),
+ server_(CreateServer(max_message_size)),
thread_pool_(thread_pool),
- thread_pool_owned_(thread_pool_owned) {}
+ thread_pool_owned_(thread_pool_owned) {
+ grpc_server_register_completion_queue(server_, cq_.cq());
+}
Server::~Server() {
{
@@ -221,8 +223,7 @@ Server::~Server() {
bool Server::RegisterService(RpcService* service) {
for (int i = 0; i < service->GetMethodCount(); ++i) {
RpcServiceMethod* method = service->GetMethod(i);
- void* tag =
- grpc_server_register_method(server_, method->name(), nullptr, cq_.cq());
+ void* tag = grpc_server_register_method(server_, method->name(), nullptr);
if (!tag) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
method->name());
@@ -240,9 +241,8 @@ bool Server::RegisterAsyncService(AsynchronousService* service) {
service->dispatch_impl_ = this;
service->request_args_ = new void*[service->method_count_];
for (size_t i = 0; i < service->method_count_; ++i) {
- void* tag =
- grpc_server_register_method(server_, service->method_names_[i], nullptr,
- service->completion_queue()->cq());
+ void* tag = grpc_server_register_method(server_, service->method_names_[i],
+ nullptr);
if (!tag) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
service->method_names_[i]);
@@ -273,7 +273,7 @@ bool Server::Start() {
// Start processing rpcs.
if (!sync_methods_->empty()) {
for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) {
- m->Request(server_);
+ m->Request(server_, cq_.cq());
}
ScheduleCallback();
@@ -316,12 +316,13 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
public:
AsyncRequest(Server* server, void* registered_method, ServerContext* ctx,
grpc::protobuf::Message* request,
- ServerAsyncStreamingInterface* stream, CompletionQueue* cq,
- void* tag)
+ ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag)
: tag_(tag),
request_(request),
stream_(stream),
- cq_(cq),
+ call_cq_(call_cq),
+ notification_cq_(notification_cq),
ctx_(ctx),
generic_ctx_(nullptr),
server_(server),
@@ -329,18 +330,22 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
payload_(nullptr) {
memset(&array_, 0, sizeof(array_));
grpc_call_details_init(&call_details_);
+ GPR_ASSERT(notification_cq);
+ GPR_ASSERT(call_cq);
grpc_server_request_registered_call(
server->server_, registered_method, &call_, &call_details_.deadline,
- &array_, request ? &payload_ : nullptr, cq->cq(), this);
+ &array_, request ? &payload_ : nullptr, call_cq->cq(),
+ notification_cq->cq(), this);
}
AsyncRequest(Server* server, GenericServerContext* ctx,
- ServerAsyncStreamingInterface* stream, CompletionQueue* cq,
- void* tag)
+ ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag)
: tag_(tag),
request_(nullptr),
stream_(stream),
- cq_(cq),
+ call_cq_(call_cq),
+ notification_cq_(notification_cq),
ctx_(nullptr),
generic_ctx_(ctx),
server_(server),
@@ -348,8 +353,10 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
payload_(nullptr) {
memset(&array_, 0, sizeof(array_));
grpc_call_details_init(&call_details_);
+ GPR_ASSERT(notification_cq);
+ GPR_ASSERT(call_cq);
grpc_server_request_call(server->server_, &call_, &call_details_, &array_,
- cq->cq(), this);
+ call_cq->cq(), notification_cq->cq(), this);
}
~AsyncRequest() {
@@ -392,8 +399,8 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
}
}
ctx->call_ = call_;
- ctx->cq_ = cq_;
- Call call(call_, server_, cq_, server_->max_message_size_);
+ ctx->cq_ = call_cq_;
+ Call call(call_, server_, call_cq_, server_->max_message_size_);
if (orig_status && call_) {
ctx->BeginCompletionOp(&call);
}
@@ -407,7 +414,8 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
void* const tag_;
grpc::protobuf::Message* const request_;
ServerAsyncStreamingInterface* const stream_;
- CompletionQueue* const cq_;
+ CompletionQueue* const call_cq_;
+ ServerCompletionQueue* const notification_cq_;
ServerContext* const ctx_;
GenericServerContext* const generic_ctx_;
Server* const server_;
@@ -420,14 +428,19 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
void Server::RequestAsyncCall(void* registered_method, ServerContext* context,
grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq, void* tag) {
- new AsyncRequest(this, registered_method, context, request, stream, cq, tag);
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq,
+ void* tag) {
+ new AsyncRequest(this, registered_method, context, request, stream, call_cq,
+ notification_cq, tag);
}
void Server::RequestAsyncGenericCall(GenericServerContext* context,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq, void* tag) {
- new AsyncRequest(this, context, stream, cq, tag);
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq,
+ void* tag) {
+ new AsyncRequest(this, context, stream, call_cq, notification_cq, tag);
}
void Server::ScheduleCallback() {
@@ -446,7 +459,7 @@ void Server::RunRpc() {
ScheduleCallback();
if (ok) {
SyncRequest::CallData cd(this, mrd);
- mrd->Request(server_);
+ mrd->Request(server_, cq_.cq());
cd.Run();
}
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index e48d1eeb42..4bcbd82952 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -44,6 +44,12 @@ namespace grpc {
ServerBuilder::ServerBuilder()
: max_message_size_(-1), generic_service_(nullptr), thread_pool_(nullptr) {}
+std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() {
+ ServerCompletionQueue* cq = new ServerCompletionQueue();
+ cqs_.push_back(cq);
+ return std::unique_ptr<ServerCompletionQueue>(cq);
+}
+
void ServerBuilder::RegisterService(SynchronousService* service) {
services_.push_back(service->service());
}
@@ -88,6 +94,9 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
}
std::unique_ptr<Server> server(
new Server(thread_pool_, thread_pool_owned, max_message_size_));
+ for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
+ grpc_server_register_completion_queue(server->server_, (*cq)->cq());
+ }
for (auto service = services_.begin(); service != services_.end();
service++) {
if (!server->RegisterService(*service)) {
diff --git a/test/core/end2end/dualstack_socket_test.c b/test/core/end2end/dualstack_socket_test.c
index 5e278ca66c..ad97084502 100644
--- a/test/core/end2end/dualstack_socket_test.c
+++ b/test/core/end2end/dualstack_socket_test.c
@@ -99,7 +99,8 @@ void test_connect(const char *server_host, const char *client_host, int port,
/* Create server. */
server_cq = grpc_completion_queue_create();
- server = grpc_server_create(server_cq, NULL);
+ server = grpc_server_create(NULL);
+ grpc_server_register_completion_queue(server, server_cq);
GPR_ASSERT((got_port = grpc_server_add_http2_port(server, server_hostport)) >
0);
if (port == 0) {
@@ -155,10 +156,10 @@ void test_connect(const char *server_host, const char *client_host, int port,
if (expect_ok) {
/* Check for a successful request. */
- GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(server, &s,
- &call_details,
- &request_metadata_recv,
- server_cq, tag(101)));
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_server_request_call(server, &s, &call_details,
+ &request_metadata_recv, server_cq,
+ server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);
diff --git a/test/core/end2end/fixtures/chttp2_fake_security.c b/test/core/end2end/fixtures/chttp2_fake_security.c
index 929f1f50db..c94ee94d53 100644
--- a/test/core/end2end/fixtures/chttp2_fake_security.c
+++ b/test/core/end2end/fixtures/chttp2_fake_security.c
@@ -82,8 +82,8 @@ static void chttp2_init_server_secure_fullstack(
if (f->server) {
grpc_server_destroy(f->server);
}
- f->server =
- grpc_server_create(f->server_cq, server_args);
+ f->server = grpc_server_create(server_args);
+ grpc_server_register_completion_queue(f->server, f->server_cq);
GPR_ASSERT(grpc_server_add_secure_http2_port(f->server, ffd->localaddr, server_creds));
grpc_server_credentials_release(server_creds);
grpc_server_start(f->server);
diff --git a/test/core/end2end/fixtures/chttp2_fullstack.c b/test/core/end2end/fixtures/chttp2_fullstack.c
index d7de5e5434..f92b40efeb 100644
--- a/test/core/end2end/fixtures/chttp2_fullstack.c
+++ b/test/core/end2end/fixtures/chttp2_fullstack.c
@@ -83,7 +83,8 @@ void chttp2_init_server_fullstack(grpc_end2end_test_fixture *f,
if (f->server) {
grpc_server_destroy(f->server);
}
- f->server = grpc_server_create(f->server_cq, server_args);
+ f->server = grpc_server_create(server_args);
+ grpc_server_register_completion_queue(f->server, f->server_cq);
GPR_ASSERT(grpc_server_add_http2_port(f->server, ffd->localaddr));
grpc_server_start(f->server);
}
diff --git a/test/core/end2end/fixtures/chttp2_fullstack_uds.c b/test/core/end2end/fixtures/chttp2_fullstack_uds.c
index 53803b0f1d..876782df84 100644
--- a/test/core/end2end/fixtures/chttp2_fullstack_uds.c
+++ b/test/core/end2end/fixtures/chttp2_fullstack_uds.c
@@ -88,7 +88,8 @@ void chttp2_init_server_fullstack(grpc_end2end_test_fixture *f,
if (f->server) {
grpc_server_destroy(f->server);
}
- f->server = grpc_server_create(f->server_cq, server_args);
+ f->server = grpc_server_create(server_args);
+ grpc_server_register_completion_queue(f->server, f->server_cq);
GPR_ASSERT(grpc_server_add_http2_port(f->server, ffd->localaddr));
grpc_server_start(f->server);
}
diff --git a/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c b/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c
index 9c4086d79d..36ac4e46a3 100644
--- a/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c
+++ b/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c
@@ -85,8 +85,8 @@ static void chttp2_init_server_secure_fullstack(
if (f->server) {
grpc_server_destroy(f->server);
}
- f->server =
- grpc_server_create(f->server_cq, server_args);
+ f->server = grpc_server_create(server_args);
+ grpc_server_register_completion_queue(f->server, f->server_cq);
GPR_ASSERT(grpc_server_add_secure_http2_port(f->server, ffd->localaddr, server_creds));
grpc_server_credentials_release(server_creds);
grpc_server_start(f->server);
diff --git a/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c b/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c
index e9e1c5f838..4bfd923e83 100644
--- a/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c
+++ b/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c
@@ -83,8 +83,8 @@ static void chttp2_init_server_secure_fullstack(
if (f->server) {
grpc_server_destroy(f->server);
}
- f->server =
- grpc_server_create(f->server_cq, server_args);
+ f->server = grpc_server_create(server_args);
+ grpc_server_register_completion_queue(f->server, f->server_cq);
GPR_ASSERT(grpc_server_add_secure_http2_port(f->server, ffd->localaddr, server_creds));
grpc_server_credentials_release(server_creds);
grpc_server_start(f->server);
diff --git a/test/core/end2end/fixtures/chttp2_socket_pair.c b/test/core/end2end/fixtures/chttp2_socket_pair.c
index d19ceb178b..43ebf7eed5 100644
--- a/test/core/end2end/fixtures/chttp2_socket_pair.c
+++ b/test/core/end2end/fixtures/chttp2_socket_pair.c
@@ -117,8 +117,8 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f,
grpc_channel_args *server_args) {
grpc_endpoint_pair *sfd = f->fixture_data;
GPR_ASSERT(!f->server);
- f->server =
- grpc_server_create_from_filters(f->server_cq, NULL, 0, server_args);
+ f->server = grpc_server_create_from_filters(NULL, 0, server_args);
+ grpc_server_register_completion_queue(f->server, f->server_cq);
grpc_server_start(f->server);
grpc_create_chttp2_transport(server_setup_transport, f, server_args,
sfd->server, NULL, 0, grpc_mdctx_create(), 0);
diff --git a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c
index ddde585b83..385d5a4e81 100644
--- a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c
+++ b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c
@@ -117,8 +117,8 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f,
grpc_channel_args *server_args) {
grpc_endpoint_pair *sfd = f->fixture_data;
GPR_ASSERT(!f->server);
- f->server =
- grpc_server_create_from_filters(f->server_cq, NULL, 0, server_args);
+ f->server = grpc_server_create_from_filters(NULL, 0, server_args);
+ grpc_server_register_completion_queue(f->server, f->server_cq);
grpc_server_start(f->server);
grpc_create_chttp2_transport(server_setup_transport, f, server_args,
sfd->server, NULL, 0, grpc_mdctx_create(), 0);
diff --git a/test/core/end2end/tests/cancel_after_accept.c b/test/core/end2end/tests/cancel_after_accept.c
index 21057969d9..0adc437db0 100644
--- a/test/core/end2end/tests/cancel_after_accept.c
+++ b/test/core/end2end/tests/cancel_after_accept.c
@@ -161,9 +161,10 @@ static void test_cancel_after_accept(grpc_end2end_test_config config,
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
- GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
- f.server, &s, &call_details,
- &request_metadata_recv, f.server_cq, tag(2)));
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_server_request_call(f.server, &s, &call_details,
+ &request_metadata_recv, f.server_cq,
+ f.server_cq, tag(2)));
cq_expect_completion(v_server, tag(2), GRPC_OP_OK);
cq_verify(v_server);
diff --git a/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c b/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c
index f8733ef444..0b20a97559 100644
--- a/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c
+++ b/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c
@@ -163,9 +163,10 @@ static void test_cancel_after_accept_and_writes_closed(
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
- GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
- f.server, &s, &call_details,
- &request_metadata_recv, f.server_cq, tag(2)));
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_server_request_call(f.server, &s, &call_details,
+ &request_metadata_recv, f.server_cq,
+ f.server_cq, tag(2)));
cq_expect_completion(v_server, tag(2), GRPC_OP_OK);
cq_verify(v_server);
diff --git a/test/core/end2end/tests/census_simple_request.c b/test/core/end2end/tests/census_simple_request.c
index 67c769c08b..13bf31584d 100644
--- a/test/core/end2end/tests/census_simple_request.c
+++ b/test/core/end2end/tests/census_simple_request.c
@@ -142,10 +142,10 @@ static void test_body(grpc_end2end_test_fixture f) {
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
- GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
- &call_details,
- &request_metadata_recv,
- f.server_cq, tag(101)));
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_server_request_call(f.server, &s, &call_details,
+ &request_metadata_recv, f.server_cq,
+ f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);
diff --git a/test/core/end2end/tests/disappearing_server.c b/test/core/end2end/tests/disappearing_server.c
index c8e22ce11c..29c023c72a 100644
--- a/test/core/end2end/tests/disappearing_server.c
+++ b/test/core/end2end/tests/disappearing_server.c
@@ -133,10 +133,10 @@ static void do_request_and_shutdown_server(grpc_end2end_test_fixture *f,
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
- GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f->server, &s,
- &call_details,
- &request_metadata_recv,
- f->server_cq, tag(101)));
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_server_request_call(f->server, &s, &call_details,
+ &request_metadata_recv, f->server_cq,
+ f->server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);
diff --git a/test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c b/test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c
index 2c2d2e895b..c293551663 100644
--- a/test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c
+++ b/test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c
@@ -148,10 +148,10 @@ static void test_early_server_shutdown_finishes_inflight_calls(
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
- GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
- &call_details,
- &request_metadata_recv,
- f.server_cq, tag(101)));
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_server_request_call(f.server, &s, &call_details,
+ &request_metadata_recv, f.server_cq,
+ f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);
diff --git a/test/core/end2end/tests/early_server_shutdown_finishes_tags.c b/test/core/end2end/tests/early_server_shutdown_finishes_tags.c
index 96978a8cb9..8801dae98a 100644
--- a/test/core/end2end/tests/early_server_shutdown_finishes_tags.c
+++ b/test/core/end2end/tests/early_server_shutdown_finishes_tags.c
@@ -115,10 +115,10 @@ static void test_early_server_shutdown_finishes_tags(
/* upon shutdown, the server should finish all requested calls indicating
no new call */
- GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
- &call_details,
- &request_metadata_recv,
- f.server_cq, tag(101)));
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_server_request_call(f.server, &s, &call_details,
+ &request_metadata_recv, f.server_cq,
+ f.server_cq, tag(101)));
grpc_server_shutdown(f.server);
cq_expect_completion(v_server, tag(101), GRPC_OP_ERROR);
cq_verify(v_server);
diff --git a/test/core/end2end/tests/graceful_server_shutdown.c b/test/core/end2end/tests/graceful_server_shutdown.c
index d084530a9c..2a8cf098eb 100644
--- a/test/core/end2end/tests/graceful_server_shutdown.c
+++ b/test/core/end2end/tests/graceful_server_shutdown.c
@@ -147,10 +147,10 @@ static void test_early_server_shutdown_finishes_inflight_calls(
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
- GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
- &call_details,
- &request_metadata_recv,
- f.server_cq, tag(101)));
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_server_request_call(f.server, &s, &call_details,
+ &request_metadata_recv, f.server_cq,
+ f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);
diff --git a/test/core/end2end/tests/invoke_large_request.c b/test/core/end2end/tests/invoke_large_request.c
index d9d9e934cb..98bcf9ada9 100644
--- a/test/core/end2end/tests/invoke_large_request.c
+++ b/test/core/end2end/tests/invoke_large_request.c
@@ -165,10 +165,10 @@ static void test_invoke_large_request(grpc_end2end_test_config config) {
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
- GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
- &call_details,
- &request_metadata_recv,
- f.server_cq, tag(101)));
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_server_request_call(f.server, &s, &call_details,
+ &request_metadata_recv, f.server_cq,
+ f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);
diff --git a/test/core/end2end/tests/max_concurrent_streams.c b/test/core/end2end/tests/max_concurrent_streams.c
index 6e95a6c5f8..e25b115d33 100644
--- a/test/core/end2end/tests/max_concurrent_streams.c
+++ b/test/core/end2end/tests/max_concurrent_streams.c
@@ -145,10 +145,10 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
- GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
- &call_details,
- &request_metadata_recv,
- f.server_cq, tag(101)));
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_server_request_call(f.server, &s, &call_details,
+ &request_metadata_recv, f.server_cq,
+ f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);
@@ -254,10 +254,10 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) {
"foo.test.google.fr:1234", deadline);
GPR_ASSERT(c2);
- GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s1,
- &call_details,
- &request_metadata_recv,
- f.server_cq, tag(101)));
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_server_request_call(f.server, &s1, &call_details,
+ &request_metadata_recv, f.server_cq,
+ f.server_cq, tag(101)));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
@@ -342,10 +342,10 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) {
cq_expect_completion(v_client, tag(live_call + 1), GRPC_OP_OK);
cq_verify(v_client);
- GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s2,
- &call_details,
- &request_metadata_recv,
- f.server_cq, tag(201)));
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_server_request_call(f.server, &s2, &call_details,
+ &request_metadata_recv, f.server_cq,
+ f.server_cq, tag(201)));
cq_expect_completion(v_server, tag(201), GRPC_OP_OK);
cq_verify(v_server);
diff --git a/test/core/end2end/tests/max_message_length.c b/test/core/end2end/tests/max_message_length.c
index 6291f773b3..3f8112d341 100644
--- a/test/core/end2end/tests/max_message_length.c
+++ b/test/core/end2end/tests/max_message_length.c
@@ -164,10 +164,10 @@ static void test_max_message_length(grpc_end2end_test_config config) {
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
- GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
- &call_details,
- &request_metadata_recv,
- f.server_cq, tag(101)));
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_server_request_call(f.server, &s, &call_details,
+ &request_metadata_recv, f.server_cq,
+ f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);
diff --git a/test/core/end2end/tests/ping_pong_streaming.c b/test/core/end2end/tests/ping_pong_streaming.c
index fe02f25875..c125664115 100644
--- a/test/core/end2end/tests/ping_pong_streaming.c
+++ b/test/core/end2end/tests/ping_pong_streaming.c
@@ -153,10 +153,10 @@ static void test_pingpong_streaming(grpc_end2end_test_config config,
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
- GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
- &call_details,
- &request_metadata_recv,
- f.server_cq, tag(100)));
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_server_request_call(f.server, &s, &call_details,
+ &request_metadata_recv, f.server_cq,
+ f.server_cq, tag(100)));
cq_expect_completion(v_server, tag(100), GRPC_OP_OK);
cq_verify(v_server);
diff --git a/test/core/end2end/tests/registered_call.c b/test/core/end2end/tests/registered_call.c
index 05b7a1dad0..04c3d5293c 100644
--- a/test/core/end2end/tests/registered_call.c
+++ b/test/core/end2end/tests/registered_call.c
@@ -146,10 +146,10 @@ static void simple_request_body(grpc_end2end_test_fixture f, void *rc) {
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
- GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
- &call_details,
- &request_metadata_recv,
- f.server_cq, tag(101)));
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_server_request_call(f.server, &s, &call_details,
+ &request_metadata_recv, f.server_cq,
+ f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);
diff --git a/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c
index 0169d52059..281091cdf9 100644
--- a/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c
+++ b/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c
@@ -181,10 +181,10 @@ static void test_request_response_with_metadata_and_payload(
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
- GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
- &call_details,
- &request_metadata_recv,
- f.server_cq, tag(101)));
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_server_request_call(f.server, &s, &call_details,
+ &request_metadata_recv, f.server_cq,
+ f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);
diff --git a/test/core/end2end/tests/request_response_with_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_metadata_and_payload.c
index dc49242d39..1590aa23fa 100644
--- a/test/core/end2end/tests/request_response_with_metadata_and_payload.c
+++ b/test/core/end2end/tests/request_response_with_metadata_and_payload.c
@@ -167,10 +167,10 @@ static void test_request_response_with_metadata_and_payload(
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
- GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
- &call_details,
- &request_metadata_recv,
- f.server_cq, tag(101)));
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_server_request_call(f.server, &s, &call_details,
+ &request_metadata_recv, f.server_cq,
+ f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);
diff --git a/test/core/end2end/tests/request_response_with_payload.c b/test/core/end2end/tests/request_response_with_payload.c
index 92036590a7..b94b6761eb 100644
--- a/test/core/end2end/tests/request_response_with_payload.c
+++ b/test/core/end2end/tests/request_response_with_payload.c
@@ -159,10 +159,10 @@ static void request_response_with_payload(grpc_end2end_test_fixture f) {
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
- GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
- &call_details,
- &request_metadata_recv,
- f.server_cq, tag(101)));
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_server_request_call(f.server, &s, &call_details,
+ &request_metadata_recv, f.server_cq,
+ f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);
diff --git a/test/core/end2end/tests/request_with_large_metadata.c b/test/core/end2end/tests/request_with_large_metadata.c
index c5b4e0c57e..bf8309914e 100644
--- a/test/core/end2end/tests/request_with_large_metadata.c
+++ b/test/core/end2end/tests/request_with_large_metadata.c
@@ -163,10 +163,10 @@ static void test_request_with_large_metadata(grpc_end2end_test_config config) {
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
- GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
- &call_details,
- &request_metadata_recv,
- f.server_cq, tag(101)));
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_server_request_call(f.server, &s, &call_details,
+ &request_metadata_recv, f.server_cq,
+ f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);
diff --git a/test/core/end2end/tests/request_with_payload.c b/test/core/end2end/tests/request_with_payload.c
index 63b7c5ee40..5fe69e9109 100644
--- a/test/core/end2end/tests/request_with_payload.c
+++ b/test/core/end2end/tests/request_with_payload.c
@@ -154,10 +154,10 @@ static void test_invoke_request_with_payload(grpc_end2end_test_config config) {
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
- GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
- &call_details,
- &request_metadata_recv,
- f.server_cq, tag(101)));
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_server_request_call(f.server, &s, &call_details,
+ &request_metadata_recv, f.server_cq,
+ f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);
diff --git a/test/core/end2end/tests/simple_delayed_request.c b/test/core/end2end/tests/simple_delayed_request.c
index 0dbb35d454..e025fd1a1e 100644
--- a/test/core/end2end/tests/simple_delayed_request.c
+++ b/test/core/end2end/tests/simple_delayed_request.c
@@ -141,10 +141,10 @@ static void simple_delayed_request_body(grpc_end2end_test_config config,
config.init_server(f, server_args);
- GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f->server, &s,
- &call_details,
- &request_metadata_recv,
- f->server_cq, tag(101)));
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_server_request_call(f->server, &s, &call_details,
+ &request_metadata_recv, f->server_cq,
+ f->server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);
diff --git a/test/core/end2end/tests/simple_request.c b/test/core/end2end/tests/simple_request.c
index 4d4d48a211..271bdc56ca 100644
--- a/test/core/end2end/tests/simple_request.c
+++ b/test/core/end2end/tests/simple_request.c
@@ -147,10 +147,10 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
- GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
- &call_details,
- &request_metadata_recv,
- f.server_cq, tag(101)));
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_server_request_call(f.server, &s, &call_details,
+ &request_metadata_recv, f.server_cq,
+ f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);
diff --git a/test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c b/test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c
index 538291a5f2..3b5393f660 100644
--- a/test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c
+++ b/test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c
@@ -147,10 +147,10 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
- GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
- &call_details,
- &request_metadata_recv,
- f.server_cq, tag(101)));
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_server_request_call(f.server, &s, &call_details,
+ &request_metadata_recv, f.server_cq,
+ f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);
diff --git a/test/core/fling/server.c b/test/core/fling/server.c
index 63c7bd7f88..8eab534177 100644
--- a/test/core/fling/server.c
+++ b/test/core/fling/server.c
@@ -89,7 +89,7 @@ typedef struct {
static void request_call(void) {
grpc_metadata_array_init(&request_metadata_recv);
grpc_server_request_call(server, &call, &call_details, &request_metadata_recv,
- cq, tag(FLING_SERVER_NEW_REQUEST));
+ cq, cq, tag(FLING_SERVER_NEW_REQUEST));
}
static void handle_unary_method(void) {
@@ -206,13 +206,14 @@ int main(int argc, char **argv) {
test_server1_cert};
grpc_server_credentials *ssl_creds =
grpc_ssl_server_credentials_create(NULL, &pem_key_cert_pair, 1);
- server = grpc_server_create(cq, NULL);
+ server = grpc_server_create(NULL);
GPR_ASSERT(grpc_server_add_secure_http2_port(server, addr, ssl_creds));
grpc_server_credentials_release(ssl_creds);
} else {
- server = grpc_server_create(cq, NULL);
+ server = grpc_server_create(NULL);
GPR_ASSERT(grpc_server_add_http2_port(server, addr));
}
+ grpc_server_register_completion_queue(server, cq);
grpc_server_start(server);
gpr_free(addr_buf);
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 6c0dfadbb9..d7c190dade 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -91,7 +91,7 @@ void verify_timed_ok(
class AsyncEnd2endTest : public ::testing::Test {
protected:
- AsyncEnd2endTest() : service_(&srv_cq_) {}
+ AsyncEnd2endTest() {}
void SetUp() GRPC_OVERRIDE {
int port = grpc_pick_unused_port_or_die();
@@ -100,6 +100,7 @@ class AsyncEnd2endTest : public ::testing::Test {
ServerBuilder builder;
builder.AddListeningPort(server_address_.str(), grpc::InsecureServerCredentials());
builder.RegisterAsyncService(&service_);
+ srv_cq_ = builder.AddCompletionQueue();
server_ = builder.BuildAndStart();
}
@@ -108,10 +109,10 @@ class AsyncEnd2endTest : public ::testing::Test {
void* ignored_tag;
bool ignored_ok;
cli_cq_.Shutdown();
- srv_cq_.Shutdown();
+ srv_cq_->Shutdown();
while (cli_cq_.Next(&ignored_tag, &ignored_ok))
;
- while (srv_cq_.Next(&ignored_tag, &ignored_ok))
+ while (srv_cq_->Next(&ignored_tag, &ignored_ok))
;
}
@@ -121,9 +122,9 @@ class AsyncEnd2endTest : public ::testing::Test {
stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel));
}
- void server_ok(int i) { verify_ok(&srv_cq_, i, true); }
+ void server_ok(int i) { verify_ok(srv_cq_.get(), i, true); }
void client_ok(int i) { verify_ok(&cli_cq_, i, true); }
- void server_fail(int i) { verify_ok(&srv_cq_, i, false); }
+ void server_fail(int i) { verify_ok(srv_cq_.get(), i, false); }
void client_fail(int i) { verify_ok(&cli_cq_, i, false); }
void SendRpc(int num_rpcs) {
@@ -142,8 +143,8 @@ class AsyncEnd2endTest : public ::testing::Test {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1)));
- service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_,
- tag(2));
+ service_.RequestEcho(&srv_ctx, &recv_request, &response_writer,
+ srv_cq_.get(), srv_cq_.get(), tag(2));
server_ok(2);
EXPECT_EQ(send_request.message(), recv_request.message());
@@ -162,7 +163,7 @@ class AsyncEnd2endTest : public ::testing::Test {
}
CompletionQueue cli_cq_;
- CompletionQueue srv_cq_;
+ std::unique_ptr<ServerCompletionQueue> srv_cq_;
std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub_;
std::unique_ptr<Server> server_;
grpc::cpp::test::util::TestService::AsyncService service_;
@@ -200,19 +201,19 @@ TEST_F(AsyncEnd2endTest, AsyncNextRpc) {
std::chrono::system_clock::time_point time_now(
std::chrono::system_clock::now()),
time_limit(std::chrono::system_clock::now() + std::chrono::seconds(5));
- verify_timed_ok(&srv_cq_, -1, true, time_now, CompletionQueue::TIMEOUT);
+ verify_timed_ok(srv_cq_.get(), -1, true, time_now, CompletionQueue::TIMEOUT);
verify_timed_ok(&cli_cq_, -1, true, time_now, CompletionQueue::TIMEOUT);
- service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_,
- tag(2));
+ service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(),
+ srv_cq_.get(), tag(2));
- verify_timed_ok(&srv_cq_, 2, true, time_limit);
+ verify_timed_ok(srv_cq_.get(), 2, true, time_limit);
EXPECT_EQ(send_request.message(), recv_request.message());
verify_timed_ok(&cli_cq_, 1, true, time_limit);
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
- verify_timed_ok(&srv_cq_, 3, true);
+ verify_timed_ok(srv_cq_.get(), 3, true);
response_reader->Finish(&recv_response, &recv_status, tag(4));
verify_timed_ok(&cli_cq_, 4, true);
@@ -238,7 +239,8 @@ TEST_F(AsyncEnd2endTest, SimpleClientStreaming) {
std::unique_ptr<ClientAsyncWriter<EchoRequest> > cli_stream(
stub_->AsyncRequestStream(&cli_ctx, &recv_response, &cli_cq_, tag(1)));
- service_.RequestRequestStream(&srv_ctx, &srv_stream, &srv_cq_, tag(2));
+ service_.RequestRequestStream(&srv_ctx, &srv_stream, srv_cq_.get(),
+ srv_cq_.get(), tag(2));
server_ok(2);
client_ok(1);
@@ -291,8 +293,8 @@ TEST_F(AsyncEnd2endTest, SimpleServerStreaming) {
std::unique_ptr<ClientAsyncReader<EchoResponse> > cli_stream(
stub_->AsyncResponseStream(&cli_ctx, send_request, &cli_cq_, tag(1)));
- service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, &srv_cq_,
- tag(2));
+ service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
+ srv_cq_.get(), srv_cq_.get(), tag(2));
server_ok(2);
client_ok(1);
@@ -342,7 +344,8 @@ TEST_F(AsyncEnd2endTest, SimpleBidiStreaming) {
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse> >
cli_stream(stub_->AsyncBidiStream(&cli_ctx, &cli_cq_, tag(1)));
- service_.RequestBidiStream(&srv_ctx, &srv_stream, &srv_cq_, tag(2));
+ service_.RequestBidiStream(&srv_ctx, &srv_stream, srv_cq_.get(),
+ srv_cq_.get(), tag(2));
server_ok(2);
client_ok(1);
@@ -400,8 +403,8 @@ TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1)));
- service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_,
- tag(2));
+ service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(),
+ srv_cq_.get(), tag(2));
server_ok(2);
EXPECT_EQ(send_request.message(), recv_request.message());
auto client_initial_metadata = srv_ctx.client_metadata();
@@ -442,8 +445,8 @@ TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1)));
- service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_,
- tag(2));
+ service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(),
+ srv_cq_.get(), tag(2));
server_ok(2);
EXPECT_EQ(send_request.message(), recv_request.message());
srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
@@ -490,8 +493,8 @@ TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1)));
- service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_,
- tag(2));
+ service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(),
+ srv_cq_.get(), tag(2));
server_ok(2);
EXPECT_EQ(send_request.message(), recv_request.message());
response_writer.SendInitialMetadata(tag(3));
@@ -551,8 +554,8 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1)));
- service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_,
- tag(2));
+ service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(),
+ srv_cq_.get(), tag(2));
server_ok(2);
EXPECT_EQ(send_request.message(), recv_request.message());
auto client_initial_metadata = srv_ctx.client_metadata();
diff --git a/test/cpp/end2end/generic_end2end_test.cc b/test/cpp/end2end/generic_end2end_test.cc
index 103f613f70..80e43fd854 100644
--- a/test/cpp/end2end/generic_end2end_test.cc
+++ b/test/cpp/end2end/generic_end2end_test.cc
@@ -109,6 +109,7 @@ class GenericEnd2endTest : public ::testing::Test {
ServerBuilder builder;
builder.AddListeningPort(server_address_.str(), InsecureServerCredentials());
builder.RegisterAsyncGenericService(&generic_service_);
+ srv_cq_ = builder.AddCompletionQueue();
server_ = builder.BuildAndStart();
}
@@ -117,10 +118,10 @@ class GenericEnd2endTest : public ::testing::Test {
void* ignored_tag;
bool ignored_ok;
cli_cq_.Shutdown();
- srv_cq_.Shutdown();
+ srv_cq_->Shutdown();
while (cli_cq_.Next(&ignored_tag, &ignored_ok))
;
- while (srv_cq_.Next(&ignored_tag, &ignored_ok))
+ while (srv_cq_->Next(&ignored_tag, &ignored_ok))
;
}
@@ -130,9 +131,9 @@ class GenericEnd2endTest : public ::testing::Test {
generic_stub_.reset(new GenericStub(channel));
}
- void server_ok(int i) { verify_ok(&srv_cq_, i, true); }
+ void server_ok(int i) { verify_ok(srv_cq_.get(), i, true); }
void client_ok(int i) { verify_ok(&cli_cq_, i, true); }
- void server_fail(int i) { verify_ok(&srv_cq_, i, false); }
+ void server_fail(int i) { verify_ok(srv_cq_.get(), i, false); }
void client_fail(int i) { verify_ok(&cli_cq_, i, false); }
void SendRpc(int num_rpcs) {
@@ -160,9 +161,10 @@ class GenericEnd2endTest : public ::testing::Test {
call->WritesDone(tag(3));
client_ok(3);
- generic_service_.RequestCall(&srv_ctx, &stream, &srv_cq_, tag(4));
+ generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(),
+ srv_cq_.get(), tag(4));
- verify_ok(generic_service_.completion_queue(), 4, true);
+ verify_ok(srv_cq_.get(), 4, true);
EXPECT_EQ(server_address_.str(), srv_ctx.host());
EXPECT_EQ(kMethodName, srv_ctx.method());
ByteBuffer recv_buffer;
@@ -193,7 +195,7 @@ class GenericEnd2endTest : public ::testing::Test {
}
CompletionQueue cli_cq_;
- CompletionQueue srv_cq_;
+ std::unique_ptr<ServerCompletionQueue> srv_cq_;
std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub_;
std::unique_ptr<grpc::GenericStub> generic_stub_;
std::unique_ptr<Server> server_;
@@ -230,9 +232,10 @@ TEST_F(GenericEnd2endTest, SimpleBidiStreaming) {
generic_stub_->Call(&cli_ctx, kMethodName, &cli_cq_, tag(1));
client_ok(1);
- generic_service_.RequestCall(&srv_ctx, &srv_stream, &srv_cq_, tag(2));
+ generic_service_.RequestCall(&srv_ctx, &srv_stream, srv_cq_.get(),
+ srv_cq_.get(), tag(2));
- verify_ok(generic_service_.completion_queue(), 2, true);
+ verify_ok(srv_cq_.get(), 2, true);
EXPECT_EQ(server_address_.str(), srv_ctx.host());
EXPECT_EQ(kMethodName, srv_ctx.method());
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index b19c443c82..6cb3192908 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -63,9 +63,7 @@ namespace testing {
class AsyncQpsServerTest : public Server {
public:
- AsyncQpsServerTest(const ServerConfig& config, int port)
- : srv_cq_(), async_service_(&srv_cq_), server_(nullptr),
- shutdown_(false) {
+ AsyncQpsServerTest(const ServerConfig &config, int port) : shutdown_(false) {
char* server_address = NULL;
gpr_join_host_port(&server_address, "::", port);
@@ -74,15 +72,17 @@ class AsyncQpsServerTest : public Server {
gpr_free(server_address);
builder.RegisterAsyncService(&async_service_);
+ srv_cq_ = builder.AddCompletionQueue();
server_ = builder.BuildAndStart();
using namespace std::placeholders;
- request_unary_ = std::bind(&TestService::AsyncService::RequestUnaryCall,
- &async_service_, _1, _2, _3, &srv_cq_, _4);
+ request_unary_ =
+ std::bind(&TestService::AsyncService::RequestUnaryCall, &async_service_,
+ _1, _2, _3, srv_cq_.get(), srv_cq_.get(), _4);
request_streaming_ =
- std::bind(&TestService::AsyncService::RequestStreamingCall,
- &async_service_, _1, _2, &srv_cq_, _3);
+ std::bind(&TestService::AsyncService::RequestStreamingCall,
+ &async_service_, _1, _2, srv_cq_.get(), srv_cq_.get(), _3);
for (int i = 0; i < 100; i++) {
contexts_.push_front(
new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
@@ -96,7 +96,7 @@ class AsyncQpsServerTest : public Server {
// Wait until work is available or we are shutting down
bool ok;
void* got_tag;
- while (srv_cq_.Next(&got_tag, &ok)) {
+ while (srv_cq_->Next(&got_tag, &ok)) {
ServerRpcContext* ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
if (ctx->RunNextState(ok) == false) {
@@ -116,7 +116,7 @@ class AsyncQpsServerTest : public Server {
{
std::lock_guard<std::mutex> g(shutdown_mutex_);
shutdown_ = true;
- srv_cq_.Shutdown();
+ srv_cq_->Shutdown();
}
for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
thr->join();
@@ -290,10 +290,10 @@ class AsyncQpsServerTest : public Server {
}
return Status::OK;
}
- CompletionQueue srv_cq_;
- TestService::AsyncService async_service_;
std::vector<std::thread> threads_;
std::unique_ptr<grpc::Server> server_;
+ std::unique_ptr<grpc::ServerCompletionQueue> srv_cq_;
+ TestService::AsyncService async_service_;
std::function<void(ServerContext*, SimpleRequest*,
grpc::ServerAsyncResponseWriter<SimpleResponse>*, void*)>
request_unary_;