/* * * Copyright 2018 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H #include #include #include #include #include #include #include #include #include namespace grpc { // forward declarations namespace internal { template class CallbackUnaryHandler; } // namespace internal namespace experimental { // For unary RPCs, the exposed controller class is only an interface // and the actual implementation is an internal class. class ServerCallbackRpcController { public: virtual ~ServerCallbackRpcController() {} // The method handler must call this function when it is done so that // the library knows to free its resources virtual void Finish(Status s) = 0; // Allow the method handler to push out the initial metadata before // the response and status are ready virtual void SendInitialMetadata(std::function) = 0; }; } // namespace experimental namespace internal { template class CallbackUnaryHandler : public MethodHandler { public: CallbackUnaryHandler( std::function func, ServiceType* service) : func_(func) {} void RunHandler(const HandlerParameter& param) final { // Arena allocate a controller structure (that includes request/response) g_core_codegen_interface->grpc_call_ref(param.call->call()); auto* controller = new (g_core_codegen_interface->grpc_call_arena_alloc( param.call->call(), sizeof(ServerCallbackRpcControllerImpl))) ServerCallbackRpcControllerImpl( param.server_context, param.call, static_cast(param.request), std::move(param.call_requester)); Status status = param.status; if (status.ok()) { // Call the actual function handler and expect the user to call finish CatchingCallback(std::move(func_), param.server_context, controller->request(), controller->response(), controller); } else { // if deserialization failed, we need to fail the call controller->Finish(status); } } void* Deserialize(grpc_call* call, grpc_byte_buffer* req, Status* status) final { ByteBuffer buf; buf.set_buffer(req); auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc( call, sizeof(RequestType))) RequestType(); *status = SerializationTraits::Deserialize(&buf, request); buf.Release(); if (status->ok()) { return request; } request->~RequestType(); return nullptr; } private: std::function func_; // The implementation class of ServerCallbackRpcController is a private member // of CallbackUnaryHandler since it is never exposed anywhere, and this allows // it to take advantage of CallbackUnaryHandler's friendships. class ServerCallbackRpcControllerImpl : public experimental::ServerCallbackRpcController { public: void Finish(Status s) override { finish_tag_.Set( call_.call(), [this](bool) { grpc_call* call = call_.call(); auto call_requester = std::move(call_requester_); this->~ServerCallbackRpcControllerImpl(); // explicitly call // destructor g_core_codegen_interface->grpc_call_unref(call); call_requester(); }, &finish_buf_); if (!ctx_->sent_initial_metadata_) { finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { finish_buf_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; } // The response is dropped if the status is not OK. if (s.ok()) { finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, finish_buf_.SendMessage(resp_)); } else { finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, s); } finish_buf_.set_core_cq_tag(&finish_tag_); call_.PerformOps(&finish_buf_); } void SendInitialMetadata(std::function f) override { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); meta_tag_.Set(call_.call(), std::move(f), &meta_buf_); meta_buf_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { meta_buf_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; meta_buf_.set_core_cq_tag(&meta_tag_); call_.PerformOps(&meta_buf_); } private: template friend class CallbackUnaryHandler; ServerCallbackRpcControllerImpl(ServerContext* ctx, Call* call, RequestType* req, std::function call_requester) : ctx_(ctx), call_(*call), req_(req), call_requester_(std::move(call_requester)) {} ~ServerCallbackRpcControllerImpl() { req_->~RequestType(); } RequestType* request() { return req_; } ResponseType* response() { return &resp_; } CallOpSet meta_buf_; CallbackWithSuccessTag meta_tag_; CallOpSet finish_buf_; CallbackWithSuccessTag finish_tag_; ServerContext* ctx_; Call call_; RequestType* req_; ResponseType resp_; std::function call_requester_; }; }; } // namespace internal } // namespace grpc #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H