aboutsummaryrefslogtreecommitdiffhomepage
path: root/include
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2019-01-07 10:09:06 -0800
committerGravatar Yash Tibrewal <yashkt@google.com>2019-01-07 10:09:06 -0800
commitdd067fd39034d42bf9a1799d09c1c2a7daedc61a (patch)
treeedb1db69a990b7ade9c25f2e4890ede8596c703f /include
parent7d1491d64c9b6279233c780d290c514a7040c1c7 (diff)
parent46bd2f7adb926053345665d5c487fa20acd2b5b0 (diff)
Merge branch 'master' into nocopyinterception
Diffstat (limited to 'include')
-rw-r--r--include/grpcpp/impl/codegen/call_op_set.h46
-rw-r--r--include/grpcpp/impl/codegen/interceptor.h16
-rw-r--r--include/grpcpp/impl/codegen/interceptor_common.h45
-rw-r--r--include/grpcpp/impl/codegen/server_interface.h2
4 files changed, 97 insertions, 12 deletions
diff --git a/include/grpcpp/impl/codegen/call_op_set.h b/include/grpcpp/impl/codegen/call_op_set.h
index ced0b2ff3e..2c34082ebb 100644
--- a/include/grpcpp/impl/codegen/call_op_set.h
+++ b/include/grpcpp/impl/codegen/call_op_set.h
@@ -317,11 +317,15 @@ class CallOpSendMessage {
protected:
void AddOp(grpc_op* ops, size_t* nops) {
- if ((msg_ == nullptr && !send_buf_.Valid()) || hijacked_) return;
+ if (msg_ == nullptr && !send_buf_.Valid()) return;
+ if (hijacked_) {
+ serializer_ = nullptr;
+ return;
+ }
if (msg_ != nullptr) {
GPR_CODEGEN_ASSERT(serializer_(msg_).ok());
+ serializer_ = nullptr;
}
- serializer_ = nullptr;
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_SEND_MESSAGE;
op->flags = write_options_.flags();
@@ -330,21 +334,38 @@ class CallOpSendMessage {
// Flags are per-message: clear them after use.
write_options_.Clear();
}
- void FinishOp(bool* status) { send_buf_.Clear(); }
+ void FinishOp(bool* status) {
+ if (msg_ == nullptr && !send_buf_.Valid()) return;
+ if (hijacked_ && failed_send_) {
+ // Hijacking interceptor failed this Op
+ *status = false;
+ } else if (!*status) {
+ // This Op was passed down to core and the Op failed
+ failed_send_ = true;
+ }
+ }
void SetInterceptionHookPoint(
InterceptorBatchMethodsImpl* interceptor_methods) {
if (msg_ == nullptr && !send_buf_.Valid()) return;
interceptor_methods->AddInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_SEND_MESSAGE);
- interceptor_methods->SetSendMessage(&send_buf_, &msg_, serializer_);
+ interceptor_methods->SetSendMessage(&send_buf_, &msg_, &failed_send_,
+ serializer_);
}
void SetFinishInterceptionHookPoint(
InterceptorBatchMethodsImpl* interceptor_methods) {
+ if (msg_ != nullptr || send_buf_.Valid()) {
+ interceptor_methods->AddInterceptionHookPoint(
+ experimental::InterceptionHookPoints::POST_SEND_MESSAGE);
+ }
+ send_buf_.Clear();
+ msg_ = nullptr;
// The contents of the SendMessage value that was previously set
// has had its references stolen by core's operations
- interceptor_methods->SetSendMessage(nullptr, nullptr, nullptr);
+ interceptor_methods->SetSendMessage(nullptr, nullptr, &failed_send_,
+ nullptr);
}
void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
@@ -354,6 +375,7 @@ class CallOpSendMessage {
private:
const void* msg_ = nullptr; // The original non-serialized message
bool hijacked_ = false;
+ bool failed_send_ = false;
ByteBuffer send_buf_;
WriteOptions write_options_;
std::function<Status(const void*)> serializer_;
@@ -379,6 +401,7 @@ Status CallOpSendMessage::SendMessage(const M& message, WriteOptions options) {
// Serialize immediately only if we do not have access to the message pointer
if (msg_ == nullptr) {
return serializer_(&message);
+ serializer_ = nullptr;
}
return Status();
}
@@ -449,14 +472,16 @@ class CallOpRecvMessage {
void SetInterceptionHookPoint(
InterceptorBatchMethodsImpl* interceptor_methods) {
- interceptor_methods->SetRecvMessage(message_);
+ if (message_ == nullptr) return;
+ interceptor_methods->SetRecvMessage(message_, &got_message);
}
void SetFinishInterceptionHookPoint(
InterceptorBatchMethodsImpl* interceptor_methods) {
- if (!got_message) return;
+ if (message_ == nullptr) return;
interceptor_methods->AddInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
+ if (!got_message) interceptor_methods->SetRecvMessage(nullptr, nullptr);
}
void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
hijacked_ = true;
@@ -544,20 +569,23 @@ class CallOpGenericRecvMessage {
void SetInterceptionHookPoint(
InterceptorBatchMethodsImpl* interceptor_methods) {
- interceptor_methods->SetRecvMessage(message_);
+ if (!deserialize_) return;
+ interceptor_methods->SetRecvMessage(message_, &got_message);
}
void SetFinishInterceptionHookPoint(
InterceptorBatchMethodsImpl* interceptor_methods) {
- if (!got_message) return;
+ if (!deserialize_) return;
interceptor_methods->AddInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
+ if (!got_message) interceptor_methods->SetRecvMessage(nullptr, nullptr);
}
void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
hijacked_ = true;
if (!deserialize_) return;
interceptor_methods->AddInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_RECV_MESSAGE);
+ got_message = true;
}
private:
diff --git a/include/grpcpp/impl/codegen/interceptor.h b/include/grpcpp/impl/codegen/interceptor.h
index edf3ab49f1..d749d8578a 100644
--- a/include/grpcpp/impl/codegen/interceptor.h
+++ b/include/grpcpp/impl/codegen/interceptor.h
@@ -46,9 +46,10 @@ namespace experimental {
/// operation has been requested and it is available. POST_RECV means that a
/// result is available but has not yet been passed back to the application.
enum class InterceptionHookPoints {
- /// The first two in this list are for clients and servers
+ /// The first three in this list are for clients and servers
PRE_SEND_INITIAL_METADATA,
PRE_SEND_MESSAGE,
+ POST_SEND_MESSAGE,
PRE_SEND_STATUS, // server only
PRE_SEND_CLOSE, // client only: WritesDone for stream; after write in unary
/// The following three are for hijacked clients only and can only be
@@ -119,6 +120,10 @@ class InterceptorBatchMethods {
virtual void ModifySendMessage(const void* message) = 0;
+ /// Checks whether the SEND MESSAGE op succeeded. Valid for POST_SEND_MESSAGE
+ /// interceptions.
+ virtual bool GetSendMessageStatus() = 0;
+
/// Returns a modifiable multimap of the initial metadata to be sent. Valid
/// for PRE_SEND_INITIAL_METADATA interceptions. A value of nullptr indicates
/// that this field is not valid.
@@ -164,6 +169,15 @@ class InterceptorBatchMethods {
/// started from interceptors without infinite regress through the interceptor
/// list.
virtual std::unique_ptr<ChannelInterface> GetInterceptedChannel() = 0;
+
+ /// On a hijacked RPC, an interceptor can decide to fail a PRE_RECV_MESSAGE
+ /// op. This would be a signal to the reader that there will be no more
+ /// messages, or the stream has failed or been cancelled.
+ virtual void FailHijackedRecvMessage() = 0;
+
+ /// On a hijacked RPC/ to-be hijacked RPC, this can be called to fail a SEND
+ /// MESSAGE op
+ virtual void FailHijackedSendMessage() = 0;
};
/// Interface for an interceptor. Interceptor authors must create a class
diff --git a/include/grpcpp/impl/codegen/interceptor_common.h b/include/grpcpp/impl/codegen/interceptor_common.h
index 6fa6210dc3..33e46389b3 100644
--- a/include/grpcpp/impl/codegen/interceptor_common.h
+++ b/include/grpcpp/impl/codegen/interceptor_common.h
@@ -98,6 +98,10 @@ class InterceptorBatchMethodsImpl
*orig_send_message_ = message;
}
+ bool GetSendMessageStatus() override {
+ return !*fail_send_message_;
+ }
+
std::multimap<grpc::string, grpc::string>* GetSendInitialMetadata() override {
return send_initial_metadata_;
}
@@ -127,15 +131,23 @@ class InterceptorBatchMethodsImpl
Status* GetRecvStatus() override { return recv_status_; }
+ void FailHijackedSendMessage() override {
+ GPR_CODEGEN_ASSERT(hooks_[static_cast<size_t>(
+ experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)]);
+ *fail_send_message_ = true;
+ }
+
std::multimap<grpc::string_ref, grpc::string_ref>* GetRecvTrailingMetadata()
override {
return recv_trailing_metadata_->map();
}
void SetSendMessage(ByteBuffer* buf, const void** msg,
+ bool* fail_send_message,
std::function<Status(const void*)> serializer) {
send_message_ = buf;
orig_send_message_ = msg;
+ fail_send_message_ = fail_send_message;
serializer_ = serializer;
}
@@ -156,7 +168,10 @@ class InterceptorBatchMethodsImpl
send_trailing_metadata_ = metadata;
}
- void SetRecvMessage(void* message) { recv_message_ = message; }
+ void SetRecvMessage(void* message, bool* got_message) {
+ recv_message_ = message;
+ got_message_ = got_message;
+ }
void SetRecvInitialMetadata(MetadataMap* map) {
recv_initial_metadata_ = map;
@@ -179,6 +194,12 @@ class InterceptorBatchMethodsImpl
info->channel(), current_interceptor_index_ + 1));
}
+ void FailHijackedRecvMessage() override {
+ GPR_CODEGEN_ASSERT(hooks_[static_cast<size_t>(
+ experimental::InterceptionHookPoints::PRE_RECV_MESSAGE)]);
+ *got_message_ = false;
+ }
+
// Clears all state
void ClearState() {
reverse_ = false;
@@ -356,6 +377,7 @@ class InterceptorBatchMethodsImpl
std::function<void(void)> callback_;
ByteBuffer* send_message_ = nullptr;
+ bool* fail_send_message_ = nullptr;
const void** orig_send_message_ = nullptr;
std::function<Status(const void*)> serializer_;
@@ -369,6 +391,7 @@ class InterceptorBatchMethodsImpl
std::multimap<grpc::string, grpc::string>* send_trailing_metadata_ = nullptr;
void* recv_message_ = nullptr;
+ bool* got_message_ = nullptr;
MetadataMap* recv_initial_metadata_ = nullptr;
@@ -410,6 +433,14 @@ class CancelInterceptorBatchMethods
return nullptr;
}
+ bool GetSendMessageStatus() override {
+ GPR_CODEGEN_ASSERT(
+ false &&
+ "It is illegal to call GetSendMessageStatus on a method which "
+ "has a Cancel notification");
+ return false;
+ }
+
const void* GetSendMessage() override {
GPR_CODEGEN_ASSERT(
false &&
@@ -490,6 +521,18 @@ class CancelInterceptorBatchMethods
"method which has a Cancel notification");
return std::unique_ptr<ChannelInterface>(nullptr);
}
+
+ void FailHijackedRecvMessage() override {
+ GPR_CODEGEN_ASSERT(false &&
+ "It is illegal to call FailHijackedRecvMessage on a "
+ "method which has a Cancel notification");
+ }
+
+ void FailHijackedSendMessage() override {
+ GPR_CODEGEN_ASSERT(false &&
+ "It is illegal to call FailHijackedSendMessage on a "
+ "method which has a Cancel notification");
+ }
};
} // namespace internal
} // namespace grpc
diff --git a/include/grpcpp/impl/codegen/server_interface.h b/include/grpcpp/impl/codegen/server_interface.h
index e0e2629827..890a5650d0 100644
--- a/include/grpcpp/impl/codegen/server_interface.h
+++ b/include/grpcpp/impl/codegen/server_interface.h
@@ -272,7 +272,7 @@ class ServerInterface : public internal::CallHook {
/* Set interception point for recv message */
interceptor_methods_.AddInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
- interceptor_methods_.SetRecvMessage(request_);
+ interceptor_methods_.SetRecvMessage(request_, nullptr);
return RegisteredAsyncRequest::FinalizeResult(tag, status);
}