aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--include/grpc++/stream.h2
-rw-r--r--src/cpp/common/completion_queue.cc3
-rw-r--r--test/cpp/end2end/end2end_test.cc21
3 files changed, 24 insertions, 2 deletions
diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h
index 7625bcc38d..6647e345c0 100644
--- a/include/grpc++/stream.h
+++ b/include/grpc++/stream.h
@@ -173,7 +173,7 @@ class ClientWriter GRPC_FINAL : public ClientStreamingInterface,
buf.AddRecvMessage(response_);
buf.AddClientRecvStatus(context_, &status);
call_.PerformOps(&buf);
- GPR_ASSERT(cq_.Pluck(&buf) && buf.got_message);
+ GPR_ASSERT(cq_.Pluck(&buf));
return status;
}
diff --git a/src/cpp/common/completion_queue.cc b/src/cpp/common/completion_queue.cc
index cea2d24831..cdbe042d21 100644
--- a/src/cpp/common/completion_queue.cc
+++ b/src/cpp/common/completion_queue.cc
@@ -92,7 +92,8 @@ bool CompletionQueue::Pluck(CompletionQueueTag* tag) {
void* ignored = tag;
GPR_ASSERT(tag->FinalizeResult(&ignored, &ok));
GPR_ASSERT(ignored == tag);
- return ok;
+ // Ignore mutations by FinalizeResult: Pluck returns the C API status
+ return ev->data.op_complete == GRPC_OP_OK;
}
void CompletionQueue::TryPluck(CompletionQueueTag* tag) {
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 2d3b405d1c..48174ec127 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -491,6 +491,27 @@ TEST_F(End2endTest, ServerCancelsRpc) {
EXPECT_TRUE(s.details().empty());
}
+// Client cancels request stream after sending two messages
+TEST_F(End2endTest, ClientCancelsRequestStream) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+ request.set_message("hello");
+
+ auto stream = stub_->RequestStream(&context, &response);
+ EXPECT_TRUE(stream->Write(request));
+ EXPECT_TRUE(stream->Write(request));
+
+ context.TryCancel();
+
+ Status s = stream->Finish();
+ EXPECT_EQ(grpc::StatusCode::CANCELLED, s.code());
+
+ EXPECT_EQ(response.message(), "");
+
+}
+
// Client cancels server stream after sending some messages
TEST_F(End2endTest, ClientCancelsResponseStream) {
ResetStub();