aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--include/grpcpp/impl/codegen/client_callback.h150
-rw-r--r--test/cpp/end2end/client_callback_end2end_test.cc4
-rw-r--r--test/cpp/end2end/test_service_impl.cc1
3 files changed, 111 insertions, 44 deletions
diff --git a/include/grpcpp/impl/codegen/client_callback.h b/include/grpcpp/impl/codegen/client_callback.h
index 49ceaf7ca2..92a588b3c3 100644
--- a/include/grpcpp/impl/codegen/client_callback.h
+++ b/include/grpcpp/impl/codegen/client_callback.h
@@ -197,10 +197,12 @@ class ClientCallbackReaderWriterImpl
}
void StartCall() override {
- // This call initiates two batches, each with a callback
+ // This call initiates two batches, plus any backlog, each with a callback
// 1. Send initial metadata (unless corked) + recv initial metadata
- // 2. Recv trailing metadata, on_completion callback
- callbacks_outstanding_ = 2;
+ // 2. Any read backlog
+ // 3. Recv trailing metadata, on_completion callback
+ // 4. Any write backlog
+ started_ = true;
start_tag_.Set(call_.call(),
[this](bool ok) {
@@ -208,7 +210,6 @@ class ClientCallbackReaderWriterImpl
MaybeFinish();
},
&start_ops_);
- start_corked_ = context_->initial_metadata_corked_;
if (!start_corked_) {
start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
context_->initial_metadata_flags());
@@ -217,12 +218,6 @@ class ClientCallbackReaderWriterImpl
start_ops_.set_core_cq_tag(&start_tag_);
call_.PerformOps(&start_ops_);
- finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
- &finish_ops_);
- finish_ops_.ClientRecvStatus(context_, &finish_status_);
- finish_ops_.set_core_cq_tag(&finish_tag_);
- call_.PerformOps(&finish_ops_);
-
// Also set up the read and write tags so that they don't have to be set up
// each time
write_tag_.Set(call_.call(),
@@ -240,12 +235,33 @@ class ClientCallbackReaderWriterImpl
},
&read_ops_);
read_ops_.set_core_cq_tag(&read_tag_);
+ if (read_ops_at_start_) {
+ call_.PerformOps(&read_ops_);
+ }
+
+ finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
+ &finish_ops_);
+ finish_ops_.ClientRecvStatus(context_, &finish_status_);
+ finish_ops_.set_core_cq_tag(&finish_tag_);
+ call_.PerformOps(&finish_ops_);
+
+ if (write_ops_at_start_) {
+ call_.PerformOps(&write_ops_);
+ }
+
+ if (writes_done_ops_at_start_) {
+ call_.PerformOps(&writes_done_ops_);
+ }
}
void Read(Response* msg) override {
read_ops_.RecvMessage(msg);
callbacks_outstanding_++;
- call_.PerformOps(&read_ops_);
+ if (started_) {
+ call_.PerformOps(&read_ops_);
+ } else {
+ read_ops_at_start_ = true;
+ }
}
void Write(const Request* msg, WriteOptions options) override {
@@ -262,7 +278,11 @@ class ClientCallbackReaderWriterImpl
write_ops_.ClientSendClose();
}
callbacks_outstanding_++;
- call_.PerformOps(&write_ops_);
+ if (started_) {
+ call_.PerformOps(&write_ops_);
+ } else {
+ write_ops_at_start_ = true;
+ }
}
void WritesDone() override {
if (start_corked_) {
@@ -279,7 +299,11 @@ class ClientCallbackReaderWriterImpl
&writes_done_ops_);
writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
callbacks_outstanding_++;
- call_.PerformOps(&writes_done_ops_);
+ if (started_) {
+ call_.PerformOps(&writes_done_ops_);
+ } else {
+ writes_done_ops_at_start_ = true;
+ }
}
private:
@@ -288,7 +312,10 @@ class ClientCallbackReaderWriterImpl
ClientCallbackReaderWriterImpl(
Call call, ClientContext* context,
::grpc::experimental::ClientBidiReactor* reactor)
- : context_(context), call_(call), reactor_(reactor) {}
+ : context_(context),
+ call_(call),
+ reactor_(reactor),
+ start_corked_(context_->initial_metadata_corked_) {}
ClientContext* context_;
Call call_;
@@ -305,14 +332,19 @@ class ClientCallbackReaderWriterImpl
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
write_ops_;
CallbackWithSuccessTag write_tag_;
+ bool write_ops_at_start_{false};
CallOpSet<CallOpSendInitialMetadata, CallOpClientSendClose> writes_done_ops_;
CallbackWithSuccessTag writes_done_tag_;
+ bool writes_done_ops_at_start_{false};
- CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<Response>> read_ops_;
+ CallOpSet<CallOpRecvMessage<Response>> read_ops_;
CallbackWithSuccessTag read_tag_;
+ bool read_ops_at_start_{false};
- std::atomic_int callbacks_outstanding_;
+ // Minimum of 2 outstanding callbacks to pre-register for start and finish
+ std::atomic_int callbacks_outstanding_{2};
+ bool started_{false};
};
template <class Request, class Response>
@@ -358,10 +390,11 @@ class ClientCallbackReaderImpl
}
void StartCall() override {
- // This call initiates two batches, each with a callback
+ // This call initiates two batches, plus any backlog, each with a callback
// 1. Send initial metadata (unless corked) + recv initial metadata
- // 2. Recv trailing metadata, on_completion callback
- callbacks_outstanding_ = 2;
+ // 2. Any backlog
+ // 3. Recv trailing metadata, on_completion callback
+ started_ = true;
start_tag_.Set(call_.call(),
[this](bool ok) {
@@ -375,12 +408,6 @@ class ClientCallbackReaderImpl
start_ops_.set_core_cq_tag(&start_tag_);
call_.PerformOps(&start_ops_);
- finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
- &finish_ops_);
- finish_ops_.ClientRecvStatus(context_, &finish_status_);
- finish_ops_.set_core_cq_tag(&finish_tag_);
- call_.PerformOps(&finish_ops_);
-
// Also set up the read tag so it doesn't have to be set up each time
read_tag_.Set(call_.call(),
[this](bool ok) {
@@ -389,12 +416,25 @@ class ClientCallbackReaderImpl
},
&read_ops_);
read_ops_.set_core_cq_tag(&read_tag_);
+ if (read_ops_at_start_) {
+ call_.PerformOps(&read_ops_);
+ }
+
+ finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
+ &finish_ops_);
+ finish_ops_.ClientRecvStatus(context_, &finish_status_);
+ finish_ops_.set_core_cq_tag(&finish_tag_);
+ call_.PerformOps(&finish_ops_);
}
void Read(Response* msg) override {
read_ops_.RecvMessage(msg);
callbacks_outstanding_++;
- call_.PerformOps(&read_ops_);
+ if (started_) {
+ call_.PerformOps(&read_ops_);
+ } else {
+ read_ops_at_start_ = true;
+ }
}
private:
@@ -422,10 +462,13 @@ class ClientCallbackReaderImpl
CallbackWithSuccessTag finish_tag_;
Status finish_status_;
- CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<Response>> read_ops_;
+ CallOpSet<CallOpRecvMessage<Response>> read_ops_;
CallbackWithSuccessTag read_tag_;
+ bool read_ops_at_start_{false};
- std::atomic_int callbacks_outstanding_;
+ // Minimum of 2 outstanding callbacks to pre-register for start and finish
+ std::atomic_int callbacks_outstanding_{2};
+ bool started_{false};
};
template <class Response>
@@ -471,10 +514,11 @@ class ClientCallbackWriterImpl
}
void StartCall() override {
- // This call initiates two batches, each with a callback
+ // This call initiates two batches, plus any backlog, each with a callback
// 1. Send initial metadata (unless corked) + recv initial metadata
- // 2. Recv message + recv trailing metadata, on_completion callback
- callbacks_outstanding_ = 2;
+ // 2. Recv trailing metadata, on_completion callback
+ // 3. Any backlog
+ started_ = true;
start_tag_.Set(call_.call(),
[this](bool ok) {
@@ -482,7 +526,6 @@ class ClientCallbackWriterImpl
MaybeFinish();
},
&start_ops_);
- start_corked_ = context_->initial_metadata_corked_;
if (!start_corked_) {
start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
context_->initial_metadata_flags());
@@ -491,12 +534,6 @@ class ClientCallbackWriterImpl
start_ops_.set_core_cq_tag(&start_tag_);
call_.PerformOps(&start_ops_);
- finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
- &finish_ops_);
- finish_ops_.ClientRecvStatus(context_, &finish_status_);
- finish_ops_.set_core_cq_tag(&finish_tag_);
- call_.PerformOps(&finish_ops_);
-
// Also set up the read and write tags so that they don't have to be set up
// each time
write_tag_.Set(call_.call(),
@@ -506,6 +543,20 @@ class ClientCallbackWriterImpl
},
&write_ops_);
write_ops_.set_core_cq_tag(&write_tag_);
+
+ finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
+ &finish_ops_);
+ finish_ops_.ClientRecvStatus(context_, &finish_status_);
+ finish_ops_.set_core_cq_tag(&finish_tag_);
+ call_.PerformOps(&finish_ops_);
+
+ if (write_ops_at_start_) {
+ call_.PerformOps(&write_ops_);
+ }
+
+ if (writes_done_ops_at_start_) {
+ call_.PerformOps(&writes_done_ops_);
+ }
}
void Write(const Request* msg, WriteOptions options) override {
@@ -522,7 +573,11 @@ class ClientCallbackWriterImpl
write_ops_.ClientSendClose();
}
callbacks_outstanding_++;
- call_.PerformOps(&write_ops_);
+ if (started_) {
+ call_.PerformOps(&write_ops_);
+ } else {
+ write_ops_at_start_ = true;
+ }
}
void WritesDone() override {
if (start_corked_) {
@@ -539,7 +594,11 @@ class ClientCallbackWriterImpl
&writes_done_ops_);
writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
callbacks_outstanding_++;
- call_.PerformOps(&writes_done_ops_);
+ if (started_) {
+ call_.PerformOps(&writes_done_ops_);
+ } else {
+ writes_done_ops_at_start_ = true;
+ }
}
private:
@@ -549,7 +608,10 @@ class ClientCallbackWriterImpl
ClientCallbackWriterImpl(Call call, ClientContext* context,
Response* response,
::grpc::experimental::ClientWriteReactor* reactor)
- : context_(context), call_(call), reactor_(reactor) {
+ : context_(context),
+ call_(call),
+ reactor_(reactor),
+ start_corked_(context_->initial_metadata_corked_) {
finish_ops_.RecvMessage(response);
finish_ops_.AllowNoMessage();
}
@@ -569,11 +631,15 @@ class ClientCallbackWriterImpl
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
write_ops_;
CallbackWithSuccessTag write_tag_;
+ bool write_ops_at_start_{false};
CallOpSet<CallOpSendInitialMetadata, CallOpClientSendClose> writes_done_ops_;
CallbackWithSuccessTag writes_done_tag_;
+ bool writes_done_ops_at_start_{false};
- std::atomic_int callbacks_outstanding_;
+ // Minimum of 2 outstanding callbacks to pre-register for start and finish
+ std::atomic_int callbacks_outstanding_{2};
+ bool started_{false};
};
template <class Request>
diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc
index 31216f72cc..57c3cb87f2 100644
--- a/test/cpp/end2end/client_callback_end2end_test.cc
+++ b/test/cpp/end2end/client_callback_end2end_test.cc
@@ -194,11 +194,11 @@ class ClientCallbackEnd2endTest
stream_ =
test->generic_stub_->experimental().PrepareBidiStreamingCall(
&cli_ctx_, method_name, this);
- stream_->StartCall();
request_.set_message(test_str);
send_buf_ = SerializeToByteBuffer(&request_);
- stream_->Read(&recv_buf_);
stream_->Write(send_buf_.get());
+ stream_->Read(&recv_buf_);
+ stream_->StartCall();
}
void OnWriteDone(bool ok) override { stream_->WritesDone(); }
void OnReadDone(bool ok) override {
diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc
index 605356724f..1726e87ea6 100644
--- a/test/cpp/end2end/test_service_impl.cc
+++ b/test/cpp/end2end/test_service_impl.cc
@@ -223,6 +223,7 @@ void CallbackTestServiceImpl::EchoNonDelayed(
return;
}
+ gpr_log(GPR_DEBUG, "Request message was %s", request->message().c_str());
response->set_message(request->message());
MaybeEchoDeadline(context, request, response);
if (host_) {