aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2018-10-08 19:03:33 -0700
committerGravatar Yash Tibrewal <yashkt@google.com>2018-10-16 14:10:02 -0700
commit63bdf4e2363a3c55edf8ddb9d089da88c31963f2 (patch)
treeadbadc9a7d9adc34a6110360d85d958332fee8b6 /src/cpp/server
parent5d831da9d135d7f1c58ff61bacb6e5a2787f05c9 (diff)
More changes for client interception
Diffstat (limited to 'src/cpp/server')
-rw-r--r--src/cpp/server/server_cc.cc21
-rw-r--r--src/cpp/server/server_context.cc28
2 files changed, 26 insertions, 23 deletions
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 27629f2be0..2faaf618a5 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -667,18 +667,7 @@ void Server::Wait() {
void Server::PerformOpsOnCall(internal::CallOpSetInterface* ops,
internal::Call* call) {
- static const size_t MAX_OPS = 8;
- size_t nops = 0;
- grpc_op cops[MAX_OPS];
- ops->FillOps(call, cops, &nops);
- // TODO(vjpai): Use ops->cq_tag once this case supports callbacks
- auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr);
- if (result != GRPC_CALL_OK) {
- gpr_log(GPR_ERROR, "Fatal: grpc_call_start_batch returned %d", result);
- grpc_call_log_batch(__FILE__, __LINE__, GPR_LOG_SEVERITY_ERROR,
- call->call(), cops, nops, ops);
- abort();
- }
+ ops->FillOps(call);
}
ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
@@ -705,11 +694,13 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
context_->cq_ = call_cq_;
internal::Call call(call_, server_, call_cq_,
server_->max_receive_message_size());
+
+ // just the pointers inside call are copied here
+ auto* new_call = stream_->BindCall(std::move(call));
if (*status && call_) {
- context_->BeginCompletionOp(&call);
+ context_->BeginCompletionOp(new_call);
}
- // just the pointers inside call are copied here
- stream_->BindCall(&call);
+
*tag = tag_;
if (delete_on_finalize_) {
delete this;
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index cfa6c8d7e8..dd94a44e1d 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -47,7 +47,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
finalized_(false),
cancelled_(0) {}
- void FillOps(internal::Call* call, grpc_op* ops, size_t* nops) override;
+ void FillOps(internal::Call* call) override;
bool FinalizeResult(void** tag, bool* status) override;
bool CheckCancelled(CompletionQueue* cq) {
@@ -66,6 +66,17 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
void Unref();
+ // This will be called while interceptors are run if the RPC is a hijacked
+ // RPC. This should set hijacking state for each of the ops.
+ void SetHijackingState() override {}
+
+ /* Should be called after interceptors are done running */
+ void ContinueFillOpsAfterInterception() override {}
+
+ /* Should be called after interceptors are done running on the finalize result
+ * path */
+ void ContinueFinalizeResultAfterInterception() override {}
+
private:
bool CheckCancelledNoPluck() {
std::lock_guard<std::mutex> g(mu_);
@@ -88,13 +99,14 @@ void ServerContext::CompletionOp::Unref() {
}
}
-void ServerContext::CompletionOp::FillOps(internal::Call* call, grpc_op* ops,
- size_t* nops) {
- ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
- ops->data.recv_close_on_server.cancelled = &cancelled_;
- ops->flags = 0;
- ops->reserved = nullptr;
- *nops = 1;
+void ServerContext::CompletionOp::FillOps(internal::Call* call) {
+ grpc_op ops;
+ ops.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+ ops.data.recv_close_on_server.cancelled = &cancelled_;
+ ops.flags = 0;
+ ops.reserved = nullptr;
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_call_start_batch(call->call(), &ops, 1, cq_tag(), nullptr));
}
bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {