aboutsummaryrefslogtreecommitdiffhomepage
path: root/include
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2018-10-02 14:17:59 -0700
committerGravatar Yash Tibrewal <yashkt@google.com>2018-10-16 14:09:34 -0700
commit5d831da9d135d7f1c58ff61bacb6e5a2787f05c9 (patch)
tree83a688383b0fae19bb88adb8e44df30a03abe4a6 /include
parent8e626a8de1ce101bf4d1fd8856c87de1cc43f7bd (diff)
Adding hook points for interception. Code compiles and tests still run
Diffstat (limited to 'include')
-rw-r--r--include/grpcpp/impl/codegen/call.h223
-rw-r--r--include/grpcpp/impl/codegen/client_interceptor.h59
-rw-r--r--include/grpcpp/impl/codegen/interceptor.h2
3 files changed, 215 insertions, 69 deletions
diff --git a/include/grpcpp/impl/codegen/call.h b/include/grpcpp/impl/codegen/call.h
index 7cadea0055..771fc22d46 100644
--- a/include/grpcpp/impl/codegen/call.h
+++ b/include/grpcpp/impl/codegen/call.h
@@ -24,10 +24,12 @@
#include <functional>
#include <map>
#include <memory>
+#include <vector>
#include <grpcpp/impl/codegen/byte_buffer.h>
#include <grpcpp/impl/codegen/call_hook.h>
#include <grpcpp/impl/codegen/client_context.h>
+#include <grpcpp/impl/codegen/client_interceptor.h>
#include <grpcpp/impl/codegen/completion_queue_tag.h>
#include <grpcpp/impl/codegen/config.h>
#include <grpcpp/impl/codegen/core_codegen_interface.h>
@@ -50,6 +52,58 @@ namespace internal {
class Call;
class CallHook;
+/// Straightforward wrapping of the C call object
+class Call final {
+ public:
+ /** call is owned by the caller */
+ Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq)
+ : call_hook_(call_hook),
+ cq_(cq),
+ call_(call),
+ max_receive_message_size_(-1),
+ rpc_info_(nullptr, nullptr, nullptr) {}
+
+ Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq,
+ experimental::ClientRpcInfo rpc_info,
+ const std::vector<
+ std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>&
+ creators)
+ : call_hook_(call_hook),
+ cq_(cq),
+ call_(call),
+ max_receive_message_size_(-1),
+ rpc_info_(rpc_info) {
+ for (const auto& creator : creators) {
+ interceptors_.push_back(creator->CreateClientInterceptor(&rpc_info_));
+ }
+ }
+
+ Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq,
+ int max_receive_message_size)
+ : call_hook_(call_hook),
+ cq_(cq),
+ call_(call),
+ max_receive_message_size_(max_receive_message_size),
+ rpc_info_(nullptr, nullptr, nullptr) {}
+
+ void PerformOps(CallOpSetInterface* ops) {
+ call_hook_->PerformOpsOnCall(ops, this);
+ }
+
+ grpc_call* call() const { return call_; }
+ CompletionQueue* cq() const { return cq_; }
+
+ int max_receive_message_size() const { return max_receive_message_size_; }
+
+ private:
+ CallHook* call_hook_;
+ CompletionQueue* cq_;
+ grpc_call* call_;
+ int max_receive_message_size_;
+ experimental::ClientRpcInfo rpc_info_;
+ std::vector<experimental::ClientInterceptor*> interceptors_;
+};
+
// TODO(yangg) if the map is changed before we send, the pointers will be a
// mess. Make sure it does not happen.
inline grpc_metadata* FillMetadataArray(
@@ -201,13 +255,45 @@ class WriteOptions {
};
namespace internal {
+
+class InterceptorBatchMethodsImpl
+ : public experimental::InterceptorBatchMethods {
+ public:
+ InterceptorBatchMethodsImpl() {}
+
+ virtual ~InterceptorBatchMethodsImpl() {}
+
+ virtual bool QueryInterceptionHookPoint(
+ experimental::InterceptionHookPoints type) override {
+ return hooks_[static_cast<int>(type)];
+ }
+
+ virtual void Proceed() override { /* fill this */
+ }
+
+ virtual void Hijack() override { /* fill this */
+ }
+
+ void AddInterceptionHookPoint(experimental::InterceptionHookPoints type) {
+ hooks_[static_cast<int>(type)];
+ }
+
+ private:
+ std::array<bool,
+ static_cast<int>(
+ experimental::InterceptionHookPoints::NUM_INTERCEPTION_HOOKS)>
+ hooks_;
+};
+
/// Default argument for CallOpSet. I is unused by the class, but can be
/// used for generating multiple names for the same thing.
template <int I>
class CallNoOp {
protected:
- void AddOp(grpc_op* ops, size_t* nops) {}
- void FinishOp(bool* status) {}
+ void AddOp(grpc_op* ops, size_t* nops,
+ InterceptorBatchMethodsImpl* interceptor_methods) {}
+ void FinishOp(bool* status,
+ InterceptorBatchMethodsImpl* interceptor_methods) {}
};
class CallOpSendInitialMetadata {
@@ -232,7 +318,8 @@ class CallOpSendInitialMetadata {
}
protected:
- void AddOp(grpc_op* ops, size_t* nops) {
+ void AddOp(grpc_op* ops, size_t* nops,
+ InterceptorBatchMethodsImpl* interceptor_methods) {
if (!send_) return;
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_SEND_INITIAL_METADATA;
@@ -246,8 +333,11 @@ class CallOpSendInitialMetadata {
op->data.send_initial_metadata.maybe_compression_level.level =
maybe_compression_level_.level;
}
+ interceptor_methods->AddInterceptionHookPoint(
+ experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA);
}
- void FinishOp(bool* status) {
+ void FinishOp(bool* status,
+ InterceptorBatchMethodsImpl* interceptor_methods) {
if (!send_) return;
g_core_codegen_interface->gpr_free(initial_metadata_);
send_ = false;
@@ -277,7 +367,8 @@ class CallOpSendMessage {
Status SendMessage(const M& message) GRPC_MUST_USE_RESULT;
protected:
- void AddOp(grpc_op* ops, size_t* nops) {
+ void AddOp(grpc_op* ops, size_t* nops,
+ InterceptorBatchMethodsImpl* interceptor_methods) {
if (!send_buf_.Valid()) return;
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_SEND_MESSAGE;
@@ -286,8 +377,13 @@ class CallOpSendMessage {
op->data.send_message.send_message = send_buf_.c_buffer();
// Flags are per-message: clear them after use.
write_options_.Clear();
+ interceptor_methods->AddInterceptionHookPoint(
+ experimental::InterceptionHookPoints::PRE_SEND_MESSAGE);
+ }
+ void FinishOp(bool* status,
+ InterceptorBatchMethodsImpl* interceptor_methods) {
+ send_buf_.Clear();
}
- void FinishOp(bool* status) { send_buf_.Clear(); }
private:
ByteBuffer send_buf_;
@@ -331,7 +427,8 @@ class CallOpRecvMessage {
bool got_message;
protected:
- void AddOp(grpc_op* ops, size_t* nops) {
+ void AddOp(grpc_op* ops, size_t* nops,
+ InterceptorBatchMethodsImpl* interceptor_methods) {
if (message_ == nullptr) return;
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_RECV_MESSAGE;
@@ -340,7 +437,8 @@ class CallOpRecvMessage {
op->data.recv_message.recv_message = recv_buf_.c_buffer_ptr();
}
- void FinishOp(bool* status) {
+ void FinishOp(bool* status,
+ InterceptorBatchMethodsImpl* interceptor_methods) {
if (message_ == nullptr) return;
if (recv_buf_.Valid()) {
if (*status) {
@@ -359,6 +457,8 @@ class CallOpRecvMessage {
}
}
message_ = nullptr;
+ interceptor_methods->AddInterceptionHookPoint(
+ experimental::InterceptionHookPoints::PRE_RECV_MESSAGE);
}
private:
@@ -406,7 +506,8 @@ class CallOpGenericRecvMessage {
bool got_message;
protected:
- void AddOp(grpc_op* ops, size_t* nops) {
+ void AddOp(grpc_op* ops, size_t* nops,
+ InterceptorBatchMethodsImpl* interceptor_methods) {
if (!deserialize_) return;
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_RECV_MESSAGE;
@@ -415,7 +516,8 @@ class CallOpGenericRecvMessage {
op->data.recv_message.recv_message = recv_buf_.c_buffer_ptr();
}
- void FinishOp(bool* status) {
+ void FinishOp(bool* status,
+ InterceptorBatchMethodsImpl* interceptor_methods) {
if (!deserialize_) return;
if (recv_buf_.Valid()) {
if (*status) {
@@ -433,6 +535,8 @@ class CallOpGenericRecvMessage {
}
}
deserialize_.reset();
+ interceptor_methods->AddInterceptionHookPoint(
+ experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
}
private:
@@ -448,14 +552,18 @@ class CallOpClientSendClose {
void ClientSendClose() { send_ = true; }
protected:
- void AddOp(grpc_op* ops, size_t* nops) {
+ void AddOp(grpc_op* ops, size_t* nops,
+ InterceptorBatchMethodsImpl* interceptor_methods) {
if (!send_) return;
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op->flags = 0;
op->reserved = NULL;
}
- void FinishOp(bool* status) { send_ = false; }
+ void FinishOp(bool* status,
+ InterceptorBatchMethodsImpl* interceptor_methods) {
+ send_ = false;
+ }
private:
bool send_;
@@ -477,7 +585,8 @@ class CallOpServerSendStatus {
}
protected:
- void AddOp(grpc_op* ops, size_t* nops) {
+ void AddOp(grpc_op* ops, size_t* nops,
+ InterceptorBatchMethodsImpl* interceptor_methods) {
if (!send_status_available_) return;
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
@@ -490,9 +599,12 @@ class CallOpServerSendStatus {
send_error_message_.empty() ? nullptr : &error_message_slice_;
op->flags = 0;
op->reserved = NULL;
+ interceptor_methods->AddInterceptionHookPoint(
+ experimental::InterceptionHookPoints::PRE_SEND_STATUS);
}
- void FinishOp(bool* status) {
+ void FinishOp(bool* status,
+ InterceptorBatchMethodsImpl* interceptor_methods) {
if (!send_status_available_) return;
g_core_codegen_interface->gpr_free(trailing_metadata_);
send_status_available_ = false;
@@ -518,7 +630,8 @@ class CallOpRecvInitialMetadata {
}
protected:
- void AddOp(grpc_op* ops, size_t* nops) {
+ void AddOp(grpc_op* ops, size_t* nops,
+ InterceptorBatchMethodsImpl* interceptor_methods) {
if (metadata_map_ == nullptr) return;
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_RECV_INITIAL_METADATA;
@@ -527,9 +640,12 @@ class CallOpRecvInitialMetadata {
op->reserved = NULL;
}
- void FinishOp(bool* status) {
+ void FinishOp(bool* status,
+ InterceptorBatchMethodsImpl* interceptor_methods) {
if (metadata_map_ == nullptr) return;
metadata_map_ = nullptr;
+ interceptor_methods->AddInterceptionHookPoint(
+ experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA);
}
private:
@@ -549,7 +665,8 @@ class CallOpClientRecvStatus {
}
protected:
- void AddOp(grpc_op* ops, size_t* nops) {
+ void AddOp(grpc_op* ops, size_t* nops,
+ InterceptorBatchMethodsImpl* interceptor_methods) {
if (recv_status_ == nullptr) return;
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
@@ -561,7 +678,8 @@ class CallOpClientRecvStatus {
op->reserved = NULL;
}
- void FinishOp(bool* status) {
+ void FinishOp(bool* status,
+ InterceptorBatchMethodsImpl* interceptor_methods) {
if (recv_status_ == nullptr) return;
grpc::string binary_error_details = metadata_map_->GetBinaryErrorDetails();
*recv_status_ =
@@ -578,6 +696,8 @@ class CallOpClientRecvStatus {
g_core_codegen_interface->gpr_free((void*)debug_error_string_);
}
recv_status_ = nullptr;
+ interceptor_methods->AddInterceptionHookPoint(
+ experimental::InterceptionHookPoints::POST_RECV_STATUS);
}
private:
@@ -598,7 +718,7 @@ class CallOpSetInterface : public CompletionQueueTag {
public:
/// Fills in grpc_op, starting from ops[*nops] and moving
/// upwards.
- virtual void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) = 0;
+ virtual void FillOps(internal::Call* call, grpc_op* ops, size_t* nops) = 0;
/// Get the tag to be used at the core completion queue. Generally, the
/// value of cq_tag will be "this". However, it can be overridden if we
@@ -624,27 +744,27 @@ class CallOpSet : public CallOpSetInterface,
public Op6 {
public:
CallOpSet() : cq_tag_(this), return_tag_(this), call_(nullptr) {}
- void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) override {
- this->Op1::AddOp(ops, nops);
- this->Op2::AddOp(ops, nops);
- this->Op3::AddOp(ops, nops);
- this->Op4::AddOp(ops, nops);
- this->Op5::AddOp(ops, nops);
- this->Op6::AddOp(ops, nops);
- g_core_codegen_interface->grpc_call_ref(call);
+ void FillOps(Call* call, grpc_op* ops, size_t* nops) override {
+ this->Op1::AddOp(ops, nops, &interceptor_methods_);
+ this->Op2::AddOp(ops, nops, &interceptor_methods_);
+ this->Op3::AddOp(ops, nops, &interceptor_methods_);
+ this->Op4::AddOp(ops, nops, &interceptor_methods_);
+ this->Op5::AddOp(ops, nops, &interceptor_methods_);
+ this->Op6::AddOp(ops, nops, &interceptor_methods_);
+ g_core_codegen_interface->grpc_call_ref(call->call());
call_ = call;
}
bool FinalizeResult(void** tag, bool* status) override {
- this->Op1::FinishOp(status);
- this->Op2::FinishOp(status);
- this->Op3::FinishOp(status);
- this->Op4::FinishOp(status);
- this->Op5::FinishOp(status);
- this->Op6::FinishOp(status);
+ this->Op1::FinishOp(status, &interceptor_methods_);
+ this->Op2::FinishOp(status, &interceptor_methods_);
+ this->Op3::FinishOp(status, &interceptor_methods_);
+ this->Op4::FinishOp(status, &interceptor_methods_);
+ this->Op5::FinishOp(status, &interceptor_methods_);
+ this->Op6::FinishOp(status, &interceptor_methods_);
*tag = return_tag_;
- g_core_codegen_interface->grpc_call_unref(call_);
+ g_core_codegen_interface->grpc_call_unref(call_->call());
return true;
}
@@ -661,41 +781,10 @@ class CallOpSet : public CallOpSetInterface,
private:
void* cq_tag_;
void* return_tag_;
- grpc_call* call_;
+ Call* call_;
+ InterceptorBatchMethodsImpl interceptor_methods_;
};
-/// Straightforward wrapping of the C call object
-class Call final {
- public:
- /** call is owned by the caller */
- Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq)
- : call_hook_(call_hook),
- cq_(cq),
- call_(call),
- max_receive_message_size_(-1) {}
-
- Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq,
- int max_receive_message_size)
- : call_hook_(call_hook),
- cq_(cq),
- call_(call),
- max_receive_message_size_(max_receive_message_size) {}
-
- void PerformOps(CallOpSetInterface* ops) {
- call_hook_->PerformOpsOnCall(ops, this);
- }
-
- grpc_call* call() const { return call_; }
- CompletionQueue* cq() const { return cq_; }
-
- int max_receive_message_size() const { return max_receive_message_size_; }
-
- private:
- CallHook* call_hook_;
- CompletionQueue* cq_;
- grpc_call* call_;
- int max_receive_message_size_;
-};
} // namespace internal
} // namespace grpc
diff --git a/include/grpcpp/impl/codegen/client_interceptor.h b/include/grpcpp/impl/codegen/client_interceptor.h
index f460c5ac0c..f7963a57d5 100644
--- a/include/grpcpp/impl/codegen/client_interceptor.h
+++ b/include/grpcpp/impl/codegen/client_interceptor.h
@@ -19,7 +19,9 @@
#ifndef GRPCPP_IMPL_CODEGEN_CLIENT_INTERCEPTOR_H
#define GRPCPP_IMPL_CODEGEN_CLIENT_INTERCEPTOR_H
+#include <grpcpp/impl/codegen/client_context.h>
#include <grpcpp/impl/codegen/interceptor.h>
+#include <grpcpp/impl/codegen/string_ref.h>
namespace grpc {
namespace experimental {
@@ -30,7 +32,62 @@ class ClientInterceptor {
virtual void Intercept(InterceptorBatchMethods* methods) = 0;
};
-class ClientRpcInfo {};
+class ClientRpcInfo {
+ public:
+ ClientRpcInfo(grpc::ClientContext* ctx, const char* method,
+ const grpc::Channel* channel)
+ : ctx_(ctx), method_(method), channel_(channel) {}
+ ~ClientRpcInfo(){};
+
+ // Getter methods
+ const char* method() { return method_; }
+ string peer() { return ctx_->peer(); }
+ const Channel* channel() { return channel_; }
+ // const grpc::InterceptedMessage& outgoing_message();
+ // grpc::InterceptedMessage *mutable_outgoing_message();
+ // const grpc::InterceptedMessage& received_message();
+ // grpc::InterceptedMessage *mutable_received_message();
+ std::shared_ptr<const AuthContext> auth_context() {
+ return ctx_->auth_context();
+ }
+ const struct census_context* census_context() {
+ return ctx_->census_context();
+ }
+ gpr_timespec deadline() { return ctx_->raw_deadline(); }
+ // const std::multimap<grpc::string, grpc::string>* client_initial_metadata()
+ // { return &ctx_->send_initial_metadata_; } const
+ // std::multimap<grpc::string_ref, grpc::string_ref>*
+ // server_initial_metadata() { return &ctx_->GetServerInitialMetadata(); }
+ // const std::multimap<grpc::string_ref, grpc::string_ref>*
+ // server_trailing_metadata() { return &ctx_->GetServerTrailingMetadata(); }
+ // const Status *status();
+
+ // Setter methods
+ template <typename T>
+ void set_deadline(const T& deadline) {
+ ctx_->set_deadline(deadline);
+ }
+ void set_census_context(struct census_context* cc) {
+ ctx_->set_census_context(cc);
+ }
+ // template <class M>
+ // void set_outgoing_message(M* msg); // edit outgoing message
+ // template <class M>
+ // void set_received_message(M* msg); // edit received message
+ // for hijacking (can be called multiple times for streaming)
+ // template <class M>
+ // void inject_received_message(M* msg);
+ // void set_client_initial_metadata(
+ // const std::multimap<grpc::string, grpc::string>& overwrite);
+ // void set_server_initial_metadata(const std::multimap<grpc::string,
+ // grpc::string>& overwrite); void set_server_trailing_metadata(const
+ // std::multimap<grpc::string, grpc::string>& overwrite); void
+ // set_status(Status status);
+ private:
+ grpc::ClientContext* ctx_;
+ const char* method_;
+ const grpc::Channel* channel_;
+};
class ClientInterceptorFactoryInterface {
public:
diff --git a/include/grpcpp/impl/codegen/interceptor.h b/include/grpcpp/impl/codegen/interceptor.h
index 6402a3a946..84dce42f97 100644
--- a/include/grpcpp/impl/codegen/interceptor.h
+++ b/include/grpcpp/impl/codegen/interceptor.h
@@ -50,7 +50,7 @@ enum class InterceptionHookPoints {
class InterceptorBatchMethods {
public:
- virtual ~InterceptorBatchMethods();
+ virtual ~InterceptorBatchMethods(){};
// Queries to check whether the current batch has an interception hook point
// of type \a type
virtual bool QueryInterceptionHookPoint(InterceptionHookPoints type) = 0;