diff options
author | Yang Gao <yangg@google.com> | 2015-05-12 12:37:24 -0700 |
---|---|---|
committer | Yang Gao <yangg@google.com> | 2015-05-12 12:37:24 -0700 |
commit | a468c36601dd5997580129bbd66b5ebed02521f8 (patch) | |
tree | da9bbea984725d8c16653805836877de7442dc7e /include | |
parent | 2e1229172c0b696fd77e400edecfb1ec562e31b2 (diff) | |
parent | 9ec7f5ab81b98408b1cd4094ff536358528534c0 (diff) |
Merge pull request #1493 from ctiller/churn-churn-churn-the-api-gently-down-the-stream
Completion queue binding for new requests API change
Diffstat (limited to 'include')
-rw-r--r-- | include/grpc++/async_generic_service.h | 6 | ||||
-rw-r--r-- | include/grpc++/completion_queue.h | 7 | ||||
-rw-r--r-- | include/grpc++/impl/service_type.h | 37 | ||||
-rw-r--r-- | include/grpc++/server.h | 8 | ||||
-rw-r--r-- | include/grpc++/server_builder.h | 7 | ||||
-rw-r--r-- | include/grpc/grpc.h | 18 |
6 files changed, 55 insertions, 28 deletions
diff --git a/include/grpc++/async_generic_service.h b/include/grpc++/async_generic_service.h index 911d31cb1f..b435c6e73d 100644 --- a/include/grpc++/async_generic_service.h +++ b/include/grpc++/async_generic_service.h @@ -65,10 +65,8 @@ class AsyncGenericService GRPC_FINAL { void RequestCall(GenericServerContext* ctx, GenericServerAsyncReaderWriter* reader_writer, - CompletionQueue* cq, void* tag); - - // The new rpc event should be obtained from this completion queue. - CompletionQueue* completion_queue(); + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag); private: friend class Server; diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h index 5c2b1cce93..e8429c8f41 100644 --- a/include/grpc++/completion_queue.h +++ b/include/grpc++/completion_queue.h @@ -58,6 +58,7 @@ class ServerReaderWriter; class CompletionQueue; class Server; +class ServerBuilder; class ServerContext; class CompletionQueueTag { @@ -137,6 +138,12 @@ class CompletionQueue : public GrpcLibrary { grpc_completion_queue* cq_; // owned }; +class ServerCompletionQueue : public CompletionQueue { + private: + friend class ServerBuilder; + ServerCompletionQueue() {} +}; + } // namespace grpc #endif // GRPCXX_COMPLETION_QUEUE_H diff --git a/include/grpc++/impl/service_type.h b/include/grpc++/impl/service_type.h index 7cd3ddad6b..bc39bb82ac 100644 --- a/include/grpc++/impl/service_type.h +++ b/include/grpc++/impl/service_type.h @@ -39,8 +39,10 @@ namespace grpc { class Call; +class CompletionQueue; class RpcService; class Server; +class ServerCompletionQueue; class ServerContext; class Status; @@ -70,52 +72,55 @@ class AsynchronousService { ServerContext* context, ::grpc::protobuf::Message* request, ServerAsyncStreamingInterface* stream, - CompletionQueue* cq, void* tag) = 0; + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, + void* tag) = 0; }; - AsynchronousService(CompletionQueue* cq, const char** method_names, - size_t method_count) - : cq_(cq), - dispatch_impl_(nullptr), + AsynchronousService(const char** method_names, size_t method_count) + : dispatch_impl_(nullptr), method_names_(method_names), method_count_(method_count), request_args_(nullptr) {} ~AsynchronousService() { delete[] request_args_; } - CompletionQueue* completion_queue() const { return cq_; } - protected: void RequestAsyncUnary(int index, ServerContext* context, grpc::protobuf::Message* request, ServerAsyncStreamingInterface* stream, - CompletionQueue* cq, void* tag) { + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) { dispatch_impl_->RequestAsyncCall(request_args_[index], context, request, - stream, cq, tag); + stream, call_cq, notification_cq, tag); } void RequestClientStreaming(int index, ServerContext* context, ServerAsyncStreamingInterface* stream, - CompletionQueue* cq, void* tag) { + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, + void* tag) { dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr, - stream, cq, tag); + stream, call_cq, notification_cq, tag); } void RequestServerStreaming(int index, ServerContext* context, grpc::protobuf::Message* request, ServerAsyncStreamingInterface* stream, - CompletionQueue* cq, void* tag) { + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, + void* tag) { dispatch_impl_->RequestAsyncCall(request_args_[index], context, request, - stream, cq, tag); + stream, call_cq, notification_cq, tag); } void RequestBidiStreaming(int index, ServerContext* context, ServerAsyncStreamingInterface* stream, - CompletionQueue* cq, void* tag) { + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) { dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr, - stream, cq, tag); + stream, call_cq, notification_cq, tag); } private: friend class Server; - CompletionQueue* const cq_; DispatchImpl* dispatch_impl_; const char** const method_names_; size_t method_count_; diff --git a/include/grpc++/server.h b/include/grpc++/server.h index b2b9044dca..50a2416321 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -101,11 +101,15 @@ class Server GRPC_FINAL : public GrpcLibrary, void RequestAsyncCall(void* registered_method, ServerContext* context, grpc::protobuf::Message* request, ServerAsyncStreamingInterface* stream, - CompletionQueue* cq, void* tag) GRPC_OVERRIDE; + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, + void* tag) GRPC_OVERRIDE; void RequestAsyncGenericCall(GenericServerContext* context, ServerAsyncStreamingInterface* stream, - CompletionQueue* cq, void* tag); + CompletionQueue* cq, + ServerCompletionQueue* notification_cq, + void* tag); const int max_message_size_; diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h index 7155c7fd46..ecee475e3e 100644 --- a/include/grpc++/server_builder.h +++ b/include/grpc++/server_builder.h @@ -46,6 +46,7 @@ class AsynchronousService; class CompletionQueue; class RpcService; class Server; +class ServerCompletionQueue; class ServerCredentials; class SynchronousService; class ThreadPoolInterface; @@ -82,6 +83,11 @@ class ServerBuilder { // Does not take ownership. void SetThreadPool(ThreadPoolInterface* thread_pool); + // Add a completion queue for handling asynchronous services + // Caller is required to keep this completion queue live until calling + // BuildAndStart() + std::unique_ptr<ServerCompletionQueue> AddCompletionQueue(); + // Return a running server which is ready for processing rpcs. std::unique_ptr<Server> BuildAndStart(); @@ -96,6 +102,7 @@ class ServerBuilder { std::vector<RpcService*> services_; std::vector<AsynchronousService*> async_services_; std::vector<Port> ports_; + std::vector<ServerCompletionQueue*> cqs_; std::shared_ptr<ServerCredentials> creds_; AsyncGenericService* generic_service_; ThreadPoolInterface* thread_pool_; diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 3348653956..0212b42188 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -462,7 +462,8 @@ void grpc_call_destroy(grpc_call *call); grpc_call_error grpc_server_request_call( grpc_server *server, grpc_call **call, grpc_call_details *details, grpc_metadata_array *request_metadata, - grpc_completion_queue *cq_bound_to_call, void *tag_new); + grpc_completion_queue *cq_bound_to_call, + grpc_completion_queue *cq_for_notification, void *tag_new); /* Registers a method in the server. Methods to this (host, method) pair will not be reported by @@ -472,21 +473,26 @@ grpc_call_error grpc_server_request_call( Must be called before grpc_server_start. Returns NULL on failure. */ void *grpc_server_register_method(grpc_server *server, const char *method, - const char *host, - grpc_completion_queue *new_call_cq); + const char *host); /* Request notification of a new pre-registered call */ grpc_call_error grpc_server_request_registered_call( grpc_server *server, void *registered_method, grpc_call **call, gpr_timespec *deadline, grpc_metadata_array *request_metadata, grpc_byte_buffer **optional_payload, - grpc_completion_queue *cq_bound_to_call, void *tag_new); + grpc_completion_queue *cq_bound_to_call, + grpc_completion_queue *cq_for_notification, void *tag_new); /* Create a server. Additional configuration for each incoming channel can be specified with args. If no additional configuration is needed, args can be NULL. See grpc_channel_args for more. */ -grpc_server *grpc_server_create(grpc_completion_queue *cq, - const grpc_channel_args *args); +grpc_server *grpc_server_create(const grpc_channel_args *args); + +/* Register a completion queue with the server. Must be done for any completion + queue that is passed to grpc_server_request_* call. Must be performed prior + to grpc_server_start. */ +void grpc_server_register_completion_queue(grpc_server *server, + grpc_completion_queue *cq); /* Add a HTTP2 over plaintext over tcp listener. Returns bound port number on success, 0 on failure. |