From d7eb26648d33912199573b842557b0e92ec3071f Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Sun, 4 Nov 2018 23:37:00 -0800 Subject: Client callback streaming --- include/grpcpp/generic/generic_stub.h | 5 + include/grpcpp/impl/codegen/callback_common.h | 8 +- include/grpcpp/impl/codegen/channel_interface.h | 12 + include/grpcpp/impl/codegen/client_callback.h | 505 ++++++++++++++++++++++++ include/grpcpp/impl/codegen/client_context.h | 12 + 5 files changed, 538 insertions(+), 4 deletions(-) (limited to 'include') diff --git a/include/grpcpp/generic/generic_stub.h b/include/grpcpp/generic/generic_stub.h index d509d9a520..ccbf8a0e55 100644 --- a/include/grpcpp/generic/generic_stub.h +++ b/include/grpcpp/generic/generic_stub.h @@ -24,6 +24,7 @@ #include #include #include +#include #include namespace grpc { @@ -76,6 +77,10 @@ class GenericStub final { const ByteBuffer* request, ByteBuffer* response, std::function on_completion); + experimental::ClientCallbackReaderWriter* + PrepareBidiStreamingCall(ClientContext* context, const grpc::string& method, + experimental::ClientBidiReactor* reactor); + private: GenericStub* stub_; }; diff --git a/include/grpcpp/impl/codegen/callback_common.h b/include/grpcpp/impl/codegen/callback_common.h index 51367cf550..f7a24204dc 100644 --- a/include/grpcpp/impl/codegen/callback_common.h +++ b/include/grpcpp/impl/codegen/callback_common.h @@ -145,18 +145,19 @@ class CallbackWithSuccessTag // or on a tag that has been Set before unless the tag has been cleared. void Set(grpc_call* call, std::function f, CompletionQueueTag* ops) { + GPR_CODEGEN_ASSERT(call_ == nullptr); + g_core_codegen_interface->grpc_call_ref(call); call_ = call; func_ = std::move(f); ops_ = ops; - g_core_codegen_interface->grpc_call_ref(call); functor_run = &CallbackWithSuccessTag::StaticRun; } void Clear() { if (call_ != nullptr) { - func_ = nullptr; grpc_call* call = call_; call_ = nullptr; + func_ = nullptr; g_core_codegen_interface->grpc_call_unref(call); } } @@ -182,10 +183,9 @@ class CallbackWithSuccessTag } void Run(bool ok) { void* ignored = ops_; - bool new_ok = ok; // Allow a "false" return value from FinalizeResult to silence the // callback, just as it silences a CQ tag in the async cases - bool do_callback = ops_->FinalizeResult(&ignored, &new_ok); + bool do_callback = ops_->FinalizeResult(&ignored, &ok); GPR_CODEGEN_ASSERT(ignored == ops_); if (do_callback) { diff --git a/include/grpcpp/impl/codegen/channel_interface.h b/include/grpcpp/impl/codegen/channel_interface.h index 6ec1ffb8c7..728a7b9049 100644 --- a/include/grpcpp/impl/codegen/channel_interface.h +++ b/include/grpcpp/impl/codegen/channel_interface.h @@ -53,6 +53,12 @@ template class ClientAsyncReaderWriterFactory; template class ClientAsyncResponseReaderFactory; +template +class ClientCallbackReaderWriterFactory; +template +class ClientCallbackReaderFactory; +template +class ClientCallbackWriterFactory; class InterceptedChannel; } // namespace internal @@ -106,6 +112,12 @@ class ChannelInterface { friend class ::grpc::internal::ClientAsyncReaderWriterFactory; template friend class ::grpc::internal::ClientAsyncResponseReaderFactory; + template + friend class ::grpc::internal::ClientCallbackReaderWriterFactory; + template + friend class ::grpc::internal::ClientCallbackReaderFactory; + template + friend class ::grpc::internal::ClientCallbackWriterFactory; template friend class ::grpc::internal::BlockingUnaryCallImpl; template diff --git a/include/grpcpp/impl/codegen/client_callback.h b/include/grpcpp/impl/codegen/client_callback.h index 4baa819091..01a28a9e6f 100644 --- a/include/grpcpp/impl/codegen/client_callback.h +++ b/include/grpcpp/impl/codegen/client_callback.h @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -88,6 +89,510 @@ class CallbackUnaryCallImpl { call.PerformOps(ops); } }; +} // namespace internal + +namespace experimental { + +// The user must implement this reactor interface with reactions to each event +// type that gets called by the library. An empty reaction is provided by +// default + +class ClientBidiReactor { + public: + virtual ~ClientBidiReactor() {} + virtual void OnDone(Status s) {} + virtual void OnReadInitialMetadataDone(bool ok) {} + virtual void OnReadDone(bool ok) {} + virtual void OnWriteDone(bool ok) {} + virtual void OnWritesDoneDone(bool ok) {} +}; + +class ClientReadReactor { + public: + virtual ~ClientReadReactor() {} + virtual void OnDone(Status s) {} + virtual void OnReadInitialMetadataDone(bool ok) {} + virtual void OnReadDone(bool ok) {} +}; + +class ClientWriteReactor { + public: + virtual ~ClientWriteReactor() {} + virtual void OnDone(Status s) {} + virtual void OnReadInitialMetadataDone(bool ok) {} + virtual void OnWriteDone(bool ok) {} + virtual void OnWritesDoneDone(bool ok) {} +}; + +template +class ClientCallbackReaderWriter { + public: + virtual ~ClientCallbackReaderWriter() {} + virtual void StartCall() = 0; + void Write(const Request* req) { Write(req, WriteOptions()); } + virtual void Write(const Request* req, WriteOptions options) = 0; + void WriteLast(const Request* req, WriteOptions options) { + Write(req, options.set_last_message()); + } + virtual void WritesDone() = 0; + virtual void Read(Response* resp) = 0; +}; + +template +class ClientCallbackReader { + public: + virtual ~ClientCallbackReader() {} + virtual void StartCall() = 0; + virtual void Read(Response* resp) = 0; +}; + +template +class ClientCallbackWriter { + public: + virtual ~ClientCallbackWriter() {} + virtual void StartCall() = 0; + void Write(const Request* req) { Write(req, WriteOptions()); } + virtual void Write(const Request* req, WriteOptions options) = 0; + void WriteLast(const Request* req, WriteOptions options) { + Write(req, options.set_last_message()); + } + virtual void WritesDone() = 0; +}; + +} // namespace experimental + +namespace internal { + +// Forward declare factory classes for friendship +template +class ClientCallbackReaderWriterFactory; +template +class ClientCallbackReaderFactory; +template +class ClientCallbackWriterFactory; + +template +class ClientCallbackReaderWriterImpl + : public ::grpc::experimental::ClientCallbackReaderWriter { + public: + // always allocated against a call arena, no memory free required + static void operator delete(void* ptr, std::size_t size) { + assert(size == sizeof(ClientCallbackReaderWriterImpl)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { assert(0); } + + void MaybeFinish() { + if (--callbacks_outstanding_ == 0) { + reactor_->OnDone(std::move(finish_status_)); + auto* call = call_.call(); + this->~ClientCallbackReaderWriterImpl(); + g_core_codegen_interface->grpc_call_unref(call); + } + } + + void StartCall() override { + // This call initiates two batches + // 1. Send initial metadata (unless corked)/recv initial metadata + // 2. Recv trailing metadata, on_completion callback + callbacks_outstanding_ = 2; + + start_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnReadInitialMetadataDone(ok); + MaybeFinish(); + }, + &start_ops_); + start_corked_ = context_->initial_metadata_corked_; + if (!start_corked_) { + start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + } + start_ops_.RecvInitialMetadata(context_); + start_ops_.set_core_cq_tag(&start_tag_); + call_.PerformOps(&start_ops_); + + finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); }, + &finish_ops_); + finish_ops_.ClientRecvStatus(context_, &finish_status_); + finish_ops_.set_core_cq_tag(&finish_tag_); + call_.PerformOps(&finish_ops_); + + // Also set up the read and write tags so that they don't have to be set up + // each time + write_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnWriteDone(ok); + MaybeFinish(); + }, + &write_ops_); + write_ops_.set_core_cq_tag(&write_tag_); + + read_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnReadDone(ok); + MaybeFinish(); + }, + &read_ops_); + read_ops_.set_core_cq_tag(&read_tag_); + } + + void Read(Response* msg) override { + read_ops_.RecvMessage(msg); + callbacks_outstanding_++; + call_.PerformOps(&read_ops_); + } + + void Write(const Request* msg, WriteOptions options) override { + if (start_corked_) { + write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + start_corked_ = false; + } + // TODO(vjpai): don't assert + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(*msg).ok()); + + if (options.is_last_message()) { + options.set_buffer_hint(); + write_ops_.ClientSendClose(); + } + callbacks_outstanding_++; + call_.PerformOps(&write_ops_); + } + void WritesDone() override { + if (start_corked_) { + writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + start_corked_ = false; + } + writes_done_ops_.ClientSendClose(); + writes_done_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnWritesDoneDone(ok); + MaybeFinish(); + }, + &writes_done_ops_); + writes_done_ops_.set_core_cq_tag(&writes_done_tag_); + callbacks_outstanding_++; + call_.PerformOps(&writes_done_ops_); + } + + private: + friend class ClientCallbackReaderWriterFactory; + + ClientCallbackReaderWriterImpl( + Call call, ClientContext* context, + ::grpc::experimental::ClientBidiReactor* reactor) + : context_(context), call_(call), reactor_(reactor) {} + + ClientContext* context_; + Call call_; + ::grpc::experimental::ClientBidiReactor* reactor_; + + CallOpSet start_ops_; + CallbackWithSuccessTag start_tag_; + bool start_corked_; + + CallOpSet finish_ops_; + CallbackWithSuccessTag finish_tag_; + Status finish_status_; + + CallOpSet + write_ops_; + CallbackWithSuccessTag write_tag_; + + CallOpSet writes_done_ops_; + CallbackWithSuccessTag writes_done_tag_; + + CallOpSet> read_ops_; + CallbackWithSuccessTag read_tag_; + + std::atomic_int callbacks_outstanding_; +}; + +template +class ClientCallbackReaderWriterFactory { + public: + static experimental::ClientCallbackReaderWriter* Create( + ChannelInterface* channel, const ::grpc::internal::RpcMethod& method, + ClientContext* context, + ::grpc::experimental::ClientBidiReactor* reactor) { + Call call = channel->CreateCall(method, context, channel->CallbackCQ()); + + g_core_codegen_interface->grpc_call_ref(call.call()); + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientCallbackReaderWriterImpl))) + ClientCallbackReaderWriterImpl(call, context, + reactor); + } +}; + +template +class ClientCallbackReaderImpl + : public ::grpc::experimental::ClientCallbackReader { + public: + // always allocated against a call arena, no memory free required + static void operator delete(void* ptr, std::size_t size) { + assert(size == sizeof(ClientCallbackReaderImpl)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { assert(0); } + + void MaybeFinish() { + if (--callbacks_outstanding_ == 0) { + reactor_->OnDone(std::move(finish_status_)); + auto* call = call_.call(); + this->~ClientCallbackReaderImpl(); + g_core_codegen_interface->grpc_call_unref(call); + } + } + + void StartCall() override { + // This call initiates two batches + // 1. Send initial metadata (unless corked)/recv initial metadata + // 2. Recv trailing metadata, on_completion callback + callbacks_outstanding_ = 2; + + start_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnReadInitialMetadataDone(ok); + MaybeFinish(); + }, + &start_ops_); + start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + start_ops_.RecvInitialMetadata(context_); + start_ops_.set_core_cq_tag(&start_tag_); + call_.PerformOps(&start_ops_); + + finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); }, + &finish_ops_); + finish_ops_.ClientRecvStatus(context_, &finish_status_); + finish_ops_.set_core_cq_tag(&finish_tag_); + call_.PerformOps(&finish_ops_); + + // Also set up the read tag so it doesn't have to be set up each time + read_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnReadDone(ok); + MaybeFinish(); + }, + &read_ops_); + read_ops_.set_core_cq_tag(&read_tag_); + } + + void Read(Response* msg) override { + read_ops_.RecvMessage(msg); + callbacks_outstanding_++; + call_.PerformOps(&read_ops_); + } + + private: + friend class ClientCallbackReaderFactory; + + template + ClientCallbackReaderImpl(Call call, ClientContext* context, Request* request, + ::grpc::experimental::ClientReadReactor* reactor) + : context_(context), call_(call), reactor_(reactor) { + // TODO(vjpai): don't assert + GPR_CODEGEN_ASSERT(start_ops_.SendMessage(*request).ok()); + start_ops_.ClientSendClose(); + } + + ClientContext* context_; + Call call_; + ::grpc::experimental::ClientReadReactor* reactor_; + + CallOpSet + start_ops_; + CallbackWithSuccessTag start_tag_; + + CallOpSet finish_ops_; + CallbackWithSuccessTag finish_tag_; + Status finish_status_; + + CallOpSet> read_ops_; + CallbackWithSuccessTag read_tag_; + + std::atomic_int callbacks_outstanding_; +}; + +template +class ClientCallbackReaderFactory { + public: + template + static experimental::ClientCallbackReader* Create( + ChannelInterface* channel, const ::grpc::internal::RpcMethod& method, + ClientContext* context, const Request* request, + ::grpc::experimental::ClientReadReactor* reactor) { + Call call = channel->CreateCall(method, context, channel->CallbackCQ()); + + g_core_codegen_interface->grpc_call_ref(call.call()); + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientCallbackReaderImpl))) + ClientCallbackReaderImpl(call, context, request, reactor); + } +}; + +template +class ClientCallbackWriterImpl + : public ::grpc::experimental::ClientCallbackWriter { + public: + // always allocated against a call arena, no memory free required + static void operator delete(void* ptr, std::size_t size) { + assert(size == sizeof(ClientCallbackWriterImpl)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { assert(0); } + + void MaybeFinish() { + if (--callbacks_outstanding_ == 0) { + reactor_->OnDone(std::move(finish_status_)); + auto* call = call_.call(); + this->~ClientCallbackWriterImpl(); + g_core_codegen_interface->grpc_call_unref(call); + } + } + + void StartCall() override { + // This call initiates two batches + // 1. Send initial metadata (unless corked)/recv initial metadata + // 2. Recv message + trailing metadata, on_completion callback + callbacks_outstanding_ = 2; + + start_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnReadInitialMetadataDone(ok); + MaybeFinish(); + }, + &start_ops_); + start_corked_ = context_->initial_metadata_corked_; + if (!start_corked_) { + start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + } + start_ops_.RecvInitialMetadata(context_); + start_ops_.set_core_cq_tag(&start_tag_); + call_.PerformOps(&start_ops_); + + finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); }, + &finish_ops_); + finish_ops_.ClientRecvStatus(context_, &finish_status_); + finish_ops_.set_core_cq_tag(&finish_tag_); + call_.PerformOps(&finish_ops_); + + // Also set up the read and write tags so that they don't have to be set up + // each time + write_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnWriteDone(ok); + MaybeFinish(); + }, + &write_ops_); + write_ops_.set_core_cq_tag(&write_tag_); + } + + void Write(const Request* msg, WriteOptions options) override { + if (start_corked_) { + write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + start_corked_ = false; + } + // TODO(vjpai): don't assert + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(*msg).ok()); + + if (options.is_last_message()) { + options.set_buffer_hint(); + write_ops_.ClientSendClose(); + } + callbacks_outstanding_++; + call_.PerformOps(&write_ops_); + } + void WritesDone() override { + if (start_corked_) { + writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + start_corked_ = false; + } + writes_done_ops_.ClientSendClose(); + writes_done_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnWritesDoneDone(ok); + MaybeFinish(); + }, + &writes_done_ops_); + writes_done_ops_.set_core_cq_tag(&writes_done_tag_); + callbacks_outstanding_++; + call_.PerformOps(&writes_done_ops_); + } + + private: + friend class ClientCallbackWriterFactory; + + template + ClientCallbackWriterImpl(Call call, ClientContext* context, + Response* response, + ::grpc::experimental::ClientWriteReactor* reactor) + : context_(context), call_(call), reactor_(reactor) { + finish_ops_.RecvMessage(response); + finish_ops_.AllowNoMessage(); + } + + ClientContext* context_; + Call call_; + ::grpc::experimental::ClientWriteReactor* reactor_; + + CallOpSet start_ops_; + CallbackWithSuccessTag start_tag_; + bool start_corked_; + + CallOpSet finish_ops_; + CallbackWithSuccessTag finish_tag_; + Status finish_status_; + + CallOpSet + write_ops_; + CallbackWithSuccessTag write_tag_; + + CallOpSet writes_done_ops_; + CallbackWithSuccessTag writes_done_tag_; + + std::atomic_int callbacks_outstanding_; +}; + +template +class ClientCallbackWriterFactory { + public: + template + static experimental::ClientCallbackWriter* Create( + ChannelInterface* channel, const ::grpc::internal::RpcMethod& method, + ClientContext* context, Response* response, + ::grpc::experimental::ClientWriteReactor* reactor) { + Call call = channel->CreateCall(method, context, channel->CallbackCQ()); + + g_core_codegen_interface->grpc_call_ref(call.call()); + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientCallbackWriterImpl))) + ClientCallbackWriterImpl(call, context, response, reactor); + } +}; } // namespace internal } // namespace grpc diff --git a/include/grpcpp/impl/codegen/client_context.h b/include/grpcpp/impl/codegen/client_context.h index 75b955e760..6059c3c58a 100644 --- a/include/grpcpp/impl/codegen/client_context.h +++ b/include/grpcpp/impl/codegen/client_context.h @@ -71,6 +71,12 @@ template class BlockingUnaryCallImpl; template class CallbackUnaryCallImpl; +template +class ClientCallbackReaderWriterImpl; +template +class ClientCallbackReaderImpl; +template +class ClientCallbackWriterImpl; } // namespace internal template @@ -394,6 +400,12 @@ class ClientContext { friend class ::grpc::internal::BlockingUnaryCallImpl; template friend class ::grpc::internal::CallbackUnaryCallImpl; + template + friend class ::grpc::internal::ClientCallbackReaderWriterImpl; + template + friend class ::grpc::internal::ClientCallbackReaderImpl; + template + friend class ::grpc::internal::ClientCallbackWriterImpl; // Used by friend class CallOpClientRecvStatus void set_debug_error_string(const grpc::string& debug_error_string) { -- cgit v1.2.3