From 2a0c0d7ad6a1256ef6c0398e1900eca2be077a51 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Mon, 5 Nov 2018 14:39:44 -0800 Subject: Streaming API for callback servers --- include/grpcpp/impl/codegen/byte_buffer.h | 8 +- include/grpcpp/impl/codegen/callback_common.h | 19 +- include/grpcpp/impl/codegen/server_callback.h | 774 ++++++++++++++++++++++++-- include/grpcpp/impl/codegen/server_context.h | 21 +- 4 files changed, 774 insertions(+), 48 deletions(-) (limited to 'include/grpcpp') diff --git a/include/grpcpp/impl/codegen/byte_buffer.h b/include/grpcpp/impl/codegen/byte_buffer.h index abba5549b8..53ecb53371 100644 --- a/include/grpcpp/impl/codegen/byte_buffer.h +++ b/include/grpcpp/impl/codegen/byte_buffer.h @@ -45,8 +45,10 @@ template class RpcMethodHandler; template class ServerStreamingHandler; -template +template class CallbackUnaryHandler; +template +class CallbackServerStreamingHandler; template class ErrorMethodHandler; template @@ -156,8 +158,10 @@ class ByteBuffer final { friend class internal::RpcMethodHandler; template friend class internal::ServerStreamingHandler; - template + template friend class internal::CallbackUnaryHandler; + template + friend class ::grpc::internal::CallbackServerStreamingHandler; template friend class internal::ErrorMethodHandler; template diff --git a/include/grpcpp/impl/codegen/callback_common.h b/include/grpcpp/impl/codegen/callback_common.h index f7a24204dc..a3c8c41246 100644 --- a/include/grpcpp/impl/codegen/callback_common.h +++ b/include/grpcpp/impl/codegen/callback_common.h @@ -32,6 +32,8 @@ namespace grpc { namespace internal { /// An exception-safe way of invoking a user-specified callback function +// TODO(vjpai): decide whether it is better for this to take a const lvalue +// parameter or an rvalue parameter, or if it even matters template void CatchingCallback(Func&& func, Args&&... args) { #if GRPC_ALLOW_EXCEPTIONS @@ -45,6 +47,20 @@ void CatchingCallback(Func&& func, Args&&... args) { #endif // GRPC_ALLOW_EXCEPTIONS } +template +ReturnType* CatchingReactorCreator(Func&& func, Args&&... args) { +#if GRPC_ALLOW_EXCEPTIONS + try { + return func(std::forward(args)...); + } catch (...) { + // fail the RPC, don't crash the library + return nullptr; + } +#else // GRPC_ALLOW_EXCEPTIONS + return func(std::forward(args)...); +#endif // GRPC_ALLOW_EXCEPTIONS +} + // The contract on these tags is that they are single-shot. They must be // constructed and then fired at exactly one point. There is no expectation // that they can be reused without reconstruction. @@ -185,8 +201,9 @@ class CallbackWithSuccessTag void* ignored = ops_; // Allow a "false" return value from FinalizeResult to silence the // callback, just as it silences a CQ tag in the async cases + auto* ops = ops_; bool do_callback = ops_->FinalizeResult(&ignored, &ok); - GPR_CODEGEN_ASSERT(ignored == ops_); + GPR_CODEGEN_ASSERT(ignored == ops); if (do_callback) { CatchingCallback(func_, ok); diff --git a/include/grpcpp/impl/codegen/server_callback.h b/include/grpcpp/impl/codegen/server_callback.h index b866fc16dc..1854f6ef2f 100644 --- a/include/grpcpp/impl/codegen/server_callback.h +++ b/include/grpcpp/impl/codegen/server_callback.h @@ -19,7 +19,9 @@ #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H +#include #include +#include #include #include @@ -32,19 +34,33 @@ namespace grpc { -// forward declarations +// Declare base class of all reactors as internal namespace internal { -template -class CallbackUnaryHandler; + +class ServerReactor { + public: + virtual ~ServerReactor() = default; + virtual void OnDone() {} + virtual void OnCancel() {} +}; + } // namespace internal namespace experimental { +// Forward declarations +template +class ServerReadReactor; +template +class ServerWriteReactor; +template +class ServerBidiReactor; + // For unary RPCs, the exposed controller class is only an interface // and the actual implementation is an internal class. class ServerCallbackRpcController { public: - virtual ~ServerCallbackRpcController() {} + virtual ~ServerCallbackRpcController() = default; // The method handler must call this function when it is done so that // the library knows to free its resources @@ -55,18 +71,193 @@ class ServerCallbackRpcController { virtual void SendInitialMetadata(std::function) = 0; }; +// NOTE: The actual streaming object classes are provided +// as API only to support mocking. There are no implementations of +// these class interfaces in the API. +template +class ServerCallbackReader { + public: + virtual ~ServerCallbackReader() {} + virtual void Finish(Status s) = 0; + virtual void SendInitialMetadata() = 0; + virtual void Read(Request* msg) = 0; + + protected: + template + void BindReactor(ServerReadReactor* reactor) { + reactor->BindReader(this); + } +}; + +template +class ServerCallbackWriter { + public: + virtual ~ServerCallbackWriter() {} + + virtual void Finish(Status s) = 0; + virtual void SendInitialMetadata() = 0; + virtual void Write(const Response* msg, WriteOptions options) = 0; + virtual void WriteAndFinish(const Response* msg, WriteOptions options, + Status s) { + // Default implementation that can/should be overridden + Write(msg, std::move(options)); + Finish(std::move(s)); + }; + + protected: + template + void BindReactor(ServerWriteReactor* reactor) { + reactor->BindWriter(this); + } +}; + +template +class ServerCallbackReaderWriter { + public: + virtual ~ServerCallbackReaderWriter() {} + + virtual void Finish(Status s) = 0; + virtual void SendInitialMetadata() = 0; + virtual void Read(Request* msg) = 0; + virtual void Write(const Response* msg, WriteOptions options) = 0; + virtual void WriteAndFinish(const Response* msg, WriteOptions options, + Status s) { + // Default implementation that can/should be overridden + Write(msg, std::move(options)); + Finish(std::move(s)); + }; + + protected: + void BindReactor(ServerBidiReactor* reactor) { + reactor->BindStream(this); + } +}; + +// The following classes are reactors that are to be implemented +// by the user, returned as the result of the method handler for +// a callback method, and activated by the call to OnStarted +template +class ServerBidiReactor : public internal::ServerReactor { + public: + ~ServerBidiReactor() = default; + virtual void OnStarted(ServerContext*) {} + virtual void OnSendInitialMetadataDone(bool ok) {} + virtual void OnReadDone(bool ok) {} + virtual void OnWriteDone(bool ok) {} + + void StartSendInitialMetadata() { stream_->SendInitialMetadata(); } + void StartRead(Request* msg) { stream_->Read(msg); } + void StartWrite(const Response* msg) { StartWrite(msg, WriteOptions()); } + void StartWrite(const Response* msg, WriteOptions options) { + stream_->Write(msg, std::move(options)); + } + void StartWriteAndFinish(const Response* msg, WriteOptions options, + Status s) { + stream_->WriteAndFinish(msg, std::move(options), std::move(s)); + } + void StartWriteLast(const Response* msg, WriteOptions options) { + StartWrite(msg, std::move(options.set_last_message())); + } + void Finish(Status s) { stream_->Finish(std::move(s)); } + + private: + friend class ServerCallbackReaderWriter; + void BindStream(ServerCallbackReaderWriter* stream) { + stream_ = stream; + } + + ServerCallbackReaderWriter* stream_; +}; + +template +class ServerReadReactor : public internal::ServerReactor { + public: + ~ServerReadReactor() = default; + virtual void OnStarted(ServerContext*, Response* resp) {} + virtual void OnSendInitialMetadataDone(bool ok) {} + virtual void OnReadDone(bool ok) {} + + void StartSendInitialMetadata() { reader_->SendInitialMetadata(); } + void StartRead(Request* msg) { reader_->Read(msg); } + void Finish(Status s) { reader_->Finish(std::move(s)); } + + private: + friend class ServerCallbackReader; + void BindReader(ServerCallbackReader* reader) { reader_ = reader; } + + ServerCallbackReader* reader_; +}; + +template +class ServerWriteReactor : public internal::ServerReactor { + public: + ~ServerWriteReactor() = default; + virtual void OnStarted(ServerContext*, const Request* req) {} + virtual void OnSendInitialMetadataDone(bool ok) {} + virtual void OnWriteDone(bool ok) {} + + void StartSendInitialMetadata() { writer_->SendInitialMetadata(); } + void StartWrite(const Response* msg) { StartWrite(msg, WriteOptions()); } + void StartWrite(const Response* msg, WriteOptions options) { + writer_->Write(msg, std::move(options)); + } + void StartWriteAndFinish(const Response* msg, WriteOptions options, + Status s) { + writer_->WriteAndFinish(msg, std::move(options), std::move(s)); + } + void StartWriteLast(const Response* msg, WriteOptions options) { + StartWrite(msg, std::move(options.set_last_message())); + } + void Finish(Status s) { writer_->Finish(std::move(s)); } + + private: + friend class ServerCallbackWriter; + void BindWriter(ServerCallbackWriter* writer) { writer_ = writer; } + + ServerCallbackWriter* writer_; +}; + } // namespace experimental namespace internal { -template +template +class UnimplementedReadReactor + : public experimental::ServerReadReactor { + public: + void OnDone() override { delete this; } + void OnStarted(ServerContext*, Response*) override { + this->Finish(Status(StatusCode::UNIMPLEMENTED, "")); + } +}; + +template +class UnimplementedWriteReactor + : public experimental::ServerWriteReactor { + public: + void OnDone() override { delete this; } + void OnStarted(ServerContext*, const Request*) override { + this->Finish(Status(StatusCode::UNIMPLEMENTED, "")); + } +}; + +template +class UnimplementedBidiReactor + : public experimental::ServerBidiReactor { + public: + void OnDone() override { delete this; } + void OnStarted(ServerContext*) override { + this->Finish(Status(StatusCode::UNIMPLEMENTED, "")); + } +}; + +template class CallbackUnaryHandler : public MethodHandler { public: CallbackUnaryHandler( std::function - func, - ServiceType* service) + func) : func_(func) {} void RunHandler(const HandlerParameter& param) final { // Arena allocate a controller structure (that includes request/response) @@ -81,9 +272,8 @@ class CallbackUnaryHandler : public MethodHandler { if (status.ok()) { // Call the actual function handler and expect the user to call finish - CatchingCallback(std::move(func_), param.server_context, - controller->request(), controller->response(), - controller); + CatchingCallback(func_, param.server_context, controller->request(), + controller->response(), controller); } else { // if deserialization failed, we need to fail the call controller->Finish(status); @@ -117,79 +307,579 @@ class CallbackUnaryHandler : public MethodHandler { : public experimental::ServerCallbackRpcController { public: void Finish(Status s) override { - finish_tag_.Set( - call_.call(), - [this](bool) { - grpc_call* call = call_.call(); - auto call_requester = std::move(call_requester_); - this->~ServerCallbackRpcControllerImpl(); // explicitly call - // destructor - g_core_codegen_interface->grpc_call_unref(call); - call_requester(); - }, - &finish_buf_); + finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); }, + &finish_ops_); if (!ctx_->sent_initial_metadata_) { - finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_, + finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { - finish_buf_.set_compression_level(ctx_->compression_level()); + finish_ops_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; } // The response is dropped if the status is not OK. if (s.ok()) { - finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, - finish_buf_.SendMessage(resp_)); + finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, + finish_ops_.SendMessage(resp_)); } else { - finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, s); + finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); } - finish_buf_.set_core_cq_tag(&finish_tag_); - call_.PerformOps(&finish_buf_); + finish_ops_.set_core_cq_tag(&finish_tag_); + call_.PerformOps(&finish_ops_); } void SendInitialMetadata(std::function f) override { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); - - meta_tag_.Set(call_.call(), std::move(f), &meta_buf_); - meta_buf_.SendInitialMetadata(&ctx_->initial_metadata_, + callbacks_outstanding_++; + // TODO(vjpai): Consider taking f as a move-capture if we adopt C++14 + // and if performance of this operation matters + meta_tag_.Set(call_.call(), + [this, f](bool ok) { + f(ok); + MaybeDone(); + }, + &meta_ops_); + meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { - meta_buf_.set_compression_level(ctx_->compression_level()); + meta_ops_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; - meta_buf_.set_core_cq_tag(&meta_tag_); - call_.PerformOps(&meta_buf_); + meta_ops_.set_core_cq_tag(&meta_tag_); + call_.PerformOps(&meta_ops_); } private: - template - friend class CallbackUnaryHandler; + friend class CallbackUnaryHandler; ServerCallbackRpcControllerImpl(ServerContext* ctx, Call* call, - RequestType* req, + const RequestType* req, std::function call_requester) : ctx_(ctx), call_(*call), req_(req), - call_requester_(std::move(call_requester)) {} + call_requester_(std::move(call_requester)) { + ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, nullptr); + } ~ServerCallbackRpcControllerImpl() { req_->~RequestType(); } - RequestType* request() { return req_; } + const RequestType* request() { return req_; } ResponseType* response() { return &resp_; } - CallOpSet meta_buf_; + void MaybeDone() { + if (--callbacks_outstanding_ == 0) { + grpc_call* call = call_.call(); + auto call_requester = std::move(call_requester_); + this->~ServerCallbackRpcControllerImpl(); // explicitly call destructor + g_core_codegen_interface->grpc_call_unref(call); + call_requester(); + } + } + + CallOpSet meta_ops_; CallbackWithSuccessTag meta_tag_; CallOpSet - finish_buf_; + finish_ops_; CallbackWithSuccessTag finish_tag_; ServerContext* ctx_; Call call_; - RequestType* req_; + const RequestType* req_; ResponseType resp_; std::function call_requester_; + std::atomic_int callbacks_outstanding_{ + 2}; // reserve for Finish and CompletionOp + }; +}; + +template +class CallbackClientStreamingHandler : public MethodHandler { + public: + CallbackClientStreamingHandler( + std::function< + experimental::ServerReadReactor*()> + func) + : func_(std::move(func)) {} + void RunHandler(const HandlerParameter& param) final { + // Arena allocate a reader structure (that includes response) + g_core_codegen_interface->grpc_call_ref(param.call->call()); + + experimental::ServerReadReactor* reactor = + param.status.ok() + ? CatchingReactorCreator< + experimental::ServerReadReactor>( + func_) + : nullptr; + + if (reactor == nullptr) { + // if deserialization or reactor creator failed, we need to fail the call + reactor = new UnimplementedReadReactor; + } + + auto* reader = new (g_core_codegen_interface->grpc_call_arena_alloc( + param.call->call(), sizeof(ServerCallbackReaderImpl))) + ServerCallbackReaderImpl(param.server_context, param.call, + std::move(param.call_requester), reactor); + + reader->BindReactor(reactor); + reactor->OnStarted(param.server_context, reader->response()); + reader->MaybeDone(); + } + + private: + std::function*()> + func_; + + class ServerCallbackReaderImpl + : public experimental::ServerCallbackReader { + public: + void Finish(Status s) override { + finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); }, + &finish_ops_); + if (!ctx_->sent_initial_metadata_) { + finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + finish_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + // The response is dropped if the status is not OK. + if (s.ok()) { + finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, + finish_ops_.SendMessage(resp_)); + } else { + finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); + } + finish_ops_.set_core_cq_tag(&finish_tag_); + call_.PerformOps(&finish_ops_); + } + + void SendInitialMetadata() override { + GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); + callbacks_outstanding_++; + meta_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnSendInitialMetadataDone(ok); + MaybeDone(); + }, + &meta_ops_); + meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + meta_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + meta_ops_.set_core_cq_tag(&meta_tag_); + call_.PerformOps(&meta_ops_); + } + + void Read(RequestType* req) override { + callbacks_outstanding_++; + read_ops_.RecvMessage(req); + call_.PerformOps(&read_ops_); + } + + private: + friend class CallbackClientStreamingHandler; + + ServerCallbackReaderImpl( + ServerContext* ctx, Call* call, std::function call_requester, + experimental::ServerReadReactor* reactor) + : ctx_(ctx), + call_(*call), + call_requester_(std::move(call_requester)), + reactor_(reactor) { + ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor); + read_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnReadDone(ok); + MaybeDone(); + }, + &read_ops_); + read_ops_.set_core_cq_tag(&read_tag_); + } + + ~ServerCallbackReaderImpl() {} + + ResponseType* response() { return &resp_; } + + void MaybeDone() { + if (--callbacks_outstanding_ == 0) { + reactor_->OnDone(); + grpc_call* call = call_.call(); + auto call_requester = std::move(call_requester_); + this->~ServerCallbackReaderImpl(); // explicitly call destructor + g_core_codegen_interface->grpc_call_unref(call); + call_requester(); + } + } + + CallOpSet meta_ops_; + CallbackWithSuccessTag meta_tag_; + CallOpSet + finish_ops_; + CallbackWithSuccessTag finish_tag_; + CallOpSet> read_ops_; + CallbackWithSuccessTag read_tag_; + + ServerContext* ctx_; + Call call_; + ResponseType resp_; + std::function call_requester_; + experimental::ServerReadReactor* reactor_; + std::atomic_int callbacks_outstanding_{ + 3}; // reserve for OnStarted, Finish, and CompletionOp + }; +}; + +template +class CallbackServerStreamingHandler : public MethodHandler { + public: + CallbackServerStreamingHandler( + std::function< + experimental::ServerWriteReactor*()> + func) + : func_(std::move(func)) {} + void RunHandler(const HandlerParameter& param) final { + // Arena allocate a writer structure + g_core_codegen_interface->grpc_call_ref(param.call->call()); + + experimental::ServerWriteReactor* reactor = + param.status.ok() + ? CatchingReactorCreator< + experimental::ServerWriteReactor>( + func_) + : nullptr; + + if (reactor == nullptr) { + // if deserialization or reactor creator failed, we need to fail the call + reactor = new UnimplementedWriteReactor; + } + + auto* writer = new (g_core_codegen_interface->grpc_call_arena_alloc( + param.call->call(), sizeof(ServerCallbackWriterImpl))) + ServerCallbackWriterImpl(param.server_context, param.call, + static_cast(param.request), + std::move(param.call_requester), reactor); + writer->BindReactor(reactor); + reactor->OnStarted(param.server_context, writer->request()); + writer->MaybeDone(); + } + + void* Deserialize(grpc_call* call, grpc_byte_buffer* req, + Status* status) final { + ByteBuffer buf; + buf.set_buffer(req); + auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc( + call, sizeof(RequestType))) RequestType(); + *status = SerializationTraits::Deserialize(&buf, request); + buf.Release(); + if (status->ok()) { + return request; + } + request->~RequestType(); + return nullptr; + } + + private: + std::function*()> + func_; + + class ServerCallbackWriterImpl + : public experimental::ServerCallbackWriter { + public: + void Finish(Status s) override { + finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); }, + &finish_ops_); + finish_ops_.set_core_cq_tag(&finish_tag_); + + if (!ctx_->sent_initial_metadata_) { + finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + finish_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); + call_.PerformOps(&finish_ops_); + } + + void SendInitialMetadata() override { + GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); + callbacks_outstanding_++; + meta_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnSendInitialMetadataDone(ok); + MaybeDone(); + }, + &meta_ops_); + meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + meta_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + meta_ops_.set_core_cq_tag(&meta_tag_); + call_.PerformOps(&meta_ops_); + } + + void Write(const ResponseType* resp, WriteOptions options) override { + callbacks_outstanding_++; + if (options.is_last_message()) { + options.set_buffer_hint(); + } + if (!ctx_->sent_initial_metadata_) { + write_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + write_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + // TODO(vjpai): don't assert + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(*resp, options).ok()); + call_.PerformOps(&write_ops_); + } + + void WriteAndFinish(const ResponseType* resp, WriteOptions options, + Status s) override { + // This combines the write into the finish callback + // Don't send any message if the status is bad + if (s.ok()) { + // TODO(vjpai): don't assert + GPR_CODEGEN_ASSERT(finish_ops_.SendMessage(*resp, options).ok()); + } + Finish(std::move(s)); + } + + private: + friend class CallbackServerStreamingHandler; + + ServerCallbackWriterImpl( + ServerContext* ctx, Call* call, const RequestType* req, + std::function call_requester, + experimental::ServerWriteReactor* reactor) + : ctx_(ctx), + call_(*call), + req_(req), + call_requester_(std::move(call_requester)), + reactor_(reactor) { + ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor); + write_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnWriteDone(ok); + MaybeDone(); + }, + &write_ops_); + write_ops_.set_core_cq_tag(&write_tag_); + } + ~ServerCallbackWriterImpl() { req_->~RequestType(); } + + const RequestType* request() { return req_; } + + void MaybeDone() { + if (--callbacks_outstanding_ == 0) { + reactor_->OnDone(); + grpc_call* call = call_.call(); + auto call_requester = std::move(call_requester_); + this->~ServerCallbackWriterImpl(); // explicitly call destructor + g_core_codegen_interface->grpc_call_unref(call); + call_requester(); + } + } + + CallOpSet meta_ops_; + CallbackWithSuccessTag meta_tag_; + CallOpSet + finish_ops_; + CallbackWithSuccessTag finish_tag_; + CallOpSet write_ops_; + CallbackWithSuccessTag write_tag_; + + ServerContext* ctx_; + Call call_; + const RequestType* req_; + std::function call_requester_; + experimental::ServerWriteReactor* reactor_; + std::atomic_int callbacks_outstanding_{ + 3}; // reserve for OnStarted, Finish, and CompletionOp + }; +}; + +template +class CallbackBidiHandler : public MethodHandler { + public: + CallbackBidiHandler( + std::function< + experimental::ServerBidiReactor*()> + func) + : func_(std::move(func)) {} + void RunHandler(const HandlerParameter& param) final { + g_core_codegen_interface->grpc_call_ref(param.call->call()); + + experimental::ServerBidiReactor* reactor = + param.status.ok() + ? CatchingReactorCreator< + experimental::ServerBidiReactor>( + func_) + : nullptr; + + if (reactor == nullptr) { + // if deserialization or reactor creator failed, we need to fail the call + reactor = new UnimplementedBidiReactor; + } + + auto* stream = new (g_core_codegen_interface->grpc_call_arena_alloc( + param.call->call(), sizeof(ServerCallbackReaderWriterImpl))) + ServerCallbackReaderWriterImpl(param.server_context, param.call, + std::move(param.call_requester), + reactor); + + stream->BindReactor(reactor); + reactor->OnStarted(param.server_context); + stream->MaybeDone(); + } + + private: + std::function*()> + func_; + + class ServerCallbackReaderWriterImpl + : public experimental::ServerCallbackReaderWriter { + public: + void Finish(Status s) override { + finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); }, + &finish_ops_); + finish_ops_.set_core_cq_tag(&finish_tag_); + + if (!ctx_->sent_initial_metadata_) { + finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + finish_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); + call_.PerformOps(&finish_ops_); + } + + void SendInitialMetadata() override { + GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); + callbacks_outstanding_++; + meta_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnSendInitialMetadataDone(ok); + MaybeDone(); + }, + &meta_ops_); + meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + meta_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + meta_ops_.set_core_cq_tag(&meta_tag_); + call_.PerformOps(&meta_ops_); + } + + void Write(const ResponseType* resp, WriteOptions options) override { + callbacks_outstanding_++; + if (options.is_last_message()) { + options.set_buffer_hint(); + } + if (!ctx_->sent_initial_metadata_) { + write_ops_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + write_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + // TODO(vjpai): don't assert + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(*resp, options).ok()); + call_.PerformOps(&write_ops_); + } + + void WriteAndFinish(const ResponseType* resp, WriteOptions options, + Status s) override { + // Don't send any message if the status is bad + if (s.ok()) { + // TODO(vjpai): don't assert + GPR_CODEGEN_ASSERT(finish_ops_.SendMessage(*resp, options).ok()); + } + Finish(std::move(s)); + } + + void Read(RequestType* req) override { + callbacks_outstanding_++; + read_ops_.RecvMessage(req); + call_.PerformOps(&read_ops_); + } + + private: + friend class CallbackBidiHandler; + + ServerCallbackReaderWriterImpl( + ServerContext* ctx, Call* call, std::function call_requester, + experimental::ServerBidiReactor* reactor) + : ctx_(ctx), + call_(*call), + call_requester_(std::move(call_requester)), + reactor_(reactor) { + ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, reactor); + write_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnWriteDone(ok); + MaybeDone(); + }, + &write_ops_); + write_ops_.set_core_cq_tag(&write_tag_); + read_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnReadDone(ok); + MaybeDone(); + }, + &read_ops_); + read_ops_.set_core_cq_tag(&read_tag_); + } + ~ServerCallbackReaderWriterImpl() {} + + void MaybeDone() { + if (--callbacks_outstanding_ == 0) { + reactor_->OnDone(); + grpc_call* call = call_.call(); + auto call_requester = std::move(call_requester_); + this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor + g_core_codegen_interface->grpc_call_unref(call); + call_requester(); + } + } + + CallOpSet meta_ops_; + CallbackWithSuccessTag meta_tag_; + CallOpSet + finish_ops_; + CallbackWithSuccessTag finish_tag_; + CallOpSet write_ops_; + CallbackWithSuccessTag write_tag_; + CallOpSet> read_ops_; + CallbackWithSuccessTag read_tag_; + + ServerContext* ctx_; + Call call_; + std::function call_requester_; + experimental::ServerBidiReactor* reactor_; + std::atomic_int callbacks_outstanding_{ + 3}; // reserve for OnStarted, Finish, and CompletionOp }; }; diff --git a/include/grpcpp/impl/codegen/server_context.h b/include/grpcpp/impl/codegen/server_context.h index 82ee862f61..4a5f9e2dd9 100644 --- a/include/grpcpp/impl/codegen/server_context.h +++ b/include/grpcpp/impl/codegen/server_context.h @@ -66,13 +66,20 @@ template class ServerStreamingHandler; template class BidiStreamingHandler; -template +template class CallbackUnaryHandler; +template +class CallbackClientStreamingHandler; +template +class CallbackServerStreamingHandler; +template +class CallbackBidiHandler; template class TemplatedBidiStreamingHandler; template class ErrorMethodHandler; class Call; +class ServerReactor; } // namespace internal class CompletionQueue; @@ -270,8 +277,14 @@ class ServerContext { friend class ::grpc::internal::ServerStreamingHandler; template friend class ::grpc::internal::TemplatedBidiStreamingHandler; - template + template friend class ::grpc::internal::CallbackUnaryHandler; + template + friend class ::grpc::internal::CallbackClientStreamingHandler; + template + friend class ::grpc::internal::CallbackServerStreamingHandler; + template + friend class ::grpc::internal::CallbackBidiHandler; template friend class internal::ErrorMethodHandler; friend class ::grpc::ClientContext; @@ -282,7 +295,9 @@ class ServerContext { class CompletionOp; - void BeginCompletionOp(internal::Call* call, bool callback); + void BeginCompletionOp(internal::Call* call, + std::function callback, + internal::ServerReactor* reactor); /// Return the tag queued by BeginCompletionOp() internal::CompletionQueueTag* GetCompletionOpTag(); -- cgit v1.2.3