diff options
author | Yash Tibrewal <yashkt@google.com> | 2018-10-08 19:03:33 -0700 |
---|---|---|
committer | Yash Tibrewal <yashkt@google.com> | 2018-10-16 14:10:02 -0700 |
commit | 63bdf4e2363a3c55edf8ddb9d089da88c31963f2 (patch) | |
tree | adbadc9a7d9adc34a6110360d85d958332fee8b6 /src/cpp/server | |
parent | 5d831da9d135d7f1c58ff61bacb6e5a2787f05c9 (diff) |
More changes for client interception
Diffstat (limited to 'src/cpp/server')
-rw-r--r-- | src/cpp/server/server_cc.cc | 21 | ||||
-rw-r--r-- | src/cpp/server/server_context.cc | 28 |
2 files changed, 26 insertions, 23 deletions
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 27629f2be0..2faaf618a5 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -667,18 +667,7 @@ void Server::Wait() { void Server::PerformOpsOnCall(internal::CallOpSetInterface* ops, internal::Call* call) { - static const size_t MAX_OPS = 8; - size_t nops = 0; - grpc_op cops[MAX_OPS]; - ops->FillOps(call, cops, &nops); - // TODO(vjpai): Use ops->cq_tag once this case supports callbacks - auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr); - if (result != GRPC_CALL_OK) { - gpr_log(GPR_ERROR, "Fatal: grpc_call_start_batch returned %d", result); - grpc_call_log_batch(__FILE__, __LINE__, GPR_LOG_SEVERITY_ERROR, - call->call(), cops, nops, ops); - abort(); - } + ops->FillOps(call); } ServerInterface::BaseAsyncRequest::BaseAsyncRequest( @@ -705,11 +694,13 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, context_->cq_ = call_cq_; internal::Call call(call_, server_, call_cq_, server_->max_receive_message_size()); + + // just the pointers inside call are copied here + auto* new_call = stream_->BindCall(std::move(call)); if (*status && call_) { - context_->BeginCompletionOp(&call); + context_->BeginCompletionOp(new_call); } - // just the pointers inside call are copied here - stream_->BindCall(&call); + *tag = tag_; if (delete_on_finalize_) { delete this; diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index cfa6c8d7e8..dd94a44e1d 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -47,7 +47,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { finalized_(false), cancelled_(0) {} - void FillOps(internal::Call* call, grpc_op* ops, size_t* nops) override; + void FillOps(internal::Call* call) override; bool FinalizeResult(void** tag, bool* status) override; bool CheckCancelled(CompletionQueue* cq) { @@ -66,6 +66,17 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { void Unref(); + // This will be called while interceptors are run if the RPC is a hijacked + // RPC. This should set hijacking state for each of the ops. + void SetHijackingState() override {} + + /* Should be called after interceptors are done running */ + void ContinueFillOpsAfterInterception() override {} + + /* Should be called after interceptors are done running on the finalize result + * path */ + void ContinueFinalizeResultAfterInterception() override {} + private: bool CheckCancelledNoPluck() { std::lock_guard<std::mutex> g(mu_); @@ -88,13 +99,14 @@ void ServerContext::CompletionOp::Unref() { } } -void ServerContext::CompletionOp::FillOps(internal::Call* call, grpc_op* ops, - size_t* nops) { - ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER; - ops->data.recv_close_on_server.cancelled = &cancelled_; - ops->flags = 0; - ops->reserved = nullptr; - *nops = 1; +void ServerContext::CompletionOp::FillOps(internal::Call* call) { + grpc_op ops; + ops.op = GRPC_OP_RECV_CLOSE_ON_SERVER; + ops.data.recv_close_on_server.cancelled = &cancelled_; + ops.flags = 0; + ops.reserved = nullptr; + GPR_ASSERT(GRPC_CALL_OK == + grpc_call_start_batch(call->call(), &ops, 1, cq_tag(), nullptr)); } bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) { |