diff options
author | ncteisen <ncteisen@gmail.com> | 2018-09-27 10:26:08 -0500 |
---|---|---|
committer | ncteisen <ncteisen@gmail.com> | 2018-09-27 14:20:57 -0500 |
commit | 038e760a7da9098dd1e0b76db0342b1eb1a6351d (patch) | |
tree | 02b2079f022a279e061172cd86acb533d975b160 | |
parent | 404b2515af9c4dcc29440dea8b955ba341521b68 (diff) |
Channelz C++ Socket support
-rw-r--r-- | src/cpp/server/channelz/channelz_service.cc | 17 | ||||
-rw-r--r-- | src/cpp/server/channelz/channelz_service.h | 4 | ||||
-rw-r--r-- | test/cpp/end2end/channelz_service_test.cc | 152 |
3 files changed, 173 insertions, 0 deletions
diff --git a/src/cpp/server/channelz/channelz_service.cc b/src/cpp/server/channelz/channelz_service.cc index e096c1f421..4e3fe8c1c9 100644 --- a/src/cpp/server/channelz/channelz_service.cc +++ b/src/cpp/server/channelz/channelz_service.cc @@ -92,4 +92,21 @@ Status ChannelzService::GetSubchannel( return Status::OK; } +Status ChannelzService::GetSocket(ServerContext* unused, + const channelz::v1::GetSocketRequest* request, + channelz::v1::GetSocketResponse* response) { + char* json_str = grpc_channelz_get_socket(request->socket_id()); + gpr_log(GPR_ERROR, "%s", json_str); + if (json_str == nullptr) { + return Status(NOT_FOUND, "No object found for that SocketId"); + } + google::protobuf::util::Status s = + google::protobuf::util::JsonStringToMessage(json_str, response); + gpr_free(json_str); + if (s != google::protobuf::util::Status::OK) { + return Status(INTERNAL, s.ToString()); + } + return Status::OK; +} + } // namespace grpc diff --git a/src/cpp/server/channelz/channelz_service.h b/src/cpp/server/channelz/channelz_service.h index 9e0b5b6ead..1be4e01c73 100644 --- a/src/cpp/server/channelz/channelz_service.h +++ b/src/cpp/server/channelz/channelz_service.h @@ -44,6 +44,10 @@ class ChannelzService final : public channelz::v1::Channelz::Service { Status GetSubchannel(ServerContext* unused, const channelz::v1::GetSubchannelRequest* request, channelz::v1::GetSubchannelResponse* response) override; + // implementation of GetSocket rpc + Status GetSocket(ServerContext* unused, + const channelz::v1::GetSocketRequest* request, + channelz::v1::GetSocketResponse* response) override; }; } // namespace grpc diff --git a/test/cpp/end2end/channelz_service_test.cc b/test/cpp/end2end/channelz_service_test.cc index e96d68f93b..a597fd9c4b 100644 --- a/test/cpp/end2end/channelz_service_test.cc +++ b/test/cpp/end2end/channelz_service_test.cc @@ -43,6 +43,8 @@ using grpc::channelz::v1::GetChannelRequest; using grpc::channelz::v1::GetChannelResponse; using grpc::channelz::v1::GetServersRequest; using grpc::channelz::v1::GetServersResponse; +using grpc::channelz::v1::GetSocketRequest; +using grpc::channelz::v1::GetSocketResponse; using grpc::channelz::v1::GetSubchannelRequest; using grpc::channelz::v1::GetSubchannelResponse; using grpc::channelz::v1::GetTopChannelsRequest; @@ -71,6 +73,26 @@ class Proxy : public ::grpc::testing::EchoTestService::Service { return stubs_[idx]->Echo(client_context.get(), *request, response); } + Status BidiStream(ServerContext* server_context, + ServerReaderWriter<EchoResponse, EchoRequest>* + stream_from_client) override { + EchoRequest request; + EchoResponse response; + std::unique_ptr<ClientContext> client_context = + ClientContext::FromServerContext(*server_context); + + // always use the first proxy for streaming + auto stream_to_backend = stubs_[0]->BidiStream(client_context.get()); + while (stream_from_client->Read(&request)) { + stream_to_backend->Write(request); + stream_to_backend->Read(&response); + stream_from_client->Write(response); + } + + stream_to_backend->WritesDone(); + return stream_to_backend->Finish(); + } + private: std::vector<std::unique_ptr<::grpc::testing::EchoTestService::Stub>> stubs_; }; @@ -149,6 +171,21 @@ class ChannelzServerTest : public ::testing::Test { EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message(); } + void SendSuccessfulStream(int num_messages) { + EchoRequest request; + EchoResponse response; + request.set_message("Hello channelz"); + ClientContext context; + auto stream_to_proxy = echo_stub_->BidiStream(&context); + for (int i = 0; i < num_messages; ++i) { + EXPECT_TRUE(stream_to_proxy->Write(request)); + EXPECT_TRUE(stream_to_proxy->Read(&response)); + } + stream_to_proxy->WritesDone(); + Status s = stream_to_proxy->Finish(); + EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message(); + } + void SendFailedEcho(int channel_idx) { EchoRequest request; EchoResponse response; @@ -448,6 +485,121 @@ TEST_F(ChannelzServerTest, ServerCallTest) { kNumSuccess + kNumFailed + 1); } +TEST_F(ChannelzServerTest, ManySubchannelsAndSockets) { + ResetStubs(); + const int kNumChannels = 4; + ConfigureProxy(kNumChannels); + const int kNumSuccess = 10; + const int kNumFailed = 11; + for (int i = 0; i < kNumSuccess; ++i) { + SendSuccessfulEcho(0); + SendSuccessfulEcho(2); + } + for (int i = 0; i < kNumFailed; ++i) { + SendFailedEcho(1); + SendFailedEcho(2); + } + GetTopChannelsRequest gtc_request; + GetTopChannelsResponse gtc_response; + gtc_request.set_start_channel_id(0); + ClientContext context; + Status s = + channelz_stub_->GetTopChannels(&context, gtc_request, >c_response); + EXPECT_TRUE(s.ok()) << s.error_message(); + EXPECT_EQ(gtc_response.channel_size(), kNumChannels); + for (int i = 0; i < gtc_response.channel_size(); ++i) { + // if the channel sent no RPCs, then expect no subchannels to have been + // created. + if (gtc_response.channel(i).data().calls_started() == 0) { + EXPECT_EQ(gtc_response.channel(i).subchannel_ref_size(), 0); + continue; + } + // The resolver must return at least one address. + ASSERT_GT(gtc_response.channel(i).subchannel_ref_size(), 0); + // First grab the subchannel + GetSubchannelRequest get_subchannel_req; + GetSubchannelResponse get_subchannel_resp; + get_subchannel_req.set_subchannel_id( + gtc_response.channel(i).subchannel_ref(0).subchannel_id()); + ClientContext get_subchannel_ctx; + Status s = channelz_stub_->GetSubchannel( + &get_subchannel_ctx, get_subchannel_req, &get_subchannel_resp); + EXPECT_TRUE(s.ok()) << s.error_message(); + EXPECT_EQ(get_subchannel_resp.subchannel().socket_ref_size(), 1); + // Now grab the socket. + GetSocketRequest get_socket_req; + GetSocketResponse get_socket_resp; + ClientContext get_socket_ctx; + get_socket_req.set_socket_id( + get_subchannel_resp.subchannel().socket_ref(0).socket_id()); + s = channelz_stub_->GetSocket(&get_socket_ctx, get_socket_req, + &get_socket_resp); + EXPECT_TRUE(s.ok()) << s.error_message(); + // calls started == streams started AND stream succeeded. Since none of + // these RPCs were canceled, all of the streams will succeeded even though + // the RPCs they represent might have failed. + EXPECT_EQ(get_subchannel_resp.subchannel().data().calls_started(), + get_socket_resp.socket().data().streams_started()); + EXPECT_EQ(get_subchannel_resp.subchannel().data().calls_started(), + get_socket_resp.socket().data().streams_succeeded()); + // All of the calls were unary, so calls started == messages sent. + EXPECT_EQ(get_subchannel_resp.subchannel().data().calls_started(), + get_socket_resp.socket().data().messages_sent()); + // We only get responses when the RPC was successful, so + // calls succeeded == messages received. + EXPECT_EQ(get_subchannel_resp.subchannel().data().calls_succeeded(), + get_socket_resp.socket().data().messages_received()); + } +} + +TEST_F(ChannelzServerTest, StreamingRPC) { + ResetStubs(); + ConfigureProxy(1); + const int kNumMessages = 5; + SendSuccessfulStream(kNumMessages); + // Get the channel + GetChannelRequest get_channel_request; + GetChannelResponse get_channel_response; + get_channel_request.set_channel_id(GetChannelId(0)); + ClientContext get_channel_context; + Status s = channelz_stub_->GetChannel( + &get_channel_context, get_channel_request, &get_channel_response); + EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message(); + EXPECT_EQ(get_channel_response.channel().data().calls_started(), 1); + EXPECT_EQ(get_channel_response.channel().data().calls_succeeded(), 1); + EXPECT_EQ(get_channel_response.channel().data().calls_failed(), 0); + // Get the subchannel + ASSERT_GT(get_channel_response.channel().subchannel_ref_size(), 0); + GetSubchannelRequest get_subchannel_request; + GetSubchannelResponse get_subchannel_response; + ClientContext get_subchannel_context; + get_subchannel_request.set_subchannel_id( + get_channel_response.channel().subchannel_ref(0).subchannel_id()); + s = channelz_stub_->GetSubchannel(&get_subchannel_context, + get_subchannel_request, + &get_subchannel_response); + EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message(); + EXPECT_EQ(get_subchannel_response.subchannel().data().calls_started(), 1); + EXPECT_EQ(get_subchannel_response.subchannel().data().calls_succeeded(), 1); + EXPECT_EQ(get_subchannel_response.subchannel().data().calls_failed(), 0); + // Get the socket + ASSERT_GT(get_subchannel_response.subchannel().socket_ref_size(), 0); + GetSocketRequest get_socket_request; + GetSocketResponse get_socket_response; + ClientContext get_socket_context; + get_socket_request.set_socket_id( + get_subchannel_response.subchannel().socket_ref(0).socket_id()); + s = channelz_stub_->GetSocket(&get_socket_context, get_socket_request, + &get_socket_response); + EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message(); + EXPECT_EQ(get_socket_response.socket().data().streams_started(), 1); + EXPECT_EQ(get_socket_response.socket().data().streams_succeeded(), 1); + EXPECT_EQ(get_socket_response.socket().data().streams_failed(), 0); + EXPECT_EQ(get_socket_response.socket().data().messages_sent(), kNumMessages); + EXPECT_EQ(get_socket_response.socket().data().messages_received(), + kNumMessages); +} + } // namespace testing } // namespace grpc |