diff options
-rw-r--r-- | include/grpc++/channel_interface.h | 4 | ||||
-rw-r--r-- | include/grpc++/impl/call.h | 13 | ||||
-rw-r--r-- | include/grpc++/server.h | 8 | ||||
-rw-r--r-- | src/cpp/common/call.cc | 6 | ||||
-rw-r--r-- | src/cpp/server/server.cc | 16 |
5 files changed, 34 insertions, 13 deletions
diff --git a/include/grpc++/channel_interface.h b/include/grpc++/channel_interface.h index 3631ea4d5d..b0366faabb 100644 --- a/include/grpc++/channel_interface.h +++ b/include/grpc++/channel_interface.h @@ -35,6 +35,7 @@ #define __GRPCPP_CHANNEL_INTERFACE_H__ #include <grpc++/status.h> +#include <grpc++/impl/call.h> namespace google { namespace protobuf { @@ -52,13 +53,12 @@ class CompletionQueue; class RpcMethod; class CallInterface; -class ChannelInterface { +class ChannelInterface : public CallHook { public: virtual ~ChannelInterface() {} virtual Call CreateCall(const RpcMethod &method, ClientContext *context, CompletionQueue *cq) = 0; - virtual void PerformOpsOnCall(CallOpBuffer *ops, Call *call) = 0; }; } // namespace grpc diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h index d0cb9024ba..bc1a3d847d 100644 --- a/include/grpc++/impl/call.h +++ b/include/grpc++/impl/call.h @@ -52,7 +52,7 @@ struct grpc_op; namespace grpc { -class ChannelInterface; +class Call; class CallOpBuffer final : public CompletionQueueTag { public: @@ -103,10 +103,17 @@ class CCallDeleter { void operator()(grpc_call *c); }; +// Channel and Server implement this to allow them to hook performing ops +class CallHook { + public: + virtual ~CallHook() {} + virtual void PerformOpsOnCall(CallOpBuffer *ops, Call *call) = 0; +}; + // Straightforward wrapping of the C call object class Call final { public: - Call(grpc_call *call, ChannelInterface *channel, CompletionQueue *cq); + Call(grpc_call *call, CallHook *call_hook_, CompletionQueue *cq); void PerformOps(CallOpBuffer *buffer); @@ -114,7 +121,7 @@ class Call final { CompletionQueue *cq() { return cq_; } private: - ChannelInterface *channel_; + CallHook *call_hook_; CompletionQueue *cq_; std::unique_ptr<grpc_call, CCallDeleter> call_; }; diff --git a/include/grpc++/server.h b/include/grpc++/server.h index daa3f0a661..98f3f17197 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -41,6 +41,7 @@ #include <grpc++/completion_queue.h> #include <grpc++/config.h> +#include <grpc++/impl/call.h> #include <grpc++/status.h> struct grpc_server; @@ -59,7 +60,7 @@ class ServerCredentials; class ThreadPoolInterface; // Currently it only supports handling rpcs in a single thread. -class Server { +class Server final : private CallHook { public: ~Server(); @@ -72,7 +73,8 @@ class Server { class MethodRequestData; // ServerBuilder use only - Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, ServerCredentials* creds); + Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, + ServerCredentials* creds); Server(); // Register a service. This call does not take ownership of the service. // The service must exist for the lifetime of the Server instance. @@ -86,6 +88,8 @@ class Server { void RunRpc(); void ScheduleCallback(); + void PerformOpsOnCall(CallOpBuffer* ops, Call* call) override; + // Completion queue. CompletionQueue cq_; diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc index f97240d067..4d465e0a6f 100644 --- a/src/cpp/common/call.cc +++ b/src/cpp/common/call.cc @@ -184,11 +184,11 @@ void CCallDeleter::operator()(grpc_call* c) { grpc_call_destroy(c); } -Call::Call(grpc_call* call, ChannelInterface* channel, CompletionQueue* cq) - : channel_(channel), cq_(cq), call_(call) {} +Call::Call(grpc_call* call, CallHook *call_hook, CompletionQueue* cq) + : call_hook_(call_hook), cq_(cq), call_(call) {} void Call::PerformOps(CallOpBuffer* buffer) { - channel_->PerformOpsOnCall(buffer, this); + call_hook_->PerformOpsOnCall(buffer, this); } } // namespace grpc diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 049c3e36b2..8974850b8c 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -143,9 +143,9 @@ class Server::MethodRequestData final : public CompletionQueueTag { class CallData { public: - explicit CallData(MethodRequestData *mrd) + explicit CallData(Server *server, MethodRequestData *mrd) : cq_(mrd->cq_), - call_(mrd->call_, nullptr, &cq_), + call_(mrd->call_, server, &cq_), ctx_(mrd->deadline_, mrd->request_metadata_.metadata, mrd->request_metadata_.count), has_request_payload_(mrd->has_request_payload_), @@ -235,6 +235,16 @@ void Server::Shutdown() { } } +void Server::PerformOpsOnCall(CallOpBuffer *buf, Call *call) { + static const size_t MAX_OPS = 8; + size_t nops = MAX_OPS; + grpc_op ops[MAX_OPS]; + buf->FillOps(ops, &nops); + GPR_ASSERT(GRPC_CALL_OK == + grpc_call_start_batch(call->call(), ops, nops, + buf)); +} + void Server::ScheduleCallback() { { std::unique_lock<std::mutex> lock(mu_); @@ -248,7 +258,7 @@ void Server::RunRpc() { bool ok; auto *mrd = MethodRequestData::Wait(&cq_, &ok); if (mrd) { - MethodRequestData::CallData cd(mrd); + MethodRequestData::CallData cd(this, mrd); if (ok) { mrd->Request(server_); |