diff options
Diffstat (limited to 'include/grpcpp/impl/codegen/client_callback.h')
-rw-r--r-- | include/grpcpp/impl/codegen/client_callback.h | 641 |
1 files changed, 641 insertions, 0 deletions
diff --git a/include/grpcpp/impl/codegen/client_callback.h b/include/grpcpp/impl/codegen/client_callback.h index 4baa819091..4d9579fd6a 100644 --- a/include/grpcpp/impl/codegen/client_callback.h +++ b/include/grpcpp/impl/codegen/client_callback.h @@ -22,6 +22,7 @@ #include <functional> #include <grpcpp/impl/codegen/call.h> +#include <grpcpp/impl/codegen/call_op_set.h> #include <grpcpp/impl/codegen/callback_common.h> #include <grpcpp/impl/codegen/channel_interface.h> #include <grpcpp/impl/codegen/config.h> @@ -88,6 +89,646 @@ class CallbackUnaryCallImpl { call.PerformOps(ops); } }; +} // namespace internal + +namespace experimental { + +// Forward declarations +template <class Request, class Response> +class ClientBidiReactor; +template <class Response> +class ClientReadReactor; +template <class Request> +class ClientWriteReactor; + +// NOTE: The streaming objects are not actually implemented in the public API. +// These interfaces are provided for mocking only. Typical applications +// will interact exclusively with the reactors that they define. +template <class Request, class Response> +class ClientCallbackReaderWriter { + public: + virtual ~ClientCallbackReaderWriter() {} + virtual void StartCall() = 0; + virtual void Write(const Request* req, WriteOptions options) = 0; + virtual void WritesDone() = 0; + virtual void Read(Response* resp) = 0; + + protected: + void BindReactor(ClientBidiReactor<Request, Response>* reactor) { + reactor->BindStream(this); + } +}; + +template <class Response> +class ClientCallbackReader { + public: + virtual ~ClientCallbackReader() {} + virtual void StartCall() = 0; + virtual void Read(Response* resp) = 0; + + protected: + void BindReactor(ClientReadReactor<Response>* reactor) { + reactor->BindReader(this); + } +}; + +template <class Request> +class ClientCallbackWriter { + public: + virtual ~ClientCallbackWriter() {} + virtual void StartCall() = 0; + void Write(const Request* req) { Write(req, WriteOptions()); } + virtual void Write(const Request* req, WriteOptions options) = 0; + void WriteLast(const Request* req, WriteOptions options) { + Write(req, options.set_last_message()); + } + virtual void WritesDone() = 0; + + protected: + void BindReactor(ClientWriteReactor<Request>* reactor) { + reactor->BindWriter(this); + } +}; + +// The user must implement this reactor interface with reactions to each event +// type that gets called by the library. An empty reaction is provided by +// default +template <class Request, class Response> +class ClientBidiReactor { + public: + virtual ~ClientBidiReactor() {} + virtual void OnDone(const Status& s) {} + virtual void OnReadInitialMetadataDone(bool ok) {} + virtual void OnReadDone(bool ok) {} + virtual void OnWriteDone(bool ok) {} + virtual void OnWritesDoneDone(bool ok) {} + + void StartCall() { stream_->StartCall(); } + void StartRead(Response* resp) { stream_->Read(resp); } + void StartWrite(const Request* req) { StartWrite(req, WriteOptions()); } + void StartWrite(const Request* req, WriteOptions options) { + stream_->Write(req, std::move(options)); + } + void StartWriteLast(const Request* req, WriteOptions options) { + StartWrite(req, std::move(options.set_last_message())); + } + void StartWritesDone() { stream_->WritesDone(); } + + private: + friend class ClientCallbackReaderWriter<Request, Response>; + void BindStream(ClientCallbackReaderWriter<Request, Response>* stream) { + stream_ = stream; + } + ClientCallbackReaderWriter<Request, Response>* stream_; +}; + +template <class Response> +class ClientReadReactor { + public: + virtual ~ClientReadReactor() {} + virtual void OnDone(const Status& s) {} + virtual void OnReadInitialMetadataDone(bool ok) {} + virtual void OnReadDone(bool ok) {} + + void StartCall() { reader_->StartCall(); } + void StartRead(Response* resp) { reader_->Read(resp); } + + private: + friend class ClientCallbackReader<Response>; + void BindReader(ClientCallbackReader<Response>* reader) { reader_ = reader; } + ClientCallbackReader<Response>* reader_; +}; + +template <class Request> +class ClientWriteReactor { + public: + virtual ~ClientWriteReactor() {} + virtual void OnDone(const Status& s) {} + virtual void OnReadInitialMetadataDone(bool ok) {} + virtual void OnWriteDone(bool ok) {} + virtual void OnWritesDoneDone(bool ok) {} + + void StartCall() { writer_->StartCall(); } + void StartWrite(const Request* req) { StartWrite(req, WriteOptions()); } + void StartWrite(const Request* req, WriteOptions options) { + writer_->Write(req, std::move(options)); + } + void StartWriteLast(const Request* req, WriteOptions options) { + StartWrite(req, std::move(options.set_last_message())); + } + void StartWritesDone() { writer_->WritesDone(); } + + private: + friend class ClientCallbackWriter<Request>; + void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; } + ClientCallbackWriter<Request>* writer_; +}; + +} // namespace experimental + +namespace internal { + +// Forward declare factory classes for friendship +template <class Request, class Response> +class ClientCallbackReaderWriterFactory; +template <class Response> +class ClientCallbackReaderFactory; +template <class Request> +class ClientCallbackWriterFactory; + +template <class Request, class Response> +class ClientCallbackReaderWriterImpl + : public ::grpc::experimental::ClientCallbackReaderWriter<Request, + Response> { + public: + // always allocated against a call arena, no memory free required + static void operator delete(void* ptr, std::size_t size) { + assert(size == sizeof(ClientCallbackReaderWriterImpl)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { assert(0); } + + void MaybeFinish() { + if (--callbacks_outstanding_ == 0) { + reactor_->OnDone(finish_status_); + auto* call = call_.call(); + this->~ClientCallbackReaderWriterImpl(); + g_core_codegen_interface->grpc_call_unref(call); + } + } + + void StartCall() override { + // This call initiates two batches, plus any backlog, each with a callback + // 1. Send initial metadata (unless corked) + recv initial metadata + // 2. Any read backlog + // 3. Recv trailing metadata, on_completion callback + // 4. Any write backlog + started_ = true; + + start_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnReadInitialMetadataDone(ok); + MaybeFinish(); + }, + &start_ops_); + if (!start_corked_) { + start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + } + start_ops_.RecvInitialMetadata(context_); + start_ops_.set_core_cq_tag(&start_tag_); + call_.PerformOps(&start_ops_); + + // Also set up the read and write tags so that they don't have to be set up + // each time + write_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnWriteDone(ok); + MaybeFinish(); + }, + &write_ops_); + write_ops_.set_core_cq_tag(&write_tag_); + + read_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnReadDone(ok); + MaybeFinish(); + }, + &read_ops_); + read_ops_.set_core_cq_tag(&read_tag_); + if (read_ops_at_start_) { + call_.PerformOps(&read_ops_); + } + + finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); }, + &finish_ops_); + finish_ops_.ClientRecvStatus(context_, &finish_status_); + finish_ops_.set_core_cq_tag(&finish_tag_); + call_.PerformOps(&finish_ops_); + + if (write_ops_at_start_) { + call_.PerformOps(&write_ops_); + } + + if (writes_done_ops_at_start_) { + call_.PerformOps(&writes_done_ops_); + } + } + + void Read(Response* msg) override { + read_ops_.RecvMessage(msg); + callbacks_outstanding_++; + if (started_) { + call_.PerformOps(&read_ops_); + } else { + read_ops_at_start_ = true; + } + } + + void Write(const Request* msg, WriteOptions options) override { + if (start_corked_) { + write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + start_corked_ = false; + } + // TODO(vjpai): don't assert + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(*msg).ok()); + + if (options.is_last_message()) { + options.set_buffer_hint(); + write_ops_.ClientSendClose(); + } + callbacks_outstanding_++; + if (started_) { + call_.PerformOps(&write_ops_); + } else { + write_ops_at_start_ = true; + } + } + void WritesDone() override { + if (start_corked_) { + writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + start_corked_ = false; + } + writes_done_ops_.ClientSendClose(); + writes_done_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnWritesDoneDone(ok); + MaybeFinish(); + }, + &writes_done_ops_); + writes_done_ops_.set_core_cq_tag(&writes_done_tag_); + callbacks_outstanding_++; + if (started_) { + call_.PerformOps(&writes_done_ops_); + } else { + writes_done_ops_at_start_ = true; + } + } + + private: + friend class ClientCallbackReaderWriterFactory<Request, Response>; + + ClientCallbackReaderWriterImpl( + Call call, ClientContext* context, + ::grpc::experimental::ClientBidiReactor<Request, Response>* reactor) + : context_(context), + call_(call), + reactor_(reactor), + start_corked_(context_->initial_metadata_corked_) { + this->BindReactor(reactor); + } + + ClientContext* context_; + Call call_; + ::grpc::experimental::ClientBidiReactor<Request, Response>* reactor_; + + CallOpSet<CallOpSendInitialMetadata, CallOpRecvInitialMetadata> start_ops_; + CallbackWithSuccessTag start_tag_; + bool start_corked_; + + CallOpSet<CallOpClientRecvStatus> finish_ops_; + CallbackWithSuccessTag finish_tag_; + Status finish_status_; + + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> + write_ops_; + CallbackWithSuccessTag write_tag_; + bool write_ops_at_start_{false}; + + CallOpSet<CallOpSendInitialMetadata, CallOpClientSendClose> writes_done_ops_; + CallbackWithSuccessTag writes_done_tag_; + bool writes_done_ops_at_start_{false}; + + CallOpSet<CallOpRecvMessage<Response>> read_ops_; + CallbackWithSuccessTag read_tag_; + bool read_ops_at_start_{false}; + + // Minimum of 2 outstanding callbacks to pre-register for start and finish + std::atomic_int callbacks_outstanding_{2}; + bool started_{false}; +}; + +template <class Request, class Response> +class ClientCallbackReaderWriterFactory { + public: + static void Create( + ChannelInterface* channel, const ::grpc::internal::RpcMethod& method, + ClientContext* context, + ::grpc::experimental::ClientBidiReactor<Request, Response>* reactor) { + Call call = channel->CreateCall(method, context, channel->CallbackCQ()); + + g_core_codegen_interface->grpc_call_ref(call.call()); + new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientCallbackReaderWriterImpl<Request, Response>))) + ClientCallbackReaderWriterImpl<Request, Response>(call, context, + reactor); + } +}; + +template <class Response> +class ClientCallbackReaderImpl + : public ::grpc::experimental::ClientCallbackReader<Response> { + public: + // always allocated against a call arena, no memory free required + static void operator delete(void* ptr, std::size_t size) { + assert(size == sizeof(ClientCallbackReaderImpl)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { assert(0); } + + void MaybeFinish() { + if (--callbacks_outstanding_ == 0) { + reactor_->OnDone(finish_status_); + auto* call = call_.call(); + this->~ClientCallbackReaderImpl(); + g_core_codegen_interface->grpc_call_unref(call); + } + } + + void StartCall() override { + // This call initiates two batches, plus any backlog, each with a callback + // 1. Send initial metadata (unless corked) + recv initial metadata + // 2. Any backlog + // 3. Recv trailing metadata, on_completion callback + started_ = true; + + start_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnReadInitialMetadataDone(ok); + MaybeFinish(); + }, + &start_ops_); + start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + start_ops_.RecvInitialMetadata(context_); + start_ops_.set_core_cq_tag(&start_tag_); + call_.PerformOps(&start_ops_); + + // Also set up the read tag so it doesn't have to be set up each time + read_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnReadDone(ok); + MaybeFinish(); + }, + &read_ops_); + read_ops_.set_core_cq_tag(&read_tag_); + if (read_ops_at_start_) { + call_.PerformOps(&read_ops_); + } + + finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); }, + &finish_ops_); + finish_ops_.ClientRecvStatus(context_, &finish_status_); + finish_ops_.set_core_cq_tag(&finish_tag_); + call_.PerformOps(&finish_ops_); + } + + void Read(Response* msg) override { + read_ops_.RecvMessage(msg); + callbacks_outstanding_++; + if (started_) { + call_.PerformOps(&read_ops_); + } else { + read_ops_at_start_ = true; + } + } + + private: + friend class ClientCallbackReaderFactory<Response>; + + template <class Request> + ClientCallbackReaderImpl( + Call call, ClientContext* context, Request* request, + ::grpc::experimental::ClientReadReactor<Response>* reactor) + : context_(context), call_(call), reactor_(reactor) { + this->BindReactor(reactor); + // TODO(vjpai): don't assert + GPR_CODEGEN_ASSERT(start_ops_.SendMessage(*request).ok()); + start_ops_.ClientSendClose(); + } + + ClientContext* context_; + Call call_; + ::grpc::experimental::ClientReadReactor<Response>* reactor_; + + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose, + CallOpRecvInitialMetadata> + start_ops_; + CallbackWithSuccessTag start_tag_; + + CallOpSet<CallOpClientRecvStatus> finish_ops_; + CallbackWithSuccessTag finish_tag_; + Status finish_status_; + + CallOpSet<CallOpRecvMessage<Response>> read_ops_; + CallbackWithSuccessTag read_tag_; + bool read_ops_at_start_{false}; + + // Minimum of 2 outstanding callbacks to pre-register for start and finish + std::atomic_int callbacks_outstanding_{2}; + bool started_{false}; +}; + +template <class Response> +class ClientCallbackReaderFactory { + public: + template <class Request> + static void Create( + ChannelInterface* channel, const ::grpc::internal::RpcMethod& method, + ClientContext* context, const Request* request, + ::grpc::experimental::ClientReadReactor<Response>* reactor) { + Call call = channel->CreateCall(method, context, channel->CallbackCQ()); + + g_core_codegen_interface->grpc_call_ref(call.call()); + new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientCallbackReaderImpl<Response>))) + ClientCallbackReaderImpl<Response>(call, context, request, reactor); + } +}; + +template <class Request> +class ClientCallbackWriterImpl + : public ::grpc::experimental::ClientCallbackWriter<Request> { + public: + // always allocated against a call arena, no memory free required + static void operator delete(void* ptr, std::size_t size) { + assert(size == sizeof(ClientCallbackWriterImpl)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { assert(0); } + + void MaybeFinish() { + if (--callbacks_outstanding_ == 0) { + reactor_->OnDone(finish_status_); + auto* call = call_.call(); + this->~ClientCallbackWriterImpl(); + g_core_codegen_interface->grpc_call_unref(call); + } + } + + void StartCall() override { + // This call initiates two batches, plus any backlog, each with a callback + // 1. Send initial metadata (unless corked) + recv initial metadata + // 2. Recv trailing metadata, on_completion callback + // 3. Any backlog + started_ = true; + + start_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnReadInitialMetadataDone(ok); + MaybeFinish(); + }, + &start_ops_); + if (!start_corked_) { + start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + } + start_ops_.RecvInitialMetadata(context_); + start_ops_.set_core_cq_tag(&start_tag_); + call_.PerformOps(&start_ops_); + + // Also set up the read and write tags so that they don't have to be set up + // each time + write_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnWriteDone(ok); + MaybeFinish(); + }, + &write_ops_); + write_ops_.set_core_cq_tag(&write_tag_); + + finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); }, + &finish_ops_); + finish_ops_.ClientRecvStatus(context_, &finish_status_); + finish_ops_.set_core_cq_tag(&finish_tag_); + call_.PerformOps(&finish_ops_); + + if (write_ops_at_start_) { + call_.PerformOps(&write_ops_); + } + + if (writes_done_ops_at_start_) { + call_.PerformOps(&writes_done_ops_); + } + } + + void Write(const Request* msg, WriteOptions options) override { + if (start_corked_) { + write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + start_corked_ = false; + } + // TODO(vjpai): don't assert + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(*msg).ok()); + + if (options.is_last_message()) { + options.set_buffer_hint(); + write_ops_.ClientSendClose(); + } + callbacks_outstanding_++; + if (started_) { + call_.PerformOps(&write_ops_); + } else { + write_ops_at_start_ = true; + } + } + void WritesDone() override { + if (start_corked_) { + writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_, + context_->initial_metadata_flags()); + start_corked_ = false; + } + writes_done_ops_.ClientSendClose(); + writes_done_tag_.Set(call_.call(), + [this](bool ok) { + reactor_->OnWritesDoneDone(ok); + MaybeFinish(); + }, + &writes_done_ops_); + writes_done_ops_.set_core_cq_tag(&writes_done_tag_); + callbacks_outstanding_++; + if (started_) { + call_.PerformOps(&writes_done_ops_); + } else { + writes_done_ops_at_start_ = true; + } + } + + private: + friend class ClientCallbackWriterFactory<Request>; + + template <class Response> + ClientCallbackWriterImpl( + Call call, ClientContext* context, Response* response, + ::grpc::experimental::ClientWriteReactor<Request>* reactor) + : context_(context), + call_(call), + reactor_(reactor), + start_corked_(context_->initial_metadata_corked_) { + this->BindReactor(reactor); + finish_ops_.RecvMessage(response); + finish_ops_.AllowNoMessage(); + } + + ClientContext* context_; + Call call_; + ::grpc::experimental::ClientWriteReactor<Request>* reactor_; + + CallOpSet<CallOpSendInitialMetadata, CallOpRecvInitialMetadata> start_ops_; + CallbackWithSuccessTag start_tag_; + bool start_corked_; + + CallOpSet<CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_; + CallbackWithSuccessTag finish_tag_; + Status finish_status_; + + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> + write_ops_; + CallbackWithSuccessTag write_tag_; + bool write_ops_at_start_{false}; + + CallOpSet<CallOpSendInitialMetadata, CallOpClientSendClose> writes_done_ops_; + CallbackWithSuccessTag writes_done_tag_; + bool writes_done_ops_at_start_{false}; + + // Minimum of 2 outstanding callbacks to pre-register for start and finish + std::atomic_int callbacks_outstanding_{2}; + bool started_{false}; +}; + +template <class Request> +class ClientCallbackWriterFactory { + public: + template <class Response> + static void Create( + ChannelInterface* channel, const ::grpc::internal::RpcMethod& method, + ClientContext* context, Response* response, + ::grpc::experimental::ClientWriteReactor<Request>* reactor) { + Call call = channel->CreateCall(method, context, channel->CallbackCQ()); + + g_core_codegen_interface->grpc_call_ref(call.call()); + new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientCallbackWriterImpl<Request>))) + ClientCallbackWriterImpl<Request>(call, context, response, reactor); + } +}; } // namespace internal } // namespace grpc |