diff options
author | Yash Tibrewal <yashkt@google.com> | 2019-01-07 10:09:06 -0800 |
---|---|---|
committer | Yash Tibrewal <yashkt@google.com> | 2019-01-07 10:09:06 -0800 |
commit | dd067fd39034d42bf9a1799d09c1c2a7daedc61a (patch) | |
tree | edb1db69a990b7ade9c25f2e4890ede8596c703f /include | |
parent | 7d1491d64c9b6279233c780d290c514a7040c1c7 (diff) | |
parent | 46bd2f7adb926053345665d5c487fa20acd2b5b0 (diff) |
Merge branch 'master' into nocopyinterception
Diffstat (limited to 'include')
-rw-r--r-- | include/grpcpp/impl/codegen/call_op_set.h | 46 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/interceptor.h | 16 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/interceptor_common.h | 45 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/server_interface.h | 2 |
4 files changed, 97 insertions, 12 deletions
diff --git a/include/grpcpp/impl/codegen/call_op_set.h b/include/grpcpp/impl/codegen/call_op_set.h index ced0b2ff3e..2c34082ebb 100644 --- a/include/grpcpp/impl/codegen/call_op_set.h +++ b/include/grpcpp/impl/codegen/call_op_set.h @@ -317,11 +317,15 @@ class CallOpSendMessage { protected: void AddOp(grpc_op* ops, size_t* nops) { - if ((msg_ == nullptr && !send_buf_.Valid()) || hijacked_) return; + if (msg_ == nullptr && !send_buf_.Valid()) return; + if (hijacked_) { + serializer_ = nullptr; + return; + } if (msg_ != nullptr) { GPR_CODEGEN_ASSERT(serializer_(msg_).ok()); + serializer_ = nullptr; } - serializer_ = nullptr; grpc_op* op = &ops[(*nops)++]; op->op = GRPC_OP_SEND_MESSAGE; op->flags = write_options_.flags(); @@ -330,21 +334,38 @@ class CallOpSendMessage { // Flags are per-message: clear them after use. write_options_.Clear(); } - void FinishOp(bool* status) { send_buf_.Clear(); } + void FinishOp(bool* status) { + if (msg_ == nullptr && !send_buf_.Valid()) return; + if (hijacked_ && failed_send_) { + // Hijacking interceptor failed this Op + *status = false; + } else if (!*status) { + // This Op was passed down to core and the Op failed + failed_send_ = true; + } + } void SetInterceptionHookPoint( InterceptorBatchMethodsImpl* interceptor_methods) { if (msg_ == nullptr && !send_buf_.Valid()) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_SEND_MESSAGE); - interceptor_methods->SetSendMessage(&send_buf_, &msg_, serializer_); + interceptor_methods->SetSendMessage(&send_buf_, &msg_, &failed_send_, + serializer_); } void SetFinishInterceptionHookPoint( InterceptorBatchMethodsImpl* interceptor_methods) { + if (msg_ != nullptr || send_buf_.Valid()) { + interceptor_methods->AddInterceptionHookPoint( + experimental::InterceptionHookPoints::POST_SEND_MESSAGE); + } + send_buf_.Clear(); + msg_ = nullptr; // The contents of the SendMessage value that was previously set // has had its references stolen by core's operations - interceptor_methods->SetSendMessage(nullptr, nullptr, nullptr); + interceptor_methods->SetSendMessage(nullptr, nullptr, &failed_send_, + nullptr); } void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) { @@ -354,6 +375,7 @@ class CallOpSendMessage { private: const void* msg_ = nullptr; // The original non-serialized message bool hijacked_ = false; + bool failed_send_ = false; ByteBuffer send_buf_; WriteOptions write_options_; std::function<Status(const void*)> serializer_; @@ -379,6 +401,7 @@ Status CallOpSendMessage::SendMessage(const M& message, WriteOptions options) { // Serialize immediately only if we do not have access to the message pointer if (msg_ == nullptr) { return serializer_(&message); + serializer_ = nullptr; } return Status(); } @@ -449,14 +472,16 @@ class CallOpRecvMessage { void SetInterceptionHookPoint( InterceptorBatchMethodsImpl* interceptor_methods) { - interceptor_methods->SetRecvMessage(message_); + if (message_ == nullptr) return; + interceptor_methods->SetRecvMessage(message_, &got_message); } void SetFinishInterceptionHookPoint( InterceptorBatchMethodsImpl* interceptor_methods) { - if (!got_message) return; + if (message_ == nullptr) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_MESSAGE); + if (!got_message) interceptor_methods->SetRecvMessage(nullptr, nullptr); } void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) { hijacked_ = true; @@ -544,20 +569,23 @@ class CallOpGenericRecvMessage { void SetInterceptionHookPoint( InterceptorBatchMethodsImpl* interceptor_methods) { - interceptor_methods->SetRecvMessage(message_); + if (!deserialize_) return; + interceptor_methods->SetRecvMessage(message_, &got_message); } void SetFinishInterceptionHookPoint( InterceptorBatchMethodsImpl* interceptor_methods) { - if (!got_message) return; + if (!deserialize_) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_MESSAGE); + if (!got_message) interceptor_methods->SetRecvMessage(nullptr, nullptr); } void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) { hijacked_ = true; if (!deserialize_) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_RECV_MESSAGE); + got_message = true; } private: diff --git a/include/grpcpp/impl/codegen/interceptor.h b/include/grpcpp/impl/codegen/interceptor.h index edf3ab49f1..d749d8578a 100644 --- a/include/grpcpp/impl/codegen/interceptor.h +++ b/include/grpcpp/impl/codegen/interceptor.h @@ -46,9 +46,10 @@ namespace experimental { /// operation has been requested and it is available. POST_RECV means that a /// result is available but has not yet been passed back to the application. enum class InterceptionHookPoints { - /// The first two in this list are for clients and servers + /// The first three in this list are for clients and servers PRE_SEND_INITIAL_METADATA, PRE_SEND_MESSAGE, + POST_SEND_MESSAGE, PRE_SEND_STATUS, // server only PRE_SEND_CLOSE, // client only: WritesDone for stream; after write in unary /// The following three are for hijacked clients only and can only be @@ -119,6 +120,10 @@ class InterceptorBatchMethods { virtual void ModifySendMessage(const void* message) = 0; + /// Checks whether the SEND MESSAGE op succeeded. Valid for POST_SEND_MESSAGE + /// interceptions. + virtual bool GetSendMessageStatus() = 0; + /// Returns a modifiable multimap of the initial metadata to be sent. Valid /// for PRE_SEND_INITIAL_METADATA interceptions. A value of nullptr indicates /// that this field is not valid. @@ -164,6 +169,15 @@ class InterceptorBatchMethods { /// started from interceptors without infinite regress through the interceptor /// list. virtual std::unique_ptr<ChannelInterface> GetInterceptedChannel() = 0; + + /// On a hijacked RPC, an interceptor can decide to fail a PRE_RECV_MESSAGE + /// op. This would be a signal to the reader that there will be no more + /// messages, or the stream has failed or been cancelled. + virtual void FailHijackedRecvMessage() = 0; + + /// On a hijacked RPC/ to-be hijacked RPC, this can be called to fail a SEND + /// MESSAGE op + virtual void FailHijackedSendMessage() = 0; }; /// Interface for an interceptor. Interceptor authors must create a class diff --git a/include/grpcpp/impl/codegen/interceptor_common.h b/include/grpcpp/impl/codegen/interceptor_common.h index 6fa6210dc3..33e46389b3 100644 --- a/include/grpcpp/impl/codegen/interceptor_common.h +++ b/include/grpcpp/impl/codegen/interceptor_common.h @@ -98,6 +98,10 @@ class InterceptorBatchMethodsImpl *orig_send_message_ = message; } + bool GetSendMessageStatus() override { + return !*fail_send_message_; + } + std::multimap<grpc::string, grpc::string>* GetSendInitialMetadata() override { return send_initial_metadata_; } @@ -127,15 +131,23 @@ class InterceptorBatchMethodsImpl Status* GetRecvStatus() override { return recv_status_; } + void FailHijackedSendMessage() override { + GPR_CODEGEN_ASSERT(hooks_[static_cast<size_t>( + experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)]); + *fail_send_message_ = true; + } + std::multimap<grpc::string_ref, grpc::string_ref>* GetRecvTrailingMetadata() override { return recv_trailing_metadata_->map(); } void SetSendMessage(ByteBuffer* buf, const void** msg, + bool* fail_send_message, std::function<Status(const void*)> serializer) { send_message_ = buf; orig_send_message_ = msg; + fail_send_message_ = fail_send_message; serializer_ = serializer; } @@ -156,7 +168,10 @@ class InterceptorBatchMethodsImpl send_trailing_metadata_ = metadata; } - void SetRecvMessage(void* message) { recv_message_ = message; } + void SetRecvMessage(void* message, bool* got_message) { + recv_message_ = message; + got_message_ = got_message; + } void SetRecvInitialMetadata(MetadataMap* map) { recv_initial_metadata_ = map; @@ -179,6 +194,12 @@ class InterceptorBatchMethodsImpl info->channel(), current_interceptor_index_ + 1)); } + void FailHijackedRecvMessage() override { + GPR_CODEGEN_ASSERT(hooks_[static_cast<size_t>( + experimental::InterceptionHookPoints::PRE_RECV_MESSAGE)]); + *got_message_ = false; + } + // Clears all state void ClearState() { reverse_ = false; @@ -356,6 +377,7 @@ class InterceptorBatchMethodsImpl std::function<void(void)> callback_; ByteBuffer* send_message_ = nullptr; + bool* fail_send_message_ = nullptr; const void** orig_send_message_ = nullptr; std::function<Status(const void*)> serializer_; @@ -369,6 +391,7 @@ class InterceptorBatchMethodsImpl std::multimap<grpc::string, grpc::string>* send_trailing_metadata_ = nullptr; void* recv_message_ = nullptr; + bool* got_message_ = nullptr; MetadataMap* recv_initial_metadata_ = nullptr; @@ -410,6 +433,14 @@ class CancelInterceptorBatchMethods return nullptr; } + bool GetSendMessageStatus() override { + GPR_CODEGEN_ASSERT( + false && + "It is illegal to call GetSendMessageStatus on a method which " + "has a Cancel notification"); + return false; + } + const void* GetSendMessage() override { GPR_CODEGEN_ASSERT( false && @@ -490,6 +521,18 @@ class CancelInterceptorBatchMethods "method which has a Cancel notification"); return std::unique_ptr<ChannelInterface>(nullptr); } + + void FailHijackedRecvMessage() override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call FailHijackedRecvMessage on a " + "method which has a Cancel notification"); + } + + void FailHijackedSendMessage() override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call FailHijackedSendMessage on a " + "method which has a Cancel notification"); + } }; } // namespace internal } // namespace grpc diff --git a/include/grpcpp/impl/codegen/server_interface.h b/include/grpcpp/impl/codegen/server_interface.h index e0e2629827..890a5650d0 100644 --- a/include/grpcpp/impl/codegen/server_interface.h +++ b/include/grpcpp/impl/codegen/server_interface.h @@ -272,7 +272,7 @@ class ServerInterface : public internal::CallHook { /* Set interception point for recv message */ interceptor_methods_.AddInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_MESSAGE); - interceptor_methods_.SetRecvMessage(request_); + interceptor_methods_.SetRecvMessage(request_, nullptr); return RegisteredAsyncRequest::FinalizeResult(tag, status); } |