diff options
author | Vijay Pai <vpai@google.com> | 2018-11-05 14:39:44 -0800 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2018-11-30 16:14:21 -0800 |
commit | 2a0c0d7ad6a1256ef6c0398e1900eca2be077a51 (patch) | |
tree | 031f2a881661ead48fd9d86d56a6b33a01f64ca9 /include | |
parent | cdea58eb9a06240062793198739f5278d7193c3a (diff) |
Streaming API for callback servers
Diffstat (limited to 'include')
-rw-r--r-- | include/grpcpp/impl/codegen/byte_buffer.h | 8 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/callback_common.h | 19 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/server_callback.h | 774 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/server_context.h | 21 |
4 files changed, 774 insertions, 48 deletions
diff --git a/include/grpcpp/impl/codegen/byte_buffer.h b/include/grpcpp/impl/codegen/byte_buffer.h index abba5549b8..53ecb53371 100644 --- a/include/grpcpp/impl/codegen/byte_buffer.h +++ b/include/grpcpp/impl/codegen/byte_buffer.h @@ -45,8 +45,10 @@ template <class ServiceType, class RequestType, class ResponseType> class RpcMethodHandler; template <class ServiceType, class RequestType, class ResponseType> class ServerStreamingHandler; -template <class ServiceType, class RequestType, class ResponseType> +template <class RequestType, class ResponseType> class CallbackUnaryHandler; +template <class RequestType, class ResponseType> +class CallbackServerStreamingHandler; template <StatusCode code> class ErrorMethodHandler; template <class R> @@ -156,8 +158,10 @@ class ByteBuffer final { friend class internal::RpcMethodHandler; template <class ServiceType, class RequestType, class ResponseType> friend class internal::ServerStreamingHandler; - template <class ServiceType, class RequestType, class ResponseType> + template <class RequestType, class ResponseType> friend class internal::CallbackUnaryHandler; + template <class RequestType, class ResponseType> + friend class ::grpc::internal::CallbackServerStreamingHandler; template <StatusCode code> friend class internal::ErrorMethodHandler; template <class R> diff --git a/include/grpcpp/impl/codegen/callback_common.h b/include/grpcpp/impl/codegen/callback_common.h index f7a24204dc..a3c8c41246 100644 --- a/include/grpcpp/impl/codegen/callback_common.h +++ b/include/grpcpp/impl/codegen/callback_common.h @@ -32,6 +32,8 @@ namespace grpc { namespace internal { /// An exception-safe way of invoking a user-specified callback function +// TODO(vjpai): decide whether it is better for this to take a const lvalue +// parameter or an rvalue parameter, or if it even matters template <class Func, class... Args> void CatchingCallback(Func&& func, Args&&... args) { #if GRPC_ALLOW_EXCEPTIONS @@ -45,6 +47,20 @@ void CatchingCallback(Func&& func, Args&&... args) { #endif // GRPC_ALLOW_EXCEPTIONS } +template <class ReturnType, class Func, class... Args> +ReturnType* CatchingReactorCreator(Func&& func, Args&&... args) { +#if GRPC_ALLOW_EXCEPTIONS + try { + return func(std::forward<Args>(args)...); + } catch (...) { + // fail the RPC, don't crash the library + return nullptr; + } +#else // GRPC_ALLOW_EXCEPTIONS + return func(std::forward<Args>(args)...); +#endif // GRPC_ALLOW_EXCEPTIONS +} + // The contract on these tags is that they are single-shot. They must be // constructed and then fired at exactly one point. There is no expectation // that they can be reused without reconstruction. @@ -185,8 +201,9 @@ class CallbackWithSuccessTag void* ignored = ops_; // Allow a "false" return value from FinalizeResult to silence the // callback, just as it silences a CQ tag in the async cases + auto* ops = ops_; bool do_callback = ops_->FinalizeResult(&ignored, &ok); - GPR_CODEGEN_ASSERT(ignored == ops_); + GPR_CODEGEN_ASSERT(ignored == ops); if (do_callback) { CatchingCallback(func_, ok); diff --git a/include/grpcpp/impl/codegen/server_callback.h b/include/grpcpp/impl/codegen/server_callback.h index b866fc16dc..1854f6ef2f 100644 --- a/include/grpcpp/impl/codegen/server_callback.h +++ b/include/grpcpp/impl/codegen/server_callback.h @@ -19,7 +19,9 @@ #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H +#include <atomic> #include <functional> +#include <type_traits> #include <grpcpp/impl/codegen/call.h> #include <grpcpp/impl/codegen/call_op_set.h> @@ -32,19 +34,33 @@ namespace grpc { -// forward declarations +// Declare base class of all reactors as internal namespace internal { -template <class ServiceType, class RequestType, class ResponseType> -class CallbackUnaryHandler; + +class ServerReactor { + public: + virtual ~ServerReactor() = default; + virtual void OnDone() {} + virtual void OnCancel() {} +}; + } // namespace internal namespace experimental { +// Forward declarations +template <class Request, class Response> +class ServerReadReactor; +template <class Request, class Response> +class ServerWriteReactor; +template <class Request, class Response> +class ServerBidiReactor; + // For unary RPCs, the exposed controller class is only an interface // and the actual implementation is an internal class. class ServerCallbackRpcController { public: - virtual ~ServerCallbackRpcController() {} + virtual ~ServerCallbackRpcController() = default; // The method handler must call this function when it is done so that // the library knows to free its resources @@ -55,18 +71,193 @@ class ServerCallbackRpcController { virtual void SendInitialMetadata(std::function<void(bool)>) = 0; }; +// NOTE: The actual streaming object classes are provided +// as API only to support mocking. There are no implementations of +// these class interfaces in the API. +template <class Request> +class ServerCallbackReader { + public: + virtual ~ServerCallbackReader() {} + virtual void Finish(Status s) = 0; + virtual void SendInitialMetadata() = 0; + virtual void Read(Request* msg) = 0; + + protected: + template <class Response> + void BindReactor(ServerReadReactor<Request, Response>* reactor) { + reactor->BindReader(this); + } +}; + +template <class Response> +class ServerCallbackWriter { + public: + virtual ~ServerCallbackWriter() {} + + virtual void Finish(Status s) = 0; + virtual void SendInitialMetadata() = 0; + virtual void Write(const Response* msg, WriteOptions options) = 0; + virtual void WriteAndFinish(const Response* msg, WriteOptions options, + Status s) { + // Default implementation that can/should be overridden + Write(msg, std::move(options)); + Finish(std::move(s)); + }; + + protected: + template <class Request> + void BindReactor(ServerWriteReactor<Request, Response>* reactor) { + reactor->BindWriter(this); + } +}; + +template <class Request, class Response> +class ServerCallbackReaderWriter { + public: + virtual ~ServerCallbackReaderWriter() {} + + virtual void Finish(Status s) = 0; + virtual void SendInitialMetadata() = 0; + virtual void Read(Request* msg) = 0; + virtual void Write(const Response* msg, WriteOptions options) = 0; + virtual void WriteAndFinish(const Response* msg, WriteOptions options, + Status s) { + // Default implementation that can/should be overridden + Write(msg, std::move(options)); + Finish(std::move(s)); + }; + + protected: + void BindReactor(ServerBidiReactor<Request, Response>* reactor) { + reactor->BindStream(this); + } +}; + +// The following classes are reactors that are to be implemented +// by the user, returned as the result of the method handler for +// a callback method, and activated by the call to OnStarted +template <class Request, class Response> +class ServerBidiReactor : public internal::ServerReactor { + public: + ~ServerBidiReactor() = default; + virtual void OnStarted(ServerContext*) {} + virtual void OnSendInitialMetadataDone(bool ok) {} + virtual void OnReadDone(bool ok) {} + virtual void OnWriteDone(bool ok) {} + + void StartSendInitialMetadata() { stream_->SendInitialMetadata(); } + void StartRead(Request* msg) { stream_->Read(msg); } + void StartWrite(const Response* msg) { StartWrite(msg, WriteOptions()); } + void StartWrite(const Response* msg, WriteOptions options) { + stream_->Write(msg, std::move(options)); + } + void StartWriteAndFinish(const Response* msg, WriteOptions options, + Status s) { + stream_->WriteAndFinish(msg, std::move(options), std::move(s)); + } + void StartWriteLast(const Response* msg, WriteOptions options) { + StartWrite(msg, std::move(options.set_last_message())); + } + void Finish(Status s) { stream_->Finish(std::move(s)); } + + private: + friend class ServerCallbackReaderWriter<Request, Response>; + void BindStream(ServerCallbackReaderWriter<Request, Response>* stream) { + stream_ = stream; + } + + ServerCallbackReaderWriter<Request, Response>* stream_; +}; + +template <class Request, class Response> +class ServerReadReactor : public internal::ServerReactor { + public: + ~ServerReadReactor() = default; + virtual void OnStarted(ServerContext*, Response* resp) {} + virtual void OnSendInitialMetadataDone(bool ok) {} + virtual void OnReadDone(bool ok) {} + + void StartSendInitialMetadata() { reader_->SendInitialMetadata(); } + void StartRead(Request* msg) { reader_->Read(msg); } + void Finish(Status s) { reader_->Finish(std::move(s)); } + + private: + friend class ServerCallbackReader<Request>; + void BindReader(ServerCallbackReader<Request>* reader) { reader_ = reader; } + + ServerCallbackReader<Request>* reader_; +}; + +template <class Request, class Response> +class ServerWriteReactor : public internal::ServerReactor { + public: + ~ServerWriteReactor() = default; + virtual void OnStarted(ServerContext*, const Request* req) {} + virtual void OnSendInitialMetadataDone(bool ok) {} + virtual void OnWriteDone(bool ok) {} + + void StartSendInitialMetadata() { writer_->SendInitialMetadata(); } + void StartWrite(const Response* msg) { StartWrite(msg, WriteOptions()); } + void StartWrite(const Response* msg, WriteOptions options) { + writer_->Write(msg, std::move(options)); + } + void StartWriteAndFinish(const Response* msg, WriteOptions options, + Status s) { + writer_->WriteAndFinish(msg, std::move(options), std::move(s)); + } + void StartWriteLast(const Response* msg, WriteOptions options) { + StartWrite(msg, std::move(options.set_last_message())); + } + void Finish(Status s) { writer_->Finish(std::move(s)); } + + private: + friend class ServerCallbackWriter<Response>; + void BindWriter(ServerCallbackWriter<Response>* writer) { writer_ = writer; } + + ServerCallbackWriter<Response>* writer_; +}; + } // namespace experimental namespace internal { -template <class ServiceType, class RequestType, class ResponseType> +template <class Request, class Response> +class UnimplementedReadReactor + : public experimental::ServerReadReactor<Request, Response> { + public: + void OnDone() override { delete this; } + void OnStarted(ServerContext*, Response*) override { + this->Finish(Status(StatusCode::UNIMPLEMENTED, "")); + } +}; + +template <class Request, class Response> +class UnimplementedWriteReactor + : public experimental::ServerWriteReactor<Request, Response> { + public: + void OnDone() override { delete this; } + void OnStarted(ServerContext*, const Request*) override { + this->Finish(Status(StatusCode::UNIMPLEMENTED, "")); + } +}; + +template <class Request, class Response> +class UnimplementedBidiReactor + : public experimental::ServerBidiReactor<Request, Response> { + public: + void OnDone() override { delete this; } + void OnStarted(ServerContext*) override { + this->Finish(Status(StatusCode::UNIMPLEMENTED, "")); + } +}; + +template <class RequestType, class ResponseType> class CallbackUnaryHandler : public MethodHandler { public: CallbackUnaryHandler( std::function<void(ServerContext*, const RequestType*, ResponseType*, experimental::ServerCallbackRpcController*)> - func, - ServiceType* service) + func) : func_(func) {} void RunHandler(const HandlerParameter& param) final { // Arena allocate a controller structure (that includes request/response) @@ -81,9 +272,8 @@ class CallbackUnaryHandler : public MethodHandler { if (status.ok()) { // Call the actual function handler and expect the user to call finish - CatchingCallback(std::move(func_), param.server_context, - controller->request(), controller->response(), - controller); + CatchingCallback(func_, param.server_context, controller->request(), + controller->response(), controller); } else { // if deserialization failed, we need to fail the call controller->Finish(status); @@ -117,79 +307,579 @@ class CallbackUnaryHandler : public MethodHandler { : public experimental::ServerCallbackRpcController { public: void Finish(Status s) override { - finish_tag_.Set( - call_.call(), - [this](bool) { - grpc_call* call = call_.call(); - auto call_requester = std::move(call_requester_); - this->~ServerCallbackRpcControllerImpl(); // explicitly call - // destructor - g_core_codegen_interface->grpc_call_unref(call); - call_requester(); - }, - &finish_buf_); + finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); }, + &finish_ops_); if (!ctx_->sent_initial_metadata_) { - finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_, + finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { - finish_buf_.set_compression_level(ctx_->compression_level()); + finish_ops_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; } // The response is dropped if the status is not OK. if (s.ok()) { - finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, - finish_buf_.SendMessage(resp_)); + finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, + finish_ops_.SendMessage(resp_)); } else { - finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, s); + finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); } - finish_buf_.set_core_cq_tag(&finish_tag_); - call_.PerformOps(&finish_buf_); + finish_ops_.set_core_cq_tag(&finish_tag_); + call_.PerformOps(&finish_ops_); } void SendInitialMetadata(std::function<void(bool)> f) override { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); - - meta_tag_.Set(call_.call(), std::move(f), &meta_buf_); - meta_buf_.SendInitialMetadata(&ctx_->initial_metadata_, + callbacks_outstanding_++; + // TODO(vjpai): Consider taking f as a move-capture if we adopt C++14 + // and if performance of this operation matters + meta_tag_.Set(call_.call(), + [this, f](bool ok) { + f(ok); + MaybeDone(); + }, + &meta_ops_); + meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { - meta_buf_.set_compression_level(ctx_->compression_level()); + meta_ops_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; - meta_buf_.set_core_cq_tag(&meta_tag_); - call_.PerformOps(&meta_buf_); + meta_ops_.set_core_cq_tag(&meta_tag_); + call_.PerformOps(&meta_ops_); } private: - template <class SrvType, class ReqType, class RespType> - friend class CallbackUnaryHandler; + friend class CallbackUnaryHandler<RequestType, ResponseType>; ServerCallbackRpcControllerImpl(ServerContext* ctx, Call* call, - RequestType* req, + const RequestType* req, std::function<void()> call_requester) : ctx_(ctx), call_(*call), req_(req), - call_requester_(std::move(call_requester)) {} + call_requester_(std::move(call_requester)) { + ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, nullptr); + } ~ServerCallbackRpcControllerImpl() { req_->~RequestType(); } - RequestType* request() { return req_; } + const RequestType* request() { return req_; } ResponseType* response() { return &resp_; } - CallOpSet<CallOpSendInitialMetadata> meta_buf_; + void MaybeDone() { + if (--callbacks_outstanding_ == 0) { + grpc_call* call = call_.call(); + auto call_requester = std::move(call_requester_); + this->~ServerCallbackRpcControllerImpl(); // explicitly call destructor + g_core_codegen_interface->grpc_call_unref(call); + call_requester(); + } + } + + CallOpSet<CallOpSendInitialMetadata> meta_ops_; CallbackWithSuccessTag meta_tag_; CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpServerSendStatus> - finish_buf_; + finish_ops_; CallbackWithSuccessTag finish_tag_; ServerContext* ctx_; Call call_; - RequestType* req_; + const RequestType* req_; ResponseType resp_; std::function<void()> call_requester_; + std::atomic_int callbacks_outstanding_{ + 2}; // reserve for Finish and CompletionOp + }; +}; + +template <class RequestType, class ResponseType> +class CallbackClientStreamingHandler : public MethodHandler { + public: + CallbackClientStreamingHandler( + std::function< + experimental::ServerReadReactor<RequestType, ResponseType>*()> + func) + : func_(std::move(func)) {} + void RunHandler(const HandlerParameter& param) final { + // Arena allocate a reader structure (that includes response) + g_core_codegen_interface->grpc_call_ref(param.call->call()); + + experimental::ServerReadReactor<RequestType, ResponseType>* reactor = + param.status.ok() + ? CatchingReactorCreator< + experimental::ServerReadReactor<RequestType, ResponseType>>( + func_) + : nullptr; + + if (reactor == nullptr) { + // if deserialization or reactor creator failed, we need to fail the call + reactor = new UnimplementedReadReactor<RequestType, ResponseType>; + } + + auto* reader = new (g_core_codegen_interface->grpc_call_arena_alloc( + param.call->call(), sizeof(ServerCallbackReaderImpl))) + ServerCallbackReaderImpl(param.server_context, param.call, + std::move(param.call_requester), reactor); + + reader->BindReactor(reactor); + reactor->OnStarted(param.server_context, reader->response()); + reader->MaybeDone(); + } + + private: + std::function<experimental::ServerReadReactor<RequestType, ResponseType>*()> + func_; + + class ServerCallbackReaderImpl + : public experimental::ServerCallbackReader<RequestType> { + public: + void Finish(Status s) override { + finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); }, + &finish_ops_); + if (!ctx_->sent_initial_metadata_) { + finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + finish_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + // The response is dropped if the status is not OK. + if (s.ok()) { + finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, + finish_ops_.SendMessage(resp_)); + } else { + finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); + } + finish_ops_.set_core_cq_tag(&finish_tag_); + call_.PerformOps(&finish_ops_); + } + + void SendInitialMetadata() override { + GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); + callbacks_outstanding_++; + meta_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnSendInitialMetadataDone(ok); + MaybeDone(); + }, + &meta_ops_); + meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + meta_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + meta_ops_.set_core_cq_tag(&meta_tag_); + call_.PerformOps(&meta_ops_); + } + + void Read(RequestType* req) override { + callbacks_outstanding_++; + read_ops_.RecvMessage(req); + call_.PerformOps(&read_ops_); + } + + private: + friend class CallbackClientStreamingHandler<RequestType, ResponseType>; + + ServerCallbackReaderImpl( + ServerContext* ctx, Call* call, std::function<void()> call_requester, + experimental::ServerReadReactor<RequestType, ResponseType>* reactor) + : ctx_(ctx), + call_(*call), + call_requester_(std::move(call_requester)), + reactor_(reactor) { + ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor); + read_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnReadDone(ok); + MaybeDone(); + }, + &read_ops_); + read_ops_.set_core_cq_tag(&read_tag_); + } + + ~ServerCallbackReaderImpl() {} + + ResponseType* response() { return &resp_; } + + void MaybeDone() { + if (--callbacks_outstanding_ == 0) { + reactor_->OnDone(); + grpc_call* call = call_.call(); + auto call_requester = std::move(call_requester_); + this->~ServerCallbackReaderImpl(); // explicitly call destructor + g_core_codegen_interface->grpc_call_unref(call); + call_requester(); + } + } + + CallOpSet<CallOpSendInitialMetadata> meta_ops_; + CallbackWithSuccessTag meta_tag_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> + finish_ops_; + CallbackWithSuccessTag finish_tag_; + CallOpSet<CallOpRecvMessage<RequestType>> read_ops_; + CallbackWithSuccessTag read_tag_; + + ServerContext* ctx_; + Call call_; + ResponseType resp_; + std::function<void()> call_requester_; + experimental::ServerReadReactor<RequestType, ResponseType>* reactor_; + std::atomic_int callbacks_outstanding_{ + 3}; // reserve for OnStarted, Finish, and CompletionOp + }; +}; + +template <class RequestType, class ResponseType> +class CallbackServerStreamingHandler : public MethodHandler { + public: + CallbackServerStreamingHandler( + std::function< + experimental::ServerWriteReactor<RequestType, ResponseType>*()> + func) + : func_(std::move(func)) {} + void RunHandler(const HandlerParameter& param) final { + // Arena allocate a writer structure + g_core_codegen_interface->grpc_call_ref(param.call->call()); + + experimental::ServerWriteReactor<RequestType, ResponseType>* reactor = + param.status.ok() + ? CatchingReactorCreator< + experimental::ServerWriteReactor<RequestType, ResponseType>>( + func_) + : nullptr; + + if (reactor == nullptr) { + // if deserialization or reactor creator failed, we need to fail the call + reactor = new UnimplementedWriteReactor<RequestType, ResponseType>; + } + + auto* writer = new (g_core_codegen_interface->grpc_call_arena_alloc( + param.call->call(), sizeof(ServerCallbackWriterImpl))) + ServerCallbackWriterImpl(param.server_context, param.call, + static_cast<RequestType*>(param.request), + std::move(param.call_requester), reactor); + writer->BindReactor(reactor); + reactor->OnStarted(param.server_context, writer->request()); + writer->MaybeDone(); + } + + void* Deserialize(grpc_call* call, grpc_byte_buffer* req, + Status* status) final { + ByteBuffer buf; + buf.set_buffer(req); + auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc( + call, sizeof(RequestType))) RequestType(); + *status = SerializationTraits<RequestType>::Deserialize(&buf, request); + buf.Release(); + if (status->ok()) { + return request; + } + request->~RequestType(); + return nullptr; + } + + private: + std::function<experimental::ServerWriteReactor<RequestType, ResponseType>*()> + func_; + + class ServerCallbackWriterImpl + : public experimental::ServerCallbackWriter<ResponseType> { + public: + void Finish(Status s) override { + finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); }, + &finish_ops_); + finish_ops_.set_core_cq_tag(&finish_tag_); + + if (!ctx_->sent_initial_metadata_) { + finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + finish_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); + call_.PerformOps(&finish_ops_); + } + + void SendInitialMetadata() override { + GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); + callbacks_outstanding_++; + meta_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnSendInitialMetadataDone(ok); + MaybeDone(); + }, + &meta_ops_); + meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + meta_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + meta_ops_.set_core_cq_tag(&meta_tag_); + call_.PerformOps(&meta_ops_); + } + + void Write(const ResponseType* resp, WriteOptions options) override { + callbacks_outstanding_++; + if (options.is_last_message()) { + options.set_buffer_hint(); + } + if (!ctx_->sent_initial_metadata_) { + write_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + write_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + // TODO(vjpai): don't assert + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(*resp, options).ok()); + call_.PerformOps(&write_ops_); + } + + void WriteAndFinish(const ResponseType* resp, WriteOptions options, + Status s) override { + // This combines the write into the finish callback + // Don't send any message if the status is bad + if (s.ok()) { + // TODO(vjpai): don't assert + GPR_CODEGEN_ASSERT(finish_ops_.SendMessage(*resp, options).ok()); + } + Finish(std::move(s)); + } + + private: + friend class CallbackServerStreamingHandler<RequestType, ResponseType>; + + ServerCallbackWriterImpl( + ServerContext* ctx, Call* call, const RequestType* req, + std::function<void()> call_requester, + experimental::ServerWriteReactor<RequestType, ResponseType>* reactor) + : ctx_(ctx), + call_(*call), + req_(req), + call_requester_(std::move(call_requester)), + reactor_(reactor) { + ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor); + write_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnWriteDone(ok); + MaybeDone(); + }, + &write_ops_); + write_ops_.set_core_cq_tag(&write_tag_); + } + ~ServerCallbackWriterImpl() { req_->~RequestType(); } + + const RequestType* request() { return req_; } + + void MaybeDone() { + if (--callbacks_outstanding_ == 0) { + reactor_->OnDone(); + grpc_call* call = call_.call(); + auto call_requester = std::move(call_requester_); + this->~ServerCallbackWriterImpl(); // explicitly call destructor + g_core_codegen_interface->grpc_call_unref(call); + call_requester(); + } + } + + CallOpSet<CallOpSendInitialMetadata> meta_ops_; + CallbackWithSuccessTag meta_tag_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> + finish_ops_; + CallbackWithSuccessTag finish_tag_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; + CallbackWithSuccessTag write_tag_; + + ServerContext* ctx_; + Call call_; + const RequestType* req_; + std::function<void()> call_requester_; + experimental::ServerWriteReactor<RequestType, ResponseType>* reactor_; + std::atomic_int callbacks_outstanding_{ + 3}; // reserve for OnStarted, Finish, and CompletionOp + }; +}; + +template <class RequestType, class ResponseType> +class CallbackBidiHandler : public MethodHandler { + public: + CallbackBidiHandler( + std::function< + experimental::ServerBidiReactor<RequestType, ResponseType>*()> + func) + : func_(std::move(func)) {} + void RunHandler(const HandlerParameter& param) final { + g_core_codegen_interface->grpc_call_ref(param.call->call()); + + experimental::ServerBidiReactor<RequestType, ResponseType>* reactor = + param.status.ok() + ? CatchingReactorCreator< + experimental::ServerBidiReactor<RequestType, ResponseType>>( + func_) + : nullptr; + + if (reactor == nullptr) { + // if deserialization or reactor creator failed, we need to fail the call + reactor = new UnimplementedBidiReactor<RequestType, ResponseType>; + } + + auto* stream = new (g_core_codegen_interface->grpc_call_arena_alloc( + param.call->call(), sizeof(ServerCallbackReaderWriterImpl))) + ServerCallbackReaderWriterImpl(param.server_context, param.call, + std::move(param.call_requester), + reactor); + + stream->BindReactor(reactor); + reactor->OnStarted(param.server_context); + stream->MaybeDone(); + } + + private: + std::function<experimental::ServerBidiReactor<RequestType, ResponseType>*()> + func_; + + class ServerCallbackReaderWriterImpl + : public experimental::ServerCallbackReaderWriter<RequestType, + ResponseType> { + public: + void Finish(Status s) override { + finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); }, + &finish_ops_); + finish_ops_.set_core_cq_tag(&finish_tag_); + + if (!ctx_->sent_initial_metadata_) { + finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + finish_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); + call_.PerformOps(&finish_ops_); + } + + void SendInitialMetadata() override { + GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); + callbacks_outstanding_++; + meta_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnSendInitialMetadataDone(ok); + MaybeDone(); + }, + &meta_ops_); + meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + meta_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + meta_ops_.set_core_cq_tag(&meta_tag_); + call_.PerformOps(&meta_ops_); + } + + void Write(const ResponseType* resp, WriteOptions options) override { + callbacks_outstanding_++; + if (options.is_last_message()) { + options.set_buffer_hint(); + } + if (!ctx_->sent_initial_metadata_) { + write_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + write_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + // TODO(vjpai): don't assert + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(*resp, options).ok()); + call_.PerformOps(&write_ops_); + } + + void WriteAndFinish(const ResponseType* resp, WriteOptions options, + Status s) override { + // Don't send any message if the status is bad + if (s.ok()) { + // TODO(vjpai): don't assert + GPR_CODEGEN_ASSERT(finish_ops_.SendMessage(*resp, options).ok()); + } + Finish(std::move(s)); + } + + void Read(RequestType* req) override { + callbacks_outstanding_++; + read_ops_.RecvMessage(req); + call_.PerformOps(&read_ops_); + } + + private: + friend class CallbackBidiHandler<RequestType, ResponseType>; + + ServerCallbackReaderWriterImpl( + ServerContext* ctx, Call* call, std::function<void()> call_requester, + experimental::ServerBidiReactor<RequestType, ResponseType>* reactor) + : ctx_(ctx), + call_(*call), + call_requester_(std::move(call_requester)), + reactor_(reactor) { + ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor); + write_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnWriteDone(ok); + MaybeDone(); + }, + &write_ops_); + write_ops_.set_core_cq_tag(&write_tag_); + read_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnReadDone(ok); + MaybeDone(); + }, + &read_ops_); + read_ops_.set_core_cq_tag(&read_tag_); + } + ~ServerCallbackReaderWriterImpl() {} + + void MaybeDone() { + if (--callbacks_outstanding_ == 0) { + reactor_->OnDone(); + grpc_call* call = call_.call(); + auto call_requester = std::move(call_requester_); + this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor + g_core_codegen_interface->grpc_call_unref(call); + call_requester(); + } + } + + CallOpSet<CallOpSendInitialMetadata> meta_ops_; + CallbackWithSuccessTag meta_tag_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> + finish_ops_; + CallbackWithSuccessTag finish_tag_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; + CallbackWithSuccessTag write_tag_; + CallOpSet<CallOpRecvMessage<RequestType>> read_ops_; + CallbackWithSuccessTag read_tag_; + + ServerContext* ctx_; + Call call_; + std::function<void()> call_requester_; + experimental::ServerBidiReactor<RequestType, ResponseType>* reactor_; + std::atomic_int callbacks_outstanding_{ + 3}; // reserve for OnStarted, Finish, and CompletionOp }; }; diff --git a/include/grpcpp/impl/codegen/server_context.h b/include/grpcpp/impl/codegen/server_context.h index 82ee862f61..4a5f9e2dd9 100644 --- a/include/grpcpp/impl/codegen/server_context.h +++ b/include/grpcpp/impl/codegen/server_context.h @@ -66,13 +66,20 @@ template <class ServiceType, class RequestType, class ResponseType> class ServerStreamingHandler; template <class ServiceType, class RequestType, class ResponseType> class BidiStreamingHandler; -template <class ServiceType, class RequestType, class ResponseType> +template <class RequestType, class ResponseType> class CallbackUnaryHandler; +template <class RequestType, class ResponseType> +class CallbackClientStreamingHandler; +template <class RequestType, class ResponseType> +class CallbackServerStreamingHandler; +template <class RequestType, class ResponseType> +class CallbackBidiHandler; template <class Streamer, bool WriteNeeded> class TemplatedBidiStreamingHandler; template <StatusCode code> class ErrorMethodHandler; class Call; +class ServerReactor; } // namespace internal class CompletionQueue; @@ -270,8 +277,14 @@ class ServerContext { friend class ::grpc::internal::ServerStreamingHandler; template <class Streamer, bool WriteNeeded> friend class ::grpc::internal::TemplatedBidiStreamingHandler; - template <class ServiceType, class RequestType, class ResponseType> + template <class RequestType, class ResponseType> friend class ::grpc::internal::CallbackUnaryHandler; + template <class RequestType, class ResponseType> + friend class ::grpc::internal::CallbackClientStreamingHandler; + template <class RequestType, class ResponseType> + friend class ::grpc::internal::CallbackServerStreamingHandler; + template <class RequestType, class ResponseType> + friend class ::grpc::internal::CallbackBidiHandler; template <StatusCode code> friend class internal::ErrorMethodHandler; friend class ::grpc::ClientContext; @@ -282,7 +295,9 @@ class ServerContext { class CompletionOp; - void BeginCompletionOp(internal::Call* call, bool callback); + void BeginCompletionOp(internal::Call* call, + std::function<void(bool)> callback, + internal::ServerReactor* reactor); /// Return the tag queued by BeginCompletionOp() internal::CompletionQueueTag* GetCompletionOpTag(); |