author    | Yash Tibrewal <yashkt@google.com> | 2017-12-06 09:05:05 -0800
committer | GitHub <noreply@github.com> | 2017-12-06 09:05:05 -0800
commit    | ad4d2dde0052efbbf49d64b0843c45f0381cfeb3 (patch)
tree      | 6a657f8c6179d873b34505cdc24bce9462ca68eb /test/cpp
parent    | a3df36cc2505a89c2f481eea4a66a87b3002844a (diff)
Revert "All instances of exec_ctx being passed around in src/core removed"
Diffstat (limited to 'test/cpp')
24 files changed, 626 insertions, 557 deletions
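For orientation before the raw diff: this revert re-introduces an explicit `grpc_exec_ctx*` parameter threaded through gRPC core calls (created with `GRPC_EXEC_CTX_INIT` and torn down with `grpc_exec_ctx_finish`), replacing the thread-local `grpc_core::ExecCtx` object that the reverted change had adopted. Below is a minimal sketch of the two calling conventions, mirroring the `grpc_uri_parse` hunks in the diff; the wrapper functions are hypothetical and the snippet depends on gRPC-core-internal headers, so it is illustrative rather than a standalone program.

```cpp
// Sketch only: contrasts the two exec_ctx conventions seen in this diff.
// The wrapper functions are hypothetical; the gRPC calls match the hunks below.

// Convention restored by this revert: an explicit grpc_exec_ctx is created on
// the stack, passed to each core call, and finished to flush deferred closures.
static void ParseAndDiscardUri(const char* uri_text) {
  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
  grpc_uri* uri = grpc_uri_parse(&exec_ctx, uri_text, true);
  GPR_ASSERT(uri != nullptr);
  grpc_uri_destroy(uri);
  grpc_exec_ctx_finish(&exec_ctx);  // run any closures queued during the calls
}

// Convention being reverted away from: a grpc_core::ExecCtx installed for the
// current thread, so core calls no longer take the context as a parameter.
static void ParseAndDiscardUriImplicitCtx(const char* uri_text) {
  grpc_core::ExecCtx exec_ctx;  // flushed automatically when it leaves scope
  grpc_uri* uri = grpc_uri_parse(uri_text, true);
  GPR_ASSERT(uri != nullptr);
  grpc_uri_destroy(uri);
}
```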
diff --git a/test/cpp/client/client_channel_stress_test.cc b/test/cpp/client/client_channel_stress_test.cc index e829d5278b..0954b28df0 100644 --- a/test/cpp/client/client_channel_stress_test.cc +++ b/test/cpp/client/client_channel_stress_test.cc @@ -212,13 +212,13 @@ class ClientChannelStressTest { }; void SetNextResolution(const std::vector<AddressData>& address_data) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_lb_addresses* addresses = grpc_lb_addresses_create(address_data.size(), nullptr); for (size_t i = 0; i < address_data.size(); ++i) { char* lb_uri_str; gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", address_data[i].port); - grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str, true); + grpc_uri* lb_uri = grpc_uri_parse(&exec_ctx, lb_uri_str, true); GPR_ASSERT(lb_uri != nullptr); grpc_lb_addresses_set_address_from_uri( addresses, i, lb_uri, address_data[i].is_balancer, @@ -228,9 +228,10 @@ class ClientChannelStressTest { } grpc_arg fake_addresses = grpc_lb_addresses_create_channel_arg(addresses); grpc_channel_args fake_result = {1, &fake_addresses}; - grpc_fake_resolver_response_generator_set_response(response_generator_, - &fake_result); - grpc_lb_addresses_destroy(addresses); + grpc_fake_resolver_response_generator_set_response( + &exec_ctx, response_generator_, &fake_result); + grpc_lb_addresses_destroy(&exec_ctx, addresses); + grpc_exec_ctx_finish(&exec_ctx); } void KeepSendingRequests() { diff --git a/test/cpp/common/channel_arguments_test.cc b/test/cpp/common/channel_arguments_test.cc index f330c01281..d6ed2e5aa2 100644 --- a/test/cpp/common/channel_arguments_test.cc +++ b/test/cpp/common/channel_arguments_test.cc @@ -249,8 +249,5 @@ TEST_F(ChannelArgumentsTest, SetUserAgentPrefix) { int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); - grpc_init(); - int ret = RUN_ALL_TESTS(); - grpc_shutdown(); - return ret; + return RUN_ALL_TESTS(); } diff --git a/test/cpp/common/channel_filter_test.cc b/test/cpp/common/channel_filter_test.cc index 7bdd53f9e7..638518107b 100644 --- a/test/cpp/common/channel_filter_test.cc +++ b/test/cpp/common/channel_filter_test.cc @@ -28,7 +28,7 @@ class MyChannelData : public ChannelData { public: MyChannelData() {} - grpc_error* Init(grpc_channel_element* elem, + grpc_error* Init(grpc_exec_ctx* exec_ctx, grpc_channel_element* elem, grpc_channel_element_args* args) override { (void)args->channel_args; // Make sure field is available. return GRPC_ERROR_NONE; @@ -39,7 +39,7 @@ class MyCallData : public CallData { public: MyCallData() {} - grpc_error* Init(grpc_call_element* elem, + grpc_error* Init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, const grpc_call_element_args* args) override { (void)args->path; // Make sure field is available. 
return GRPC_ERROR_NONE; diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index e6e6e71f42..f8bb12fde1 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -112,13 +112,13 @@ class ClientLbEnd2endTest : public ::testing::Test { } void SetNextResolution(const std::vector<int>& ports) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_lb_addresses* addresses = grpc_lb_addresses_create(ports.size(), nullptr); for (size_t i = 0; i < ports.size(); ++i) { char* lb_uri_str; gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", ports[i]); - grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str, true); + grpc_uri* lb_uri = grpc_uri_parse(&exec_ctx, lb_uri_str, true); GPR_ASSERT(lb_uri != nullptr); grpc_lb_addresses_set_address_from_uri(addresses, i, lb_uri, false /* is balancer */, @@ -130,10 +130,11 @@ class ClientLbEnd2endTest : public ::testing::Test { grpc_lb_addresses_create_channel_arg(addresses); grpc_channel_args* fake_result = grpc_channel_args_copy_and_add(nullptr, &fake_addresses, 1); - grpc_fake_resolver_response_generator_set_response(response_generator_, - fake_result); - grpc_channel_args_destroy(fake_result); - grpc_lb_addresses_destroy(addresses); + grpc_fake_resolver_response_generator_set_response( + &exec_ctx, response_generator_, fake_result); + grpc_channel_args_destroy(&exec_ctx, fake_result); + grpc_lb_addresses_destroy(&exec_ctx, addresses); + grpc_exec_ctx_finish(&exec_ctx); } void ResetStub(const grpc::string& lb_policy_name = "") { diff --git a/test/cpp/end2end/filter_end2end_test.cc b/test/cpp/end2end/filter_end2end_test.cc index c4430379db..f260ea0016 100644 --- a/test/cpp/end2end/filter_end2end_test.cc +++ b/test/cpp/end2end/filter_end2end_test.cc @@ -100,7 +100,7 @@ int GetCallCounterValue() { class ChannelDataImpl : public ChannelData { public: - grpc_error* Init(grpc_channel_element* elem, + grpc_error* Init(grpc_exec_ctx* exec_ctx, grpc_channel_element* elem, grpc_channel_element_args* args) { IncrementConnectionCounter(); return GRPC_ERROR_NONE; @@ -109,12 +109,13 @@ class ChannelDataImpl : public ChannelData { class CallDataImpl : public CallData { public: - void StartTransportStreamOpBatch(grpc_call_element* elem, + void StartTransportStreamOpBatch(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, TransportStreamOpBatch* op) override { // Incrementing the counter could be done from Init(), but we want // to test that the individual methods are actually called correctly. 
if (op->recv_initial_metadata() != nullptr) IncrementCallCounter(); - grpc_call_next_op(elem, op->op()); + grpc_call_next_op(exec_ctx, elem, op->op()); } }; diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index d4ee6b429f..bbf3da4663 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -454,13 +454,13 @@ class GrpclbEnd2endTest : public ::testing::Test { }; void SetNextResolution(const std::vector<AddressData>& address_data) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_lb_addresses* addresses = grpc_lb_addresses_create(address_data.size(), nullptr); for (size_t i = 0; i < address_data.size(); ++i) { char* lb_uri_str; gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", address_data[i].port); - grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str, true); + grpc_uri* lb_uri = grpc_uri_parse(&exec_ctx, lb_uri_str, true); GPR_ASSERT(lb_uri != nullptr); grpc_lb_addresses_set_address_from_uri( addresses, i, lb_uri, address_data[i].is_balancer, @@ -470,9 +470,10 @@ class GrpclbEnd2endTest : public ::testing::Test { } grpc_arg fake_addresses = grpc_lb_addresses_create_channel_arg(addresses); grpc_channel_args fake_result = {1, &fake_addresses}; - grpc_fake_resolver_response_generator_set_response(response_generator_, - &fake_result); - grpc_lb_addresses_destroy(addresses); + grpc_fake_resolver_response_generator_set_response( + &exec_ctx, response_generator_, &fake_result); + grpc_lb_addresses_destroy(&exec_ctx, addresses); + grpc_exec_ctx_finish(&exec_ctx); } const std::vector<int> GetBackendPorts(const size_t start_index = 0) const { diff --git a/test/cpp/grpclb/grpclb_api_test.cc b/test/cpp/grpclb/grpclb_api_test.cc index a494d6f519..7b62080b49 100644 --- a/test/cpp/grpclb/grpclb_api_test.cc +++ b/test/cpp/grpclb/grpclb_api_test.cc @@ -17,7 +17,6 @@ */ #include <grpc++/impl/codegen/config.h> -#include <grpc/grpc.h> #include <gtest/gtest.h> #include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h" @@ -136,8 +135,5 @@ TEST_F(GrpclbTest, ParseResponseServerList) { int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); - grpc_init(); - int ret = RUN_ALL_TESTS(); - grpc_shutdown(); - return ret; + return RUN_ALL_TESTS(); } diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc index 64c53b1442..a469fbb7e3 100644 --- a/test/cpp/grpclb/grpclb_test.cc +++ b/test/cpp/grpclb/grpclb_test.cc @@ -562,7 +562,7 @@ static void perform_request(client_fixture* cf) { #define BALANCERS_NAME "lb.name" static void setup_client(const server_fixture* lb_server, const server_fixture* backends, client_fixture* cf) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; char* expected_target_names = nullptr; const char* backends_name = lb_server->servers_hostport; @@ -574,7 +574,7 @@ static void setup_client(const server_fixture* lb_server, grpc_lb_addresses* addresses = grpc_lb_addresses_create(1, nullptr); char* lb_uri_str; gpr_asprintf(&lb_uri_str, "ipv4:%s", lb_server->servers_hostport); - grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str, true); + grpc_uri* lb_uri = grpc_uri_parse(&exec_ctx, lb_uri_str, true); GPR_ASSERT(lb_uri != nullptr); grpc_lb_addresses_set_address_from_uri(addresses, 0, lb_uri, true, lb_server->balancer_name, nullptr); @@ -586,7 +586,7 @@ static void setup_client(const server_fixture* lb_server, grpc_lb_addresses_create_channel_arg(addresses); grpc_channel_args* fake_result = 
grpc_channel_args_copy_and_add(nullptr, &fake_addresses, 1); - grpc_lb_addresses_destroy(addresses); + grpc_lb_addresses_destroy(&exec_ctx, addresses); const grpc_arg new_args[] = { grpc_fake_transport_expected_targets_arg(expected_target_names), @@ -601,12 +601,13 @@ static void setup_client(const server_fixture* lb_server, grpc_fake_transport_security_credentials_create(); cf->client = grpc_secure_channel_create(fake_creds, cf->server_uri, args, nullptr); - grpc_fake_resolver_response_generator_set_response(response_generator, - fake_result); - grpc_channel_args_destroy(fake_result); - grpc_channel_credentials_unref(fake_creds); - grpc_channel_args_destroy(args); + grpc_fake_resolver_response_generator_set_response( + &exec_ctx, response_generator, fake_result); + grpc_channel_args_destroy(&exec_ctx, fake_result); + grpc_channel_credentials_unref(&exec_ctx, fake_creds); + grpc_channel_args_destroy(&exec_ctx, args); grpc_fake_resolver_response_generator_unref(response_generator); + grpc_exec_ctx_finish(&exec_ctx); } static void teardown_client(client_fixture* cf) { diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc index 5c2c38c27d..a45c577320 100644 --- a/test/cpp/microbenchmarks/bm_call_create.cc +++ b/test/cpp/microbenchmarks/bm_call_create.cc @@ -311,9 +311,12 @@ static void BM_LameChannelCallCreateCoreSeparateBatch(benchmark::State& state) { } BENCHMARK(BM_LameChannelCallCreateCoreSeparateBatch); -static void FilterDestroy(void* arg, grpc_error* error) { gpr_free(arg); } +static void FilterDestroy(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + gpr_free(arg); +} -static void DoNothing(void* arg, grpc_error* error) {} +static void DoNothing(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {} class FakeClientChannelFactory : public grpc_client_channel_factory { public: @@ -321,12 +324,15 @@ class FakeClientChannelFactory : public grpc_client_channel_factory { private: static void NoRef(grpc_client_channel_factory* factory) {} - static void NoUnref(grpc_client_channel_factory* factory) {} - static grpc_subchannel* CreateSubchannel(grpc_client_channel_factory* factory, + static void NoUnref(grpc_exec_ctx* exec_ctx, + grpc_client_channel_factory* factory) {} + static grpc_subchannel* CreateSubchannel(grpc_exec_ctx* exec_ctx, + grpc_client_channel_factory* factory, const grpc_subchannel_args* args) { return nullptr; } - static grpc_channel* CreateClientChannel(grpc_client_channel_factory* factory, + static grpc_channel* CreateClientChannel(grpc_exec_ctx* exec_ctx, + grpc_client_channel_factory* factory, const char* target, grpc_client_channel_type type, const grpc_channel_args* args) { @@ -360,32 +366,36 @@ struct Fixture { namespace dummy_filter { -static void StartTransportStreamOp(grpc_call_element* elem, +static void StartTransportStreamOp(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, grpc_transport_stream_op_batch* op) {} -static void StartTransportOp(grpc_channel_element* elem, +static void StartTransportOp(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem, grpc_transport_op* op) {} -static grpc_error* InitCallElem(grpc_call_element* elem, +static grpc_error* InitCallElem(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, const grpc_call_element_args* args) { return GRPC_ERROR_NONE; } -static void SetPollsetOrPollsetSet(grpc_call_element* elem, +static void SetPollsetOrPollsetSet(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, grpc_polling_entity* pollent) {} -static void 
DestroyCallElem(grpc_call_element* elem, +static void DestroyCallElem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* then_sched_closure) {} -grpc_error* InitChannelElem(grpc_channel_element* elem, +grpc_error* InitChannelElem(grpc_exec_ctx* exec_ctx, grpc_channel_element* elem, grpc_channel_element_args* args) { return GRPC_ERROR_NONE; } -void DestroyChannelElem(grpc_channel_element* elem) {} +void DestroyChannelElem(grpc_exec_ctx* exec_ctx, grpc_channel_element* elem) {} -void GetChannelInfo(grpc_channel_element* elem, +void GetChannelInfo(grpc_exec_ctx* exec_ctx, grpc_channel_element* elem, const grpc_channel_info* channel_info) {} static const grpc_channel_filter dummy_filter = {StartTransportStreamOp, @@ -412,38 +422,41 @@ size_t sizeof_stream; /* = sizeof(transport stream) */ const char* name; /* implementation of grpc_transport_init_stream */ -int InitStream(grpc_transport* self, grpc_stream* stream, - grpc_stream_refcount* refcount, const void* server_data, - gpr_arena* arena) { +int InitStream(grpc_exec_ctx* exec_ctx, grpc_transport* self, + grpc_stream* stream, grpc_stream_refcount* refcount, + const void* server_data, gpr_arena* arena) { return 0; } /* implementation of grpc_transport_set_pollset */ -void SetPollset(grpc_transport* self, grpc_stream* stream, - grpc_pollset* pollset) {} +void SetPollset(grpc_exec_ctx* exec_ctx, grpc_transport* self, + grpc_stream* stream, grpc_pollset* pollset) {} /* implementation of grpc_transport_set_pollset */ -void SetPollsetSet(grpc_transport* self, grpc_stream* stream, - grpc_pollset_set* pollset_set) {} +void SetPollsetSet(grpc_exec_ctx* exec_ctx, grpc_transport* self, + grpc_stream* stream, grpc_pollset_set* pollset_set) {} /* implementation of grpc_transport_perform_stream_op */ -void PerformStreamOp(grpc_transport* self, grpc_stream* stream, - grpc_transport_stream_op_batch* op) { - GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_NONE); +void PerformStreamOp(grpc_exec_ctx* exec_ctx, grpc_transport* self, + grpc_stream* stream, grpc_transport_stream_op_batch* op) { + GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, GRPC_ERROR_NONE); } /* implementation of grpc_transport_perform_op */ -void PerformOp(grpc_transport* self, grpc_transport_op* op) {} +void PerformOp(grpc_exec_ctx* exec_ctx, grpc_transport* self, + grpc_transport_op* op) {} /* implementation of grpc_transport_destroy_stream */ -void DestroyStream(grpc_transport* self, grpc_stream* stream, - grpc_closure* then_sched_closure) {} +void DestroyStream(grpc_exec_ctx* exec_ctx, grpc_transport* self, + grpc_stream* stream, grpc_closure* then_sched_closure) {} /* implementation of grpc_transport_destroy */ -void Destroy(grpc_transport* self) {} +void Destroy(grpc_exec_ctx* exec_ctx, grpc_transport* self) {} /* implementation of grpc_transport_get_endpoint */ -grpc_endpoint* GetEndpoint(grpc_transport* self) { return nullptr; } +grpc_endpoint* GetEndpoint(grpc_exec_ctx* exec_ctx, grpc_transport* self) { + return nullptr; +} static const grpc_transport_vtable dummy_transport_vtable = { 0, "dummy_http2", InitStream, @@ -459,8 +472,8 @@ class NoOp { public: class Op { public: - Op(NoOp* p, grpc_call_stack* s) {} - void Finish() {} + Op(grpc_exec_ctx* exec_ctx, NoOp* p, grpc_call_stack* s) {} + void Finish(grpc_exec_ctx* exec_ctx) {} }; }; @@ -476,11 +489,13 @@ class SendEmptyMetadata { class Op { public: - Op(SendEmptyMetadata* p, grpc_call_stack* s) { + Op(grpc_exec_ctx* exec_ctx, SendEmptyMetadata* p, grpc_call_stack* s) { 
grpc_metadata_batch_init(&batch_); p->op_payload_.send_initial_metadata.send_initial_metadata = &batch_; } - void Finish() { grpc_metadata_batch_destroy(&batch_); } + void Finish(grpc_exec_ctx* exec_ctx) { + grpc_metadata_batch_destroy(exec_ctx, &batch_); + } private: grpc_metadata_batch batch_; @@ -521,20 +536,20 @@ static void BM_IsolatedFilter(benchmark::State& state) { label << " #has_dummy_filter"; } - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; size_t channel_size = grpc_channel_stack_size( filters.size() == 0 ? nullptr : &filters[0], filters.size()); grpc_channel_stack* channel_stack = static_cast<grpc_channel_stack*>(gpr_zalloc(channel_size)); GPR_ASSERT(GRPC_LOG_IF_ERROR( "channel_stack_init", - grpc_channel_stack_init(1, FilterDestroy, channel_stack, &filters[0], - filters.size(), &channel_args, + grpc_channel_stack_init(&exec_ctx, 1, FilterDestroy, channel_stack, + &filters[0], filters.size(), &channel_args, fixture.flags & REQUIRES_TRANSPORT ? &dummy_transport::dummy_transport : nullptr, "CHANNEL", channel_stack))); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(&exec_ctx); grpc_call_stack* call_stack = static_cast<grpc_call_stack*>(gpr_zalloc(channel_stack->call_stack_size)); grpc_millis deadline = GRPC_MILLIS_INF_FUTURE; @@ -553,12 +568,12 @@ static void BM_IsolatedFilter(benchmark::State& state) { call_args.arena = gpr_arena_create(kArenaSize); while (state.KeepRunning()) { GPR_TIMER_SCOPE("BenchmarkCycle", 0); - GRPC_ERROR_UNREF( - grpc_call_stack_init(channel_stack, 1, DoNothing, nullptr, &call_args)); - typename TestOp::Op op(&test_op_data, call_stack); - grpc_call_stack_destroy(call_stack, &final_info, nullptr); - op.Finish(); - grpc_core::ExecCtx::Get()->Flush(); + GRPC_ERROR_UNREF(grpc_call_stack_init(&exec_ctx, channel_stack, 1, + DoNothing, nullptr, &call_args)); + typename TestOp::Op op(&exec_ctx, &test_op_data, call_stack); + grpc_call_stack_destroy(&exec_ctx, call_stack, &final_info, nullptr); + op.Finish(&exec_ctx); + grpc_exec_ctx_flush(&exec_ctx); // recreate arena every 64k iterations to avoid oom if (0 == (state.iterations() & 0xffff)) { gpr_arena_destroy(call_args.arena); @@ -566,8 +581,8 @@ static void BM_IsolatedFilter(benchmark::State& state) { } } gpr_arena_destroy(call_args.arena); - grpc_channel_stack_destroy(channel_stack); - + grpc_channel_stack_destroy(&exec_ctx, channel_stack); + grpc_exec_ctx_finish(&exec_ctx); gpr_free(channel_stack); gpr_free(call_stack); @@ -617,55 +632,59 @@ typedef struct { grpc_call_combiner* call_combiner; } call_data; -static void StartTransportStreamOp(grpc_call_element* elem, +static void StartTransportStreamOp(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, grpc_transport_stream_op_batch* op) { call_data* calld = static_cast<call_data*>(elem->call_data); if (op->recv_initial_metadata) { GRPC_CALL_COMBINER_START( - calld->call_combiner, + exec_ctx, calld->call_combiner, op->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_NONE, "recv_initial_metadata"); } if (op->recv_message) { - GRPC_CALL_COMBINER_START(calld->call_combiner, + GRPC_CALL_COMBINER_START(exec_ctx, calld->call_combiner, op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE, "recv_message"); } - GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, GRPC_ERROR_NONE); } -static void StartTransportOp(grpc_channel_element* elem, +static void StartTransportOp(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem, grpc_transport_op* op) { 
if (op->disconnect_with_error != GRPC_ERROR_NONE) { GRPC_ERROR_UNREF(op->disconnect_with_error); } - GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); } -static grpc_error* InitCallElem(grpc_call_element* elem, +static grpc_error* InitCallElem(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, const grpc_call_element_args* args) { call_data* calld = static_cast<call_data*>(elem->call_data); calld->call_combiner = args->call_combiner; return GRPC_ERROR_NONE; } -static void SetPollsetOrPollsetSet(grpc_call_element* elem, +static void SetPollsetOrPollsetSet(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, grpc_polling_entity* pollent) {} -static void DestroyCallElem(grpc_call_element* elem, +static void DestroyCallElem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* then_sched_closure) { - GRPC_CLOSURE_SCHED(then_sched_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, then_sched_closure, GRPC_ERROR_NONE); } -grpc_error* InitChannelElem(grpc_channel_element* elem, +grpc_error* InitChannelElem(grpc_exec_ctx* exec_ctx, grpc_channel_element* elem, grpc_channel_element_args* args) { return GRPC_ERROR_NONE; } -void DestroyChannelElem(grpc_channel_element* elem) {} +void DestroyChannelElem(grpc_exec_ctx* exec_ctx, grpc_channel_element* elem) {} -void GetChannelInfo(grpc_channel_element* elem, +void GetChannelInfo(grpc_exec_ctx* exec_ctx, grpc_channel_element* elem, const grpc_channel_info* channel_info) {} static const grpc_channel_filter isolated_call_filter = { @@ -692,8 +711,10 @@ class IsolatedCallFixture : public TrackCounters { builder, &isolated_call_filter::isolated_call_filter, nullptr, nullptr)); { - grpc_core::ExecCtx exec_ctx; - channel_ = grpc_channel_create_with_builder(builder, GRPC_CLIENT_CHANNEL); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + channel_ = grpc_channel_create_with_builder(&exec_ctx, builder, + GRPC_CLIENT_CHANNEL); + grpc_exec_ctx_finish(&exec_ctx); } cq_ = grpc_completion_queue_create_for_next(nullptr); } diff --git a/test/cpp/microbenchmarks/bm_chttp2_hpack.cc b/test/cpp/microbenchmarks/bm_chttp2_hpack.cc index 4b7310389c..3fff8b02d6 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_hpack.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_hpack.cc @@ -50,22 +50,22 @@ static grpc_slice MakeSlice(std::vector<uint8_t> bytes) { static void BM_HpackEncoderInitDestroy(benchmark::State& state) { TrackCounters track_counters; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_chttp2_hpack_compressor c; while (state.KeepRunning()) { grpc_chttp2_hpack_compressor_init(&c); - grpc_chttp2_hpack_compressor_destroy(&c); - grpc_core::ExecCtx::Get()->Flush(); + grpc_chttp2_hpack_compressor_destroy(&exec_ctx, &c); + grpc_exec_ctx_flush(&exec_ctx); } - + grpc_exec_ctx_finish(&exec_ctx); track_counters.Finish(state); } BENCHMARK(BM_HpackEncoderInitDestroy); static void BM_HpackEncoderEncodeDeadline(benchmark::State& state) { TrackCounters track_counters; - grpc_core::ExecCtx exec_ctx; - grpc_millis saved_now = grpc_core::ExecCtx::Get()->Now(); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_millis saved_now = grpc_exec_ctx_now(&exec_ctx); grpc_metadata_batch b; grpc_metadata_batch_init(&b); @@ -85,13 +85,14 @@ static void BM_HpackEncoderEncodeDeadline(benchmark::State& state) { (size_t)1024, &stats, }; - grpc_chttp2_encode_header(&c, nullptr, 0, &b, &hopt, &outbuf); - grpc_slice_buffer_reset_and_unref_internal(&outbuf); 
- grpc_core::ExecCtx::Get()->Flush(); + grpc_chttp2_encode_header(&exec_ctx, &c, nullptr, 0, &b, &hopt, &outbuf); + grpc_slice_buffer_reset_and_unref_internal(&exec_ctx, &outbuf); + grpc_exec_ctx_flush(&exec_ctx); } - grpc_metadata_batch_destroy(&b); - grpc_chttp2_hpack_compressor_destroy(&c); - grpc_slice_buffer_destroy_internal(&outbuf); + grpc_metadata_batch_destroy(&exec_ctx, &b); + grpc_chttp2_hpack_compressor_destroy(&exec_ctx, &c); + grpc_slice_buffer_destroy_internal(&exec_ctx, &outbuf); + grpc_exec_ctx_finish(&exec_ctx); std::ostringstream label; label << "framing_bytes/iter:" @@ -108,16 +109,17 @@ BENCHMARK(BM_HpackEncoderEncodeDeadline); template <class Fixture> static void BM_HpackEncoderEncodeHeader(benchmark::State& state) { TrackCounters track_counters; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; static bool logged_representative_output = false; grpc_metadata_batch b; grpc_metadata_batch_init(&b); - std::vector<grpc_mdelem> elems = Fixture::GetElems(); + std::vector<grpc_mdelem> elems = Fixture::GetElems(&exec_ctx); std::vector<grpc_linked_mdelem> storage(elems.size()); for (size_t i = 0; i < elems.size(); i++) { GPR_ASSERT(GRPC_LOG_IF_ERROR( - "addmd", grpc_metadata_batch_add_tail(&b, &storage[i], elems[i]))); + "addmd", + grpc_metadata_batch_add_tail(&exec_ctx, &b, &storage[i], elems[i]))); } grpc_chttp2_hpack_compressor c; @@ -134,7 +136,7 @@ static void BM_HpackEncoderEncodeHeader(benchmark::State& state) { (size_t)state.range(1), &stats, }; - grpc_chttp2_encode_header(&c, nullptr, 0, &b, &hopt, &outbuf); + grpc_chttp2_encode_header(&exec_ctx, &c, nullptr, 0, &b, &hopt, &outbuf); if (!logged_representative_output && state.iterations() > 3) { logged_representative_output = true; for (size_t i = 0; i < outbuf.count; i++) { @@ -143,12 +145,13 @@ static void BM_HpackEncoderEncodeHeader(benchmark::State& state) { gpr_free(s); } } - grpc_slice_buffer_reset_and_unref_internal(&outbuf); - grpc_core::ExecCtx::Get()->Flush(); + grpc_slice_buffer_reset_and_unref_internal(&exec_ctx, &outbuf); + grpc_exec_ctx_flush(&exec_ctx); } - grpc_metadata_batch_destroy(&b); - grpc_chttp2_hpack_compressor_destroy(&c); - grpc_slice_buffer_destroy_internal(&outbuf); + grpc_metadata_batch_destroy(&exec_ctx, &b); + grpc_chttp2_hpack_compressor_destroy(&exec_ctx, &c); + grpc_slice_buffer_destroy_internal(&exec_ctx, &outbuf); + grpc_exec_ctx_finish(&exec_ctx); std::ostringstream label; label << "framing_bytes/iter:" @@ -166,13 +169,15 @@ namespace hpack_encoder_fixtures { class EmptyBatch { public: static constexpr bool kEnableTrueBinary = false; - static std::vector<grpc_mdelem> GetElems() { return {}; } + static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx* exec_ctx) { + return {}; + } }; class SingleStaticElem { public: static constexpr bool kEnableTrueBinary = false; - static std::vector<grpc_mdelem> GetElems() { + static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx* exec_ctx) { return {GRPC_MDELEM_GRPC_ACCEPT_ENCODING_IDENTITY_COMMA_DEFLATE}; } }; @@ -180,9 +185,9 @@ class SingleStaticElem { class SingleInternedElem { public: static constexpr bool kEnableTrueBinary = false; - static std::vector<grpc_mdelem> GetElems() { + static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx* exec_ctx) { return {grpc_mdelem_from_slices( - grpc_slice_intern(grpc_slice_from_static_string("abc")), + exec_ctx, grpc_slice_intern(grpc_slice_from_static_string("abc")), grpc_slice_intern(grpc_slice_from_static_string("def")))}; } }; @@ -191,10 +196,10 @@ template <int kLength, 
bool kTrueBinary> class SingleInternedBinaryElem { public: static constexpr bool kEnableTrueBinary = kTrueBinary; - static std::vector<grpc_mdelem> GetElems() { + static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx* exec_ctx) { grpc_slice bytes = MakeBytes(); std::vector<grpc_mdelem> out = {grpc_mdelem_from_slices( - grpc_slice_intern(grpc_slice_from_static_string("abc-bin")), + exec_ctx, grpc_slice_intern(grpc_slice_from_static_string("abc-bin")), grpc_slice_intern(bytes))}; grpc_slice_unref(bytes); return out; @@ -213,9 +218,9 @@ class SingleInternedBinaryElem { class SingleInternedKeyElem { public: static constexpr bool kEnableTrueBinary = false; - static std::vector<grpc_mdelem> GetElems() { + static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx* exec_ctx) { return {grpc_mdelem_from_slices( - grpc_slice_intern(grpc_slice_from_static_string("abc")), + exec_ctx, grpc_slice_intern(grpc_slice_from_static_string("abc")), grpc_slice_from_static_string("def"))}; } }; @@ -223,8 +228,9 @@ class SingleInternedKeyElem { class SingleNonInternedElem { public: static constexpr bool kEnableTrueBinary = false; - static std::vector<grpc_mdelem> GetElems() { - return {grpc_mdelem_from_slices(grpc_slice_from_static_string("abc"), + static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx* exec_ctx) { + return {grpc_mdelem_from_slices(exec_ctx, + grpc_slice_from_static_string("abc"), grpc_slice_from_static_string("def"))}; } }; @@ -233,9 +239,9 @@ template <int kLength, bool kTrueBinary> class SingleNonInternedBinaryElem { public: static constexpr bool kEnableTrueBinary = kTrueBinary; - static std::vector<grpc_mdelem> GetElems() { - return {grpc_mdelem_from_slices(grpc_slice_from_static_string("abc-bin"), - MakeBytes())}; + static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx* exec_ctx) { + return {grpc_mdelem_from_slices( + exec_ctx, grpc_slice_from_static_string("abc-bin"), MakeBytes())}; } private: @@ -251,21 +257,21 @@ class SingleNonInternedBinaryElem { class RepresentativeClientInitialMetadata { public: static constexpr bool kEnableTrueBinary = true; - static std::vector<grpc_mdelem> GetElems() { + static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx* exec_ctx) { return { GRPC_MDELEM_SCHEME_HTTP, GRPC_MDELEM_METHOD_POST, grpc_mdelem_from_slices( - GRPC_MDSTR_PATH, + exec_ctx, GRPC_MDSTR_PATH, grpc_slice_intern(grpc_slice_from_static_string("/foo/bar"))), - grpc_mdelem_from_slices(GRPC_MDSTR_AUTHORITY, + grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_AUTHORITY, grpc_slice_intern(grpc_slice_from_static_string( "foo.test.google.fr:1234"))), GRPC_MDELEM_GRPC_ACCEPT_ENCODING_IDENTITY_COMMA_DEFLATE_COMMA_GZIP, GRPC_MDELEM_TE_TRAILERS, GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC, grpc_mdelem_from_slices( - GRPC_MDSTR_USER_AGENT, + exec_ctx, GRPC_MDSTR_USER_AGENT, grpc_slice_intern(grpc_slice_from_static_string( "grpc-c/3.0.0-dev (linux; chttp2; green)")))}; } @@ -277,18 +283,18 @@ class RepresentativeClientInitialMetadata { class MoreRepresentativeClientInitialMetadata { public: static constexpr bool kEnableTrueBinary = true; - static std::vector<grpc_mdelem> GetElems() { + static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx* exec_ctx) { return { GRPC_MDELEM_SCHEME_HTTP, GRPC_MDELEM_METHOD_POST, - grpc_mdelem_from_slices(GRPC_MDSTR_PATH, + grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_PATH, grpc_slice_intern(grpc_slice_from_static_string( "/grpc.test.FooService/BarMethod"))), - grpc_mdelem_from_slices(GRPC_MDSTR_AUTHORITY, + grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_AUTHORITY, 
grpc_slice_intern(grpc_slice_from_static_string( "foo.test.google.fr:1234"))), grpc_mdelem_from_slices( - GRPC_MDSTR_GRPC_TRACE_BIN, + exec_ctx, GRPC_MDSTR_GRPC_TRACE_BIN, grpc_slice_from_static_string("\x00\x01\x02\x03\x04\x05\x06\x07\x08" "\x09\x0a\x0b\x0c\x0d\x0e\x0f" "\x10\x11\x12\x13\x14\x15\x16\x17\x18" @@ -297,7 +303,7 @@ class MoreRepresentativeClientInitialMetadata { "\x29\x2a\x2b\x2c\x2d\x2e\x2f" "\x30")), grpc_mdelem_from_slices( - GRPC_MDSTR_GRPC_TAGS_BIN, + exec_ctx, GRPC_MDSTR_GRPC_TAGS_BIN, grpc_slice_from_static_string("\x00\x01\x02\x03\x04\x05\x06\x07\x08" "\x09\x0a\x0b\x0c\x0d\x0e\x0f" "\x10\x11\x12\x13")), @@ -305,7 +311,7 @@ class MoreRepresentativeClientInitialMetadata { GRPC_MDELEM_TE_TRAILERS, GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC, grpc_mdelem_from_slices( - GRPC_MDSTR_USER_AGENT, + exec_ctx, GRPC_MDSTR_USER_AGENT, grpc_slice_intern(grpc_slice_from_static_string( "grpc-c/3.0.0-dev (linux; chttp2; green)")))}; } @@ -314,7 +320,7 @@ class MoreRepresentativeClientInitialMetadata { class RepresentativeServerInitialMetadata { public: static constexpr bool kEnableTrueBinary = true; - static std::vector<grpc_mdelem> GetElems() { + static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx* exec_ctx) { return {GRPC_MDELEM_STATUS_200, GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC, GRPC_MDELEM_GRPC_ACCEPT_ENCODING_IDENTITY_COMMA_DEFLATE_COMMA_GZIP}; @@ -324,7 +330,7 @@ class RepresentativeServerInitialMetadata { class RepresentativeServerTrailingMetadata { public: static constexpr bool kEnableTrueBinary = true; - static std::vector<grpc_mdelem> GetElems() { + static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx* exec_ctx) { return {GRPC_MDELEM_GRPC_STATUS_0}; } }; @@ -425,45 +431,48 @@ BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, static void BM_HpackParserInitDestroy(benchmark::State& state) { TrackCounters track_counters; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_chttp2_hpack_parser p; while (state.KeepRunning()) { - grpc_chttp2_hpack_parser_init(&p); - grpc_chttp2_hpack_parser_destroy(&p); - grpc_core::ExecCtx::Get()->Flush(); + grpc_chttp2_hpack_parser_init(&exec_ctx, &p); + grpc_chttp2_hpack_parser_destroy(&exec_ctx, &p); + grpc_exec_ctx_flush(&exec_ctx); } - + grpc_exec_ctx_finish(&exec_ctx); track_counters.Finish(state); } BENCHMARK(BM_HpackParserInitDestroy); -static void UnrefHeader(void* user_data, grpc_mdelem md) { - GRPC_MDELEM_UNREF(md); +static void UnrefHeader(grpc_exec_ctx* exec_ctx, void* user_data, + grpc_mdelem md) { + GRPC_MDELEM_UNREF(exec_ctx, md); } -template <class Fixture, void (*OnHeader)(void*, grpc_mdelem)> +template <class Fixture, void (*OnHeader)(grpc_exec_ctx*, void*, grpc_mdelem)> static void BM_HpackParserParseHeader(benchmark::State& state) { TrackCounters track_counters; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; std::vector<grpc_slice> init_slices = Fixture::GetInitSlices(); std::vector<grpc_slice> benchmark_slices = Fixture::GetBenchmarkSlices(); grpc_chttp2_hpack_parser p; - grpc_chttp2_hpack_parser_init(&p); + grpc_chttp2_hpack_parser_init(&exec_ctx, &p); p.on_header = OnHeader; p.on_header_user_data = nullptr; for (auto slice : init_slices) { - GPR_ASSERT(GRPC_ERROR_NONE == grpc_chttp2_hpack_parser_parse(&p, slice)); + GPR_ASSERT(GRPC_ERROR_NONE == + grpc_chttp2_hpack_parser_parse(&exec_ctx, &p, slice)); } while (state.KeepRunning()) { for (auto slice : benchmark_slices) { - GPR_ASSERT(GRPC_ERROR_NONE == grpc_chttp2_hpack_parser_parse(&p, slice)); + 
GPR_ASSERT(GRPC_ERROR_NONE == + grpc_chttp2_hpack_parser_parse(&exec_ctx, &p, slice)); } - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(&exec_ctx); } for (auto slice : init_slices) grpc_slice_unref(slice); for (auto slice : benchmark_slices) grpc_slice_unref(slice); - grpc_chttp2_hpack_parser_destroy(&p); - + grpc_chttp2_hpack_parser_destroy(&exec_ctx, &p); + grpc_exec_ctx_finish(&exec_ctx); track_counters.Finish(state); } @@ -760,7 +769,8 @@ class RepresentativeServerTrailingMetadata { static void free_timeout(void* p) { gpr_free(p); } // New implementation. -static void OnHeaderNew(void* user_data, grpc_mdelem md) { +static void OnHeaderNew(grpc_exec_ctx* exec_ctx, void* user_data, + grpc_mdelem md) { if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_GRPC_TIMEOUT)) { grpc_millis* cached_timeout = static_cast<grpc_millis*>(grpc_mdelem_get_user_data(md, free_timeout)); @@ -783,7 +793,7 @@ static void OnHeaderNew(void* user_data, grpc_mdelem md) { } } benchmark::DoNotOptimize(timeout); - GRPC_MDELEM_UNREF(md); + GRPC_MDELEM_UNREF(exec_ctx, md); } else { GPR_ASSERT(0); } diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc index fcb1677d09..be4da4d0bd 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc @@ -58,7 +58,7 @@ class DummyEndpoint : public grpc_endpoint { ru_ = grpc_resource_user_create(Library::get().rq(), "dummy_endpoint"); } - void PushInput(grpc_slice slice) { + void PushInput(grpc_exec_ctx* exec_ctx, grpc_slice slice) { if (read_cb_ == nullptr) { GPR_ASSERT(!have_slice_); buffered_slice_ = slice; @@ -66,7 +66,7 @@ class DummyEndpoint : public grpc_endpoint { return; } grpc_slice_buffer_add(slices_, slice); - GRPC_CLOSURE_SCHED(read_cb_, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, read_cb_, GRPC_ERROR_NONE); read_cb_ = nullptr; } @@ -77,45 +77,50 @@ class DummyEndpoint : public grpc_endpoint { bool have_slice_ = false; grpc_slice buffered_slice_; - void QueueRead(grpc_slice_buffer* slices, grpc_closure* cb) { + void QueueRead(grpc_exec_ctx* exec_ctx, grpc_slice_buffer* slices, + grpc_closure* cb) { GPR_ASSERT(read_cb_ == nullptr); if (have_slice_) { have_slice_ = false; grpc_slice_buffer_add(slices, buffered_slice_); - GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, cb, GRPC_ERROR_NONE); return; } read_cb_ = cb; slices_ = slices; } - static void read(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb) { - static_cast<DummyEndpoint*>(ep)->QueueRead(slices, cb); + static void read(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, + grpc_slice_buffer* slices, grpc_closure* cb) { + static_cast<DummyEndpoint*>(ep)->QueueRead(exec_ctx, slices, cb); } - static void write(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb) { - GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE); + static void write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, + grpc_slice_buffer* slices, grpc_closure* cb) { + GRPC_CLOSURE_SCHED(exec_ctx, cb, GRPC_ERROR_NONE); } static grpc_workqueue* get_workqueue(grpc_endpoint* ep) { return nullptr; } - static void add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {} + static void add_to_pollset(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, + grpc_pollset* pollset) {} - static void add_to_pollset_set(grpc_endpoint* ep, grpc_pollset_set* pollset) { - } + static void add_to_pollset_set(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, + grpc_pollset_set* pollset) {} - static void 
delete_from_pollset_set(grpc_endpoint* ep, + static void delete_from_pollset_set(grpc_exec_ctx* exec_ctx, + grpc_endpoint* ep, grpc_pollset_set* pollset) {} - static void shutdown(grpc_endpoint* ep, grpc_error* why) { - grpc_resource_user_shutdown(static_cast<DummyEndpoint*>(ep)->ru_); - GRPC_CLOSURE_SCHED(static_cast<DummyEndpoint*>(ep)->read_cb_, why); + static void shutdown(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, + grpc_error* why) { + grpc_resource_user_shutdown(exec_ctx, static_cast<DummyEndpoint*>(ep)->ru_); + GRPC_CLOSURE_SCHED(exec_ctx, static_cast<DummyEndpoint*>(ep)->read_cb_, + why); } - static void destroy(grpc_endpoint* ep) { - grpc_resource_user_unref(static_cast<DummyEndpoint*>(ep)->ru_); + static void destroy(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep) { + grpc_resource_user_unref(exec_ctx, static_cast<DummyEndpoint*>(ep)->ru_); delete static_cast<DummyEndpoint*>(ep); } @@ -131,24 +136,29 @@ class Fixture { Fixture(const grpc::ChannelArguments& args, bool client) { grpc_channel_args c_args = args.c_channel_args(); ep_ = new DummyEndpoint; - t_ = grpc_create_chttp2_transport(&c_args, ep_, client); - grpc_chttp2_transport_start_reading(t_, nullptr, nullptr); + t_ = grpc_create_chttp2_transport(exec_ctx(), &c_args, ep_, client); + grpc_chttp2_transport_start_reading(exec_ctx(), t_, nullptr, nullptr); FlushExecCtx(); } - void FlushExecCtx() { grpc_core::ExecCtx::Get()->Flush(); } + void FlushExecCtx() { grpc_exec_ctx_flush(&exec_ctx_); } - ~Fixture() { grpc_transport_destroy(t_); } + ~Fixture() { + grpc_transport_destroy(&exec_ctx_, t_); + grpc_exec_ctx_finish(&exec_ctx_); + } grpc_chttp2_transport* chttp2_transport() { return reinterpret_cast<grpc_chttp2_transport*>(t_); } grpc_transport* transport() { return t_; } + grpc_exec_ctx* exec_ctx() { return &exec_ctx_; } - void PushInput(grpc_slice slice) { ep_->PushInput(slice); } + void PushInput(grpc_slice slice) { ep_->PushInput(exec_ctx(), slice); } private: DummyEndpoint* ep_; + grpc_exec_ctx exec_ctx_ = GRPC_EXEC_CTX_INIT; grpc_transport* t_; }; @@ -165,8 +175,8 @@ std::unique_ptr<Closure> MakeClosure( GRPC_CLOSURE_INIT(this, Execute, this, sched); } F f_; - static void Execute(void* arg, grpc_error* error) { - static_cast<C*>(arg)->f_(error); + static void Execute(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { + static_cast<C*>(arg)->f_(exec_ctx, error); } }; return std::unique_ptr<Closure>(new C(f, sched)); @@ -178,8 +188,8 @@ grpc_closure* MakeOnceClosure( struct C : public grpc_closure { C(const F& f) : f_(f) {} F f_; - static void Execute(void* arg, grpc_error* error) { - static_cast<C*>(arg)->f_(error); + static void Execute(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { + static_cast<C*>(arg)->f_(exec_ctx, error); delete static_cast<C*>(arg); } }; @@ -210,22 +220,22 @@ class Stream { gpr_arena_destroy(arena_); arena_ = gpr_arena_create(4096); } - grpc_transport_init_stream(f_->transport(), + grpc_transport_init_stream(f_->exec_ctx(), f_->transport(), static_cast<grpc_stream*>(stream_), &refcount_, nullptr, arena_); } - void DestroyThen(grpc_closure* closure) { + void DestroyThen(grpc_exec_ctx* exec_ctx, grpc_closure* closure) { destroy_closure_ = closure; #ifndef NDEBUG - grpc_stream_unref(&refcount_, "DestroyThen"); + grpc_stream_unref(exec_ctx, &refcount_, "DestroyThen"); #else - grpc_stream_unref(&refcount_); + grpc_stream_unref(exec_ctx, &refcount_); #endif } - void Op(grpc_transport_stream_op_batch* op) { - grpc_transport_perform_stream_op(f_->transport(), + void Op(grpc_exec_ctx* exec_ctx, 
grpc_transport_stream_op_batch* op) { + grpc_transport_perform_stream_op(exec_ctx, f_->transport(), static_cast<grpc_stream*>(stream_), op); } @@ -234,9 +244,10 @@ class Stream { } private: - static void FinishDestroy(void* arg, grpc_error* error) { + static void FinishDestroy(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { auto stream = static_cast<Stream*>(arg); - grpc_transport_destroy_stream(stream->f_->transport(), + grpc_transport_destroy_stream(exec_ctx, stream->f_->transport(), static_cast<grpc_stream*>(stream->stream_), stream->destroy_closure_); gpr_event_set(&stream->done_, (void*)1); @@ -257,7 +268,6 @@ class Stream { static void BM_StreamCreateDestroy(benchmark::State& state) { TrackCounters track_counters; - grpc_core::ExecCtx exec_ctx; Fixture f(grpc::ChannelArguments(), true); Stream s(&f); grpc_transport_stream_op_batch op; @@ -266,13 +276,14 @@ static void BM_StreamCreateDestroy(benchmark::State& state) { op.cancel_stream = true; op.payload = &op_payload; op_payload.cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; - std::unique_ptr<Closure> next = MakeClosure([&](grpc_error* error) { - if (!state.KeepRunning()) return; - s.Init(state); - s.Op(&op); - s.DestroyThen(next.get()); - }); - GRPC_CLOSURE_RUN(next.get(), GRPC_ERROR_NONE); + std::unique_ptr<Closure> next = + MakeClosure([&](grpc_exec_ctx* exec_ctx, grpc_error* error) { + if (!state.KeepRunning()) return; + s.Init(state); + s.Op(exec_ctx, &op); + s.DestroyThen(exec_ctx, next.get()); + }); + GRPC_CLOSURE_RUN(f.exec_ctx(), next.get(), GRPC_ERROR_NONE); f.FlushExecCtx(); track_counters.Finish(state); } @@ -280,21 +291,21 @@ BENCHMARK(BM_StreamCreateDestroy); class RepresentativeClientInitialMetadata { public: - static std::vector<grpc_mdelem> GetElems() { + static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx* exec_ctx) { return { GRPC_MDELEM_SCHEME_HTTP, GRPC_MDELEM_METHOD_POST, grpc_mdelem_from_slices( - GRPC_MDSTR_PATH, + exec_ctx, GRPC_MDSTR_PATH, grpc_slice_intern(grpc_slice_from_static_string("/foo/bar"))), - grpc_mdelem_from_slices(GRPC_MDSTR_AUTHORITY, + grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_AUTHORITY, grpc_slice_intern(grpc_slice_from_static_string( "foo.test.google.fr:1234"))), GRPC_MDELEM_GRPC_ACCEPT_ENCODING_IDENTITY_COMMA_DEFLATE_COMMA_GZIP, GRPC_MDELEM_TE_TRAILERS, GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC, grpc_mdelem_from_slices( - GRPC_MDSTR_USER_AGENT, + exec_ctx, GRPC_MDSTR_USER_AGENT, grpc_slice_intern(grpc_slice_from_static_string( "grpc-c/3.0.0-dev (linux; chttp2; green)")))}; } @@ -303,7 +314,6 @@ class RepresentativeClientInitialMetadata { template <class Metadata> static void BM_StreamCreateSendInitialMetadataDestroy(benchmark::State& state) { TrackCounters track_counters; - grpc_core::ExecCtx exec_ctx; Fixture f(grpc::ChannelArguments(), true); Stream s(&f); grpc_transport_stream_op_batch op; @@ -320,33 +330,34 @@ static void BM_StreamCreateSendInitialMetadataDestroy(benchmark::State& state) { grpc_metadata_batch b; grpc_metadata_batch_init(&b); b.deadline = GRPC_MILLIS_INF_FUTURE; - std::vector<grpc_mdelem> elems = Metadata::GetElems(); + std::vector<grpc_mdelem> elems = Metadata::GetElems(f.exec_ctx()); std::vector<grpc_linked_mdelem> storage(elems.size()); for (size_t i = 0; i < elems.size(); i++) { GPR_ASSERT(GRPC_LOG_IF_ERROR( - "addmd", grpc_metadata_batch_add_tail(&b, &storage[i], elems[i]))); + "addmd", + grpc_metadata_batch_add_tail(f.exec_ctx(), &b, &storage[i], elems[i]))); } f.FlushExecCtx(); - start = MakeClosure([&](grpc_error* error) { + start = 
MakeClosure([&](grpc_exec_ctx* exec_ctx, grpc_error* error) { if (!state.KeepRunning()) return; s.Init(state); reset_op(); op.on_complete = done.get(); op.send_initial_metadata = true; op.payload->send_initial_metadata.send_initial_metadata = &b; - s.Op(&op); + s.Op(exec_ctx, &op); }); - done = MakeClosure([&](grpc_error* error) { + done = MakeClosure([&](grpc_exec_ctx* exec_ctx, grpc_error* error) { reset_op(); op.cancel_stream = true; op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; - s.Op(&op); - s.DestroyThen(start.get()); + s.Op(exec_ctx, &op); + s.DestroyThen(exec_ctx, start.get()); }); - GRPC_CLOSURE_SCHED(start.get(), GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(f.exec_ctx(), start.get(), GRPC_ERROR_NONE); f.FlushExecCtx(); - grpc_metadata_batch_destroy(&b); + grpc_metadata_batch_destroy(f.exec_ctx(), &b); track_counters.Finish(state); } BENCHMARK_TEMPLATE(BM_StreamCreateSendInitialMetadataDestroy, @@ -354,7 +365,6 @@ BENCHMARK_TEMPLATE(BM_StreamCreateSendInitialMetadataDestroy, static void BM_TransportEmptyOp(benchmark::State& state) { TrackCounters track_counters; - grpc_core::ExecCtx exec_ctx; Fixture f(grpc::ChannelArguments(), true); Stream s(&f); s.Init(state); @@ -365,19 +375,21 @@ static void BM_TransportEmptyOp(benchmark::State& state) { memset(&op, 0, sizeof(op)); op.payload = &op_payload; }; - std::unique_ptr<Closure> c = MakeClosure([&](grpc_error* error) { - if (!state.KeepRunning()) return; - reset_op(); - op.on_complete = c.get(); - s.Op(&op); - }); - GRPC_CLOSURE_SCHED(c.get(), GRPC_ERROR_NONE); + std::unique_ptr<Closure> c = + MakeClosure([&](grpc_exec_ctx* exec_ctx, grpc_error* error) { + if (!state.KeepRunning()) return; + reset_op(); + op.on_complete = c.get(); + s.Op(exec_ctx, &op); + }); + GRPC_CLOSURE_SCHED(f.exec_ctx(), c.get(), GRPC_ERROR_NONE); f.FlushExecCtx(); reset_op(); op.cancel_stream = true; op_payload.cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; - s.Op(&op); - s.DestroyThen(MakeOnceClosure([](grpc_error* error) {})); + s.Op(f.exec_ctx(), &op); + s.DestroyThen(f.exec_ctx(), MakeOnceClosure([](grpc_exec_ctx* exec_ctx, + grpc_error* error) {})); f.FlushExecCtx(); track_counters.Finish(state); } @@ -387,7 +399,6 @@ std::vector<std::unique_ptr<gpr_event>> done_events; static void BM_TransportStreamSend(benchmark::State& state) { TrackCounters track_counters; - grpc_core::ExecCtx exec_ctx; Fixture f(grpc::ChannelArguments(), true); auto s = std::unique_ptr<Stream>(new Stream(&f)); s->Init(state); @@ -409,37 +420,39 @@ static void BM_TransportStreamSend(benchmark::State& state) { grpc_metadata_batch_init(&b); b.deadline = GRPC_MILLIS_INF_FUTURE; std::vector<grpc_mdelem> elems = - RepresentativeClientInitialMetadata::GetElems(); + RepresentativeClientInitialMetadata::GetElems(f.exec_ctx()); std::vector<grpc_linked_mdelem> storage(elems.size()); for (size_t i = 0; i < elems.size(); i++) { GPR_ASSERT(GRPC_LOG_IF_ERROR( - "addmd", grpc_metadata_batch_add_tail(&b, &storage[i], elems[i]))); + "addmd", + grpc_metadata_batch_add_tail(f.exec_ctx(), &b, &storage[i], elems[i]))); } gpr_event* bm_done = new gpr_event; gpr_event_init(bm_done); - std::unique_ptr<Closure> c = MakeClosure([&](grpc_error* error) { - if (!state.KeepRunning()) { - gpr_event_set(bm_done, (void*)1); - return; - } - // force outgoing window to be yuge - s->chttp2_stream()->flow_control->TestOnlyForceHugeWindow(); - f.chttp2_transport()->flow_control->TestOnlyForceHugeWindow(); - grpc_slice_buffer_stream_init(&send_stream, &send_buffer, 0); - reset_op(); - op.on_complete = 
c.get(); - op.send_message = true; - op.payload->send_message.send_message = &send_stream.base; - s->Op(&op); - }); + std::unique_ptr<Closure> c = + MakeClosure([&](grpc_exec_ctx* exec_ctx, grpc_error* error) { + if (!state.KeepRunning()) { + gpr_event_set(bm_done, (void*)1); + return; + } + // force outgoing window to be yuge + s->chttp2_stream()->flow_control->TestOnlyForceHugeWindow(); + f.chttp2_transport()->flow_control->TestOnlyForceHugeWindow(); + grpc_slice_buffer_stream_init(&send_stream, &send_buffer, 0); + reset_op(); + op.on_complete = c.get(); + op.send_message = true; + op.payload->send_message.send_message = &send_stream.base; + s->Op(exec_ctx, &op); + }); reset_op(); op.send_initial_metadata = true; op.payload->send_initial_metadata.send_initial_metadata = &b; op.on_complete = c.get(); - s->Op(&op); + s->Op(f.exec_ctx(), &op); f.FlushExecCtx(); gpr_event_wait(bm_done, gpr_inf_future(GPR_CLOCK_REALTIME)); @@ -448,12 +461,13 @@ static void BM_TransportStreamSend(benchmark::State& state) { reset_op(); op.cancel_stream = true; op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; - s->Op(&op); - s->DestroyThen(MakeOnceClosure([](grpc_error* error) {})); + s->Op(f.exec_ctx(), &op); + s->DestroyThen(f.exec_ctx(), MakeOnceClosure([](grpc_exec_ctx* exec_ctx, + grpc_error* error) {})); f.FlushExecCtx(); s.reset(); track_counters.Finish(state); - grpc_metadata_batch_destroy(&b); + grpc_metadata_batch_destroy(f.exec_ctx(), &b); grpc_slice_buffer_destroy(&send_buffer); } BENCHMARK(BM_TransportStreamSend)->Range(0, 128 * 1024 * 1024); @@ -517,7 +531,6 @@ static grpc_slice CreateIncomingDataSlice(size_t length, size_t frame_size) { static void BM_TransportStreamRecv(benchmark::State& state) { TrackCounters track_counters; - grpc_core::ExecCtx exec_ctx; Fixture f(grpc::ChannelArguments(), true); Stream s(&f); s.Init(state); @@ -538,14 +551,16 @@ static void BM_TransportStreamRecv(benchmark::State& state) { grpc_metadata_batch_init(&b_recv); b.deadline = GRPC_MILLIS_INF_FUTURE; std::vector<grpc_mdelem> elems = - RepresentativeClientInitialMetadata::GetElems(); + RepresentativeClientInitialMetadata::GetElems(f.exec_ctx()); std::vector<grpc_linked_mdelem> storage(elems.size()); for (size_t i = 0; i < elems.size(); i++) { GPR_ASSERT(GRPC_LOG_IF_ERROR( - "addmd", grpc_metadata_batch_add_tail(&b, &storage[i], elems[i]))); + "addmd", + grpc_metadata_batch_add_tail(f.exec_ctx(), &b, &storage[i], elems[i]))); } - std::unique_ptr<Closure> do_nothing = MakeClosure([](grpc_error* error) {}); + std::unique_ptr<Closure> do_nothing = + MakeClosure([](grpc_exec_ctx* exec_ctx, grpc_error* error) {}); uint32_t received; @@ -554,49 +569,51 @@ static void BM_TransportStreamRecv(benchmark::State& state) { std::unique_ptr<Closure> drain_continue; grpc_slice recv_slice; - std::unique_ptr<Closure> c = MakeClosure([&](grpc_error* error) { - if (!state.KeepRunning()) return; - // force outgoing window to be yuge - s.chttp2_stream()->flow_control->TestOnlyForceHugeWindow(); - f.chttp2_transport()->flow_control->TestOnlyForceHugeWindow(); - received = 0; - reset_op(); - op.on_complete = do_nothing.get(); - op.recv_message = true; - op.payload->recv_message.recv_message = &recv_stream; - op.payload->recv_message.recv_message_ready = drain_start.get(); - s.Op(&op); - f.PushInput(grpc_slice_ref(incoming_data)); - }); - - drain_start = MakeClosure([&](grpc_error* error) { + std::unique_ptr<Closure> c = + MakeClosure([&](grpc_exec_ctx* exec_ctx, grpc_error* error) { + if (!state.KeepRunning()) return; + // force 
outgoing window to be yuge + s.chttp2_stream()->flow_control->TestOnlyForceHugeWindow(); + f.chttp2_transport()->flow_control->TestOnlyForceHugeWindow(); + received = 0; + reset_op(); + op.on_complete = do_nothing.get(); + op.recv_message = true; + op.payload->recv_message.recv_message = &recv_stream; + op.payload->recv_message.recv_message_ready = drain_start.get(); + s.Op(exec_ctx, &op); + f.PushInput(grpc_slice_ref(incoming_data)); + }); + + drain_start = MakeClosure([&](grpc_exec_ctx* exec_ctx, grpc_error* error) { if (recv_stream == nullptr) { GPR_ASSERT(!state.KeepRunning()); return; } - GRPC_CLOSURE_RUN(drain.get(), GRPC_ERROR_NONE); + GRPC_CLOSURE_RUN(exec_ctx, drain.get(), GRPC_ERROR_NONE); }); - drain = MakeClosure([&](grpc_error* error) { + drain = MakeClosure([&](grpc_exec_ctx* exec_ctx, grpc_error* error) { do { if (received == recv_stream->length) { - grpc_byte_stream_destroy(recv_stream); - GRPC_CLOSURE_SCHED(c.get(), GRPC_ERROR_NONE); + grpc_byte_stream_destroy(exec_ctx, recv_stream); + GRPC_CLOSURE_SCHED(exec_ctx, c.get(), GRPC_ERROR_NONE); return; } - } while (grpc_byte_stream_next(recv_stream, recv_stream->length - received, + } while (grpc_byte_stream_next(exec_ctx, recv_stream, + recv_stream->length - received, drain_continue.get()) && GRPC_ERROR_NONE == - grpc_byte_stream_pull(recv_stream, &recv_slice) && + grpc_byte_stream_pull(exec_ctx, recv_stream, &recv_slice) && (received += GRPC_SLICE_LENGTH(recv_slice), - grpc_slice_unref_internal(recv_slice), true)); + grpc_slice_unref_internal(exec_ctx, recv_slice), true)); }); - drain_continue = MakeClosure([&](grpc_error* error) { - grpc_byte_stream_pull(recv_stream, &recv_slice); + drain_continue = MakeClosure([&](grpc_exec_ctx* exec_ctx, grpc_error* error) { + grpc_byte_stream_pull(exec_ctx, recv_stream, &recv_slice); received += GRPC_SLICE_LENGTH(recv_slice); - grpc_slice_unref_internal(recv_slice); - GRPC_CLOSURE_RUN(drain.get(), GRPC_ERROR_NONE); + grpc_slice_unref_internal(exec_ctx, recv_slice); + GRPC_CLOSURE_RUN(exec_ctx, drain.get(), GRPC_ERROR_NONE); }); reset_op(); @@ -607,7 +624,7 @@ static void BM_TransportStreamRecv(benchmark::State& state) { op.payload->recv_initial_metadata.recv_initial_metadata_ready = do_nothing.get(); op.on_complete = c.get(); - s.Op(&op); + s.Op(f.exec_ctx(), &op); f.PushInput(SLICE_FROM_BUFFER( "\x00\x00\x00\x04\x00\x00\x00\x00\x00" // Generated using: @@ -625,12 +642,13 @@ static void BM_TransportStreamRecv(benchmark::State& state) { reset_op(); op.cancel_stream = true; op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; - s.Op(&op); - s.DestroyThen(MakeOnceClosure([](grpc_error* error) {})); + s.Op(f.exec_ctx(), &op); + s.DestroyThen(f.exec_ctx(), MakeOnceClosure([](grpc_exec_ctx* exec_ctx, + grpc_error* error) {})); f.FlushExecCtx(); track_counters.Finish(state); - grpc_metadata_batch_destroy(&b); - grpc_metadata_batch_destroy(&b_recv); + grpc_metadata_batch_destroy(f.exec_ctx(), &b); + grpc_metadata_batch_destroy(f.exec_ctx(), &b_recv); grpc_slice_unref(incoming_data); } BENCHMARK(BM_TransportStreamRecv)->Range(0, 128 * 1024 * 1024); diff --git a/test/cpp/microbenchmarks/bm_closure.cc b/test/cpp/microbenchmarks/bm_closure.cc index 4d5a82c3f6..2434d4e84e 100644 --- a/test/cpp/microbenchmarks/bm_closure.cc +++ b/test/cpp/microbenchmarks/bm_closure.cc @@ -34,7 +34,8 @@ auto& force_library_initialization = Library::get(); static void BM_NoOpExecCtx(benchmark::State& state) { TrackCounters track_counters; while (state.KeepRunning()) { - grpc_core::ExecCtx exec_ctx; + 
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_exec_ctx_finish(&exec_ctx); } track_counters.Finish(state); } @@ -42,16 +43,16 @@ BENCHMARK(BM_NoOpExecCtx); static void BM_WellFlushed(benchmark::State& state) { TrackCounters track_counters; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (state.KeepRunning()) { - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(&exec_ctx); } - + grpc_exec_ctx_finish(&exec_ctx); track_counters.Finish(state); } BENCHMARK(BM_WellFlushed); -static void DoNothing(void* arg, grpc_error* error) {} +static void DoNothing(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {} static void BM_ClosureInitAgainstExecCtx(benchmark::State& state) { TrackCounters track_counters; @@ -68,13 +69,13 @@ static void BM_ClosureInitAgainstCombiner(benchmark::State& state) { TrackCounters track_counters; grpc_combiner* combiner = grpc_combiner_create(); grpc_closure c; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (state.KeepRunning()) { benchmark::DoNotOptimize(GRPC_CLOSURE_INIT( &c, DoNothing, nullptr, grpc_combiner_scheduler(combiner))); } - GRPC_COMBINER_UNREF(combiner, "finished"); - + GRPC_COMBINER_UNREF(&exec_ctx, combiner, "finished"); + grpc_exec_ctx_finish(&exec_ctx); track_counters.Finish(state); } BENCHMARK(BM_ClosureInitAgainstCombiner); @@ -83,39 +84,41 @@ static void BM_ClosureRunOnExecCtx(benchmark::State& state) { TrackCounters track_counters; grpc_closure c; GRPC_CLOSURE_INIT(&c, DoNothing, nullptr, grpc_schedule_on_exec_ctx); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (state.KeepRunning()) { - GRPC_CLOSURE_RUN(&c, GRPC_ERROR_NONE); - grpc_core::ExecCtx::Get()->Flush(); + GRPC_CLOSURE_RUN(&exec_ctx, &c, GRPC_ERROR_NONE); + grpc_exec_ctx_flush(&exec_ctx); } - + grpc_exec_ctx_finish(&exec_ctx); track_counters.Finish(state); } BENCHMARK(BM_ClosureRunOnExecCtx); static void BM_ClosureCreateAndRun(benchmark::State& state) { TrackCounters track_counters; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (state.KeepRunning()) { GRPC_CLOSURE_RUN( + &exec_ctx, GRPC_CLOSURE_CREATE(DoNothing, nullptr, grpc_schedule_on_exec_ctx), GRPC_ERROR_NONE); } - + grpc_exec_ctx_finish(&exec_ctx); track_counters.Finish(state); } BENCHMARK(BM_ClosureCreateAndRun); static void BM_ClosureInitAndRun(benchmark::State& state) { TrackCounters track_counters; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_closure c; while (state.KeepRunning()) { GRPC_CLOSURE_RUN( + &exec_ctx, GRPC_CLOSURE_INIT(&c, DoNothing, nullptr, grpc_schedule_on_exec_ctx), GRPC_ERROR_NONE); } - + grpc_exec_ctx_finish(&exec_ctx); track_counters.Finish(state); } BENCHMARK(BM_ClosureInitAndRun); @@ -124,12 +127,12 @@ static void BM_ClosureSchedOnExecCtx(benchmark::State& state) { TrackCounters track_counters; grpc_closure c; GRPC_CLOSURE_INIT(&c, DoNothing, nullptr, grpc_schedule_on_exec_ctx); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (state.KeepRunning()) { - GRPC_CLOSURE_SCHED(&c, GRPC_ERROR_NONE); - grpc_core::ExecCtx::Get()->Flush(); + GRPC_CLOSURE_SCHED(&exec_ctx, &c, GRPC_ERROR_NONE); + grpc_exec_ctx_flush(&exec_ctx); } - + grpc_exec_ctx_finish(&exec_ctx); track_counters.Finish(state); } BENCHMARK(BM_ClosureSchedOnExecCtx); @@ -140,13 +143,13 @@ static void BM_ClosureSched2OnExecCtx(benchmark::State& state) { grpc_closure c2; GRPC_CLOSURE_INIT(&c1, DoNothing, nullptr, 
grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&c2, DoNothing, nullptr, grpc_schedule_on_exec_ctx); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (state.KeepRunning()) { - GRPC_CLOSURE_SCHED(&c1, GRPC_ERROR_NONE); - GRPC_CLOSURE_SCHED(&c2, GRPC_ERROR_NONE); - grpc_core::ExecCtx::Get()->Flush(); + GRPC_CLOSURE_SCHED(&exec_ctx, &c1, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&exec_ctx, &c2, GRPC_ERROR_NONE); + grpc_exec_ctx_flush(&exec_ctx); } - + grpc_exec_ctx_finish(&exec_ctx); track_counters.Finish(state); } BENCHMARK(BM_ClosureSched2OnExecCtx); @@ -159,14 +162,14 @@ static void BM_ClosureSched3OnExecCtx(benchmark::State& state) { GRPC_CLOSURE_INIT(&c1, DoNothing, nullptr, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&c2, DoNothing, nullptr, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&c3, DoNothing, nullptr, grpc_schedule_on_exec_ctx); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (state.KeepRunning()) { - GRPC_CLOSURE_SCHED(&c1, GRPC_ERROR_NONE); - GRPC_CLOSURE_SCHED(&c2, GRPC_ERROR_NONE); - GRPC_CLOSURE_SCHED(&c3, GRPC_ERROR_NONE); - grpc_core::ExecCtx::Get()->Flush(); + GRPC_CLOSURE_SCHED(&exec_ctx, &c1, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&exec_ctx, &c2, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&exec_ctx, &c3, GRPC_ERROR_NONE); + grpc_exec_ctx_flush(&exec_ctx); } - + grpc_exec_ctx_finish(&exec_ctx); track_counters.Finish(state); } BENCHMARK(BM_ClosureSched3OnExecCtx); @@ -176,13 +179,13 @@ static void BM_AcquireMutex(benchmark::State& state) { // for comparison with the combiner stuff below gpr_mu mu; gpr_mu_init(&mu); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (state.KeepRunning()) { gpr_mu_lock(&mu); - DoNothing(nullptr, GRPC_ERROR_NONE); + DoNothing(&exec_ctx, nullptr, GRPC_ERROR_NONE); gpr_mu_unlock(&mu); } - + grpc_exec_ctx_finish(&exec_ctx); track_counters.Finish(state); } BENCHMARK(BM_AcquireMutex); @@ -192,16 +195,16 @@ static void BM_TryAcquireMutex(benchmark::State& state) { // for comparison with the combiner stuff below gpr_mu mu; gpr_mu_init(&mu); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (state.KeepRunning()) { if (gpr_mu_trylock(&mu)) { - DoNothing(nullptr, GRPC_ERROR_NONE); + DoNothing(&exec_ctx, nullptr, GRPC_ERROR_NONE); gpr_mu_unlock(&mu); } else { abort(); } } - + grpc_exec_ctx_finish(&exec_ctx); track_counters.Finish(state); } BENCHMARK(BM_TryAcquireMutex); @@ -210,13 +213,13 @@ static void BM_AcquireSpinlock(benchmark::State& state) { TrackCounters track_counters; // for comparison with the combiner stuff below gpr_spinlock mu = GPR_SPINLOCK_INITIALIZER; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (state.KeepRunning()) { gpr_spinlock_lock(&mu); - DoNothing(nullptr, GRPC_ERROR_NONE); + DoNothing(&exec_ctx, nullptr, GRPC_ERROR_NONE); gpr_spinlock_unlock(&mu); } - + grpc_exec_ctx_finish(&exec_ctx); track_counters.Finish(state); } BENCHMARK(BM_AcquireSpinlock); @@ -225,16 +228,16 @@ static void BM_TryAcquireSpinlock(benchmark::State& state) { TrackCounters track_counters; // for comparison with the combiner stuff below gpr_spinlock mu = GPR_SPINLOCK_INITIALIZER; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (state.KeepRunning()) { if (gpr_spinlock_trylock(&mu)) { - DoNothing(nullptr, GRPC_ERROR_NONE); + DoNothing(&exec_ctx, nullptr, GRPC_ERROR_NONE); gpr_spinlock_unlock(&mu); } else { abort(); } } - + grpc_exec_ctx_finish(&exec_ctx); 
track_counters.Finish(state); } BENCHMARK(BM_TryAcquireSpinlock); @@ -244,13 +247,13 @@ static void BM_ClosureSchedOnCombiner(benchmark::State& state) { grpc_combiner* combiner = grpc_combiner_create(); grpc_closure c; GRPC_CLOSURE_INIT(&c, DoNothing, nullptr, grpc_combiner_scheduler(combiner)); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (state.KeepRunning()) { - GRPC_CLOSURE_SCHED(&c, GRPC_ERROR_NONE); - grpc_core::ExecCtx::Get()->Flush(); + GRPC_CLOSURE_SCHED(&exec_ctx, &c, GRPC_ERROR_NONE); + grpc_exec_ctx_flush(&exec_ctx); } - GRPC_COMBINER_UNREF(combiner, "finished"); - + GRPC_COMBINER_UNREF(&exec_ctx, combiner, "finished"); + grpc_exec_ctx_finish(&exec_ctx); track_counters.Finish(state); } BENCHMARK(BM_ClosureSchedOnCombiner); @@ -262,14 +265,14 @@ static void BM_ClosureSched2OnCombiner(benchmark::State& state) { grpc_closure c2; GRPC_CLOSURE_INIT(&c1, DoNothing, nullptr, grpc_combiner_scheduler(combiner)); GRPC_CLOSURE_INIT(&c2, DoNothing, nullptr, grpc_combiner_scheduler(combiner)); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (state.KeepRunning()) { - GRPC_CLOSURE_SCHED(&c1, GRPC_ERROR_NONE); - GRPC_CLOSURE_SCHED(&c2, GRPC_ERROR_NONE); - grpc_core::ExecCtx::Get()->Flush(); + GRPC_CLOSURE_SCHED(&exec_ctx, &c1, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&exec_ctx, &c2, GRPC_ERROR_NONE); + grpc_exec_ctx_flush(&exec_ctx); } - GRPC_COMBINER_UNREF(combiner, "finished"); - + GRPC_COMBINER_UNREF(&exec_ctx, combiner, "finished"); + grpc_exec_ctx_finish(&exec_ctx); track_counters.Finish(state); } BENCHMARK(BM_ClosureSched2OnCombiner); @@ -283,15 +286,15 @@ static void BM_ClosureSched3OnCombiner(benchmark::State& state) { GRPC_CLOSURE_INIT(&c1, DoNothing, nullptr, grpc_combiner_scheduler(combiner)); GRPC_CLOSURE_INIT(&c2, DoNothing, nullptr, grpc_combiner_scheduler(combiner)); GRPC_CLOSURE_INIT(&c3, DoNothing, nullptr, grpc_combiner_scheduler(combiner)); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (state.KeepRunning()) { - GRPC_CLOSURE_SCHED(&c1, GRPC_ERROR_NONE); - GRPC_CLOSURE_SCHED(&c2, GRPC_ERROR_NONE); - GRPC_CLOSURE_SCHED(&c3, GRPC_ERROR_NONE); - grpc_core::ExecCtx::Get()->Flush(); + GRPC_CLOSURE_SCHED(&exec_ctx, &c1, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&exec_ctx, &c2, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&exec_ctx, &c3, GRPC_ERROR_NONE); + grpc_exec_ctx_flush(&exec_ctx); } - GRPC_COMBINER_UNREF(combiner, "finished"); - + GRPC_COMBINER_UNREF(&exec_ctx, combiner, "finished"); + grpc_exec_ctx_finish(&exec_ctx); track_counters.Finish(state); } BENCHMARK(BM_ClosureSched3OnCombiner); @@ -306,15 +309,15 @@ static void BM_ClosureSched2OnTwoCombiners(benchmark::State& state) { grpc_combiner_scheduler(combiner1)); GRPC_CLOSURE_INIT(&c2, DoNothing, nullptr, grpc_combiner_scheduler(combiner2)); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (state.KeepRunning()) { - GRPC_CLOSURE_SCHED(&c1, GRPC_ERROR_NONE); - GRPC_CLOSURE_SCHED(&c2, GRPC_ERROR_NONE); - grpc_core::ExecCtx::Get()->Flush(); + GRPC_CLOSURE_SCHED(&exec_ctx, &c1, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&exec_ctx, &c2, GRPC_ERROR_NONE); + grpc_exec_ctx_flush(&exec_ctx); } - GRPC_COMBINER_UNREF(combiner1, "finished"); - GRPC_COMBINER_UNREF(combiner2, "finished"); - + GRPC_COMBINER_UNREF(&exec_ctx, combiner1, "finished"); + GRPC_COMBINER_UNREF(&exec_ctx, combiner2, "finished"); + grpc_exec_ctx_finish(&exec_ctx); track_counters.Finish(state); } 
BENCHMARK(BM_ClosureSched2OnTwoCombiners); @@ -335,17 +338,17 @@ static void BM_ClosureSched4OnTwoCombiners(benchmark::State& state) { grpc_combiner_scheduler(combiner1)); GRPC_CLOSURE_INIT(&c4, DoNothing, nullptr, grpc_combiner_scheduler(combiner2)); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (state.KeepRunning()) { - GRPC_CLOSURE_SCHED(&c1, GRPC_ERROR_NONE); - GRPC_CLOSURE_SCHED(&c2, GRPC_ERROR_NONE); - GRPC_CLOSURE_SCHED(&c3, GRPC_ERROR_NONE); - GRPC_CLOSURE_SCHED(&c4, GRPC_ERROR_NONE); - grpc_core::ExecCtx::Get()->Flush(); + GRPC_CLOSURE_SCHED(&exec_ctx, &c1, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&exec_ctx, &c2, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&exec_ctx, &c3, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&exec_ctx, &c4, GRPC_ERROR_NONE); + grpc_exec_ctx_flush(&exec_ctx); } - GRPC_COMBINER_UNREF(combiner1, "finished"); - GRPC_COMBINER_UNREF(combiner2, "finished"); - + GRPC_COMBINER_UNREF(&exec_ctx, combiner1, "finished"); + GRPC_COMBINER_UNREF(&exec_ctx, combiner2, "finished"); + grpc_exec_ctx_finish(&exec_ctx); track_counters.Finish(state); } BENCHMARK(BM_ClosureSched4OnTwoCombiners); @@ -359,11 +362,13 @@ class Rescheduler { GRPC_CLOSURE_INIT(&closure_, Step, this, scheduler); } - void ScheduleFirst() { GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE); } + void ScheduleFirst(grpc_exec_ctx* exec_ctx) { + GRPC_CLOSURE_SCHED(exec_ctx, &closure_, GRPC_ERROR_NONE); + } void ScheduleFirstAgainstDifferentScheduler( - grpc_closure_scheduler* scheduler) { - GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(Step, this, scheduler), + grpc_exec_ctx* exec_ctx, grpc_closure_scheduler* scheduler) { + GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_CREATE(Step, this, scheduler), GRPC_ERROR_NONE); } @@ -371,46 +376,47 @@ class Rescheduler { benchmark::State& state_; grpc_closure closure_; - static void Step(void* arg, grpc_error* error) { + static void Step(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { Rescheduler* self = static_cast<Rescheduler*>(arg); if (self->state_.KeepRunning()) { - GRPC_CLOSURE_SCHED(&self->closure_, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, &self->closure_, GRPC_ERROR_NONE); } } }; static void BM_ClosureReschedOnExecCtx(benchmark::State& state) { TrackCounters track_counters; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; Rescheduler r(state, grpc_schedule_on_exec_ctx); - r.ScheduleFirst(); - + r.ScheduleFirst(&exec_ctx); + grpc_exec_ctx_finish(&exec_ctx); track_counters.Finish(state); } BENCHMARK(BM_ClosureReschedOnExecCtx); static void BM_ClosureReschedOnCombiner(benchmark::State& state) { TrackCounters track_counters; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_combiner* combiner = grpc_combiner_create(); Rescheduler r(state, grpc_combiner_scheduler(combiner)); - r.ScheduleFirst(); - grpc_core::ExecCtx::Get()->Flush(); - GRPC_COMBINER_UNREF(combiner, "finished"); - + r.ScheduleFirst(&exec_ctx); + grpc_exec_ctx_flush(&exec_ctx); + GRPC_COMBINER_UNREF(&exec_ctx, combiner, "finished"); + grpc_exec_ctx_finish(&exec_ctx); track_counters.Finish(state); } BENCHMARK(BM_ClosureReschedOnCombiner); static void BM_ClosureReschedOnCombinerFinally(benchmark::State& state) { TrackCounters track_counters; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_combiner* combiner = grpc_combiner_create(); Rescheduler r(state, grpc_combiner_finally_scheduler(combiner)); - r.ScheduleFirstAgainstDifferentScheduler(grpc_combiner_scheduler(combiner)); - 
grpc_core::ExecCtx::Get()->Flush(); - GRPC_COMBINER_UNREF(combiner, "finished"); - + r.ScheduleFirstAgainstDifferentScheduler(&exec_ctx, + grpc_combiner_scheduler(combiner)); + grpc_exec_ctx_flush(&exec_ctx); + GRPC_COMBINER_UNREF(&exec_ctx, combiner, "finished"); + grpc_exec_ctx_finish(&exec_ctx); track_counters.Finish(state); } BENCHMARK(BM_ClosureReschedOnCombinerFinally); diff --git a/test/cpp/microbenchmarks/bm_cq.cc b/test/cpp/microbenchmarks/bm_cq.cc index 97242598f1..f0dede7333 100644 --- a/test/cpp/microbenchmarks/bm_cq.cc +++ b/test/cpp/microbenchmarks/bm_cq.cc @@ -66,7 +66,7 @@ static void BM_CreateDestroyCore(benchmark::State& state) { } BENCHMARK(BM_CreateDestroyCore); -static void DoneWithCompletionOnStack(void* arg, +static void DoneWithCompletionOnStack(grpc_exec_ctx* exec_ctx, void* arg, grpc_cq_completion* completion) {} class DummyTag final : public internal::CompletionQueueTag { @@ -81,11 +81,11 @@ static void BM_Pass1Cpp(benchmark::State& state) { while (state.KeepRunning()) { grpc_cq_completion completion; DummyTag dummy_tag; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GPR_ASSERT(grpc_cq_begin_op(c_cq, &dummy_tag)); - grpc_cq_end_op(c_cq, &dummy_tag, GRPC_ERROR_NONE, DoneWithCompletionOnStack, - nullptr, &completion); - + grpc_cq_end_op(&exec_ctx, c_cq, &dummy_tag, GRPC_ERROR_NONE, + DoneWithCompletionOnStack, nullptr, &completion); + grpc_exec_ctx_finish(&exec_ctx); void* tag; bool ok; cq.Next(&tag, &ok); @@ -101,11 +101,11 @@ static void BM_Pass1Core(benchmark::State& state) { gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); while (state.KeepRunning()) { grpc_cq_completion completion; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GPR_ASSERT(grpc_cq_begin_op(cq, nullptr)); - grpc_cq_end_op(cq, nullptr, GRPC_ERROR_NONE, DoneWithCompletionOnStack, - nullptr, &completion); - + grpc_cq_end_op(&exec_ctx, cq, nullptr, GRPC_ERROR_NONE, + DoneWithCompletionOnStack, nullptr, &completion); + grpc_exec_ctx_finish(&exec_ctx); grpc_completion_queue_next(cq, deadline, nullptr); } grpc_completion_queue_destroy(cq); @@ -120,11 +120,11 @@ static void BM_Pluck1Core(benchmark::State& state) { gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); while (state.KeepRunning()) { grpc_cq_completion completion; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GPR_ASSERT(grpc_cq_begin_op(cq, nullptr)); - grpc_cq_end_op(cq, nullptr, GRPC_ERROR_NONE, DoneWithCompletionOnStack, - nullptr, &completion); - + grpc_cq_end_op(&exec_ctx, cq, nullptr, GRPC_ERROR_NONE, + DoneWithCompletionOnStack, nullptr, &completion); + grpc_exec_ctx_finish(&exec_ctx); grpc_completion_queue_pluck(cq, nullptr, deadline, nullptr); } grpc_completion_queue_destroy(cq); diff --git a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc index 874c834931..7ccebb55ee 100644 --- a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc +++ b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc @@ -43,8 +43,9 @@ static grpc_completion_queue* g_cq; static grpc_event_engine_vtable g_vtable; static const grpc_event_engine_vtable* g_old_vtable; -static void pollset_shutdown(grpc_pollset* ps, grpc_closure* closure) { - GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE); +static void pollset_shutdown(grpc_exec_ctx* exec_ctx, grpc_pollset* ps, + grpc_closure* closure) { + GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE); } static void pollset_init(grpc_pollset* ps, gpr_mu** mu) { 
@@ -52,20 +53,25 @@ static void pollset_init(grpc_pollset* ps, gpr_mu** mu) { *mu = &ps->mu; } -static void pollset_destroy(grpc_pollset* ps) { gpr_mu_destroy(&ps->mu); } +static void pollset_destroy(grpc_exec_ctx* exec_ctx, grpc_pollset* ps) { + gpr_mu_destroy(&ps->mu); +} -static grpc_error* pollset_kick(grpc_pollset* p, grpc_pollset_worker* worker) { +static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* p, + grpc_pollset_worker* worker) { return GRPC_ERROR_NONE; } /* Callback when the tag is dequeued from the completion queue. Does nothing */ -static void cq_done_cb(void* done_arg, grpc_cq_completion* cq_completion) { +static void cq_done_cb(grpc_exec_ctx* exec_ctx, void* done_arg, + grpc_cq_completion* cq_completion) { gpr_free(cq_completion); } /* Queues a completion tag if deadline is > 0. * Does nothing if deadline is 0 (i.e gpr_time_0(GPR_CLOCK_MONOTONIC)) */ -static grpc_error* pollset_work(grpc_pollset* ps, grpc_pollset_worker** worker, +static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* ps, + grpc_pollset_worker** worker, grpc_millis deadline) { if (deadline == 0) { gpr_log(GPR_DEBUG, "no-op"); @@ -74,9 +80,9 @@ static grpc_error* pollset_work(grpc_pollset* ps, grpc_pollset_worker** worker, gpr_mu_unlock(&ps->mu); GPR_ASSERT(grpc_cq_begin_op(g_cq, g_tag)); - grpc_cq_end_op(g_cq, g_tag, GRPC_ERROR_NONE, cq_done_cb, nullptr, + grpc_cq_end_op(exec_ctx, g_cq, g_tag, GRPC_ERROR_NONE, cq_done_cb, nullptr, (grpc_cq_completion*)gpr_malloc(sizeof(grpc_cq_completion))); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(exec_ctx); gpr_mu_lock(&ps->mu); return GRPC_ERROR_NONE; } diff --git a/test/cpp/microbenchmarks/bm_error.cc b/test/cpp/microbenchmarks/bm_error.cc index d12f475a49..bbd8b3c339 100644 --- a/test/cpp/microbenchmarks/bm_error.cc +++ b/test/cpp/microbenchmarks/bm_error.cc @@ -246,14 +246,14 @@ template <class Fixture> static void BM_ErrorGetStatus(benchmark::State& state) { TrackCounters track_counters; Fixture fixture; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (state.KeepRunning()) { grpc_status_code status; grpc_slice slice; - grpc_error_get_status(fixture.error(), fixture.deadline(), &status, &slice, - nullptr, nullptr); + grpc_error_get_status(&exec_ctx, fixture.error(), fixture.deadline(), + &status, &slice, nullptr, nullptr); } - + grpc_exec_ctx_finish(&exec_ctx); track_counters.Finish(state); } @@ -261,13 +261,13 @@ template <class Fixture> static void BM_ErrorGetStatusCode(benchmark::State& state) { TrackCounters track_counters; Fixture fixture; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (state.KeepRunning()) { grpc_status_code status; - grpc_error_get_status(fixture.error(), fixture.deadline(), &status, nullptr, - nullptr, nullptr); + grpc_error_get_status(&exec_ctx, fixture.error(), fixture.deadline(), + &status, nullptr, nullptr, nullptr); } - + grpc_exec_ctx_finish(&exec_ctx); track_counters.Finish(state); } @@ -275,13 +275,13 @@ template <class Fixture> static void BM_ErrorHttpError(benchmark::State& state) { TrackCounters track_counters; Fixture fixture; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (state.KeepRunning()) { grpc_http2_error_code error; - grpc_error_get_status(fixture.error(), fixture.deadline(), nullptr, nullptr, - &error, nullptr); + grpc_error_get_status(&exec_ctx, fixture.error(), fixture.deadline(), + nullptr, nullptr, &error, nullptr); } - + grpc_exec_ctx_finish(&exec_ctx); 
track_counters.Finish(state); } diff --git a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc index d6d7d41e5e..5e72213823 100644 --- a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc +++ b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc @@ -177,13 +177,13 @@ class TrickledCHTTP2 : public EndpointPairFixture { } void Step(bool update_stats) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; inc_time(); size_t client_backlog = - grpc_trickle_endpoint_trickle(endpoint_pair_.client); + grpc_trickle_endpoint_trickle(&exec_ctx, endpoint_pair_.client); size_t server_backlog = - grpc_trickle_endpoint_trickle(endpoint_pair_.server); - + grpc_trickle_endpoint_trickle(&exec_ctx, endpoint_pair_.server); + grpc_exec_ctx_finish(&exec_ctx); if (update_stats) { UpdateStats((grpc_chttp2_transport*)client_transport_, &client_stats_, client_backlog); diff --git a/test/cpp/microbenchmarks/bm_metadata.cc b/test/cpp/microbenchmarks/bm_metadata.cc index f1e7890fc0..73bce08466 100644 --- a/test/cpp/microbenchmarks/bm_metadata.cc +++ b/test/cpp/microbenchmarks/bm_metadata.cc @@ -90,11 +90,11 @@ static void BM_MetadataFromNonInternedSlices(benchmark::State& state) { TrackCounters track_counters; gpr_slice k = grpc_slice_from_static_string("key"); gpr_slice v = grpc_slice_from_static_string("value"); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (state.KeepRunning()) { - GRPC_MDELEM_UNREF(grpc_mdelem_create(k, v, nullptr)); + GRPC_MDELEM_UNREF(&exec_ctx, grpc_mdelem_create(&exec_ctx, k, v, nullptr)); } - + grpc_exec_ctx_finish(&exec_ctx); track_counters.Finish(state); } BENCHMARK(BM_MetadataFromNonInternedSlices); @@ -103,11 +103,11 @@ static void BM_MetadataFromInternedSlices(benchmark::State& state) { TrackCounters track_counters; gpr_slice k = grpc_slice_intern(grpc_slice_from_static_string("key")); gpr_slice v = grpc_slice_intern(grpc_slice_from_static_string("value")); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (state.KeepRunning()) { - GRPC_MDELEM_UNREF(grpc_mdelem_create(k, v, nullptr)); + GRPC_MDELEM_UNREF(&exec_ctx, grpc_mdelem_create(&exec_ctx, k, v, nullptr)); } - + grpc_exec_ctx_finish(&exec_ctx); grpc_slice_unref(k); grpc_slice_unref(v); track_counters.Finish(state); @@ -119,13 +119,13 @@ static void BM_MetadataFromInternedSlicesAlreadyInIndex( TrackCounters track_counters; gpr_slice k = grpc_slice_intern(grpc_slice_from_static_string("key")); gpr_slice v = grpc_slice_intern(grpc_slice_from_static_string("value")); - grpc_core::ExecCtx exec_ctx; - grpc_mdelem seed = grpc_mdelem_create(k, v, nullptr); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_mdelem seed = grpc_mdelem_create(&exec_ctx, k, v, nullptr); while (state.KeepRunning()) { - GRPC_MDELEM_UNREF(grpc_mdelem_create(k, v, nullptr)); + GRPC_MDELEM_UNREF(&exec_ctx, grpc_mdelem_create(&exec_ctx, k, v, nullptr)); } - GRPC_MDELEM_UNREF(seed); - + GRPC_MDELEM_UNREF(&exec_ctx, seed); + grpc_exec_ctx_finish(&exec_ctx); grpc_slice_unref(k); grpc_slice_unref(v); track_counters.Finish(state); @@ -136,11 +136,11 @@ static void BM_MetadataFromInternedKey(benchmark::State& state) { TrackCounters track_counters; gpr_slice k = grpc_slice_intern(grpc_slice_from_static_string("key")); gpr_slice v = grpc_slice_from_static_string("value"); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (state.KeepRunning()) { - GRPC_MDELEM_UNREF(grpc_mdelem_create(k, v, nullptr)); 
+ GRPC_MDELEM_UNREF(&exec_ctx, grpc_mdelem_create(&exec_ctx, k, v, nullptr)); } - + grpc_exec_ctx_finish(&exec_ctx); grpc_slice_unref(k); track_counters.Finish(state); } @@ -152,12 +152,14 @@ static void BM_MetadataFromNonInternedSlicesWithBackingStore( gpr_slice k = grpc_slice_from_static_string("key"); gpr_slice v = grpc_slice_from_static_string("value"); char backing_store[sizeof(grpc_mdelem_data)]; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (state.KeepRunning()) { - GRPC_MDELEM_UNREF(grpc_mdelem_create( - k, v, reinterpret_cast<grpc_mdelem_data*>(backing_store))); + GRPC_MDELEM_UNREF( + &exec_ctx, + grpc_mdelem_create(&exec_ctx, k, v, + reinterpret_cast<grpc_mdelem_data*>(backing_store))); } - + grpc_exec_ctx_finish(&exec_ctx); track_counters.Finish(state); } BENCHMARK(BM_MetadataFromNonInternedSlicesWithBackingStore); @@ -168,12 +170,14 @@ static void BM_MetadataFromInternedSlicesWithBackingStore( gpr_slice k = grpc_slice_intern(grpc_slice_from_static_string("key")); gpr_slice v = grpc_slice_intern(grpc_slice_from_static_string("value")); char backing_store[sizeof(grpc_mdelem_data)]; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (state.KeepRunning()) { - GRPC_MDELEM_UNREF(grpc_mdelem_create( - k, v, reinterpret_cast<grpc_mdelem_data*>(backing_store))); + GRPC_MDELEM_UNREF( + &exec_ctx, + grpc_mdelem_create(&exec_ctx, k, v, + reinterpret_cast<grpc_mdelem_data*>(backing_store))); } - + grpc_exec_ctx_finish(&exec_ctx); grpc_slice_unref(k); grpc_slice_unref(v); track_counters.Finish(state); @@ -186,12 +190,14 @@ static void BM_MetadataFromInternedKeyWithBackingStore( gpr_slice k = grpc_slice_intern(grpc_slice_from_static_string("key")); gpr_slice v = grpc_slice_from_static_string("value"); char backing_store[sizeof(grpc_mdelem_data)]; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (state.KeepRunning()) { - GRPC_MDELEM_UNREF(grpc_mdelem_create( - k, v, reinterpret_cast<grpc_mdelem_data*>(backing_store))); + GRPC_MDELEM_UNREF( + &exec_ctx, + grpc_mdelem_create(&exec_ctx, k, v, + reinterpret_cast<grpc_mdelem_data*>(backing_store))); } - + grpc_exec_ctx_finish(&exec_ctx); grpc_slice_unref(k); track_counters.Finish(state); } @@ -201,11 +207,11 @@ static void BM_MetadataFromStaticMetadataStrings(benchmark::State& state) { TrackCounters track_counters; gpr_slice k = GRPC_MDSTR_STATUS; gpr_slice v = GRPC_MDSTR_200; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (state.KeepRunning()) { - GRPC_MDELEM_UNREF(grpc_mdelem_create(k, v, nullptr)); + GRPC_MDELEM_UNREF(&exec_ctx, grpc_mdelem_create(&exec_ctx, k, v, nullptr)); } - + grpc_exec_ctx_finish(&exec_ctx); grpc_slice_unref(k); track_counters.Finish(state); } @@ -216,11 +222,11 @@ static void BM_MetadataFromStaticMetadataStringsNotIndexed( TrackCounters track_counters; gpr_slice k = GRPC_MDSTR_STATUS; gpr_slice v = GRPC_MDSTR_GZIP; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (state.KeepRunning()) { - GRPC_MDELEM_UNREF(grpc_mdelem_create(k, v, nullptr)); + GRPC_MDELEM_UNREF(&exec_ctx, grpc_mdelem_create(&exec_ctx, k, v, nullptr)); } - + grpc_exec_ctx_finish(&exec_ctx); grpc_slice_unref(k); track_counters.Finish(state); } @@ -229,15 +235,16 @@ BENCHMARK(BM_MetadataFromStaticMetadataStringsNotIndexed); static void BM_MetadataRefUnrefExternal(benchmark::State& state) { TrackCounters track_counters; char backing_store[sizeof(grpc_mdelem_data)]; - grpc_core::ExecCtx 
exec_ctx; - grpc_mdelem el = grpc_mdelem_create( - grpc_slice_from_static_string("a"), grpc_slice_from_static_string("b"), - reinterpret_cast<grpc_mdelem_data*>(backing_store)); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_mdelem el = + grpc_mdelem_create(&exec_ctx, grpc_slice_from_static_string("a"), + grpc_slice_from_static_string("b"), + reinterpret_cast<grpc_mdelem_data*>(backing_store)); while (state.KeepRunning()) { - GRPC_MDELEM_UNREF(GRPC_MDELEM_REF(el)); + GRPC_MDELEM_UNREF(&exec_ctx, GRPC_MDELEM_REF(el)); } - GRPC_MDELEM_UNREF(el); - + GRPC_MDELEM_UNREF(&exec_ctx, el); + grpc_exec_ctx_finish(&exec_ctx); track_counters.Finish(state); } BENCHMARK(BM_MetadataRefUnrefExternal); @@ -245,47 +252,47 @@ BENCHMARK(BM_MetadataRefUnrefExternal); static void BM_MetadataRefUnrefInterned(benchmark::State& state) { TrackCounters track_counters; char backing_store[sizeof(grpc_mdelem_data)]; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_slice k = grpc_slice_intern(grpc_slice_from_static_string("key")); gpr_slice v = grpc_slice_intern(grpc_slice_from_static_string("value")); grpc_mdelem el = grpc_mdelem_create( - k, v, reinterpret_cast<grpc_mdelem_data*>(backing_store)); + &exec_ctx, k, v, reinterpret_cast<grpc_mdelem_data*>(backing_store)); grpc_slice_unref(k); grpc_slice_unref(v); while (state.KeepRunning()) { - GRPC_MDELEM_UNREF(GRPC_MDELEM_REF(el)); + GRPC_MDELEM_UNREF(&exec_ctx, GRPC_MDELEM_REF(el)); } - GRPC_MDELEM_UNREF(el); - + GRPC_MDELEM_UNREF(&exec_ctx, el); + grpc_exec_ctx_finish(&exec_ctx); track_counters.Finish(state); } BENCHMARK(BM_MetadataRefUnrefInterned); static void BM_MetadataRefUnrefAllocated(benchmark::State& state) { TrackCounters track_counters; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_mdelem el = - grpc_mdelem_create(grpc_slice_from_static_string("a"), + grpc_mdelem_create(&exec_ctx, grpc_slice_from_static_string("a"), grpc_slice_from_static_string("b"), nullptr); while (state.KeepRunning()) { - GRPC_MDELEM_UNREF(GRPC_MDELEM_REF(el)); + GRPC_MDELEM_UNREF(&exec_ctx, GRPC_MDELEM_REF(el)); } - GRPC_MDELEM_UNREF(el); - + GRPC_MDELEM_UNREF(&exec_ctx, el); + grpc_exec_ctx_finish(&exec_ctx); track_counters.Finish(state); } BENCHMARK(BM_MetadataRefUnrefAllocated); static void BM_MetadataRefUnrefStatic(benchmark::State& state) { TrackCounters track_counters; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_mdelem el = - grpc_mdelem_create(GRPC_MDSTR_STATUS, GRPC_MDSTR_200, nullptr); + grpc_mdelem_create(&exec_ctx, GRPC_MDSTR_STATUS, GRPC_MDSTR_200, nullptr); while (state.KeepRunning()) { - GRPC_MDELEM_UNREF(GRPC_MDELEM_REF(el)); + GRPC_MDELEM_UNREF(&exec_ctx, GRPC_MDELEM_REF(el)); } - GRPC_MDELEM_UNREF(el); - + GRPC_MDELEM_UNREF(&exec_ctx, el); + grpc_exec_ctx_finish(&exec_ctx); track_counters.Finish(state); } BENCHMARK(BM_MetadataRefUnrefStatic); diff --git a/test/cpp/microbenchmarks/bm_pollset.cc b/test/cpp/microbenchmarks/bm_pollset.cc index d9d5164cce..4da79693f1 100644 --- a/test/cpp/microbenchmarks/bm_pollset.cc +++ b/test/cpp/microbenchmarks/bm_pollset.cc @@ -41,8 +41,8 @@ auto& force_library_initialization = Library::get(); -static void shutdown_ps(void* ps, grpc_error* error) { - grpc_pollset_destroy(static_cast<grpc_pollset*>(ps)); +static void shutdown_ps(grpc_exec_ctx* exec_ctx, void* ps, grpc_error* error) { + grpc_pollset_destroy(exec_ctx, static_cast<grpc_pollset*>(ps)); } static void BM_CreateDestroyPollset(benchmark::State& state) { @@ -50,7 
+50,7 @@ static void BM_CreateDestroyPollset(benchmark::State& state) { size_t ps_sz = grpc_pollset_size(); grpc_pollset* ps = static_cast<grpc_pollset*>(gpr_malloc(ps_sz)); gpr_mu* mu; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_closure shutdown_ps_closure; GRPC_CLOSURE_INIT(&shutdown_ps_closure, shutdown_ps, ps, grpc_schedule_on_exec_ctx); @@ -58,11 +58,11 @@ static void BM_CreateDestroyPollset(benchmark::State& state) { memset(ps, 0, ps_sz); grpc_pollset_init(ps, &mu); gpr_mu_lock(mu); - grpc_pollset_shutdown(ps, &shutdown_ps_closure); + grpc_pollset_shutdown(&exec_ctx, ps, &shutdown_ps_closure); gpr_mu_unlock(mu); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(&exec_ctx); } - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_finish(&exec_ctx); gpr_free(ps); track_counters.Finish(state); } @@ -114,17 +114,17 @@ static void BM_PollEmptyPollset(benchmark::State& state) { grpc_pollset* ps = static_cast<grpc_pollset*>(gpr_zalloc(ps_sz)); gpr_mu* mu; grpc_pollset_init(ps, &mu); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_mu_lock(mu); while (state.KeepRunning()) { - GRPC_ERROR_UNREF(grpc_pollset_work(ps, nullptr, 0)); + GRPC_ERROR_UNREF(grpc_pollset_work(&exec_ctx, ps, nullptr, 0)); } grpc_closure shutdown_ps_closure; GRPC_CLOSURE_INIT(&shutdown_ps_closure, shutdown_ps, ps, grpc_schedule_on_exec_ctx); - grpc_pollset_shutdown(ps, &shutdown_ps_closure); + grpc_pollset_shutdown(&exec_ctx, ps, &shutdown_ps_closure); gpr_mu_unlock(mu); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_finish(&exec_ctx); gpr_free(ps); track_counters.Finish(state); } @@ -136,23 +136,24 @@ static void BM_PollAddFd(benchmark::State& state) { grpc_pollset* ps = static_cast<grpc_pollset*>(gpr_zalloc(ps_sz)); gpr_mu* mu; grpc_pollset_init(ps, &mu); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_wakeup_fd wakeup_fd; GPR_ASSERT( GRPC_LOG_IF_ERROR("wakeup_fd_init", grpc_wakeup_fd_init(&wakeup_fd))); grpc_fd* fd = grpc_fd_create(wakeup_fd.read_fd, "xxx"); while (state.KeepRunning()) { - grpc_pollset_add_fd(ps, fd); - grpc_core::ExecCtx::Get()->Flush(); + grpc_pollset_add_fd(&exec_ctx, ps, fd); + grpc_exec_ctx_flush(&exec_ctx); } - grpc_fd_orphan(fd, nullptr, nullptr, false /* already_closed */, "xxx"); + grpc_fd_orphan(&exec_ctx, fd, nullptr, nullptr, false /* already_closed */, + "xxx"); grpc_closure shutdown_ps_closure; GRPC_CLOSURE_INIT(&shutdown_ps_closure, shutdown_ps, ps, grpc_schedule_on_exec_ctx); gpr_mu_lock(mu); - grpc_pollset_shutdown(ps, &shutdown_ps_closure); + grpc_pollset_shutdown(&exec_ctx, ps, &shutdown_ps_closure); gpr_mu_unlock(mu); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_finish(&exec_ctx); gpr_free(ps); track_counters.Finish(state); } @@ -169,7 +170,7 @@ Closure* MakeClosure(F f, grpc_closure_scheduler* scheduler) { C(F f, grpc_closure_scheduler* scheduler) : f_(f) { GRPC_CLOSURE_INIT(this, C::cbfn, this, scheduler); } - static void cbfn(void* arg, grpc_error* error) { + static void cbfn(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { C* p = static_cast<C*>(arg); p->f_(); } @@ -218,11 +219,11 @@ static void BM_SingleThreadPollOneFd(benchmark::State& state) { grpc_pollset* ps = static_cast<grpc_pollset*>(gpr_zalloc(ps_sz)); gpr_mu* mu; grpc_pollset_init(ps, &mu); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_wakeup_fd wakeup_fd; GRPC_ERROR_UNREF(grpc_wakeup_fd_init(&wakeup_fd)); grpc_fd* wakeup = 
grpc_fd_create(wakeup_fd.read_fd, "wakeup_read"); - grpc_pollset_add_fd(ps, wakeup); + grpc_pollset_add_fd(&exec_ctx, ps, wakeup); bool done = false; Closure* continue_closure = MakeClosure( [&]() { @@ -232,23 +233,25 @@ static void BM_SingleThreadPollOneFd(benchmark::State& state) { return; } GRPC_ERROR_UNREF(grpc_wakeup_fd_wakeup(&wakeup_fd)); - grpc_fd_notify_on_read(wakeup, continue_closure); + grpc_fd_notify_on_read(&exec_ctx, wakeup, continue_closure); }, grpc_schedule_on_exec_ctx); GRPC_ERROR_UNREF(grpc_wakeup_fd_wakeup(&wakeup_fd)); - grpc_fd_notify_on_read(wakeup, continue_closure); + grpc_fd_notify_on_read(&exec_ctx, wakeup, continue_closure); gpr_mu_lock(mu); while (!done) { - GRPC_ERROR_UNREF(grpc_pollset_work(ps, nullptr, GRPC_MILLIS_INF_FUTURE)); + GRPC_ERROR_UNREF( + grpc_pollset_work(&exec_ctx, ps, nullptr, GRPC_MILLIS_INF_FUTURE)); } - grpc_fd_orphan(wakeup, nullptr, nullptr, false /* already_closed */, "done"); + grpc_fd_orphan(&exec_ctx, wakeup, nullptr, nullptr, + false /* already_closed */, "done"); wakeup_fd.read_fd = 0; grpc_closure shutdown_ps_closure; GRPC_CLOSURE_INIT(&shutdown_ps_closure, shutdown_ps, ps, grpc_schedule_on_exec_ctx); - grpc_pollset_shutdown(ps, &shutdown_ps_closure); + grpc_pollset_shutdown(&exec_ctx, ps, &shutdown_ps_closure); gpr_mu_unlock(mu); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_finish(&exec_ctx); grpc_wakeup_fd_destroy(&wakeup_fd); gpr_free(ps); track_counters.Finish(state); diff --git a/test/cpp/microbenchmarks/fullstack_fixtures.h b/test/cpp/microbenchmarks/fullstack_fixtures.h index d1ede755a5..7e20843875 100644 --- a/test/cpp/microbenchmarks/fullstack_fixtures.h +++ b/test/cpp/microbenchmarks/fullstack_fixtures.h @@ -166,7 +166,7 @@ class EndpointPairFixture : public BaseFixture { fixture_configuration.ApplyCommonServerBuilderConfig(&b); server_ = b.BuildAndStart(); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; /* add server endpoint to server_ * */ @@ -174,19 +174,20 @@ class EndpointPairFixture : public BaseFixture { const grpc_channel_args* server_args = grpc_server_get_channel_args(server_->c_server()); server_transport_ = grpc_create_chttp2_transport( - server_args, endpoints.server, false /* is_client */); + &exec_ctx, server_args, endpoints.server, false /* is_client */); grpc_pollset** pollsets; size_t num_pollsets = 0; grpc_server_get_pollsets(server_->c_server(), &pollsets, &num_pollsets); for (size_t i = 0; i < num_pollsets; i++) { - grpc_endpoint_add_to_pollset(endpoints.server, pollsets[i]); + grpc_endpoint_add_to_pollset(&exec_ctx, endpoints.server, pollsets[i]); } - grpc_server_setup_transport(server_->c_server(), server_transport_, - nullptr, server_args); - grpc_chttp2_transport_start_reading(server_transport_, nullptr, nullptr); + grpc_server_setup_transport(&exec_ctx, server_->c_server(), + server_transport_, nullptr, server_args); + grpc_chttp2_transport_start_reading(&exec_ctx, server_transport_, nullptr, + nullptr); } /* create channel */ @@ -196,15 +197,19 @@ class EndpointPairFixture : public BaseFixture { fixture_configuration.ApplyCommonChannelArguments(&args); grpc_channel_args c_args = args.c_channel_args(); - client_transport_ = - grpc_create_chttp2_transport(&c_args, endpoints.client, true); + client_transport_ = grpc_create_chttp2_transport(&exec_ctx, &c_args, + endpoints.client, true); GPR_ASSERT(client_transport_); - grpc_channel* channel = grpc_channel_create( - "target", &c_args, GRPC_CLIENT_DIRECT_CHANNEL, client_transport_); - 
grpc_chttp2_transport_start_reading(client_transport_, nullptr, nullptr); + grpc_channel* channel = + grpc_channel_create(&exec_ctx, "target", &c_args, + GRPC_CLIENT_DIRECT_CHANNEL, client_transport_); + grpc_chttp2_transport_start_reading(&exec_ctx, client_transport_, nullptr, + nullptr); channel_ = CreateChannelInternal("", channel); } + + grpc_exec_ctx_finish(&exec_ctx); } virtual ~EndpointPairFixture() { diff --git a/test/cpp/naming/resolver_component_test.cc b/test/cpp/naming/resolver_component_test.cc index 3481d9d1aa..6f1f0c44b9 100644 --- a/test/cpp/naming/resolver_component_test.cc +++ b/test/cpp/naming/resolver_component_test.cc @@ -149,33 +149,33 @@ struct ArgsStruct { std::string expected_lb_policy; }; -void ArgsInit(ArgsStruct* args) { +void ArgsInit(grpc_exec_ctx* exec_ctx, ArgsStruct* args) { gpr_event_init(&args->ev); args->pollset = (grpc_pollset*)gpr_zalloc(grpc_pollset_size()); grpc_pollset_init(args->pollset, &args->mu); args->pollset_set = grpc_pollset_set_create(); - grpc_pollset_set_add_pollset(args->pollset_set, args->pollset); + grpc_pollset_set_add_pollset(exec_ctx, args->pollset_set, args->pollset); args->lock = grpc_combiner_create(); gpr_atm_rel_store(&args->done_atm, 0); args->channel_args = nullptr; } -void DoNothing(void* arg, grpc_error* error) {} +void DoNothing(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {} -void ArgsFinish(ArgsStruct* args) { +void ArgsFinish(grpc_exec_ctx* exec_ctx, ArgsStruct* args) { GPR_ASSERT(gpr_event_wait(&args->ev, TestDeadline())); - grpc_pollset_set_del_pollset(args->pollset_set, args->pollset); - grpc_pollset_set_destroy(args->pollset_set); + grpc_pollset_set_del_pollset(exec_ctx, args->pollset_set, args->pollset); + grpc_pollset_set_destroy(exec_ctx, args->pollset_set); grpc_closure DoNothing_cb; GRPC_CLOSURE_INIT(&DoNothing_cb, DoNothing, nullptr, grpc_schedule_on_exec_ctx); - grpc_pollset_shutdown(args->pollset, &DoNothing_cb); + grpc_pollset_shutdown(exec_ctx, args->pollset, &DoNothing_cb); // exec_ctx needs to be flushed before calling grpc_pollset_destroy() - grpc_channel_args_destroy(args->channel_args); - grpc_core::ExecCtx::Get()->Flush(); - grpc_pollset_destroy(args->pollset); + grpc_channel_args_destroy(exec_ctx, args->channel_args); + grpc_exec_ctx_flush(exec_ctx); + grpc_pollset_destroy(exec_ctx, args->pollset); gpr_free(args->pollset); - GRPC_COMBINER_UNREF(args->lock, nullptr); + GRPC_COMBINER_UNREF(exec_ctx, args->lock, NULL); } gpr_timespec NSecondDeadline(int seconds) { @@ -196,13 +196,14 @@ void PollPollsetUntilRequestDone(ArgsStruct* args) { time_left.tv_sec, time_left.tv_nsec); GPR_ASSERT(gpr_time_cmp(time_left, gpr_time_0(GPR_TIMESPAN)) >= 0); grpc_pollset_worker* worker = nullptr; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_mu_lock(args->mu); GRPC_LOG_IF_ERROR("pollset_work", - grpc_pollset_work(args->pollset, &worker, + grpc_pollset_work(&exec_ctx, args->pollset, &worker, grpc_timespec_to_millis_round_up( NSecondDeadline(1)))); gpr_mu_unlock(args->mu); + grpc_exec_ctx_finish(&exec_ctx); } gpr_event_set(&args->ev, (void*)1); } @@ -234,7 +235,8 @@ void CheckLBPolicyResultLocked(grpc_channel_args* channel_args, } } -void CheckResolverResultLocked(void* argsp, grpc_error* err) { +void CheckResolverResultLocked(grpc_exec_ctx* exec_ctx, void* argsp, + grpc_error* err) { ArgsStruct* args = (ArgsStruct*)argsp; grpc_channel_args* channel_args = args->channel_args; const grpc_arg* channel_arg = @@ -270,14 +272,15 @@ void CheckResolverResultLocked(void* argsp, 
grpc_error* err) { } gpr_atm_rel_store(&args->done_atm, 1); gpr_mu_lock(args->mu); - GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, nullptr)); + GRPC_LOG_IF_ERROR("pollset_kick", + grpc_pollset_kick(exec_ctx, args->pollset, nullptr)); gpr_mu_unlock(args->mu); } TEST(ResolverComponentTest, TestResolvesRelevantRecords) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; ArgsStruct args; - ArgsInit(&args); + ArgsInit(&exec_ctx, &args); args.expected_addrs = ParseExpectedAddrs(FLAGS_expected_addrs); args.expected_service_config_string = FLAGS_expected_chosen_service_config; args.expected_lb_policy = FLAGS_expected_lb_policy; @@ -287,18 +290,19 @@ TEST(ResolverComponentTest, TestResolvesRelevantRecords) { FLAGS_local_dns_server_address.c_str(), FLAGS_target_name.c_str())); // create resolver and resolve - grpc_resolver* resolver = - grpc_resolver_create(whole_uri, nullptr, args.pollset_set, args.lock); + grpc_resolver* resolver = grpc_resolver_create(&exec_ctx, whole_uri, nullptr, + args.pollset_set, args.lock); gpr_free(whole_uri); grpc_closure on_resolver_result_changed; GRPC_CLOSURE_INIT(&on_resolver_result_changed, CheckResolverResultLocked, (void*)&args, grpc_combiner_scheduler(args.lock)); - grpc_resolver_next_locked(resolver, &args.channel_args, + grpc_resolver_next_locked(&exec_ctx, resolver, &args.channel_args, &on_resolver_result_changed); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(&exec_ctx); PollPollsetUntilRequestDone(&args); - GRPC_RESOLVER_UNREF(resolver, nullptr); - ArgsFinish(&args); + GRPC_RESOLVER_UNREF(&exec_ctx, resolver, NULL); + ArgsFinish(&exec_ctx, &args); + grpc_exec_ctx_finish(&exec_ctx); } } // namespace diff --git a/test/cpp/performance/writes_per_rpc_test.cc b/test/cpp/performance/writes_per_rpc_test.cc index 0b9dc83f2b..1c6f44dd7d 100644 --- a/test/cpp/performance/writes_per_rpc_test.cc +++ b/test/cpp/performance/writes_per_rpc_test.cc @@ -82,26 +82,27 @@ class EndpointPairFixture { ApplyCommonServerBuilderConfig(&b); server_ = b.BuildAndStart(); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; /* add server endpoint to server_ */ { const grpc_channel_args* server_args = grpc_server_get_channel_args(server_->c_server()); grpc_transport* transport = grpc_create_chttp2_transport( - server_args, endpoints.server, false /* is_client */); + &exec_ctx, server_args, endpoints.server, false /* is_client */); grpc_pollset** pollsets; size_t num_pollsets = 0; grpc_server_get_pollsets(server_->c_server(), &pollsets, &num_pollsets); for (size_t i = 0; i < num_pollsets; i++) { - grpc_endpoint_add_to_pollset(endpoints.server, pollsets[i]); + grpc_endpoint_add_to_pollset(&exec_ctx, endpoints.server, pollsets[i]); } - grpc_server_setup_transport(server_->c_server(), transport, nullptr, - server_args); - grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); + grpc_server_setup_transport(&exec_ctx, server_->c_server(), transport, + nullptr, server_args); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, + nullptr); } /* create channel */ @@ -111,15 +112,18 @@ class EndpointPairFixture { ApplyCommonChannelArguments(&args); grpc_channel_args c_args = args.c_channel_args(); - grpc_transport* transport = - grpc_create_chttp2_transport(&c_args, endpoints.client, true); + grpc_transport* transport = grpc_create_chttp2_transport( + &exec_ctx, &c_args, endpoints.client, true); GPR_ASSERT(transport); grpc_channel* channel = grpc_channel_create( - "target", &c_args, 
GRPC_CLIENT_DIRECT_CHANNEL, transport); - grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); + &exec_ctx, "target", &c_args, GRPC_CLIENT_DIRECT_CHANNEL, transport); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, + nullptr); channel_ = CreateChannelInternal("", channel); } + + grpc_exec_ctx_finish(&exec_ctx); } virtual ~EndpointPairFixture() { diff --git a/test/cpp/server/server_builder_test.cc b/test/cpp/server/server_builder_test.cc index 694ce549c0..d18459cec9 100644 --- a/test/cpp/server/server_builder_test.cc +++ b/test/cpp/server/server_builder_test.cc @@ -22,8 +22,6 @@ #include <grpc++/server.h> #include <grpc++/server_builder.h> -#include <grpc/grpc.h> - #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "test/core/util/port.h" @@ -79,8 +77,5 @@ TEST(ServerBuilderTest, CreateServerRepeatedPortWithDisallowedReusePort) { int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); - grpc_init(); - int ret = RUN_ALL_TESTS(); - grpc_shutdown(); - return ret; + return RUN_ALL_TESTS(); } diff --git a/test/cpp/util/byte_buffer_test.cc b/test/cpp/util/byte_buffer_test.cc index d603b289c8..8fb51bc663 100644 --- a/test/cpp/util/byte_buffer_test.cc +++ b/test/cpp/util/byte_buffer_test.cc @@ -22,7 +22,6 @@ #include <vector> #include <grpc++/support/slice.h> -#include <grpc/grpc.h> #include <grpc/slice.h> #include <gtest/gtest.h> @@ -110,8 +109,5 @@ TEST_F(ByteBufferTest, SerializationMakesCopy) { int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); - grpc_init(); - int ret = RUN_ALL_TESTS(); - grpc_shutdown(); - return ret; + return RUN_ALL_TESTS(); } diff --git a/test/cpp/util/slice_test.cc b/test/cpp/util/slice_test.cc index c2e55f3374..8a8962d7ee 100644 --- a/test/cpp/util/slice_test.cc +++ b/test/cpp/util/slice_test.cc @@ -18,7 +18,6 @@ #include <grpc++/support/slice.h> -#include <grpc/grpc.h> #include <grpc/slice.h> #include <gtest/gtest.h> @@ -128,8 +127,5 @@ TEST_F(SliceTest, Cslice) { int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); - grpc_init(); - int ret = RUN_ALL_TESTS(); - grpc_shutdown(); - return ret; + return RUN_ALL_TESTS(); }
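
Taken together, the hunks above apply one mechanical transformation to the C++ tests: the RAII grpc_core::ExecCtx introduced by the reverted change (the '-' lines) is swapped back for the older C-style context that must be initialized, flushed, and finished by hand (the '+' lines), and the exec_ctx pointer is threaded back through every closure callback, scheduler call, and destroy/unref helper. The sketch below restates that restored pattern outside of any benchmark, using only calls that appear verbatim in this diff (GRPC_EXEC_CTX_INIT, GRPC_CLOSURE_INIT, GRPC_CLOSURE_SCHED, grpc_exec_ctx_flush, grpc_exec_ctx_finish); the include paths and the helper function name are illustrative assumptions, not part of the commit.

// Minimal sketch of the exec_ctx style this revert restores; not part of the commit.
// Include paths are assumed from the gRPC source tree of this era.
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/exec_ctx.h"

// After the revert, closure callbacks receive the execution context explicitly.
static void DoNothing(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {}

// Hypothetical helper showing the init / thread-through / flush / finish sequence.
static void RunOneClosureWithExplicitExecCtx() {
  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;  // manual initialization
  grpc_closure c;
  GRPC_CLOSURE_INIT(&c, DoNothing, nullptr, grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_SCHED(&exec_ctx, &c, GRPC_ERROR_NONE);  // context passed explicitly
  grpc_exec_ctx_flush(&exec_ctx);   // run anything queued on the context
  grpc_exec_ctx_finish(&exec_ctx);  // manual teardown, paired with the INIT above
}

/* For contrast, the removed ('-') style relied on a scoped object and an
 * implicit thread-local lookup, as seen throughout the deleted lines:
 *
 *   grpc_core::ExecCtx exec_ctx;
 *   GRPC_CLOSURE_SCHED(&c, GRPC_ERROR_NONE);
 *   grpc_core::ExecCtx::Get()->Flush();
 */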