aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp
diff options
context:
space:
mode:
authorGravatar Yang Gao <yangg@google.com>2015-05-05 13:31:40 -0700
committerGravatar Yang Gao <yangg@google.com>2015-05-05 13:31:40 -0700
commit691ff71da5f4baf4a027b157cf8b6f64697e0a6f (patch)
treee498d21c00e7e2f8a2643100a15cf46fa8b50949 /test/cpp
parent196ade3e4d03fa550bf0adf3a114988061e3cf0f (diff)
parent97c5559040204dcff338df79b16390014fbc82c9 (diff)
Merge remote-tracking branch 'upstream/master' into mock
Diffstat (limited to 'test/cpp')
-rw-r--r--test/cpp/end2end/end2end_test.cc24
-rw-r--r--test/cpp/qps/client_async.cc133
2 files changed, 67 insertions, 90 deletions
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 93d7ace9a2..0945ed269d 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -172,7 +172,7 @@ class TestServiceImplDupPkg
class End2endTest : public ::testing::Test {
protected:
- End2endTest() : thread_pool_(2) {}
+ End2endTest() : kMaxMessageSize_(8192), thread_pool_(2) {}
void SetUp() GRPC_OVERRIDE {
int port = grpc_pick_unused_port_or_die();
@@ -182,6 +182,8 @@ class End2endTest : public ::testing::Test {
builder.AddListeningPort(server_address_.str(),
InsecureServerCredentials());
builder.RegisterService(&service_);
+ builder.SetMaxMessageSize(
+ kMaxMessageSize_); // For testing max message size.
builder.RegisterService(&dup_pkg_service_);
builder.SetThreadPool(&thread_pool_);
server_ = builder.BuildAndStart();
@@ -198,6 +200,7 @@ class End2endTest : public ::testing::Test {
std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub_;
std::unique_ptr<Server> server_;
std::ostringstream server_address_;
+ const int kMaxMessageSize_;
TestServiceImpl service_;
TestServiceImplDupPkg dup_pkg_service_;
ThreadPool thread_pool_;
@@ -426,8 +429,7 @@ TEST_F(End2endTest, DiffPackageServices) {
// rpc and stream should fail on bad credentials.
TEST_F(End2endTest, BadCredentials) {
- std::unique_ptr<Credentials> bad_creds =
- ServiceAccountCredentials("", "", 1);
+ std::unique_ptr<Credentials> bad_creds = ServiceAccountCredentials("", "", 1);
EXPECT_EQ(nullptr, bad_creds.get());
std::shared_ptr<ChannelInterface> channel =
CreateChannel(server_address_.str(), bad_creds, ChannelArguments());
@@ -501,14 +503,13 @@ TEST_F(End2endTest, ClientCancelsRequestStream) {
auto stream = stub_->RequestStream(&context, &response);
EXPECT_TRUE(stream->Write(request));
EXPECT_TRUE(stream->Write(request));
-
+
context.TryCancel();
Status s = stream->Finish();
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.code());
-
- EXPECT_EQ(response.message(), "");
+ EXPECT_EQ(response.message(), "");
}
// Client cancels server stream after sending some messages
@@ -588,6 +589,17 @@ TEST_F(End2endTest, ThreadStress) {
}
}
+TEST_F(End2endTest, RpcMaxMessageSize) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ request.set_message(string(kMaxMessageSize_ * 2, 'a'));
+
+ ClientContext context;
+ Status s = stub_->Echo(&context, request, &response);
+ EXPECT_FALSE(s.IsOk());
+}
+
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 0a6d9beeca..0aec1b1a57 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -130,39 +130,26 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
response_reader_;
};
-class AsyncUnaryClient GRPC_FINAL : public Client {
+class AsyncClient : public Client {
public:
- explicit AsyncUnaryClient(const ClientConfig& config) : Client(config) {
+ explicit AsyncClient(const ClientConfig& config,
+ std::function<void(CompletionQueue*, TestService::Stub*,
+ const SimpleRequest&)> setup_ctx) :
+ Client(config) {
for (int i = 0; i < config.async_client_threads(); i++) {
cli_cqs_.emplace_back(new CompletionQueue);
}
-
- auto check_done = [](grpc::Status s, SimpleResponse* response) {};
-
int t = 0;
for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
for (auto channel = channels_.begin(); channel != channels_.end();
channel++) {
auto* cq = cli_cqs_[t].get();
t = (t + 1) % cli_cqs_.size();
- auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx,
- const SimpleRequest& request, void* tag) {
- return stub->AsyncUnaryCall(ctx, request, cq, tag);
- };
-
- TestService::Stub* stub = channel->get_stub();
- const SimpleRequest& request = request_;
- new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
- stub, request, start_req, check_done);
+ setup_ctx(cq, channel->get_stub(), request_);
}
}
-
- StartThreads(config.async_client_threads());
}
-
- ~AsyncUnaryClient() GRPC_OVERRIDE {
- EndThreads();
-
+ virtual ~AsyncClient() {
for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
(*cq)->Shutdown();
void* got_tag;
@@ -173,10 +160,13 @@ class AsyncUnaryClient GRPC_FINAL : public Client {
}
}
- bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
+ bool ThreadFunc(Histogram* histogram, size_t thread_idx)
+ GRPC_OVERRIDE GRPC_FINAL {
void* got_tag;
bool ok;
- switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, std::chrono::system_clock::now() + std::chrono::seconds(1))) {
+ switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok,
+ std::chrono::system_clock::now() +
+ std::chrono::seconds(1))) {
case CompletionQueue::SHUTDOWN: return false;
case CompletionQueue::TIMEOUT: return true;
case CompletionQueue::GOT_EVENT: break;
@@ -192,10 +182,30 @@ class AsyncUnaryClient GRPC_FINAL : public Client {
return true;
}
-
+ private:
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
};
+class AsyncUnaryClient GRPC_FINAL : public AsyncClient {
+ public:
+ explicit AsyncUnaryClient(const ClientConfig& config) :
+ AsyncClient(config, SetupCtx) {
+ StartThreads(config.async_client_threads());
+ }
+ ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); }
+private:
+ static void SetupCtx(CompletionQueue* cq, TestService::Stub* stub,
+ const SimpleRequest& req) {
+ auto check_done = [](grpc::Status s, SimpleResponse* response) {};
+ auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx,
+ const SimpleRequest& request, void* tag) {
+ return stub->AsyncUnaryCall(ctx, request, cq, tag);
+ };
+ new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
+ stub, req, start_req, check_done);
+ }
+};
+
template <class RequestType, class ResponseType>
class ClientRpcContextStreamingImpl : public ClientRpcContext {
public:
@@ -241,7 +251,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
return(false);
}
next_state_ = &ClientRpcContextStreamingImpl::ReadDone;
- stream_->Read(&response_, ClientRpcContext::tag(this));
+ stream_->Read(&response_, ClientRpcContext::tag(this));
return true;
}
bool ReadDone(bool ok, Histogram *hist) {
@@ -263,71 +273,26 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
stream_;
};
-class AsyncStreamingClient GRPC_FINAL : public Client {
+class AsyncStreamingClient GRPC_FINAL : public AsyncClient {
public:
- explicit AsyncStreamingClient(const ClientConfig &config) : Client(config) {
- for (int i = 0; i < config.async_client_threads(); i++) {
- cli_cqs_.emplace_back(new CompletionQueue);
- }
-
- auto check_done = [](grpc::Status s, SimpleResponse* response) {};
-
- int t = 0;
- for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
- for (auto channel = channels_.begin(); channel != channels_.end();
- channel++) {
- auto* cq = cli_cqs_[t].get();
- t = (t + 1) % cli_cqs_.size();
- auto start_req = [cq](TestService::Stub *stub, grpc::ClientContext *ctx,
- void *tag) {
- auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
- return stream;
- };
-
- TestService::Stub *stub = channel->get_stub();
- const SimpleRequest &request = request_;
- new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
- stub, request, start_req, check_done);
- }
- }
-
+ explicit AsyncStreamingClient(const ClientConfig &config) :
+ AsyncClient(config, SetupCtx) {
StartThreads(config.async_client_threads());
}
- ~AsyncStreamingClient() GRPC_OVERRIDE {
- EndThreads();
-
- for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
- (*cq)->Shutdown();
- void *got_tag;
- bool ok;
- while ((*cq)->Next(&got_tag, &ok)) {
- delete ClientRpcContext::detag(got_tag);
- }
- }
- }
-
- bool ThreadFunc(Histogram *histogram, size_t thread_idx) GRPC_OVERRIDE {
- void *got_tag;
- bool ok;
- switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, std::chrono::system_clock::now() + std::chrono::seconds(1))) {
- case CompletionQueue::SHUTDOWN: return false;
- case CompletionQueue::TIMEOUT: return true;
- case CompletionQueue::GOT_EVENT: break;
- }
-
- ClientRpcContext *ctx = ClientRpcContext::detag(got_tag);
- if (ctx->RunNextState(ok, histogram) == false) {
- // call the callback and then delete it
- ctx->RunNextState(ok, histogram);
- ctx->StartNewClone();
- delete ctx;
- }
-
- return true;
+ ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
+private:
+ static void SetupCtx(CompletionQueue* cq, TestService::Stub* stub,
+ const SimpleRequest& req) {
+ auto check_done = [](grpc::Status s, SimpleResponse* response) {};
+ auto start_req = [cq](TestService::Stub *stub, grpc::ClientContext *ctx,
+ void *tag) {
+ auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
+ return stream;
+ };
+ new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
+ stub, req, start_req, check_done);
}
-
- std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
};
std::unique_ptr<Client> CreateAsyncUnaryClient(const ClientConfig& args) {