aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--include/grpc++/channel_interface.h4
-rw-r--r--include/grpc++/impl/call.h13
-rw-r--r--include/grpc++/server.h8
-rw-r--r--src/cpp/common/call.cc6
-rw-r--r--src/cpp/server/server.cc16
5 files changed, 34 insertions, 13 deletions
diff --git a/include/grpc++/channel_interface.h b/include/grpc++/channel_interface.h
index 3631ea4d5d..b0366faabb 100644
--- a/include/grpc++/channel_interface.h
+++ b/include/grpc++/channel_interface.h
@@ -35,6 +35,7 @@
#define __GRPCPP_CHANNEL_INTERFACE_H__
#include <grpc++/status.h>
+#include <grpc++/impl/call.h>
namespace google {
namespace protobuf {
@@ -52,13 +53,12 @@ class CompletionQueue;
class RpcMethod;
class CallInterface;
-class ChannelInterface {
+class ChannelInterface : public CallHook {
public:
virtual ~ChannelInterface() {}
virtual Call CreateCall(const RpcMethod &method, ClientContext *context,
CompletionQueue *cq) = 0;
- virtual void PerformOpsOnCall(CallOpBuffer *ops, Call *call) = 0;
};
} // namespace grpc
diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h
index d0cb9024ba..bc1a3d847d 100644
--- a/include/grpc++/impl/call.h
+++ b/include/grpc++/impl/call.h
@@ -52,7 +52,7 @@ struct grpc_op;
namespace grpc {
-class ChannelInterface;
+class Call;
class CallOpBuffer final : public CompletionQueueTag {
public:
@@ -103,10 +103,17 @@ class CCallDeleter {
void operator()(grpc_call *c);
};
+// Channel and Server implement this to allow them to hook performing ops
+class CallHook {
+ public:
+ virtual ~CallHook() {}
+ virtual void PerformOpsOnCall(CallOpBuffer *ops, Call *call) = 0;
+};
+
// Straightforward wrapping of the C call object
class Call final {
public:
- Call(grpc_call *call, ChannelInterface *channel, CompletionQueue *cq);
+ Call(grpc_call *call, CallHook *call_hook_, CompletionQueue *cq);
void PerformOps(CallOpBuffer *buffer);
@@ -114,7 +121,7 @@ class Call final {
CompletionQueue *cq() { return cq_; }
private:
- ChannelInterface *channel_;
+ CallHook *call_hook_;
CompletionQueue *cq_;
std::unique_ptr<grpc_call, CCallDeleter> call_;
};
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index daa3f0a661..98f3f17197 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -41,6 +41,7 @@
#include <grpc++/completion_queue.h>
#include <grpc++/config.h>
+#include <grpc++/impl/call.h>
#include <grpc++/status.h>
struct grpc_server;
@@ -59,7 +60,7 @@ class ServerCredentials;
class ThreadPoolInterface;
// Currently it only supports handling rpcs in a single thread.
-class Server {
+class Server final : private CallHook {
public:
~Server();
@@ -72,7 +73,8 @@ class Server {
class MethodRequestData;
// ServerBuilder use only
- Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, ServerCredentials* creds);
+ Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
+ ServerCredentials* creds);
Server();
// Register a service. This call does not take ownership of the service.
// The service must exist for the lifetime of the Server instance.
@@ -86,6 +88,8 @@ class Server {
void RunRpc();
void ScheduleCallback();
+ void PerformOpsOnCall(CallOpBuffer* ops, Call* call) override;
+
// Completion queue.
CompletionQueue cq_;
diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc
index f97240d067..4d465e0a6f 100644
--- a/src/cpp/common/call.cc
+++ b/src/cpp/common/call.cc
@@ -184,11 +184,11 @@ void CCallDeleter::operator()(grpc_call* c) {
grpc_call_destroy(c);
}
-Call::Call(grpc_call* call, ChannelInterface* channel, CompletionQueue* cq)
- : channel_(channel), cq_(cq), call_(call) {}
+Call::Call(grpc_call* call, CallHook *call_hook, CompletionQueue* cq)
+ : call_hook_(call_hook), cq_(cq), call_(call) {}
void Call::PerformOps(CallOpBuffer* buffer) {
- channel_->PerformOpsOnCall(buffer, this);
+ call_hook_->PerformOpsOnCall(buffer, this);
}
} // namespace grpc
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 049c3e36b2..8974850b8c 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -143,9 +143,9 @@ class Server::MethodRequestData final : public CompletionQueueTag {
class CallData {
public:
- explicit CallData(MethodRequestData *mrd)
+ explicit CallData(Server *server, MethodRequestData *mrd)
: cq_(mrd->cq_),
- call_(mrd->call_, nullptr, &cq_),
+ call_(mrd->call_, server, &cq_),
ctx_(mrd->deadline_, mrd->request_metadata_.metadata,
mrd->request_metadata_.count),
has_request_payload_(mrd->has_request_payload_),
@@ -235,6 +235,16 @@ void Server::Shutdown() {
}
}
+void Server::PerformOpsOnCall(CallOpBuffer *buf, Call *call) {
+ static const size_t MAX_OPS = 8;
+ size_t nops = MAX_OPS;
+ grpc_op ops[MAX_OPS];
+ buf->FillOps(ops, &nops);
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_call_start_batch(call->call(), ops, nops,
+ buf));
+}
+
void Server::ScheduleCallback() {
{
std::unique_lock<std::mutex> lock(mu_);
@@ -248,7 +258,7 @@ void Server::RunRpc() {
bool ok;
auto *mrd = MethodRequestData::Wait(&cq_, &ok);
if (mrd) {
- MethodRequestData::CallData cd(mrd);
+ MethodRequestData::CallData cd(this, mrd);
if (ok) {
mrd->Request(server_);