diff options
author | 2019-01-07 10:09:06 -0800 | |
---|---|---|
committer | 2019-01-07 10:09:06 -0800 | |
commit | dd067fd39034d42bf9a1799d09c1c2a7daedc61a (patch) | |
tree | edb1db69a990b7ade9c25f2e4890ede8596c703f /include/grpcpp/impl/codegen/call_op_set.h | |
parent | 7d1491d64c9b6279233c780d290c514a7040c1c7 (diff) | |
parent | 46bd2f7adb926053345665d5c487fa20acd2b5b0 (diff) |
Merge branch 'master' into nocopyinterception
Diffstat (limited to 'include/grpcpp/impl/codegen/call_op_set.h')
-rw-r--r-- | include/grpcpp/impl/codegen/call_op_set.h | 46 |
1 files changed, 37 insertions, 9 deletions
diff --git a/include/grpcpp/impl/codegen/call_op_set.h b/include/grpcpp/impl/codegen/call_op_set.h index ced0b2ff3e..2c34082ebb 100644 --- a/include/grpcpp/impl/codegen/call_op_set.h +++ b/include/grpcpp/impl/codegen/call_op_set.h @@ -317,11 +317,15 @@ class CallOpSendMessage { protected: void AddOp(grpc_op* ops, size_t* nops) { - if ((msg_ == nullptr && !send_buf_.Valid()) || hijacked_) return; + if (msg_ == nullptr && !send_buf_.Valid()) return; + if (hijacked_) { + serializer_ = nullptr; + return; + } if (msg_ != nullptr) { GPR_CODEGEN_ASSERT(serializer_(msg_).ok()); + serializer_ = nullptr; } - serializer_ = nullptr; grpc_op* op = &ops[(*nops)++]; op->op = GRPC_OP_SEND_MESSAGE; op->flags = write_options_.flags(); @@ -330,21 +334,38 @@ class CallOpSendMessage { // Flags are per-message: clear them after use. write_options_.Clear(); } - void FinishOp(bool* status) { send_buf_.Clear(); } + void FinishOp(bool* status) { + if (msg_ == nullptr && !send_buf_.Valid()) return; + if (hijacked_ && failed_send_) { + // Hijacking interceptor failed this Op + *status = false; + } else if (!*status) { + // This Op was passed down to core and the Op failed + failed_send_ = true; + } + } void SetInterceptionHookPoint( InterceptorBatchMethodsImpl* interceptor_methods) { if (msg_ == nullptr && !send_buf_.Valid()) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_SEND_MESSAGE); - interceptor_methods->SetSendMessage(&send_buf_, &msg_, serializer_); + interceptor_methods->SetSendMessage(&send_buf_, &msg_, &failed_send_, + serializer_); } void SetFinishInterceptionHookPoint( InterceptorBatchMethodsImpl* interceptor_methods) { + if (msg_ != nullptr || send_buf_.Valid()) { + interceptor_methods->AddInterceptionHookPoint( + experimental::InterceptionHookPoints::POST_SEND_MESSAGE); + } + send_buf_.Clear(); + msg_ = nullptr; // The contents of the SendMessage value that was previously set // has had its references stolen by core's operations - interceptor_methods->SetSendMessage(nullptr, nullptr, nullptr); + interceptor_methods->SetSendMessage(nullptr, nullptr, &failed_send_, + nullptr); } void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) { @@ -354,6 +375,7 @@ class CallOpSendMessage { private: const void* msg_ = nullptr; // The original non-serialized message bool hijacked_ = false; + bool failed_send_ = false; ByteBuffer send_buf_; WriteOptions write_options_; std::function<Status(const void*)> serializer_; @@ -379,6 +401,7 @@ Status CallOpSendMessage::SendMessage(const M& message, WriteOptions options) { // Serialize immediately only if we do not have access to the message pointer if (msg_ == nullptr) { return serializer_(&message); + serializer_ = nullptr; } return Status(); } @@ -449,14 +472,16 @@ class CallOpRecvMessage { void SetInterceptionHookPoint( InterceptorBatchMethodsImpl* interceptor_methods) { - interceptor_methods->SetRecvMessage(message_); + if (message_ == nullptr) return; + interceptor_methods->SetRecvMessage(message_, &got_message); } void SetFinishInterceptionHookPoint( InterceptorBatchMethodsImpl* interceptor_methods) { - if (!got_message) return; + if (message_ == nullptr) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_MESSAGE); + if (!got_message) interceptor_methods->SetRecvMessage(nullptr, nullptr); } void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) { hijacked_ = true; @@ -544,20 +569,23 @@ class CallOpGenericRecvMessage { void SetInterceptionHookPoint( InterceptorBatchMethodsImpl* interceptor_methods) { - interceptor_methods->SetRecvMessage(message_); + if (!deserialize_) return; + interceptor_methods->SetRecvMessage(message_, &got_message); } void SetFinishInterceptionHookPoint( InterceptorBatchMethodsImpl* interceptor_methods) { - if (!got_message) return; + if (!deserialize_) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_MESSAGE); + if (!got_message) interceptor_methods->SetRecvMessage(nullptr, nullptr); } void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) { hijacked_ = true; if (!deserialize_) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_RECV_MESSAGE); + got_message = true; } private: |