aboutsummaryrefslogtreecommitdiffhomepage
path: root/include
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2018-12-18 11:40:52 -0800
committerGravatar Yash Tibrewal <yashkt@google.com>2018-12-18 11:40:52 -0800
commitbb303513dce4e78d898035b9f7f5ae11b7b452dd (patch)
tree708cacfd6a93bdfff3fa76e813afe909f3e62fe6 /include
parent00c9c40004d011f01c72d253a530edb3364992bf (diff)
parentd198607457a6f5d47e8c134277ebfe3706f9476b (diff)
Merge master
Diffstat (limited to 'include')
-rw-r--r--include/grpc/grpc.h3
-rw-r--r--include/grpc/grpc_security_constants.h6
-rw-r--r--include/grpc/impl/codegen/atm_gcc_sync.h2
-rw-r--r--include/grpc/impl/codegen/atm_windows.h2
-rw-r--r--include/grpc/impl/codegen/grpc_types.h7
-rw-r--r--include/grpc/impl/codegen/port_platform.h9
-rw-r--r--include/grpcpp/alarm.h93
-rw-r--r--include/grpcpp/alarm_impl.h116
-rw-r--r--include/grpcpp/channel.h8
-rw-r--r--include/grpcpp/create_channel.h4
-rw-r--r--include/grpcpp/generic/generic_stub.h5
-rw-r--r--include/grpcpp/health_check_service_interface.h4
-rw-r--r--include/grpcpp/impl/codegen/byte_buffer.h19
-rw-r--r--include/grpcpp/impl/codegen/call_op_set.h3
-rw-r--r--include/grpcpp/impl/codegen/callback_common.h27
-rw-r--r--include/grpcpp/impl/codegen/channel_interface.h13
-rw-r--r--include/grpcpp/impl/codegen/client_callback.h655
-rw-r--r--include/grpcpp/impl/codegen/client_context.h20
-rw-r--r--include/grpcpp/impl/codegen/client_interceptor.h48
-rw-r--r--include/grpcpp/impl/codegen/client_unary_call.h18
-rw-r--r--include/grpcpp/impl/codegen/completion_queue.h3
-rw-r--r--include/grpcpp/impl/codegen/server_callback.h774
-rw-r--r--include/grpcpp/impl/codegen/server_context.h25
-rw-r--r--include/grpcpp/impl/codegen/server_interceptor.h29
-rw-r--r--include/grpcpp/impl/codegen/server_interface.h18
-rw-r--r--include/grpcpp/security/credentials.h12
-rw-r--r--include/grpcpp/server.h4
27 files changed, 1732 insertions, 195 deletions
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index d3b74cabab..fec7f5269e 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -511,7 +511,8 @@ GRPCAPI char* grpc_channelz_get_server(intptr_t server_id);
/* Gets all server sockets that exist in the server. */
GRPCAPI char* grpc_channelz_get_server_sockets(intptr_t server_id,
- intptr_t start_socket_id);
+ intptr_t start_socket_id,
+ intptr_t max_results);
/* Returns a single Channel, or else a NOT_FOUND code. The returned string
is allocated and must be freed by the application. */
diff --git a/include/grpc/grpc_security_constants.h b/include/grpc/grpc_security_constants.h
index f935557f2d..a082f67010 100644
--- a/include/grpc/grpc_security_constants.h
+++ b/include/grpc/grpc_security_constants.h
@@ -106,10 +106,10 @@ typedef enum {
} grpc_ssl_client_certificate_request_type;
/**
- * Type of local connection for which local channel/server credentials will be
- * applied. It only supports UDS for now.
+ * Type of local connections for which local channel/server credentials will be
+ * applied. It supports UDS and local TCP connections.
*/
-typedef enum { UDS = 0 } grpc_local_connect_type;
+typedef enum { UDS = 0, LOCAL_TCP } grpc_local_connect_type;
#ifdef __cplusplus
}
diff --git a/include/grpc/impl/codegen/atm_gcc_sync.h b/include/grpc/impl/codegen/atm_gcc_sync.h
index c0010a3469..728c3d5412 100644
--- a/include/grpc/impl/codegen/atm_gcc_sync.h
+++ b/include/grpc/impl/codegen/atm_gcc_sync.h
@@ -26,6 +26,8 @@
typedef intptr_t gpr_atm;
#define GPR_ATM_MAX INTPTR_MAX
#define GPR_ATM_MIN INTPTR_MIN
+#define GPR_ATM_INC_CAS_THEN(blah) blah
+#define GPR_ATM_INC_ADD_THEN(blah) blah
#define GPR_ATM_COMPILE_BARRIER_() __asm__ __volatile__("" : : : "memory")
diff --git a/include/grpc/impl/codegen/atm_windows.h b/include/grpc/impl/codegen/atm_windows.h
index f6b27e5df7..c016b90095 100644
--- a/include/grpc/impl/codegen/atm_windows.h
+++ b/include/grpc/impl/codegen/atm_windows.h
@@ -25,6 +25,8 @@
typedef intptr_t gpr_atm;
#define GPR_ATM_MAX INTPTR_MAX
#define GPR_ATM_MIN INTPTR_MIN
+#define GPR_ATM_INC_CAS_THEN(blah) blah
+#define GPR_ATM_INC_ADD_THEN(blah) blah
#define gpr_atm_full_barrier MemoryBarrier
diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h
index 17a43fab0f..a9fb27946e 100644
--- a/include/grpc/impl/codegen/grpc_types.h
+++ b/include/grpc/impl/codegen/grpc_types.h
@@ -293,7 +293,7 @@ typedef struct {
"grpc.max_channel_trace_event_memory_per_node"
/** If non-zero, gRPC library will track stats and information at at per channel
* level. Disabling channelz naturally disables channel tracing. The default
- * is for channelz to be disabled. */
+ * is for channelz to be enabled. */
#define GRPC_ARG_ENABLE_CHANNELZ "grpc.enable_channelz"
/** If non-zero, Cronet transport will coalesce packets to fewer frames
* when possible. */
@@ -350,6 +350,11 @@ typedef struct {
/** If set, inhibits health checking (which may be enabled via the
* service config.) */
#define GRPC_ARG_INHIBIT_HEALTH_CHECKING "grpc.inhibit_health_checking"
+/** If set, determines the number of milliseconds that the c-ares based
+ * DNS resolver will wait on queries before cancelling them. The default value
+ * is 10000. Setting this to "0" will disable c-ares query timeouts
+ * entirely. */
+#define GRPC_ARG_DNS_ARES_QUERY_TIMEOUT_MS "grpc.dns_ares_query_timeout"
/** \} */
/** Result of a grpc call. If the caller satisfies the prerequisites of a
diff --git a/include/grpc/impl/codegen/port_platform.h b/include/grpc/impl/codegen/port_platform.h
index b2028a6305..031c0c36ae 100644
--- a/include/grpc/impl/codegen/port_platform.h
+++ b/include/grpc/impl/codegen/port_platform.h
@@ -526,6 +526,15 @@ typedef unsigned __int64 uint64_t;
#endif /* GPR_ATTRIBUTE_NO_TSAN (2) */
#endif /* GPR_ATTRIBUTE_NO_TSAN (1) */
+/* GRPC_TSAN_ENABLED will be defined, when compiled with thread sanitizer. */
+#if defined(__SANITIZE_THREAD__)
+#define GRPC_TSAN_ENABLED
+#elif defined(__has_feature)
+#if __has_feature(thread_sanitizer)
+#define GRPC_TSAN_ENABLED
+#endif
+#endif
+
/* GRPC_ALLOW_EXCEPTIONS should be 0 or 1 if exceptions are allowed or not */
#ifndef GRPC_ALLOW_EXCEPTIONS
/* If not already set, set to 1 on Windows (style guide standard) but to
diff --git a/include/grpcpp/alarm.h b/include/grpcpp/alarm.h
index 365feb4eb9..2343c1149c 100644
--- a/include/grpcpp/alarm.h
+++ b/include/grpcpp/alarm.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015 gRPC authors.
+ * Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,99 +16,14 @@
*
*/
-/// An Alarm posts the user provided tag to its associated completion queue upon
-/// expiry or cancellation.
#ifndef GRPCPP_ALARM_H
#define GRPCPP_ALARM_H
-#include <functional>
-
-#include <grpc/grpc.h>
-#include <grpcpp/impl/codegen/completion_queue.h>
-#include <grpcpp/impl/codegen/completion_queue_tag.h>
-#include <grpcpp/impl/codegen/grpc_library.h>
-#include <grpcpp/impl/codegen/time.h>
-#include <grpcpp/impl/grpc_library.h>
+#include <grpcpp/alarm_impl.h>
namespace grpc {
-/// A thin wrapper around \a grpc_alarm (see / \a / src/core/surface/alarm.h).
-class Alarm : private GrpcLibraryCodegen {
- public:
- /// Create an unset completion queue alarm
- Alarm();
-
- /// Destroy the given completion queue alarm, cancelling it in the process.
- ~Alarm();
-
- /// DEPRECATED: Create and set a completion queue alarm instance associated to
- /// \a cq.
- /// This form is deprecated because it is inherently racy.
- /// \internal We rely on the presence of \a cq for grpc initialization. If \a
- /// cq were ever to be removed, a reference to a static
- /// internal::GrpcLibraryInitializer instance would need to be introduced
- /// here. \endinternal.
- template <typename T>
- Alarm(CompletionQueue* cq, const T& deadline, void* tag) : Alarm() {
- SetInternal(cq, TimePoint<T>(deadline).raw_time(), tag);
- }
-
- /// Trigger an alarm instance on completion queue \a cq at the specified time.
- /// Once the alarm expires (at \a deadline) or it's cancelled (see \a Cancel),
- /// an event with tag \a tag will be added to \a cq. If the alarm expired, the
- /// event's success bit will be true, false otherwise (ie, upon cancellation).
- template <typename T>
- void Set(CompletionQueue* cq, const T& deadline, void* tag) {
- SetInternal(cq, TimePoint<T>(deadline).raw_time(), tag);
- }
-
- /// Alarms aren't copyable.
- Alarm(const Alarm&) = delete;
- Alarm& operator=(const Alarm&) = delete;
-
- /// Alarms are movable.
- Alarm(Alarm&& rhs) : alarm_(rhs.alarm_) { rhs.alarm_ = nullptr; }
- Alarm& operator=(Alarm&& rhs) {
- alarm_ = rhs.alarm_;
- rhs.alarm_ = nullptr;
- return *this;
- }
-
- /// Cancel a completion queue alarm. Calling this function over an alarm that
- /// has already fired has no effect.
- void Cancel();
-
- /// NOTE: class experimental_type is not part of the public API of this class
- /// TODO(vjpai): Move these contents to the public API of Alarm when
- /// they are no longer experimental
- class experimental_type {
- public:
- explicit experimental_type(Alarm* alarm) : alarm_(alarm) {}
-
- /// Set an alarm to invoke callback \a f. The argument to the callback
- /// states whether the alarm expired at \a deadline (true) or was cancelled
- /// (false)
- template <typename T>
- void Set(const T& deadline, std::function<void(bool)> f) {
- alarm_->SetInternal(TimePoint<T>(deadline).raw_time(), std::move(f));
- }
-
- private:
- Alarm* alarm_;
- };
-
- /// NOTE: The function experimental() is not stable public API. It is a view
- /// to the experimental components of this class. It may be changed or removed
- /// at any time.
- experimental_type experimental() { return experimental_type(this); }
-
- private:
- void SetInternal(CompletionQueue* cq, gpr_timespec deadline, void* tag);
- void SetInternal(gpr_timespec deadline, std::function<void(bool)> f);
-
- internal::CompletionQueueTag* alarm_;
-};
-
-} // namespace grpc
+typedef ::grpc_impl::Alarm Alarm;
+}
#endif // GRPCPP_ALARM_H
diff --git a/include/grpcpp/alarm_impl.h b/include/grpcpp/alarm_impl.h
new file mode 100644
index 0000000000..7844e7c886
--- /dev/null
+++ b/include/grpcpp/alarm_impl.h
@@ -0,0 +1,116 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/// An Alarm posts the user provided tag to its associated completion queue upon
+/// expiry or cancellation.
+#ifndef GRPCPP_ALARM_IMPL_H
+#define GRPCPP_ALARM_IMPL_H
+
+#include <functional>
+
+#include <grpc/grpc.h>
+#include <grpcpp/impl/codegen/completion_queue.h>
+#include <grpcpp/impl/codegen/completion_queue_tag.h>
+#include <grpcpp/impl/codegen/grpc_library.h>
+#include <grpcpp/impl/codegen/time.h>
+#include <grpcpp/impl/grpc_library.h>
+
+namespace grpc_impl {
+
+/// A thin wrapper around \a grpc_alarm (see / \a / src/core/surface/alarm.h).
+class Alarm : private ::grpc::GrpcLibraryCodegen {
+ public:
+ /// Create an unset completion queue alarm
+ Alarm();
+
+ /// Destroy the given completion queue alarm, cancelling it in the process.
+ ~Alarm();
+
+ /// DEPRECATED: Create and set a completion queue alarm instance associated to
+ /// \a cq.
+ /// This form is deprecated because it is inherently racy.
+ /// \internal We rely on the presence of \a cq for grpc initialization. If \a
+ /// cq were ever to be removed, a reference to a static
+ /// internal::GrpcLibraryInitializer instance would need to be introduced
+ /// here. \endinternal.
+ template <typename T>
+ Alarm(::grpc::CompletionQueue* cq, const T& deadline, void* tag) : Alarm() {
+ SetInternal(cq, ::grpc::TimePoint<T>(deadline).raw_time(), tag);
+ }
+
+ /// Trigger an alarm instance on completion queue \a cq at the specified time.
+ /// Once the alarm expires (at \a deadline) or it's cancelled (see \a Cancel),
+ /// an event with tag \a tag will be added to \a cq. If the alarm expired, the
+ /// event's success bit will be true, false otherwise (ie, upon cancellation).
+ template <typename T>
+ void Set(::grpc::CompletionQueue* cq, const T& deadline, void* tag) {
+ SetInternal(cq, ::grpc::TimePoint<T>(deadline).raw_time(), tag);
+ }
+
+ /// Alarms aren't copyable.
+ Alarm(const Alarm&) = delete;
+ Alarm& operator=(const Alarm&) = delete;
+
+ /// Alarms are movable.
+ Alarm(Alarm&& rhs) : alarm_(rhs.alarm_) { rhs.alarm_ = nullptr; }
+ Alarm& operator=(Alarm&& rhs) {
+ alarm_ = rhs.alarm_;
+ rhs.alarm_ = nullptr;
+ return *this;
+ }
+
+ /// Cancel a completion queue alarm. Calling this function over an alarm that
+ /// has already fired has no effect.
+ void Cancel();
+
+ /// NOTE: class experimental_type is not part of the public API of this class
+ /// TODO(vjpai): Move these contents to the public API of Alarm when
+ /// they are no longer experimental
+ class experimental_type {
+ public:
+ explicit experimental_type(Alarm* alarm) : alarm_(alarm) {}
+
+ /// Set an alarm to invoke callback \a f. The argument to the callback
+ /// states whether the alarm expired at \a deadline (true) or was cancelled
+ /// (false)
+ template <typename T>
+ void Set(const T& deadline, std::function<void(bool)> f) {
+ alarm_->SetInternal(::grpc::TimePoint<T>(deadline).raw_time(),
+ std::move(f));
+ }
+
+ private:
+ Alarm* alarm_;
+ };
+
+ /// NOTE: The function experimental() is not stable public API. It is a view
+ /// to the experimental components of this class. It may be changed or removed
+ /// at any time.
+ experimental_type experimental() { return experimental_type(this); }
+
+ private:
+ void SetInternal(::grpc::CompletionQueue* cq, gpr_timespec deadline,
+ void* tag);
+ void SetInternal(gpr_timespec deadline, std::function<void(bool)> f);
+
+ ::grpc::internal::CompletionQueueTag* alarm_;
+};
+
+} // namespace grpc_impl
+
+#endif // GRPCPP_ALARM_IMPL_H
diff --git a/include/grpcpp/channel.h b/include/grpcpp/channel.h
index 4502b94b17..ee83396069 100644
--- a/include/grpcpp/channel.h
+++ b/include/grpcpp/channel.h
@@ -65,13 +65,13 @@ class Channel final : public ChannelInterface,
friend void experimental::ChannelResetConnectionBackoff(Channel* channel);
friend std::shared_ptr<Channel> CreateChannelInternal(
const grpc::string& host, grpc_channel* c_channel,
- std::unique_ptr<std::vector<
- std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>
+ std::vector<
+ std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
interceptor_creators);
friend class internal::InterceptedChannel;
Channel(const grpc::string& host, grpc_channel* c_channel,
- std::unique_ptr<std::vector<
- std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>
+ std::vector<
+ std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
interceptor_creators);
internal::Call CreateCall(const internal::RpcMethod& method,
diff --git a/include/grpcpp/create_channel.h b/include/grpcpp/create_channel.h
index 43188d09e7..e8a2a70581 100644
--- a/include/grpcpp/create_channel.h
+++ b/include/grpcpp/create_channel.h
@@ -70,8 +70,8 @@ std::shared_ptr<Channel> CreateCustomChannelWithInterceptors(
const grpc::string& target,
const std::shared_ptr<ChannelCredentials>& creds,
const ChannelArguments& args,
- std::unique_ptr<std::vector<
- std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>
+ std::vector<
+ std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
interceptor_creators);
} // namespace experimental
} // namespace grpc
diff --git a/include/grpcpp/generic/generic_stub.h b/include/grpcpp/generic/generic_stub.h
index d509d9a520..eb014184e4 100644
--- a/include/grpcpp/generic/generic_stub.h
+++ b/include/grpcpp/generic/generic_stub.h
@@ -24,6 +24,7 @@
#include <grpcpp/support/async_stream.h>
#include <grpcpp/support/async_unary_call.h>
#include <grpcpp/support/byte_buffer.h>
+#include <grpcpp/support/client_callback.h>
#include <grpcpp/support/status.h>
namespace grpc {
@@ -76,6 +77,10 @@ class GenericStub final {
const ByteBuffer* request, ByteBuffer* response,
std::function<void(Status)> on_completion);
+ void PrepareBidiStreamingCall(
+ ClientContext* context, const grpc::string& method,
+ experimental::ClientBidiReactor<ByteBuffer, ByteBuffer>* reactor);
+
private:
GenericStub* stub_;
};
diff --git a/include/grpcpp/health_check_service_interface.h b/include/grpcpp/health_check_service_interface.h
index b45a699bda..dfd4c3983a 100644
--- a/include/grpcpp/health_check_service_interface.h
+++ b/include/grpcpp/health_check_service_interface.h
@@ -37,6 +37,10 @@ class HealthCheckServiceInterface {
bool serving) = 0;
/// Apply to all registered service names.
virtual void SetServingStatus(bool serving) = 0;
+
+ /// Set all registered service names to not serving and prevent future
+ /// state changes.
+ virtual void Shutdown() {}
};
/// Enable/disable the default health checking service. This applies to all C++
diff --git a/include/grpcpp/impl/codegen/byte_buffer.h b/include/grpcpp/impl/codegen/byte_buffer.h
index abba5549b8..a77e36dfc5 100644
--- a/include/grpcpp/impl/codegen/byte_buffer.h
+++ b/include/grpcpp/impl/codegen/byte_buffer.h
@@ -45,8 +45,10 @@ template <class ServiceType, class RequestType, class ResponseType>
class RpcMethodHandler;
template <class ServiceType, class RequestType, class ResponseType>
class ServerStreamingHandler;
-template <class ServiceType, class RequestType, class ResponseType>
+template <class RequestType, class ResponseType>
class CallbackUnaryHandler;
+template <class RequestType, class ResponseType>
+class CallbackServerStreamingHandler;
template <StatusCode code>
class ErrorMethodHandler;
template <class R>
@@ -91,7 +93,9 @@ class ByteBuffer final {
}
/// Constuct a byte buffer by referencing elements of existing buffer
- /// \a buf. Wrapper of core function grpc_byte_buffer_copy
+ /// \a buf. Wrapper of core function grpc_byte_buffer_copy . This is not
+ /// a deep copy; it is just a referencing. As a result, its performance is
+ /// size-independent.
ByteBuffer(const ByteBuffer& buf);
~ByteBuffer() {
@@ -100,6 +104,9 @@ class ByteBuffer final {
}
}
+ /// Wrapper of core function grpc_byte_buffer_copy . This is not
+ /// a deep copy; it is just a referencing. As a result, its performance is
+ /// size-independent.
ByteBuffer& operator=(const ByteBuffer&);
/// Dump (read) the buffer contents into \a slices.
@@ -115,7 +122,9 @@ class ByteBuffer final {
/// Make a duplicate copy of the internals of this byte
/// buffer so that we have our own owned version of it.
- /// bbuf.Duplicate(); is equivalent to bbuf=bbuf; but is actually readable
+ /// bbuf.Duplicate(); is equivalent to bbuf=bbuf; but is actually readable.
+ /// This is not a deep copy; it is a referencing and its performance
+ /// is size-independent.
void Duplicate() {
buffer_ = g_core_codegen_interface->grpc_byte_buffer_copy(buffer_);
}
@@ -156,8 +165,10 @@ class ByteBuffer final {
friend class internal::RpcMethodHandler;
template <class ServiceType, class RequestType, class ResponseType>
friend class internal::ServerStreamingHandler;
- template <class ServiceType, class RequestType, class ResponseType>
+ template <class RequestType, class ResponseType>
friend class internal::CallbackUnaryHandler;
+ template <class RequestType, class ResponseType>
+ friend class ::grpc::internal::CallbackServerStreamingHandler;
template <StatusCode code>
friend class internal::ErrorMethodHandler;
template <class R>
diff --git a/include/grpcpp/impl/codegen/call_op_set.h b/include/grpcpp/impl/codegen/call_op_set.h
index ac3ba17bd9..3db9f48bff 100644
--- a/include/grpcpp/impl/codegen/call_op_set.h
+++ b/include/grpcpp/impl/codegen/call_op_set.h
@@ -343,6 +343,9 @@ class CallOpSendMessage {
// We had already registered failed_send_ earlier. No need to do it again.
}
send_buf_.Clear();
+ // The contents of the SendMessage value that was previously set
+ // has had its references stolen by core's operations
+ interceptor_methods->SetSendMessage(nullptr);
}
void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
diff --git a/include/grpcpp/impl/codegen/callback_common.h b/include/grpcpp/impl/codegen/callback_common.h
index 51367cf550..a3c8c41246 100644
--- a/include/grpcpp/impl/codegen/callback_common.h
+++ b/include/grpcpp/impl/codegen/callback_common.h
@@ -32,6 +32,8 @@ namespace grpc {
namespace internal {
/// An exception-safe way of invoking a user-specified callback function
+// TODO(vjpai): decide whether it is better for this to take a const lvalue
+// parameter or an rvalue parameter, or if it even matters
template <class Func, class... Args>
void CatchingCallback(Func&& func, Args&&... args) {
#if GRPC_ALLOW_EXCEPTIONS
@@ -45,6 +47,20 @@ void CatchingCallback(Func&& func, Args&&... args) {
#endif // GRPC_ALLOW_EXCEPTIONS
}
+template <class ReturnType, class Func, class... Args>
+ReturnType* CatchingReactorCreator(Func&& func, Args&&... args) {
+#if GRPC_ALLOW_EXCEPTIONS
+ try {
+ return func(std::forward<Args>(args)...);
+ } catch (...) {
+ // fail the RPC, don't crash the library
+ return nullptr;
+ }
+#else // GRPC_ALLOW_EXCEPTIONS
+ return func(std::forward<Args>(args)...);
+#endif // GRPC_ALLOW_EXCEPTIONS
+}
+
// The contract on these tags is that they are single-shot. They must be
// constructed and then fired at exactly one point. There is no expectation
// that they can be reused without reconstruction.
@@ -145,18 +161,19 @@ class CallbackWithSuccessTag
// or on a tag that has been Set before unless the tag has been cleared.
void Set(grpc_call* call, std::function<void(bool)> f,
CompletionQueueTag* ops) {
+ GPR_CODEGEN_ASSERT(call_ == nullptr);
+ g_core_codegen_interface->grpc_call_ref(call);
call_ = call;
func_ = std::move(f);
ops_ = ops;
- g_core_codegen_interface->grpc_call_ref(call);
functor_run = &CallbackWithSuccessTag::StaticRun;
}
void Clear() {
if (call_ != nullptr) {
- func_ = nullptr;
grpc_call* call = call_;
call_ = nullptr;
+ func_ = nullptr;
g_core_codegen_interface->grpc_call_unref(call);
}
}
@@ -182,11 +199,11 @@ class CallbackWithSuccessTag
}
void Run(bool ok) {
void* ignored = ops_;
- bool new_ok = ok;
// Allow a "false" return value from FinalizeResult to silence the
// callback, just as it silences a CQ tag in the async cases
- bool do_callback = ops_->FinalizeResult(&ignored, &new_ok);
- GPR_CODEGEN_ASSERT(ignored == ops_);
+ auto* ops = ops_;
+ bool do_callback = ops_->FinalizeResult(&ignored, &ok);
+ GPR_CODEGEN_ASSERT(ignored == ops);
if (do_callback) {
CatchingCallback(func_, ok);
diff --git a/include/grpcpp/impl/codegen/channel_interface.h b/include/grpcpp/impl/codegen/channel_interface.h
index 6ec1ffb8c7..5353f5feaa 100644
--- a/include/grpcpp/impl/codegen/channel_interface.h
+++ b/include/grpcpp/impl/codegen/channel_interface.h
@@ -21,7 +21,6 @@
#include <grpc/impl/codegen/connectivity_state.h>
#include <grpcpp/impl/codegen/call.h>
-#include <grpcpp/impl/codegen/client_context.h>
#include <grpcpp/impl/codegen/status.h>
#include <grpcpp/impl/codegen/time.h>
@@ -53,6 +52,12 @@ template <class W, class R>
class ClientAsyncReaderWriterFactory;
template <class R>
class ClientAsyncResponseReaderFactory;
+template <class W, class R>
+class ClientCallbackReaderWriterFactory;
+template <class R>
+class ClientCallbackReaderFactory;
+template <class W>
+class ClientCallbackWriterFactory;
class InterceptedChannel;
} // namespace internal
@@ -106,6 +111,12 @@ class ChannelInterface {
friend class ::grpc::internal::ClientAsyncReaderWriterFactory;
template <class R>
friend class ::grpc::internal::ClientAsyncResponseReaderFactory;
+ template <class W, class R>
+ friend class ::grpc::internal::ClientCallbackReaderWriterFactory;
+ template <class R>
+ friend class ::grpc::internal::ClientCallbackReaderFactory;
+ template <class W>
+ friend class ::grpc::internal::ClientCallbackWriterFactory;
template <class InputMessage, class OutputMessage>
friend class ::grpc::internal::BlockingUnaryCallImpl;
template <class InputMessage, class OutputMessage>
diff --git a/include/grpcpp/impl/codegen/client_callback.h b/include/grpcpp/impl/codegen/client_callback.h
index 4baa819091..66cf9b7754 100644
--- a/include/grpcpp/impl/codegen/client_callback.h
+++ b/include/grpcpp/impl/codegen/client_callback.h
@@ -22,6 +22,7 @@
#include <functional>
#include <grpcpp/impl/codegen/call.h>
+#include <grpcpp/impl/codegen/call_op_set.h>
#include <grpcpp/impl/codegen/callback_common.h>
#include <grpcpp/impl/codegen/channel_interface.h>
#include <grpcpp/impl/codegen/config.h>
@@ -88,6 +89,660 @@ class CallbackUnaryCallImpl {
call.PerformOps(ops);
}
};
+} // namespace internal
+
+namespace experimental {
+
+// Forward declarations
+template <class Request, class Response>
+class ClientBidiReactor;
+template <class Response>
+class ClientReadReactor;
+template <class Request>
+class ClientWriteReactor;
+
+// NOTE: The streaming objects are not actually implemented in the public API.
+// These interfaces are provided for mocking only. Typical applications
+// will interact exclusively with the reactors that they define.
+template <class Request, class Response>
+class ClientCallbackReaderWriter {
+ public:
+ virtual ~ClientCallbackReaderWriter() {}
+ virtual void StartCall() = 0;
+ virtual void Write(const Request* req, WriteOptions options) = 0;
+ virtual void WritesDone() = 0;
+ virtual void Read(Response* resp) = 0;
+
+ protected:
+ void BindReactor(ClientBidiReactor<Request, Response>* reactor) {
+ reactor->BindStream(this);
+ }
+};
+
+template <class Response>
+class ClientCallbackReader {
+ public:
+ virtual ~ClientCallbackReader() {}
+ virtual void StartCall() = 0;
+ virtual void Read(Response* resp) = 0;
+
+ protected:
+ void BindReactor(ClientReadReactor<Response>* reactor) {
+ reactor->BindReader(this);
+ }
+};
+
+template <class Request>
+class ClientCallbackWriter {
+ public:
+ virtual ~ClientCallbackWriter() {}
+ virtual void StartCall() = 0;
+ void Write(const Request* req) { Write(req, WriteOptions()); }
+ virtual void Write(const Request* req, WriteOptions options) = 0;
+ void WriteLast(const Request* req, WriteOptions options) {
+ Write(req, options.set_last_message());
+ }
+ virtual void WritesDone() = 0;
+
+ protected:
+ void BindReactor(ClientWriteReactor<Request>* reactor) {
+ reactor->BindWriter(this);
+ }
+};
+
+// The user must implement this reactor interface with reactions to each event
+// type that gets called by the library. An empty reaction is provided by
+// default
+template <class Request, class Response>
+class ClientBidiReactor {
+ public:
+ virtual ~ClientBidiReactor() {}
+ virtual void OnDone(const Status& s) {}
+ virtual void OnReadInitialMetadataDone(bool ok) {}
+ virtual void OnReadDone(bool ok) {}
+ virtual void OnWriteDone(bool ok) {}
+ virtual void OnWritesDoneDone(bool ok) {}
+
+ void StartCall() { stream_->StartCall(); }
+ void StartRead(Response* resp) { stream_->Read(resp); }
+ void StartWrite(const Request* req) { StartWrite(req, WriteOptions()); }
+ void StartWrite(const Request* req, WriteOptions options) {
+ stream_->Write(req, std::move(options));
+ }
+ void StartWriteLast(const Request* req, WriteOptions options) {
+ StartWrite(req, std::move(options.set_last_message()));
+ }
+ void StartWritesDone() { stream_->WritesDone(); }
+
+ private:
+ friend class ClientCallbackReaderWriter<Request, Response>;
+ void BindStream(ClientCallbackReaderWriter<Request, Response>* stream) {
+ stream_ = stream;
+ }
+ ClientCallbackReaderWriter<Request, Response>* stream_;
+};
+
+template <class Response>
+class ClientReadReactor {
+ public:
+ virtual ~ClientReadReactor() {}
+ virtual void OnDone(const Status& s) {}
+ virtual void OnReadInitialMetadataDone(bool ok) {}
+ virtual void OnReadDone(bool ok) {}
+
+ void StartCall() { reader_->StartCall(); }
+ void StartRead(Response* resp) { reader_->Read(resp); }
+
+ private:
+ friend class ClientCallbackReader<Response>;
+ void BindReader(ClientCallbackReader<Response>* reader) { reader_ = reader; }
+ ClientCallbackReader<Response>* reader_;
+};
+
+template <class Request>
+class ClientWriteReactor {
+ public:
+ virtual ~ClientWriteReactor() {}
+ virtual void OnDone(const Status& s) {}
+ virtual void OnReadInitialMetadataDone(bool ok) {}
+ virtual void OnWriteDone(bool ok) {}
+ virtual void OnWritesDoneDone(bool ok) {}
+
+ void StartCall() { writer_->StartCall(); }
+ void StartWrite(const Request* req) { StartWrite(req, WriteOptions()); }
+ void StartWrite(const Request* req, WriteOptions options) {
+ writer_->Write(req, std::move(options));
+ }
+ void StartWriteLast(const Request* req, WriteOptions options) {
+ StartWrite(req, std::move(options.set_last_message()));
+ }
+ void StartWritesDone() { writer_->WritesDone(); }
+
+ private:
+ friend class ClientCallbackWriter<Request>;
+ void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; }
+ ClientCallbackWriter<Request>* writer_;
+};
+
+} // namespace experimental
+
+namespace internal {
+
+// Forward declare factory classes for friendship
+template <class Request, class Response>
+class ClientCallbackReaderWriterFactory;
+template <class Response>
+class ClientCallbackReaderFactory;
+template <class Request>
+class ClientCallbackWriterFactory;
+
+template <class Request, class Response>
+class ClientCallbackReaderWriterImpl
+ : public ::grpc::experimental::ClientCallbackReaderWriter<Request,
+ Response> {
+ public:
+ // always allocated against a call arena, no memory free required
+ static void operator delete(void* ptr, std::size_t size) {
+ assert(size == sizeof(ClientCallbackReaderWriterImpl));
+ }
+
+ // This operator should never be called as the memory should be freed as part
+ // of the arena destruction. It only exists to provide a matching operator
+ // delete to the operator new so that some compilers will not complain (see
+ // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
+ // there are no tests catching the compiler warning.
+ static void operator delete(void*, void*) { assert(0); }
+
+ void MaybeFinish() {
+ if (--callbacks_outstanding_ == 0) {
+ Status s = std::move(finish_status_);
+ auto* reactor = reactor_;
+ auto* call = call_.call();
+ this->~ClientCallbackReaderWriterImpl();
+ g_core_codegen_interface->grpc_call_unref(call);
+ reactor->OnDone(s);
+ }
+ }
+
+ void StartCall() override {
+ // This call initiates two batches, plus any backlog, each with a callback
+ // 1. Send initial metadata (unless corked) + recv initial metadata
+ // 2. Any read backlog
+ // 3. Recv trailing metadata, on_completion callback
+ // 4. Any write backlog
+ // 5. See if the call can finish (if other callbacks were triggered already)
+ started_ = true;
+
+ start_tag_.Set(call_.call(),
+ [this](bool ok) {
+ reactor_->OnReadInitialMetadataDone(ok);
+ MaybeFinish();
+ },
+ &start_ops_);
+ if (!start_corked_) {
+ start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
+ context_->initial_metadata_flags());
+ }
+ start_ops_.RecvInitialMetadata(context_);
+ start_ops_.set_core_cq_tag(&start_tag_);
+ call_.PerformOps(&start_ops_);
+
+ // Also set up the read and write tags so that they don't have to be set up
+ // each time
+ write_tag_.Set(call_.call(),
+ [this](bool ok) {
+ reactor_->OnWriteDone(ok);
+ MaybeFinish();
+ },
+ &write_ops_);
+ write_ops_.set_core_cq_tag(&write_tag_);
+
+ read_tag_.Set(call_.call(),
+ [this](bool ok) {
+ reactor_->OnReadDone(ok);
+ MaybeFinish();
+ },
+ &read_ops_);
+ read_ops_.set_core_cq_tag(&read_tag_);
+ if (read_ops_at_start_) {
+ call_.PerformOps(&read_ops_);
+ }
+
+ finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
+ &finish_ops_);
+ finish_ops_.ClientRecvStatus(context_, &finish_status_);
+ finish_ops_.set_core_cq_tag(&finish_tag_);
+ call_.PerformOps(&finish_ops_);
+
+ if (write_ops_at_start_) {
+ call_.PerformOps(&write_ops_);
+ }
+
+ if (writes_done_ops_at_start_) {
+ call_.PerformOps(&writes_done_ops_);
+ }
+ MaybeFinish();
+ }
+
+ void Read(Response* msg) override {
+ read_ops_.RecvMessage(msg);
+ callbacks_outstanding_++;
+ if (started_) {
+ call_.PerformOps(&read_ops_);
+ } else {
+ read_ops_at_start_ = true;
+ }
+ }
+
+ void Write(const Request* msg, WriteOptions options) override {
+ if (start_corked_) {
+ write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
+ context_->initial_metadata_flags());
+ start_corked_ = false;
+ }
+ // TODO(vjpai): don't assert
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(*msg).ok());
+
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ write_ops_.ClientSendClose();
+ }
+ callbacks_outstanding_++;
+ if (started_) {
+ call_.PerformOps(&write_ops_);
+ } else {
+ write_ops_at_start_ = true;
+ }
+ }
+ void WritesDone() override {
+ if (start_corked_) {
+ writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
+ context_->initial_metadata_flags());
+ start_corked_ = false;
+ }
+ writes_done_ops_.ClientSendClose();
+ writes_done_tag_.Set(call_.call(),
+ [this](bool ok) {
+ reactor_->OnWritesDoneDone(ok);
+ MaybeFinish();
+ },
+ &writes_done_ops_);
+ writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
+ callbacks_outstanding_++;
+ if (started_) {
+ call_.PerformOps(&writes_done_ops_);
+ } else {
+ writes_done_ops_at_start_ = true;
+ }
+ }
+
+ private:
+ friend class ClientCallbackReaderWriterFactory<Request, Response>;
+
+ ClientCallbackReaderWriterImpl(
+ Call call, ClientContext* context,
+ ::grpc::experimental::ClientBidiReactor<Request, Response>* reactor)
+ : context_(context),
+ call_(call),
+ reactor_(reactor),
+ start_corked_(context_->initial_metadata_corked_) {
+ this->BindReactor(reactor);
+ }
+
+ ClientContext* context_;
+ Call call_;
+ ::grpc::experimental::ClientBidiReactor<Request, Response>* reactor_;
+
+ CallOpSet<CallOpSendInitialMetadata, CallOpRecvInitialMetadata> start_ops_;
+ CallbackWithSuccessTag start_tag_;
+ bool start_corked_;
+
+ CallOpSet<CallOpClientRecvStatus> finish_ops_;
+ CallbackWithSuccessTag finish_tag_;
+ Status finish_status_;
+
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
+ write_ops_;
+ CallbackWithSuccessTag write_tag_;
+ bool write_ops_at_start_{false};
+
+ CallOpSet<CallOpSendInitialMetadata, CallOpClientSendClose> writes_done_ops_;
+ CallbackWithSuccessTag writes_done_tag_;
+ bool writes_done_ops_at_start_{false};
+
+ CallOpSet<CallOpRecvMessage<Response>> read_ops_;
+ CallbackWithSuccessTag read_tag_;
+ bool read_ops_at_start_{false};
+
+ // Minimum of 3 callbacks to pre-register for StartCall, start, and finish
+ std::atomic_int callbacks_outstanding_{3};
+ bool started_{false};
+};
+
+template <class Request, class Response>
+class ClientCallbackReaderWriterFactory {
+ public:
+ static void Create(
+ ChannelInterface* channel, const ::grpc::internal::RpcMethod& method,
+ ClientContext* context,
+ ::grpc::experimental::ClientBidiReactor<Request, Response>* reactor) {
+ Call call = channel->CreateCall(method, context, channel->CallbackCQ());
+
+ g_core_codegen_interface->grpc_call_ref(call.call());
+ new (g_core_codegen_interface->grpc_call_arena_alloc(
+ call.call(), sizeof(ClientCallbackReaderWriterImpl<Request, Response>)))
+ ClientCallbackReaderWriterImpl<Request, Response>(call, context,
+ reactor);
+ }
+};
+
+template <class Response>
+class ClientCallbackReaderImpl
+ : public ::grpc::experimental::ClientCallbackReader<Response> {
+ public:
+ // always allocated against a call arena, no memory free required
+ static void operator delete(void* ptr, std::size_t size) {
+ assert(size == sizeof(ClientCallbackReaderImpl));
+ }
+
+ // This operator should never be called as the memory should be freed as part
+ // of the arena destruction. It only exists to provide a matching operator
+ // delete to the operator new so that some compilers will not complain (see
+ // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
+ // there are no tests catching the compiler warning.
+ static void operator delete(void*, void*) { assert(0); }
+
+ void MaybeFinish() {
+ if (--callbacks_outstanding_ == 0) {
+ Status s = std::move(finish_status_);
+ auto* reactor = reactor_;
+ auto* call = call_.call();
+ this->~ClientCallbackReaderImpl();
+ g_core_codegen_interface->grpc_call_unref(call);
+ reactor->OnDone(s);
+ }
+ }
+
+ void StartCall() override {
+ // This call initiates two batches, plus any backlog, each with a callback
+ // 1. Send initial metadata (unless corked) + recv initial metadata
+ // 2. Any backlog
+ // 3. Recv trailing metadata, on_completion callback
+ // 4. See if the call can finish (if other callbacks were triggered already)
+ started_ = true;
+
+ start_tag_.Set(call_.call(),
+ [this](bool ok) {
+ reactor_->OnReadInitialMetadataDone(ok);
+ MaybeFinish();
+ },
+ &start_ops_);
+ start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
+ context_->initial_metadata_flags());
+ start_ops_.RecvInitialMetadata(context_);
+ start_ops_.set_core_cq_tag(&start_tag_);
+ call_.PerformOps(&start_ops_);
+
+ // Also set up the read tag so it doesn't have to be set up each time
+ read_tag_.Set(call_.call(),
+ [this](bool ok) {
+ reactor_->OnReadDone(ok);
+ MaybeFinish();
+ },
+ &read_ops_);
+ read_ops_.set_core_cq_tag(&read_tag_);
+ if (read_ops_at_start_) {
+ call_.PerformOps(&read_ops_);
+ }
+
+ finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
+ &finish_ops_);
+ finish_ops_.ClientRecvStatus(context_, &finish_status_);
+ finish_ops_.set_core_cq_tag(&finish_tag_);
+ call_.PerformOps(&finish_ops_);
+
+ MaybeFinish();
+ }
+
+ void Read(Response* msg) override {
+ read_ops_.RecvMessage(msg);
+ callbacks_outstanding_++;
+ if (started_) {
+ call_.PerformOps(&read_ops_);
+ } else {
+ read_ops_at_start_ = true;
+ }
+ }
+
+ private:
+ friend class ClientCallbackReaderFactory<Response>;
+
+ template <class Request>
+ ClientCallbackReaderImpl(
+ Call call, ClientContext* context, Request* request,
+ ::grpc::experimental::ClientReadReactor<Response>* reactor)
+ : context_(context), call_(call), reactor_(reactor) {
+ this->BindReactor(reactor);
+ // TODO(vjpai): don't assert
+ GPR_CODEGEN_ASSERT(start_ops_.SendMessage(*request).ok());
+ start_ops_.ClientSendClose();
+ }
+
+ ClientContext* context_;
+ Call call_;
+ ::grpc::experimental::ClientReadReactor<Response>* reactor_;
+
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose,
+ CallOpRecvInitialMetadata>
+ start_ops_;
+ CallbackWithSuccessTag start_tag_;
+
+ CallOpSet<CallOpClientRecvStatus> finish_ops_;
+ CallbackWithSuccessTag finish_tag_;
+ Status finish_status_;
+
+ CallOpSet<CallOpRecvMessage<Response>> read_ops_;
+ CallbackWithSuccessTag read_tag_;
+ bool read_ops_at_start_{false};
+
+ // Minimum of 3 callbacks to pre-register for StartCall, start, and finish
+ std::atomic_int callbacks_outstanding_{3};
+ bool started_{false};
+};
+
+template <class Response>
+class ClientCallbackReaderFactory {
+ public:
+ template <class Request>
+ static void Create(
+ ChannelInterface* channel, const ::grpc::internal::RpcMethod& method,
+ ClientContext* context, const Request* request,
+ ::grpc::experimental::ClientReadReactor<Response>* reactor) {
+ Call call = channel->CreateCall(method, context, channel->CallbackCQ());
+
+ g_core_codegen_interface->grpc_call_ref(call.call());
+ new (g_core_codegen_interface->grpc_call_arena_alloc(
+ call.call(), sizeof(ClientCallbackReaderImpl<Response>)))
+ ClientCallbackReaderImpl<Response>(call, context, request, reactor);
+ }
+};
+
+template <class Request>
+class ClientCallbackWriterImpl
+ : public ::grpc::experimental::ClientCallbackWriter<Request> {
+ public:
+ // always allocated against a call arena, no memory free required
+ static void operator delete(void* ptr, std::size_t size) {
+ assert(size == sizeof(ClientCallbackWriterImpl));
+ }
+
+ // This operator should never be called as the memory should be freed as part
+ // of the arena destruction. It only exists to provide a matching operator
+ // delete to the operator new so that some compilers will not complain (see
+ // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
+ // there are no tests catching the compiler warning.
+ static void operator delete(void*, void*) { assert(0); }
+
+ void MaybeFinish() {
+ if (--callbacks_outstanding_ == 0) {
+ Status s = std::move(finish_status_);
+ auto* reactor = reactor_;
+ auto* call = call_.call();
+ this->~ClientCallbackWriterImpl();
+ g_core_codegen_interface->grpc_call_unref(call);
+ reactor->OnDone(s);
+ }
+ }
+
+ void StartCall() override {
+ // This call initiates two batches, plus any backlog, each with a callback
+ // 1. Send initial metadata (unless corked) + recv initial metadata
+ // 2. Recv trailing metadata, on_completion callback
+ // 3. Any backlog
+ // 4. See if the call can finish (if other callbacks were triggered already)
+ started_ = true;
+
+ start_tag_.Set(call_.call(),
+ [this](bool ok) {
+ reactor_->OnReadInitialMetadataDone(ok);
+ MaybeFinish();
+ },
+ &start_ops_);
+ if (!start_corked_) {
+ start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
+ context_->initial_metadata_flags());
+ }
+ start_ops_.RecvInitialMetadata(context_);
+ start_ops_.set_core_cq_tag(&start_tag_);
+ call_.PerformOps(&start_ops_);
+
+ // Also set up the read and write tags so that they don't have to be set up
+ // each time
+ write_tag_.Set(call_.call(),
+ [this](bool ok) {
+ reactor_->OnWriteDone(ok);
+ MaybeFinish();
+ },
+ &write_ops_);
+ write_ops_.set_core_cq_tag(&write_tag_);
+
+ finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
+ &finish_ops_);
+ finish_ops_.ClientRecvStatus(context_, &finish_status_);
+ finish_ops_.set_core_cq_tag(&finish_tag_);
+ call_.PerformOps(&finish_ops_);
+
+ if (write_ops_at_start_) {
+ call_.PerformOps(&write_ops_);
+ }
+
+ if (writes_done_ops_at_start_) {
+ call_.PerformOps(&writes_done_ops_);
+ }
+
+ MaybeFinish();
+ }
+
+ void Write(const Request* msg, WriteOptions options) override {
+ if (start_corked_) {
+ write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
+ context_->initial_metadata_flags());
+ start_corked_ = false;
+ }
+ // TODO(vjpai): don't assert
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(*msg).ok());
+
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ write_ops_.ClientSendClose();
+ }
+ callbacks_outstanding_++;
+ if (started_) {
+ call_.PerformOps(&write_ops_);
+ } else {
+ write_ops_at_start_ = true;
+ }
+ }
+ void WritesDone() override {
+ if (start_corked_) {
+ writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
+ context_->initial_metadata_flags());
+ start_corked_ = false;
+ }
+ writes_done_ops_.ClientSendClose();
+ writes_done_tag_.Set(call_.call(),
+ [this](bool ok) {
+ reactor_->OnWritesDoneDone(ok);
+ MaybeFinish();
+ },
+ &writes_done_ops_);
+ writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
+ callbacks_outstanding_++;
+ if (started_) {
+ call_.PerformOps(&writes_done_ops_);
+ } else {
+ writes_done_ops_at_start_ = true;
+ }
+ }
+
+ private:
+ friend class ClientCallbackWriterFactory<Request>;
+
+ template <class Response>
+ ClientCallbackWriterImpl(
+ Call call, ClientContext* context, Response* response,
+ ::grpc::experimental::ClientWriteReactor<Request>* reactor)
+ : context_(context),
+ call_(call),
+ reactor_(reactor),
+ start_corked_(context_->initial_metadata_corked_) {
+ this->BindReactor(reactor);
+ finish_ops_.RecvMessage(response);
+ finish_ops_.AllowNoMessage();
+ }
+
+ ClientContext* context_;
+ Call call_;
+ ::grpc::experimental::ClientWriteReactor<Request>* reactor_;
+
+ CallOpSet<CallOpSendInitialMetadata, CallOpRecvInitialMetadata> start_ops_;
+ CallbackWithSuccessTag start_tag_;
+ bool start_corked_;
+
+ CallOpSet<CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_;
+ CallbackWithSuccessTag finish_tag_;
+ Status finish_status_;
+
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
+ write_ops_;
+ CallbackWithSuccessTag write_tag_;
+ bool write_ops_at_start_{false};
+
+ CallOpSet<CallOpSendInitialMetadata, CallOpClientSendClose> writes_done_ops_;
+ CallbackWithSuccessTag writes_done_tag_;
+ bool writes_done_ops_at_start_{false};
+
+ // Minimum of 3 callbacks to pre-register for StartCall, start, and finish
+ std::atomic_int callbacks_outstanding_{3};
+ bool started_{false};
+};
+
+template <class Request>
+class ClientCallbackWriterFactory {
+ public:
+ template <class Response>
+ static void Create(
+ ChannelInterface* channel, const ::grpc::internal::RpcMethod& method,
+ ClientContext* context, Response* response,
+ ::grpc::experimental::ClientWriteReactor<Request>* reactor) {
+ Call call = channel->CreateCall(method, context, channel->CallbackCQ());
+
+ g_core_codegen_interface->grpc_call_ref(call.call());
+ new (g_core_codegen_interface->grpc_call_arena_alloc(
+ call.call(), sizeof(ClientCallbackWriterImpl<Request>)))
+ ClientCallbackWriterImpl<Request>(call, context, response, reactor);
+ }
+};
} // namespace internal
} // namespace grpc
diff --git a/include/grpcpp/impl/codegen/client_context.h b/include/grpcpp/impl/codegen/client_context.h
index 75b955e760..0a71f3d9b6 100644
--- a/include/grpcpp/impl/codegen/client_context.h
+++ b/include/grpcpp/impl/codegen/client_context.h
@@ -46,6 +46,7 @@
#include <grpcpp/impl/codegen/core_codegen_interface.h>
#include <grpcpp/impl/codegen/create_auth_context.h>
#include <grpcpp/impl/codegen/metadata_map.h>
+#include <grpcpp/impl/codegen/rpc_method.h>
#include <grpcpp/impl/codegen/security/auth_context.h>
#include <grpcpp/impl/codegen/slice.h>
#include <grpcpp/impl/codegen/status.h>
@@ -71,6 +72,12 @@ template <class InputMessage, class OutputMessage>
class BlockingUnaryCallImpl;
template <class InputMessage, class OutputMessage>
class CallbackUnaryCallImpl;
+template <class Request, class Response>
+class ClientCallbackReaderWriterImpl;
+template <class Response>
+class ClientCallbackReaderImpl;
+template <class Request>
+class ClientCallbackWriterImpl;
} // namespace internal
template <class R>
@@ -162,6 +169,8 @@ class InteropClientContextInspector;
/// (see \a grpc::CreateCustomChannel).
///
/// \warning ClientContext instances should \em not be reused across rpcs.
+/// \warning The ClientContext instance used for creating an rpc must remain
+/// alive and valid for the lifetime of the rpc.
class ClientContext {
public:
ClientContext();
@@ -394,6 +403,12 @@ class ClientContext {
friend class ::grpc::internal::BlockingUnaryCallImpl;
template <class InputMessage, class OutputMessage>
friend class ::grpc::internal::CallbackUnaryCallImpl;
+ template <class Request, class Response>
+ friend class ::grpc::internal::ClientCallbackReaderWriterImpl;
+ template <class Response>
+ friend class ::grpc::internal::ClientCallbackReaderImpl;
+ template <class Request>
+ friend class ::grpc::internal::ClientCallbackWriterImpl;
// Used by friend class CallOpClientRecvStatus
void set_debug_error_string(const grpc::string& debug_error_string) {
@@ -404,12 +419,13 @@ class ClientContext {
void set_call(grpc_call* call, const std::shared_ptr<Channel>& channel);
experimental::ClientRpcInfo* set_client_rpc_info(
- const char* method, grpc::ChannelInterface* channel,
+ const char* method, internal::RpcMethod::RpcType type,
+ grpc::ChannelInterface* channel,
const std::vector<
std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>&
creators,
size_t interceptor_pos) {
- rpc_info_ = experimental::ClientRpcInfo(this, method, channel);
+ rpc_info_ = experimental::ClientRpcInfo(this, type, method, channel);
rpc_info_.RegisterInterceptors(creators, interceptor_pos);
return &rpc_info_;
}
diff --git a/include/grpcpp/impl/codegen/client_interceptor.h b/include/grpcpp/impl/codegen/client_interceptor.h
index f69c99ab22..2bae11a251 100644
--- a/include/grpcpp/impl/codegen/client_interceptor.h
+++ b/include/grpcpp/impl/codegen/client_interceptor.h
@@ -23,6 +23,7 @@
#include <vector>
#include <grpcpp/impl/codegen/interceptor.h>
+#include <grpcpp/impl/codegen/rpc_method.h>
#include <grpcpp/impl/codegen/string_ref.h>
namespace grpc {
@@ -52,23 +53,56 @@ extern experimental::ClientInterceptorFactoryInterface*
namespace experimental {
class ClientRpcInfo {
public:
- ClientRpcInfo() {}
+ // TODO(yashykt): Stop default-constructing ClientRpcInfo and remove UNKNOWN
+ // from the list of possible Types.
+ enum class Type {
+ UNARY,
+ CLIENT_STREAMING,
+ SERVER_STREAMING,
+ BIDI_STREAMING,
+ UNKNOWN // UNKNOWN is not API and will be removed later
+ };
~ClientRpcInfo(){};
ClientRpcInfo(const ClientRpcInfo&) = delete;
ClientRpcInfo(ClientRpcInfo&&) = default;
- ClientRpcInfo& operator=(ClientRpcInfo&&) = default;
// Getter methods
- const char* method() { return method_; }
+ const char* method() const { return method_; }
ChannelInterface* channel() { return channel_; }
grpc::ClientContext* client_context() { return ctx_; }
+ Type type() const { return type_; }
private:
- ClientRpcInfo(grpc::ClientContext* ctx, const char* method,
- grpc::ChannelInterface* channel)
- : ctx_(ctx), method_(method), channel_(channel) {}
+ static_assert(Type::UNARY ==
+ static_cast<Type>(internal::RpcMethod::NORMAL_RPC),
+ "violated expectation about Type enum");
+ static_assert(Type::CLIENT_STREAMING ==
+ static_cast<Type>(internal::RpcMethod::CLIENT_STREAMING),
+ "violated expectation about Type enum");
+ static_assert(Type::SERVER_STREAMING ==
+ static_cast<Type>(internal::RpcMethod::SERVER_STREAMING),
+ "violated expectation about Type enum");
+ static_assert(Type::BIDI_STREAMING ==
+ static_cast<Type>(internal::RpcMethod::BIDI_STREAMING),
+ "violated expectation about Type enum");
+
+ // Default constructor should only be used by ClientContext
+ ClientRpcInfo() = default;
+
+ // Constructor will only be called from ClientContext
+ ClientRpcInfo(grpc::ClientContext* ctx, internal::RpcMethod::RpcType type,
+ const char* method, grpc::ChannelInterface* channel)
+ : ctx_(ctx),
+ type_(static_cast<Type>(type)),
+ method_(method),
+ channel_(channel) {}
+
+ // Move assignment should only be used by ClientContext
+ // TODO(yashykt): Delete move assignment
+ ClientRpcInfo& operator=(ClientRpcInfo&&) = default;
+
// Runs interceptor at pos \a pos.
void RunInterceptor(
experimental::InterceptorBatchMethods* interceptor_methods, size_t pos) {
@@ -97,6 +131,8 @@ class ClientRpcInfo {
}
grpc::ClientContext* ctx_ = nullptr;
+ // TODO(yashykt): make type_ const once move-assignment is deleted
+ Type type_{Type::UNKNOWN};
const char* method_ = nullptr;
grpc::ChannelInterface* channel_ = nullptr;
std::vector<std::unique_ptr<experimental::Interceptor>> interceptors_;
diff --git a/include/grpcpp/impl/codegen/client_unary_call.h b/include/grpcpp/impl/codegen/client_unary_call.h
index b1c80764f2..5151839412 100644
--- a/include/grpcpp/impl/codegen/client_unary_call.h
+++ b/include/grpcpp/impl/codegen/client_unary_call.h
@@ -69,13 +69,17 @@ class BlockingUnaryCallImpl {
ops.ClientSendClose();
ops.ClientRecvStatus(context, &status_);
call.PerformOps(&ops);
- if (cq.Pluck(&ops)) {
- if (!ops.got_message && status_.ok()) {
- status_ = Status(StatusCode::UNIMPLEMENTED,
- "No message returned for unary request");
- }
- } else {
- GPR_CODEGEN_ASSERT(!status_.ok());
+ cq.Pluck(&ops);
+ // Some of the ops might fail. If the ops fail in the core layer, status
+ // would reflect the error. But, if the ops fail in the C++ layer, the
+ // status would still be the same as the one returned by gRPC Core. This can
+ // happen if deserialization of the message fails.
+ // TODO(yashykt): If deserialization fails, but the status received is OK,
+ // then it might be a good idea to change the status to something better
+ // than StatusCode::UNIMPLEMENTED to reflect this.
+ if (!ops.got_message && status_.ok()) {
+ status_ = Status(StatusCode::UNIMPLEMENTED,
+ "No message returned for unary request");
}
}
Status status() { return status_; }
diff --git a/include/grpcpp/impl/codegen/completion_queue.h b/include/grpcpp/impl/codegen/completion_queue.h
index d603c7c700..fb38788f7d 100644
--- a/include/grpcpp/impl/codegen/completion_queue.h
+++ b/include/grpcpp/impl/codegen/completion_queue.h
@@ -307,8 +307,7 @@ class CompletionQueue : private GrpcLibraryCodegen {
void* ignored = tag;
if (tag->FinalizeResult(&ignored, &ok)) {
GPR_CODEGEN_ASSERT(ignored == tag);
- // Ignore mutations by FinalizeResult: Pluck returns the C API status
- return ev.success != 0;
+ return ok;
}
}
}
diff --git a/include/grpcpp/impl/codegen/server_callback.h b/include/grpcpp/impl/codegen/server_callback.h
index b866fc16dc..1854f6ef2f 100644
--- a/include/grpcpp/impl/codegen/server_callback.h
+++ b/include/grpcpp/impl/codegen/server_callback.h
@@ -19,7 +19,9 @@
#ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
#define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
+#include <atomic>
#include <functional>
+#include <type_traits>
#include <grpcpp/impl/codegen/call.h>
#include <grpcpp/impl/codegen/call_op_set.h>
@@ -32,19 +34,33 @@
namespace grpc {
-// forward declarations
+// Declare base class of all reactors as internal
namespace internal {
-template <class ServiceType, class RequestType, class ResponseType>
-class CallbackUnaryHandler;
+
+class ServerReactor {
+ public:
+ virtual ~ServerReactor() = default;
+ virtual void OnDone() {}
+ virtual void OnCancel() {}
+};
+
} // namespace internal
namespace experimental {
+// Forward declarations
+template <class Request, class Response>
+class ServerReadReactor;
+template <class Request, class Response>
+class ServerWriteReactor;
+template <class Request, class Response>
+class ServerBidiReactor;
+
// For unary RPCs, the exposed controller class is only an interface
// and the actual implementation is an internal class.
class ServerCallbackRpcController {
public:
- virtual ~ServerCallbackRpcController() {}
+ virtual ~ServerCallbackRpcController() = default;
// The method handler must call this function when it is done so that
// the library knows to free its resources
@@ -55,18 +71,193 @@ class ServerCallbackRpcController {
virtual void SendInitialMetadata(std::function<void(bool)>) = 0;
};
+// NOTE: The actual streaming object classes are provided
+// as API only to support mocking. There are no implementations of
+// these class interfaces in the API.
+template <class Request>
+class ServerCallbackReader {
+ public:
+ virtual ~ServerCallbackReader() {}
+ virtual void Finish(Status s) = 0;
+ virtual void SendInitialMetadata() = 0;
+ virtual void Read(Request* msg) = 0;
+
+ protected:
+ template <class Response>
+ void BindReactor(ServerReadReactor<Request, Response>* reactor) {
+ reactor->BindReader(this);
+ }
+};
+
+template <class Response>
+class ServerCallbackWriter {
+ public:
+ virtual ~ServerCallbackWriter() {}
+
+ virtual void Finish(Status s) = 0;
+ virtual void SendInitialMetadata() = 0;
+ virtual void Write(const Response* msg, WriteOptions options) = 0;
+ virtual void WriteAndFinish(const Response* msg, WriteOptions options,
+ Status s) {
+ // Default implementation that can/should be overridden
+ Write(msg, std::move(options));
+ Finish(std::move(s));
+ };
+
+ protected:
+ template <class Request>
+ void BindReactor(ServerWriteReactor<Request, Response>* reactor) {
+ reactor->BindWriter(this);
+ }
+};
+
+template <class Request, class Response>
+class ServerCallbackReaderWriter {
+ public:
+ virtual ~ServerCallbackReaderWriter() {}
+
+ virtual void Finish(Status s) = 0;
+ virtual void SendInitialMetadata() = 0;
+ virtual void Read(Request* msg) = 0;
+ virtual void Write(const Response* msg, WriteOptions options) = 0;
+ virtual void WriteAndFinish(const Response* msg, WriteOptions options,
+ Status s) {
+ // Default implementation that can/should be overridden
+ Write(msg, std::move(options));
+ Finish(std::move(s));
+ };
+
+ protected:
+ void BindReactor(ServerBidiReactor<Request, Response>* reactor) {
+ reactor->BindStream(this);
+ }
+};
+
+// The following classes are reactors that are to be implemented
+// by the user, returned as the result of the method handler for
+// a callback method, and activated by the call to OnStarted
+template <class Request, class Response>
+class ServerBidiReactor : public internal::ServerReactor {
+ public:
+ ~ServerBidiReactor() = default;
+ virtual void OnStarted(ServerContext*) {}
+ virtual void OnSendInitialMetadataDone(bool ok) {}
+ virtual void OnReadDone(bool ok) {}
+ virtual void OnWriteDone(bool ok) {}
+
+ void StartSendInitialMetadata() { stream_->SendInitialMetadata(); }
+ void StartRead(Request* msg) { stream_->Read(msg); }
+ void StartWrite(const Response* msg) { StartWrite(msg, WriteOptions()); }
+ void StartWrite(const Response* msg, WriteOptions options) {
+ stream_->Write(msg, std::move(options));
+ }
+ void StartWriteAndFinish(const Response* msg, WriteOptions options,
+ Status s) {
+ stream_->WriteAndFinish(msg, std::move(options), std::move(s));
+ }
+ void StartWriteLast(const Response* msg, WriteOptions options) {
+ StartWrite(msg, std::move(options.set_last_message()));
+ }
+ void Finish(Status s) { stream_->Finish(std::move(s)); }
+
+ private:
+ friend class ServerCallbackReaderWriter<Request, Response>;
+ void BindStream(ServerCallbackReaderWriter<Request, Response>* stream) {
+ stream_ = stream;
+ }
+
+ ServerCallbackReaderWriter<Request, Response>* stream_;
+};
+
+template <class Request, class Response>
+class ServerReadReactor : public internal::ServerReactor {
+ public:
+ ~ServerReadReactor() = default;
+ virtual void OnStarted(ServerContext*, Response* resp) {}
+ virtual void OnSendInitialMetadataDone(bool ok) {}
+ virtual void OnReadDone(bool ok) {}
+
+ void StartSendInitialMetadata() { reader_->SendInitialMetadata(); }
+ void StartRead(Request* msg) { reader_->Read(msg); }
+ void Finish(Status s) { reader_->Finish(std::move(s)); }
+
+ private:
+ friend class ServerCallbackReader<Request>;
+ void BindReader(ServerCallbackReader<Request>* reader) { reader_ = reader; }
+
+ ServerCallbackReader<Request>* reader_;
+};
+
+template <class Request, class Response>
+class ServerWriteReactor : public internal::ServerReactor {
+ public:
+ ~ServerWriteReactor() = default;
+ virtual void OnStarted(ServerContext*, const Request* req) {}
+ virtual void OnSendInitialMetadataDone(bool ok) {}
+ virtual void OnWriteDone(bool ok) {}
+
+ void StartSendInitialMetadata() { writer_->SendInitialMetadata(); }
+ void StartWrite(const Response* msg) { StartWrite(msg, WriteOptions()); }
+ void StartWrite(const Response* msg, WriteOptions options) {
+ writer_->Write(msg, std::move(options));
+ }
+ void StartWriteAndFinish(const Response* msg, WriteOptions options,
+ Status s) {
+ writer_->WriteAndFinish(msg, std::move(options), std::move(s));
+ }
+ void StartWriteLast(const Response* msg, WriteOptions options) {
+ StartWrite(msg, std::move(options.set_last_message()));
+ }
+ void Finish(Status s) { writer_->Finish(std::move(s)); }
+
+ private:
+ friend class ServerCallbackWriter<Response>;
+ void BindWriter(ServerCallbackWriter<Response>* writer) { writer_ = writer; }
+
+ ServerCallbackWriter<Response>* writer_;
+};
+
} // namespace experimental
namespace internal {
-template <class ServiceType, class RequestType, class ResponseType>
+template <class Request, class Response>
+class UnimplementedReadReactor
+ : public experimental::ServerReadReactor<Request, Response> {
+ public:
+ void OnDone() override { delete this; }
+ void OnStarted(ServerContext*, Response*) override {
+ this->Finish(Status(StatusCode::UNIMPLEMENTED, ""));
+ }
+};
+
+template <class Request, class Response>
+class UnimplementedWriteReactor
+ : public experimental::ServerWriteReactor<Request, Response> {
+ public:
+ void OnDone() override { delete this; }
+ void OnStarted(ServerContext*, const Request*) override {
+ this->Finish(Status(StatusCode::UNIMPLEMENTED, ""));
+ }
+};
+
+template <class Request, class Response>
+class UnimplementedBidiReactor
+ : public experimental::ServerBidiReactor<Request, Response> {
+ public:
+ void OnDone() override { delete this; }
+ void OnStarted(ServerContext*) override {
+ this->Finish(Status(StatusCode::UNIMPLEMENTED, ""));
+ }
+};
+
+template <class RequestType, class ResponseType>
class CallbackUnaryHandler : public MethodHandler {
public:
CallbackUnaryHandler(
std::function<void(ServerContext*, const RequestType*, ResponseType*,
experimental::ServerCallbackRpcController*)>
- func,
- ServiceType* service)
+ func)
: func_(func) {}
void RunHandler(const HandlerParameter& param) final {
// Arena allocate a controller structure (that includes request/response)
@@ -81,9 +272,8 @@ class CallbackUnaryHandler : public MethodHandler {
if (status.ok()) {
// Call the actual function handler and expect the user to call finish
- CatchingCallback(std::move(func_), param.server_context,
- controller->request(), controller->response(),
- controller);
+ CatchingCallback(func_, param.server_context, controller->request(),
+ controller->response(), controller);
} else {
// if deserialization failed, we need to fail the call
controller->Finish(status);
@@ -117,79 +307,579 @@ class CallbackUnaryHandler : public MethodHandler {
: public experimental::ServerCallbackRpcController {
public:
void Finish(Status s) override {
- finish_tag_.Set(
- call_.call(),
- [this](bool) {
- grpc_call* call = call_.call();
- auto call_requester = std::move(call_requester_);
- this->~ServerCallbackRpcControllerImpl(); // explicitly call
- // destructor
- g_core_codegen_interface->grpc_call_unref(call);
- call_requester();
- },
- &finish_buf_);
+ finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
+ &finish_ops_);
if (!ctx_->sent_initial_metadata_) {
- finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
+ finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
- finish_buf_.set_compression_level(ctx_->compression_level());
+ finish_ops_.set_compression_level(ctx_->compression_level());
}
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
if (s.ok()) {
- finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_,
- finish_buf_.SendMessage(resp_));
+ finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
+ finish_ops_.SendMessage(resp_));
} else {
- finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, s);
+ finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
}
- finish_buf_.set_core_cq_tag(&finish_tag_);
- call_.PerformOps(&finish_buf_);
+ finish_ops_.set_core_cq_tag(&finish_tag_);
+ call_.PerformOps(&finish_ops_);
}
void SendInitialMetadata(std::function<void(bool)> f) override {
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
-
- meta_tag_.Set(call_.call(), std::move(f), &meta_buf_);
- meta_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
+ callbacks_outstanding_++;
+ // TODO(vjpai): Consider taking f as a move-capture if we adopt C++14
+ // and if performance of this operation matters
+ meta_tag_.Set(call_.call(),
+ [this, f](bool ok) {
+ f(ok);
+ MaybeDone();
+ },
+ &meta_ops_);
+ meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
- meta_buf_.set_compression_level(ctx_->compression_level());
+ meta_ops_.set_compression_level(ctx_->compression_level());
}
ctx_->sent_initial_metadata_ = true;
- meta_buf_.set_core_cq_tag(&meta_tag_);
- call_.PerformOps(&meta_buf_);
+ meta_ops_.set_core_cq_tag(&meta_tag_);
+ call_.PerformOps(&meta_ops_);
}
private:
- template <class SrvType, class ReqType, class RespType>
- friend class CallbackUnaryHandler;
+ friend class CallbackUnaryHandler<RequestType, ResponseType>;
ServerCallbackRpcControllerImpl(ServerContext* ctx, Call* call,
- RequestType* req,
+ const RequestType* req,
std::function<void()> call_requester)
: ctx_(ctx),
call_(*call),
req_(req),
- call_requester_(std::move(call_requester)) {}
+ call_requester_(std::move(call_requester)) {
+ ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, nullptr);
+ }
~ServerCallbackRpcControllerImpl() { req_->~RequestType(); }
- RequestType* request() { return req_; }
+ const RequestType* request() { return req_; }
ResponseType* response() { return &resp_; }
- CallOpSet<CallOpSendInitialMetadata> meta_buf_;
+ void MaybeDone() {
+ if (--callbacks_outstanding_ == 0) {
+ grpc_call* call = call_.call();
+ auto call_requester = std::move(call_requester_);
+ this->~ServerCallbackRpcControllerImpl(); // explicitly call destructor
+ g_core_codegen_interface->grpc_call_unref(call);
+ call_requester();
+ }
+ }
+
+ CallOpSet<CallOpSendInitialMetadata> meta_ops_;
CallbackWithSuccessTag meta_tag_;
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpServerSendStatus>
- finish_buf_;
+ finish_ops_;
CallbackWithSuccessTag finish_tag_;
ServerContext* ctx_;
Call call_;
- RequestType* req_;
+ const RequestType* req_;
ResponseType resp_;
std::function<void()> call_requester_;
+ std::atomic_int callbacks_outstanding_{
+ 2}; // reserve for Finish and CompletionOp
+ };
+};
+
+template <class RequestType, class ResponseType>
+class CallbackClientStreamingHandler : public MethodHandler {
+ public:
+ CallbackClientStreamingHandler(
+ std::function<
+ experimental::ServerReadReactor<RequestType, ResponseType>*()>
+ func)
+ : func_(std::move(func)) {}
+ void RunHandler(const HandlerParameter& param) final {
+ // Arena allocate a reader structure (that includes response)
+ g_core_codegen_interface->grpc_call_ref(param.call->call());
+
+ experimental::ServerReadReactor<RequestType, ResponseType>* reactor =
+ param.status.ok()
+ ? CatchingReactorCreator<
+ experimental::ServerReadReactor<RequestType, ResponseType>>(
+ func_)
+ : nullptr;
+
+ if (reactor == nullptr) {
+ // if deserialization or reactor creator failed, we need to fail the call
+ reactor = new UnimplementedReadReactor<RequestType, ResponseType>;
+ }
+
+ auto* reader = new (g_core_codegen_interface->grpc_call_arena_alloc(
+ param.call->call(), sizeof(ServerCallbackReaderImpl)))
+ ServerCallbackReaderImpl(param.server_context, param.call,
+ std::move(param.call_requester), reactor);
+
+ reader->BindReactor(reactor);
+ reactor->OnStarted(param.server_context, reader->response());
+ reader->MaybeDone();
+ }
+
+ private:
+ std::function<experimental::ServerReadReactor<RequestType, ResponseType>*()>
+ func_;
+
+ class ServerCallbackReaderImpl
+ : public experimental::ServerCallbackReader<RequestType> {
+ public:
+ void Finish(Status s) override {
+ finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
+ &finish_ops_);
+ if (!ctx_->sent_initial_metadata_) {
+ finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ finish_ops_.set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ }
+ // The response is dropped if the status is not OK.
+ if (s.ok()) {
+ finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
+ finish_ops_.SendMessage(resp_));
+ } else {
+ finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
+ }
+ finish_ops_.set_core_cq_tag(&finish_tag_);
+ call_.PerformOps(&finish_ops_);
+ }
+
+ void SendInitialMetadata() override {
+ GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
+ callbacks_outstanding_++;
+ meta_tag_.Set(call_.call(),
+ [this](bool ok) {
+ reactor_->OnSendInitialMetadataDone(ok);
+ MaybeDone();
+ },
+ &meta_ops_);
+ meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ meta_ops_.set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ meta_ops_.set_core_cq_tag(&meta_tag_);
+ call_.PerformOps(&meta_ops_);
+ }
+
+ void Read(RequestType* req) override {
+ callbacks_outstanding_++;
+ read_ops_.RecvMessage(req);
+ call_.PerformOps(&read_ops_);
+ }
+
+ private:
+ friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
+
+ ServerCallbackReaderImpl(
+ ServerContext* ctx, Call* call, std::function<void()> call_requester,
+ experimental::ServerReadReactor<RequestType, ResponseType>* reactor)
+ : ctx_(ctx),
+ call_(*call),
+ call_requester_(std::move(call_requester)),
+ reactor_(reactor) {
+ ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
+ read_tag_.Set(call_.call(),
+ [this](bool ok) {
+ reactor_->OnReadDone(ok);
+ MaybeDone();
+ },
+ &read_ops_);
+ read_ops_.set_core_cq_tag(&read_tag_);
+ }
+
+ ~ServerCallbackReaderImpl() {}
+
+ ResponseType* response() { return &resp_; }
+
+ void MaybeDone() {
+ if (--callbacks_outstanding_ == 0) {
+ reactor_->OnDone();
+ grpc_call* call = call_.call();
+ auto call_requester = std::move(call_requester_);
+ this->~ServerCallbackReaderImpl(); // explicitly call destructor
+ g_core_codegen_interface->grpc_call_unref(call);
+ call_requester();
+ }
+ }
+
+ CallOpSet<CallOpSendInitialMetadata> meta_ops_;
+ CallbackWithSuccessTag meta_tag_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpServerSendStatus>
+ finish_ops_;
+ CallbackWithSuccessTag finish_tag_;
+ CallOpSet<CallOpRecvMessage<RequestType>> read_ops_;
+ CallbackWithSuccessTag read_tag_;
+
+ ServerContext* ctx_;
+ Call call_;
+ ResponseType resp_;
+ std::function<void()> call_requester_;
+ experimental::ServerReadReactor<RequestType, ResponseType>* reactor_;
+ std::atomic_int callbacks_outstanding_{
+ 3}; // reserve for OnStarted, Finish, and CompletionOp
+ };
+};
+
+template <class RequestType, class ResponseType>
+class CallbackServerStreamingHandler : public MethodHandler {
+ public:
+ CallbackServerStreamingHandler(
+ std::function<
+ experimental::ServerWriteReactor<RequestType, ResponseType>*()>
+ func)
+ : func_(std::move(func)) {}
+ void RunHandler(const HandlerParameter& param) final {
+ // Arena allocate a writer structure
+ g_core_codegen_interface->grpc_call_ref(param.call->call());
+
+ experimental::ServerWriteReactor<RequestType, ResponseType>* reactor =
+ param.status.ok()
+ ? CatchingReactorCreator<
+ experimental::ServerWriteReactor<RequestType, ResponseType>>(
+ func_)
+ : nullptr;
+
+ if (reactor == nullptr) {
+ // if deserialization or reactor creator failed, we need to fail the call
+ reactor = new UnimplementedWriteReactor<RequestType, ResponseType>;
+ }
+
+ auto* writer = new (g_core_codegen_interface->grpc_call_arena_alloc(
+ param.call->call(), sizeof(ServerCallbackWriterImpl)))
+ ServerCallbackWriterImpl(param.server_context, param.call,
+ static_cast<RequestType*>(param.request),
+ std::move(param.call_requester), reactor);
+ writer->BindReactor(reactor);
+ reactor->OnStarted(param.server_context, writer->request());
+ writer->MaybeDone();
+ }
+
+ void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
+ Status* status) final {
+ ByteBuffer buf;
+ buf.set_buffer(req);
+ auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc(
+ call, sizeof(RequestType))) RequestType();
+ *status = SerializationTraits<RequestType>::Deserialize(&buf, request);
+ buf.Release();
+ if (status->ok()) {
+ return request;
+ }
+ request->~RequestType();
+ return nullptr;
+ }
+
+ private:
+ std::function<experimental::ServerWriteReactor<RequestType, ResponseType>*()>
+ func_;
+
+ class ServerCallbackWriterImpl
+ : public experimental::ServerCallbackWriter<ResponseType> {
+ public:
+ void Finish(Status s) override {
+ finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
+ &finish_ops_);
+ finish_ops_.set_core_cq_tag(&finish_tag_);
+
+ if (!ctx_->sent_initial_metadata_) {
+ finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ finish_ops_.set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ }
+ finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
+ call_.PerformOps(&finish_ops_);
+ }
+
+ void SendInitialMetadata() override {
+ GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
+ callbacks_outstanding_++;
+ meta_tag_.Set(call_.call(),
+ [this](bool ok) {
+ reactor_->OnSendInitialMetadataDone(ok);
+ MaybeDone();
+ },
+ &meta_ops_);
+ meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ meta_ops_.set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ meta_ops_.set_core_cq_tag(&meta_tag_);
+ call_.PerformOps(&meta_ops_);
+ }
+
+ void Write(const ResponseType* resp, WriteOptions options) override {
+ callbacks_outstanding_++;
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ }
+ if (!ctx_->sent_initial_metadata_) {
+ write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ write_ops_.set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ }
+ // TODO(vjpai): don't assert
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(*resp, options).ok());
+ call_.PerformOps(&write_ops_);
+ }
+
+ void WriteAndFinish(const ResponseType* resp, WriteOptions options,
+ Status s) override {
+ // This combines the write into the finish callback
+ // Don't send any message if the status is bad
+ if (s.ok()) {
+ // TODO(vjpai): don't assert
+ GPR_CODEGEN_ASSERT(finish_ops_.SendMessage(*resp, options).ok());
+ }
+ Finish(std::move(s));
+ }
+
+ private:
+ friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
+
+ ServerCallbackWriterImpl(
+ ServerContext* ctx, Call* call, const RequestType* req,
+ std::function<void()> call_requester,
+ experimental::ServerWriteReactor<RequestType, ResponseType>* reactor)
+ : ctx_(ctx),
+ call_(*call),
+ req_(req),
+ call_requester_(std::move(call_requester)),
+ reactor_(reactor) {
+ ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
+ write_tag_.Set(call_.call(),
+ [this](bool ok) {
+ reactor_->OnWriteDone(ok);
+ MaybeDone();
+ },
+ &write_ops_);
+ write_ops_.set_core_cq_tag(&write_tag_);
+ }
+ ~ServerCallbackWriterImpl() { req_->~RequestType(); }
+
+ const RequestType* request() { return req_; }
+
+ void MaybeDone() {
+ if (--callbacks_outstanding_ == 0) {
+ reactor_->OnDone();
+ grpc_call* call = call_.call();
+ auto call_requester = std::move(call_requester_);
+ this->~ServerCallbackWriterImpl(); // explicitly call destructor
+ g_core_codegen_interface->grpc_call_unref(call);
+ call_requester();
+ }
+ }
+
+ CallOpSet<CallOpSendInitialMetadata> meta_ops_;
+ CallbackWithSuccessTag meta_tag_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpServerSendStatus>
+ finish_ops_;
+ CallbackWithSuccessTag finish_tag_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
+ CallbackWithSuccessTag write_tag_;
+
+ ServerContext* ctx_;
+ Call call_;
+ const RequestType* req_;
+ std::function<void()> call_requester_;
+ experimental::ServerWriteReactor<RequestType, ResponseType>* reactor_;
+ std::atomic_int callbacks_outstanding_{
+ 3}; // reserve for OnStarted, Finish, and CompletionOp
+ };
+};
+
+template <class RequestType, class ResponseType>
+class CallbackBidiHandler : public MethodHandler {
+ public:
+ CallbackBidiHandler(
+ std::function<
+ experimental::ServerBidiReactor<RequestType, ResponseType>*()>
+ func)
+ : func_(std::move(func)) {}
+ void RunHandler(const HandlerParameter& param) final {
+ g_core_codegen_interface->grpc_call_ref(param.call->call());
+
+ experimental::ServerBidiReactor<RequestType, ResponseType>* reactor =
+ param.status.ok()
+ ? CatchingReactorCreator<
+ experimental::ServerBidiReactor<RequestType, ResponseType>>(
+ func_)
+ : nullptr;
+
+ if (reactor == nullptr) {
+ // if deserialization or reactor creator failed, we need to fail the call
+ reactor = new UnimplementedBidiReactor<RequestType, ResponseType>;
+ }
+
+ auto* stream = new (g_core_codegen_interface->grpc_call_arena_alloc(
+ param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
+ ServerCallbackReaderWriterImpl(param.server_context, param.call,
+ std::move(param.call_requester),
+ reactor);
+
+ stream->BindReactor(reactor);
+ reactor->OnStarted(param.server_context);
+ stream->MaybeDone();
+ }
+
+ private:
+ std::function<experimental::ServerBidiReactor<RequestType, ResponseType>*()>
+ func_;
+
+ class ServerCallbackReaderWriterImpl
+ : public experimental::ServerCallbackReaderWriter<RequestType,
+ ResponseType> {
+ public:
+ void Finish(Status s) override {
+ finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); },
+ &finish_ops_);
+ finish_ops_.set_core_cq_tag(&finish_tag_);
+
+ if (!ctx_->sent_initial_metadata_) {
+ finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ finish_ops_.set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ }
+ finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
+ call_.PerformOps(&finish_ops_);
+ }
+
+ void SendInitialMetadata() override {
+ GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
+ callbacks_outstanding_++;
+ meta_tag_.Set(call_.call(),
+ [this](bool ok) {
+ reactor_->OnSendInitialMetadataDone(ok);
+ MaybeDone();
+ },
+ &meta_ops_);
+ meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ meta_ops_.set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ meta_ops_.set_core_cq_tag(&meta_tag_);
+ call_.PerformOps(&meta_ops_);
+ }
+
+ void Write(const ResponseType* resp, WriteOptions options) override {
+ callbacks_outstanding_++;
+ if (options.is_last_message()) {
+ options.set_buffer_hint();
+ }
+ if (!ctx_->sent_initial_metadata_) {
+ write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ write_ops_.set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ }
+ // TODO(vjpai): don't assert
+ GPR_CODEGEN_ASSERT(write_ops_.SendMessage(*resp, options).ok());
+ call_.PerformOps(&write_ops_);
+ }
+
+ void WriteAndFinish(const ResponseType* resp, WriteOptions options,
+ Status s) override {
+ // Don't send any message if the status is bad
+ if (s.ok()) {
+ // TODO(vjpai): don't assert
+ GPR_CODEGEN_ASSERT(finish_ops_.SendMessage(*resp, options).ok());
+ }
+ Finish(std::move(s));
+ }
+
+ void Read(RequestType* req) override {
+ callbacks_outstanding_++;
+ read_ops_.RecvMessage(req);
+ call_.PerformOps(&read_ops_);
+ }
+
+ private:
+ friend class CallbackBidiHandler<RequestType, ResponseType>;
+
+ ServerCallbackReaderWriterImpl(
+ ServerContext* ctx, Call* call, std::function<void()> call_requester,
+ experimental::ServerBidiReactor<RequestType, ResponseType>* reactor)
+ : ctx_(ctx),
+ call_(*call),
+ call_requester_(std::move(call_requester)),
+ reactor_(reactor) {
+ ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor);
+ write_tag_.Set(call_.call(),
+ [this](bool ok) {
+ reactor_->OnWriteDone(ok);
+ MaybeDone();
+ },
+ &write_ops_);
+ write_ops_.set_core_cq_tag(&write_tag_);
+ read_tag_.Set(call_.call(),
+ [this](bool ok) {
+ reactor_->OnReadDone(ok);
+ MaybeDone();
+ },
+ &read_ops_);
+ read_ops_.set_core_cq_tag(&read_tag_);
+ }
+ ~ServerCallbackReaderWriterImpl() {}
+
+ void MaybeDone() {
+ if (--callbacks_outstanding_ == 0) {
+ reactor_->OnDone();
+ grpc_call* call = call_.call();
+ auto call_requester = std::move(call_requester_);
+ this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor
+ g_core_codegen_interface->grpc_call_unref(call);
+ call_requester();
+ }
+ }
+
+ CallOpSet<CallOpSendInitialMetadata> meta_ops_;
+ CallbackWithSuccessTag meta_tag_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpServerSendStatus>
+ finish_ops_;
+ CallbackWithSuccessTag finish_tag_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
+ CallbackWithSuccessTag write_tag_;
+ CallOpSet<CallOpRecvMessage<RequestType>> read_ops_;
+ CallbackWithSuccessTag read_tag_;
+
+ ServerContext* ctx_;
+ Call call_;
+ std::function<void()> call_requester_;
+ experimental::ServerBidiReactor<RequestType, ResponseType>* reactor_;
+ std::atomic_int callbacks_outstanding_{
+ 3}; // reserve for OnStarted, Finish, and CompletionOp
};
};
diff --git a/include/grpcpp/impl/codegen/server_context.h b/include/grpcpp/impl/codegen/server_context.h
index 82ee862f61..ccb5925e7d 100644
--- a/include/grpcpp/impl/codegen/server_context.h
+++ b/include/grpcpp/impl/codegen/server_context.h
@@ -66,13 +66,20 @@ template <class ServiceType, class RequestType, class ResponseType>
class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
class BidiStreamingHandler;
-template <class ServiceType, class RequestType, class ResponseType>
+template <class RequestType, class ResponseType>
class CallbackUnaryHandler;
+template <class RequestType, class ResponseType>
+class CallbackClientStreamingHandler;
+template <class RequestType, class ResponseType>
+class CallbackServerStreamingHandler;
+template <class RequestType, class ResponseType>
+class CallbackBidiHandler;
template <class Streamer, bool WriteNeeded>
class TemplatedBidiStreamingHandler;
template <StatusCode code>
class ErrorMethodHandler;
class Call;
+class ServerReactor;
} // namespace internal
class CompletionQueue;
@@ -270,8 +277,14 @@ class ServerContext {
friend class ::grpc::internal::ServerStreamingHandler;
template <class Streamer, bool WriteNeeded>
friend class ::grpc::internal::TemplatedBidiStreamingHandler;
- template <class ServiceType, class RequestType, class ResponseType>
+ template <class RequestType, class ResponseType>
friend class ::grpc::internal::CallbackUnaryHandler;
+ template <class RequestType, class ResponseType>
+ friend class ::grpc::internal::CallbackClientStreamingHandler;
+ template <class RequestType, class ResponseType>
+ friend class ::grpc::internal::CallbackServerStreamingHandler;
+ template <class RequestType, class ResponseType>
+ friend class ::grpc::internal::CallbackBidiHandler;
template <StatusCode code>
friend class internal::ErrorMethodHandler;
friend class ::grpc::ClientContext;
@@ -282,7 +295,9 @@ class ServerContext {
class CompletionOp;
- void BeginCompletionOp(internal::Call* call, bool callback);
+ void BeginCompletionOp(internal::Call* call,
+ std::function<void(bool)> callback,
+ internal::ServerReactor* reactor);
/// Return the tag queued by BeginCompletionOp()
internal::CompletionQueueTag* GetCompletionOpTag();
@@ -299,12 +314,12 @@ class ServerContext {
uint32_t initial_metadata_flags() const { return 0; }
experimental::ServerRpcInfo* set_server_rpc_info(
- const char* method,
+ const char* method, internal::RpcMethod::RpcType type,
const std::vector<
std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>&
creators) {
if (creators.size() != 0) {
- rpc_info_ = new experimental::ServerRpcInfo(this, method);
+ rpc_info_ = new experimental::ServerRpcInfo(this, method, type);
rpc_info_->RegisterInterceptors(creators);
}
return rpc_info_;
diff --git a/include/grpcpp/impl/codegen/server_interceptor.h b/include/grpcpp/impl/codegen/server_interceptor.h
index 5fb5df28b7..afc3c198cc 100644
--- a/include/grpcpp/impl/codegen/server_interceptor.h
+++ b/include/grpcpp/impl/codegen/server_interceptor.h
@@ -23,6 +23,7 @@
#include <vector>
#include <grpcpp/impl/codegen/interceptor.h>
+#include <grpcpp/impl/codegen/rpc_method.h>
#include <grpcpp/impl/codegen/string_ref.h>
namespace grpc {
@@ -44,19 +45,36 @@ class ServerInterceptorFactoryInterface {
class ServerRpcInfo {
public:
+ enum class Type { UNARY, CLIENT_STREAMING, SERVER_STREAMING, BIDI_STREAMING };
+
~ServerRpcInfo(){};
ServerRpcInfo(const ServerRpcInfo&) = delete;
- ServerRpcInfo(ServerRpcInfo&&) = default;
- ServerRpcInfo& operator=(ServerRpcInfo&&) = default;
+ ServerRpcInfo(ServerRpcInfo&&) = delete;
+ ServerRpcInfo& operator=(ServerRpcInfo&&) = delete;
// Getter methods
- const char* method() { return method_; }
+ const char* method() const { return method_; }
+ Type type() const { return type_; }
grpc::ServerContext* server_context() { return ctx_; }
private:
- ServerRpcInfo(grpc::ServerContext* ctx, const char* method)
- : ctx_(ctx), method_(method) {
+ static_assert(Type::UNARY ==
+ static_cast<Type>(internal::RpcMethod::NORMAL_RPC),
+ "violated expectation about Type enum");
+ static_assert(Type::CLIENT_STREAMING ==
+ static_cast<Type>(internal::RpcMethod::CLIENT_STREAMING),
+ "violated expectation about Type enum");
+ static_assert(Type::SERVER_STREAMING ==
+ static_cast<Type>(internal::RpcMethod::SERVER_STREAMING),
+ "violated expectation about Type enum");
+ static_assert(Type::BIDI_STREAMING ==
+ static_cast<Type>(internal::RpcMethod::BIDI_STREAMING),
+ "violated expectation about Type enum");
+
+ ServerRpcInfo(grpc::ServerContext* ctx, const char* method,
+ internal::RpcMethod::RpcType type)
+ : ctx_(ctx), method_(method), type_(static_cast<Type>(type)) {
ref_.store(1);
}
@@ -86,6 +104,7 @@ class ServerRpcInfo {
grpc::ServerContext* ctx_ = nullptr;
const char* method_ = nullptr;
+ const Type type_;
std::atomic_int ref_;
std::vector<std::unique_ptr<experimental::Interceptor>> interceptors_;
diff --git a/include/grpcpp/impl/codegen/server_interface.h b/include/grpcpp/impl/codegen/server_interface.h
index 55c94f4d2f..e0e2629827 100644
--- a/include/grpcpp/impl/codegen/server_interface.h
+++ b/include/grpcpp/impl/codegen/server_interface.h
@@ -174,13 +174,14 @@ class ServerInterface : public internal::CallHook {
bool done_intercepting_;
};
+ /// RegisteredAsyncRequest is not part of the C++ API
class RegisteredAsyncRequest : public BaseAsyncRequest {
public:
RegisteredAsyncRequest(ServerInterface* server, ServerContext* context,
internal::ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag,
- const char* name);
+ const char* name, internal::RpcMethod::RpcType type);
virtual bool FinalizeResult(void** tag, bool* status) override {
/* If we are done intercepting, then there is nothing more for us to do */
@@ -189,7 +190,7 @@ class ServerInterface : public internal::CallHook {
}
call_wrapper_ = internal::Call(
call_, server_, call_cq_, server_->max_receive_message_size(),
- context_->set_server_rpc_info(name_,
+ context_->set_server_rpc_info(name_, type_,
*server_->interceptor_creators()));
return BaseAsyncRequest::FinalizeResult(tag, status);
}
@@ -198,6 +199,7 @@ class ServerInterface : public internal::CallHook {
void IssueRequest(void* registered_method, grpc_byte_buffer** payload,
ServerCompletionQueue* notification_cq);
const char* name_;
+ const internal::RpcMethod::RpcType type_;
};
class NoPayloadAsyncRequest final : public RegisteredAsyncRequest {
@@ -207,9 +209,9 @@ class ServerInterface : public internal::CallHook {
internal::ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag)
- : RegisteredAsyncRequest(server, context, stream, call_cq,
- notification_cq, tag,
- registered_method->name()) {
+ : RegisteredAsyncRequest(
+ server, context, stream, call_cq, notification_cq, tag,
+ registered_method->name(), registered_method->method_type()) {
IssueRequest(registered_method->server_tag(), nullptr, notification_cq);
}
@@ -225,9 +227,9 @@ class ServerInterface : public internal::CallHook {
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag,
Message* request)
- : RegisteredAsyncRequest(server, context, stream, call_cq,
- notification_cq, tag,
- registered_method->name()),
+ : RegisteredAsyncRequest(
+ server, context, stream, call_cq, notification_cq, tag,
+ registered_method->name(), registered_method->method_type()),
registered_method_(registered_method),
server_(server),
context_(context),
diff --git a/include/grpcpp/security/credentials.h b/include/grpcpp/security/credentials.h
index 8dfbdec3e6..d8c9e04d77 100644
--- a/include/grpcpp/security/credentials.h
+++ b/include/grpcpp/security/credentials.h
@@ -46,8 +46,8 @@ std::shared_ptr<Channel> CreateCustomChannelWithInterceptors(
const grpc::string& target,
const std::shared_ptr<ChannelCredentials>& creds,
const ChannelArguments& args,
- std::unique_ptr<std::vector<
- std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>
+ std::vector<
+ std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
interceptor_creators);
} // namespace experimental
@@ -80,8 +80,8 @@ class ChannelCredentials : private GrpcLibraryCodegen {
const grpc::string& target,
const std::shared_ptr<ChannelCredentials>& creds,
const ChannelArguments& args,
- std::unique_ptr<std::vector<
- std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>
+ std::vector<
+ std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
interceptor_creators);
virtual std::shared_ptr<Channel> CreateChannel(
@@ -91,8 +91,8 @@ class ChannelCredentials : private GrpcLibraryCodegen {
// implemented as a virtual function so that it does not break API.
virtual std::shared_ptr<Channel> CreateChannelWithInterceptors(
const grpc::string& target, const ChannelArguments& args,
- std::unique_ptr<std::vector<
- std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>
+ std::vector<
+ std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
interceptor_creators) {
return nullptr;
};
diff --git a/include/grpcpp/server.h b/include/grpcpp/server.h
index a14a4da578..cdcac186cb 100644
--- a/include/grpcpp/server.h
+++ b/include/grpcpp/server.h
@@ -111,8 +111,8 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
/// interceptors
std::shared_ptr<Channel> InProcessChannelWithInterceptors(
const ChannelArguments& args,
- std::unique_ptr<std::vector<
- std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>
+ std::vector<
+ std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
interceptor_creators);
private: