aboutsummaryrefslogtreecommitdiffhomepage
path: root/include
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-02-12 14:10:25 -0800
committerGravatar Craig Tiller <ctiller@google.com>2015-02-12 14:10:25 -0800
commit1c9a2a91ca3b917de982eb6aac6adc421b595f3e (patch)
treeb53b1d9d2a8588549b95a7372a8abf9e0c33750c /include
parent7478d9184f417f9147b24ce0c2806f4d3f0ee485 (diff)
Async API progress
Diffstat (limited to 'include')
-rw-r--r--include/grpc++/impl/service_type.h66
-rw-r--r--include/grpc++/server.h15
-rw-r--r--include/grpc++/stream.h14
3 files changed, 77 insertions, 18 deletions
diff --git a/include/grpc++/impl/service_type.h b/include/grpc++/impl/service_type.h
index 30654553ad..19432522df 100644
--- a/include/grpc++/impl/service_type.h
+++ b/include/grpc++/impl/service_type.h
@@ -34,10 +34,18 @@
#ifndef __GRPCPP_IMPL_SERVICE_TYPE_H__
#define __GRPCPP_IMPL_SERVICE_TYPE_H__
+namespace google {
+namespace protobuf {
+class Message;
+} // namespace protobuf
+} // namespace google
+
namespace grpc {
class RpcService;
class Server;
+class ServerContext;
+class Status;
class SynchronousService {
public:
@@ -45,19 +53,69 @@ class SynchronousService {
virtual RpcService* service() = 0;
};
+class ServerAsyncStreamingInterface {
+ public:
+ virtual ~ServerAsyncStreamingInterface() {}
+
+ virtual void SendInitialMetadata(void* tag) = 0;
+ virtual void Finish(const Status& status, void* tag) = 0;
+};
+
class AsynchronousService {
public:
- AsynchronousService(CompletionQueue* cq, const char** method_names, size_t method_count) : cq_(cq), method_names_(method_names), method_count_(method_count) {}
+ // this is Server, but in disguise to avoid a link dependency
+ class DispatchImpl {
+ public:
+ virtual void RequestAsyncCall(void* registered_method,
+ ServerContext* context,
+ ::google::protobuf::Message* request,
+ ServerAsyncStreamingInterface* stream,
+ CompletionQueue* cq, void* tag) = 0;
+ };
+
+ AsynchronousService(CompletionQueue* cq, const char** method_names,
+ size_t method_count)
+ : cq_(cq), method_names_(method_names), method_count_(method_count) {}
+
+ ~AsynchronousService();
CompletionQueue* completion_queue() const { return cq_; }
+ protected:
+ void RequestAsyncUnary(int index, ServerContext* context,
+ ::google::protobuf::Message* request,
+ ServerAsyncStreamingInterface* stream,
+ CompletionQueue* cq, void* tag) {
+ dispatch_impl_->RequestAsyncCall(request_args_[index], context, request,
+ stream, cq, tag);
+ }
+ void RequestClientStreaming(int index, ServerContext* context,
+ ServerAsyncStreamingInterface* stream,
+ CompletionQueue* cq, void* tag) {
+ dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr,
+ stream, cq, tag);
+ }
+ void RequestServerStreaming(int index, ServerContext* context,
+ ::google::protobuf::Message* request,
+ ServerAsyncStreamingInterface* stream,
+ CompletionQueue* cq, void* tag) {
+ dispatch_impl_->RequestAsyncCall(request_args_[index], context, request,
+ stream, cq, tag);
+ }
+ void RequestBidiStreaming(int index, ServerContext* context,
+ ServerAsyncStreamingInterface* stream,
+ CompletionQueue* cq, void* tag) {
+ dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr,
+ stream, cq, tag);
+ }
+
private:
friend class Server;
CompletionQueue* const cq_;
- Server* server_ = nullptr;
- const char**const method_names_;
+ DispatchImpl* dispatch_impl_ = nullptr;
+ const char** const method_names_;
size_t method_count_;
- std::vector<void*> request_args_;
+ void** request_args_ = nullptr;
};
} // namespace grpc
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index 77aac75076..8050ef8c9d 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -42,6 +42,7 @@
#include <grpc++/completion_queue.h>
#include <grpc++/config.h>
#include <grpc++/impl/call.h>
+#include <grpc++/impl/service_type.h>
#include <grpc++/status.h>
struct grpc_server;
@@ -60,7 +61,8 @@ class ServerCredentials;
class ThreadPoolInterface;
// Currently it only supports handling rpcs in a single thread.
-class Server final : private CallHook {
+class Server final : private CallHook,
+ private AsynchronousService::DispatchImpl {
public:
~Server();
@@ -70,7 +72,8 @@ class Server final : private CallHook {
private:
friend class ServerBuilder;
- class MethodRequestData;
+ class SyncRequest;
+ class AsyncRequest;
// ServerBuilder use only
Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
@@ -91,6 +94,12 @@ class Server final : private CallHook {
void PerformOpsOnCall(CallOpBuffer* ops, Call* call) override;
+ // DispatchImpl
+ void RequestAsyncCall(void* registered_method, ServerContext* context,
+ ::google::protobuf::Message* request,
+ ServerAsyncStreamingInterface* stream,
+ CompletionQueue* cq, void* tag);
+
// Completion queue.
CompletionQueue cq_;
@@ -102,7 +111,7 @@ class Server final : private CallHook {
int num_running_cb_;
std::condition_variable callback_cv_;
- std::list<MethodRequestData> methods_;
+ std::list<SyncRequest> sync_methods_;
// Pointer to the c grpc server.
grpc_server* server_;
diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h
index 6dc05bc9a6..c013afb141 100644
--- a/include/grpc++/stream.h
+++ b/include/grpc++/stream.h
@@ -39,6 +39,7 @@
#include <grpc++/completion_queue.h>
#include <grpc++/server_context.h>
#include <grpc++/impl/call.h>
+#include <grpc++/impl/service_type.h>
#include <grpc++/status.h>
#include <grpc/support/log.h>
@@ -370,15 +371,6 @@ class ClientAsyncStreamingInterface {
virtual void Finish(Status* status, void* tag) = 0;
};
-class ServerAsyncStreamingInterface {
- public:
- virtual ~ServerAsyncStreamingInterface() {}
-
- virtual void SendInitialMetadata(void* tag) = 0;
-
- virtual void Finish(const Status& status, void* tag) = 0;
-};
-
// An interface that yields a sequence of R messages.
template <class R>
class AsyncReaderInterface {
@@ -580,11 +572,11 @@ class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface,
// TODO(yangg) Move out of stream.h
template <class W>
-class ServerAsyncResponseWriter final {
+class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface {
public:
explicit ServerAsyncResponseWriter(Call* call) : call_(call) {}
- virtual void Write(const W& msg, void* tag) override {
+ virtual void Write(const W& msg, void* tag) {
CallOpBuffer buf;
buf.Reset(tag);
buf.AddSendMessage(msg);