diff options
-rw-r--r-- | include/grpc++/client_context.h | 10 | ||||
-rw-r--r-- | include/grpc++/impl/call.h | 6 | ||||
-rw-r--r-- | include/grpc++/stream.h | 48 | ||||
-rw-r--r-- | src/cpp/client/client_unary_call.cc | 8 | ||||
-rw-r--r-- | src/cpp/common/call.cc | 11 | ||||
-rw-r--r-- | src/cpp/server/server_context.cc | 10 | ||||
-rw-r--r-- | test/cpp/end2end/async_end2end_test.cc | 149 |
7 files changed, 188 insertions, 54 deletions
diff --git a/include/grpc++/client_context.h b/include/grpc++/client_context.h index 4594cbaeb6..7f1069ea5e 100644 --- a/include/grpc++/client_context.h +++ b/include/grpc++/client_context.h @@ -119,16 +119,6 @@ class ClientContext { friend class ::grpc::ClientAsyncWriter; template <class R, class W> friend class ::grpc::ClientAsyncReaderWriter; - friend Status BlockingUnaryCall(ChannelInterface *channel, - const RpcMethod &method, - ClientContext *context, - const google::protobuf::Message &request, - google::protobuf::Message *result); - friend void AsyncUnaryCall(ChannelInterface *channel, const RpcMethod &method, - ClientContext *context, - const google::protobuf::Message &request, - google::protobuf::Message *result, Status *status, - CompletionQueue *cq, void *tag); grpc_call *call() { return call_; } void set_call(grpc_call *call) { diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h index 64f0f890c5..4ab226339d 100644 --- a/include/grpc++/impl/call.h +++ b/include/grpc++/impl/call.h @@ -65,13 +65,11 @@ class CallOpBuffer : public CompletionQueueTag { void AddSendInitialMetadata( std::multimap<grpc::string, grpc::string> *metadata); void AddSendInitialMetadata(ClientContext *ctx); - void AddRecvInitialMetadata( - std::multimap<grpc::string, grpc::string> *metadata); + void AddRecvInitialMetadata(ClientContext* ctx); void AddSendMessage(const google::protobuf::Message &message); void AddRecvMessage(google::protobuf::Message *message); void AddClientSendClose(); - void AddClientRecvStatus(std::multimap<grpc::string, grpc::string> *metadata, - Status *status); + void AddClientRecvStatus(ClientContext *ctx, Status *status); void AddServerSendStatus(std::multimap<grpc::string, grpc::string> *metadata, const Status &status); void AddServerRecvClose(bool *cancelled); diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index be5b29589f..20ba3fb790 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -106,17 +106,15 @@ class ClientReader final : public ClientStreamingInterface, GPR_ASSERT(!context_->initial_metadata_received_); CallOpBuffer buf; - buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + buf.AddRecvInitialMetadata(context_); call_.PerformOps(&buf); GPR_ASSERT(cq_.Pluck(&buf)); - context_->initial_metadata_received_ = true; } virtual bool Read(R* msg) override { CallOpBuffer buf; if (!context_->initial_metadata_received_) { - buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_); - context_->initial_metadata_received_ = true; + buf.AddRecvInitialMetadata(context_); } buf.AddRecvMessage(msg); call_.PerformOps(&buf); @@ -126,7 +124,7 @@ class ClientReader final : public ClientStreamingInterface, virtual Status Finish() override { CallOpBuffer buf; Status status; - buf.AddClientRecvStatus(&context_->trailing_metadata_, &status); + buf.AddClientRecvStatus(context_, &status); call_.PerformOps(&buf); GPR_ASSERT(cq_.Pluck(&buf)); return status; @@ -173,7 +171,7 @@ class ClientWriter final : public ClientStreamingInterface, CallOpBuffer buf; Status status; buf.AddRecvMessage(response_); - buf.AddClientRecvStatus(&context_->trailing_metadata_, &status); + buf.AddClientRecvStatus(context_, &status); call_.PerformOps(&buf); GPR_ASSERT(cq_.Pluck(&buf) && buf.got_message); return status; @@ -210,17 +208,15 @@ class ClientReaderWriter final : public ClientStreamingInterface, GPR_ASSERT(!context_->initial_metadata_received_); CallOpBuffer buf; - buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + buf.AddRecvInitialMetadata(context_); call_.PerformOps(&buf); GPR_ASSERT(cq_.Pluck(&buf)); - context_->initial_metadata_received_ = true; } virtual bool Read(R* msg) override { CallOpBuffer buf; if (!context_->initial_metadata_received_) { - buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_); - context_->initial_metadata_received_ = true; + buf.AddRecvInitialMetadata(context_); } buf.AddRecvMessage(msg); call_.PerformOps(&buf); @@ -244,7 +240,7 @@ class ClientReaderWriter final : public ClientStreamingInterface, virtual Status Finish() override { CallOpBuffer buf; Status status; - buf.AddClientRecvStatus(&context_->trailing_metadata_, &status); + buf.AddClientRecvStatus(context_, &status); call_.PerformOps(&buf); GPR_ASSERT(cq_.Pluck(&buf)); return status; @@ -403,16 +399,14 @@ class ClientAsyncReader final : public ClientAsyncStreamingInterface, GPR_ASSERT(!context_->initial_metadata_received_); meta_buf_.Reset(tag); - meta_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + meta_buf_.AddRecvInitialMetadata(context_); call_.PerformOps(&meta_buf_); - context_->initial_metadata_received_ = true; } void Read(R* msg, void* tag) override { read_buf_.Reset(tag); if (!context_->initial_metadata_received_) { - read_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_); - context_->initial_metadata_received_ = true; + read_buf_.AddRecvInitialMetadata(context_); } read_buf_.AddRecvMessage(msg); call_.PerformOps(&read_buf_); @@ -421,10 +415,9 @@ class ClientAsyncReader final : public ClientAsyncStreamingInterface, void Finish(Status* status, void* tag) override { finish_buf_.Reset(tag); if (!context_->initial_metadata_received_) { - finish_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_); - context_->initial_metadata_received_ = true; + finish_buf_.AddRecvInitialMetadata(context_); } - finish_buf_.AddClientRecvStatus(&context_->trailing_metadata_, status); + finish_buf_.AddClientRecvStatus(context_, status); call_.PerformOps(&finish_buf_); } @@ -456,9 +449,8 @@ class ClientAsyncWriter final : public ClientAsyncStreamingInterface, GPR_ASSERT(!context_->initial_metadata_received_); meta_buf_.Reset(tag); - meta_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + meta_buf_.AddRecvInitialMetadata(context_); call_.PerformOps(&meta_buf_); - context_->initial_metadata_received_ = true; } void Write(const W& msg, void* tag) override { @@ -476,11 +468,10 @@ class ClientAsyncWriter final : public ClientAsyncStreamingInterface, void Finish(Status* status, void* tag) override { finish_buf_.Reset(tag); if (!context_->initial_metadata_received_) { - finish_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_); - context_->initial_metadata_received_ = true; + finish_buf_.AddRecvInitialMetadata(context_); } finish_buf_.AddRecvMessage(response_); - finish_buf_.AddClientRecvStatus(&context_->trailing_metadata_, status); + finish_buf_.AddClientRecvStatus(context_, status); call_.PerformOps(&finish_buf_); } @@ -514,16 +505,14 @@ class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface, GPR_ASSERT(!context_->initial_metadata_received_); meta_buf_.Reset(tag); - meta_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + meta_buf_.AddRecvInitialMetadata(context_); call_.PerformOps(&meta_buf_); - context_->initial_metadata_received_ = true; } void Read(R* msg, void* tag) override { read_buf_.Reset(tag); if (!context_->initial_metadata_received_) { - read_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_); - context_->initial_metadata_received_ = true; + read_buf_.AddRecvInitialMetadata(context_); } read_buf_.AddRecvMessage(msg); call_.PerformOps(&read_buf_); @@ -544,10 +533,9 @@ class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface, void Finish(Status* status, void* tag) override { finish_buf_.Reset(tag); if (!context_->initial_metadata_received_) { - finish_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_); - context_->initial_metadata_received_ = true; + finish_buf_.AddRecvInitialMetadata(context_); } - finish_buf_.AddClientRecvStatus(&context_->trailing_metadata_, status); + finish_buf_.AddClientRecvStatus(context_, status); call_.PerformOps(&finish_buf_); } diff --git a/src/cpp/client/client_unary_call.cc b/src/cpp/client/client_unary_call.cc index 284af33b43..03a0326128 100644 --- a/src/cpp/client/client_unary_call.cc +++ b/src/cpp/client/client_unary_call.cc @@ -52,10 +52,10 @@ Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method, Status status; buf.AddSendInitialMetadata(context); buf.AddSendMessage(request); - buf.AddRecvInitialMetadata(&context->recv_initial_metadata_); + buf.AddRecvInitialMetadata(context); buf.AddRecvMessage(result); buf.AddClientSendClose(); - buf.AddClientRecvStatus(&context->trailing_metadata_, &status); + buf.AddClientRecvStatus(context, &status); call.PerformOps(&buf); GPR_ASSERT((cq.Pluck(&buf) && buf.got_message) || !status.IsOk()); return status; @@ -79,10 +79,10 @@ void AsyncUnaryCall(ChannelInterface *channel, const RpcMethod &method, Call call(channel->CreateCall(method, context, cq)); buf->AddSendInitialMetadata(context); buf->AddSendMessage(request); - buf->AddRecvInitialMetadata(&context->recv_initial_metadata_); + buf->AddRecvInitialMetadata(context); buf->AddRecvMessage(result); buf->AddClientSendClose(); - buf->AddClientRecvStatus(&context->trailing_metadata_, status); + buf->AddClientRecvStatus(context, status); call.PerformOps(buf); } diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc index f1142cf8e5..04af36f312 100644 --- a/src/cpp/common/call.cc +++ b/src/cpp/common/call.cc @@ -130,9 +130,9 @@ void CallOpBuffer::AddSendInitialMetadata( initial_metadata_ = FillMetadataArray(metadata); } -void CallOpBuffer::AddRecvInitialMetadata( - std::multimap<grpc::string, grpc::string>* metadata) { - recv_initial_metadata_ = metadata; +void CallOpBuffer::AddRecvInitialMetadata(ClientContext* ctx) { + ctx->initial_metadata_received_ = true; + recv_initial_metadata_ = &ctx->recv_initial_metadata_; } void CallOpBuffer::AddSendInitialMetadata(ClientContext* ctx) { @@ -154,9 +154,8 @@ void CallOpBuffer::AddServerRecvClose(bool* cancelled) { recv_closed_ = cancelled; } -void CallOpBuffer::AddClientRecvStatus( - std::multimap<grpc::string, grpc::string>* metadata, Status* status) { - recv_trailing_metadata_ = metadata; +void CallOpBuffer::AddClientRecvStatus(ClientContext* context, Status* status) { + recv_trailing_metadata_ = &context->trailing_metadata_; recv_status_ = status; } diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 21a61af3a0..df4c4dc314 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -57,4 +57,14 @@ ServerContext::~ServerContext() { } } +void ServerContext::AddInitialMetadata(const grpc::string& key, + const grpc::string& value) { + initial_metadata_.insert(std::make_pair(key, value)); +} + +void ServerContext::AddTrailingMetadata(const grpc::string& key, + const grpc::string& value) { + trailing_metadata_.insert(std::make_pair(key, value)); +} + } // namespace grpc diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index fbf9bcb117..3cd0ef5f12 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -364,6 +364,155 @@ TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) { EXPECT_TRUE(recv_status.IsOk()); } +TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + + ClientContext cli_ctx; + ServerContext srv_ctx; + grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); + + send_request.set_message("Hello"); + std::pair<grpc::string, grpc::string> meta1("key1", "val1"); + std::pair<grpc::string, grpc::string> meta2("key2", "val2"); + + stub_->Echo( + &cli_ctx, send_request, &recv_response, &recv_status, &cli_cq_, tag(1)); + + service_.RequestEcho( + &srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2)); + server_ok(2); + EXPECT_EQ(send_request.message(), recv_request.message()); + srv_ctx.AddInitialMetadata(meta1.first, meta1.second); + srv_ctx.AddInitialMetadata(meta2.first, meta2.second); + response_writer.SendInitialMetadata(tag(3)); + server_ok(3); + + send_response.set_message(recv_request.message()); + response_writer.Finish(send_response, Status::OK, tag(4)); + + server_ok(4); + + client_ok(1); + + EXPECT_EQ(send_response.message(), recv_response.message()); + EXPECT_TRUE(recv_status.IsOk()); + auto server_initial_metadata = cli_ctx.GetServerInitialMetadata(); + EXPECT_EQ(meta1.second, server_initial_metadata.find(meta1.first)->second); + EXPECT_EQ(meta2.second, server_initial_metadata.find(meta2.first)->second); + EXPECT_EQ(2, server_initial_metadata.size()); +} + +TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + + ClientContext cli_ctx; + ServerContext srv_ctx; + grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); + + send_request.set_message("Hello"); + std::pair<grpc::string, grpc::string> meta1("key1", "val1"); + std::pair<grpc::string, grpc::string> meta2("key2", "val2"); + + stub_->Echo( + &cli_ctx, send_request, &recv_response, &recv_status, &cli_cq_, tag(1)); + + service_.RequestEcho( + &srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2)); + server_ok(2); + EXPECT_EQ(send_request.message(), recv_request.message()); + response_writer.SendInitialMetadata(tag(3)); + server_ok(3); + + send_response.set_message(recv_request.message()); + srv_ctx.AddTrailingMetadata(meta1.first, meta1.second); + srv_ctx.AddTrailingMetadata(meta2.first, meta2.second); + response_writer.Finish(send_response, Status::OK, tag(4)); + + server_ok(4); + + client_ok(1); + + EXPECT_EQ(send_response.message(), recv_response.message()); + EXPECT_TRUE(recv_status.IsOk()); + auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata(); + EXPECT_EQ(meta1.second, server_trailing_metadata.find(meta1.first)->second); + EXPECT_EQ(meta2.second, server_trailing_metadata.find(meta2.first)->second); + EXPECT_EQ(2, server_trailing_metadata.size()); +} + +TEST_F(AsyncEnd2endTest, MetadataRpc) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + + ClientContext cli_ctx; + ServerContext srv_ctx; + grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); + + send_request.set_message("Hello"); + std::pair<grpc::string, grpc::string> meta1("key1", "val1"); + std::pair<grpc::string, grpc::string> meta2("key2-bin", {"\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc", 13}); + std::pair<grpc::string, grpc::string> meta3("key3", "val3"); + std::pair<grpc::string, grpc::string> meta6("key4-bin", {"\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d", 14}); + std::pair<grpc::string, grpc::string> meta5("key5", "val5"); + std::pair<grpc::string, grpc::string> meta4("key6-bin", {"\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee", 15}); + + cli_ctx.AddMetadata(meta1.first, meta1.second); + cli_ctx.AddMetadata(meta2.first, meta2.second); + + stub_->Echo( + &cli_ctx, send_request, &recv_response, &recv_status, &cli_cq_, tag(1)); + + service_.RequestEcho( + &srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2)); + server_ok(2); + EXPECT_EQ(send_request.message(), recv_request.message()); + auto client_initial_metadata = srv_ctx.client_metadata(); + EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second); + EXPECT_EQ(meta2.second, client_initial_metadata.find(meta2.first)->second); + EXPECT_EQ(2, client_initial_metadata.size()); + + srv_ctx.AddInitialMetadata(meta3.first, meta3.second); + srv_ctx.AddInitialMetadata(meta4.first, meta4.second); + response_writer.SendInitialMetadata(tag(3)); + server_ok(3); + + send_response.set_message(recv_request.message()); + srv_ctx.AddTrailingMetadata(meta5.first, meta5.second); + srv_ctx.AddTrailingMetadata(meta6.first, meta6.second); + response_writer.Finish(send_response, Status::OK, tag(4)); + + server_ok(4); + + client_ok(1); + + EXPECT_EQ(send_response.message(), recv_response.message()); + EXPECT_TRUE(recv_status.IsOk()); + auto server_initial_metadata = cli_ctx.GetServerInitialMetadata(); + EXPECT_EQ(meta3.second, server_initial_metadata.find(meta3.first)->second); + EXPECT_EQ(meta4.second, server_initial_metadata.find(meta4.first)->second); + EXPECT_EQ(2, server_initial_metadata.size()); + auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata(); + EXPECT_EQ(meta5.second, server_trailing_metadata.find(meta5.first)->second); + EXPECT_EQ(meta6.second, server_trailing_metadata.find(meta6.first)->second); + EXPECT_EQ(2, server_trailing_metadata.size()); +} } // namespace } // namespace testing } // namespace grpc |