aboutsummaryrefslogtreecommitdiffhomepage
path: root/include/grpcpp
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2018-10-08 13:28:10 -0700
committerGravatar Vijay Pai <vpai@google.com>2018-10-29 13:41:25 -0700
commit84e763f10a1e10d36c7de35970f9d25958ee2e16 (patch)
tree0aeb2dfa3ac8a6ec9d829989cb99d70ee20fa8eb /include/grpcpp
parentccfd919190babf042c2876e6b94fa38a2508b424 (diff)
Experimental C++ server callback unary API
Diffstat (limited to 'include/grpcpp')
-rw-r--r--include/grpcpp/impl/codegen/byte_buffer.h4
-rw-r--r--include/grpcpp/impl/codegen/callback_common.h11
-rw-r--r--include/grpcpp/impl/codegen/channel_interface.h2
-rw-r--r--include/grpcpp/impl/codegen/completion_queue.h10
-rw-r--r--include/grpcpp/impl/codegen/rpc_service_method.h43
-rw-r--r--include/grpcpp/impl/codegen/server_callback.h204
-rw-r--r--include/grpcpp/impl/codegen/server_context.h11
-rw-r--r--include/grpcpp/impl/codegen/server_interface.h10
-rw-r--r--include/grpcpp/impl/codegen/service_type.h58
-rw-r--r--include/grpcpp/server.h13
-rw-r--r--include/grpcpp/support/server_callback.h24
11 files changed, 362 insertions, 28 deletions
diff --git a/include/grpcpp/impl/codegen/byte_buffer.h b/include/grpcpp/impl/codegen/byte_buffer.h
index d54ae31852..abba5549b8 100644
--- a/include/grpcpp/impl/codegen/byte_buffer.h
+++ b/include/grpcpp/impl/codegen/byte_buffer.h
@@ -45,6 +45,8 @@ template <class ServiceType, class RequestType, class ResponseType>
class RpcMethodHandler;
template <class ServiceType, class RequestType, class ResponseType>
class ServerStreamingHandler;
+template <class ServiceType, class RequestType, class ResponseType>
+class CallbackUnaryHandler;
template <StatusCode code>
class ErrorMethodHandler;
template <class R>
@@ -154,6 +156,8 @@ class ByteBuffer final {
friend class internal::RpcMethodHandler;
template <class ServiceType, class RequestType, class ResponseType>
friend class internal::ServerStreamingHandler;
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class internal::CallbackUnaryHandler;
template <StatusCode code>
friend class internal::ErrorMethodHandler;
template <class R>
diff --git a/include/grpcpp/impl/codegen/callback_common.h b/include/grpcpp/impl/codegen/callback_common.h
index eba9ec6edc..8273ef2f4a 100644
--- a/include/grpcpp/impl/codegen/callback_common.h
+++ b/include/grpcpp/impl/codegen/callback_common.h
@@ -101,10 +101,11 @@ class CallbackWithStatusTag
GPR_CODEGEN_ASSERT(ignored == ops_);
// Last use of func_ or status_, so ok to move them out
- CatchingCallback(std::move(func_), std::move(status_));
-
+ auto func = std::move(func_);
+ auto status = std::move(status_);
func_ = nullptr; // reset to clear this out for sure
status_ = Status(); // reset to clear this out for sure
+ CatchingCallback(std::move(func), std::move(status));
g_core_codegen_interface->grpc_call_unref(call_);
}
};
@@ -124,6 +125,8 @@ class CallbackWithSuccessTag
// there are no tests catching the compiler warning.
static void operator delete(void*, void*) { assert(0); }
+ CallbackWithSuccessTag() : call_(nullptr), ops_(nullptr) {}
+
CallbackWithSuccessTag(grpc_call* call, std::function<void(bool)> f,
CompletionQueueTag* ops)
: call_(call), func_(std::move(f)), ops_(ops) {
@@ -154,9 +157,9 @@ class CallbackWithSuccessTag
GPR_CODEGEN_ASSERT(ignored == ops_);
// Last use of func_, so ok to move it out for rvalue call above
- CatchingCallback(std::move(func_), ok);
-
+ auto func = std::move(func_);
func_ = nullptr; // reset to clear this out for sure
+ CatchingCallback(std::move(func), ok);
g_core_codegen_interface->grpc_call_unref(call_);
}
};
diff --git a/include/grpcpp/impl/codegen/channel_interface.h b/include/grpcpp/impl/codegen/channel_interface.h
index 6fd1dd1d9b..0735c96521 100644
--- a/include/grpcpp/impl/codegen/channel_interface.h
+++ b/include/grpcpp/impl/codegen/channel_interface.h
@@ -142,7 +142,7 @@ class ChannelInterface {
// channel. If the return value is nullptr, this channel doesn't support
// callback operations.
// TODO(vjpai): Consider a better default like using a global CQ
- // Returns nullptr (rather than being pure) since this is a new method
+ // Returns nullptr (rather than being pure) since this is a post-1.0 method
// and adding a new pure method to an interface would be a breaking change
// (even though this is private and non-API)
virtual CompletionQueue* CallbackCQ() { return nullptr; }
diff --git a/include/grpcpp/impl/codegen/completion_queue.h b/include/grpcpp/impl/codegen/completion_queue.h
index 5eef2c281f..d603c7c700 100644
--- a/include/grpcpp/impl/codegen/completion_queue.h
+++ b/include/grpcpp/impl/codegen/completion_queue.h
@@ -380,12 +380,18 @@ class ServerCompletionQueue : public CompletionQueue {
ServerCompletionQueue() : polling_type_(GRPC_CQ_DEFAULT_POLLING) {}
private:
+ /// \param completion_type indicates whether this is a NEXT or CALLBACK
+ /// completion queue.
/// \param polling_type Informs the GRPC library about the type of polling
/// allowed on this completion queue. See grpc_cq_polling_type's description
/// in grpc_types.h for more details.
- ServerCompletionQueue(grpc_cq_polling_type polling_type)
+ /// \param shutdown_cb is the shutdown callback used for CALLBACK api queues
+ ServerCompletionQueue(grpc_cq_completion_type completion_type,
+ grpc_cq_polling_type polling_type,
+ grpc_experimental_completion_queue_functor* shutdown_cb)
: CompletionQueue(grpc_completion_queue_attributes{
- GRPC_CQ_CURRENT_VERSION, GRPC_CQ_NEXT, polling_type, nullptr}),
+ GRPC_CQ_CURRENT_VERSION, completion_type, polling_type,
+ shutdown_cb}),
polling_type_(polling_type) {}
grpc_cq_polling_type polling_type_;
diff --git a/include/grpcpp/impl/codegen/rpc_service_method.h b/include/grpcpp/impl/codegen/rpc_service_method.h
index e77f4046a3..44d9b8ad63 100644
--- a/include/grpcpp/impl/codegen/rpc_service_method.h
+++ b/include/grpcpp/impl/codegen/rpc_service_method.h
@@ -41,13 +41,18 @@ class MethodHandler {
virtual ~MethodHandler() {}
struct HandlerParameter {
HandlerParameter(Call* c, ServerContext* context, void* req,
- Status req_status)
- : call(c), server_context(context), request(req), status(req_status) {}
+ Status req_status, std::function<void()> renew)
+ : call(c),
+ server_context(context),
+ request(req),
+ status(req_status),
+ renewer(std::move(renew)) {}
~HandlerParameter() {}
Call* call;
ServerContext* server_context;
void* request;
Status status;
+ std::function<void()> renewer;
};
virtual void RunHandler(const HandlerParameter& param) = 0;
@@ -71,25 +76,29 @@ class RpcServiceMethod : public RpcMethod {
MethodHandler* handler)
: RpcMethod(name, type),
server_tag_(nullptr),
- async_type_(AsyncType::UNSET),
+ api_type_(ApiType::SYNC),
handler_(handler) {}
- enum class AsyncType {
- UNSET,
+ enum class ApiType {
+ SYNC,
ASYNC,
RAW,
+ CALL_BACK, // not CALLBACK because that is reserved in Windows
+ RAW_CALL_BACK,
};
void set_server_tag(void* tag) { server_tag_ = tag; }
void* server_tag() const { return server_tag_; }
/// if MethodHandler is nullptr, then this is an async method
MethodHandler* handler() const { return handler_.get(); }
+ ApiType api_type() const { return api_type_; }
void SetHandler(MethodHandler* handler) { handler_.reset(handler); }
- void SetServerAsyncType(RpcServiceMethod::AsyncType type) {
- if (async_type_ == AsyncType::UNSET) {
+ void SetServerApiType(RpcServiceMethod::ApiType type) {
+ if ((api_type_ == ApiType::SYNC) &&
+ (type == ApiType::ASYNC || type == ApiType::RAW)) {
// this marks this method as async
handler_.reset();
- } else {
+ } else if (api_type_ != ApiType::SYNC) {
// this is not an error condition, as it allows users to declare a server
// like WithRawMethod_foo<AsyncService>. However since it
// overwrites behavior, it should be logged.
@@ -98,24 +107,28 @@ class RpcServiceMethod : public RpcMethod {
"You are marking method %s as '%s', even though it was "
"previously marked '%s'. This behavior will overwrite the original "
"behavior. If you expected this then ignore this message.",
- name(), TypeToString(async_type_), TypeToString(type));
+ name(), TypeToString(api_type_), TypeToString(type));
}
- async_type_ = type;
+ api_type_ = type;
}
private:
void* server_tag_;
- AsyncType async_type_;
+ ApiType api_type_;
std::unique_ptr<MethodHandler> handler_;
- const char* TypeToString(RpcServiceMethod::AsyncType type) {
+ const char* TypeToString(RpcServiceMethod::ApiType type) {
switch (type) {
- case AsyncType::UNSET:
+ case ApiType::SYNC:
return "unset";
- case AsyncType::ASYNC:
+ case ApiType::ASYNC:
return "async";
- case AsyncType::RAW:
+ case ApiType::RAW:
return "raw";
+ case ApiType::CALL_BACK:
+ return "callback";
+ case ApiType::RAW_CALL_BACK:
+ return "raw_callback";
default:
GPR_UNREACHABLE_CODE(return "unknown");
}
diff --git a/include/grpcpp/impl/codegen/server_callback.h b/include/grpcpp/impl/codegen/server_callback.h
new file mode 100644
index 0000000000..c8f7510ed5
--- /dev/null
+++ b/include/grpcpp/impl/codegen/server_callback.h
@@ -0,0 +1,204 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
+#define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
+
+#include <functional>
+
+#include <grpcpp/impl/codegen/call.h>
+#include <grpcpp/impl/codegen/callback_common.h>
+#include <grpcpp/impl/codegen/config.h>
+#include <grpcpp/impl/codegen/core_codegen_interface.h>
+#include <grpcpp/impl/codegen/server_context.h>
+#include <grpcpp/impl/codegen/server_interface.h>
+#include <grpcpp/impl/codegen/status.h>
+
+namespace grpc {
+
+// forward declarations
+namespace internal {
+template <class ServiceType, class RequestType, class ResponseType>
+class CallbackUnaryHandler;
+} // namespace internal
+
+namespace experimental {
+
+// For unary RPCs, the exposed controller class is only an interface
+// and the actual implementation is an internal class.
+class ServerCallbackRpcController {
+ public:
+ virtual ~ServerCallbackRpcController() {}
+
+ // The method handler must call this function when it is done so that
+ // the library knows to free its resources
+ virtual void Finish(Status s) = 0;
+ virtual void FinishWithError(Status s) = 0;
+
+ // Allow the method handler to push out the initial metadata before
+ // the response and status are ready
+ virtual void SendInitialMetadata(std::function<void(bool)>) = 0;
+};
+
+} // namespace experimental
+
+namespace internal {
+
+template <class ServiceType, class RequestType, class ResponseType>
+class CallbackUnaryHandler : public MethodHandler {
+ public:
+ CallbackUnaryHandler(
+ std::function<void(ServerContext*, const RequestType*, ResponseType*,
+ experimental::ServerCallbackRpcController*)>
+ func,
+ ServiceType* service)
+ : func_(func) {}
+ void RunHandler(const HandlerParameter& param) final {
+ // Arena allocate a controller structure (that includes request/response)
+ g_core_codegen_interface->grpc_call_ref(param.call->call());
+ auto* controller = new (g_core_codegen_interface->grpc_call_arena_alloc(
+ param.call->call(), sizeof(ServerCallbackRpcControllerImpl)))
+ ServerCallbackRpcControllerImpl(
+ param.server_context, param.call,
+ static_cast<RequestType*>(param.request), std::move(param.renewer));
+ Status status = param.status;
+
+ if (status.ok()) {
+ // Call the actual function handler and expect the user to call finish
+ CatchingCallback(std::move(func_), param.server_context,
+ controller->request(), controller->response(),
+ controller);
+ } else {
+ // if deserialization failed, we need to fail the call
+ controller->Finish(status);
+ }
+ }
+
+ void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
+ Status* status) final {
+ ByteBuffer buf;
+ buf.set_buffer(req);
+ auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc(
+ call, sizeof(RequestType))) RequestType();
+ *status = SerializationTraits<RequestType>::Deserialize(&buf, request);
+ buf.Release();
+ if (status->ok()) {
+ return request;
+ }
+ request->~RequestType();
+ return nullptr;
+ }
+
+ private:
+ std::function<void(ServerContext*, const RequestType*, ResponseType*,
+ experimental::ServerCallbackRpcController*)>
+ func_;
+
+ // The implementation class of ServerCallbackRpcController is a private member
+ // of CallbackUnaryHandler since it is never exposed anywhere, and this allows
+ // it to take advantage of CallbackUnaryHandler's friendships.
+
+ class ServerCallbackRpcControllerImpl
+ : public experimental::ServerCallbackRpcController {
+ public:
+ void Finish(Status s) override { FinishInternal(std::move(s), false); }
+
+ void FinishWithError(Status s) override {
+ FinishInternal(std::move(s), true);
+ }
+
+ void SendInitialMetadata(std::function<void(bool)> f) override {
+ GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
+
+ meta_tag_ =
+ CallbackWithSuccessTag(call_.call(), std::move(f), &meta_buf_);
+ meta_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ meta_buf_.set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ meta_buf_.set_cq_tag(&meta_tag_);
+ call_.PerformOps(&meta_buf_);
+ }
+
+ private:
+ template <class SrvType, class ReqType, class RespType>
+ friend class CallbackUnaryHandler;
+
+ ServerCallbackRpcControllerImpl(ServerContext* ctx, Call* call,
+ RequestType* req,
+ std::function<void()> renewer)
+ : ctx_(ctx), call_(*call), req_(req), renewer_(std::move(renewer)) {}
+
+ ~ServerCallbackRpcControllerImpl() { req_->~RequestType(); }
+
+ void FinishInternal(Status s, bool allow_error) {
+ finish_tag_ = CallbackWithSuccessTag(
+ call_.call(),
+ [this](bool) {
+ grpc_call* call = call_.call();
+ auto renewer = std::move(renewer_);
+ this->~ServerCallbackRpcControllerImpl(); // explicitly call
+ // destructor
+ g_core_codegen_interface->grpc_call_unref(call);
+ renewer();
+ },
+ &finish_buf_);
+ if (!ctx_->sent_initial_metadata_) {
+ finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
+ ctx_->initial_metadata_flags());
+ if (ctx_->compression_level_set()) {
+ finish_buf_.set_compression_level(ctx_->compression_level());
+ }
+ ctx_->sent_initial_metadata_ = true;
+ }
+ // The response may be dropped if the status is not OK.
+ if (allow_error || s.ok()) {
+ finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_,
+ finish_buf_.SendMessage(resp_));
+ } else {
+ finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, s);
+ }
+ finish_buf_.set_cq_tag(&finish_tag_);
+ call_.PerformOps(&finish_buf_);
+ }
+
+ RequestType* request() { return req_; }
+ ResponseType* response() { return &resp_; }
+
+ CallOpSet<CallOpSendInitialMetadata> meta_buf_;
+ CallbackWithSuccessTag meta_tag_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpServerSendStatus>
+ finish_buf_;
+ CallbackWithSuccessTag finish_tag_;
+
+ ServerContext* ctx_;
+ Call call_;
+ RequestType* req_;
+ ResponseType resp_;
+ std::function<void()> renewer_;
+ };
+};
+
+} // namespace internal
+
+} // namespace grpc
+
+#endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
diff --git a/include/grpcpp/impl/codegen/server_context.h b/include/grpcpp/impl/codegen/server_context.h
index 7559fb3b34..ebbf64bc6d 100644
--- a/include/grpcpp/impl/codegen/server_context.h
+++ b/include/grpcpp/impl/codegen/server_context.h
@@ -65,6 +65,8 @@ template <class ServiceType, class RequestType, class ResponseType>
class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
class BidiStreamingHandler;
+template <class ServiceType, class RequestType, class ResponseType>
+class CallbackUnaryHandler;
template <class Streamer, bool WriteNeeded>
class TemplatedBidiStreamingHandler;
template <StatusCode code>
@@ -267,6 +269,8 @@ class ServerContext {
friend class ::grpc::internal::ServerStreamingHandler;
template <class Streamer, bool WriteNeeded>
friend class ::grpc::internal::TemplatedBidiStreamingHandler;
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class ::grpc::internal::CallbackUnaryHandler;
template <StatusCode code>
friend class internal::ErrorMethodHandler;
friend class ::grpc::ClientContext;
@@ -285,6 +289,11 @@ class ServerContext {
void set_call(grpc_call* call) { call_ = call; }
+ void BindDeadlineAndMetadata(gpr_timespec deadline, grpc_metadata_array* arr);
+
+ void Clear();
+ void Setup(gpr_timespec deadline);
+
uint32_t initial_metadata_flags() const { return 0; }
experimental::ServerRpcInfo* set_server_rpc_info(
@@ -321,7 +330,7 @@ class ServerContext {
pending_ops_;
bool has_pending_ops_;
- experimental::ServerRpcInfo* rpc_info_ = nullptr;
+ experimental::ServerRpcInfo* rpc_info_;
};
} // namespace grpc
diff --git a/include/grpcpp/impl/codegen/server_interface.h b/include/grpcpp/impl/codegen/server_interface.h
index 92c87a5f7e..bde7740f44 100644
--- a/include/grpcpp/impl/codegen/server_interface.h
+++ b/include/grpcpp/impl/codegen/server_interface.h
@@ -338,6 +338,16 @@ class ServerInterface : public internal::CallHook {
interceptor_creators() {
return nullptr;
}
+
+ // EXPERIMENTAL
+ // A method to get the callbackable completion queue associated with this
+ // server. If the return value is nullptr, this server doesn't support
+ // callback operations.
+ // TODO(vjpai): Consider a better default like using a global CQ
+ // Returns nullptr (rather than being pure) since this is a post-1.0 method
+ // and adding a new pure method to an interface would be a breaking change
+ // (even though this is private and non-API)
+ virtual CompletionQueue* CallbackCQ() { return nullptr; }
};
} // namespace grpc
diff --git a/include/grpcpp/impl/codegen/service_type.h b/include/grpcpp/impl/codegen/service_type.h
index 9f1a052168..332a04c294 100644
--- a/include/grpcpp/impl/codegen/service_type.h
+++ b/include/grpcpp/impl/codegen/service_type.h
@@ -71,7 +71,20 @@ class Service {
bool has_synchronous_methods() const {
for (auto it = methods_.begin(); it != methods_.end(); ++it) {
- if (*it && (*it)->handler() != nullptr) {
+ if (*it &&
+ (*it)->api_type() == internal::RpcServiceMethod::ApiType::SYNC) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ bool has_callback_methods() const {
+ for (auto it = methods_.begin(); it != methods_.end(); ++it) {
+ if (*it && ((*it)->api_type() ==
+ internal::RpcServiceMethod::ApiType::CALL_BACK ||
+ (*it)->api_type() ==
+ internal::RpcServiceMethod::ApiType::RAW_CALL_BACK)) {
return true;
}
}
@@ -88,6 +101,43 @@ class Service {
}
protected:
+ // TODO(vjpai): Promote experimental contents once callback API is accepted
+ class experimental_type {
+ public:
+ explicit experimental_type(Service* service) : service_(service) {}
+
+ void MarkMethodCallback(int index, internal::MethodHandler* handler) {
+ // This does not have to be a hard error, however no one has approached us
+ // with a use case yet. Please file an issue if you believe you have one.
+ size_t idx = static_cast<size_t>(index);
+ GPR_CODEGEN_ASSERT(
+ service_->methods_[idx].get() != nullptr &&
+ "Cannot mark the method as 'callback' because it has already been "
+ "marked as 'generic'.");
+ service_->methods_[idx]->SetHandler(handler);
+ service_->methods_[idx]->SetServerApiType(
+ internal::RpcServiceMethod::ApiType::CALL_BACK);
+ }
+
+ void MarkMethodRawCallback(int index, internal::MethodHandler* handler) {
+ // This does not have to be a hard error, however no one has approached us
+ // with a use case yet. Please file an issue if you believe you have one.
+ size_t idx = static_cast<size_t>(index);
+ GPR_CODEGEN_ASSERT(
+ service_->methods_[idx].get() != nullptr &&
+ "Cannot mark the method as 'raw callback' because it has already "
+ "been marked as 'generic'.");
+ service_->methods_[idx]->SetHandler(handler);
+ service_->methods_[idx]->SetServerApiType(
+ internal::RpcServiceMethod::ApiType::RAW_CALL_BACK);
+ }
+
+ private:
+ Service* service_;
+ };
+
+ experimental_type experimental() { return experimental_type(this); }
+
template <class Message>
void RequestAsyncUnary(int index, ServerContext* context, Message* request,
internal::ServerAsyncStreamingInterface* stream,
@@ -138,8 +188,7 @@ class Service {
methods_[idx].get() != nullptr &&
"Cannot mark the method as 'async' because it has already been "
"marked as 'generic'.");
- methods_[idx]->SetServerAsyncType(
- internal::RpcServiceMethod::AsyncType::ASYNC);
+ methods_[idx]->SetServerApiType(internal::RpcServiceMethod::ApiType::ASYNC);
}
void MarkMethodRaw(int index) {
@@ -149,8 +198,7 @@ class Service {
GPR_CODEGEN_ASSERT(methods_[idx].get() != nullptr &&
"Cannot mark the method as 'raw' because it has already "
"been marked as 'generic'.");
- methods_[idx]->SetServerAsyncType(
- internal::RpcServiceMethod::AsyncType::RAW);
+ methods_[idx]->SetServerApiType(internal::RpcServiceMethod::ApiType::RAW);
}
void MarkMethodGeneric(int index) {
diff --git a/include/grpcpp/server.h b/include/grpcpp/server.h
index 2b89ffd317..ef54a218a7 100644
--- a/include/grpcpp/server.h
+++ b/include/grpcpp/server.h
@@ -202,6 +202,7 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
friend class ServerInitializer;
class SyncRequest;
+ class CallbackRequest;
class UnimplementedAsyncRequest;
class UnimplementedAsyncResponse;
@@ -224,6 +225,8 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
return max_receive_message_size_;
};
+ CompletionQueue* CallbackCQ() override;
+
ServerInitializer* initializer();
const int max_receive_message_size_;
@@ -238,6 +241,9 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
/// the \a sync_server_cqs)
std::vector<std::unique_ptr<SyncRequestThreadManager>> sync_req_mgrs_;
+ /// Outstanding callback requests
+ std::vector<std::unique_ptr<CallbackRequest>> callback_reqs_;
+
// Server status
std::mutex mu_;
bool started_;
@@ -264,6 +270,13 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
interceptor_creators_;
+
+ // callback_cq_ references the callbackable completion queue associated
+ // with this server (if any). It is set on the first call to CallbackCQ().
+ // It is _not owned_ by the server; ownership belongs with its internal
+ // shutdown callback tag (invoked when the CQ is fully shutdown).
+ // It is protected by mu_
+ CompletionQueue* callback_cq_ = nullptr;
};
} // namespace grpc
diff --git a/include/grpcpp/support/server_callback.h b/include/grpcpp/support/server_callback.h
new file mode 100644
index 0000000000..b0aeeb53c5
--- /dev/null
+++ b/include/grpcpp/support/server_callback.h
@@ -0,0 +1,24 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPCPP_SUPPORT_SERVER_CALLBACK_H
+#define GRPCPP_SUPPORT_SERVER_CALLBACK_H
+
+#include <grpcpp/impl/codegen/server_callback.h>
+
+#endif // GRPCPP_SUPPORT_SERVER_CALLBACK_H