aboutsummaryrefslogtreecommitdiffhomepage
path: root/test
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-09-11 12:03:36 -0700
committerGravatar GitHub <noreply@github.com>2017-09-11 12:03:36 -0700
commit55c4b31389d5557b88d39bde6d783d68aa747de7 (patch)
tree57b63eb6f54ab024326468cfe36828eff3794cd9 /test
parent4a7e0594cc8434a6834ac61cb0b1198ac859affd (diff)
parent8e71287bd768dd9b8698363d062a817c39f0e07d (diff)
Merge pull request #11758 from ctiller/write_completion
Write completion changes
Diffstat (limited to 'test')
-rw-r--r--test/core/bad_client/bad_client.c8
-rw-r--r--test/core/end2end/bad_server_response_test.c6
-rw-r--r--test/core/end2end/tests/resource_quota_server.c8
-rw-r--r--test/core/iomgr/tcp_posix_test.c12
-rw-r--r--test/core/security/secure_endpoint_test.c6
-rw-r--r--test/cpp/end2end/async_end2end_test.cc6
-rw-r--r--test/cpp/end2end/end2end_test.cc16
-rw-r--r--test/cpp/microbenchmarks/bm_chttp2_transport.cc166
-rw-r--r--test/cpp/microbenchmarks/bm_fullstack_trickle.cc5
9 files changed, 146 insertions, 87 deletions
diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c
index c3964ca84b..383d1240cb 100644
--- a/test/core/bad_client/bad_client.c
+++ b/test/core/bad_client/bad_client.c
@@ -45,18 +45,18 @@ typedef struct {
} thd_args;
static void thd_func(void *arg) {
- thd_args *a = arg;
+ thd_args *a = (thd_args *)arg;
a->validator(a->server, a->cq, a->registered_method);
gpr_event_set(&a->done_thd, (void *)1);
}
static void done_write(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- thd_args *a = arg;
+ thd_args *a = (thd_args *)arg;
gpr_event_set(&a->done_write, (void *)1);
}
static void server_setup_transport(void *ts, grpc_transport *transport) {
- thd_args *a = ts;
+ thd_args *a = (thd_args *)ts;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_server_setup_transport(&exec_ctx, a->server, transport, NULL,
grpc_server_get_channel_args(a->server));
@@ -64,7 +64,7 @@ static void server_setup_transport(void *ts, grpc_transport *transport) {
}
static void read_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- gpr_event *read_done = arg;
+ gpr_event *read_done = (gpr_event *)arg;
gpr_event_set(read_done, (void *)1);
}
diff --git a/test/core/end2end/bad_server_response_test.c b/test/core/end2end/bad_server_response_test.c
index 5f89058c45..eeabc769d3 100644
--- a/test/core/end2end/bad_server_response_test.c
+++ b/test/core/end2end/bad_server_response_test.c
@@ -136,7 +136,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
grpc_pollset *accepting_pollset,
grpc_tcp_server_acceptor *acceptor) {
gpr_free(acceptor);
- test_tcp_server *server = arg;
+ test_tcp_server *server = (test_tcp_server *)arg;
GRPC_CLOSURE_INIT(&on_read, handle_read, NULL, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&on_write, done_write, NULL, grpc_schedule_on_exec_ctx);
grpc_slice_buffer_init(&state.temp_incoming_buffer);
@@ -237,7 +237,7 @@ typedef struct {
} poll_args;
static void actually_poll_server(void *arg) {
- poll_args *pa = arg;
+ poll_args *pa = (poll_args *)arg;
gpr_timespec deadline = n_sec_deadline(10);
while (true) {
bool done = gpr_atm_acq_load(&state.done_atm) != 0;
@@ -259,7 +259,7 @@ static void poll_server_until_read_done(test_tcp_server *server,
gpr_atm_rel_store(&state.done_atm, 0);
state.write_done = 0;
gpr_thd_id id;
- poll_args *pa = gpr_malloc(sizeof(*pa));
+ poll_args *pa = (poll_args *)gpr_malloc(sizeof(*pa));
pa->server = server;
pa->signal_when_done = signal_when_done;
gpr_thd_new(&id, actually_poll_server, pa, NULL);
diff --git a/test/core/end2end/tests/resource_quota_server.c b/test/core/end2end/tests/resource_quota_server.c
index 57018628ce..0316920762 100644
--- a/test/core/end2end/tests/resource_quota_server.c
+++ b/test/core/end2end/tests/resource_quota_server.c
@@ -111,10 +111,10 @@ void resource_quota_server(grpc_end2end_test_config config) {
grpc_resource_quota_resize(resource_quota, 5 * 1024 * 1024);
#define NUM_CALLS 100
-#define CLIENT_BASE_TAG 1000
-#define SERVER_START_BASE_TAG 2000
-#define SERVER_RECV_BASE_TAG 3000
-#define SERVER_END_BASE_TAG 4000
+#define CLIENT_BASE_TAG 0x1000
+#define SERVER_START_BASE_TAG 0x2000
+#define SERVER_RECV_BASE_TAG 0x3000
+#define SERVER_END_BASE_TAG 0x4000
grpc_arg arg;
arg.key = GRPC_ARG_RESOURCE_QUOTA;
diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c
index c45068e7ec..cdaa2ce2af 100644
--- a/test/core/iomgr/tcp_posix_test.c
+++ b/test/core/iomgr/tcp_posix_test.c
@@ -89,7 +89,7 @@ static ssize_t fill_socket(int fd) {
static size_t fill_socket_partial(int fd, size_t bytes) {
ssize_t write_bytes;
size_t total_bytes = 0;
- unsigned char *buf = gpr_malloc(bytes);
+ unsigned char *buf = (unsigned char *)gpr_malloc(bytes);
unsigned i;
for (i = 0; i < bytes; ++i) {
buf[i] = (uint8_t)(i % 256);
@@ -267,7 +267,7 @@ struct write_socket_state {
static grpc_slice *allocate_blocks(size_t num_bytes, size_t slice_size,
size_t *num_blocks, uint8_t *current_data) {
size_t nslices = num_bytes / slice_size + (num_bytes % slice_size ? 1u : 0u);
- grpc_slice *slices = gpr_malloc(sizeof(grpc_slice) * nslices);
+ grpc_slice *slices = (grpc_slice *)gpr_malloc(sizeof(grpc_slice) * nslices);
size_t num_bytes_left = num_bytes;
unsigned i, j;
unsigned char *buf;
@@ -301,7 +301,7 @@ static void write_done(grpc_exec_ctx *exec_ctx,
}
void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) {
- unsigned char *buf = gpr_malloc(read_size);
+ unsigned char *buf = (unsigned char *)gpr_malloc(read_size);
ssize_t bytes_read;
size_t bytes_left = num_bytes;
int flags;
@@ -404,7 +404,7 @@ static void write_test(size_t num_bytes, size_t slice_size) {
}
void on_fd_released(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *errors) {
- int *done = arg;
+ int *done = (int *)arg;
*done = 1;
GPR_ASSERT(
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL)));
@@ -548,7 +548,7 @@ static grpc_endpoint_test_config configs[] = {
static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p,
grpc_error *error) {
- grpc_pollset_destroy(exec_ctx, p);
+ grpc_pollset_destroy(exec_ctx, (grpc_pollset *)p);
}
int main(int argc, char **argv) {
@@ -556,7 +556,7 @@ int main(int argc, char **argv) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_test_init(argc, argv);
grpc_init();
- g_pollset = gpr_zalloc(grpc_pollset_size());
+ g_pollset = (grpc_pollset *)gpr_zalloc(grpc_pollset_size());
grpc_pollset_init(g_pollset, &g_mu);
grpc_endpoint_tests(configs[0], g_pollset, g_mu);
run_tests();
diff --git a/test/core/security/secure_endpoint_test.c b/test/core/security/secure_endpoint_test.c
index e9f2c76738..839a05fa9b 100644
--- a/test/core/security/secure_endpoint_test.c
+++ b/test/core/security/secure_endpoint_test.c
@@ -70,7 +70,7 @@ static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair(
size_t still_pending_size;
size_t total_buffer_size = 8192;
size_t buffer_size = total_buffer_size;
- uint8_t *encrypted_buffer = gpr_malloc(buffer_size);
+ uint8_t *encrypted_buffer = (uint8_t *)gpr_malloc(buffer_size);
uint8_t *cur = encrypted_buffer;
grpc_slice encrypted_leftover;
for (i = 0; i < leftover_nslices; i++) {
@@ -202,7 +202,7 @@ static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) {
static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p,
grpc_error *error) {
- grpc_pollset_destroy(exec_ctx, p);
+ grpc_pollset_destroy(exec_ctx, (grpc_pollset *)p);
}
int main(int argc, char **argv) {
@@ -211,7 +211,7 @@ int main(int argc, char **argv) {
grpc_test_init(argc, argv);
grpc_init();
- g_pollset = gpr_zalloc(grpc_pollset_size());
+ g_pollset = (grpc_pollset *)gpr_zalloc(grpc_pollset_size());
grpc_pollset_init(g_pollset, &g_mu);
grpc_endpoint_tests(configs[0], g_pollset, g_mu);
grpc_endpoint_tests(configs[1], g_pollset, g_mu);
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index e841a702d4..41090d161a 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -266,6 +266,7 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
}
void TearDown() override {
+ gpr_tls_set(&g_is_async_end2end_test, 0);
server_->Shutdown();
void* ignored_tag;
bool ignored_ok;
@@ -274,7 +275,6 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
;
stub_.reset();
poll_overrider_.reset();
- gpr_tls_set(&g_is_async_end2end_test, 0);
grpc_recycle_unused_port(port_);
}
@@ -396,6 +396,7 @@ TEST_P(AsyncEnd2endTest, WaitAndShutdownTest) {
ResetStub();
SendRpc(1);
EXPECT_EQ(0, notify);
+ gpr_tls_set(&g_is_async_end2end_test, 0);
server_->Shutdown();
wait_thread.join();
EXPECT_EQ(1, notify);
@@ -404,8 +405,9 @@ TEST_P(AsyncEnd2endTest, WaitAndShutdownTest) {
TEST_P(AsyncEnd2endTest, ShutdownThenWait) {
ResetStub();
SendRpc(1);
- server_->Shutdown();
+ std::thread t([this]() { server_->Shutdown(); });
server_->Wait();
+ t.join();
}
// Test a simple RPC using the async version of Next
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 1f4861a7e6..e54cd03ca2 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -757,6 +757,22 @@ TEST_P(End2endTest, RequestStreamTwoRequests) {
EXPECT_TRUE(s.ok());
}
+TEST_P(End2endTest, RequestStreamTwoRequestsWithWriteThrough) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+
+ auto stream = stub_->RequestStream(&context, &response);
+ request.set_message("hello");
+ EXPECT_TRUE(stream->Write(request, WriteOptions().set_write_through()));
+ EXPECT_TRUE(stream->Write(request, WriteOptions().set_write_through()));
+ stream->WritesDone();
+ Status s = stream->Finish();
+ EXPECT_EQ(response.message(), "hellohello");
+ EXPECT_TRUE(s.ok());
+}
+
TEST_P(End2endTest, RequestStreamTwoRequestsWithCoalescingApi) {
ResetStub();
EchoRequest request;
diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
index 936681fec1..070034fe33 100644
--- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc
+++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
@@ -29,6 +29,7 @@
extern "C" {
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/ext/transport/chttp2/transport/internal.h"
+#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/resource_quota.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/transport/static_metadata.h"
@@ -154,23 +155,59 @@ class Fixture {
grpc_transport *t_;
};
-static void DoNothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {}
+class Closure : public grpc_closure {
+ public:
+ virtual ~Closure() {}
+};
+
+template <class F>
+std::unique_ptr<Closure> MakeClosure(
+ F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) {
+ struct C : public Closure {
+ C(const F &f, grpc_closure_scheduler *sched) : f_(f) {
+ GRPC_CLOSURE_INIT(this, Execute, this, sched);
+ }
+ F f_;
+ static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ static_cast<C *>(arg)->f_(exec_ctx, error);
+ }
+ };
+ return std::unique_ptr<Closure>(new C(f, sched));
+}
+
+template <class F>
+grpc_closure *MakeOnceClosure(
+ F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) {
+ struct C : public grpc_closure {
+ C(const F &f) : f_(f) {}
+ F f_;
+ static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ static_cast<C *>(arg)->f_(exec_ctx, error);
+ delete static_cast<C *>(arg);
+ }
+ };
+ auto *c = new C{f};
+ return GRPC_CLOSURE_INIT(c, C::Execute, c, sched);
+}
class Stream {
public:
Stream(Fixture *f) : f_(f) {
- GRPC_STREAM_REF_INIT(&refcount_, 1, DoNothing, nullptr, "test_stream");
stream_size_ = grpc_transport_stream_size(f->transport());
stream_ = gpr_malloc(stream_size_);
arena_ = gpr_arena_create(4096);
}
~Stream() {
+ gpr_event_wait(&done_, gpr_inf_future(GPR_CLOCK_REALTIME));
gpr_free(stream_);
gpr_arena_destroy(arena_);
}
void Init(benchmark::State &state) {
+ GRPC_STREAM_REF_INIT(&refcount_, 1, &Stream::FinishDestroy, this,
+ "test_stream");
+ gpr_event_init(&done_);
memset(stream_, 0, stream_size_);
if ((state.iterations() & 0xffff) == 0) {
gpr_arena_destroy(arena_);
@@ -181,13 +218,17 @@ class Stream {
NULL, arena_);
}
- void DestroyThen(grpc_closure *closure) {
- grpc_transport_destroy_stream(f_->exec_ctx(), f_->transport(),
- static_cast<grpc_stream *>(stream_), closure);
+ void DestroyThen(grpc_exec_ctx *exec_ctx, grpc_closure *closure) {
+ destroy_closure_ = closure;
+#ifndef NDEBUG
+ grpc_stream_unref(exec_ctx, &refcount_, "DestroyThen");
+#else
+ grpc_stream_unref(exec_ctx, &refcount_);
+#endif
}
- void Op(grpc_transport_stream_op_batch *op) {
- grpc_transport_perform_stream_op(f_->exec_ctx(), f_->transport(),
+ void Op(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op) {
+ grpc_transport_perform_stream_op(exec_ctx, f_->transport(),
static_cast<grpc_stream *>(stream_), op);
}
@@ -196,48 +237,24 @@ class Stream {
}
private:
+ static void FinishDestroy(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ auto stream = static_cast<Stream *>(arg);
+ grpc_transport_destroy_stream(exec_ctx, stream->f_->transport(),
+ static_cast<grpc_stream *>(stream->stream_),
+ stream->destroy_closure_);
+ gpr_event_set(&stream->done_, (void *)1);
+ }
+
Fixture *f_;
grpc_stream_refcount refcount_;
gpr_arena *arena_;
size_t stream_size_;
void *stream_;
+ grpc_closure *destroy_closure_ = nullptr;
+ gpr_event done_;
};
-class Closure : public grpc_closure {
- public:
- virtual ~Closure() {}
-};
-
-template <class F>
-std::unique_ptr<Closure> MakeClosure(
- F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) {
- struct C : public Closure {
- C(const F &f, grpc_closure_scheduler *sched) : f_(f) {
- GRPC_CLOSURE_INIT(this, Execute, this, sched);
- }
- F f_;
- static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- static_cast<C *>(arg)->f_(exec_ctx, error);
- }
- };
- return std::unique_ptr<Closure>(new C(f, sched));
-}
-
-template <class F>
-grpc_closure *MakeOnceClosure(
- F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) {
- struct C : public grpc_closure {
- C(const F &f) : f_(f) {}
- F f_;
- static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- static_cast<C *>(arg)->f_(exec_ctx, error);
- delete static_cast<C *>(arg);
- }
- };
- auto *c = new C{f};
- return GRPC_CLOSURE_INIT(c, C::Execute, c, sched);
-}
-
////////////////////////////////////////////////////////////////////////////////
// Benchmarks
//
@@ -246,11 +263,18 @@ static void BM_StreamCreateDestroy(benchmark::State &state) {
TrackCounters track_counters;
Fixture f(grpc::ChannelArguments(), true);
Stream s(&f);
+ grpc_transport_stream_op_batch op;
+ grpc_transport_stream_op_batch_payload op_payload;
+ memset(&op, 0, sizeof(op));
+ op.cancel_stream = true;
+ op.payload = &op_payload;
+ op_payload.cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
std::unique_ptr<Closure> next =
MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) {
if (!state.KeepRunning()) return;
s.Init(state);
- s.DestroyThen(next.get());
+ s.Op(exec_ctx, &op);
+ s.DestroyThen(exec_ctx, next.get());
});
GRPC_CLOSURE_RUN(f.exec_ctx(), next.get(), GRPC_ERROR_NONE);
f.FlushExecCtx();
@@ -314,14 +338,14 @@ static void BM_StreamCreateSendInitialMetadataDestroy(benchmark::State &state) {
op.on_complete = done.get();
op.send_initial_metadata = true;
op.payload->send_initial_metadata.send_initial_metadata = &b;
- s.Op(&op);
+ s.Op(exec_ctx, &op);
});
done = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) {
reset_op();
op.cancel_stream = true;
op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
- s.Op(&op);
- s.DestroyThen(start.get());
+ s.Op(exec_ctx, &op);
+ s.DestroyThen(exec_ctx, start.get());
});
GRPC_CLOSURE_SCHED(f.exec_ctx(), start.get(), GRPC_ERROR_NONE);
f.FlushExecCtx();
@@ -348,22 +372,28 @@ static void BM_TransportEmptyOp(benchmark::State &state) {
if (!state.KeepRunning()) return;
reset_op();
op.on_complete = c.get();
- s.Op(&op);
+ s.Op(exec_ctx, &op);
});
GRPC_CLOSURE_SCHED(f.exec_ctx(), c.get(), GRPC_ERROR_NONE);
f.FlushExecCtx();
- s.DestroyThen(
- MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {}));
+ reset_op();
+ op.cancel_stream = true;
+ op_payload.cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
+ s.Op(f.exec_ctx(), &op);
+ s.DestroyThen(f.exec_ctx(), MakeOnceClosure([](grpc_exec_ctx *exec_ctx,
+ grpc_error *error) {}));
f.FlushExecCtx();
track_counters.Finish(state);
}
BENCHMARK(BM_TransportEmptyOp);
+std::vector<std::unique_ptr<gpr_event>> done_events;
+
static void BM_TransportStreamSend(benchmark::State &state) {
TrackCounters track_counters;
Fixture f(grpc::ChannelArguments(), true);
- Stream s(&f);
- s.Init(state);
+ auto s = std::unique_ptr<Stream>(new Stream(&f));
+ s->Init(state);
grpc_transport_stream_op_batch op;
grpc_transport_stream_op_batch_payload op_payload;
memset(&op_payload, 0, sizeof(op_payload));
@@ -390,11 +420,17 @@ static void BM_TransportStreamSend(benchmark::State &state) {
grpc_metadata_batch_add_tail(f.exec_ctx(), &b, &storage[i], elems[i])));
}
+ gpr_event *bm_done = new gpr_event;
+ gpr_event_init(bm_done);
+
std::unique_ptr<Closure> c =
MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) {
- if (!state.KeepRunning()) return;
+ if (!state.KeepRunning()) {
+ gpr_event_set(bm_done, (void *)1);
+ return;
+ }
// force outgoing window to be yuge
- s.chttp2_stream()->flow_control.remote_window_delta =
+ s->chttp2_stream()->flow_control.remote_window_delta =
1024 * 1024 * 1024;
f.chttp2_transport()->flow_control.remote_window = 1024 * 1024 * 1024;
grpc_slice_buffer_stream_init(&send_stream, &send_buffer, 0);
@@ -402,23 +438,27 @@ static void BM_TransportStreamSend(benchmark::State &state) {
op.on_complete = c.get();
op.send_message = true;
op.payload->send_message.send_message = &send_stream.base;
- s.Op(&op);
+ s->Op(exec_ctx, &op);
});
reset_op();
op.send_initial_metadata = true;
op.payload->send_initial_metadata.send_initial_metadata = &b;
op.on_complete = c.get();
- s.Op(&op);
+ s->Op(f.exec_ctx(), &op);
f.FlushExecCtx();
+ gpr_event_wait(bm_done, gpr_inf_future(GPR_CLOCK_REALTIME));
+ done_events.emplace_back(bm_done);
+
reset_op();
op.cancel_stream = true;
op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
- s.Op(&op);
- s.DestroyThen(
- MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {}));
+ s->Op(f.exec_ctx(), &op);
+ s->DestroyThen(f.exec_ctx(), MakeOnceClosure([](grpc_exec_ctx *exec_ctx,
+ grpc_error *error) {}));
f.FlushExecCtx();
+ s.reset();
track_counters.Finish(state);
grpc_metadata_batch_destroy(f.exec_ctx(), &b);
grpc_slice_buffer_destroy(&send_buffer);
@@ -535,7 +575,7 @@ static void BM_TransportStreamRecv(benchmark::State &state) {
op.recv_message = true;
op.payload->recv_message.recv_message = &recv_stream;
op.payload->recv_message.recv_message_ready = drain_start.get();
- s.Op(&op);
+ s.Op(exec_ctx, &op);
f.PushInput(grpc_slice_ref(incoming_data));
});
@@ -578,7 +618,7 @@ static void BM_TransportStreamRecv(benchmark::State &state) {
op.payload->recv_initial_metadata.recv_initial_metadata_ready =
do_nothing.get();
op.on_complete = c.get();
- s.Op(&op);
+ s.Op(f.exec_ctx(), &op);
f.PushInput(SLICE_FROM_BUFFER(
"\x00\x00\x00\x04\x00\x00\x00\x00\x00"
// Generated using:
@@ -596,9 +636,9 @@ static void BM_TransportStreamRecv(benchmark::State &state) {
reset_op();
op.cancel_stream = true;
op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
- s.Op(&op);
- s.DestroyThen(
- MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {}));
+ s.Op(f.exec_ctx(), &op);
+ s.DestroyThen(f.exec_ctx(), MakeOnceClosure([](grpc_exec_ctx *exec_ctx,
+ grpc_error *error) {}));
f.FlushExecCtx();
track_counters.Finish(state);
grpc_metadata_batch_destroy(f.exec_ctx(), &b);
diff --git a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc
index 135b4710ce..59fb29dd60 100644
--- a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc
+++ b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc
@@ -105,7 +105,7 @@ class TrickledCHTTP2 : public EndpointPairFixture {
(double)state.iterations());
}
- void Log(int64_t iteration) {
+ void Log(int64_t iteration) GPR_ATTRIBUTE_NO_TSAN {
auto now = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), start_);
grpc_chttp2_transport* client =
reinterpret_cast<grpc_chttp2_transport*>(client_transport_);
@@ -193,7 +193,8 @@ class TrickledCHTTP2 : public EndpointPairFixture {
return p;
}
- void UpdateStats(grpc_chttp2_transport* t, Stats* s, size_t backlog) {
+ void UpdateStats(grpc_chttp2_transport* t, Stats* s,
+ size_t backlog) GPR_ATTRIBUTE_NO_TSAN {
if (backlog == 0) {
if (t->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != NULL) {
s->streams_stalled_due_to_stream_flow_control++;