diff options
author | Craig Tiller <ctiller@google.com> | 2015-02-12 14:10:25 -0800 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-02-12 14:10:25 -0800 |
commit | 1c9a2a91ca3b917de982eb6aac6adc421b595f3e (patch) | |
tree | b53b1d9d2a8588549b95a7372a8abf9e0c33750c /include | |
parent | 7478d9184f417f9147b24ce0c2806f4d3f0ee485 (diff) |
Async API progress
Diffstat (limited to 'include')
-rw-r--r-- | include/grpc++/impl/service_type.h | 66 | ||||
-rw-r--r-- | include/grpc++/server.h | 15 | ||||
-rw-r--r-- | include/grpc++/stream.h | 14 |
3 files changed, 77 insertions, 18 deletions
diff --git a/include/grpc++/impl/service_type.h b/include/grpc++/impl/service_type.h index 30654553ad..19432522df 100644 --- a/include/grpc++/impl/service_type.h +++ b/include/grpc++/impl/service_type.h @@ -34,10 +34,18 @@ #ifndef __GRPCPP_IMPL_SERVICE_TYPE_H__ #define __GRPCPP_IMPL_SERVICE_TYPE_H__ +namespace google { +namespace protobuf { +class Message; +} // namespace protobuf +} // namespace google + namespace grpc { class RpcService; class Server; +class ServerContext; +class Status; class SynchronousService { public: @@ -45,19 +53,69 @@ class SynchronousService { virtual RpcService* service() = 0; }; +class ServerAsyncStreamingInterface { + public: + virtual ~ServerAsyncStreamingInterface() {} + + virtual void SendInitialMetadata(void* tag) = 0; + virtual void Finish(const Status& status, void* tag) = 0; +}; + class AsynchronousService { public: - AsynchronousService(CompletionQueue* cq, const char** method_names, size_t method_count) : cq_(cq), method_names_(method_names), method_count_(method_count) {} + // this is Server, but in disguise to avoid a link dependency + class DispatchImpl { + public: + virtual void RequestAsyncCall(void* registered_method, + ServerContext* context, + ::google::protobuf::Message* request, + ServerAsyncStreamingInterface* stream, + CompletionQueue* cq, void* tag) = 0; + }; + + AsynchronousService(CompletionQueue* cq, const char** method_names, + size_t method_count) + : cq_(cq), method_names_(method_names), method_count_(method_count) {} + + ~AsynchronousService(); CompletionQueue* completion_queue() const { return cq_; } + protected: + void RequestAsyncUnary(int index, ServerContext* context, + ::google::protobuf::Message* request, + ServerAsyncStreamingInterface* stream, + CompletionQueue* cq, void* tag) { + dispatch_impl_->RequestAsyncCall(request_args_[index], context, request, + stream, cq, tag); + } + void RequestClientStreaming(int index, ServerContext* context, + ServerAsyncStreamingInterface* stream, + CompletionQueue* cq, void* tag) { + dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr, + stream, cq, tag); + } + void RequestServerStreaming(int index, ServerContext* context, + ::google::protobuf::Message* request, + ServerAsyncStreamingInterface* stream, + CompletionQueue* cq, void* tag) { + dispatch_impl_->RequestAsyncCall(request_args_[index], context, request, + stream, cq, tag); + } + void RequestBidiStreaming(int index, ServerContext* context, + ServerAsyncStreamingInterface* stream, + CompletionQueue* cq, void* tag) { + dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr, + stream, cq, tag); + } + private: friend class Server; CompletionQueue* const cq_; - Server* server_ = nullptr; - const char**const method_names_; + DispatchImpl* dispatch_impl_ = nullptr; + const char** const method_names_; size_t method_count_; - std::vector<void*> request_args_; + void** request_args_ = nullptr; }; } // namespace grpc diff --git a/include/grpc++/server.h b/include/grpc++/server.h index 77aac75076..8050ef8c9d 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -42,6 +42,7 @@ #include <grpc++/completion_queue.h> #include <grpc++/config.h> #include <grpc++/impl/call.h> +#include <grpc++/impl/service_type.h> #include <grpc++/status.h> struct grpc_server; @@ -60,7 +61,8 @@ class ServerCredentials; class ThreadPoolInterface; // Currently it only supports handling rpcs in a single thread. -class Server final : private CallHook { +class Server final : private CallHook, + private AsynchronousService::DispatchImpl { public: ~Server(); @@ -70,7 +72,8 @@ class Server final : private CallHook { private: friend class ServerBuilder; - class MethodRequestData; + class SyncRequest; + class AsyncRequest; // ServerBuilder use only Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, @@ -91,6 +94,12 @@ class Server final : private CallHook { void PerformOpsOnCall(CallOpBuffer* ops, Call* call) override; + // DispatchImpl + void RequestAsyncCall(void* registered_method, ServerContext* context, + ::google::protobuf::Message* request, + ServerAsyncStreamingInterface* stream, + CompletionQueue* cq, void* tag); + // Completion queue. CompletionQueue cq_; @@ -102,7 +111,7 @@ class Server final : private CallHook { int num_running_cb_; std::condition_variable callback_cv_; - std::list<MethodRequestData> methods_; + std::list<SyncRequest> sync_methods_; // Pointer to the c grpc server. grpc_server* server_; diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index 6dc05bc9a6..c013afb141 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -39,6 +39,7 @@ #include <grpc++/completion_queue.h> #include <grpc++/server_context.h> #include <grpc++/impl/call.h> +#include <grpc++/impl/service_type.h> #include <grpc++/status.h> #include <grpc/support/log.h> @@ -370,15 +371,6 @@ class ClientAsyncStreamingInterface { virtual void Finish(Status* status, void* tag) = 0; }; -class ServerAsyncStreamingInterface { - public: - virtual ~ServerAsyncStreamingInterface() {} - - virtual void SendInitialMetadata(void* tag) = 0; - - virtual void Finish(const Status& status, void* tag) = 0; -}; - // An interface that yields a sequence of R messages. template <class R> class AsyncReaderInterface { @@ -580,11 +572,11 @@ class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface, // TODO(yangg) Move out of stream.h template <class W> -class ServerAsyncResponseWriter final { +class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface { public: explicit ServerAsyncResponseWriter(Call* call) : call_(call) {} - virtual void Write(const W& msg, void* tag) override { + virtual void Write(const W& msg, void* tag) { CallOpBuffer buf; buf.Reset(tag); buf.AddSendMessage(msg); |