diff options
Diffstat (limited to 'test/cpp')
-rw-r--r-- | test/cpp/end2end/async_end2end_test.cc | 12 | ||||
-rw-r--r-- | test/cpp/end2end/end2end_test.cc | 166 | ||||
-rw-r--r-- | test/cpp/end2end/hybrid_end2end_test.cc | 298 | ||||
-rw-r--r-- | test/cpp/end2end/test_service_impl.cc | 198 | ||||
-rw-r--r-- | test/cpp/end2end/test_service_impl.h | 85 | ||||
-rw-r--r-- | test/cpp/qps/client_async.cc | 2 | ||||
-rw-r--r-- | test/cpp/qps/server_async.cc | 2 |
7 files changed, 585 insertions, 178 deletions
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index cfda571326..0616cc07ee 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -180,21 +180,11 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<bool> { int port = grpc_pick_unused_port_or_die(); server_address_ << "localhost:" << port; - // It is currently unsupported to mix sync and async services - // in the same server, so first test that (for coverage) - ServerBuilder build_bad; - build_bad.AddListeningPort(server_address_.str(), - grpc::InsecureServerCredentials()); - build_bad.RegisterAsyncService(&service_); - grpc::testing::EchoTestService::Service sync_service; - build_bad.RegisterService(&sync_service); - GPR_ASSERT(build_bad.BuildAndStart() == nullptr); - // Setup server ServerBuilder builder; builder.AddListeningPort(server_address_.str(), grpc::InsecureServerCredentials()); - builder.RegisterAsyncService(&service_); + builder.RegisterService(&service_); cq_ = builder.AddCompletionQueue(); server_ = builder.BuildAndStart(); } diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index f8027bcf0b..58b20e61c9 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -54,6 +54,7 @@ #include "test/core/end2end/data/ssl_test_data.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" +#include "test/cpp/end2end/test_service_impl.h" #include "test/cpp/util/string_ref_helper.h" using grpc::testing::EchoRequest; @@ -64,40 +65,6 @@ namespace grpc { namespace testing { namespace { -const char* kServerCancelAfterReads = "cancel_after_reads"; - -// When echo_deadline is requested, deadline seen in the ServerContext is set in -// the response in seconds. -void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request, - EchoResponse* response) { - if (request->has_param() && request->param().echo_deadline()) { - gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME); - if (context->deadline() != system_clock::time_point::max()) { - Timepoint2Timespec(context->deadline(), &deadline); - } - response->mutable_param()->set_request_deadline(deadline.tv_sec); - } -} - -void CheckServerAuthContext(const ServerContext* context, - const grpc::string& expected_client_identity) { - std::shared_ptr<const AuthContext> auth_ctx = context->auth_context(); - std::vector<grpc::string_ref> ssl = - auth_ctx->FindPropertyValues("transport_security_type"); - EXPECT_EQ(1u, ssl.size()); - EXPECT_EQ("ssl", ToString(ssl[0])); - if (expected_client_identity.length() == 0) { - EXPECT_TRUE(auth_ctx->GetPeerIdentityPropertyName().empty()); - EXPECT_TRUE(auth_ctx->GetPeerIdentity().empty()); - EXPECT_FALSE(auth_ctx->IsPeerAuthenticated()); - } else { - auto identity = auth_ctx->GetPeerIdentity(); - EXPECT_TRUE(auth_ctx->IsPeerAuthenticated()); - EXPECT_EQ(1u, identity.size()); - EXPECT_EQ(expected_client_identity, identity[0]); - } -} - bool CheckIsLocalhost(const grpc::string& addr) { const grpc::string kIpv6("ipv6:[::1]:"); const grpc::string kIpv4MappedIpv6("ipv6:[::ffff:127.0.0.1]:"); @@ -212,137 +179,6 @@ class Proxy : public ::grpc::testing::EchoTestService::Service { std::unique_ptr< ::grpc::testing::EchoTestService::Stub> stub_; }; -class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { - public: - TestServiceImpl() : signal_client_(false), host_() {} - explicit TestServiceImpl(const grpc::string& host) - : signal_client_(false), host_(new grpc::string(host)) {} - - Status Echo(ServerContext* context, const EchoRequest* request, - EchoResponse* response) GRPC_OVERRIDE { - response->set_message(request->message()); - MaybeEchoDeadline(context, request, response); - if (host_) { - response->mutable_param()->set_host(*host_); - } - if (request->has_param() && request->param().client_cancel_after_us()) { - { - std::unique_lock<std::mutex> lock(mu_); - signal_client_ = true; - } - while (!context->IsCancelled()) { - gpr_sleep_until(gpr_time_add( - gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(request->param().client_cancel_after_us(), - GPR_TIMESPAN))); - } - return Status::CANCELLED; - } else if (request->has_param() && - request->param().server_cancel_after_us()) { - gpr_sleep_until(gpr_time_add( - gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(request->param().server_cancel_after_us(), - GPR_TIMESPAN))); - return Status::CANCELLED; - } else { - EXPECT_FALSE(context->IsCancelled()); - } - - if (request->has_param() && request->param().echo_metadata()) { - const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata = - context->client_metadata(); - for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator - iter = client_metadata.begin(); - iter != client_metadata.end(); ++iter) { - context->AddTrailingMetadata(ToString(iter->first), - ToString(iter->second)); - } - } - if (request->has_param() && - (request->param().expected_client_identity().length() > 0 || - request->param().check_auth_context())) { - CheckServerAuthContext(context, - request->param().expected_client_identity()); - } - if (request->has_param() && - request->param().response_message_length() > 0) { - response->set_message( - grpc::string(request->param().response_message_length(), '\0')); - } - if (request->has_param() && request->param().echo_peer()) { - response->mutable_param()->set_peer(context->peer()); - } - return Status::OK; - } - - // Unimplemented is left unimplemented to test the returned error. - - Status RequestStream(ServerContext* context, - ServerReader<EchoRequest>* reader, - EchoResponse* response) GRPC_OVERRIDE { - EchoRequest request; - response->set_message(""); - int cancel_after_reads = 0; - const std::multimap<grpc::string_ref, grpc::string_ref>& - client_initial_metadata = context->client_metadata(); - if (client_initial_metadata.find(kServerCancelAfterReads) != - client_initial_metadata.end()) { - std::istringstream iss(ToString( - client_initial_metadata.find(kServerCancelAfterReads)->second)); - iss >> cancel_after_reads; - gpr_log(GPR_INFO, "cancel_after_reads %d", cancel_after_reads); - } - while (reader->Read(&request)) { - if (cancel_after_reads == 1) { - gpr_log(GPR_INFO, "return cancel status"); - return Status::CANCELLED; - } else if (cancel_after_reads > 0) { - cancel_after_reads--; - } - response->mutable_message()->append(request.message()); - } - return Status::OK; - } - - // Return 3 messages. - // TODO(yangg) make it generic by adding a parameter into EchoRequest - Status ResponseStream(ServerContext* context, const EchoRequest* request, - ServerWriter<EchoResponse>* writer) GRPC_OVERRIDE { - EchoResponse response; - response.set_message(request->message() + "0"); - writer->Write(response); - response.set_message(request->message() + "1"); - writer->Write(response); - response.set_message(request->message() + "2"); - writer->Write(response); - - return Status::OK; - } - - Status BidiStream(ServerContext* context, - ServerReaderWriter<EchoResponse, EchoRequest>* stream) - GRPC_OVERRIDE { - EchoRequest request; - EchoResponse response; - while (stream->Read(&request)) { - gpr_log(GPR_INFO, "recv msg %s", request.message().c_str()); - response.set_message(request.message()); - stream->Write(response); - } - return Status::OK; - } - - bool signal_client() { - std::unique_lock<std::mutex> lock(mu_); - return signal_client_; - } - - private: - bool signal_client_; - std::mutex mu_; - std::unique_ptr<grpc::string> host_; -}; - class TestServiceImplDupPkg : public ::grpc::testing::duplicate::EchoTestService::Service { public: diff --git a/test/cpp/end2end/hybrid_end2end_test.cc b/test/cpp/end2end/hybrid_end2end_test.cc new file mode 100644 index 0000000000..555d5d2ec6 --- /dev/null +++ b/test/cpp/end2end/hybrid_end2end_test.cc @@ -0,0 +1,298 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <memory> +#include <thread> + +#include <grpc++/channel.h> +#include <grpc++/client_context.h> +#include <grpc++/create_channel.h> +#include <grpc++/server.h> +#include <grpc++/server_builder.h> +#include <grpc++/server_context.h> +#include <grpc/grpc.h> +#include <gtest/gtest.h> + +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/core/util/port.h" +#include "test/core/util/test_config.h" +#include "test/cpp/end2end/test_service_impl.h" +// #include "test/cpp/util/string_ref_helper.h" + +namespace grpc { +namespace testing { + +namespace { + +void* tag(int i) { return (void*)(intptr_t)i; } + +bool VerifyReturnSuccess(CompletionQueue* cq, int i) { + void* got_tag; + bool ok; + EXPECT_TRUE(cq->Next(&got_tag, &ok)); + EXPECT_EQ(tag(i), got_tag); + return ok; +} + +void Verify(CompletionQueue* cq, int i, bool expect_ok) { + EXPECT_EQ(expect_ok, VerifyReturnSuccess(cq, i)); +} + +// Handlers to handle async request at a server. To be run in a separate thread. +template <class Service> +void HandleEcho(Service* service, ServerCompletionQueue* cq) { + ServerContext srv_ctx; + grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); + EchoRequest recv_request; + EchoResponse send_response; + service->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq, cq, tag(1)); + Verify(cq, 1, true); + send_response.set_message(recv_request.message()); + response_writer.Finish(send_response, Status::OK, tag(2)); + Verify(cq, 2, true); +} + +template <class Service> +void HandleClientStreaming(Service* service, ServerCompletionQueue* cq) { + ServerContext srv_ctx; + EchoRequest recv_request; + EchoResponse send_response; + ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx); + service->RequestRequestStream(&srv_ctx, &srv_stream, cq, cq, tag(1)); + Verify(cq, 1, true); + int i = 1; + do { + i++; + send_response.mutable_message()->append(recv_request.message()); + srv_stream.Read(&recv_request, tag(i)); + } while (VerifyReturnSuccess(cq, i)); + srv_stream.Finish(send_response, Status::OK, tag(100)); + Verify(cq, 100, true); +} + +template <class Service> +void HandleServerStreaming(Service* service, ServerCompletionQueue* cq) { + ServerContext srv_ctx; + EchoRequest recv_request; + EchoResponse send_response; + ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx); + service->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, cq, cq, + tag(1)); + Verify(cq, 1, true); + send_response.set_message(recv_request.message() + "0"); + srv_stream.Write(send_response, tag(2)); + Verify(cq, 2, true); + send_response.set_message(recv_request.message() + "1"); + srv_stream.Write(send_response, tag(3)); + Verify(cq, 3, true); + send_response.set_message(recv_request.message() + "2"); + srv_stream.Write(send_response, tag(4)); + Verify(cq, 4, true); + srv_stream.Finish(Status::OK, tag(5)); + Verify(cq, 5, true); +} + +class HybridEnd2endTest : public ::testing::Test { + protected: + HybridEnd2endTest() {} + + void SetUpServer(::grpc::Service* service) { + int port = grpc_pick_unused_port_or_die(); + server_address_ << "localhost:" << port; + + // Setup server + ServerBuilder builder; + builder.AddListeningPort(server_address_.str(), + grpc::InsecureServerCredentials()); + builder.RegisterService(service); + // Create a separate cq for each potential handler. + for (int i = 0; i < 5; i++) { + cqs_.push_back(std::move(builder.AddCompletionQueue())); + } + server_ = builder.BuildAndStart(); + } + + void TearDown() GRPC_OVERRIDE { + server_->Shutdown(); + void* ignored_tag; + bool ignored_ok; + for (auto it = cqs_.begin(); it != cqs_.end(); ++it) { + (*it)->Shutdown(); + while ((*it)->Next(&ignored_tag, &ignored_ok)) + ; + } + } + + void ResetStub() { + std::shared_ptr<Channel> channel = + CreateChannel(server_address_.str(), InsecureChannelCredentials()); + stub_ = grpc::testing::EchoTestService::NewStub(channel); + } + + void TestAllMethods() { + SendEcho(); + SendSimpleClientStreaming(); + SendSimpleServerStreaming(); + SendBidiStreaming(); + } + + void SendEcho() { + EchoRequest send_request; + EchoResponse recv_response; + ClientContext cli_ctx; + send_request.set_message("Hello"); + Status recv_status = stub_->Echo(&cli_ctx, send_request, &recv_response); + EXPECT_EQ(send_request.message(), recv_response.message()); + EXPECT_TRUE(recv_status.ok()); + } + + void SendSimpleClientStreaming() { + EchoRequest send_request; + EchoResponse recv_response; + grpc::string expected_message; + ClientContext cli_ctx; + send_request.set_message("Hello"); + auto stream = stub_->RequestStream(&cli_ctx, &recv_response); + for (int i = 0; i < 5; i++) { + EXPECT_TRUE(stream->Write(send_request)); + expected_message.append(send_request.message()); + } + stream->WritesDone(); + Status recv_status = stream->Finish(); + EXPECT_EQ(expected_message, recv_response.message()); + EXPECT_TRUE(recv_status.ok()); + } + + void SendSimpleServerStreaming() { + EchoRequest request; + EchoResponse response; + ClientContext context; + request.set_message("hello"); + + auto stream = stub_->ResponseStream(&context, request); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message() + "0"); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message() + "1"); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message() + "2"); + EXPECT_FALSE(stream->Read(&response)); + + Status s = stream->Finish(); + EXPECT_TRUE(s.ok()); + } + + void SendBidiStreaming() { + EchoRequest request; + EchoResponse response; + ClientContext context; + grpc::string msg("hello"); + + auto stream = stub_->BidiStream(&context); + + request.set_message(msg + "0"); + EXPECT_TRUE(stream->Write(request)); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message()); + + request.set_message(msg + "1"); + EXPECT_TRUE(stream->Write(request)); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message()); + + request.set_message(msg + "2"); + EXPECT_TRUE(stream->Write(request)); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message()); + + stream->WritesDone(); + EXPECT_FALSE(stream->Read(&response)); + EXPECT_FALSE(stream->Read(&response)); + + Status s = stream->Finish(); + EXPECT_TRUE(s.ok()); + } + + std::vector<std::unique_ptr<ServerCompletionQueue> > cqs_; + std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; + std::unique_ptr<Server> server_; + std::ostringstream server_address_; +}; + +TEST_F(HybridEnd2endTest, AsyncEcho) { + EchoTestService::WithAsyncMethod_Echo<TestServiceImpl> service; + SetUpServer(&service); + ResetStub(); + std::thread echo_handler_thread( + [this, &service] { HandleEcho(&service, cqs_[0].get()); }); + TestAllMethods(); + echo_handler_thread.join(); +} + +TEST_F(HybridEnd2endTest, AsyncEchoRequestStream) { + EchoTestService::WithAsyncMethod_RequestStream<EchoTestService::WithAsyncMethod_Echo<TestServiceImpl> > service; + SetUpServer(&service); + ResetStub(); + std::thread echo_handler_thread( + [this, &service] { HandleEcho(&service, cqs_[0].get()); }); + std::thread request_stream_handler_thread( + [this, &service] { HandleClientStreaming(&service, cqs_[1].get()); }); + TestAllMethods(); + echo_handler_thread.join(); + request_stream_handler_thread.join(); +} + +TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream) { + EchoTestService::WithAsyncMethod_RequestStream< + EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl> > + service; + SetUpServer(&service); + ResetStub(); + std::thread echo_handler_thread( + [this, &service] { HandleServerStreaming(&service, cqs_[0].get()); }); + std::thread request_stream_handler_thread( + [this, &service] { HandleClientStreaming(&service, cqs_[1].get()); }); + TestAllMethods(); + echo_handler_thread.join(); + request_stream_handler_thread.join(); +} + +} // namespace +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc_test_init(argc, argv); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc new file mode 100644 index 0000000000..97d15b13ca --- /dev/null +++ b/test/cpp/end2end/test_service_impl.cc @@ -0,0 +1,198 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "test/cpp/end2end/test_service_impl.h" + +#include <grpc++/security/credentials.h> +#include <grpc++/server_context.h> +#include <grpc/grpc.h> +#include <gtest/gtest.h> + +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/cpp/util/string_ref_helper.h" + +using std::chrono::system_clock; + +namespace grpc { +namespace testing { +namespace { + +// When echo_deadline is requested, deadline seen in the ServerContext is set in +// the response in seconds. +void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request, + EchoResponse* response) { + if (request->has_param() && request->param().echo_deadline()) { + gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + if (context->deadline() != system_clock::time_point::max()) { + Timepoint2Timespec(context->deadline(), &deadline); + } + response->mutable_param()->set_request_deadline(deadline.tv_sec); + } +} + +void CheckServerAuthContext(const ServerContext* context, + const grpc::string& expected_client_identity) { + std::shared_ptr<const AuthContext> auth_ctx = context->auth_context(); + std::vector<grpc::string_ref> ssl = + auth_ctx->FindPropertyValues("transport_security_type"); + EXPECT_EQ(1u, ssl.size()); + EXPECT_EQ("ssl", ToString(ssl[0])); + if (expected_client_identity.length() == 0) { + EXPECT_TRUE(auth_ctx->GetPeerIdentityPropertyName().empty()); + EXPECT_TRUE(auth_ctx->GetPeerIdentity().empty()); + EXPECT_FALSE(auth_ctx->IsPeerAuthenticated()); + } else { + auto identity = auth_ctx->GetPeerIdentity(); + EXPECT_TRUE(auth_ctx->IsPeerAuthenticated()); + EXPECT_EQ(1u, identity.size()); + EXPECT_EQ(expected_client_identity, identity[0]); + } +} +} // namespace + +Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request, + EchoResponse* response) { + response->set_message(request->message()); + MaybeEchoDeadline(context, request, response); + if (host_) { + response->mutable_param()->set_host(*host_); + } + if (request->has_param() && request->param().client_cancel_after_us()) { + { + std::unique_lock<std::mutex> lock(mu_); + signal_client_ = true; + } + while (!context->IsCancelled()) { + gpr_sleep_until(gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(request->param().client_cancel_after_us(), + GPR_TIMESPAN))); + } + return Status::CANCELLED; + } else if (request->has_param() && + request->param().server_cancel_after_us()) { + gpr_sleep_until(gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(request->param().server_cancel_after_us(), + GPR_TIMESPAN))); + return Status::CANCELLED; + } else { + EXPECT_FALSE(context->IsCancelled()); + } + + if (request->has_param() && request->param().echo_metadata()) { + const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata = + context->client_metadata(); + for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator + iter = client_metadata.begin(); + iter != client_metadata.end(); ++iter) { + context->AddTrailingMetadata(ToString(iter->first), + ToString(iter->second)); + } + } + if (request->has_param() && + (request->param().expected_client_identity().length() > 0 || + request->param().check_auth_context())) { + CheckServerAuthContext(context, + request->param().expected_client_identity()); + } + if (request->has_param() && request->param().response_message_length() > 0) { + response->set_message( + grpc::string(request->param().response_message_length(), '\0')); + } + if (request->has_param() && request->param().echo_peer()) { + response->mutable_param()->set_peer(context->peer()); + } + return Status::OK; +} + +// Unimplemented is left unimplemented to test the returned error. + +Status TestServiceImpl::RequestStream(ServerContext* context, + ServerReader<EchoRequest>* reader, + EchoResponse* response) { + EchoRequest request; + response->set_message(""); + int cancel_after_reads = 0; + const std::multimap<grpc::string_ref, grpc::string_ref>& + client_initial_metadata = context->client_metadata(); + if (client_initial_metadata.find(kServerCancelAfterReads) != + client_initial_metadata.end()) { + std::istringstream iss(ToString( + client_initial_metadata.find(kServerCancelAfterReads)->second)); + iss >> cancel_after_reads; + gpr_log(GPR_INFO, "cancel_after_reads %d", cancel_after_reads); + } + while (reader->Read(&request)) { + if (cancel_after_reads == 1) { + gpr_log(GPR_INFO, "return cancel status"); + return Status::CANCELLED; + } else if (cancel_after_reads > 0) { + cancel_after_reads--; + } + response->mutable_message()->append(request.message()); + } + return Status::OK; +} + +// Return 3 messages. +// TODO(yangg) make it generic by adding a parameter into EchoRequest +Status TestServiceImpl::ResponseStream(ServerContext* context, + const EchoRequest* request, + ServerWriter<EchoResponse>* writer) { + EchoResponse response; + response.set_message(request->message() + "0"); + writer->Write(response); + response.set_message(request->message() + "1"); + writer->Write(response); + response.set_message(request->message() + "2"); + writer->Write(response); + + return Status::OK; +} + +Status TestServiceImpl::BidiStream( + ServerContext* context, + ServerReaderWriter<EchoResponse, EchoRequest>* stream) { + EchoRequest request; + EchoResponse response; + while (stream->Read(&request)) { + gpr_log(GPR_INFO, "recv msg %s", request.message().c_str()); + response.set_message(request.message()); + stream->Write(response); + } + return Status::OK; +} + +} // namespace testing +} // namespace grpc diff --git a/test/cpp/end2end/test_service_impl.h b/test/cpp/end2end/test_service_impl.h new file mode 100644 index 0000000000..2c35b5614c --- /dev/null +++ b/test/cpp/end2end/test_service_impl.h @@ -0,0 +1,85 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ +#ifndef GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H +#define GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H + +#include <memory> +#include <mutex> + +#include <grpc++/server_context.h> +#include <grpc/grpc.h> + +#include "src/proto/grpc/testing/echo.grpc.pb.h" + +namespace grpc { +namespace testing { + +const char* const kServerCancelAfterReads = "cancel_after_reads"; + +class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { + public: + TestServiceImpl() : signal_client_(false), host_() {} + explicit TestServiceImpl(const grpc::string& host) + : signal_client_(false), host_(new grpc::string(host)) {} + + Status Echo(ServerContext* context, const EchoRequest* request, + EchoResponse* response) GRPC_OVERRIDE; + + // Unimplemented is left unimplemented to test the returned error. + + Status RequestStream(ServerContext* context, + ServerReader<EchoRequest>* reader, + EchoResponse* response) GRPC_OVERRIDE; + + Status ResponseStream(ServerContext* context, const EchoRequest* request, + ServerWriter<EchoResponse>* writer) GRPC_OVERRIDE; + + Status BidiStream(ServerContext* context, + ServerReaderWriter<EchoResponse, EchoRequest>* stream) + GRPC_OVERRIDE; + + bool signal_client() { + std::unique_lock<std::mutex> lock(mu_); + return signal_client_; + } + + private: + bool signal_client_; + std::mutex mu_; + std::unique_ptr<grpc::string> host_; +}; + +} // namespace testing +} // namespace grpc + +#endif // GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index f270cd0987..d02769d8e6 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -49,10 +49,10 @@ #include <grpc/support/histogram.h> #include <grpc/support/log.h> +#include "src/proto/grpc/testing/services.grpc.pb.h" #include "test/cpp/qps/client.h" #include "test/cpp/qps/timer.h" #include "test/cpp/util/create_test_channel.h" -#include "src/proto/grpc/testing/services.grpc.pb.h" namespace grpc { namespace testing { diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index d530dac86b..32a8fc48e6 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -350,7 +350,7 @@ class AsyncQpsServerTest : public Server { static void RegisterBenchmarkService(ServerBuilder *builder, BenchmarkService::AsyncService *service) { - builder->RegisterAsyncService(service); + builder->RegisterService(service); } static void RegisterGenericService(ServerBuilder *builder, grpc::AsyncGenericService *service) { |