aboutsummaryrefslogtreecommitdiffhomepage
path: root/include
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2018-12-26 15:10:41 -0800
committerGravatar Yash Tibrewal <yashkt@google.com>2018-12-26 15:10:41 -0800
commit24e37e249a4db24ff2c886960e3a00311e2591dd (patch)
tree7bcb4a002432e7ef04054750b1efe8dfd9ba8ade /include
parent0911e489e3fe22e2ca5d7c927dac83358f2f05b7 (diff)
parentfc7d0911a3a44d7bc926d3db99b7300a0c0f33dc (diff)
Merge branch 'master' into failhijackedrecv
Diffstat (limited to 'include')
-rw-r--r--include/grpc/grpc.h3
-rw-r--r--include/grpc/grpc_security_constants.h6
-rw-r--r--include/grpc/impl/codegen/atm_gcc_sync.h2
-rw-r--r--include/grpc/impl/codegen/atm_windows.h2
-rw-r--r--include/grpc/impl/codegen/compression_types.h3
-rw-r--r--include/grpc/impl/codegen/grpc_types.h7
-rw-r--r--include/grpc/impl/codegen/port_platform.h9
-rw-r--r--include/grpcpp/alarm.h93
-rw-r--r--include/grpcpp/alarm_impl.h116
-rw-r--r--include/grpcpp/channel.h8
-rw-r--r--include/grpcpp/create_channel.h4
-rw-r--r--include/grpcpp/generic/generic_stub.h5
-rw-r--r--include/grpcpp/health_check_service_interface.h4
-rw-r--r--include/grpcpp/impl/codegen/byte_buffer.h19
-rw-r--r--include/grpcpp/impl/codegen/call_op_set.h9
-rw-r--r--include/grpcpp/impl/codegen/callback_common.h27
-rw-r--r--include/grpcpp/impl/codegen/channel_interface.h13
-rw-r--r--include/grpcpp/impl/codegen/client_callback.h655
-rw-r--r--include/grpcpp/impl/codegen/client_context.h27
-rw-r--r--include/grpcpp/impl/codegen/client_interceptor.h78
-rw-r--r--include/grpcpp/impl/codegen/client_unary_call.h18
-rw-r--r--include/grpcpp/impl/codegen/completion_queue.h3
-rw-r--r--include/grpcpp/impl/codegen/interceptor.h133
-rw-r--r--include/grpcpp/impl/codegen/interceptor_common.h10
-rw-r--r--include/grpcpp/impl/codegen/metadata_map.h19
-rw-r--r--include/grpcpp/impl/codegen/server_callback.h774
-rw-r--r--include/grpcpp/impl/codegen/server_context.h39
-rw-r--r--include/grpcpp/impl/codegen/server_interceptor.h58
-rw-r--r--include/grpcpp/impl/codegen/server_interface.h18
-rw-r--r--include/grpcpp/security/credentials.h12
-rw-r--r--include/grpcpp/server.h4
-rw-r--r--include/grpcpp/support/client_interceptor.h24
-rw-r--r--include/grpcpp/support/interceptor.h24
-rw-r--r--include/grpcpp/support/server_interceptor.h24
34 files changed, 1988 insertions, 262 deletions
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index d3b74cabab..fec7f5269e 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -511,7 +511,8 @@ GRPCAPI char* grpc_channelz_get_server(intptr_t server_id);
/* Gets all server sockets that exist in the server. */
GRPCAPI char* grpc_channelz_get_server_sockets(intptr_t server_id,
- intptr_t start_socket_id);
+ intptr_t start_socket_id,
+ intptr_t max_results);
/* Returns a single Channel, or else a NOT_FOUND code. The returned string
is allocated and must be freed by the application. */
diff --git a/include/grpc/grpc_security_constants.h b/include/grpc/grpc_security_constants.h
index f935557f2d..a082f67010 100644
--- a/include/grpc/grpc_security_constants.h
+++ b/include/grpc/grpc_security_constants.h
@@ -106,10 +106,10 @@ typedef enum {
} grpc_ssl_client_certificate_request_type;
/**
- * Type of local connection for which local channel/server credentials will be
- * applied. It only supports UDS for now.
+ * Type of local connections for which local channel/server credentials will be
+ * applied. It supports UDS and local TCP connections.
*/
-typedef enum { UDS = 0 } grpc_local_connect_type;
+typedef enum { UDS = 0, LOCAL_TCP } grpc_local_connect_type;
#ifdef __cplusplus
}
diff --git a/include/grpc/impl/codegen/atm_gcc_sync.h b/include/grpc/impl/codegen/atm_gcc_sync.h
index c0010a3469..728c3d5412 100644
--- a/include/grpc/impl/codegen/atm_gcc_sync.h
+++ b/include/grpc/impl/codegen/atm_gcc_sync.h
@@ -26,6 +26,8 @@
typedef intptr_t gpr_atm;
#define GPR_ATM_MAX INTPTR_MAX
#define GPR_ATM_MIN INTPTR_MIN
+#define GPR_ATM_INC_CAS_THEN(blah) blah
+#define GPR_ATM_INC_ADD_THEN(blah) blah
#define GPR_ATM_COMPILE_BARRIER_() __asm__ __volatile__("" : : : "memory")
diff --git a/include/grpc/impl/codegen/atm_windows.h b/include/grpc/impl/codegen/atm_windows.h
index f6b27e5df7..c016b90095 100644
--- a/include/grpc/impl/codegen/atm_windows.h
+++ b/include/grpc/impl/codegen/atm_windows.h
@@ -25,6 +25,8 @@
typedef intptr_t gpr_atm;
#define GPR_ATM_MAX INTPTR_MAX
#define GPR_ATM_MIN INTPTR_MIN
+#define GPR_ATM_INC_CAS_THEN(blah) blah
+#define GPR_ATM_INC_ADD_THEN(blah) blah
#define gpr_atm_full_barrier MemoryBarrier
diff --git a/include/grpc/impl/codegen/compression_types.h b/include/grpc/impl/codegen/compression_types.h
index e35d892967..f778b005b9 100644
--- a/include/grpc/impl/codegen/compression_types.h
+++ b/include/grpc/impl/codegen/compression_types.h
@@ -52,7 +52,8 @@ extern "C" {
"grpc.compression_enabled_algorithms_bitset"
/** \} */
-/** The various compression algorithms supported by gRPC */
+/** The various compression algorithms supported by gRPC (not sorted by
+ * compression level) */
typedef enum {
GRPC_COMPRESS_NONE = 0,
GRPC_COMPRESS_DEFLATE,
diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h
index 17a43fab0f..a9fb27946e 100644
--- a/include/grpc/impl/codegen/grpc_types.h
+++ b/include/grpc/impl/codegen/grpc_types.h
@@ -293,7 +293,7 @@ typedef struct {
"grpc.max_channel_trace_event_memory_per_node"
/** If non-zero, gRPC library will track stats and information at at per channel
* level. Disabling channelz naturally disables channel tracing. The default
- * is for channelz to be disabled. */
+ * is for channelz to be enabled. */
#define GRPC_ARG_ENABLE_CHANNELZ "grpc.enable_channelz"
/** If non-zero, Cronet transport will coalesce packets to fewer frames
* when possible. */
@@ -350,6 +350,11 @@ typedef struct {
/** If set, inhibits health checking (which may be enabled via the
* service config.) */
#define GRPC_ARG_INHIBIT_HEALTH_CHECKING "grpc.inhibit_health_checking"
+/** If set, determines the number of milliseconds that the c-ares based
+ * DNS resolver will wait on queries before cancelling them. The default value
+ * is 10000. Setting this to "0" will disable c-ares query timeouts
+ * entirely. */
+#define GRPC_ARG_DNS_ARES_QUERY_TIMEOUT_MS "grpc.dns_ares_query_timeout"
/** \} */
/** Result of a grpc call. If the caller satisfies the prerequisites of a
diff --git a/include/grpc/impl/codegen/port_platform.h b/include/grpc/impl/codegen/port_platform.h
index b2028a6305..031c0c36ae 100644
--- a/include/grpc/impl/codegen/port_platform.h
+++ b/include/grpc/impl/codegen/port_platform.h
@@ -526,6 +526,15 @@ typedef unsigned __int64 uint64_t;
#endif /* GPR_ATTRIBUTE_NO_TSAN (2) */
#endif /* GPR_ATTRIBUTE_NO_TSAN (1) */
+/* GRPC_TSAN_ENABLED will be defined, when compiled with thread sanitizer. */
+#if defined(__SANITIZE_THREAD__)
+#define GRPC_TSAN_ENABLED
+#elif defined(__has_feature)
+#if __has_feature(thread_sanitizer)
+#define GRPC_TSAN_ENABLED
+#endif
+#endif
+
/* GRPC_ALLOW_EXCEPTIONS should be 0 or 1 if exceptions are allowed or not */
#ifndef GRPC_ALLOW_EXCEPTIONS
/* If not already set, set to 1 on Windows (style guide standard) but to
diff --git a/include/grpcpp/alarm.h b/include/grpcpp/alarm.h
index 365feb4eb9..2343c1149c 100644
--- a/include/grpcpp/alarm.h
+++ b/include/grpcpp/alarm.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015 gRPC authors.
+ * Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,99 +16,14 @@
*
*/
-/// An Alarm posts the user provided tag to its associated completion queue upon
-/// expiry or cancellation.
#ifndef GRPCPP_ALARM_H
#define GRPCPP_ALARM_H
-#include <functional>
-
-#include <grpc/grpc.h>
-#include <grpcpp/impl/codegen/completion_queue.h>
-#include <grpcpp/impl/codegen/completion_queue_tag.h>
-#include <grpcpp/impl/codegen/grpc_library.h>
-#include <grpcpp/impl/codegen/time.h>
-#include <grpcpp/impl/grpc_library.h>
+#include <grpcpp/alarm_impl.h>
namespace grpc {
-/// A thin wrapper around \a grpc_alarm (see / \a / src/core/surface/alarm.h).
-class Alarm : private GrpcLibraryCodegen {
- public:
- /// Create an unset completion queue alarm
- Alarm();
-
- /// Destroy the given completion queue alarm, cancelling it in the process.
- ~Alarm();
-
- /// DEPRECATED: Create and set a completion queue alarm instance associated to
- /// \a cq.
- /// This form is deprecated because it is inherently racy.
- /// \internal We rely on the presence of \a cq for grpc initialization. If \a
- /// cq were ever to be removed, a reference to a static
- /// internal::GrpcLibraryInitializer instance would need to be introduced
- /// here. \endinternal.
- template <typename T>
- Alarm(CompletionQueue* cq, const T& deadline, void* tag) : Alarm() {
- SetInternal(cq, TimePoint<T>(deadline).raw_time(), tag);
- }
-
- /// Trigger an alarm instance on completion queue \a cq at the specified time.
- /// Once the alarm expires (at \a deadline) or it's cancelled (see \a Cancel),
- /// an event with tag \a tag will be added to \a cq. If the alarm expired, the
- /// event's success bit will be true, false otherwise (ie, upon cancellation).
- template <typename T>
- void Set(CompletionQueue* cq, const T& deadline, void* tag) {
- SetInternal(cq, TimePoint<T>(deadline).raw_time(), tag);
- }
-
- /// Alarms aren't copyable.
- Alarm(const Alarm&) = delete;
- Alarm& operator=(const Alarm&) = delete;
-
- /// Alarms are movable.
- Alarm(Alarm&& rhs) : alarm_(rhs.alarm_) { rhs.alarm_ = nullptr; }
- Alarm& operator=(Alarm&& rhs) {
- alarm_ = rhs.alarm_;
- rhs.alarm_ = nullptr;
- return *this;
- }
-
- /// Cancel a completion queue alarm. Calling this function over an alarm that
- /// has already fired has no effect.
- void Cancel();
-
- /// NOTE: class experimental_type is not part of the public API of this class
- /// TODO(vjpai): Move these contents to the public API of Alarm when
- /// they are no longer experimental
- class experimental_type {
- public:
- explicit experimental_type(Alarm* alarm) : alarm_(alarm) {}
-
- /// Set an alarm to invoke callback \a f. The argument to the callback
- /// states whether the alarm expired at \a deadline (true) or was cancelled
- /// (false)
- template <typename T>
- void Set(const T& deadline, std::function<void(bool)> f) {
- alarm_->SetInternal(TimePoint<T>(deadline).raw_time(), std::move(f));
- }
-
- private:
- Alarm* alarm_;
- };
-
- /// NOTE: The function experimental() is not stable public API. It is a view
- /// to the experimental components of this class. It may be changed or removed
- /// at any time.
- experimental_type experimental() { return experimental_type(this); }
-
- private:
- void SetInternal(CompletionQueue* cq, gpr_timespec deadline, void* tag);
- void SetInternal(gpr_timespec deadline, std::function<void(bool)> f);
-
- internal::CompletionQueueTag* alarm_;
-};
-
-} // namespace grpc
+typedef ::grpc_impl::Alarm Alarm;
+}
#endif // GRPCPP_ALARM_H
diff --git a/include/grpcpp/alarm_impl.h b/include/grpcpp/alarm_impl.h
new file mode 100644
index 0000000000..7844e7c886
--- /dev/null
+++ b/include/grpcpp/alarm_impl.h
@@ -0,0 +1,116 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/// An Alarm posts the user provided tag to its associated completion queue upon
+/// expiry or cancellation.
+#ifndef GRPCPP_ALARM_IMPL_H
+#define GRPCPP_ALARM_IMPL_H
+
+#include <functional>
+
+#include <grpc/grpc.h>
+#include <grpcpp/impl/codegen/completion_queue.h>
+#include <grpcpp/impl/codegen/completion_queue_tag.h>
+#include <grpcpp/impl/codegen/grpc_library.h>
+#include <grpcpp/impl/codegen/time.h>
+#include <grpcpp/impl/grpc_library.h>
+
+namespace grpc_impl {
+
+/// A thin wrapper around \a grpc_alarm (see / \a / src/core/surface/alarm.h).
+class Alarm : private ::grpc::GrpcLibraryCodegen {
+ public:
+ /// Create an unset completion queue alarm
+ Alarm();
+
+ /// Destroy the given completion queue alarm, cancelling it in the process.
+ ~Alarm();
+
+ /// DEPRECATED: Create and set a completion queue alarm instance associated to
+ /// \a cq.
+ /// This form is deprecated because it is inherently racy.
+ /// \internal We rely on the presence of \a cq for grpc initialization. If \a
+ /// cq were ever to be removed, a reference to a static
+ /// internal::GrpcLibraryInitializer instance would need to be introduced
+ /// here. \endinternal.
+ template <typename T>
+ Alarm(::grpc::CompletionQueue* cq, const T& deadline, void* tag) : Alarm() {
+ SetInternal(cq, ::grpc::TimePoint<T>(deadline).raw_time(), tag);
+ }
+
+ /// Trigger an alarm instance on completion queue \a cq at the specified time.
+ /// Once the alarm expires (at \a deadline) or it's cancelled (see \a Cancel),
+ /// an event with tag \a tag will be added to \a cq. If the alarm expired, the
+ /// event's success bit will be true, false otherwise (ie, upon cancellation).
+ template <typename T>
+ void Set(::grpc::CompletionQueue* cq, const T& deadline, void* tag) {
+ SetInternal(cq, ::grpc::TimePoint<T>(deadline).raw_time(), tag);
+ }
+
+ /// Alarms aren't copyable.
+ Alarm(const Alarm&) = delete;
+ Alarm& operator=(const Alarm&) = delete;
+
+ /// Alarms are movable.
+ Alarm(Alarm&& rhs) : alarm_(rhs.alarm_) { rhs.alarm_ = nullptr; }
+ Alarm& operator=(Alarm&& rhs) {
+ alarm_ = rhs.alarm_;
+ rhs.alarm_ = nullptr;
+ return *this;
+ }
+
+ /// Cancel a completion queue alarm. Calling this function over an alarm that
+ /// has already fired has no effect.
+ void Cancel();
+
+ /// NOTE: class experimental_type is not part of the public API of this class
+ /// TODO(vjpai): Move these contents to the public API of Alarm when
+ /// they are no longer experimental
+ class experimental_type {
+ public:
+ explicit experimental_type(Alarm* alarm) : alarm_(alarm) {}
+
+ /// Set an alarm to invoke callback \a f. The argument to the callback
+ /// states whether the alarm expired at \a deadline (true) or was cancelled
+ /// (false)
+ template <typename T>
+ void Set(const T& deadline, std::function<void(bool)> f) {
+ alarm_->SetInternal(::grpc::TimePoint<T>(deadline).raw_time(),
+ std::move(f));
+ }
+
+ private:
+ Alarm* alarm_;
+ };
+
+ /// NOTE: The function experimental() is not stable public API. It is a view
+ /// to the experimental components of this class. It may be changed or removed
+ /// at any time.
+ experimental_type experimental() { return experimental_type(this); }
+
+ private:
+ void SetInternal(::grpc::CompletionQueue* cq, gpr_timespec deadline,
+ void* tag);
+ void SetInternal(gpr_timespec deadline, std::function<void(bool)> f);
+
+ ::grpc::internal::CompletionQueueTag* alarm_;
+};
+
+} // namespace grpc_impl
+
+#endif // GRPCPP_ALARM_IMPL_H
diff --git a/include/grpcpp/channel.h b/include/grpcpp/channel.h
index 4502b94b17..ee83396069 100644
--- a/include/grpcpp/channel.h
+++ b/include/grpcpp/channel.h
@@ -65,13 +65,13 @@ class Channel final : public ChannelInterface,
friend void experimental::ChannelResetConnectionBackoff(Channel* channel);
friend std::shared_ptr<Channel> CreateChannelInternal(
const grpc::string& host, grpc_channel* c_channel,
- std::unique_ptr<std::vector<
- std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>
+ std::vector<
+ std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
interceptor_creators);
friend class internal::InterceptedChannel;
Channel(const grpc::string& host, grpc_channel* c_channel,
- std::unique_ptr<std::vector<
- std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>
+ std::vector<
+ std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
interceptor_creators);
internal::Call CreateCall(const internal::RpcMethod& method,
diff --git a/include/grpcpp/create_channel.h b/include/grpcpp/create_channel.h
index 43188d09e7..e8a2a70581 100644
--- a/include/grpcpp/create_channel.h
+++ b/include/grpcpp/create_channel.h
@@ -70,8 +70,8 @@ std::shared_ptr<Channel> CreateCustomChannelWithInterceptors(
const grpc::string& target,
const std::shared_ptr<ChannelCredentials>& creds,
const ChannelArguments& args,
- std::unique_ptr<std::vector<
- std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>
+ std::vector<
+ std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
interceptor_creators);
} // namespace experimental
} // namespace grpc
diff --git a/include/grpcpp/generic/generic_stub.h b/include/grpcpp/generic/generic_stub.h
index d509d9a520..eb014184e4 100644
--- a/include/grpcpp/generic/generic_stub.h
+++ b/include/grpcpp/generic/generic_stub.h
@@ -24,6 +24,7 @@
#include <grpcpp/support/async_stream.h>
#include <grpcpp/support/async_unary_call.h>
#include <grpcpp/support/byte_buffer.h>
+#include <grpcpp/support/client_callback.h>
#include <grpcpp/support/status.h>
namespace grpc {
@@ -76,6 +77,10 @@ class GenericStub final {
const ByteBuffer* request, ByteBuffer* response,
std::function<void(Status)> on_completion);
+ void PrepareBidiStreamingCall(
+ ClientContext* context, const grpc::string& method,
+ experimental::ClientBidiReactor<ByteBuffer, ByteBuffer>* reactor);
+
private:
GenericStub* stub_;
};
diff --git a/include/grpcpp/health_check_service_interface.h b/include/grpcpp/health_check_service_interface.h
index b45a699bda..dfd4c3983a 100644
--- a/include/grpcpp/health_check_service_interface.h
+++ b/include/grpcpp/health_check_service_interface.h
@@ -37,6 +37,10 @@ class HealthCheckServiceInterface {
bool serving) = 0;
/// Apply to all registered service names.
virtual void SetServingStatus(bool serving) = 0;
+
+ /// Set all registered service names to not serving and prevent future
+ /// state changes.
+ virtual void Shutdown() {}
};
/// Enable/disable the default health checking service. This applies to all C++
diff --git a/include/grpcpp/impl/codegen/byte_buffer.h b/include/grpcpp/impl/codegen/byte_buffer.h
index abba5549b8..a77e36dfc5 100644
--- a/include/grpcpp/impl/codegen/byte_buffer.h
+++ b/include/grpcpp/impl/codegen/byte_buffer.h
@@ -45,8 +45,10 @@ template <class ServiceType, class RequestType, class ResponseType>
class RpcMethodHandler;
template <class ServiceType, class RequestType, class ResponseType>
class ServerStreamingHandler;
-template <class ServiceType, class RequestType, class ResponseType>
+template <class RequestType, class ResponseType>
class CallbackUnaryHandler;
+template <class RequestType, class ResponseType>
+class CallbackServerStreamingHandler;
template <StatusCode code>
class ErrorMethodHandler;
template <class R>
@@ -91,7 +93,9 @@ class ByteBuffer final {
}
/// Constuct a byte buffer by referencing elements of existing buffer
- /// \a buf. Wrapper of core function grpc_byte_buffer_copy
+ /// \a buf. Wrapper of core function grpc_byte_buffer_copy . This is not
+ /// a deep copy; it is just a referencing. As a result, its performance is
+ /// size-independent.
ByteBuffer(const ByteBuffer& buf);
~ByteBuffer() {
@@ -100,6 +104,9 @@ class ByteBuffer final {
}
}
+ /// Wrapper of core function grpc_byte_buffer_copy . This is not
+ /// a deep copy; it is just a referencing. As a result, its performance is
+ /// size-independent.
ByteBuffer& operator=(const ByteBuffer&);
/// Dump (read) the buffer contents into \a slices.
@@ -115,7 +122,9 @@ class ByteBuffer final {
/// Make a duplicate copy of the internals of this byte
/// buffer so that we have our own owned version of it.
- /// bbuf.Duplicate(); is equivalent to bbuf=bbuf; but is actually readable
+ /// bbuf.Duplicate(); is equivalent to bbuf=bbuf; but is actually readable.
+ /// This is not a deep copy; it is a referencing and its performance
+ /// is size-independent.
void Duplicate() {
buffer_ = g_core_codegen_interface->grpc_byte_buffer_copy(buffer_);
}
@@ -156,8 +165,10 @@ class ByteBuffer final {
friend class internal::RpcMethodHandler;
template <class ServiceType, class RequestType, class ResponseType>
friend class internal::ServerStreamingHandler;
- template <class ServiceType, class RequestType, class ResponseType>
+ template <class RequestType, class ResponseType>
friend class internal::CallbackUnaryHandler;
+ template <class RequestType, class ResponseType>
+ friend class ::grpc::internal::CallbackServerStreamingHandler;
template <StatusCode code>
friend class internal::ErrorMethodHandler;
template <class R>
diff --git a/include/grpcpp/impl/codegen/call_op_set.h b/include/grpcpp/impl/codegen/call_op_set.h
index 3699ec94f2..2e8f9a43b6 100644
--- a/include/grpcpp/impl/codegen/call_op_set.h
+++ b/include/grpcpp/impl/codegen/call_op_set.h
@@ -325,7 +325,11 @@ class CallOpSendMessage {
}
void SetFinishInterceptionHookPoint(
- InterceptorBatchMethodsImpl* interceptor_methods) {}
+ InterceptorBatchMethodsImpl* interceptor_methods) {
+ // The contents of the SendMessage value that was previously set
+ // has had its references stolen by core's operations
+ interceptor_methods->SetSendMessage(nullptr);
+ }
void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
hijacked_ = true;
@@ -415,6 +419,7 @@ class CallOpRecvMessage {
if (message_ == nullptr) return;
interceptor_methods->AddInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
+ if (!got_message) interceptor_methods->SetRecvMessage(nullptr, nullptr);
}
void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
hijacked_ = true;
@@ -511,12 +516,14 @@ class CallOpGenericRecvMessage {
if (!deserialize_) return;
interceptor_methods->AddInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
+ if (!got_message) interceptor_methods->SetRecvMessage(nullptr, nullptr);
}
void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
hijacked_ = true;
if (!deserialize_) return;
interceptor_methods->AddInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_RECV_MESSAGE);
+ got_message = true;
}
private:
diff --git a/include/grpcpp/impl/codegen/callback_common.h b/include/grpcpp/impl/codegen/callback_common.h
index 51367cf550..a3c8c41246 100644
--- a/include/grpcpp/impl/codegen/callback_common.h
+++ b/include/grpcpp/impl/codegen/callback_common.h
@@ -32,6 +32,8 @@ namespace grpc {
namespace internal {
/// An exception-safe way of invoking a user-specified callback function
+// TODO(vjpai): decide whether it is better for this to take a const lvalue
+// parameter or an rvalue parameter, or if it even matters
template <class Func, class... Args>
void CatchingCallback(Func&& func, Args&&... args) {
#if GRPC_ALLOW_EXCEPTIONS
@@ -45,6 +47,20 @@ void CatchingCallback(Func&& func, Args&&... args) {
#endif // GRPC_ALLOW_EXCEPTIONS
}
+template <class ReturnType, class Func, class... Args>
+ReturnType* CatchingReactorCreator(Func&& func, Args&&... args) {
+#if GRPC_ALLOW_EXCEPTIONS
+ try {
+ return func(std::forward<Args>(args)...);
+ } catch (...) {
+ // fail the RPC, don't crash the library
+ return nullptr;
+ }
+#else // GRPC_ALLOW_EXCEPTIONS
+ return func(std::forward<Args>(args)...);
+#endif // GRPC_ALLOW_EXCEPTIONS
+}
+
// The contract on these tags is that they are single-shot. They must be
// constructed and then fired at exactly one point. There is no expectation
// that they can be reused without reconstruction.
@@ -145,18 +161,19 @@ class CallbackWithSuccessTag
// or on a tag that has been Set before unless the tag has been cleared.
void Set(grpc_call* call, std::function<void(bool)> f,
CompletionQueueTag* ops) {
+ GPR_CODEGEN_ASSERT(call_ == nullptr);
+ g_core_codegen_interface->grpc_call_ref(call);
call_ = call;
func_ = std::move(f);
ops_ = ops;
- g_core_codegen_interface->grpc_call_ref(call);
functor_run = &CallbackWithSuccessTag::StaticRun;
}
void Clear() {
if (call_ != nullptr) {
- func_ = nullptr;
grpc_call* call = call_;
call_ = nullptr;
+ func_ = nullptr;
g_core_codegen_interface->grpc_call_unref(call);
}
}
@@ -182,11 +199,11 @@ class CallbackWithSuccessTag
}
void Run(bool ok) {
void* ignored = ops_;
- bool new_ok = ok;
// Allow a "false" return value from FinalizeResult to silence the
// callback, just as it silences a CQ tag in the async cases
- bool do_callback = ops_->FinalizeResult(&ignored, &new_ok);
- GPR_CODEGEN_ASSERT(ignored == ops_);
+ auto* ops = ops_;
+ bool do_callback = ops_->FinalizeResult(&ignored, &ok);
+ GPR_CODEGEN_ASSERT(ignored == ops);
if (do_callback) {
CatchingCallback(func_, ok);
diff --git a/include/grpcpp/impl/codegen/channel_interface.h b/include/grpcpp/impl/codegen/channel_interface.h
index 6ec1ffb8c7..5353f5feaa 100644
--- a/include/grpcpp/impl/codegen/channel_interface.h
+++ b/include/grpcpp/impl/codegen/channel_interface.h
@@ -21,7 +21,6 @@
#include <grpc/impl/codegen/connectivity_state.h>
#include <grpcpp/impl/codegen/call.h>
-#include <grpcpp/impl/codegen/client_context.h>
#include <grpcpp/impl/codegen/status.h>
#include <grpcpp/impl/codegen/time.h>
@@ -53,6 +52,12 @@ template <class W, class R>
class ClientAsyncReaderWriterFactory;
template <class R>
class ClientAsyncResponseReaderFactory;
+template <class W, class R>
+class ClientCallbackReaderWriterFactory;
+template <class R>
+class ClientCallbackReaderFactory;
+template <class W>
+class ClientCallbackWriterFactory;
class InterceptedChannel;
} // namespace internal
@@ -106,6 +111,12 @@ class ChannelInterface {
friend class ::grpc::internal::ClientAsyncReaderWriterFactory;
template <class R>
friend class ::grpc::internal::ClientAsyncResponseReaderFactory;
+ template <class W, class R>
+ friend class ::grpc::internal::ClientCallbackReaderWriterFactory;
+ template <class R>
+ friend class ::grpc::internal::ClientCallbackReaderFactory;
+ template <class W>
+ friend class ::grpc::internal::ClientCallbackWriterFactory;
template <class InputMessage, class OutputMessage>
friend class ::grpc::internal::BlockingUnaryCallImpl;
template <class InputMessage, class OutputMessage>
diff --git a/include/grpcpp/impl/codegen/client_callback.h b/include/grpcpp/impl/codegen/client_callback.h
index 4baa819091..66cf9b7754 100644
--- a/include/grpcpp/impl/codegen/client_callback.h
+++ b/include/grpcpp/impl/codegen/client_callback.h
@@ -22,6 +22,7 @@
#include <functional>
#include <grpcpp/impl/codegen/call.h>
+#include <grpcpp/impl/codegen/call_op_set.h>
#include <grpcpp/impl/codegen/callback_common.h>
#include <grpcpp/impl/codegen/channel_interface.h>
#include <grpcpp/impl/codegen/config.h>
@@ -88,6 +89,660 @@ class CallbackUnaryCallImpl {
call.PerformOps(ops);
}
};
+} // namespace internal
+
+namespace experimental {
+
+// Forward declarations
+template <class Request, class Response>
+class ClientBidiReactor;
+template <class Response>
+class ClientReadReactor;
+template <class Request>
+class ClientWriteReactor;
+
+// NOTE: The streaming objects are not actually implemented in the public API.
+// These interfaces are provided for mocking only. Typical applications
+// will interact exclusively with the reactors that they define.
+template <class Request, class Response>
+class ClientCallbackReaderWriter {
+ public:
+ virtual ~ClientCallbackReaderWriter() {}
+ virtual void StartCall() = 0;
+ virtual void Write(const Request* req, WriteOptions options) = 0;
+ virtual void WritesDone() = 0;
+ virtual void Read(Response* resp) = 0;
+
+ protected:
+ void BindReactor(ClientBidiReactor<Request, Response>* reactor) {
+ reactor->BindStream(this);
+ }
+};
+
+template <class Response>
+class ClientCallbackReader {
+ public:
+ virtual ~ClientCallbackReader() {}
+ virtual void StartCall() = 0;
+ virtual void Read(Response* resp) = 0;
+
+ protected:
+ void BindReactor(ClientReadReactor<Response>* reactor) {
+ reactor->BindReader(this);
+ }
+};
+
+template <class Request>
+class ClientCallbackWriter {
+ public:
+ virtual ~ClientCallbackWriter() {}
+ virtual void StartCall() = 0;
+ void Write(const Request* req) { Write(req, WriteOptions()); }
+ virtual void Write(const Request* req, WriteOptions options) = 0;
+ void WriteLast(const Request* req, WriteOptions options) {
+ Write(req, options.set_last_message());
+ }
+ virtual void WritesDone() = 0;
+
+ protected:
+ void BindReactor(ClientWriteReactor<Request>* reactor) {
+ reactor->BindWriter(this);
+ }
+};
+
+// The user must implement this reactor interface with reactions to each event
+// type that gets called by the library. An empty reaction is provided by
+// default
+template <class Request, class Response>
+class ClientBidiReactor {
+ public:
+ virtual ~ClientBidiReactor() {}
+ virtual void OnDone(const Status& s) {}
+ virtual void OnReadInitialMetadataDone(bool ok) {}
+ virtual void OnReadDone(bool ok) {}
+ virtual void OnWriteDone(bool ok) {}
+ virtual void OnWritesDoneDone(bool ok) {}
+
+ void StartCall() { stream_->StartCall(); }
+ void StartRead(Response* resp) { stream_->Read(resp); }
+ void StartWrite(const Request* req) { StartWrite(req, WriteOptions()); }
+ void StartWrite(const Request* req, WriteOptions options) {
+ stream_->Write(req, std::move(options));
+ }
+ void StartWriteLast(const Request* req, WriteOptions options) {
+ StartWrite(req, std::move(options.set_last_message()));
+ }
+ void StartWritesDone() { stream_->WritesDone(); }
+
+ private:
+ friend class ClientCallbackReaderWriter<Request, Response>;
+ void BindStream(ClientCallbackReaderWriter<Request, Response>* stream) {
+ stream_ = stream;
+ }
+ ClientCallbackReaderWriter<Request, Response>* stream_;
+};
+
+template <class Response>
+class ClientReadReactor {
+ public:
+ virtual ~ClientReadReactor() {}
+ virtual void OnDone(const Status& s) {}
+ virtual void OnReadInitialMetadataDone(bool ok) {}
+ virtual void OnReadDone(bool ok) {}
+
+ void StartCall() { reader_->StartCall(); }
+ void StartRead(Response* resp) { reader_->Read(resp); }
+
+ private:
+ friend class ClientCallbackReader<Response>;
+ void BindReader(ClientCallbackReader<Response>* reader) { reader_ = reader; }
+ ClientCallbackReader<Response>* reader_;
+};
+
+template <class Request>
+class ClientWriteReactor {
+ public:
+ virtual ~ClientWriteReactor() {}
+ virtual void OnDone(const Status& s) {}
+ virtual void OnReadInitialMetadataDone(bool ok) {}
+ virtual void OnWriteDone(bool ok) {}
+ virtual void OnWritesDoneDone(bool ok) {}
+
+ void StartCall() { writer_->StartCall(); }
+ void StartWrite(const Request* req) { StartWrite(req, WriteOptions()); }
+ void StartWrite(const Request* req, WriteOptions options) {
+ writer_->Write(req, std::move(options));
+ }
+ void StartWriteLast(const Request* req, WriteOptions options) {
+ StartWrite(req, std::move(options.set_last_message()));
+ }
+ void StartWritesDone() { writer_->WritesDone(); }
+
+ private:
+ friend class ClientCallbackWriter<Request>;
+ void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; }
+ ClientCallbackWriter<Request>* writer_;
+};
+
+} // namespace experimental
+
+namespace internal {
+
+// Forward declare factory classes for friendship
+template <class Request, class Response>
+class ClientCallbackReaderWriterFactory;
+template <class Response>
+class ClientCallbackReaderFactory;
+template <class Request>
+class ClientCallbackWriterFactory;
+
+template <class Request, class Response>
+class ClientCallbackReaderWriterImpl
+ : public ::grpc::experimental::ClientCallbackReaderWriter<Request,
+ Response> {
+ public:
+ // always allocated against a call arena, no memory free required
+ static void operator delete(void* ptr, std::size_t size) {
+ assert(size == sizeof(ClientCallbackReaderWriterImpl));
+ }
+
+ // This operator should never be called as the memory should be freed as part
+ // of the arena destruction. It only exists to provide a matching operator
+ // delete to the operator new so that some compilers will not complain (see
+ // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
+ // there are no tests catching the compiler warning.
+ static void operator delete(void*, void*) { assert(0); }
+
+ void MaybeFinish() {
+ if (--callbacks_outstanding_ == 0) {
+ Status s = std::move(finish_status_);
+ auto* reactor = reactor_;
+ auto* call = call_.call();
+ this->~ClientCallbackReaderWriterImpl();
+ g_core_codegen_interface->grpc_call_unref(call);
+ reactor->OnDone(s);
+ }
+ }
+
+ void StartCall() override {
+ // This call initiates two batches, plus any backlog, each with a callback
+ // 1. Send initial metadata (unless corked) + recv initial metadata
+ // 2. Any read backlog
+ // 3. Recv trailing metadata, on_completion callback
+ // 4. Any write backlog
+ // 5. See if the call can finish (if other callbacks were triggered already)
+ started_ = true;
+
+ start_tag_.Set(call_.call(),
+ [this](bool ok) {
+ reactor_->OnReadInitialMetadataDone(ok);
+ MaybeFinish();
+ },
+ &start_ops_);
+ if (!start_corked_) {
+ start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
+ context_->initial_metadata_flags());
+ }
+ start_ops_.RecvInitialMetadata(context_);
+ start_ops_.set_core_cq_tag(&start_tag_);
+ call_.PerformOps(&start_ops_);
+
+ // Also set up the read and write tags so that they don't have to be set up
+ // each time
+ write_tag_.Set(call_.call(),
+ [this](bool ok) {
+ reactor_->OnWriteDone(ok);
+ MaybeFinish();
+ },
+ &write_ops_);
+ write_ops_.set_core_cq_tag(&write_tag_);
+
+ read_tag_.Set(call_.call(),
+ [this](bool ok) {
+ reactor_->OnReadDone(ok);
+ MaybeFinish();
+ },
+ &read_ops_);
+ read_ops_.set_core_cq_tag(&read_tag_);
+ if (read_ops_at_start_) {
+ call_.PerformOps(&read_ops_);
+ }
+
+ finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
+ &finish_ops_);
+ finish_ops_.ClientRecvStatus(context_, &finish_status_);
+ finish_ops_.set_core_cq_tag(&finish_tag_);
+ call_.PerformOps(&finish_ops_);
+
+ if (write_ops_at_start_) {
+ call_.PerformOps(&write_ops_);
+ }
+
+ if (writes_done_ops_at_start_) {
+ call_.PerformOps(&writes_done_ops_);
+ }
+ MaybeFinish();
+ }
+
+ void Read(Response* msg) override {
+ read_ops_.RecvMessage(msg);
+ callbacks_outstanding_++;
+ if (started_) {
+ call_.PerformOps(&read_ops_);
+ } else {
+ read_ops_at_start_ = true;
+ }
+ }
+
+ void Write(const Request* msg, WriteOptions options) override {
+ if (start_corked_) {
+ write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
+ context_->initial_metadata_flags());
+ start_corked_ = false;
+ }
+ // TODO(vjpai): don't assert
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(*msg).ok());
+
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ write_ops_.ClientSendClose();
+ }
+ callbacks_outstanding_++;
+ if (started_) {
+ call_.PerformOps(&write_ops_);
+ } else {
+ write_ops_at_start_ = true;
+ }
+ }
+ void WritesDone() override {
+ if (start_corked_) {
+ writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
+ context_->initial_metadata_flags());
+ start_corked_ = false;
+ }
+ writes_done_ops_.ClientSendClose();
+ writes_done_tag_.Set(call_.call(),
+ [this](bool ok) {
+ reactor_->OnWritesDoneDone(ok);
+ MaybeFinish();
+ },
+ &writes_done_ops_);
+ writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
+ callbacks_outstanding_++;
+ if (started_) {
+ call_.PerformOps(&writes_done_ops_);
+ } else {
+ writes_done_ops_at_start_ = true;
+ }
+ }
+
+ private:
+ friend class ClientCallbackReaderWriterFactory<Request, Response>;
+
+ ClientCallbackReaderWriterImpl(
+ Call call, ClientContext* context,
+ ::grpc::experimental::ClientBidiReactor<Request, Response>* reactor)
+ : context_(context),
+ call_(call),
+ reactor_(reactor),
+ start_corked_(context_->initial_metadata_corked_) {
+ this->BindReactor(reactor);
+ }
+
+ ClientContext* context_;
+ Call call_;
+ ::grpc::experimental::ClientBidiReactor<Request, Response>* reactor_;
+
+ CallOpSet<CallOpSendInitialMetadata, CallOpRecvInitialMetadata> start_ops_;
+ CallbackWithSuccessTag start_tag_;
+ bool start_corked_;
+
+ CallOpSet<CallOpClientRecvStatus> finish_ops_;
+ CallbackWithSuccessTag finish_tag_;
+ Status finish_status_;
+
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
+ write_ops_;
+ CallbackWithSuccessTag write_tag_;
+ bool write_ops_at_start_{false};
+
+ CallOpSet<CallOpSendInitialMetadata, CallOpClientSendClose> writes_done_ops_;
+ CallbackWithSuccessTag writes_done_tag_;
+ bool writes_done_ops_at_start_{false};
+
+ CallOpSet<CallOpRecvMessage<Response>> read_ops_;
+ CallbackWithSuccessTag read_tag_;
+ bool read_ops_at_start_{false};
+
+ // Minimum of 3 callbacks to pre-register for StartCall, start, and finish
+ std::atomic_int callbacks_outstanding_{3};
+ bool started_{false};
+};
+
+template <class Request, class Response>
+class ClientCallbackReaderWriterFactory {
+ public:
+ static void Create(
+ ChannelInterface* channel, const ::grpc::internal::RpcMethod& method,
+ ClientContext* context,
+ ::grpc::experimental::ClientBidiReactor<Request, Response>* reactor) {
+ Call call = channel->CreateCall(method, context, channel->CallbackCQ());
+
+ g_core_codegen_interface->grpc_call_ref(call.call());
+ new (g_core_codegen_interface->grpc_call_arena_alloc(
+ call.call(), sizeof(ClientCallbackReaderWriterImpl<Request, Response>)))
+ ClientCallbackReaderWriterImpl<Request, Response>(call, context,
+ reactor);
+ }
+};
+
+template <class Response>
+class ClientCallbackReaderImpl
+ : public ::grpc::experimental::ClientCallbackReader<Response> {
+ public:
+ // always allocated against a call arena, no memory free required
+ static void operator delete(void* ptr, std::size_t size) {
+ assert(size == sizeof(ClientCallbackReaderImpl));
+ }
+
+ // This operator should never be called as the memory should be freed as part
+ // of the arena destruction. It only exists to provide a matching operator
+ // delete to the operator new so that some compilers will not complain (see
+ // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
+ // there are no tests catching the compiler warning.
+ static void operator delete(void*, void*) { assert(0); }
+
+ void MaybeFinish() {
+ if (--callbacks_outstanding_ == 0) {
+ Status s = std::move(finish_status_);
+ auto* reactor = reactor_;
+ auto* call = call_.call();
+ this->~ClientCallbackReaderImpl();
+ g_core_codegen_interface->grpc_call_unref(call);
+ reactor->OnDone(s);
+ }
+ }
+
+ void StartCall() override {
+ // This call initiates two batches, plus any backlog, each with a callback
+ // 1. Send initial metadata (unless corked) + recv initial metadata
+ // 2. Any backlog
+ // 3. Recv trailing metadata, on_completion callback
+ // 4. See if the call can finish (if other callbacks were triggered already)
+ started_ = true;
+
+ start_tag_.Set(call_.call(),
+ [this](bool ok) {
+ reactor_->OnReadInitialMetadataDone(ok);
+ MaybeFinish();
+ },
+ &start_ops_);
+ start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
+ context_->initial_metadata_flags());
+ start_ops_.RecvInitialMetadata(context_);
+ start_ops_.set_core_cq_tag(&start_tag_);
+ call_.PerformOps(&start_ops_);
+
+ // Also set up the read tag so it doesn't have to be set up each time
+ read_tag_.Set(call_.call(),
+ [this](bool ok) {
+ reactor_->OnReadDone(ok);
+ MaybeFinish();
+ },
+ &read_ops_);
+ read_ops_.set_core_cq_tag(&read_tag_);
+ if (read_ops_at_start_) {
+ call_.PerformOps(&read_ops_);
+ }
+
+ finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
+ &finish_ops_);
+ finish_ops_.ClientRecvStatus(context_, &finish_status_);
+ finish_ops_.set_core_cq_tag(&finish_tag_);
+ call_.PerformOps(&finish_ops_);
+
+ MaybeFinish();
+ }
+
+ void Read(Response* msg) override {
+ read_ops_.RecvMessage(msg);
+ callbacks_outstanding_++;
+ if (started_) {
+ call_.PerformOps(&read_ops_);
+ } else {
+ read_ops_at_start_ = true;
+ }
+ }
+
+ private:
+ friend class ClientCallbackReaderFactory<Response>;
+
+ template <class Request>
+ ClientCallbackReaderImpl(
+ Call call, ClientContext* context, Request* request,
+ ::grpc::experimental::ClientReadReactor<Response>* reactor)
+ : context_(context), call_(call), reactor_(reactor) {
+ this->BindReactor(reactor);
+ // TODO(vjpai): don't assert
+ GPR_CODEGEN_ASSERT(start_ops_.SendMessage(*request).ok());
+ start_ops_.ClientSendClose();
+ }
+
+ ClientContext* context_;
+ Call call_;
+ ::grpc::experimental::ClientReadReactor<Response>* reactor_;
+
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose,
+ CallOpRecvInitialMetadata>
+ start_ops_;
+ CallbackWithSuccessTag start_tag_;
+
+ CallOpSet<CallOpClientRecvStatus> finish_ops_;
+ CallbackWithSuccessTag finish_tag_;
+ Status finish_status_;
+
+ CallOpSet<CallOpRecvMessage<Response>> read_ops_;
+ CallbackWithSuccessTag read_tag_;
+ bool read_ops_at_start_{false};
+
+ // Minimum of 3 callbacks to pre-register for StartCall, start, and finish
+ std::atomic_int callbacks_outstanding_{3};
+ bool started_{false};
+};
+
+template <class Response>
+class ClientCallbackReaderFactory {
+ public:
+ template <class Request>
+ static void Create(
+ ChannelInterface* channel, const ::grpc::internal::RpcMethod& method,
+ ClientContext* context, const Request* request,
+ ::grpc::experimental::ClientReadReactor<Response>* reactor) {
+ Call call = channel->CreateCall(method, context, channel->CallbackCQ());
+
+ g_core_codegen_interface->grpc_call_ref(call.call());
+ new (g_core_codegen_interface->grpc_call_arena_alloc(
+ call.call(), sizeof(ClientCallbackReaderImpl<Response>)))
+ ClientCallbackReaderImpl<Response>(call, context, request, reactor);
+ }
+};
+
+template <class Request>
+class ClientCallbackWriterImpl
+ : public ::grpc::experimental::ClientCallbackWriter<Request> {
+ public:
+ // always allocated against a call arena, no memory free required
+ static void operator delete(void* ptr, std::size_t size) {
+ assert(size == sizeof(ClientCallbackWriterImpl));
+ }
+
+ // This operator should never be called as the memory should be freed as part
+ // of the arena destruction. It only exists to provide a matching operator
+ // delete to the operator new so that some compilers will not complain (see
+ // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
+ // there are no tests catching the compiler warning.
+ static void operator delete(void*, void*) { assert(0); }
+
+ void MaybeFinish() {
+ if (--callbacks_outstanding_ == 0) {
+ Status s = std::move(finish_status_);
+ auto* reactor = reactor_;
+ auto* call = call_.call();
+ this->~ClientCallbackWriterImpl();
+ g_core_codegen_interface->grpc_call_unref(call);
+ reactor->OnDone(s);
+ }
+ }
+
+ void StartCall() override {
+ // This call initiates two batches, plus any backlog, each with a callback
+ // 1. Send initial metadata (unless corked) + recv initial metadata
+ // 2. Recv trailing metadata, on_completion callback
+ // 3. Any backlog
+ // 4. See if the call can finish (if other callbacks were triggered already)
+ started_ = true;
+
+ start_tag_.Set(call_.call(),
+ [this](bool ok) {
+ reactor_->OnReadInitialMetadataDone(ok);
+ MaybeFinish();
+ },
+ &start_ops_);
+ if (!start_corked_) {
+ start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
+ context_->initial_metadata_flags());
+ }
+ start_ops_.RecvInitialMetadata(context_);
+ start_ops_.set_core_cq_tag(&start_tag_);
+ call_.PerformOps(&start_ops_);
+
+ // Also set up the read and write tags so that they don't have to be set up
+ // each time
+ write_tag_.Set(call_.call(),
+ [this](bool ok) {
+ reactor_->OnWriteDone(ok);
+ MaybeFinish();
+ },
+ &write_ops_);
+ write_ops_.set_core_cq_tag(&write_tag_);
+
+ finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
+ &finish_ops_);
+ finish_ops_.ClientRecvStatus(context_, &finish_status_);
+ finish_ops_.set_core_cq_tag(&finish_tag_);
+ call_.PerformOps(&finish_ops_);
+
+ if (write_ops_at_start_) {
+ call_.PerformOps(&write_ops_);
+ }
+
+ if (writes_done_ops_at_start_) {
+ call_.PerformOps(&writes_done_ops_);
+ }
+
+ MaybeFinish();
+ }
+
+ void Write(const Request* msg, WriteOptions options) override {
+ if (start_corked_) {
+ write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
+ context_->initial_metadata_flags());
+ start_corked_ = false;
+ }
+ // TODO(vjpai): don't assert
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(*msg).ok());
+
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ write_ops_.ClientSendClose();
+ }
+ callbacks_outstanding_++;
+ if (started_) {
+ call_.PerformOps(&write_ops_);
+ } else {
+ write_ops_at_start_ = true;
+ }
+ }
+ void WritesDone() override {
+ if (start_corked_) {
+ writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
+ context_->initial_metadata_flags());
+ start_corked_ = false;
+ }
+ writes_done_ops_.ClientSendClose();
+ writes_done_tag_.Set(call_.call(),
+ [this](bool ok) {
+ reactor_->OnWritesDoneDone(ok);
+ MaybeFinish();
+ },
+ &writes_done_ops_);
+ writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
+ callbacks_outstanding_++;
+ if (started_) {
+ call_.PerformOps(&writes_done_ops_);
+ } else {
+ writes_done_ops_at_start_ = true;
+ }
+ }
+
+ private:
+ friend class ClientCallbackWriterFactory<Request>;
+
+ template <class Response>
+ ClientCallbackWriterImpl(
+ Call call, ClientContext* context, Response* response,
+ ::grpc::experimental::ClientWriteReactor<Request>* reactor)
+ : context_(context),
+ call_(call),
+ reactor_(reactor),
+ start_corked_(context_->initial_metadata_corked_) {
+ this->BindReactor(reactor);
+ finish_ops_.RecvMessage(response);
+ finish_ops_.AllowNoMessage();
+ }
+
+ ClientContext* context_;
+ Call call_;
+ ::grpc::experimental::ClientWriteReactor<Request>* reactor_;
+
+ CallOpSet<CallOpSendInitialMetadata, CallOpRecvInitialMetadata> start_ops_;
+ CallbackWithSuccessTag start_tag_;
+ bool start_corked_;
+
+ CallOpSet<CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_;
+ CallbackWithSuccessTag finish_tag_;
+ Status finish_status_;
+
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
+ write_ops_;
+ CallbackWithSuccessTag write_tag_;
+ bool write_ops_at_start_{false};
+
+ CallOpSet<CallOpSendInitialMetadata, CallOpClientSendClose> writes_done_ops_;
+ CallbackWithSuccessTag writes_done_tag_;
+ bool writes_done_ops_at_start_{false};
+
+ // Minimum of 3 callbacks to pre-register for StartCall, start, and finish
+ std::atomic_int callbacks_outstanding_{3};
+ bool started_{false};
+};
+
+template <class Request>
+class ClientCallbackWriterFactory {
+ public:
+ template <class Response>
+ static void Create(
+ ChannelInterface* channel, const ::grpc::internal::RpcMethod& method,
+ ClientContext* context, Response* response,
+ ::grpc::experimental::ClientWriteReactor<Request>* reactor) {
+ Call call = channel->CreateCall(method, context, channel->CallbackCQ());
+
+ g_core_codegen_interface->grpc_call_ref(call.call());
+ new (g_core_codegen_interface->grpc_call_arena_alloc(
+ call.call(), sizeof(ClientCallbackWriterImpl<Request>)))
+ ClientCallbackWriterImpl<Request>(call, context, response, reactor);
+ }
+};
} // namespace internal
} // namespace grpc
diff --git a/include/grpcpp/impl/codegen/client_context.h b/include/grpcpp/impl/codegen/client_context.h
index 75b955e760..5946488566 100644
--- a/include/grpcpp/impl/codegen/client_context.h
+++ b/include/grpcpp/impl/codegen/client_context.h
@@ -46,6 +46,7 @@
#include <grpcpp/impl/codegen/core_codegen_interface.h>
#include <grpcpp/impl/codegen/create_auth_context.h>
#include <grpcpp/impl/codegen/metadata_map.h>
+#include <grpcpp/impl/codegen/rpc_method.h>
#include <grpcpp/impl/codegen/security/auth_context.h>
#include <grpcpp/impl/codegen/slice.h>
#include <grpcpp/impl/codegen/status.h>
@@ -71,6 +72,12 @@ template <class InputMessage, class OutputMessage>
class BlockingUnaryCallImpl;
template <class InputMessage, class OutputMessage>
class CallbackUnaryCallImpl;
+template <class Request, class Response>
+class ClientCallbackReaderWriterImpl;
+template <class Response>
+class ClientCallbackReaderImpl;
+template <class Request>
+class ClientCallbackWriterImpl;
} // namespace internal
template <class R>
@@ -162,6 +169,8 @@ class InteropClientContextInspector;
/// (see \a grpc::CreateCustomChannel).
///
/// \warning ClientContext instances should \em not be reused across rpcs.
+/// \warning The ClientContext instance used for creating an rpc must remain
+/// alive and valid for the lifetime of the rpc.
class ClientContext {
public:
ClientContext();
@@ -191,6 +200,13 @@ class ClientContext {
/// end in "-bin".
/// \param meta_value The metadata value. If its value is binary, the key name
/// must end in "-bin".
+ ///
+ /// Metadata must conform to the following format:
+ /// Custom-Metadata -> Binary-Header / ASCII-Header
+ /// Binary-Header -> {Header-Name "-bin" } {binary value}
+ /// ASCII-Header -> Header-Name ASCII-Value
+ /// Header-Name -> 1*( %x30-39 / %x61-7A / "_" / "-" / ".") ; 0-9 a-z _ - .
+ /// ASCII-Value -> 1*( %x20-%x7E ) ; space and printable ASCII
void AddMetadata(const grpc::string& meta_key,
const grpc::string& meta_value);
@@ -394,6 +410,12 @@ class ClientContext {
friend class ::grpc::internal::BlockingUnaryCallImpl;
template <class InputMessage, class OutputMessage>
friend class ::grpc::internal::CallbackUnaryCallImpl;
+ template <class Request, class Response>
+ friend class ::grpc::internal::ClientCallbackReaderWriterImpl;
+ template <class Response>
+ friend class ::grpc::internal::ClientCallbackReaderImpl;
+ template <class Request>
+ friend class ::grpc::internal::ClientCallbackWriterImpl;
// Used by friend class CallOpClientRecvStatus
void set_debug_error_string(const grpc::string& debug_error_string) {
@@ -404,12 +426,13 @@ class ClientContext {
void set_call(grpc_call* call, const std::shared_ptr<Channel>& channel);
experimental::ClientRpcInfo* set_client_rpc_info(
- const char* method, grpc::ChannelInterface* channel,
+ const char* method, internal::RpcMethod::RpcType type,
+ grpc::ChannelInterface* channel,
const std::vector<
std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>&
creators,
size_t interceptor_pos) {
- rpc_info_ = experimental::ClientRpcInfo(this, method, channel);
+ rpc_info_ = experimental::ClientRpcInfo(this, type, method, channel);
rpc_info_.RegisterInterceptors(creators, interceptor_pos);
return &rpc_info_;
}
diff --git a/include/grpcpp/impl/codegen/client_interceptor.h b/include/grpcpp/impl/codegen/client_interceptor.h
index f69c99ab22..7dfe2290a3 100644
--- a/include/grpcpp/impl/codegen/client_interceptor.h
+++ b/include/grpcpp/impl/codegen/client_interceptor.h
@@ -23,6 +23,7 @@
#include <vector>
#include <grpcpp/impl/codegen/interceptor.h>
+#include <grpcpp/impl/codegen/rpc_method.h>
#include <grpcpp/impl/codegen/string_ref.h>
namespace grpc {
@@ -37,9 +38,17 @@ class InterceptorBatchMethodsImpl;
namespace experimental {
class ClientRpcInfo;
+// A factory interface for creation of client interceptors. A vector of
+// factories can be provided at channel creation which will be used to create a
+// new vector of client interceptors per RPC. Client interceptor authors should
+// create a subclass of ClientInterceptorFactorInterface which creates objects
+// of their interceptors.
class ClientInterceptorFactoryInterface {
public:
virtual ~ClientInterceptorFactoryInterface() {}
+ // Returns a pointer to an Interceptor object on successful creation, nullptr
+ // otherwise. If nullptr is returned, this server interceptor factory is
+ // ignored for the purposes of that RPC.
virtual Interceptor* CreateClientInterceptor(ClientRpcInfo* info) = 0;
};
} // namespace experimental
@@ -49,26 +58,74 @@ extern experimental::ClientInterceptorFactoryInterface*
g_global_client_interceptor_factory;
}
+/// ClientRpcInfo represents the state of a particular RPC as it
+/// appears to an interceptor. It is created and owned by the library and
+/// passed to the CreateClientInterceptor method of the application's
+/// ClientInterceptorFactoryInterface implementation
namespace experimental {
class ClientRpcInfo {
public:
- ClientRpcInfo() {}
+ // TODO(yashykt): Stop default-constructing ClientRpcInfo and remove UNKNOWN
+ // from the list of possible Types.
+ /// Type categorizes RPCs by unary or streaming type
+ enum class Type {
+ UNARY,
+ CLIENT_STREAMING,
+ SERVER_STREAMING,
+ BIDI_STREAMING,
+ UNKNOWN // UNKNOWN is not API and will be removed later
+ };
~ClientRpcInfo(){};
+ // Delete copy constructor but allow default move constructor
ClientRpcInfo(const ClientRpcInfo&) = delete;
ClientRpcInfo(ClientRpcInfo&&) = default;
- ClientRpcInfo& operator=(ClientRpcInfo&&) = default;
// Getter methods
- const char* method() { return method_; }
+
+ /// Return the fully-specified method name
+ const char* method() const { return method_; }
+
+ /// Return a pointer to the channel on which the RPC is being sent
ChannelInterface* channel() { return channel_; }
+
+ /// Return a pointer to the underlying ClientContext structure associated
+ /// with the RPC to support features that apply to it
grpc::ClientContext* client_context() { return ctx_; }
+ /// Return the type of the RPC (unary or a streaming flavor)
+ Type type() const { return type_; }
+
private:
- ClientRpcInfo(grpc::ClientContext* ctx, const char* method,
- grpc::ChannelInterface* channel)
- : ctx_(ctx), method_(method), channel_(channel) {}
+ static_assert(Type::UNARY ==
+ static_cast<Type>(internal::RpcMethod::NORMAL_RPC),
+ "violated expectation about Type enum");
+ static_assert(Type::CLIENT_STREAMING ==
+ static_cast<Type>(internal::RpcMethod::CLIENT_STREAMING),
+ "violated expectation about Type enum");
+ static_assert(Type::SERVER_STREAMING ==
+ static_cast<Type>(internal::RpcMethod::SERVER_STREAMING),
+ "violated expectation about Type enum");
+ static_assert(Type::BIDI_STREAMING ==
+ static_cast<Type>(internal::RpcMethod::BIDI_STREAMING),
+ "violated expectation about Type enum");
+
+ // Default constructor should only be used by ClientContext
+ ClientRpcInfo() = default;
+
+ // Constructor will only be called from ClientContext
+ ClientRpcInfo(grpc::ClientContext* ctx, internal::RpcMethod::RpcType type,
+ const char* method, grpc::ChannelInterface* channel)
+ : ctx_(ctx),
+ type_(static_cast<Type>(type)),
+ method_(method),
+ channel_(channel) {}
+
+ // Move assignment should only be used by ClientContext
+ // TODO(yashykt): Delete move assignment
+ ClientRpcInfo& operator=(ClientRpcInfo&&) = default;
+
// Runs interceptor at pos \a pos.
void RunInterceptor(
experimental::InterceptorBatchMethods* interceptor_methods, size_t pos) {
@@ -86,8 +143,11 @@ class ClientRpcInfo {
}
for (auto it = creators.begin() + interceptor_pos; it != creators.end();
++it) {
- interceptors_.push_back(std::unique_ptr<experimental::Interceptor>(
- (*it)->CreateClientInterceptor(this)));
+ auto* interceptor = (*it)->CreateClientInterceptor(this);
+ if (interceptor != nullptr) {
+ interceptors_.push_back(
+ std::unique_ptr<experimental::Interceptor>(interceptor));
+ }
}
if (internal::g_global_client_interceptor_factory != nullptr) {
interceptors_.push_back(std::unique_ptr<experimental::Interceptor>(
@@ -97,6 +157,8 @@ class ClientRpcInfo {
}
grpc::ClientContext* ctx_ = nullptr;
+ // TODO(yashykt): make type_ const once move-assignment is deleted
+ Type type_{Type::UNKNOWN};
const char* method_ = nullptr;
grpc::ChannelInterface* channel_ = nullptr;
std::vector<std::unique_ptr<experimental::Interceptor>> interceptors_;
diff --git a/include/grpcpp/impl/codegen/client_unary_call.h b/include/grpcpp/impl/codegen/client_unary_call.h
index b1c80764f2..5151839412 100644
--- a/include/grpcpp/impl/codegen/client_unary_call.h
+++ b/include/grpcpp/impl/codegen/client_unary_call.h
@@ -69,13 +69,17 @@ class BlockingUnaryCallImpl {
ops.ClientSendClose();
ops.ClientRecvStatus(context, &status_);
call.PerformOps(&ops);
- if (cq.Pluck(&ops)) {
- if (!ops.got_message && status_.ok()) {
- status_ = Status(StatusCode::UNIMPLEMENTED,
- "No message returned for unary request");
- }
- } else {
- GPR_CODEGEN_ASSERT(!status_.ok());
+ cq.Pluck(&ops);
+ // Some of the ops might fail. If the ops fail in the core layer, status
+ // would reflect the error. But, if the ops fail in the C++ layer, the
+ // status would still be the same as the one returned by gRPC Core. This can
+ // happen if deserialization of the message fails.
+ // TODO(yashykt): If deserialization fails, but the status received is OK,
+ // then it might be a good idea to change the status to something better
+ // than StatusCode::UNIMPLEMENTED to reflect this.
+ if (!ops.got_message && status_.ok()) {
+ status_ = Status(StatusCode::UNIMPLEMENTED,
+ "No message returned for unary request");
}
}
Status status() { return status_; }
diff --git a/include/grpcpp/impl/codegen/completion_queue.h b/include/grpcpp/impl/codegen/completion_queue.h
index d603c7c700..fb38788f7d 100644
--- a/include/grpcpp/impl/codegen/completion_queue.h
+++ b/include/grpcpp/impl/codegen/completion_queue.h
@@ -307,8 +307,7 @@ class CompletionQueue : private GrpcLibraryCodegen {
void* ignored = tag;
if (tag->FinalizeResult(&ignored, &ok)) {
GPR_CODEGEN_ASSERT(ignored == tag);
- // Ignore mutations by FinalizeResult: Pluck returns the C API status
- return ev.success != 0;
+ return ok;
}
}
}
diff --git a/include/grpcpp/impl/codegen/interceptor.h b/include/grpcpp/impl/codegen/interceptor.h
index b977c35016..a1cb80013d 100644
--- a/include/grpcpp/impl/codegen/interceptor.h
+++ b/include/grpcpp/impl/codegen/interceptor.h
@@ -31,105 +31,144 @@ class ChannelInterface;
class Status;
namespace experimental {
-class InterceptedMessage {
- public:
- template <class M>
- bool Extract(M* msg); // returns false if definitely invalid extraction
- template <class M>
- M* MutableExtract();
- uint64_t length(); // length on wire
-};
+/// An enumeration of different possible points at which the \a Intercept
+/// method of the \a Interceptor interface may be called. Any given call
+/// to \a Intercept will include one or more of these hook points, and
+/// each hook point makes certain types of information available to the
+/// interceptor.
+/// In these enumeration names, PRE_SEND means that an interception has taken
+/// place between the time the application provided a certain type of data
+/// (e.g., initial metadata, status) and the time that that data goes to the
+/// other side. POST_SEND means that the data has been committed for going to
+/// the other side (even if it has not yet been received at the other side).
+/// PRE_RECV means an interception between the time that a certain
+/// operation has been requested and it is available. POST_RECV means that a
+/// result is available but has not yet been passed back to the application.
enum class InterceptionHookPoints {
- /* The first two in this list are for clients and servers */
+ /// The first two in this list are for clients and servers
PRE_SEND_INITIAL_METADATA,
PRE_SEND_MESSAGE,
- PRE_SEND_STATUS /* server only */,
- PRE_SEND_CLOSE /* client only */,
- /* The following three are for hijacked clients only and can only be
- registered by the global interceptor */
+ PRE_SEND_STATUS, // server only
+ PRE_SEND_CLOSE, // client only: WritesDone for stream; after write in unary
+ /// The following three are for hijacked clients only and can only be
+ /// registered by the global interceptor
PRE_RECV_INITIAL_METADATA,
PRE_RECV_MESSAGE,
PRE_RECV_STATUS,
- /* The following two are for all clients and servers */
+ /// The following two are for all clients and servers
POST_RECV_INITIAL_METADATA,
POST_RECV_MESSAGE,
- POST_RECV_STATUS /* client only */,
- POST_RECV_CLOSE /* server only */,
- /* This is a special hook point available to both clients and servers when
- TryCancel() is performed.
- - No other hook points will be present along with this.
- - It is illegal for an interceptor to block/delay this operation.
- - ALL interceptors see this hook point irrespective of whether the RPC was
- hijacked or not. */
+ POST_RECV_STATUS, // client only
+ POST_RECV_CLOSE, // server only
+ /// This is a special hook point available to both clients and servers when
+ /// TryCancel() is performed.
+ /// - No other hook points will be present along with this.
+ /// - It is illegal for an interceptor to block/delay this operation.
+ /// - ALL interceptors see this hook point irrespective of whether the
+ /// RPC was hijacked or not.
PRE_SEND_CANCEL,
NUM_INTERCEPTION_HOOKS
};
+/// Class that is passed as an argument to the \a Intercept method
+/// of the application's \a Interceptor interface implementation. It has five
+/// purposes:
+/// 1. Indicate which hook points are present at a specific interception
+/// 2. Allow an interceptor to inform the library that an RPC should
+/// continue to the next stage of its processing (which may be another
+/// interceptor or the main path of the library)
+/// 3. Allow an interceptor to hijack the processing of the RPC (only for
+/// client-side RPCs with PRE_SEND_INITIAL_METADATA) so that it does not
+/// proceed with normal processing beyond that stage
+/// 4. Access the relevant fields of an RPC at each interception point
+/// 5. Set some fields of an RPC at each interception point, when possible
class InterceptorBatchMethods {
public:
virtual ~InterceptorBatchMethods(){};
- // Queries to check whether the current batch has an interception hook point
- // of type \a type
+ /// Determine whether the current batch has an interception hook point
+ /// of type \a type
virtual bool QueryInterceptionHookPoint(InterceptionHookPoints type) = 0;
- // Calling this will signal that the interceptor is done intercepting the
- // current batch of the RPC.
- // Proceed is a no-op if the batch contains PRE_SEND_CANCEL. Simply returning
- // from the Intercept method does the job of continuing the RPC in this case.
+ /// Signal that the interceptor is done intercepting the current batch of the
+ /// RPC. Every interceptor must either call Proceed or Hijack on each
+ /// interception. In most cases, only Proceed will be used. Explicit use of
+ /// Proceed is what enables interceptors to delay the processing of RPCs
+ /// while they perform other work.
+ /// Proceed is a no-op if the batch contains PRE_SEND_CANCEL. Simply returning
+ /// from the Intercept method does the job of continuing the RPC in this case.
+ /// This is because PRE_SEND_CANCEL is always in a separate batch and is not
+ /// allowed to be delayed.
virtual void Proceed() = 0;
- // Calling this indicates that the interceptor has hijacked the RPC (only
- // valid if the batch contains send_initial_metadata on the client side)
+ /// Indicate that the interceptor has hijacked the RPC (only valid if the
+ /// batch contains send_initial_metadata on the client side). Later
+ /// interceptors in the interceptor list will not be called. Later batches
+ /// on the same RPC will go through interception, but only up to the point
+ /// of the hijacking interceptor.
virtual void Hijack() = 0;
- // Returns a modifable ByteBuffer holding serialized form of the message to be
- // sent
+ /// Returns a modifable ByteBuffer holding the serialized form of the message
+ /// that is going to be sent. Valid for PRE_SEND_MESSAGE interceptions.
+ /// A return value of nullptr indicates that this ByteBuffer is not valid.
virtual ByteBuffer* GetSendMessage() = 0;
- // Returns a modifiable multimap of the initial metadata to be sent
+ /// Returns a modifiable multimap of the initial metadata to be sent. Valid
+ /// for PRE_SEND_INITIAL_METADATA interceptions. A value of nullptr indicates
+ /// that this field is not valid.
virtual std::multimap<grpc::string, grpc::string>*
GetSendInitialMetadata() = 0;
- // Returns the status to be sent
+ /// Returns the status to be sent. Valid for PRE_SEND_STATUS interceptions.
virtual Status GetSendStatus() = 0;
- // Modifies the status with \a status
+ /// Overwrites the status with \a status. Valid for PRE_SEND_STATUS
+ /// interceptions.
virtual void ModifySendStatus(const Status& status) = 0;
- // Returns a modifiable multimap of the trailing metadata to be sent
+ /// Returns a modifiable multimap of the trailing metadata to be sent. Valid
+ /// for PRE_SEND_STATUS interceptions. A value of nullptr indicates
+ /// that this field is not valid.
virtual std::multimap<grpc::string, grpc::string>*
GetSendTrailingMetadata() = 0;
- // Returns a pointer to the modifiable received message. Note that the message
- // is already deserialized
+ /// Returns a pointer to the modifiable received message. Note that the
+ /// message is already deserialized but the type is not set; the interceptor
+ /// should static_cast to the appropriate type before using it. This is valid
+ /// for POST_RECV_MESSAGE interceptions; nullptr for not valid
virtual void* GetRecvMessage() = 0;
- // Checks whether the RECV MESSAGE op completed successfully
- virtual bool GetRecvMessageStatus() = 0;
-
- // Returns a modifiable multimap of the received initial metadata
+ /// Returns a modifiable multimap of the received initial metadata.
+ /// Valid for POST_RECV_INITIAL_METADATA interceptions; nullptr if not valid
virtual std::multimap<grpc::string_ref, grpc::string_ref>*
GetRecvInitialMetadata() = 0;
- // Returns a modifiable view of the received status
+ /// Returns a modifiable view of the received status on POST_RECV_STATUS
+ /// interceptions; nullptr if not valid.
virtual Status* GetRecvStatus() = 0;
- // Returns a modifiable multimap of the received trailing metadata
+ /// Returns a modifiable multimap of the received trailing metadata on
+ /// POST_RECV_STATUS interceptions; nullptr if not valid
virtual std::multimap<grpc::string_ref, grpc::string_ref>*
GetRecvTrailingMetadata() = 0;
- // Gets an intercepted channel. When a call is started on this interceptor,
- // only interceptors after the current interceptor are created from the
- // factory objects registered with the channel.
+ /// Gets an intercepted channel. When a call is started on this interceptor,
+ /// only interceptors after the current interceptor are created from the
+ /// factory objects registered with the channel. This allows calls to be
+ /// started from interceptors without infinite regress through the interceptor
+ /// list.
virtual std::unique_ptr<ChannelInterface> GetInterceptedChannel() = 0;
// On a hijacked RPC, an interceptor can decide to fail a RECV MESSAGE op.
virtual void FailHijackedRecvMessage() = 0;
};
+/// Interface for an interceptor. Interceptor authors must create a class
+/// that derives from this parent class.
class Interceptor {
public:
virtual ~Interceptor() {}
+ /// The one public method of an Interceptor interface. Override this to
+ /// trigger the desired actions at the hook points described above.
virtual void Intercept(InterceptorBatchMethods* methods) = 0;
};
diff --git a/include/grpcpp/impl/codegen/interceptor_common.h b/include/grpcpp/impl/codegen/interceptor_common.h
index b2e92dd6f3..d23b71f8a7 100644
--- a/include/grpcpp/impl/codegen/interceptor_common.h
+++ b/include/grpcpp/impl/codegen/interceptor_common.h
@@ -103,8 +103,6 @@ class InterceptorBatchMethodsImpl
void* GetRecvMessage() override { return recv_message_; }
- bool GetRecvMessageStatus() override { return *got_message_; }
-
std::multimap<grpc::string_ref, grpc::string_ref>* GetRecvInitialMetadata()
override {
return recv_initial_metadata_->map();
@@ -434,14 +432,6 @@ class CancelInterceptorBatchMethods
return nullptr;
}
- bool GetRecvMessageStatus() override {
- GPR_CODEGEN_ASSERT(
- false &&
- "It is illegal to call GetRecvMessageStatus on a method which "
- "has a Cancel notification");
- return false;
- }
-
std::multimap<grpc::string_ref, grpc::string_ref>* GetRecvInitialMetadata()
override {
GPR_CODEGEN_ASSERT(false &&
diff --git a/include/grpcpp/impl/codegen/metadata_map.h b/include/grpcpp/impl/codegen/metadata_map.h
index 0bba3ed4e3..9cec54d9f0 100644
--- a/include/grpcpp/impl/codegen/metadata_map.h
+++ b/include/grpcpp/impl/codegen/metadata_map.h
@@ -32,11 +32,9 @@ const char kBinaryErrorDetailsKey[] = "grpc-status-details-bin";
class MetadataMap {
public:
- MetadataMap() { memset(&arr_, 0, sizeof(arr_)); }
+ MetadataMap() { Setup(); }
- ~MetadataMap() {
- g_core_codegen_interface->grpc_metadata_array_destroy(&arr_);
- }
+ ~MetadataMap() { Destroy(); }
grpc::string GetBinaryErrorDetails() {
// if filled_, extract from the multimap for O(log(n))
@@ -71,11 +69,24 @@ class MetadataMap {
}
grpc_metadata_array* arr() { return &arr_; }
+ void Reset() {
+ filled_ = false;
+ map_.clear();
+ Destroy();
+ Setup();
+ }
+
private:
bool filled_ = false;
grpc_metadata_array arr_;
std::multimap<grpc::string_ref, grpc::string_ref> map_;
+ void Destroy() {
+ g_core_codegen_interface->grpc_metadata_array_destroy(&arr_);
+ }
+
+ void Setup() { memset(&arr_, 0, sizeof(arr_)); }
+
void FillMap() {
if (filled_) return;
filled_ = true;
diff --git a/include/grpcpp/impl/codegen/server_callback.h b/include/grpcpp/impl/codegen/server_callback.h
index b866fc16dc..1854f6ef2f 100644
--- a/include/grpcpp/impl/codegen/server_callback.h
+++ b/include/grpcpp/impl/codegen/server_callback.h
@@ -19,7 +19,9 @@
#ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
#define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
+#include <atomic>
#include <functional>
+#include <type_traits>
#include <grpcpp/impl/codegen/call.h>
#include <grpcpp/impl/codegen/call_op_set.h>
@@ -32,19 +34,33 @@
namespace grpc {
-// forward declarations
+// Declare base class of all reactors as internal
namespace internal {
-template <class ServiceType, class RequestType, class ResponseType>
-class CallbackUnaryHandler;
+
+class ServerReactor {
+ public:
+ virtual ~ServerReactor() = default;
+ virtual void OnDone() {}
+ virtual void OnCancel() {}
+};
+
} // namespace internal
namespace experimental {
+// Forward declarations
+template <class Request, class Response>
+class ServerReadReactor;
+template <class Request, class Response>
+class ServerWriteReactor;
+template <class Request, class Response>
+class ServerBidiReactor;
+
// For unary RPCs, the exposed controller class is only an interface
// and the actual implementation is an internal class.
class ServerCallbackRpcController {
public:
- virtual ~ServerCallbackRpcController() {}
+ virtual ~ServerCallbackRpcController() = default;
// The method handler must call this function when it is done so that
// the library knows to free its resources
@@ -55,18 +71,193 @@ class ServerCallbackRpcController {
virtual void SendInitialMetadata(std::function<void(bool)>) = 0;
};
+// NOTE: The actual streaming object classes are provided
+// as API only to support mocking. There are no implementations of
+// these class interfaces in the API.
+template <class Request>
+class ServerCallbackReader {
+ public:
+ virtual ~ServerCallbackReader() {}
+ virtual void Finish(Status s) = 0;
+ virtual void SendInitialMetadata() = 0;
+ virtual void Read(Request* msg) = 0;
+
+ protected:
+ template <class Response>
+ void BindReactor(ServerReadReactor<Request, Response>* reactor) {
+ reactor->BindReader(this);
+ }
+};
+
+template <class Response>
+class ServerCallbackWriter {
+ public:
+ virtual ~ServerCallbackWriter() {}
+
+ virtual void Finish(Status s) = 0;
+ virtual void SendInitialMetadata() = 0;
+ virtual void Write(const Response* msg, WriteOptions options) = 0;
+ virtual void WriteAndFinish(const Response* msg, WriteOptions options,
+ Status s) {
+ // Default implementation that can/should be overridden
+ Write(msg, std::move(options));
+ Finish(std::move(s));
+ };
+
+ protected:
+ template <class Request>
+ void BindReactor(ServerWriteReactor<Request, Response>* reactor) {
+ reactor->BindWriter(this);
+ }
+};
+
+template <class Request, class Response>
+class ServerCallbackReaderWriter {
+ public:
+ virtual ~ServerCallbackReaderWriter() {}
+
+ virtual void Finish(Status s) = 0;
+ virtual void SendInitialMetadata() = 0;
+ virtual void Read(Request* msg) = 0;
+ virtual void Write(const Response* msg, WriteOptions options) = 0;
+ virtual void WriteAndFinish(const Response* msg, WriteOptions options,
+ Status s) {
+ // Default implementation that can/should be overridden
+ Write(msg, std::move(options));
+ Finish(std::move(s));
+ };
+
+ protected:
+ void BindReactor(ServerBidiReactor<Request, Response>* reactor) {
+ reactor->BindStream(this);
+ }
+};
+
+// The following classes are reactors that are to be implemented
+// by the user, returned as the result of the method handler for
+// a callback method, and activated by the call to OnStarted
+template <class Request, class Response>
+class ServerBidiReactor : public internal::ServerReactor {
+ public:
+ ~ServerBidiReactor() = default;
+ virtual void OnStarted(ServerContext*) {}
+ virtual void OnSendInitialMetadataDone(bool ok) {}
+ virtual void OnReadDone(bool ok) {}
+ virtual void OnWriteDone(bool ok) {}
+
+ void StartSendInitialMetadata() { stream_->SendInitialMetadata(); }
+ void StartRead(Request* msg) { stream_->Read(msg); }
+ void StartWrite(const Response* msg) { StartWrite(msg, WriteOptions()); }
+ void StartWrite(const Response* msg, WriteOptions options) {
+ stream_->Write(msg, std::move(options));
+ }
+ void StartWriteAndFinish(const Response* msg, WriteOptions options,
+ Status s) {
+ stream_->WriteAndFinish(msg, std::move(options), std::move(s));
+ }
+ void StartWriteLast(const Response* msg, WriteOptions options) {
+ StartWrite(msg, std::move(options.set_last_message()));
+ }
+ void Finish(Status s) { stream_->Finish(std::move(s)); }
+
+ private:
+ friend class ServerCallbackReaderWriter<Request, Response>;
+ void BindStream(ServerCallbackReaderWriter<Request, Response>* stream) {
+ stream_ = stream;
+ }
+
+ ServerCallbackReaderWriter<Request, Response>* stream_;
+};
+
+template <class Request, class Response>
+class ServerReadReactor : public internal::ServerReactor {
+ public:
+ ~ServerReadReactor() = default;
+ virtual void OnStarted(ServerContext*, Response* resp) {}
+ virtual void OnSendInitialMetadataDone(bool ok) {}
+ virtual void OnReadDone(bool ok) {}
+
+ void StartSendInitialMetadata() { reader_->SendInitialMetadata(); }
+ void StartRead(Request* msg) { reader_->Read(msg); }
+ void Finish(Status s) { reader_->Finish(std::move(s)); }
+
+ private:
+ friend class ServerCallbackReader<Request>;
+ void BindReader(ServerCallbackReader<Request>* reader) { reader_ = reader; }
+
+ ServerCallbackReader<Request>* reader_;
+};
+
+template <class Request, class Response>
+class ServerWriteReactor : public internal::ServerReactor {
+ public:
+ ~ServerWriteReactor() = default;
+ virtual void OnStarted(ServerContext*, const Request* req) {}
+ virtual void OnSendInitialMetadataDone(bool ok) {}
+ virtual void OnWriteDone(bool ok) {}
+
+ void StartSendInitialMetadata() { writer_->SendInitialMetadata(); }
+ void StartWrite(const Response* msg) { StartWrite(msg, WriteOptions()); }
+ void StartWrite(const Response* msg, WriteOptions options) {
+ writer_->Write(msg, std::move(options));
+ }
+ void StartWriteAndFinish(const Response* msg, WriteOptions options,
+ Status s) {
+ writer_->WriteAndFinish(msg, std::move(options), std::move(s));
+ }
+ void StartWriteLast(const Response* msg, WriteOptions options) {
+ StartWrite(msg, std::move(options.set_last_message()));
+ }
+ void Finish(Status s) { writer_->Finish(std::move(s)); }
+
+ private:
+ friend class ServerCallbackWriter<Response>;
+ void BindWriter(ServerCallbackWriter<Response>* writer) { writer_ = writer; }
+
+ ServerCallbackWriter<Response>* writer_;
+};
+
} // namespace experimental
namespace internal {
-template <class ServiceType, class RequestType, class ResponseType>
+template <class Request, class Response>
+class UnimplementedReadReactor
+ : public experimental::ServerReadReactor<Request, Response> {
+ public:
+ void OnDone() override { delete this; }
+ void OnStarted(ServerContext*, Response*) override {
+ this->Finish(Status(StatusCode::UNIMPLEMENTED, ""));
+ }
+};
+
+template <class Request, class Response>
+class UnimplementedWriteReactor
+ : public experimental::ServerWriteReactor<Request, Response> {
+ public:
+ void OnDone() override { delete this; }
+ void OnStarted(ServerContext*, const Request*) override {
+ this->Finish(Status(StatusCode::UNIMPLEMENTED, ""));
+ }
+};
+
+template <class Request, class Response>
+class UnimplementedBidiReactor
+ : public experimental::ServerBidiReactor<Request, Response> {
+ public:
+ void OnDone() override { delete this; }
+ void OnStarted(ServerContext*) override {
+ this->Finish(Status(StatusCode::UNIMPLEMENTED, ""));
+ }
+};
+
+template <class RequestType, class ResponseType>
class CallbackUnaryHandler : public MethodHandler {
public:
CallbackUnaryHandler(
std::function<void(ServerContext*, const RequestType*, ResponseType*,
experimental::ServerCallbackRpcController*)>
- func,
- ServiceType* service)
+ func)
: func_(func) {}
void RunHandler(const HandlerParameter& param) final {
// Arena allocate a controller structure (that includes request/response)
@@ -81,9 +272,8 @@ class CallbackUnaryHandler : public MethodHandler {
if (status.ok()) {
// Call the actual function handler and expect the user to call finish
- CatchingCallback(std::move(func_), param.server_context,
- controller->request(), controller->response(),
- controller);
+ CatchingCallback(func_, param.server_context, controller->request(),
+ controller->response(), controller);
} else {
// if deserialization failed, we need to fail the call
controller->Finish(status);
@@ -117,79 +307,579 @@ class CallbackUnaryHandler : public MethodHandler {
: public experimental::ServerCallbackRpcController {
public:
void Finish(Status s) override {
- finish_tag_.Set(
- call_.call(),
- [this](bool) {
- grpc_call* call = call_.call();
- auto call_requester = std::move(call_requester_);
- this->~ServerCallbackRpcControllerImpl(); // explicitly call
- // destructor
- g_core_codegen_interface->grpc_call_unref(call);
- call_requester();
- },
- &finish_buf_);
+ finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
+ &finish_ops_);
if (!ctx_->sent_initial_metadata_) {
- finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
+ finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
- finish_buf_.set_compression_level(ctx_->compression_level());
+ finish_ops_.set_compression_level(ctx_->compression_level());
}
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
if (s.ok()) {
- finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_,
- finish_buf_.SendMessage(resp_));
+ finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
+ finish_ops_.SendMessage(resp_));
} else {
- finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, s);
+ finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
}
- finish_buf_.set_core_cq_tag(&finish_tag_);
- call_.PerformOps(&finish_buf_);
+ finish_ops_.set_core_cq_tag(&finish_tag_);
+ call_.PerformOps(&finish_ops_);
}
void SendInitialMetadata(std::function<void(bool)> f) override {
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
-
- meta_tag_.Set(call_.call(), std::move(f), &meta_buf_);
- meta_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
+ callbacks_outstanding_++;
+ // TODO(vjpai): Consider taking f as a move-capture if we adopt C++14
+ // and if performance of this operation matters
+ meta_tag_.Set(call_.call(),
+ [this, f](bool ok) {
+ f(ok);
+ MaybeDone();
+ },
+ &meta_ops_);
+ meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
- meta_buf_.set_compression_level(ctx_->compression_level());
+ meta_ops_.set_compression_level(ctx_->compression_level());
}
ctx_->sent_initial_metadata_ = true;
- meta_buf_.set_core_cq_tag(&meta_tag_);
- call_.PerformOps(&meta_buf_);
+ meta_ops_.set_core_cq_tag(&meta_tag_);
+ call_.PerformOps(&meta_ops_);
}
private:
- template <class SrvType, class ReqType, class RespType>
- friend class CallbackUnaryHandler;
+ friend class CallbackUnaryHandler<RequestType, ResponseType>;
ServerCallbackRpcControllerImpl(ServerContext* ctx, Call* call,
- RequestType* req,
+ const RequestType* req,
std::function<void()> call_requester)
: ctx_(ctx),
call_(*call),
req_(req),
- call_requester_(std::move(call_requester)) {}
+ call_requester_(std::move(call_requester)) {
+ ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, nullptr);
+ }
~ServerCallbackRpcControllerImpl() { req_->~RequestType(); }
- RequestType* request() { return req_; }
+ const RequestType* request() { return req_; }
ResponseType* response() { return &resp_; }
- CallOpSet<CallOpSendInitialMetadata> meta_buf_;
+ void MaybeDone() {
+ if (--callbacks_outstanding_ == 0) {
+ grpc_call* call = call_.call();
+ auto call_requester = std::move(call_requester_);
+ this->~ServerCallbackRpcControllerImpl(); // explicitly call destructor
+ g_core_codegen_interface->grpc_call_unref(call);
+ call_requester();
+ }
+ }
+
+ CallOpSet<CallOpSendInitialMetadata> meta_ops_;
CallbackWithSuccessTag meta_tag_;
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpServerSendStatus>
- finish_buf_;
+ finish_ops_;
CallbackWithSuccessTag finish_tag_;
ServerContext* ctx_;
Call call_;
- RequestType* req_;
+ const RequestType* req_;
ResponseType resp_;
std::function<void()> call_requester_;
+ std::atomic_int callbacks_outstanding_{
+ 2}; // reserve for Finish and CompletionOp
+ };
+};
+
+template <class RequestType, class ResponseType>
+class CallbackClientStreamingHandler : public MethodHandler {
+ public:
+ CallbackClientStreamingHandler(
+ std::function<
+ experimental::ServerReadReactor<RequestType, ResponseType>*()>
+ func)
+ : func_(std::move(func)) {}
+ void RunHandler(const HandlerParameter& param) final {
+ // Arena allocate a reader structure (that includes response)
+ g_core_codegen_interface->grpc_call_ref(param.call->call());
+
+ experimental::ServerReadReactor<RequestType, ResponseType>* reactor =
+ param.status.ok()
+ ? CatchingReactorCreator<
+ experimental::ServerReadReactor<RequestType, ResponseType>>(
+ func_)
+ : nullptr;
+
+ if (reactor == nullptr) {
+ // if deserialization or reactor creator failed, we need to fail the call
+ reactor = new UnimplementedReadReactor<RequestType, ResponseType>;
+ }
+
+ auto* reader = new (g_core_codegen_interface->grpc_call_arena_alloc(
+ param.call->call(), sizeof(ServerCallbackReaderImpl)))
+ ServerCallbackReaderImpl(param.server_context, param.call,
+ std::move(param.call_requester), reactor);
+
+ reader->BindReactor(reactor);
+ reactor->OnStarted(param.server_context, reader->response());
+ reader->MaybeDone();
+ }
+
+ private:
+ std::function<experimental::ServerReadReactor<RequestType, ResponseType>*()>
+ func_;
+
+ class ServerCallbackReaderImpl
+ : public experimental::ServerCallbackReader<RequestType> {
+ public:
+ void Finish(Status s) override {
+ finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
+ &finish_ops_);
+ if (!ctx_->sent_initial_metadata_) {
+ finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ finish_ops_.set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ }
+ // The response is dropped if the status is not OK.
+ if (s.ok()) {
+ finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
+ finish_ops_.SendMessage(resp_));
+ } else {
+ finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
+ }
+ finish_ops_.set_core_cq_tag(&finish_tag_);
+ call_.PerformOps(&finish_ops_);
+ }
+
+ void SendInitialMetadata() override {
+ GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
+ callbacks_outstanding_++;
+ meta_tag_.Set(call_.call(),
+ [this](bool ok) {
+ reactor_->OnSendInitialMetadataDone(ok);
+ MaybeDone();
+ },
+ &meta_ops_);
+ meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ meta_ops_.set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ meta_ops_.set_core_cq_tag(&meta_tag_);
+ call_.PerformOps(&meta_ops_);
+ }
+
+ void Read(RequestType* req) override {
+ callbacks_outstanding_++;
+ read_ops_.RecvMessage(req);
+ call_.PerformOps(&read_ops_);
+ }
+
+ private:
+ friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
+
+ ServerCallbackReaderImpl(
+ ServerContext* ctx, Call* call, std::function<void()> call_requester,
+ experimental::ServerReadReactor<RequestType, ResponseType>* reactor)
+ : ctx_(ctx),
+ call_(*call),
+ call_requester_(std::move(call_requester)),
+ reactor_(reactor) {
+ ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
+ read_tag_.Set(call_.call(),
+ [this](bool ok) {
+ reactor_->OnReadDone(ok);
+ MaybeDone();
+ },
+ &read_ops_);
+ read_ops_.set_core_cq_tag(&read_tag_);
+ }
+
+ ~ServerCallbackReaderImpl() {}
+
+ ResponseType* response() { return &resp_; }
+
+ void MaybeDone() {
+ if (--callbacks_outstanding_ == 0) {
+ reactor_->OnDone();
+ grpc_call* call = call_.call();
+ auto call_requester = std::move(call_requester_);
+ this->~ServerCallbackReaderImpl(); // explicitly call destructor
+ g_core_codegen_interface->grpc_call_unref(call);
+ call_requester();
+ }
+ }
+
+ CallOpSet<CallOpSendInitialMetadata> meta_ops_;
+ CallbackWithSuccessTag meta_tag_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpServerSendStatus>
+ finish_ops_;
+ CallbackWithSuccessTag finish_tag_;
+ CallOpSet<CallOpRecvMessage<RequestType>> read_ops_;
+ CallbackWithSuccessTag read_tag_;
+
+ ServerContext* ctx_;
+ Call call_;
+ ResponseType resp_;
+ std::function<void()> call_requester_;
+ experimental::ServerReadReactor<RequestType, ResponseType>* reactor_;
+ std::atomic_int callbacks_outstanding_{
+ 3}; // reserve for OnStarted, Finish, and CompletionOp
+ };
+};
+
+template <class RequestType, class ResponseType>
+class CallbackServerStreamingHandler : public MethodHandler {
+ public:
+ CallbackServerStreamingHandler(
+ std::function<
+ experimental::ServerWriteReactor<RequestType, ResponseType>*()>
+ func)
+ : func_(std::move(func)) {}
+ void RunHandler(const HandlerParameter& param) final {
+ // Arena allocate a writer structure
+ g_core_codegen_interface->grpc_call_ref(param.call->call());
+
+ experimental::ServerWriteReactor<RequestType, ResponseType>* reactor =
+ param.status.ok()
+ ? CatchingReactorCreator<
+ experimental::ServerWriteReactor<RequestType, ResponseType>>(
+ func_)
+ : nullptr;
+
+ if (reactor == nullptr) {
+ // if deserialization or reactor creator failed, we need to fail the call
+ reactor = new UnimplementedWriteReactor<RequestType, ResponseType>;
+ }
+
+ auto* writer = new (g_core_codegen_interface->grpc_call_arena_alloc(
+ param.call->call(), sizeof(ServerCallbackWriterImpl)))
+ ServerCallbackWriterImpl(param.server_context, param.call,
+ static_cast<RequestType*>(param.request),
+ std::move(param.call_requester), reactor);
+ writer->BindReactor(reactor);
+ reactor->OnStarted(param.server_context, writer->request());
+ writer->MaybeDone();
+ }
+
+ void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
+ Status* status) final {
+ ByteBuffer buf;
+ buf.set_buffer(req);
+ auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc(
+ call, sizeof(RequestType))) RequestType();
+ *status = SerializationTraits<RequestType>::Deserialize(&buf, request);
+ buf.Release();
+ if (status->ok()) {
+ return request;
+ }
+ request->~RequestType();
+ return nullptr;
+ }
+
+ private:
+ std::function<experimental::ServerWriteReactor<RequestType, ResponseType>*()>
+ func_;
+
+ class ServerCallbackWriterImpl
+ : public experimental::ServerCallbackWriter<ResponseType> {
+ public:
+ void Finish(Status s) override {
+ finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
+ &finish_ops_);
+ finish_ops_.set_core_cq_tag(&finish_tag_);
+
+ if (!ctx_->sent_initial_metadata_) {
+ finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ finish_ops_.set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ }
+ finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
+ call_.PerformOps(&finish_ops_);
+ }
+
+ void SendInitialMetadata() override {
+ GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
+ callbacks_outstanding_++;
+ meta_tag_.Set(call_.call(),
+ [this](bool ok) {
+ reactor_->OnSendInitialMetadataDone(ok);
+ MaybeDone();
+ },
+ &meta_ops_);
+ meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ meta_ops_.set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ meta_ops_.set_core_cq_tag(&meta_tag_);
+ call_.PerformOps(&meta_ops_);
+ }
+
+ void Write(const ResponseType* resp, WriteOptions options) override {
+ callbacks_outstanding_++;
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ }
+ if (!ctx_->sent_initial_metadata_) {
+ write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ write_ops_.set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ }
+ // TODO(vjpai): don't assert
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(*resp, options).ok());
+ call_.PerformOps(&write_ops_);
+ }
+
+ void WriteAndFinish(const ResponseType* resp, WriteOptions options,
+ Status s) override {
+ // This combines the write into the finish callback
+ // Don't send any message if the status is bad
+ if (s.ok()) {
+ // TODO(vjpai): don't assert
+ GPR_CODEGEN_ASSERT(finish_ops_.SendMessage(*resp, options).ok());
+ }
+ Finish(std::move(s));
+ }
+
+ private:
+ friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
+
+ ServerCallbackWriterImpl(
+ ServerContext* ctx, Call* call, const RequestType* req,
+ std::function<void()> call_requester,
+ experimental::ServerWriteReactor<RequestType, ResponseType>* reactor)
+ : ctx_(ctx),
+ call_(*call),
+ req_(req),
+ call_requester_(std::move(call_requester)),
+ reactor_(reactor) {
+ ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
+ write_tag_.Set(call_.call(),
+ [this](bool ok) {
+ reactor_->OnWriteDone(ok);
+ MaybeDone();
+ },
+ &write_ops_);
+ write_ops_.set_core_cq_tag(&write_tag_);
+ }
+ ~ServerCallbackWriterImpl() { req_->~RequestType(); }
+
+ const RequestType* request() { return req_; }
+
+ void MaybeDone() {
+ if (--callbacks_outstanding_ == 0) {
+ reactor_->OnDone();
+ grpc_call* call = call_.call();
+ auto call_requester = std::move(call_requester_);
+ this->~ServerCallbackWriterImpl(); // explicitly call destructor
+ g_core_codegen_interface->grpc_call_unref(call);
+ call_requester();
+ }
+ }
+
+ CallOpSet<CallOpSendInitialMetadata> meta_ops_;
+ CallbackWithSuccessTag meta_tag_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpServerSendStatus>
+ finish_ops_;
+ CallbackWithSuccessTag finish_tag_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
+ CallbackWithSuccessTag write_tag_;
+
+ ServerContext* ctx_;
+ Call call_;
+ const RequestType* req_;
+ std::function<void()> call_requester_;
+ experimental::ServerWriteReactor<RequestType, ResponseType>* reactor_;
+ std::atomic_int callbacks_outstanding_{
+ 3}; // reserve for OnStarted, Finish, and CompletionOp
+ };
+};
+
+template <class RequestType, class ResponseType>
+class CallbackBidiHandler : public MethodHandler {
+ public:
+ CallbackBidiHandler(
+ std::function<
+ experimental::ServerBidiReactor<RequestType, ResponseType>*()>
+ func)
+ : func_(std::move(func)) {}
+ void RunHandler(const HandlerParameter& param) final {
+ g_core_codegen_interface->grpc_call_ref(param.call->call());
+
+ experimental::ServerBidiReactor<RequestType, ResponseType>* reactor =
+ param.status.ok()
+ ? CatchingReactorCreator<
+ experimental::ServerBidiReactor<RequestType, ResponseType>>(
+ func_)
+ : nullptr;
+
+ if (reactor == nullptr) {
+ // if deserialization or reactor creator failed, we need to fail the call
+ reactor = new UnimplementedBidiReactor<RequestType, ResponseType>;
+ }
+
+ auto* stream = new (g_core_codegen_interface->grpc_call_arena_alloc(
+ param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
+ ServerCallbackReaderWriterImpl(param.server_context, param.call,
+ std::move(param.call_requester),
+ reactor);
+
+ stream->BindReactor(reactor);
+ reactor->OnStarted(param.server_context);
+ stream->MaybeDone();
+ }
+
+ private:
+ std::function<experimental::ServerBidiReactor<RequestType, ResponseType>*()>
+ func_;
+
+ class ServerCallbackReaderWriterImpl
+ : public experimental::ServerCallbackReaderWriter<RequestType,
+ ResponseType> {
+ public:
+ void Finish(Status s) override {
+ finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
+ &finish_ops_);
+ finish_ops_.set_core_cq_tag(&finish_tag_);
+
+ if (!ctx_->sent_initial_metadata_) {
+ finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ finish_ops_.set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ }
+ finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
+ call_.PerformOps(&finish_ops_);
+ }
+
+ void SendInitialMetadata() override {
+ GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
+ callbacks_outstanding_++;
+ meta_tag_.Set(call_.call(),
+ [this](bool ok) {
+ reactor_->OnSendInitialMetadataDone(ok);
+ MaybeDone();
+ },
+ &meta_ops_);
+ meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ meta_ops_.set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ meta_ops_.set_core_cq_tag(&meta_tag_);
+ call_.PerformOps(&meta_ops_);
+ }
+
+ void Write(const ResponseType* resp, WriteOptions options) override {
+ callbacks_outstanding_++;
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ }
+ if (!ctx_->sent_initial_metadata_) {
+ write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ write_ops_.set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ }
+ // TODO(vjpai): don't assert
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(*resp, options).ok());
+ call_.PerformOps(&write_ops_);
+ }
+
+ void WriteAndFinish(const ResponseType* resp, WriteOptions options,
+ Status s) override {
+ // Don't send any message if the status is bad
+ if (s.ok()) {
+ // TODO(vjpai): don't assert
+ GPR_CODEGEN_ASSERT(finish_ops_.SendMessage(*resp, options).ok());
+ }
+ Finish(std::move(s));
+ }
+
+ void Read(RequestType* req) override {
+ callbacks_outstanding_++;
+ read_ops_.RecvMessage(req);
+ call_.PerformOps(&read_ops_);
+ }
+
+ private:
+ friend class CallbackBidiHandler<RequestType, ResponseType>;
+
+ ServerCallbackReaderWriterImpl(
+ ServerContext* ctx, Call* call, std::function<void()> call_requester,
+ experimental::ServerBidiReactor<RequestType, ResponseType>* reactor)
+ : ctx_(ctx),
+ call_(*call),
+ call_requester_(std::move(call_requester)),
+ reactor_(reactor) {
+ ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
+ write_tag_.Set(call_.call(),
+ [this](bool ok) {
+ reactor_->OnWriteDone(ok);
+ MaybeDone();
+ },
+ &write_ops_);
+ write_ops_.set_core_cq_tag(&write_tag_);
+ read_tag_.Set(call_.call(),
+ [this](bool ok) {
+ reactor_->OnReadDone(ok);
+ MaybeDone();
+ },
+ &read_ops_);
+ read_ops_.set_core_cq_tag(&read_tag_);
+ }
+ ~ServerCallbackReaderWriterImpl() {}
+
+ void MaybeDone() {
+ if (--callbacks_outstanding_ == 0) {
+ reactor_->OnDone();
+ grpc_call* call = call_.call();
+ auto call_requester = std::move(call_requester_);
+ this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor
+ g_core_codegen_interface->grpc_call_unref(call);
+ call_requester();
+ }
+ }
+
+ CallOpSet<CallOpSendInitialMetadata> meta_ops_;
+ CallbackWithSuccessTag meta_tag_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpServerSendStatus>
+ finish_ops_;
+ CallbackWithSuccessTag finish_tag_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
+ CallbackWithSuccessTag write_tag_;
+ CallOpSet<CallOpRecvMessage<RequestType>> read_ops_;
+ CallbackWithSuccessTag read_tag_;
+
+ ServerContext* ctx_;
+ Call call_;
+ std::function<void()> call_requester_;
+ experimental::ServerBidiReactor<RequestType, ResponseType>* reactor_;
+ std::atomic_int callbacks_outstanding_{
+ 3}; // reserve for OnStarted, Finish, and CompletionOp
};
};
diff --git a/include/grpcpp/impl/codegen/server_context.h b/include/grpcpp/impl/codegen/server_context.h
index 82ee862f61..affe61b547 100644
--- a/include/grpcpp/impl/codegen/server_context.h
+++ b/include/grpcpp/impl/codegen/server_context.h
@@ -66,13 +66,20 @@ template <class ServiceType, class RequestType, class ResponseType>
class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
class BidiStreamingHandler;
-template <class ServiceType, class RequestType, class ResponseType>
+template <class RequestType, class ResponseType>
class CallbackUnaryHandler;
+template <class RequestType, class ResponseType>
+class CallbackClientStreamingHandler;
+template <class RequestType, class ResponseType>
+class CallbackServerStreamingHandler;
+template <class RequestType, class ResponseType>
+class CallbackBidiHandler;
template <class Streamer, bool WriteNeeded>
class TemplatedBidiStreamingHandler;
template <StatusCode code>
class ErrorMethodHandler;
class Call;
+class ServerReactor;
} // namespace internal
class CompletionQueue;
@@ -124,6 +131,13 @@ class ServerContext {
/// end in "-bin".
/// \param value The metadata value. If its value is binary, the key name
/// must end in "-bin".
+ ///
+ /// Metadata must conform to the following format:
+ /// Custom-Metadata -> Binary-Header / ASCII-Header
+ /// Binary-Header -> {Header-Name "-bin" } {binary value}
+ /// ASCII-Header -> Header-Name ASCII-Value
+ /// Header-Name -> 1*( %x30-39 / %x61-7A / "_" / "-" / ".") ; 0-9 a-z _ - .
+ /// ASCII-Value -> 1*( %x20-%x7E ) ; space and printable ASCII
void AddInitialMetadata(const grpc::string& key, const grpc::string& value);
/// Add the (\a key, \a value) pair to the initial metadata
@@ -138,6 +152,13 @@ class ServerContext {
/// it must end in "-bin".
/// \param value The metadata value. If its value is binary, the key name
/// must end in "-bin".
+ ///
+ /// Metadata must conform to the following format:
+ /// Custom-Metadata -> Binary-Header / ASCII-Header
+ /// Binary-Header -> {Header-Name "-bin" } {binary value}
+ /// ASCII-Header -> Header-Name ASCII-Value
+ /// Header-Name -> 1*( %x30-39 / %x61-7A / "_" / "-" / ".") ; 0-9 a-z _ - .
+ /// ASCII-Value -> 1*( %x20-%x7E ) ; space and printable ASCII
void AddTrailingMetadata(const grpc::string& key, const grpc::string& value);
/// IsCancelled is always safe to call when using sync or callback API.
@@ -270,8 +291,14 @@ class ServerContext {
friend class ::grpc::internal::ServerStreamingHandler;
template <class Streamer, bool WriteNeeded>
friend class ::grpc::internal::TemplatedBidiStreamingHandler;
- template <class ServiceType, class RequestType, class ResponseType>
+ template <class RequestType, class ResponseType>
friend class ::grpc::internal::CallbackUnaryHandler;
+ template <class RequestType, class ResponseType>
+ friend class ::grpc::internal::CallbackClientStreamingHandler;
+ template <class RequestType, class ResponseType>
+ friend class ::grpc::internal::CallbackServerStreamingHandler;
+ template <class RequestType, class ResponseType>
+ friend class ::grpc::internal::CallbackBidiHandler;
template <StatusCode code>
friend class internal::ErrorMethodHandler;
friend class ::grpc::ClientContext;
@@ -282,7 +309,9 @@ class ServerContext {
class CompletionOp;
- void BeginCompletionOp(internal::Call* call, bool callback);
+ void BeginCompletionOp(internal::Call* call,
+ std::function<void(bool)> callback,
+ internal::ServerReactor* reactor);
/// Return the tag queued by BeginCompletionOp()
internal::CompletionQueueTag* GetCompletionOpTag();
@@ -299,12 +328,12 @@ class ServerContext {
uint32_t initial_metadata_flags() const { return 0; }
experimental::ServerRpcInfo* set_server_rpc_info(
- const char* method,
+ const char* method, internal::RpcMethod::RpcType type,
const std::vector<
std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>&
creators) {
if (creators.size() != 0) {
- rpc_info_ = new experimental::ServerRpcInfo(this, method);
+ rpc_info_ = new experimental::ServerRpcInfo(this, method, type);
rpc_info_->RegisterInterceptors(creators);
}
return rpc_info_;
diff --git a/include/grpcpp/impl/codegen/server_interceptor.h b/include/grpcpp/impl/codegen/server_interceptor.h
index 5fb5df28b7..3e71b3fc55 100644
--- a/include/grpcpp/impl/codegen/server_interceptor.h
+++ b/include/grpcpp/impl/codegen/server_interceptor.h
@@ -23,6 +23,7 @@
#include <vector>
#include <grpcpp/impl/codegen/interceptor.h>
+#include <grpcpp/impl/codegen/rpc_method.h>
#include <grpcpp/impl/codegen/string_ref.h>
namespace grpc {
@@ -36,27 +37,66 @@ class InterceptorBatchMethodsImpl;
namespace experimental {
class ServerRpcInfo;
+// A factory interface for creation of server interceptors. A vector of
+// factories can be provided to ServerBuilder which will be used to create a new
+// vector of server interceptors per RPC. Server interceptor authors should
+// create a subclass of ServerInterceptorFactorInterface which creates objects
+// of their interceptors.
class ServerInterceptorFactoryInterface {
public:
virtual ~ServerInterceptorFactoryInterface() {}
+ // Returns a pointer to an Interceptor object on successful creation, nullptr
+ // otherwise. If nullptr is returned, this server interceptor factory is
+ // ignored for the purposes of that RPC.
virtual Interceptor* CreateServerInterceptor(ServerRpcInfo* info) = 0;
};
+/// ServerRpcInfo represents the state of a particular RPC as it
+/// appears to an interceptor. It is created and owned by the library and
+/// passed to the CreateServerInterceptor method of the application's
+/// ServerInterceptorFactoryInterface implementation
class ServerRpcInfo {
public:
+ /// Type categorizes RPCs by unary or streaming type
+ enum class Type { UNARY, CLIENT_STREAMING, SERVER_STREAMING, BIDI_STREAMING };
+
~ServerRpcInfo(){};
+ // Delete all copy and move constructors and assignments
ServerRpcInfo(const ServerRpcInfo&) = delete;
- ServerRpcInfo(ServerRpcInfo&&) = default;
- ServerRpcInfo& operator=(ServerRpcInfo&&) = default;
+ ServerRpcInfo& operator=(const ServerRpcInfo&) = delete;
+ ServerRpcInfo(ServerRpcInfo&&) = delete;
+ ServerRpcInfo& operator=(ServerRpcInfo&&) = delete;
// Getter methods
- const char* method() { return method_; }
+
+ /// Return the fully-specified method name
+ const char* method() const { return method_; }
+
+ /// Return the type of the RPC (unary or a streaming flavor)
+ Type type() const { return type_; }
+
+ /// Return a pointer to the underlying ServerContext structure associated
+ /// with the RPC to support features that apply to it
grpc::ServerContext* server_context() { return ctx_; }
private:
- ServerRpcInfo(grpc::ServerContext* ctx, const char* method)
- : ctx_(ctx), method_(method) {
+ static_assert(Type::UNARY ==
+ static_cast<Type>(internal::RpcMethod::NORMAL_RPC),
+ "violated expectation about Type enum");
+ static_assert(Type::CLIENT_STREAMING ==
+ static_cast<Type>(internal::RpcMethod::CLIENT_STREAMING),
+ "violated expectation about Type enum");
+ static_assert(Type::SERVER_STREAMING ==
+ static_cast<Type>(internal::RpcMethod::SERVER_STREAMING),
+ "violated expectation about Type enum");
+ static_assert(Type::BIDI_STREAMING ==
+ static_cast<Type>(internal::RpcMethod::BIDI_STREAMING),
+ "violated expectation about Type enum");
+
+ ServerRpcInfo(grpc::ServerContext* ctx, const char* method,
+ internal::RpcMethod::RpcType type)
+ : ctx_(ctx), method_(method), type_(static_cast<Type>(type)) {
ref_.store(1);
}
@@ -72,8 +112,11 @@ class ServerRpcInfo {
std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>&
creators) {
for (const auto& creator : creators) {
- interceptors_.push_back(std::unique_ptr<experimental::Interceptor>(
- creator->CreateServerInterceptor(this)));
+ auto* interceptor = creator->CreateServerInterceptor(this);
+ if (interceptor != nullptr) {
+ interceptors_.push_back(
+ std::unique_ptr<experimental::Interceptor>(interceptor));
+ }
}
}
@@ -86,6 +129,7 @@ class ServerRpcInfo {
grpc::ServerContext* ctx_ = nullptr;
const char* method_ = nullptr;
+ const Type type_;
std::atomic_int ref_;
std::vector<std::unique_ptr<experimental::Interceptor>> interceptors_;
diff --git a/include/grpcpp/impl/codegen/server_interface.h b/include/grpcpp/impl/codegen/server_interface.h
index 23d1f445b1..890a5650d0 100644
--- a/include/grpcpp/impl/codegen/server_interface.h
+++ b/include/grpcpp/impl/codegen/server_interface.h
@@ -174,13 +174,14 @@ class ServerInterface : public internal::CallHook {
bool done_intercepting_;
};
+ /// RegisteredAsyncRequest is not part of the C++ API
class RegisteredAsyncRequest : public BaseAsyncRequest {
public:
RegisteredAsyncRequest(ServerInterface* server, ServerContext* context,
internal::ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag,
- const char* name);
+ const char* name, internal::RpcMethod::RpcType type);
virtual bool FinalizeResult(void** tag, bool* status) override {
/* If we are done intercepting, then there is nothing more for us to do */
@@ -189,7 +190,7 @@ class ServerInterface : public internal::CallHook {
}
call_wrapper_ = internal::Call(
call_, server_, call_cq_, server_->max_receive_message_size(),
- context_->set_server_rpc_info(name_,
+ context_->set_server_rpc_info(name_, type_,
*server_->interceptor_creators()));
return BaseAsyncRequest::FinalizeResult(tag, status);
}
@@ -198,6 +199,7 @@ class ServerInterface : public internal::CallHook {
void IssueRequest(void* registered_method, grpc_byte_buffer** payload,
ServerCompletionQueue* notification_cq);
const char* name_;
+ const internal::RpcMethod::RpcType type_;
};
class NoPayloadAsyncRequest final : public RegisteredAsyncRequest {
@@ -207,9 +209,9 @@ class ServerInterface : public internal::CallHook {
internal::ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag)
- : RegisteredAsyncRequest(server, context, stream, call_cq,
- notification_cq, tag,
- registered_method->name()) {
+ : RegisteredAsyncRequest(
+ server, context, stream, call_cq, notification_cq, tag,
+ registered_method->name(), registered_method->method_type()) {
IssueRequest(registered_method->server_tag(), nullptr, notification_cq);
}
@@ -225,9 +227,9 @@ class ServerInterface : public internal::CallHook {
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag,
Message* request)
- : RegisteredAsyncRequest(server, context, stream, call_cq,
- notification_cq, tag,
- registered_method->name()),
+ : RegisteredAsyncRequest(
+ server, context, stream, call_cq, notification_cq, tag,
+ registered_method->name(), registered_method->method_type()),
registered_method_(registered_method),
server_(server),
context_(context),
diff --git a/include/grpcpp/security/credentials.h b/include/grpcpp/security/credentials.h
index 8dfbdec3e6..d8c9e04d77 100644
--- a/include/grpcpp/security/credentials.h
+++ b/include/grpcpp/security/credentials.h
@@ -46,8 +46,8 @@ std::shared_ptr<Channel> CreateCustomChannelWithInterceptors(
const grpc::string& target,
const std::shared_ptr<ChannelCredentials>& creds,
const ChannelArguments& args,
- std::unique_ptr<std::vector<
- std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>
+ std::vector<
+ std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
interceptor_creators);
} // namespace experimental
@@ -80,8 +80,8 @@ class ChannelCredentials : private GrpcLibraryCodegen {
const grpc::string& target,
const std::shared_ptr<ChannelCredentials>& creds,
const ChannelArguments& args,
- std::unique_ptr<std::vector<
- std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>
+ std::vector<
+ std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
interceptor_creators);
virtual std::shared_ptr<Channel> CreateChannel(
@@ -91,8 +91,8 @@ class ChannelCredentials : private GrpcLibraryCodegen {
// implemented as a virtual function so that it does not break API.
virtual std::shared_ptr<Channel> CreateChannelWithInterceptors(
const grpc::string& target, const ChannelArguments& args,
- std::unique_ptr<std::vector<
- std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>
+ std::vector<
+ std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
interceptor_creators) {
return nullptr;
};
diff --git a/include/grpcpp/server.h b/include/grpcpp/server.h
index a14a4da578..cdcac186cb 100644
--- a/include/grpcpp/server.h
+++ b/include/grpcpp/server.h
@@ -111,8 +111,8 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
/// interceptors
std::shared_ptr<Channel> InProcessChannelWithInterceptors(
const ChannelArguments& args,
- std::unique_ptr<std::vector<
- std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>
+ std::vector<
+ std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
interceptor_creators);
private:
diff --git a/include/grpcpp/support/client_interceptor.h b/include/grpcpp/support/client_interceptor.h
new file mode 100644
index 0000000000..50810e3fe3
--- /dev/null
+++ b/include/grpcpp/support/client_interceptor.h
@@ -0,0 +1,24 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPCPP_SUPPORT_CLIENT_INTERCEPTOR_H
+#define GRPCPP_SUPPORT_CLIENT_INTERCEPTOR_H
+
+#include <grpcpp/impl/codegen/client_interceptor.h>
+
+#endif // GRPCPP_SUPPORT_CLIENT_INTERCEPTOR_H
diff --git a/include/grpcpp/support/interceptor.h b/include/grpcpp/support/interceptor.h
new file mode 100644
index 0000000000..7ff79516ba
--- /dev/null
+++ b/include/grpcpp/support/interceptor.h
@@ -0,0 +1,24 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPCPP_SUPPORT_INTERCEPTOR_H
+#define GRPCPP_SUPPORT_INTERCEPTOR_H
+
+#include <grpcpp/impl/codegen/interceptor.h>
+
+#endif // GRPCPP_SUPPORT_INTERCEPTOR_H
diff --git a/include/grpcpp/support/server_interceptor.h b/include/grpcpp/support/server_interceptor.h
new file mode 100644
index 0000000000..b0a6229b66
--- /dev/null
+++ b/include/grpcpp/support/server_interceptor.h
@@ -0,0 +1,24 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPCPP_SUPPORT_SERVER_INTERCEPTOR_H
+#define GRPCPP_SUPPORT_SERVER_INTERCEPTOR_H
+
+#include <grpcpp/impl/codegen/server_interceptor.h>
+
+#endif // GRPCPP_SUPPORT_SERVER_INTERCEPTOR_H