aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--include/grpc++/client_context.h10
-rw-r--r--include/grpc++/impl/call.h6
-rw-r--r--include/grpc++/stream.h48
-rw-r--r--src/cpp/client/client_unary_call.cc8
-rw-r--r--src/cpp/common/call.cc11
-rw-r--r--src/cpp/server/server_context.cc10
-rw-r--r--test/cpp/end2end/async_end2end_test.cc149
7 files changed, 188 insertions, 54 deletions
diff --git a/include/grpc++/client_context.h b/include/grpc++/client_context.h
index 4594cbaeb6..7f1069ea5e 100644
--- a/include/grpc++/client_context.h
+++ b/include/grpc++/client_context.h
@@ -119,16 +119,6 @@ class ClientContext {
friend class ::grpc::ClientAsyncWriter;
template <class R, class W>
friend class ::grpc::ClientAsyncReaderWriter;
- friend Status BlockingUnaryCall(ChannelInterface *channel,
- const RpcMethod &method,
- ClientContext *context,
- const google::protobuf::Message &request,
- google::protobuf::Message *result);
- friend void AsyncUnaryCall(ChannelInterface *channel, const RpcMethod &method,
- ClientContext *context,
- const google::protobuf::Message &request,
- google::protobuf::Message *result, Status *status,
- CompletionQueue *cq, void *tag);
grpc_call *call() { return call_; }
void set_call(grpc_call *call) {
diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h
index 64f0f890c5..4ab226339d 100644
--- a/include/grpc++/impl/call.h
+++ b/include/grpc++/impl/call.h
@@ -65,13 +65,11 @@ class CallOpBuffer : public CompletionQueueTag {
void AddSendInitialMetadata(
std::multimap<grpc::string, grpc::string> *metadata);
void AddSendInitialMetadata(ClientContext *ctx);
- void AddRecvInitialMetadata(
- std::multimap<grpc::string, grpc::string> *metadata);
+ void AddRecvInitialMetadata(ClientContext* ctx);
void AddSendMessage(const google::protobuf::Message &message);
void AddRecvMessage(google::protobuf::Message *message);
void AddClientSendClose();
- void AddClientRecvStatus(std::multimap<grpc::string, grpc::string> *metadata,
- Status *status);
+ void AddClientRecvStatus(ClientContext *ctx, Status *status);
void AddServerSendStatus(std::multimap<grpc::string, grpc::string> *metadata,
const Status &status);
void AddServerRecvClose(bool *cancelled);
diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h
index be5b29589f..20ba3fb790 100644
--- a/include/grpc++/stream.h
+++ b/include/grpc++/stream.h
@@ -106,17 +106,15 @@ class ClientReader final : public ClientStreamingInterface,
GPR_ASSERT(!context_->initial_metadata_received_);
CallOpBuffer buf;
- buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ buf.AddRecvInitialMetadata(context_);
call_.PerformOps(&buf);
GPR_ASSERT(cq_.Pluck(&buf));
- context_->initial_metadata_received_ = true;
}
virtual bool Read(R* msg) override {
CallOpBuffer buf;
if (!context_->initial_metadata_received_) {
- buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
- context_->initial_metadata_received_ = true;
+ buf.AddRecvInitialMetadata(context_);
}
buf.AddRecvMessage(msg);
call_.PerformOps(&buf);
@@ -126,7 +124,7 @@ class ClientReader final : public ClientStreamingInterface,
virtual Status Finish() override {
CallOpBuffer buf;
Status status;
- buf.AddClientRecvStatus(&context_->trailing_metadata_, &status);
+ buf.AddClientRecvStatus(context_, &status);
call_.PerformOps(&buf);
GPR_ASSERT(cq_.Pluck(&buf));
return status;
@@ -173,7 +171,7 @@ class ClientWriter final : public ClientStreamingInterface,
CallOpBuffer buf;
Status status;
buf.AddRecvMessage(response_);
- buf.AddClientRecvStatus(&context_->trailing_metadata_, &status);
+ buf.AddClientRecvStatus(context_, &status);
call_.PerformOps(&buf);
GPR_ASSERT(cq_.Pluck(&buf) && buf.got_message);
return status;
@@ -210,17 +208,15 @@ class ClientReaderWriter final : public ClientStreamingInterface,
GPR_ASSERT(!context_->initial_metadata_received_);
CallOpBuffer buf;
- buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ buf.AddRecvInitialMetadata(context_);
call_.PerformOps(&buf);
GPR_ASSERT(cq_.Pluck(&buf));
- context_->initial_metadata_received_ = true;
}
virtual bool Read(R* msg) override {
CallOpBuffer buf;
if (!context_->initial_metadata_received_) {
- buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
- context_->initial_metadata_received_ = true;
+ buf.AddRecvInitialMetadata(context_);
}
buf.AddRecvMessage(msg);
call_.PerformOps(&buf);
@@ -244,7 +240,7 @@ class ClientReaderWriter final : public ClientStreamingInterface,
virtual Status Finish() override {
CallOpBuffer buf;
Status status;
- buf.AddClientRecvStatus(&context_->trailing_metadata_, &status);
+ buf.AddClientRecvStatus(context_, &status);
call_.PerformOps(&buf);
GPR_ASSERT(cq_.Pluck(&buf));
return status;
@@ -403,16 +399,14 @@ class ClientAsyncReader final : public ClientAsyncStreamingInterface,
GPR_ASSERT(!context_->initial_metadata_received_);
meta_buf_.Reset(tag);
- meta_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ meta_buf_.AddRecvInitialMetadata(context_);
call_.PerformOps(&meta_buf_);
- context_->initial_metadata_received_ = true;
}
void Read(R* msg, void* tag) override {
read_buf_.Reset(tag);
if (!context_->initial_metadata_received_) {
- read_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
- context_->initial_metadata_received_ = true;
+ read_buf_.AddRecvInitialMetadata(context_);
}
read_buf_.AddRecvMessage(msg);
call_.PerformOps(&read_buf_);
@@ -421,10 +415,9 @@ class ClientAsyncReader final : public ClientAsyncStreamingInterface,
void Finish(Status* status, void* tag) override {
finish_buf_.Reset(tag);
if (!context_->initial_metadata_received_) {
- finish_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
- context_->initial_metadata_received_ = true;
+ finish_buf_.AddRecvInitialMetadata(context_);
}
- finish_buf_.AddClientRecvStatus(&context_->trailing_metadata_, status);
+ finish_buf_.AddClientRecvStatus(context_, status);
call_.PerformOps(&finish_buf_);
}
@@ -456,9 +449,8 @@ class ClientAsyncWriter final : public ClientAsyncStreamingInterface,
GPR_ASSERT(!context_->initial_metadata_received_);
meta_buf_.Reset(tag);
- meta_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ meta_buf_.AddRecvInitialMetadata(context_);
call_.PerformOps(&meta_buf_);
- context_->initial_metadata_received_ = true;
}
void Write(const W& msg, void* tag) override {
@@ -476,11 +468,10 @@ class ClientAsyncWriter final : public ClientAsyncStreamingInterface,
void Finish(Status* status, void* tag) override {
finish_buf_.Reset(tag);
if (!context_->initial_metadata_received_) {
- finish_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
- context_->initial_metadata_received_ = true;
+ finish_buf_.AddRecvInitialMetadata(context_);
}
finish_buf_.AddRecvMessage(response_);
- finish_buf_.AddClientRecvStatus(&context_->trailing_metadata_, status);
+ finish_buf_.AddClientRecvStatus(context_, status);
call_.PerformOps(&finish_buf_);
}
@@ -514,16 +505,14 @@ class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface,
GPR_ASSERT(!context_->initial_metadata_received_);
meta_buf_.Reset(tag);
- meta_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ meta_buf_.AddRecvInitialMetadata(context_);
call_.PerformOps(&meta_buf_);
- context_->initial_metadata_received_ = true;
}
void Read(R* msg, void* tag) override {
read_buf_.Reset(tag);
if (!context_->initial_metadata_received_) {
- read_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
- context_->initial_metadata_received_ = true;
+ read_buf_.AddRecvInitialMetadata(context_);
}
read_buf_.AddRecvMessage(msg);
call_.PerformOps(&read_buf_);
@@ -544,10 +533,9 @@ class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface,
void Finish(Status* status, void* tag) override {
finish_buf_.Reset(tag);
if (!context_->initial_metadata_received_) {
- finish_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
- context_->initial_metadata_received_ = true;
+ finish_buf_.AddRecvInitialMetadata(context_);
}
- finish_buf_.AddClientRecvStatus(&context_->trailing_metadata_, status);
+ finish_buf_.AddClientRecvStatus(context_, status);
call_.PerformOps(&finish_buf_);
}
diff --git a/src/cpp/client/client_unary_call.cc b/src/cpp/client/client_unary_call.cc
index 284af33b43..03a0326128 100644
--- a/src/cpp/client/client_unary_call.cc
+++ b/src/cpp/client/client_unary_call.cc
@@ -52,10 +52,10 @@ Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method,
Status status;
buf.AddSendInitialMetadata(context);
buf.AddSendMessage(request);
- buf.AddRecvInitialMetadata(&context->recv_initial_metadata_);
+ buf.AddRecvInitialMetadata(context);
buf.AddRecvMessage(result);
buf.AddClientSendClose();
- buf.AddClientRecvStatus(&context->trailing_metadata_, &status);
+ buf.AddClientRecvStatus(context, &status);
call.PerformOps(&buf);
GPR_ASSERT((cq.Pluck(&buf) && buf.got_message) || !status.IsOk());
return status;
@@ -79,10 +79,10 @@ void AsyncUnaryCall(ChannelInterface *channel, const RpcMethod &method,
Call call(channel->CreateCall(method, context, cq));
buf->AddSendInitialMetadata(context);
buf->AddSendMessage(request);
- buf->AddRecvInitialMetadata(&context->recv_initial_metadata_);
+ buf->AddRecvInitialMetadata(context);
buf->AddRecvMessage(result);
buf->AddClientSendClose();
- buf->AddClientRecvStatus(&context->trailing_metadata_, status);
+ buf->AddClientRecvStatus(context, status);
call.PerformOps(buf);
}
diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc
index f1142cf8e5..04af36f312 100644
--- a/src/cpp/common/call.cc
+++ b/src/cpp/common/call.cc
@@ -130,9 +130,9 @@ void CallOpBuffer::AddSendInitialMetadata(
initial_metadata_ = FillMetadataArray(metadata);
}
-void CallOpBuffer::AddRecvInitialMetadata(
- std::multimap<grpc::string, grpc::string>* metadata) {
- recv_initial_metadata_ = metadata;
+void CallOpBuffer::AddRecvInitialMetadata(ClientContext* ctx) {
+ ctx->initial_metadata_received_ = true;
+ recv_initial_metadata_ = &ctx->recv_initial_metadata_;
}
void CallOpBuffer::AddSendInitialMetadata(ClientContext* ctx) {
@@ -154,9 +154,8 @@ void CallOpBuffer::AddServerRecvClose(bool* cancelled) {
recv_closed_ = cancelled;
}
-void CallOpBuffer::AddClientRecvStatus(
- std::multimap<grpc::string, grpc::string>* metadata, Status* status) {
- recv_trailing_metadata_ = metadata;
+void CallOpBuffer::AddClientRecvStatus(ClientContext* context, Status* status) {
+ recv_trailing_metadata_ = &context->trailing_metadata_;
recv_status_ = status;
}
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index 21a61af3a0..df4c4dc314 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -57,4 +57,14 @@ ServerContext::~ServerContext() {
}
}
+void ServerContext::AddInitialMetadata(const grpc::string& key,
+ const grpc::string& value) {
+ initial_metadata_.insert(std::make_pair(key, value));
+}
+
+void ServerContext::AddTrailingMetadata(const grpc::string& key,
+ const grpc::string& value) {
+ trailing_metadata_.insert(std::make_pair(key, value));
+}
+
} // namespace grpc
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index fbf9bcb117..3cd0ef5f12 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -364,6 +364,155 @@ TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) {
EXPECT_TRUE(recv_status.IsOk());
}
+TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
+
+ send_request.set_message("Hello");
+ std::pair<grpc::string, grpc::string> meta1("key1", "val1");
+ std::pair<grpc::string, grpc::string> meta2("key2", "val2");
+
+ stub_->Echo(
+ &cli_ctx, send_request, &recv_response, &recv_status, &cli_cq_, tag(1));
+
+ service_.RequestEcho(
+ &srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2));
+ server_ok(2);
+ EXPECT_EQ(send_request.message(), recv_request.message());
+ srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
+ srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
+ response_writer.SendInitialMetadata(tag(3));
+ server_ok(3);
+
+ send_response.set_message(recv_request.message());
+ response_writer.Finish(send_response, Status::OK, tag(4));
+
+ server_ok(4);
+
+ client_ok(1);
+
+ EXPECT_EQ(send_response.message(), recv_response.message());
+ EXPECT_TRUE(recv_status.IsOk());
+ auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
+ EXPECT_EQ(meta1.second, server_initial_metadata.find(meta1.first)->second);
+ EXPECT_EQ(meta2.second, server_initial_metadata.find(meta2.first)->second);
+ EXPECT_EQ(2, server_initial_metadata.size());
+}
+
+TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
+
+ send_request.set_message("Hello");
+ std::pair<grpc::string, grpc::string> meta1("key1", "val1");
+ std::pair<grpc::string, grpc::string> meta2("key2", "val2");
+
+ stub_->Echo(
+ &cli_ctx, send_request, &recv_response, &recv_status, &cli_cq_, tag(1));
+
+ service_.RequestEcho(
+ &srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2));
+ server_ok(2);
+ EXPECT_EQ(send_request.message(), recv_request.message());
+ response_writer.SendInitialMetadata(tag(3));
+ server_ok(3);
+
+ send_response.set_message(recv_request.message());
+ srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
+ srv_ctx.AddTrailingMetadata(meta2.first, meta2.second);
+ response_writer.Finish(send_response, Status::OK, tag(4));
+
+ server_ok(4);
+
+ client_ok(1);
+
+ EXPECT_EQ(send_response.message(), recv_response.message());
+ EXPECT_TRUE(recv_status.IsOk());
+ auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
+ EXPECT_EQ(meta1.second, server_trailing_metadata.find(meta1.first)->second);
+ EXPECT_EQ(meta2.second, server_trailing_metadata.find(meta2.first)->second);
+ EXPECT_EQ(2, server_trailing_metadata.size());
+}
+
+TEST_F(AsyncEnd2endTest, MetadataRpc) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
+
+ send_request.set_message("Hello");
+ std::pair<grpc::string, grpc::string> meta1("key1", "val1");
+ std::pair<grpc::string, grpc::string> meta2("key2-bin", {"\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc", 13});
+ std::pair<grpc::string, grpc::string> meta3("key3", "val3");
+ std::pair<grpc::string, grpc::string> meta6("key4-bin", {"\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d", 14});
+ std::pair<grpc::string, grpc::string> meta5("key5", "val5");
+ std::pair<grpc::string, grpc::string> meta4("key6-bin", {"\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee", 15});
+
+ cli_ctx.AddMetadata(meta1.first, meta1.second);
+ cli_ctx.AddMetadata(meta2.first, meta2.second);
+
+ stub_->Echo(
+ &cli_ctx, send_request, &recv_response, &recv_status, &cli_cq_, tag(1));
+
+ service_.RequestEcho(
+ &srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2));
+ server_ok(2);
+ EXPECT_EQ(send_request.message(), recv_request.message());
+ auto client_initial_metadata = srv_ctx.client_metadata();
+ EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second);
+ EXPECT_EQ(meta2.second, client_initial_metadata.find(meta2.first)->second);
+ EXPECT_EQ(2, client_initial_metadata.size());
+
+ srv_ctx.AddInitialMetadata(meta3.first, meta3.second);
+ srv_ctx.AddInitialMetadata(meta4.first, meta4.second);
+ response_writer.SendInitialMetadata(tag(3));
+ server_ok(3);
+
+ send_response.set_message(recv_request.message());
+ srv_ctx.AddTrailingMetadata(meta5.first, meta5.second);
+ srv_ctx.AddTrailingMetadata(meta6.first, meta6.second);
+ response_writer.Finish(send_response, Status::OK, tag(4));
+
+ server_ok(4);
+
+ client_ok(1);
+
+ EXPECT_EQ(send_response.message(), recv_response.message());
+ EXPECT_TRUE(recv_status.IsOk());
+ auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
+ EXPECT_EQ(meta3.second, server_initial_metadata.find(meta3.first)->second);
+ EXPECT_EQ(meta4.second, server_initial_metadata.find(meta4.first)->second);
+ EXPECT_EQ(2, server_initial_metadata.size());
+ auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
+ EXPECT_EQ(meta5.second, server_trailing_metadata.find(meta5.first)->second);
+ EXPECT_EQ(meta6.second, server_trailing_metadata.find(meta6.first)->second);
+ EXPECT_EQ(2, server_trailing_metadata.size());
+}
} // namespace
} // namespace testing
} // namespace grpc