diff options
author | 2017-09-11 12:03:36 -0700 | |
---|---|---|
committer | 2017-09-11 12:03:36 -0700 | |
commit | 55c4b31389d5557b88d39bde6d783d68aa747de7 (patch) | |
tree | 57b63eb6f54ab024326468cfe36828eff3794cd9 /test | |
parent | 4a7e0594cc8434a6834ac61cb0b1198ac859affd (diff) | |
parent | 8e71287bd768dd9b8698363d062a817c39f0e07d (diff) |
Merge pull request #11758 from ctiller/write_completion
Write completion changes
Diffstat (limited to 'test')
-rw-r--r-- | test/core/bad_client/bad_client.c | 8 | ||||
-rw-r--r-- | test/core/end2end/bad_server_response_test.c | 6 | ||||
-rw-r--r-- | test/core/end2end/tests/resource_quota_server.c | 8 | ||||
-rw-r--r-- | test/core/iomgr/tcp_posix_test.c | 12 | ||||
-rw-r--r-- | test/core/security/secure_endpoint_test.c | 6 | ||||
-rw-r--r-- | test/cpp/end2end/async_end2end_test.cc | 6 | ||||
-rw-r--r-- | test/cpp/end2end/end2end_test.cc | 16 | ||||
-rw-r--r-- | test/cpp/microbenchmarks/bm_chttp2_transport.cc | 166 | ||||
-rw-r--r-- | test/cpp/microbenchmarks/bm_fullstack_trickle.cc | 5 |
9 files changed, 146 insertions, 87 deletions
diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c index c3964ca84b..383d1240cb 100644 --- a/test/core/bad_client/bad_client.c +++ b/test/core/bad_client/bad_client.c @@ -45,18 +45,18 @@ typedef struct { } thd_args; static void thd_func(void *arg) { - thd_args *a = arg; + thd_args *a = (thd_args *)arg; a->validator(a->server, a->cq, a->registered_method); gpr_event_set(&a->done_thd, (void *)1); } static void done_write(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - thd_args *a = arg; + thd_args *a = (thd_args *)arg; gpr_event_set(&a->done_write, (void *)1); } static void server_setup_transport(void *ts, grpc_transport *transport) { - thd_args *a = ts; + thd_args *a = (thd_args *)ts; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_server_setup_transport(&exec_ctx, a->server, transport, NULL, grpc_server_get_channel_args(a->server)); @@ -64,7 +64,7 @@ static void server_setup_transport(void *ts, grpc_transport *transport) { } static void read_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - gpr_event *read_done = arg; + gpr_event *read_done = (gpr_event *)arg; gpr_event_set(read_done, (void *)1); } diff --git a/test/core/end2end/bad_server_response_test.c b/test/core/end2end/bad_server_response_test.c index 5f89058c45..eeabc769d3 100644 --- a/test/core/end2end/bad_server_response_test.c +++ b/test/core/end2end/bad_server_response_test.c @@ -136,7 +136,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, grpc_pollset *accepting_pollset, grpc_tcp_server_acceptor *acceptor) { gpr_free(acceptor); - test_tcp_server *server = arg; + test_tcp_server *server = (test_tcp_server *)arg; GRPC_CLOSURE_INIT(&on_read, handle_read, NULL, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&on_write, done_write, NULL, grpc_schedule_on_exec_ctx); grpc_slice_buffer_init(&state.temp_incoming_buffer); @@ -237,7 +237,7 @@ typedef struct { } poll_args; static void actually_poll_server(void *arg) { - poll_args *pa = arg; + poll_args *pa = (poll_args *)arg; gpr_timespec deadline = n_sec_deadline(10); while (true) { bool done = gpr_atm_acq_load(&state.done_atm) != 0; @@ -259,7 +259,7 @@ static void poll_server_until_read_done(test_tcp_server *server, gpr_atm_rel_store(&state.done_atm, 0); state.write_done = 0; gpr_thd_id id; - poll_args *pa = gpr_malloc(sizeof(*pa)); + poll_args *pa = (poll_args *)gpr_malloc(sizeof(*pa)); pa->server = server; pa->signal_when_done = signal_when_done; gpr_thd_new(&id, actually_poll_server, pa, NULL); diff --git a/test/core/end2end/tests/resource_quota_server.c b/test/core/end2end/tests/resource_quota_server.c index 57018628ce..0316920762 100644 --- a/test/core/end2end/tests/resource_quota_server.c +++ b/test/core/end2end/tests/resource_quota_server.c @@ -111,10 +111,10 @@ void resource_quota_server(grpc_end2end_test_config config) { grpc_resource_quota_resize(resource_quota, 5 * 1024 * 1024); #define NUM_CALLS 100 -#define CLIENT_BASE_TAG 1000 -#define SERVER_START_BASE_TAG 2000 -#define SERVER_RECV_BASE_TAG 3000 -#define SERVER_END_BASE_TAG 4000 +#define CLIENT_BASE_TAG 0x1000 +#define SERVER_START_BASE_TAG 0x2000 +#define SERVER_RECV_BASE_TAG 0x3000 +#define SERVER_END_BASE_TAG 0x4000 grpc_arg arg; arg.key = GRPC_ARG_RESOURCE_QUOTA; diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c index c45068e7ec..cdaa2ce2af 100644 --- a/test/core/iomgr/tcp_posix_test.c +++ b/test/core/iomgr/tcp_posix_test.c @@ -89,7 +89,7 @@ static ssize_t fill_socket(int fd) { static size_t fill_socket_partial(int fd, size_t bytes) { ssize_t write_bytes; size_t total_bytes = 0; - unsigned char *buf = gpr_malloc(bytes); + unsigned char *buf = (unsigned char *)gpr_malloc(bytes); unsigned i; for (i = 0; i < bytes; ++i) { buf[i] = (uint8_t)(i % 256); @@ -267,7 +267,7 @@ struct write_socket_state { static grpc_slice *allocate_blocks(size_t num_bytes, size_t slice_size, size_t *num_blocks, uint8_t *current_data) { size_t nslices = num_bytes / slice_size + (num_bytes % slice_size ? 1u : 0u); - grpc_slice *slices = gpr_malloc(sizeof(grpc_slice) * nslices); + grpc_slice *slices = (grpc_slice *)gpr_malloc(sizeof(grpc_slice) * nslices); size_t num_bytes_left = num_bytes; unsigned i, j; unsigned char *buf; @@ -301,7 +301,7 @@ static void write_done(grpc_exec_ctx *exec_ctx, } void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) { - unsigned char *buf = gpr_malloc(read_size); + unsigned char *buf = (unsigned char *)gpr_malloc(read_size); ssize_t bytes_read; size_t bytes_left = num_bytes; int flags; @@ -404,7 +404,7 @@ static void write_test(size_t num_bytes, size_t slice_size) { } void on_fd_released(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *errors) { - int *done = arg; + int *done = (int *)arg; *done = 1; GPR_ASSERT( GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL))); @@ -548,7 +548,7 @@ static grpc_endpoint_test_config configs[] = { static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) { - grpc_pollset_destroy(exec_ctx, p); + grpc_pollset_destroy(exec_ctx, (grpc_pollset *)p); } int main(int argc, char **argv) { @@ -556,7 +556,7 @@ int main(int argc, char **argv) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_test_init(argc, argv); grpc_init(); - g_pollset = gpr_zalloc(grpc_pollset_size()); + g_pollset = (grpc_pollset *)gpr_zalloc(grpc_pollset_size()); grpc_pollset_init(g_pollset, &g_mu); grpc_endpoint_tests(configs[0], g_pollset, g_mu); run_tests(); diff --git a/test/core/security/secure_endpoint_test.c b/test/core/security/secure_endpoint_test.c index e9f2c76738..839a05fa9b 100644 --- a/test/core/security/secure_endpoint_test.c +++ b/test/core/security/secure_endpoint_test.c @@ -70,7 +70,7 @@ static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair( size_t still_pending_size; size_t total_buffer_size = 8192; size_t buffer_size = total_buffer_size; - uint8_t *encrypted_buffer = gpr_malloc(buffer_size); + uint8_t *encrypted_buffer = (uint8_t *)gpr_malloc(buffer_size); uint8_t *cur = encrypted_buffer; grpc_slice encrypted_leftover; for (i = 0; i < leftover_nslices; i++) { @@ -202,7 +202,7 @@ static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) { static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) { - grpc_pollset_destroy(exec_ctx, p); + grpc_pollset_destroy(exec_ctx, (grpc_pollset *)p); } int main(int argc, char **argv) { @@ -211,7 +211,7 @@ int main(int argc, char **argv) { grpc_test_init(argc, argv); grpc_init(); - g_pollset = gpr_zalloc(grpc_pollset_size()); + g_pollset = (grpc_pollset *)gpr_zalloc(grpc_pollset_size()); grpc_pollset_init(g_pollset, &g_mu); grpc_endpoint_tests(configs[0], g_pollset, g_mu); grpc_endpoint_tests(configs[1], g_pollset, g_mu); diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index e841a702d4..41090d161a 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -266,6 +266,7 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> { } void TearDown() override { + gpr_tls_set(&g_is_async_end2end_test, 0); server_->Shutdown(); void* ignored_tag; bool ignored_ok; @@ -274,7 +275,6 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> { ; stub_.reset(); poll_overrider_.reset(); - gpr_tls_set(&g_is_async_end2end_test, 0); grpc_recycle_unused_port(port_); } @@ -396,6 +396,7 @@ TEST_P(AsyncEnd2endTest, WaitAndShutdownTest) { ResetStub(); SendRpc(1); EXPECT_EQ(0, notify); + gpr_tls_set(&g_is_async_end2end_test, 0); server_->Shutdown(); wait_thread.join(); EXPECT_EQ(1, notify); @@ -404,8 +405,9 @@ TEST_P(AsyncEnd2endTest, WaitAndShutdownTest) { TEST_P(AsyncEnd2endTest, ShutdownThenWait) { ResetStub(); SendRpc(1); - server_->Shutdown(); + std::thread t([this]() { server_->Shutdown(); }); server_->Wait(); + t.join(); } // Test a simple RPC using the async version of Next diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 1f4861a7e6..e54cd03ca2 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -757,6 +757,22 @@ TEST_P(End2endTest, RequestStreamTwoRequests) { EXPECT_TRUE(s.ok()); } +TEST_P(End2endTest, RequestStreamTwoRequestsWithWriteThrough) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + + auto stream = stub_->RequestStream(&context, &response); + request.set_message("hello"); + EXPECT_TRUE(stream->Write(request, WriteOptions().set_write_through())); + EXPECT_TRUE(stream->Write(request, WriteOptions().set_write_through())); + stream->WritesDone(); + Status s = stream->Finish(); + EXPECT_EQ(response.message(), "hellohello"); + EXPECT_TRUE(s.ok()); +} + TEST_P(End2endTest, RequestStreamTwoRequestsWithCoalescingApi) { ResetStub(); EchoRequest request; diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc index 936681fec1..070034fe33 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc @@ -29,6 +29,7 @@ extern "C" { #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/ext/transport/chttp2/transport/internal.h" +#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/resource_quota.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/transport/static_metadata.h" @@ -154,23 +155,59 @@ class Fixture { grpc_transport *t_; }; -static void DoNothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} +class Closure : public grpc_closure { + public: + virtual ~Closure() {} +}; + +template <class F> +std::unique_ptr<Closure> MakeClosure( + F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) { + struct C : public Closure { + C(const F &f, grpc_closure_scheduler *sched) : f_(f) { + GRPC_CLOSURE_INIT(this, Execute, this, sched); + } + F f_; + static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + static_cast<C *>(arg)->f_(exec_ctx, error); + } + }; + return std::unique_ptr<Closure>(new C(f, sched)); +} + +template <class F> +grpc_closure *MakeOnceClosure( + F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) { + struct C : public grpc_closure { + C(const F &f) : f_(f) {} + F f_; + static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + static_cast<C *>(arg)->f_(exec_ctx, error); + delete static_cast<C *>(arg); + } + }; + auto *c = new C{f}; + return GRPC_CLOSURE_INIT(c, C::Execute, c, sched); +} class Stream { public: Stream(Fixture *f) : f_(f) { - GRPC_STREAM_REF_INIT(&refcount_, 1, DoNothing, nullptr, "test_stream"); stream_size_ = grpc_transport_stream_size(f->transport()); stream_ = gpr_malloc(stream_size_); arena_ = gpr_arena_create(4096); } ~Stream() { + gpr_event_wait(&done_, gpr_inf_future(GPR_CLOCK_REALTIME)); gpr_free(stream_); gpr_arena_destroy(arena_); } void Init(benchmark::State &state) { + GRPC_STREAM_REF_INIT(&refcount_, 1, &Stream::FinishDestroy, this, + "test_stream"); + gpr_event_init(&done_); memset(stream_, 0, stream_size_); if ((state.iterations() & 0xffff) == 0) { gpr_arena_destroy(arena_); @@ -181,13 +218,17 @@ class Stream { NULL, arena_); } - void DestroyThen(grpc_closure *closure) { - grpc_transport_destroy_stream(f_->exec_ctx(), f_->transport(), - static_cast<grpc_stream *>(stream_), closure); + void DestroyThen(grpc_exec_ctx *exec_ctx, grpc_closure *closure) { + destroy_closure_ = closure; +#ifndef NDEBUG + grpc_stream_unref(exec_ctx, &refcount_, "DestroyThen"); +#else + grpc_stream_unref(exec_ctx, &refcount_); +#endif } - void Op(grpc_transport_stream_op_batch *op) { - grpc_transport_perform_stream_op(f_->exec_ctx(), f_->transport(), + void Op(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op) { + grpc_transport_perform_stream_op(exec_ctx, f_->transport(), static_cast<grpc_stream *>(stream_), op); } @@ -196,48 +237,24 @@ class Stream { } private: + static void FinishDestroy(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + auto stream = static_cast<Stream *>(arg); + grpc_transport_destroy_stream(exec_ctx, stream->f_->transport(), + static_cast<grpc_stream *>(stream->stream_), + stream->destroy_closure_); + gpr_event_set(&stream->done_, (void *)1); + } + Fixture *f_; grpc_stream_refcount refcount_; gpr_arena *arena_; size_t stream_size_; void *stream_; + grpc_closure *destroy_closure_ = nullptr; + gpr_event done_; }; -class Closure : public grpc_closure { - public: - virtual ~Closure() {} -}; - -template <class F> -std::unique_ptr<Closure> MakeClosure( - F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) { - struct C : public Closure { - C(const F &f, grpc_closure_scheduler *sched) : f_(f) { - GRPC_CLOSURE_INIT(this, Execute, this, sched); - } - F f_; - static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - static_cast<C *>(arg)->f_(exec_ctx, error); - } - }; - return std::unique_ptr<Closure>(new C(f, sched)); -} - -template <class F> -grpc_closure *MakeOnceClosure( - F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) { - struct C : public grpc_closure { - C(const F &f) : f_(f) {} - F f_; - static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - static_cast<C *>(arg)->f_(exec_ctx, error); - delete static_cast<C *>(arg); - } - }; - auto *c = new C{f}; - return GRPC_CLOSURE_INIT(c, C::Execute, c, sched); -} - //////////////////////////////////////////////////////////////////////////////// // Benchmarks // @@ -246,11 +263,18 @@ static void BM_StreamCreateDestroy(benchmark::State &state) { TrackCounters track_counters; Fixture f(grpc::ChannelArguments(), true); Stream s(&f); + grpc_transport_stream_op_batch op; + grpc_transport_stream_op_batch_payload op_payload; + memset(&op, 0, sizeof(op)); + op.cancel_stream = true; + op.payload = &op_payload; + op_payload.cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; std::unique_ptr<Closure> next = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { if (!state.KeepRunning()) return; s.Init(state); - s.DestroyThen(next.get()); + s.Op(exec_ctx, &op); + s.DestroyThen(exec_ctx, next.get()); }); GRPC_CLOSURE_RUN(f.exec_ctx(), next.get(), GRPC_ERROR_NONE); f.FlushExecCtx(); @@ -314,14 +338,14 @@ static void BM_StreamCreateSendInitialMetadataDestroy(benchmark::State &state) { op.on_complete = done.get(); op.send_initial_metadata = true; op.payload->send_initial_metadata.send_initial_metadata = &b; - s.Op(&op); + s.Op(exec_ctx, &op); }); done = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { reset_op(); op.cancel_stream = true; op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; - s.Op(&op); - s.DestroyThen(start.get()); + s.Op(exec_ctx, &op); + s.DestroyThen(exec_ctx, start.get()); }); GRPC_CLOSURE_SCHED(f.exec_ctx(), start.get(), GRPC_ERROR_NONE); f.FlushExecCtx(); @@ -348,22 +372,28 @@ static void BM_TransportEmptyOp(benchmark::State &state) { if (!state.KeepRunning()) return; reset_op(); op.on_complete = c.get(); - s.Op(&op); + s.Op(exec_ctx, &op); }); GRPC_CLOSURE_SCHED(f.exec_ctx(), c.get(), GRPC_ERROR_NONE); f.FlushExecCtx(); - s.DestroyThen( - MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {})); + reset_op(); + op.cancel_stream = true; + op_payload.cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; + s.Op(f.exec_ctx(), &op); + s.DestroyThen(f.exec_ctx(), MakeOnceClosure([](grpc_exec_ctx *exec_ctx, + grpc_error *error) {})); f.FlushExecCtx(); track_counters.Finish(state); } BENCHMARK(BM_TransportEmptyOp); +std::vector<std::unique_ptr<gpr_event>> done_events; + static void BM_TransportStreamSend(benchmark::State &state) { TrackCounters track_counters; Fixture f(grpc::ChannelArguments(), true); - Stream s(&f); - s.Init(state); + auto s = std::unique_ptr<Stream>(new Stream(&f)); + s->Init(state); grpc_transport_stream_op_batch op; grpc_transport_stream_op_batch_payload op_payload; memset(&op_payload, 0, sizeof(op_payload)); @@ -390,11 +420,17 @@ static void BM_TransportStreamSend(benchmark::State &state) { grpc_metadata_batch_add_tail(f.exec_ctx(), &b, &storage[i], elems[i]))); } + gpr_event *bm_done = new gpr_event; + gpr_event_init(bm_done); + std::unique_ptr<Closure> c = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { - if (!state.KeepRunning()) return; + if (!state.KeepRunning()) { + gpr_event_set(bm_done, (void *)1); + return; + } // force outgoing window to be yuge - s.chttp2_stream()->flow_control.remote_window_delta = + s->chttp2_stream()->flow_control.remote_window_delta = 1024 * 1024 * 1024; f.chttp2_transport()->flow_control.remote_window = 1024 * 1024 * 1024; grpc_slice_buffer_stream_init(&send_stream, &send_buffer, 0); @@ -402,23 +438,27 @@ static void BM_TransportStreamSend(benchmark::State &state) { op.on_complete = c.get(); op.send_message = true; op.payload->send_message.send_message = &send_stream.base; - s.Op(&op); + s->Op(exec_ctx, &op); }); reset_op(); op.send_initial_metadata = true; op.payload->send_initial_metadata.send_initial_metadata = &b; op.on_complete = c.get(); - s.Op(&op); + s->Op(f.exec_ctx(), &op); f.FlushExecCtx(); + gpr_event_wait(bm_done, gpr_inf_future(GPR_CLOCK_REALTIME)); + done_events.emplace_back(bm_done); + reset_op(); op.cancel_stream = true; op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; - s.Op(&op); - s.DestroyThen( - MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {})); + s->Op(f.exec_ctx(), &op); + s->DestroyThen(f.exec_ctx(), MakeOnceClosure([](grpc_exec_ctx *exec_ctx, + grpc_error *error) {})); f.FlushExecCtx(); + s.reset(); track_counters.Finish(state); grpc_metadata_batch_destroy(f.exec_ctx(), &b); grpc_slice_buffer_destroy(&send_buffer); @@ -535,7 +575,7 @@ static void BM_TransportStreamRecv(benchmark::State &state) { op.recv_message = true; op.payload->recv_message.recv_message = &recv_stream; op.payload->recv_message.recv_message_ready = drain_start.get(); - s.Op(&op); + s.Op(exec_ctx, &op); f.PushInput(grpc_slice_ref(incoming_data)); }); @@ -578,7 +618,7 @@ static void BM_TransportStreamRecv(benchmark::State &state) { op.payload->recv_initial_metadata.recv_initial_metadata_ready = do_nothing.get(); op.on_complete = c.get(); - s.Op(&op); + s.Op(f.exec_ctx(), &op); f.PushInput(SLICE_FROM_BUFFER( "\x00\x00\x00\x04\x00\x00\x00\x00\x00" // Generated using: @@ -596,9 +636,9 @@ static void BM_TransportStreamRecv(benchmark::State &state) { reset_op(); op.cancel_stream = true; op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; - s.Op(&op); - s.DestroyThen( - MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {})); + s.Op(f.exec_ctx(), &op); + s.DestroyThen(f.exec_ctx(), MakeOnceClosure([](grpc_exec_ctx *exec_ctx, + grpc_error *error) {})); f.FlushExecCtx(); track_counters.Finish(state); grpc_metadata_batch_destroy(f.exec_ctx(), &b); diff --git a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc index 135b4710ce..59fb29dd60 100644 --- a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc +++ b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc @@ -105,7 +105,7 @@ class TrickledCHTTP2 : public EndpointPairFixture { (double)state.iterations()); } - void Log(int64_t iteration) { + void Log(int64_t iteration) GPR_ATTRIBUTE_NO_TSAN { auto now = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), start_); grpc_chttp2_transport* client = reinterpret_cast<grpc_chttp2_transport*>(client_transport_); @@ -193,7 +193,8 @@ class TrickledCHTTP2 : public EndpointPairFixture { return p; } - void UpdateStats(grpc_chttp2_transport* t, Stats* s, size_t backlog) { + void UpdateStats(grpc_chttp2_transport* t, Stats* s, + size_t backlog) GPR_ATTRIBUTE_NO_TSAN { if (backlog == 0) { if (t->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != NULL) { s->streams_stalled_due_to_stream_flow_control++; |