aboutsummaryrefslogtreecommitdiffhomepage
path: root/include
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2018-11-14 11:47:15 -0800
committerGravatar Vijay Pai <vpai@google.com>2018-11-30 04:13:40 -0800
commitdac2066a1c0df628fefe4144ae0f97337af6324e (patch)
tree085c3f8e28a7bb9bfccf7fbf97320f5ef6091f8d /include
parent60949a823267d2bfa783f9488dddff515a7928c1 (diff)
Make StartCall() a releasing operation so that you can pile up ops
Diffstat (limited to 'include')
-rw-r--r--include/grpcpp/impl/codegen/client_callback.h150
1 files changed, 108 insertions, 42 deletions
diff --git a/include/grpcpp/impl/codegen/client_callback.h b/include/grpcpp/impl/codegen/client_callback.h
index 49ceaf7ca2..92a588b3c3 100644
--- a/include/grpcpp/impl/codegen/client_callback.h
+++ b/include/grpcpp/impl/codegen/client_callback.h
@@ -197,10 +197,12 @@ class ClientCallbackReaderWriterImpl
}
void StartCall() override {
- // This call initiates two batches, each with a callback
+ // This call initiates two batches, plus any backlog, each with a callback
// 1. Send initial metadata (unless corked) + recv initial metadata
- // 2. Recv trailing metadata, on_completion callback
- callbacks_outstanding_ = 2;
+ // 2. Any read backlog
+ // 3. Recv trailing metadata, on_completion callback
+ // 4. Any write backlog
+ started_ = true;
start_tag_.Set(call_.call(),
[this](bool ok) {
@@ -208,7 +210,6 @@ class ClientCallbackReaderWriterImpl
MaybeFinish();
},
&start_ops_);
- start_corked_ = context_->initial_metadata_corked_;
if (!start_corked_) {
start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
context_->initial_metadata_flags());
@@ -217,12 +218,6 @@ class ClientCallbackReaderWriterImpl
start_ops_.set_core_cq_tag(&start_tag_);
call_.PerformOps(&start_ops_);
- finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
- &finish_ops_);
- finish_ops_.ClientRecvStatus(context_, &finish_status_);
- finish_ops_.set_core_cq_tag(&finish_tag_);
- call_.PerformOps(&finish_ops_);
-
// Also set up the read and write tags so that they don't have to be set up
// each time
write_tag_.Set(call_.call(),
@@ -240,12 +235,33 @@ class ClientCallbackReaderWriterImpl
},
&read_ops_);
read_ops_.set_core_cq_tag(&read_tag_);
+ if (read_ops_at_start_) {
+ call_.PerformOps(&read_ops_);
+ }
+
+ finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
+ &finish_ops_);
+ finish_ops_.ClientRecvStatus(context_, &finish_status_);
+ finish_ops_.set_core_cq_tag(&finish_tag_);
+ call_.PerformOps(&finish_ops_);
+
+ if (write_ops_at_start_) {
+ call_.PerformOps(&write_ops_);
+ }
+
+ if (writes_done_ops_at_start_) {
+ call_.PerformOps(&writes_done_ops_);
+ }
}
void Read(Response* msg) override {
read_ops_.RecvMessage(msg);
callbacks_outstanding_++;
- call_.PerformOps(&read_ops_);
+ if (started_) {
+ call_.PerformOps(&read_ops_);
+ } else {
+ read_ops_at_start_ = true;
+ }
}
void Write(const Request* msg, WriteOptions options) override {
@@ -262,7 +278,11 @@ class ClientCallbackReaderWriterImpl
write_ops_.ClientSendClose();
}
callbacks_outstanding_++;
- call_.PerformOps(&write_ops_);
+ if (started_) {
+ call_.PerformOps(&write_ops_);
+ } else {
+ write_ops_at_start_ = true;
+ }
}
void WritesDone() override {
if (start_corked_) {
@@ -279,7 +299,11 @@ class ClientCallbackReaderWriterImpl
&writes_done_ops_);
writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
callbacks_outstanding_++;
- call_.PerformOps(&writes_done_ops_);
+ if (started_) {
+ call_.PerformOps(&writes_done_ops_);
+ } else {
+ writes_done_ops_at_start_ = true;
+ }
}
private:
@@ -288,7 +312,10 @@ class ClientCallbackReaderWriterImpl
ClientCallbackReaderWriterImpl(
Call call, ClientContext* context,
::grpc::experimental::ClientBidiReactor* reactor)
- : context_(context), call_(call), reactor_(reactor) {}
+ : context_(context),
+ call_(call),
+ reactor_(reactor),
+ start_corked_(context_->initial_metadata_corked_) {}
ClientContext* context_;
Call call_;
@@ -305,14 +332,19 @@ class ClientCallbackReaderWriterImpl
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
write_ops_;
CallbackWithSuccessTag write_tag_;
+ bool write_ops_at_start_{false};
CallOpSet<CallOpSendInitialMetadata, CallOpClientSendClose> writes_done_ops_;
CallbackWithSuccessTag writes_done_tag_;
+ bool writes_done_ops_at_start_{false};
- CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<Response>> read_ops_;
+ CallOpSet<CallOpRecvMessage<Response>> read_ops_;
CallbackWithSuccessTag read_tag_;
+ bool read_ops_at_start_{false};
- std::atomic_int callbacks_outstanding_;
+ // Minimum of 2 outstanding callbacks to pre-register for start and finish
+ std::atomic_int callbacks_outstanding_{2};
+ bool started_{false};
};
template <class Request, class Response>
@@ -358,10 +390,11 @@ class ClientCallbackReaderImpl
}
void StartCall() override {
- // This call initiates two batches, each with a callback
+ // This call initiates two batches, plus any backlog, each with a callback
// 1. Send initial metadata (unless corked) + recv initial metadata
- // 2. Recv trailing metadata, on_completion callback
- callbacks_outstanding_ = 2;
+ // 2. Any backlog
+ // 3. Recv trailing metadata, on_completion callback
+ started_ = true;
start_tag_.Set(call_.call(),
[this](bool ok) {
@@ -375,12 +408,6 @@ class ClientCallbackReaderImpl
start_ops_.set_core_cq_tag(&start_tag_);
call_.PerformOps(&start_ops_);
- finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
- &finish_ops_);
- finish_ops_.ClientRecvStatus(context_, &finish_status_);
- finish_ops_.set_core_cq_tag(&finish_tag_);
- call_.PerformOps(&finish_ops_);
-
// Also set up the read tag so it doesn't have to be set up each time
read_tag_.Set(call_.call(),
[this](bool ok) {
@@ -389,12 +416,25 @@ class ClientCallbackReaderImpl
},
&read_ops_);
read_ops_.set_core_cq_tag(&read_tag_);
+ if (read_ops_at_start_) {
+ call_.PerformOps(&read_ops_);
+ }
+
+ finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
+ &finish_ops_);
+ finish_ops_.ClientRecvStatus(context_, &finish_status_);
+ finish_ops_.set_core_cq_tag(&finish_tag_);
+ call_.PerformOps(&finish_ops_);
}
void Read(Response* msg) override {
read_ops_.RecvMessage(msg);
callbacks_outstanding_++;
- call_.PerformOps(&read_ops_);
+ if (started_) {
+ call_.PerformOps(&read_ops_);
+ } else {
+ read_ops_at_start_ = true;
+ }
}
private:
@@ -422,10 +462,13 @@ class ClientCallbackReaderImpl
CallbackWithSuccessTag finish_tag_;
Status finish_status_;
- CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<Response>> read_ops_;
+ CallOpSet<CallOpRecvMessage<Response>> read_ops_;
CallbackWithSuccessTag read_tag_;
+ bool read_ops_at_start_{false};
- std::atomic_int callbacks_outstanding_;
+ // Minimum of 2 outstanding callbacks to pre-register for start and finish
+ std::atomic_int callbacks_outstanding_{2};
+ bool started_{false};
};
template <class Response>
@@ -471,10 +514,11 @@ class ClientCallbackWriterImpl
}
void StartCall() override {
- // This call initiates two batches, each with a callback
+ // This call initiates two batches, plus any backlog, each with a callback
// 1. Send initial metadata (unless corked) + recv initial metadata
- // 2. Recv message + recv trailing metadata, on_completion callback
- callbacks_outstanding_ = 2;
+ // 2. Recv trailing metadata, on_completion callback
+ // 3. Any backlog
+ started_ = true;
start_tag_.Set(call_.call(),
[this](bool ok) {
@@ -482,7 +526,6 @@ class ClientCallbackWriterImpl
MaybeFinish();
},
&start_ops_);
- start_corked_ = context_->initial_metadata_corked_;
if (!start_corked_) {
start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
context_->initial_metadata_flags());
@@ -491,12 +534,6 @@ class ClientCallbackWriterImpl
start_ops_.set_core_cq_tag(&start_tag_);
call_.PerformOps(&start_ops_);
- finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
- &finish_ops_);
- finish_ops_.ClientRecvStatus(context_, &finish_status_);
- finish_ops_.set_core_cq_tag(&finish_tag_);
- call_.PerformOps(&finish_ops_);
-
// Also set up the read and write tags so that they don't have to be set up
// each time
write_tag_.Set(call_.call(),
@@ -506,6 +543,20 @@ class ClientCallbackWriterImpl
},
&write_ops_);
write_ops_.set_core_cq_tag(&write_tag_);
+
+ finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
+ &finish_ops_);
+ finish_ops_.ClientRecvStatus(context_, &finish_status_);
+ finish_ops_.set_core_cq_tag(&finish_tag_);
+ call_.PerformOps(&finish_ops_);
+
+ if (write_ops_at_start_) {
+ call_.PerformOps(&write_ops_);
+ }
+
+ if (writes_done_ops_at_start_) {
+ call_.PerformOps(&writes_done_ops_);
+ }
}
void Write(const Request* msg, WriteOptions options) override {
@@ -522,7 +573,11 @@ class ClientCallbackWriterImpl
write_ops_.ClientSendClose();
}
callbacks_outstanding_++;
- call_.PerformOps(&write_ops_);
+ if (started_) {
+ call_.PerformOps(&write_ops_);
+ } else {
+ write_ops_at_start_ = true;
+ }
}
void WritesDone() override {
if (start_corked_) {
@@ -539,7 +594,11 @@ class ClientCallbackWriterImpl
&writes_done_ops_);
writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
callbacks_outstanding_++;
- call_.PerformOps(&writes_done_ops_);
+ if (started_) {
+ call_.PerformOps(&writes_done_ops_);
+ } else {
+ writes_done_ops_at_start_ = true;
+ }
}
private:
@@ -549,7 +608,10 @@ class ClientCallbackWriterImpl
ClientCallbackWriterImpl(Call call, ClientContext* context,
Response* response,
::grpc::experimental::ClientWriteReactor* reactor)
- : context_(context), call_(call), reactor_(reactor) {
+ : context_(context),
+ call_(call),
+ reactor_(reactor),
+ start_corked_(context_->initial_metadata_corked_) {
finish_ops_.RecvMessage(response);
finish_ops_.AllowNoMessage();
}
@@ -569,11 +631,15 @@ class ClientCallbackWriterImpl
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
write_ops_;
CallbackWithSuccessTag write_tag_;
+ bool write_ops_at_start_{false};
CallOpSet<CallOpSendInitialMetadata, CallOpClientSendClose> writes_done_ops_;
CallbackWithSuccessTag writes_done_tag_;
+ bool writes_done_ops_at_start_{false};
- std::atomic_int callbacks_outstanding_;
+ // Minimum of 2 outstanding callbacks to pre-register for start and finish
+ std::atomic_int callbacks_outstanding_{2};
+ bool started_{false};
};
template <class Request>