diff options
Diffstat (limited to 'include')
19 files changed, 432 insertions, 67 deletions
diff --git a/include/grpcpp/impl/codegen/async_unary_call.h b/include/grpcpp/impl/codegen/async_unary_call.h index 744b128141..89dcb12418 100644 --- a/include/grpcpp/impl/codegen/async_unary_call.h +++ b/include/grpcpp/impl/codegen/async_unary_call.h @@ -240,7 +240,7 @@ class ServerAsyncResponseWriter final /// metadata. void Finish(const W& msg, const Status& status, void* tag) { finish_buf_.set_output_tag(tag); - finish_buf_.set_cq_tag(&finish_buf_); + finish_buf_.set_core_cq_tag(&finish_buf_); if (!ctx_->sent_initial_metadata_) { finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); diff --git a/include/grpcpp/impl/codegen/byte_buffer.h b/include/grpcpp/impl/codegen/byte_buffer.h index d54ae31852..abba5549b8 100644 --- a/include/grpcpp/impl/codegen/byte_buffer.h +++ b/include/grpcpp/impl/codegen/byte_buffer.h @@ -45,6 +45,8 @@ template <class ServiceType, class RequestType, class ResponseType> class RpcMethodHandler; template <class ServiceType, class RequestType, class ResponseType> class ServerStreamingHandler; +template <class ServiceType, class RequestType, class ResponseType> +class CallbackUnaryHandler; template <StatusCode code> class ErrorMethodHandler; template <class R> @@ -154,6 +156,8 @@ class ByteBuffer final { friend class internal::RpcMethodHandler; template <class ServiceType, class RequestType, class ResponseType> friend class internal::ServerStreamingHandler; + template <class ServiceType, class RequestType, class ResponseType> + friend class internal::CallbackUnaryHandler; template <StatusCode code> friend class internal::ErrorMethodHandler; template <class R> diff --git a/include/grpcpp/impl/codegen/call_op_set.h b/include/grpcpp/impl/codegen/call_op_set.h index 785688e67f..5c52b027b2 100644 --- a/include/grpcpp/impl/codegen/call_op_set.h +++ b/include/grpcpp/impl/codegen/call_op_set.h @@ -770,19 +770,19 @@ class CallOpSet : public CallOpSetInterface, public Op5, public Op6 { public: - CallOpSet() : cq_tag_(this), return_tag_(this) {} + CallOpSet() : core_cq_tag_(this), return_tag_(this) {} // The copy constructor and assignment operator reset the value of - // cq_tag_, return_tag_, done_intercepting_ and interceptor_methods_ since - // those are only meaningful on a specific object, not across objects. + // core_cq_tag_, return_tag_, done_intercepting_ and interceptor_methods_ + // since those are only meaningful on a specific object, not across objects. CallOpSet(const CallOpSet& other) - : cq_tag_(this), + : core_cq_tag_(this), return_tag_(this), call_(other.call_), done_intercepting_(false), interceptor_methods_(InterceptorBatchMethodsImpl()) {} CallOpSet& operator=(const CallOpSet& other) { - cq_tag_ = this; + core_cq_tag_ = this; return_tag_ = this; call_ = other.call_; done_intercepting_ = false; @@ -834,13 +834,13 @@ class CallOpSet : public CallOpSetInterface, void set_output_tag(void* return_tag) { return_tag_ = return_tag; } - void* cq_tag() override { return cq_tag_; } + void* core_cq_tag() override { return core_cq_tag_; } - /// set_cq_tag is used to provide a different core CQ tag than "this". + /// set_core_cq_tag is used to provide a different core CQ tag than "this". /// This is used for callback-based tags, where the core tag is the core /// callback function. It does not change the use or behavior of any other /// function (such as FinalizeResult) - void set_cq_tag(void* cq_tag) { cq_tag_ = cq_tag; } + void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; } // This will be called while interceptors are run if the RPC is a hijacked // RPC. This should set hijacking state for each of the ops. @@ -866,7 +866,7 @@ class CallOpSet : public CallOpSetInterface, this->Op6::AddOp(ops, &nops); GPR_CODEGEN_ASSERT(GRPC_CALL_OK == g_core_codegen_interface->grpc_call_start_batch( - call_.call(), ops, nops, cq_tag(), nullptr)); + call_.call(), ops, nops, core_cq_tag(), nullptr)); } // Should be called after interceptors are done running on the finalize result @@ -875,7 +875,7 @@ class CallOpSet : public CallOpSetInterface, done_intercepting_ = true; GPR_CODEGEN_ASSERT(GRPC_CALL_OK == g_core_codegen_interface->grpc_call_start_batch( - call_.call(), nullptr, 0, cq_tag(), nullptr)); + call_.call(), nullptr, 0, core_cq_tag(), nullptr)); } private: @@ -906,7 +906,7 @@ class CallOpSet : public CallOpSetInterface, return interceptor_methods_.RunInterceptors(); } - void* cq_tag_; + void* core_cq_tag_; void* return_tag_; Call call_; bool done_intercepting_ = false; diff --git a/include/grpcpp/impl/codegen/call_op_set_interface.h b/include/grpcpp/impl/codegen/call_op_set_interface.h index 815227a299..3b74566a6d 100644 --- a/include/grpcpp/impl/codegen/call_op_set_interface.h +++ b/include/grpcpp/impl/codegen/call_op_set_interface.h @@ -38,9 +38,9 @@ class CallOpSetInterface : public CompletionQueueTag { virtual void FillOps(internal::Call* call) = 0; /// Get the tag to be used at the core completion queue. Generally, the - /// value of cq_tag will be "this". However, it can be overridden if we + /// value of core_cq_tag will be "this". However, it can be overridden if we /// want core to process the tag differently (e.g., as a core callback) - virtual void* cq_tag() = 0; + virtual void* core_cq_tag() = 0; // This will be called while interceptors are run if the RPC is a hijacked // RPC. This should set hijacking state for each of the ops. diff --git a/include/grpcpp/impl/codegen/callback_common.h b/include/grpcpp/impl/codegen/callback_common.h index eba9ec6edc..29deef658f 100644 --- a/include/grpcpp/impl/codegen/callback_common.h +++ b/include/grpcpp/impl/codegen/callback_common.h @@ -101,10 +101,11 @@ class CallbackWithStatusTag GPR_CODEGEN_ASSERT(ignored == ops_); // Last use of func_ or status_, so ok to move them out - CatchingCallback(std::move(func_), std::move(status_)); - + auto func = std::move(func_); + auto status = std::move(status_); func_ = nullptr; // reset to clear this out for sure status_ = Status(); // reset to clear this out for sure + CatchingCallback(std::move(func), std::move(status)); g_core_codegen_interface->grpc_call_unref(call_); } }; @@ -124,6 +125,8 @@ class CallbackWithSuccessTag // there are no tests catching the compiler warning. static void operator delete(void*, void*) { assert(0); } + CallbackWithSuccessTag() : call_(nullptr), ops_(nullptr) {} + CallbackWithSuccessTag(grpc_call* call, std::function<void(bool)> f, CompletionQueueTag* ops) : call_(call), func_(std::move(f)), ops_(ops) { @@ -138,6 +141,9 @@ class CallbackWithSuccessTag // that are detected before the operations are internally processed. void force_run(bool ok) { Run(ok); } + /// check if this tag has ever been set + operator bool() const { return call_ != nullptr; } + private: grpc_call* call_; std::function<void(bool)> func_; @@ -150,13 +156,19 @@ class CallbackWithSuccessTag void Run(bool ok) { void* ignored = ops_; bool new_ok = ok; - GPR_CODEGEN_ASSERT(ops_->FinalizeResult(&ignored, &new_ok)); + // Allow a "false" return value from FinalizeResult to silence the + // callback, just as it silences a CQ tag in the async cases + bool do_callback = ops_->FinalizeResult(&ignored, &new_ok); GPR_CODEGEN_ASSERT(ignored == ops_); - // Last use of func_, so ok to move it out for rvalue call above - CatchingCallback(std::move(func_), ok); - - func_ = nullptr; // reset to clear this out for sure + if (do_callback) { + // Last use of func_, so ok to move it out for rvalue call above + auto func = std::move(func_); + func_ = nullptr; // reset to clear this out for sure + CatchingCallback(std::move(func), ok); + } else { + func_ = nullptr; // reset to clear this out for sure + } g_core_codegen_interface->grpc_call_unref(call_); } }; diff --git a/include/grpcpp/impl/codegen/channel_interface.h b/include/grpcpp/impl/codegen/channel_interface.h index e8ed6a9805..6ec1ffb8c7 100644 --- a/include/grpcpp/impl/codegen/channel_interface.h +++ b/include/grpcpp/impl/codegen/channel_interface.h @@ -21,6 +21,7 @@ #include <grpc/impl/codegen/connectivity_state.h> #include <grpcpp/impl/codegen/call.h> +#include <grpcpp/impl/codegen/client_context.h> #include <grpcpp/impl/codegen/status.h> #include <grpcpp/impl/codegen/time.h> @@ -142,7 +143,7 @@ class ChannelInterface { // channel. If the return value is nullptr, this channel doesn't support // callback operations. // TODO(vjpai): Consider a better default like using a global CQ - // Returns nullptr (rather than being pure) since this is a new method + // Returns nullptr (rather than being pure) since this is a post-1.0 method // and adding a new pure method to an interface would be a breaking change // (even though this is private and non-API) virtual CompletionQueue* CallbackCQ() { return nullptr; } diff --git a/include/grpcpp/impl/codegen/client_callback.h b/include/grpcpp/impl/codegen/client_callback.h index ecb00a0769..4baa819091 100644 --- a/include/grpcpp/impl/codegen/client_callback.h +++ b/include/grpcpp/impl/codegen/client_callback.h @@ -84,7 +84,7 @@ class CallbackUnaryCallImpl { ops->AllowNoMessage(); ops->ClientSendClose(); ops->ClientRecvStatus(context, tag->status_ptr()); - ops->set_cq_tag(tag); + ops->set_core_cq_tag(tag); call.PerformOps(ops); } }; diff --git a/include/grpcpp/impl/codegen/client_interceptor.h b/include/grpcpp/impl/codegen/client_interceptor.h index bc62bb4a7b..f69c99ab22 100644 --- a/include/grpcpp/impl/codegen/client_interceptor.h +++ b/include/grpcpp/impl/codegen/client_interceptor.h @@ -22,7 +22,6 @@ #include <memory> #include <vector> -#include <grpc/impl/codegen/log.h> #include <grpcpp/impl/codegen/interceptor.h> #include <grpcpp/impl/codegen/string_ref.h> diff --git a/include/grpcpp/impl/codegen/completion_queue.h b/include/grpcpp/impl/codegen/completion_queue.h index 5eef2c281f..d603c7c700 100644 --- a/include/grpcpp/impl/codegen/completion_queue.h +++ b/include/grpcpp/impl/codegen/completion_queue.h @@ -380,12 +380,18 @@ class ServerCompletionQueue : public CompletionQueue { ServerCompletionQueue() : polling_type_(GRPC_CQ_DEFAULT_POLLING) {} private: + /// \param completion_type indicates whether this is a NEXT or CALLBACK + /// completion queue. /// \param polling_type Informs the GRPC library about the type of polling /// allowed on this completion queue. See grpc_cq_polling_type's description /// in grpc_types.h for more details. - ServerCompletionQueue(grpc_cq_polling_type polling_type) + /// \param shutdown_cb is the shutdown callback used for CALLBACK api queues + ServerCompletionQueue(grpc_cq_completion_type completion_type, + grpc_cq_polling_type polling_type, + grpc_experimental_completion_queue_functor* shutdown_cb) : CompletionQueue(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, GRPC_CQ_NEXT, polling_type, nullptr}), + GRPC_CQ_CURRENT_VERSION, completion_type, polling_type, + shutdown_cb}), polling_type_(polling_type) {} grpc_cq_polling_type polling_type_; diff --git a/include/grpcpp/impl/codegen/interceptor.h b/include/grpcpp/impl/codegen/interceptor.h index cdd34b80d1..15cab711e5 100644 --- a/include/grpcpp/impl/codegen/interceptor.h +++ b/include/grpcpp/impl/codegen/interceptor.h @@ -21,13 +21,13 @@ #include <grpc/impl/codegen/grpc_types.h> #include <grpcpp/impl/codegen/byte_buffer.h> -#include <grpcpp/impl/codegen/channel_interface.h> #include <grpcpp/impl/codegen/config.h> #include <grpcpp/impl/codegen/core_codegen_interface.h> #include <grpcpp/impl/codegen/metadata_map.h> namespace grpc { +class ChannelInterface; class Status; namespace experimental { diff --git a/include/grpcpp/impl/codegen/method_handler_impl.h b/include/grpcpp/impl/codegen/method_handler_impl.h index 4f02e3e39b..dd53f975f6 100644 --- a/include/grpcpp/impl/codegen/method_handler_impl.h +++ b/include/grpcpp/impl/codegen/method_handler_impl.h @@ -66,7 +66,7 @@ class RpcMethodHandler : public MethodHandler { return func_(service_, param.server_context, static_cast<RequestType*>(param.request), &rsp); }); - delete static_cast<RequestType*>(param.request); + static_cast<RequestType*>(param.request)->~RequestType(); } GPR_CODEGEN_ASSERT(!param.server_context->sent_initial_metadata_); @@ -86,16 +86,18 @@ class RpcMethodHandler : public MethodHandler { param.call->cq()->Pluck(&ops); } - void* Deserialize(grpc_byte_buffer* req, Status* status) final { + void* Deserialize(grpc_call* call, grpc_byte_buffer* req, + Status* status) final { ByteBuffer buf; buf.set_buffer(req); - auto* request = new RequestType(); + auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc( + call, sizeof(RequestType))) RequestType(); *status = SerializationTraits<RequestType>::Deserialize(&buf, request); buf.Release(); if (status->ok()) { return request; } - delete request; + request->~RequestType(); return nullptr; } @@ -170,7 +172,7 @@ class ServerStreamingHandler : public MethodHandler { return func_(service_, param.server_context, static_cast<RequestType*>(param.request), &writer); }); - delete static_cast<RequestType*>(param.request); + static_cast<RequestType*>(param.request)->~RequestType(); } CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops; @@ -189,16 +191,18 @@ class ServerStreamingHandler : public MethodHandler { param.call->cq()->Pluck(&ops); } - void* Deserialize(grpc_byte_buffer* req, Status* status) final { + void* Deserialize(grpc_call* call, grpc_byte_buffer* req, + Status* status) final { ByteBuffer buf; buf.set_buffer(req); - auto* request = new RequestType(); + auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc( + call, sizeof(RequestType))) RequestType(); *status = SerializationTraits<RequestType>::Deserialize(&buf, request); buf.Release(); if (status->ok()) { return request; } - delete request; + request->~RequestType(); return nullptr; } @@ -323,7 +327,8 @@ class ErrorMethodHandler : public MethodHandler { param.call->cq()->Pluck(&ops); } - void* Deserialize(grpc_byte_buffer* req, Status* status) final { + void* Deserialize(grpc_call* call, grpc_byte_buffer* req, + Status* status) final { // We have to destroy any request payload if (req != nullptr) { g_core_codegen_interface->grpc_byte_buffer_destroy(req); diff --git a/include/grpcpp/impl/codegen/rpc_service_method.h b/include/grpcpp/impl/codegen/rpc_service_method.h index 44da2bd768..f465c5fc2f 100644 --- a/include/grpcpp/impl/codegen/rpc_service_method.h +++ b/include/grpcpp/impl/codegen/rpc_service_method.h @@ -40,14 +40,28 @@ class MethodHandler { public: virtual ~MethodHandler() {} struct HandlerParameter { + /// Constructor for HandlerParameter + /// + /// \param c : the gRPC Call structure for this server call + /// \param context : the ServerContext structure for this server call + /// \param req : the request payload, if appropriate for this RPC + /// \param req_status : the request status after any interceptors have run + /// \param rpc_requester : used only by the callback API. It is a function + /// called by the RPC Controller to request another RPC (and also + /// to set up the state required to make that request possible) HandlerParameter(Call* c, ServerContext* context, void* req, - Status req_status) - : call(c), server_context(context), request(req), status(req_status) {} + Status req_status, std::function<void()> requester) + : call(c), + server_context(context), + request(req), + status(req_status), + call_requester(std::move(requester)) {} ~HandlerParameter() {} Call* call; ServerContext* server_context; void* request; Status status; + std::function<void()> call_requester; }; virtual void RunHandler(const HandlerParameter& param) = 0; @@ -56,7 +70,8 @@ class MethodHandler { a HandlerParameter and passed to RunHandler. It is illegal to access the pointer after calling RunHandler. Ownership of the deserialized request is retained by the handler. Returns nullptr if deserialization failed. */ - virtual void* Deserialize(grpc_byte_buffer* req, Status* status) { + virtual void* Deserialize(grpc_call* call, grpc_byte_buffer* req, + Status* status) { GPR_CODEGEN_ASSERT(req == nullptr); return nullptr; } @@ -70,25 +85,29 @@ class RpcServiceMethod : public RpcMethod { MethodHandler* handler) : RpcMethod(name, type), server_tag_(nullptr), - async_type_(AsyncType::UNSET), + api_type_(ApiType::SYNC), handler_(handler) {} - enum class AsyncType { - UNSET, + enum class ApiType { + SYNC, ASYNC, RAW, + CALL_BACK, // not CALLBACK because that is reserved in Windows + RAW_CALL_BACK, }; void set_server_tag(void* tag) { server_tag_ = tag; } void* server_tag() const { return server_tag_; } /// if MethodHandler is nullptr, then this is an async method MethodHandler* handler() const { return handler_.get(); } + ApiType api_type() const { return api_type_; } void SetHandler(MethodHandler* handler) { handler_.reset(handler); } - void SetServerAsyncType(RpcServiceMethod::AsyncType type) { - if (async_type_ == AsyncType::UNSET) { + void SetServerApiType(RpcServiceMethod::ApiType type) { + if ((api_type_ == ApiType::SYNC) && + (type == ApiType::ASYNC || type == ApiType::RAW)) { // this marks this method as async handler_.reset(); - } else { + } else if (api_type_ != ApiType::SYNC) { // this is not an error condition, as it allows users to declare a server // like WithRawMethod_foo<AsyncService>. However since it // overwrites behavior, it should be logged. @@ -97,24 +116,28 @@ class RpcServiceMethod : public RpcMethod { "You are marking method %s as '%s', even though it was " "previously marked '%s'. This behavior will overwrite the original " "behavior. If you expected this then ignore this message.", - name(), TypeToString(async_type_), TypeToString(type)); + name(), TypeToString(api_type_), TypeToString(type)); } - async_type_ = type; + api_type_ = type; } private: void* server_tag_; - AsyncType async_type_; + ApiType api_type_; std::unique_ptr<MethodHandler> handler_; - const char* TypeToString(RpcServiceMethod::AsyncType type) { + const char* TypeToString(RpcServiceMethod::ApiType type) { switch (type) { - case AsyncType::UNSET: - return "unset"; - case AsyncType::ASYNC: + case ApiType::SYNC: + return "sync"; + case ApiType::ASYNC: return "async"; - case AsyncType::RAW: + case ApiType::RAW: return "raw"; + case ApiType::CALL_BACK: + return "callback"; + case ApiType::RAW_CALL_BACK: + return "raw_callback"; default: GPR_UNREACHABLE_CODE(return "unknown"); } diff --git a/include/grpcpp/impl/codegen/server_callback.h b/include/grpcpp/impl/codegen/server_callback.h new file mode 100644 index 0000000000..5d56cbf1df --- /dev/null +++ b/include/grpcpp/impl/codegen/server_callback.h @@ -0,0 +1,200 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H +#define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H + +#include <functional> + +#include <grpcpp/impl/codegen/call.h> +#include <grpcpp/impl/codegen/callback_common.h> +#include <grpcpp/impl/codegen/config.h> +#include <grpcpp/impl/codegen/core_codegen_interface.h> +#include <grpcpp/impl/codegen/server_context.h> +#include <grpcpp/impl/codegen/server_interface.h> +#include <grpcpp/impl/codegen/status.h> + +namespace grpc { + +// forward declarations +namespace internal { +template <class ServiceType, class RequestType, class ResponseType> +class CallbackUnaryHandler; +} // namespace internal + +namespace experimental { + +// For unary RPCs, the exposed controller class is only an interface +// and the actual implementation is an internal class. +class ServerCallbackRpcController { + public: + virtual ~ServerCallbackRpcController() {} + + // The method handler must call this function when it is done so that + // the library knows to free its resources + virtual void Finish(Status s) = 0; + + // Allow the method handler to push out the initial metadata before + // the response and status are ready + virtual void SendInitialMetadata(std::function<void(bool)>) = 0; +}; + +} // namespace experimental + +namespace internal { + +template <class ServiceType, class RequestType, class ResponseType> +class CallbackUnaryHandler : public MethodHandler { + public: + CallbackUnaryHandler( + std::function<void(ServerContext*, const RequestType*, ResponseType*, + experimental::ServerCallbackRpcController*)> + func, + ServiceType* service) + : func_(func) {} + void RunHandler(const HandlerParameter& param) final { + // Arena allocate a controller structure (that includes request/response) + g_core_codegen_interface->grpc_call_ref(param.call->call()); + auto* controller = new (g_core_codegen_interface->grpc_call_arena_alloc( + param.call->call(), sizeof(ServerCallbackRpcControllerImpl))) + ServerCallbackRpcControllerImpl( + param.server_context, param.call, + static_cast<RequestType*>(param.request), + std::move(param.call_requester)); + Status status = param.status; + + if (status.ok()) { + // Call the actual function handler and expect the user to call finish + CatchingCallback(std::move(func_), param.server_context, + controller->request(), controller->response(), + controller); + } else { + // if deserialization failed, we need to fail the call + controller->Finish(status); + } + } + + void* Deserialize(grpc_call* call, grpc_byte_buffer* req, + Status* status) final { + ByteBuffer buf; + buf.set_buffer(req); + auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc( + call, sizeof(RequestType))) RequestType(); + *status = SerializationTraits<RequestType>::Deserialize(&buf, request); + buf.Release(); + if (status->ok()) { + return request; + } + request->~RequestType(); + return nullptr; + } + + private: + std::function<void(ServerContext*, const RequestType*, ResponseType*, + experimental::ServerCallbackRpcController*)> + func_; + + // The implementation class of ServerCallbackRpcController is a private member + // of CallbackUnaryHandler since it is never exposed anywhere, and this allows + // it to take advantage of CallbackUnaryHandler's friendships. + class ServerCallbackRpcControllerImpl + : public experimental::ServerCallbackRpcController { + public: + void Finish(Status s) override { + finish_tag_ = CallbackWithSuccessTag( + call_.call(), + [this](bool) { + grpc_call* call = call_.call(); + auto call_requester = std::move(call_requester_); + this->~ServerCallbackRpcControllerImpl(); // explicitly call + // destructor + g_core_codegen_interface->grpc_call_unref(call); + call_requester(); + }, + &finish_buf_); + if (!ctx_->sent_initial_metadata_) { + finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + finish_buf_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + // The response is dropped if the status is not OK. + if (s.ok()) { + finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, + finish_buf_.SendMessage(resp_)); + } else { + finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, s); + } + finish_buf_.set_core_cq_tag(&finish_tag_); + call_.PerformOps(&finish_buf_); + } + + void SendInitialMetadata(std::function<void(bool)> f) override { + GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); + + meta_tag_ = + CallbackWithSuccessTag(call_.call(), std::move(f), &meta_buf_); + meta_buf_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + meta_buf_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + meta_buf_.set_core_cq_tag(&meta_tag_); + call_.PerformOps(&meta_buf_); + } + + private: + template <class SrvType, class ReqType, class RespType> + friend class CallbackUnaryHandler; + + ServerCallbackRpcControllerImpl(ServerContext* ctx, Call* call, + RequestType* req, + std::function<void()> call_requester) + : ctx_(ctx), + call_(*call), + req_(req), + call_requester_(std::move(call_requester)) {} + + ~ServerCallbackRpcControllerImpl() { req_->~RequestType(); } + + RequestType* request() { return req_; } + ResponseType* response() { return &resp_; } + + CallOpSet<CallOpSendInitialMetadata> meta_buf_; + CallbackWithSuccessTag meta_tag_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> + finish_buf_; + CallbackWithSuccessTag finish_tag_; + + ServerContext* ctx_; + Call call_; + RequestType* req_; + ResponseType resp_; + std::function<void()> call_requester_; + }; +}; + +} // namespace internal + +} // namespace grpc + +#endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H diff --git a/include/grpcpp/impl/codegen/server_context.h b/include/grpcpp/impl/codegen/server_context.h index 7559fb3b34..82ee862f61 100644 --- a/include/grpcpp/impl/codegen/server_context.h +++ b/include/grpcpp/impl/codegen/server_context.h @@ -27,6 +27,7 @@ #include <grpcpp/impl/codegen/call.h> #include <grpcpp/impl/codegen/call_op_set.h> +#include <grpcpp/impl/codegen/callback_common.h> #include <grpcpp/impl/codegen/completion_queue_tag.h> #include <grpcpp/impl/codegen/config.h> #include <grpcpp/impl/codegen/create_auth_context.h> @@ -65,6 +66,8 @@ template <class ServiceType, class RequestType, class ResponseType> class ServerStreamingHandler; template <class ServiceType, class RequestType, class ResponseType> class BidiStreamingHandler; +template <class ServiceType, class RequestType, class ResponseType> +class CallbackUnaryHandler; template <class Streamer, bool WriteNeeded> class TemplatedBidiStreamingHandler; template <StatusCode code> @@ -137,7 +140,7 @@ class ServerContext { /// must end in "-bin". void AddTrailingMetadata(const grpc::string& key, const grpc::string& value); - /// IsCancelled is always safe to call when using sync API. + /// IsCancelled is always safe to call when using sync or callback API. /// When using async API, it is only safe to call IsCancelled after /// the AsyncNotifyWhenDone tag has been delivered. bool IsCancelled() const; @@ -267,6 +270,8 @@ class ServerContext { friend class ::grpc::internal::ServerStreamingHandler; template <class Streamer, bool WriteNeeded> friend class ::grpc::internal::TemplatedBidiStreamingHandler; + template <class ServiceType, class RequestType, class ResponseType> + friend class ::grpc::internal::CallbackUnaryHandler; template <StatusCode code> friend class internal::ErrorMethodHandler; friend class ::grpc::ClientContext; @@ -277,7 +282,7 @@ class ServerContext { class CompletionOp; - void BeginCompletionOp(internal::Call* call); + void BeginCompletionOp(internal::Call* call, bool callback); /// Return the tag queued by BeginCompletionOp() internal::CompletionQueueTag* GetCompletionOpTag(); @@ -285,6 +290,12 @@ class ServerContext { void set_call(grpc_call* call) { call_ = call; } + void BindDeadlineAndMetadata(gpr_timespec deadline, grpc_metadata_array* arr); + + void Clear(); + + void Setup(gpr_timespec deadline); + uint32_t initial_metadata_flags() const { return 0; } experimental::ServerRpcInfo* set_server_rpc_info( @@ -302,6 +313,7 @@ class ServerContext { CompletionOp* completion_op_; bool has_notify_when_done_tag_; void* async_notify_when_done_tag_; + internal::CallbackWithSuccessTag completion_tag_; gpr_timespec deadline_; grpc_call* call_; @@ -321,7 +333,7 @@ class ServerContext { pending_ops_; bool has_pending_ops_; - experimental::ServerRpcInfo* rpc_info_ = nullptr; + experimental::ServerRpcInfo* rpc_info_; }; } // namespace grpc diff --git a/include/grpcpp/impl/codegen/server_interceptor.h b/include/grpcpp/impl/codegen/server_interceptor.h index c39e9a988d..5fb5df28b7 100644 --- a/include/grpcpp/impl/codegen/server_interceptor.h +++ b/include/grpcpp/impl/codegen/server_interceptor.h @@ -22,7 +22,6 @@ #include <atomic> #include <vector> -#include <grpc/impl/codegen/log.h> #include <grpcpp/impl/codegen/interceptor.h> #include <grpcpp/impl/codegen/string_ref.h> diff --git a/include/grpcpp/impl/codegen/server_interface.h b/include/grpcpp/impl/codegen/server_interface.h index 92c87a5f7e..8bfb10f704 100644 --- a/include/grpcpp/impl/codegen/server_interface.h +++ b/include/grpcpp/impl/codegen/server_interface.h @@ -333,11 +333,26 @@ class ServerInterface : public internal::CallHook { } private: - virtual const std::vector< + // EXPERIMENTAL + // Getter method for the vector of interceptor factory objects. + // Returns a nullptr (rather than being pure) since this is a new method and + // adding a new pure method to an interface would be a breaking change (even + // though this is private and non-API) + virtual std::vector< std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>* interceptor_creators() { return nullptr; } + + // EXPERIMENTAL + // A method to get the callbackable completion queue associated with this + // server. If the return value is nullptr, this server doesn't support + // callback operations. + // TODO(vjpai): Consider a better default like using a global CQ + // Returns nullptr (rather than being pure) since this is a post-1.0 method + // and adding a new pure method to an interface would be a breaking change + // (even though this is private and non-API) + virtual CompletionQueue* CallbackCQ() { return nullptr; } }; } // namespace grpc diff --git a/include/grpcpp/impl/codegen/service_type.h b/include/grpcpp/impl/codegen/service_type.h index 9f1a052168..332a04c294 100644 --- a/include/grpcpp/impl/codegen/service_type.h +++ b/include/grpcpp/impl/codegen/service_type.h @@ -71,7 +71,20 @@ class Service { bool has_synchronous_methods() const { for (auto it = methods_.begin(); it != methods_.end(); ++it) { - if (*it && (*it)->handler() != nullptr) { + if (*it && + (*it)->api_type() == internal::RpcServiceMethod::ApiType::SYNC) { + return true; + } + } + return false; + } + + bool has_callback_methods() const { + for (auto it = methods_.begin(); it != methods_.end(); ++it) { + if (*it && ((*it)->api_type() == + internal::RpcServiceMethod::ApiType::CALL_BACK || + (*it)->api_type() == + internal::RpcServiceMethod::ApiType::RAW_CALL_BACK)) { return true; } } @@ -88,6 +101,43 @@ class Service { } protected: + // TODO(vjpai): Promote experimental contents once callback API is accepted + class experimental_type { + public: + explicit experimental_type(Service* service) : service_(service) {} + + void MarkMethodCallback(int index, internal::MethodHandler* handler) { + // This does not have to be a hard error, however no one has approached us + // with a use case yet. Please file an issue if you believe you have one. + size_t idx = static_cast<size_t>(index); + GPR_CODEGEN_ASSERT( + service_->methods_[idx].get() != nullptr && + "Cannot mark the method as 'callback' because it has already been " + "marked as 'generic'."); + service_->methods_[idx]->SetHandler(handler); + service_->methods_[idx]->SetServerApiType( + internal::RpcServiceMethod::ApiType::CALL_BACK); + } + + void MarkMethodRawCallback(int index, internal::MethodHandler* handler) { + // This does not have to be a hard error, however no one has approached us + // with a use case yet. Please file an issue if you believe you have one. + size_t idx = static_cast<size_t>(index); + GPR_CODEGEN_ASSERT( + service_->methods_[idx].get() != nullptr && + "Cannot mark the method as 'raw callback' because it has already " + "been marked as 'generic'."); + service_->methods_[idx]->SetHandler(handler); + service_->methods_[idx]->SetServerApiType( + internal::RpcServiceMethod::ApiType::RAW_CALL_BACK); + } + + private: + Service* service_; + }; + + experimental_type experimental() { return experimental_type(this); } + template <class Message> void RequestAsyncUnary(int index, ServerContext* context, Message* request, internal::ServerAsyncStreamingInterface* stream, @@ -138,8 +188,7 @@ class Service { methods_[idx].get() != nullptr && "Cannot mark the method as 'async' because it has already been " "marked as 'generic'."); - methods_[idx]->SetServerAsyncType( - internal::RpcServiceMethod::AsyncType::ASYNC); + methods_[idx]->SetServerApiType(internal::RpcServiceMethod::ApiType::ASYNC); } void MarkMethodRaw(int index) { @@ -149,8 +198,7 @@ class Service { GPR_CODEGEN_ASSERT(methods_[idx].get() != nullptr && "Cannot mark the method as 'raw' because it has already " "been marked as 'generic'."); - methods_[idx]->SetServerAsyncType( - internal::RpcServiceMethod::AsyncType::RAW); + methods_[idx]->SetServerApiType(internal::RpcServiceMethod::ApiType::RAW); } void MarkMethodGeneric(int index) { diff --git a/include/grpcpp/server.h b/include/grpcpp/server.h index 2b89ffd317..a14a4da578 100644 --- a/include/grpcpp/server.h +++ b/include/grpcpp/server.h @@ -191,8 +191,7 @@ class Server : public ServerInterface, private GrpcLibraryCodegen { grpc_server* server() override { return server_; }; private: - const std::vector< - std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>* + std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>* interceptor_creators() override { return &interceptor_creators_; } @@ -202,6 +201,7 @@ class Server : public ServerInterface, private GrpcLibraryCodegen { friend class ServerInitializer; class SyncRequest; + class CallbackRequest; class UnimplementedAsyncRequest; class UnimplementedAsyncResponse; @@ -224,8 +224,18 @@ class Server : public ServerInterface, private GrpcLibraryCodegen { return max_receive_message_size_; }; + CompletionQueue* CallbackCQ() override; + ServerInitializer* initializer(); + // A vector of interceptor factory objects. + // This should be destroyed after health_check_service_ and this requirement + // is satisfied by declaring interceptor_creators_ before + // health_check_service_. (C++ mandates that member objects be destroyed in + // the reverse order of initialization.) + std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>> + interceptor_creators_; + const int max_receive_message_size_; /// The following completion queues are ONLY used in case of Sync API @@ -238,6 +248,9 @@ class Server : public ServerInterface, private GrpcLibraryCodegen { /// the \a sync_server_cqs) std::vector<std::unique_ptr<SyncRequestThreadManager>> sync_req_mgrs_; + /// Outstanding callback requests + std::vector<std::unique_ptr<CallbackRequest>> callback_reqs_; + // Server status std::mutex mu_; bool started_; @@ -262,8 +275,12 @@ class Server : public ServerInterface, private GrpcLibraryCodegen { // A special handler for resource exhausted in sync case std::unique_ptr<internal::MethodHandler> resource_exhausted_handler_; - std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>> - interceptor_creators_; + // callback_cq_ references the callbackable completion queue associated + // with this server (if any). It is set on the first call to CallbackCQ(). + // It is _not owned_ by the server; ownership belongs with its internal + // shutdown callback tag (invoked when the CQ is fully shutdown). + // It is protected by mu_ + CompletionQueue* callback_cq_ = nullptr; }; } // namespace grpc diff --git a/include/grpcpp/support/server_callback.h b/include/grpcpp/support/server_callback.h new file mode 100644 index 0000000000..b0aeeb53c5 --- /dev/null +++ b/include/grpcpp/support/server_callback.h @@ -0,0 +1,24 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_SUPPORT_SERVER_CALLBACK_H +#define GRPCPP_SUPPORT_SERVER_CALLBACK_H + +#include <grpcpp/impl/codegen/server_callback.h> + +#endif // GRPCPP_SUPPORT_SERVER_CALLBACK_H |