aboutsummaryrefslogtreecommitdiffhomepage
path: root/include
diff options
context:
space:
mode:
authorGravatar Yang Gao <yangg@google.com>2015-05-12 12:37:24 -0700
committerGravatar Yang Gao <yangg@google.com>2015-05-12 12:37:24 -0700
commita468c36601dd5997580129bbd66b5ebed02521f8 (patch)
treeda9bbea984725d8c16653805836877de7442dc7e /include
parent2e1229172c0b696fd77e400edecfb1ec562e31b2 (diff)
parent9ec7f5ab81b98408b1cd4094ff536358528534c0 (diff)
Merge pull request #1493 from ctiller/churn-churn-churn-the-api-gently-down-the-stream
Completion queue binding for new requests API change
Diffstat (limited to 'include')
-rw-r--r--include/grpc++/async_generic_service.h6
-rw-r--r--include/grpc++/completion_queue.h7
-rw-r--r--include/grpc++/impl/service_type.h37
-rw-r--r--include/grpc++/server.h8
-rw-r--r--include/grpc++/server_builder.h7
-rw-r--r--include/grpc/grpc.h18
6 files changed, 55 insertions, 28 deletions
diff --git a/include/grpc++/async_generic_service.h b/include/grpc++/async_generic_service.h
index 911d31cb1f..b435c6e73d 100644
--- a/include/grpc++/async_generic_service.h
+++ b/include/grpc++/async_generic_service.h
@@ -65,10 +65,8 @@ class AsyncGenericService GRPC_FINAL {
void RequestCall(GenericServerContext* ctx,
GenericServerAsyncReaderWriter* reader_writer,
- CompletionQueue* cq, void* tag);
-
- // The new rpc event should be obtained from this completion queue.
- CompletionQueue* completion_queue();
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag);
private:
friend class Server;
diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h
index 5c2b1cce93..e8429c8f41 100644
--- a/include/grpc++/completion_queue.h
+++ b/include/grpc++/completion_queue.h
@@ -58,6 +58,7 @@ class ServerReaderWriter;
class CompletionQueue;
class Server;
+class ServerBuilder;
class ServerContext;
class CompletionQueueTag {
@@ -137,6 +138,12 @@ class CompletionQueue : public GrpcLibrary {
grpc_completion_queue* cq_; // owned
};
+class ServerCompletionQueue : public CompletionQueue {
+ private:
+ friend class ServerBuilder;
+ ServerCompletionQueue() {}
+};
+
} // namespace grpc
#endif // GRPCXX_COMPLETION_QUEUE_H
diff --git a/include/grpc++/impl/service_type.h b/include/grpc++/impl/service_type.h
index 7cd3ddad6b..bc39bb82ac 100644
--- a/include/grpc++/impl/service_type.h
+++ b/include/grpc++/impl/service_type.h
@@ -39,8 +39,10 @@
namespace grpc {
class Call;
+class CompletionQueue;
class RpcService;
class Server;
+class ServerCompletionQueue;
class ServerContext;
class Status;
@@ -70,52 +72,55 @@ class AsynchronousService {
ServerContext* context,
::grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq, void* tag) = 0;
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq,
+ void* tag) = 0;
};
- AsynchronousService(CompletionQueue* cq, const char** method_names,
- size_t method_count)
- : cq_(cq),
- dispatch_impl_(nullptr),
+ AsynchronousService(const char** method_names, size_t method_count)
+ : dispatch_impl_(nullptr),
method_names_(method_names),
method_count_(method_count),
request_args_(nullptr) {}
~AsynchronousService() { delete[] request_args_; }
- CompletionQueue* completion_queue() const { return cq_; }
-
protected:
void RequestAsyncUnary(int index, ServerContext* context,
grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq, void* tag) {
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag) {
dispatch_impl_->RequestAsyncCall(request_args_[index], context, request,
- stream, cq, tag);
+ stream, call_cq, notification_cq, tag);
}
void RequestClientStreaming(int index, ServerContext* context,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq, void* tag) {
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq,
+ void* tag) {
dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr,
- stream, cq, tag);
+ stream, call_cq, notification_cq, tag);
}
void RequestServerStreaming(int index, ServerContext* context,
grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq, void* tag) {
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq,
+ void* tag) {
dispatch_impl_->RequestAsyncCall(request_args_[index], context, request,
- stream, cq, tag);
+ stream, call_cq, notification_cq, tag);
}
void RequestBidiStreaming(int index, ServerContext* context,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq, void* tag) {
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag) {
dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr,
- stream, cq, tag);
+ stream, call_cq, notification_cq, tag);
}
private:
friend class Server;
- CompletionQueue* const cq_;
DispatchImpl* dispatch_impl_;
const char** const method_names_;
size_t method_count_;
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index b2b9044dca..50a2416321 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -101,11 +101,15 @@ class Server GRPC_FINAL : public GrpcLibrary,
void RequestAsyncCall(void* registered_method, ServerContext* context,
grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq, void* tag) GRPC_OVERRIDE;
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq,
+ void* tag) GRPC_OVERRIDE;
void RequestAsyncGenericCall(GenericServerContext* context,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* cq, void* tag);
+ CompletionQueue* cq,
+ ServerCompletionQueue* notification_cq,
+ void* tag);
const int max_message_size_;
diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h
index 7155c7fd46..ecee475e3e 100644
--- a/include/grpc++/server_builder.h
+++ b/include/grpc++/server_builder.h
@@ -46,6 +46,7 @@ class AsynchronousService;
class CompletionQueue;
class RpcService;
class Server;
+class ServerCompletionQueue;
class ServerCredentials;
class SynchronousService;
class ThreadPoolInterface;
@@ -82,6 +83,11 @@ class ServerBuilder {
// Does not take ownership.
void SetThreadPool(ThreadPoolInterface* thread_pool);
+ // Add a completion queue for handling asynchronous services
+ // Caller is required to keep this completion queue live until calling
+ // BuildAndStart()
+ std::unique_ptr<ServerCompletionQueue> AddCompletionQueue();
+
// Return a running server which is ready for processing rpcs.
std::unique_ptr<Server> BuildAndStart();
@@ -96,6 +102,7 @@ class ServerBuilder {
std::vector<RpcService*> services_;
std::vector<AsynchronousService*> async_services_;
std::vector<Port> ports_;
+ std::vector<ServerCompletionQueue*> cqs_;
std::shared_ptr<ServerCredentials> creds_;
AsyncGenericService* generic_service_;
ThreadPoolInterface* thread_pool_;
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index 3348653956..0212b42188 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -462,7 +462,8 @@ void grpc_call_destroy(grpc_call *call);
grpc_call_error grpc_server_request_call(
grpc_server *server, grpc_call **call, grpc_call_details *details,
grpc_metadata_array *request_metadata,
- grpc_completion_queue *cq_bound_to_call, void *tag_new);
+ grpc_completion_queue *cq_bound_to_call,
+ grpc_completion_queue *cq_for_notification, void *tag_new);
/* Registers a method in the server.
Methods to this (host, method) pair will not be reported by
@@ -472,21 +473,26 @@ grpc_call_error grpc_server_request_call(
Must be called before grpc_server_start.
Returns NULL on failure. */
void *grpc_server_register_method(grpc_server *server, const char *method,
- const char *host,
- grpc_completion_queue *new_call_cq);
+ const char *host);
/* Request notification of a new pre-registered call */
grpc_call_error grpc_server_request_registered_call(
grpc_server *server, void *registered_method, grpc_call **call,
gpr_timespec *deadline, grpc_metadata_array *request_metadata,
grpc_byte_buffer **optional_payload,
- grpc_completion_queue *cq_bound_to_call, void *tag_new);
+ grpc_completion_queue *cq_bound_to_call,
+ grpc_completion_queue *cq_for_notification, void *tag_new);
/* Create a server. Additional configuration for each incoming channel can
be specified with args. If no additional configuration is needed, args can
be NULL. See grpc_channel_args for more. */
-grpc_server *grpc_server_create(grpc_completion_queue *cq,
- const grpc_channel_args *args);
+grpc_server *grpc_server_create(const grpc_channel_args *args);
+
+/* Register a completion queue with the server. Must be done for any completion
+ queue that is passed to grpc_server_request_* call. Must be performed prior
+ to grpc_server_start. */
+void grpc_server_register_completion_queue(grpc_server *server,
+ grpc_completion_queue *cq);
/* Add a HTTP2 over plaintext over tcp listener.
Returns bound port number on success, 0 on failure.