author    Craig Tiller <ctiller@google.com>    2017-09-07 15:51:35 -0700
committer Craig Tiller <ctiller@google.com>    2017-09-07 15:51:35 -0700
commit    23af48d9721c8c3409e03c8620ba717a12921814 (patch)
tree      cbc2d82cf2ce907cc6be2c380454726d9e9cba93 /test/cpp
parent    561dc32247534d9204d9f68e92259759875c6d93 (diff)
parent    41630a29507c8dd5b6110f0397b346b7feab442b (diff)
Merge github.com:grpc/grpc into server_stats
Diffstat (limited to 'test/cpp')
-rw-r--r--  test/cpp/end2end/BUILD                              2
-rw-r--r--  test/cpp/end2end/async_end2end_test.cc            134
-rw-r--r--  test/cpp/end2end/client_lb_end2end_test.cc          7
-rw-r--r--  test/cpp/end2end/end2end_test.cc                   26
-rw-r--r--  test/cpp/microbenchmarks/bm_call_create.cc         36
-rw-r--r--  test/cpp/microbenchmarks/bm_chttp2_transport.cc     4
-rw-r--r--  test/cpp/qps/BUILD                                  1
7 files changed, 132 insertions, 78 deletions
diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD
index b8505c1ae7..b29a13d4fb 100644
--- a/test/cpp/end2end/BUILD
+++ b/test/cpp/end2end/BUILD
@@ -193,6 +193,7 @@ grpc_cc_test(
"//test/cpp/util:test_util",
],
external_deps = [
+ "gmock",
"gtest",
],
)
@@ -235,6 +236,7 @@ grpc_cc_test(
"//test/cpp/util:test_util",
],
external_deps = [
+ "gmock",
"gtest",
],
)
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 7cb7b262de..e841a702d4 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -260,11 +260,31 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
server_address_ << "localhost:" << port_;
// Setup server
+ BuildAndStartServer();
+
+ gpr_tls_set(&g_is_async_end2end_test, 1);
+ }
+
+ void TearDown() override {
+ server_->Shutdown();
+ void* ignored_tag;
+ bool ignored_ok;
+ cq_->Shutdown();
+ while (cq_->Next(&ignored_tag, &ignored_ok))
+ ;
+ stub_.reset();
+ poll_overrider_.reset();
+ gpr_tls_set(&g_is_async_end2end_test, 0);
+ grpc_recycle_unused_port(port_);
+ }
+
+ void BuildAndStartServer() {
ServerBuilder builder;
auto server_creds = GetCredentialsProvider()->GetServerCredentials(
GetParam().credentials_type);
builder.AddListeningPort(server_address_.str(), server_creds);
- builder.RegisterService(&service_);
+ service_.reset(new grpc::testing::EchoTestService::AsyncService());
+ builder.RegisterService(service_.get());
if (GetParam().health_check_service) {
builder.RegisterService(&health_check_);
}
@@ -276,20 +296,6 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
new ServerBuilderSyncPluginDisabler());
builder.SetOption(move(sync_plugin_disabler));
server_ = builder.BuildAndStart();
-
- gpr_tls_set(&g_is_async_end2end_test, 1);
- }
-
- void TearDown() override {
- server_->Shutdown();
- void* ignored_tag;
- bool ignored_ok;
- cq_->Shutdown();
- while (cq_->Next(&ignored_tag, &ignored_ok))
- ;
- poll_overrider_.reset();
- gpr_tls_set(&g_is_async_end2end_test, 0);
- grpc_recycle_unused_port(port_);
}
void ResetStub() {
@@ -319,8 +325,8 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
- service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
- cq_.get(), tag(2));
+ service_->RequestEcho(&srv_ctx, &recv_request, &response_writer,
+ cq_.get(), cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
@@ -341,7 +347,7 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
std::unique_ptr<ServerCompletionQueue> cq_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::unique_ptr<Server> server_;
- grpc::testing::EchoTestService::AsyncService service_;
+ std::unique_ptr<grpc::testing::EchoTestService::AsyncService> service_;
HealthCheck health_check_;
std::ostringstream server_address_;
int port_;
@@ -359,6 +365,26 @@ TEST_P(AsyncEnd2endTest, SequentialRpcs) {
SendRpc(10);
}
+TEST_P(AsyncEnd2endTest, ReconnectChannel) {
+ if (GetParam().inproc) {
+ return;
+ }
+ ResetStub();
+ SendRpc(1);
+ server_->Shutdown();
+ void* ignored_tag;
+ bool ignored_ok;
+ cq_->Shutdown();
+ while (cq_->Next(&ignored_tag, &ignored_ok))
+ ;
+ BuildAndStartServer();
+ // It needs more than kConnectivityCheckIntervalMsec time to reconnect the
+ // channel.
+ gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_millis(1600, GPR_TIMESPAN)));
+ SendRpc(1);
+}
+
// We do not need to protect notify because the use is synchronized.
void ServerWait(Server* server, int* notify) {
server->Wait();
@@ -407,8 +433,8 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now);
Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now);
- service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
- cq_.get(), tag(2));
+ service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
+ cq_.get(), tag(2));
Verifier(GetParam().disable_blocking)
.Expect(2, true)
@@ -444,8 +470,8 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
- service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
- tag(2));
+ service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
Verifier(GetParam().disable_blocking)
.Expect(2, true)
@@ -506,8 +532,8 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) {
std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
- service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
- tag(2));
+ service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
cli_stream->Write(send_request, tag(3));
@@ -579,8 +605,8 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
- service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
- cq_.get(), cq_.get(), tag(2));
+ service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
+ cq_.get(), cq_.get(), tag(2));
Verifier(GetParam().disable_blocking)
.Expect(1, true)
@@ -635,8 +661,8 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWAF) {
std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
- service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
- cq_.get(), cq_.get(), tag(2));
+ service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
+ cq_.get(), cq_.get(), tag(2));
Verifier(GetParam().disable_blocking)
.Expect(1, true)
@@ -687,8 +713,8 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWL) {
std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
- service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
- cq_.get(), cq_.get(), tag(2));
+ service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
+ cq_.get(), cq_.get(), tag(2));
Verifier(GetParam().disable_blocking)
.Expect(1, true)
@@ -741,8 +767,8 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
- service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
- tag(2));
+ service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
Verifier(GetParam().disable_blocking)
.Expect(1, true)
@@ -801,8 +827,8 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) {
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
- service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
- tag(2));
+ service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
@@ -869,8 +895,8 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) {
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
- service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
- tag(2));
+ service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
@@ -946,8 +972,8 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
- service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
- cq_.get(), tag(2));
+ service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
+ cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
auto client_initial_metadata = srv_ctx.client_metadata();
@@ -991,8 +1017,8 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
- service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
- cq_.get(), tag(2));
+ service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
+ cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
@@ -1041,8 +1067,8 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
- service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
- cq_.get(), tag(2));
+ service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
+ cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
response_writer.SendInitialMetadata(tag(3));
@@ -1104,8 +1130,8 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
- service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
- cq_.get(), tag(2));
+ service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
+ cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
auto client_initial_metadata = srv_ctx.client_metadata();
@@ -1168,8 +1194,8 @@ TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
srv_ctx.AsyncNotifyWhenDone(tag(5));
- service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
- cq_.get(), tag(2));
+ service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
+ cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
@@ -1203,8 +1229,8 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) {
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
srv_ctx.AsyncNotifyWhenDone(tag(5));
- service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
- cq_.get(), tag(2));
+ service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
+ cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
@@ -1295,8 +1321,8 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// On the server, request to be notified of 'RequestStream' calls
// and receive the 'RequestStream' call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
- service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
- tag(2));
+ service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
// Client sends 3 messages (tags 3, 4 and 5)
@@ -1426,8 +1452,8 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// On the server, request to be notified of 'ResponseStream' calls and
// receive the call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
- service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
- cq_.get(), cq_.get(), tag(2));
+ service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
+ cq_.get(), cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
@@ -1562,8 +1588,8 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// On the server, request to be notified of the 'BidiStream' call and
// receive the call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
- service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
- tag(2));
+ service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+ tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
// Client sends the first and the only message
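
Note: the refactoring above (the new BuildAndStartServer(), the service_ member becoming a unique_ptr, and the ReconnectChannel test) hinges on tearing the async server down cleanly and bringing it back up on the same port. Both TearDown() and ReconnectChannel rely on fully draining the server completion queue after Shutdown(). A minimal sketch of that drain step, assuming a grpc::CompletionQueue (or ServerCompletionQueue) that has already had Shutdown() called on it; this is illustrative, not part of the commit:

    #include <grpc++/grpc++.h>

    // Drain every event still sitting in a shut-down completion queue.
    // Next() keeps returning already-enqueued events and finally returns
    // false once the queue is empty, after which it is safe to destroy
    // the queue or build a fresh server with a new one.
    void DrainCompletionQueue(grpc::CompletionQueue* cq) {
      void* ignored_tag;
      bool ignored_ok;
      while (cq->Next(&ignored_tag, &ignored_ok)) {
      }
    }
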
diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc
index b588eda84f..54408db600 100644
--- a/test/cpp/end2end/client_lb_end2end_test.cc
+++ b/test/cpp/end2end/client_lb_end2end_test.cc
@@ -180,16 +180,18 @@ class ClientLbEnd2endTest : public ::testing::Test {
std::unique_ptr<Server> server_;
MyTestServiceImpl service_;
std::unique_ptr<std::thread> thread_;
+ bool server_ready_ = false;
explicit ServerData(const grpc::string& server_host, int port = 0) {
port_ = port > 0 ? port : grpc_pick_unused_port_or_die();
gpr_log(GPR_INFO, "starting server on port %d", port_);
std::mutex mu;
+ std::unique_lock<std::mutex> lock(mu);
std::condition_variable cond;
thread_.reset(new std::thread(
std::bind(&ServerData::Start, this, server_host, &mu, &cond)));
- std::unique_lock<std::mutex> lock(mu);
- cond.wait(lock);
+ cond.wait(lock, [this] { return server_ready_; });
+ server_ready_ = false;
gpr_log(GPR_INFO, "server startup complete");
}
@@ -203,6 +205,7 @@ class ClientLbEnd2endTest : public ::testing::Test {
builder.RegisterService(&service_);
server_ = builder.BuildAndStart();
std::lock_guard<std::mutex> lock(*mu);
+ server_ready_ = true;
cond->notify_one();
}
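
Note: the change above fixes a startup race. The constructor previously launched the server thread and only then took the lock and waited without a predicate, so a notify_one() issued before the wait began could be lost, and a spurious wakeup could release the wait before the server existed. A self-contained sketch of the corrected handshake, with hypothetical names (StartServerAndWait, server_ready) standing in for the fixture's members:

    #include <condition_variable>
    #include <mutex>
    #include <thread>

    std::mutex mu;
    std::condition_variable cond;
    bool server_ready = false;

    void StartServerAndWait() {
      // Take the lock before the worker can possibly notify.
      std::unique_lock<std::mutex> lock(mu);
      std::thread worker([] {
        // ... build and start the server ...
        std::lock_guard<std::mutex> guard(mu);  // blocks until wait() releases mu
        server_ready = true;
        cond.notify_one();
      });
      // The predicate makes the wait immune to spurious wakeups and to a
      // notification that fires before (or while) wait() is entered.
      cond.wait(lock, [] { return server_ready; });
      worker.join();  // joined here for brevity; the fixture stores the thread instead
    }
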
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 8bada48a2b..1f4861a7e6 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -238,6 +238,18 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> {
int port = grpc_pick_unused_port_or_die();
server_address_ << "127.0.0.1:" << port;
// Setup server
+ BuildAndStartServer(processor);
+ }
+
+ void RestartServer(const std::shared_ptr<AuthMetadataProcessor>& processor) {
+ if (is_server_started_) {
+ server_->Shutdown();
+ BuildAndStartServer(processor);
+ }
+ }
+
+ void BuildAndStartServer(
+ const std::shared_ptr<AuthMetadataProcessor>& processor) {
ServerBuilder builder;
ConfigureServerBuilder(&builder);
auto server_creds = GetCredentialsProvider()->GetServerCredentials(
@@ -685,6 +697,20 @@ TEST_P(End2endTest, MultipleRpcs) {
}
}
+TEST_P(End2endTest, ReconnectChannel) {
+ if (GetParam().inproc) {
+ return;
+ }
+ ResetStub();
+ SendRpc(stub_.get(), 1, false);
+ RestartServer(std::shared_ptr<AuthMetadataProcessor>());
+ // It needs more than kConnectivityCheckIntervalMsec time to reconnect the
+ // channel.
+ gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_millis(1600, GPR_TIMESPAN)));
+ SendRpc(stub_.get(), 1, false);
+}
+
TEST_P(End2endTest, RequestStreamOneRequest) {
ResetStub();
EchoRequest request;
diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc
index 508f7f94d6..518c65ac8d 100644
--- a/test/cpp/microbenchmarks/bm_call_create.cc
+++ b/test/cpp/microbenchmarks/bm_call_create.cc
@@ -39,6 +39,7 @@ extern "C" {
#include "src/core/ext/filters/message_size/message_size_filter.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/connected_channel.h"
+#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/transport_impl.h"
@@ -396,10 +397,6 @@ grpc_error *InitChannelElem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
void DestroyChannelElem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) {}
-char *GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
- return gpr_strdup("peer");
-}
-
void GetChannelInfo(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
const grpc_channel_info *channel_info) {}
@@ -412,7 +409,6 @@ static const grpc_channel_filter dummy_filter = {StartTransportStreamOp,
0,
InitChannelElem,
DestroyChannelElem,
- GetPeer,
GetChannelInfo,
"dummy_filter"};
@@ -459,11 +455,6 @@ void DestroyStream(grpc_exec_ctx *exec_ctx, grpc_transport *self,
/* implementation of grpc_transport_destroy */
void Destroy(grpc_exec_ctx *exec_ctx, grpc_transport *self) {}
-/* implementation of grpc_transport_get_peer */
-char *GetPeer(grpc_exec_ctx *exec_ctx, grpc_transport *self) {
- return gpr_strdup("transport_peer");
-}
-
/* implementation of grpc_transport_get_endpoint */
grpc_endpoint *GetEndpoint(grpc_exec_ctx *exec_ctx, grpc_transport *self) {
return nullptr;
@@ -473,7 +464,7 @@ static const grpc_transport_vtable dummy_transport_vtable = {
0, "dummy_http2", InitStream,
SetPollset, SetPollsetSet, PerformStreamOp,
PerformOp, DestroyStream, Destroy,
- GetPeer, GetEndpoint};
+ GetEndpoint};
static grpc_transport dummy_transport = {&dummy_transport_vtable};
@@ -639,18 +630,22 @@ BENCHMARK_TEMPLATE(BM_IsolatedFilter, LoadReportingFilter, SendEmptyMetadata);
namespace isolated_call_filter {
+typedef struct { grpc_call_combiner *call_combiner; } call_data;
+
static void StartTransportStreamOp(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op_batch *op) {
+ call_data *calld = static_cast<call_data *>(elem->call_data);
if (op->recv_initial_metadata) {
- GRPC_CLOSURE_SCHED(
- exec_ctx,
+ GRPC_CALL_COMBINER_START(
+ exec_ctx, calld->call_combiner,
op->payload->recv_initial_metadata.recv_initial_metadata_ready,
- GRPC_ERROR_NONE);
+ GRPC_ERROR_NONE, "recv_initial_metadata");
}
if (op->recv_message) {
- GRPC_CLOSURE_SCHED(exec_ctx, op->payload->recv_message.recv_message_ready,
- GRPC_ERROR_NONE);
+ GRPC_CALL_COMBINER_START(exec_ctx, calld->call_combiner,
+ op->payload->recv_message.recv_message_ready,
+ GRPC_ERROR_NONE, "recv_message");
}
GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, GRPC_ERROR_NONE);
}
@@ -667,6 +662,8 @@ static void StartTransportOp(grpc_exec_ctx *exec_ctx,
static grpc_error *InitCallElem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_element_args *args) {
+ call_data *calld = static_cast<call_data *>(elem->call_data);
+ calld->call_combiner = args->call_combiner;
return GRPC_ERROR_NONE;
}
@@ -687,24 +684,19 @@ grpc_error *InitChannelElem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
void DestroyChannelElem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) {}
-char *GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
- return gpr_strdup("peer");
-}
-
void GetChannelInfo(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
const grpc_channel_info *channel_info) {}
static const grpc_channel_filter isolated_call_filter = {
StartTransportStreamOp,
StartTransportOp,
- 0,
+ sizeof(call_data),
InitCallElem,
SetPollsetOrPollsetSet,
DestroyCallElem,
0,
InitChannelElem,
DestroyChannelElem,
- GetPeer,
GetChannelInfo,
"isolated_call_filter"};
} // namespace isolated_call_filter
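
Note: the call-combiner wiring added to isolated_call_filter above follows a single shape spread across three hunks: the call element now has per-call data (hence sizeof(call_data) in the vtable), InitCallElem stores the grpc_call_combiner handed to it, and closures that used to be GRPC_CLOSURE_SCHED'ed directly are started on that combiner so at most one runs for a given call at a time. A condensed sketch of the same pattern, using the types and macros as they appear in the diff (not a complete drop-in filter):

    #include "src/core/lib/iomgr/call_combiner.h"

    typedef struct {
      grpc_call_combiner *call_combiner;  /* borrowed from the call, not owned */
    } call_data;

    static grpc_error *InitCallElem(grpc_exec_ctx *exec_ctx,
                                    grpc_call_element *elem,
                                    const grpc_call_element_args *args) {
      call_data *calld = (call_data *)elem->call_data;
      calld->call_combiner = args->call_combiner;  /* remember the per-call combiner */
      return GRPC_ERROR_NONE;
    }

    static void StartTransportStreamOp(grpc_exec_ctx *exec_ctx,
                                       grpc_call_element *elem,
                                       grpc_transport_stream_op_batch *op) {
      call_data *calld = (call_data *)elem->call_data;
      if (op->recv_initial_metadata) {
        /* run the closure under the call combiner instead of scheduling it */
        GRPC_CALL_COMBINER_START(
            exec_ctx, calld->call_combiner,
            op->payload->recv_initial_metadata.recv_initial_metadata_ready,
            GRPC_ERROR_NONE, "recv_initial_metadata");
      }
      GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, GRPC_ERROR_NONE);
    }
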
diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
index cb113c5254..936681fec1 100644
--- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc
+++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
@@ -286,6 +286,7 @@ static void BM_StreamCreateSendInitialMetadataDestroy(benchmark::State &state) {
Stream s(&f);
grpc_transport_stream_op_batch op;
grpc_transport_stream_op_batch_payload op_payload;
+ memset(&op_payload, 0, sizeof(op_payload));
std::unique_ptr<Closure> start;
std::unique_ptr<Closure> done;
@@ -337,6 +338,7 @@ static void BM_TransportEmptyOp(benchmark::State &state) {
s.Init(state);
grpc_transport_stream_op_batch op;
grpc_transport_stream_op_batch_payload op_payload;
+ memset(&op_payload, 0, sizeof(op_payload));
auto reset_op = [&]() {
memset(&op, 0, sizeof(op));
op.payload = &op_payload;
@@ -364,6 +366,7 @@ static void BM_TransportStreamSend(benchmark::State &state) {
s.Init(state);
grpc_transport_stream_op_batch op;
grpc_transport_stream_op_batch_payload op_payload;
+ memset(&op_payload, 0, sizeof(op_payload));
auto reset_op = [&]() {
memset(&op, 0, sizeof(op));
op.payload = &op_payload;
@@ -485,6 +488,7 @@ static void BM_TransportStreamRecv(benchmark::State &state) {
Stream s(&f);
s.Init(state);
grpc_transport_stream_op_batch_payload op_payload;
+ memset(&op_payload, 0, sizeof(op_payload));
grpc_transport_stream_op_batch op;
grpc_byte_stream *recv_stream;
grpc_slice incoming_data = CreateIncomingDataSlice(state.range(0), 16384);
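
Note: each benchmark above now clears the shared payload struct before first use; previously only the batch itself was zeroed inside the reset_op lambda, leaving op_payload's fields uninitialized. A minimal sketch of the combined pattern as it now stands in these benchmarks, shown for illustration:

    grpc_transport_stream_op_batch op;
    grpc_transport_stream_op_batch_payload op_payload;
    memset(&op_payload, 0, sizeof(op_payload));  // payload cleared once, up front
    auto reset_op = [&]() {
      memset(&op, 0, sizeof(op));                // batch re-cleared before every use
      op.payload = &op_payload;
    };
    reset_op();
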
diff --git a/test/cpp/qps/BUILD b/test/cpp/qps/BUILD
index 31f210dec0..3352269517 100644
--- a/test/cpp/qps/BUILD
+++ b/test/cpp/qps/BUILD
@@ -46,6 +46,7 @@ grpc_cc_library(
":usage_timer",
"//:grpc",
"//:grpc++",
+ "//:grpc++_core_stats",
"//src/proto/grpc/testing:control_proto",
"//src/proto/grpc/testing:payloads_proto",
"//src/proto/grpc/testing:services_proto",