From dac2066a1c0df628fefe4144ae0f97337af6324e Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Wed, 14 Nov 2018 11:47:15 -0800 Subject: Make StartCall() a releasing operation so that you can pile up ops --- include/grpcpp/impl/codegen/client_callback.h | 150 ++++++++++++++++++-------- 1 file changed, 108 insertions(+), 42 deletions(-) (limited to 'include') diff --git a/include/grpcpp/impl/codegen/client_callback.h b/include/grpcpp/impl/codegen/client_callback.h index 49ceaf7ca2..92a588b3c3 100644 --- a/include/grpcpp/impl/codegen/client_callback.h +++ b/include/grpcpp/impl/codegen/client_callback.h @@ -197,10 +197,12 @@ class ClientCallbackReaderWriterImpl } void StartCall() override { - // This call initiates two batches, each with a callback + // This call initiates two batches, plus any backlog, each with a callback // 1. Send initial metadata (unless corked) + recv initial metadata - // 2. Recv trailing metadata, on_completion callback - callbacks_outstanding_ = 2; + // 2. Any read backlog + // 3. Recv trailing metadata, on_completion callback + // 4. Any write backlog + started_ = true; start_tag_.Set(call_.call(), [this](bool ok) { @@ -208,7 +210,6 @@ class ClientCallbackReaderWriterImpl MaybeFinish(); }, &start_ops_); - start_corked_ = context_->initial_metadata_corked_; if (!start_corked_) { start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, context_->initial_metadata_flags()); @@ -217,12 +218,6 @@ class ClientCallbackReaderWriterImpl start_ops_.set_core_cq_tag(&start_tag_); call_.PerformOps(&start_ops_); - finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); }, - &finish_ops_); - finish_ops_.ClientRecvStatus(context_, &finish_status_); - finish_ops_.set_core_cq_tag(&finish_tag_); - call_.PerformOps(&finish_ops_); - // Also set up the read and write tags so that they don't have to be set up // each time write_tag_.Set(call_.call(), @@ -240,12 +235,33 @@ class ClientCallbackReaderWriterImpl }, &read_ops_); read_ops_.set_core_cq_tag(&read_tag_); + if (read_ops_at_start_) { + call_.PerformOps(&read_ops_); + } + + finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); }, + &finish_ops_); + finish_ops_.ClientRecvStatus(context_, &finish_status_); + finish_ops_.set_core_cq_tag(&finish_tag_); + call_.PerformOps(&finish_ops_); + + if (write_ops_at_start_) { + call_.PerformOps(&write_ops_); + } + + if (writes_done_ops_at_start_) { + call_.PerformOps(&writes_done_ops_); + } } void Read(Response* msg) override { read_ops_.RecvMessage(msg); callbacks_outstanding_++; - call_.PerformOps(&read_ops_); + if (started_) { + call_.PerformOps(&read_ops_); + } else { + read_ops_at_start_ = true; + } } void Write(const Request* msg, WriteOptions options) override { @@ -262,7 +278,11 @@ class ClientCallbackReaderWriterImpl write_ops_.ClientSendClose(); } callbacks_outstanding_++; - call_.PerformOps(&write_ops_); + if (started_) { + call_.PerformOps(&write_ops_); + } else { + write_ops_at_start_ = true; + } } void WritesDone() override { if (start_corked_) { @@ -279,7 +299,11 @@ class ClientCallbackReaderWriterImpl &writes_done_ops_); writes_done_ops_.set_core_cq_tag(&writes_done_tag_); callbacks_outstanding_++; - call_.PerformOps(&writes_done_ops_); + if (started_) { + call_.PerformOps(&writes_done_ops_); + } else { + writes_done_ops_at_start_ = true; + } } private: @@ -288,7 +312,10 @@ class ClientCallbackReaderWriterImpl ClientCallbackReaderWriterImpl( Call call, ClientContext* context, ::grpc::experimental::ClientBidiReactor* reactor) - : context_(context), call_(call), reactor_(reactor) {} + : context_(context), + call_(call), + reactor_(reactor), + start_corked_(context_->initial_metadata_corked_) {} ClientContext* context_; Call call_; @@ -305,14 +332,19 @@ class ClientCallbackReaderWriterImpl CallOpSet write_ops_; CallbackWithSuccessTag write_tag_; + bool write_ops_at_start_{false}; CallOpSet writes_done_ops_; CallbackWithSuccessTag writes_done_tag_; + bool writes_done_ops_at_start_{false}; - CallOpSet> read_ops_; + CallOpSet> read_ops_; CallbackWithSuccessTag read_tag_; + bool read_ops_at_start_{false}; - std::atomic_int callbacks_outstanding_; + // Minimum of 2 outstanding callbacks to pre-register for start and finish + std::atomic_int callbacks_outstanding_{2}; + bool started_{false}; }; template @@ -358,10 +390,11 @@ class ClientCallbackReaderImpl } void StartCall() override { - // This call initiates two batches, each with a callback + // This call initiates two batches, plus any backlog, each with a callback // 1. Send initial metadata (unless corked) + recv initial metadata - // 2. Recv trailing metadata, on_completion callback - callbacks_outstanding_ = 2; + // 2. Any backlog + // 3. Recv trailing metadata, on_completion callback + started_ = true; start_tag_.Set(call_.call(), [this](bool ok) { @@ -375,12 +408,6 @@ class ClientCallbackReaderImpl start_ops_.set_core_cq_tag(&start_tag_); call_.PerformOps(&start_ops_); - finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); }, - &finish_ops_); - finish_ops_.ClientRecvStatus(context_, &finish_status_); - finish_ops_.set_core_cq_tag(&finish_tag_); - call_.PerformOps(&finish_ops_); - // Also set up the read tag so it doesn't have to be set up each time read_tag_.Set(call_.call(), [this](bool ok) { @@ -389,12 +416,25 @@ class ClientCallbackReaderImpl }, &read_ops_); read_ops_.set_core_cq_tag(&read_tag_); + if (read_ops_at_start_) { + call_.PerformOps(&read_ops_); + } + + finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); }, + &finish_ops_); + finish_ops_.ClientRecvStatus(context_, &finish_status_); + finish_ops_.set_core_cq_tag(&finish_tag_); + call_.PerformOps(&finish_ops_); } void Read(Response* msg) override { read_ops_.RecvMessage(msg); callbacks_outstanding_++; - call_.PerformOps(&read_ops_); + if (started_) { + call_.PerformOps(&read_ops_); + } else { + read_ops_at_start_ = true; + } } private: @@ -422,10 +462,13 @@ class ClientCallbackReaderImpl CallbackWithSuccessTag finish_tag_; Status finish_status_; - CallOpSet> read_ops_; + CallOpSet> read_ops_; CallbackWithSuccessTag read_tag_; + bool read_ops_at_start_{false}; - std::atomic_int callbacks_outstanding_; + // Minimum of 2 outstanding callbacks to pre-register for start and finish + std::atomic_int callbacks_outstanding_{2}; + bool started_{false}; }; template @@ -471,10 +514,11 @@ class ClientCallbackWriterImpl } void StartCall() override { - // This call initiates two batches, each with a callback + // This call initiates two batches, plus any backlog, each with a callback // 1. Send initial metadata (unless corked) + recv initial metadata - // 2. Recv message + recv trailing metadata, on_completion callback - callbacks_outstanding_ = 2; + // 2. Recv trailing metadata, on_completion callback + // 3. Any backlog + started_ = true; start_tag_.Set(call_.call(), [this](bool ok) { @@ -482,7 +526,6 @@ class ClientCallbackWriterImpl MaybeFinish(); }, &start_ops_); - start_corked_ = context_->initial_metadata_corked_; if (!start_corked_) { start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, context_->initial_metadata_flags()); @@ -491,12 +534,6 @@ class ClientCallbackWriterImpl start_ops_.set_core_cq_tag(&start_tag_); call_.PerformOps(&start_ops_); - finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); }, - &finish_ops_); - finish_ops_.ClientRecvStatus(context_, &finish_status_); - finish_ops_.set_core_cq_tag(&finish_tag_); - call_.PerformOps(&finish_ops_); - // Also set up the read and write tags so that they don't have to be set up // each time write_tag_.Set(call_.call(), @@ -506,6 +543,20 @@ class ClientCallbackWriterImpl }, &write_ops_); write_ops_.set_core_cq_tag(&write_tag_); + + finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); }, + &finish_ops_); + finish_ops_.ClientRecvStatus(context_, &finish_status_); + finish_ops_.set_core_cq_tag(&finish_tag_); + call_.PerformOps(&finish_ops_); + + if (write_ops_at_start_) { + call_.PerformOps(&write_ops_); + } + + if (writes_done_ops_at_start_) { + call_.PerformOps(&writes_done_ops_); + } } void Write(const Request* msg, WriteOptions options) override { @@ -522,7 +573,11 @@ class ClientCallbackWriterImpl write_ops_.ClientSendClose(); } callbacks_outstanding_++; - call_.PerformOps(&write_ops_); + if (started_) { + call_.PerformOps(&write_ops_); + } else { + write_ops_at_start_ = true; + } } void WritesDone() override { if (start_corked_) { @@ -539,7 +594,11 @@ class ClientCallbackWriterImpl &writes_done_ops_); writes_done_ops_.set_core_cq_tag(&writes_done_tag_); callbacks_outstanding_++; - call_.PerformOps(&writes_done_ops_); + if (started_) { + call_.PerformOps(&writes_done_ops_); + } else { + writes_done_ops_at_start_ = true; + } } private: @@ -549,7 +608,10 @@ class ClientCallbackWriterImpl ClientCallbackWriterImpl(Call call, ClientContext* context, Response* response, ::grpc::experimental::ClientWriteReactor* reactor) - : context_(context), call_(call), reactor_(reactor) { + : context_(context), + call_(call), + reactor_(reactor), + start_corked_(context_->initial_metadata_corked_) { finish_ops_.RecvMessage(response); finish_ops_.AllowNoMessage(); } @@ -569,11 +631,15 @@ class ClientCallbackWriterImpl CallOpSet write_ops_; CallbackWithSuccessTag write_tag_; + bool write_ops_at_start_{false}; CallOpSet writes_done_ops_; CallbackWithSuccessTag writes_done_tag_; + bool writes_done_ops_at_start_{false}; - std::atomic_int callbacks_outstanding_; + // Minimum of 2 outstanding callbacks to pre-register for start and finish + std::atomic_int callbacks_outstanding_{2}; + bool started_{false}; }; template -- cgit v1.2.3