author     (2017-11-13 15:37:58 -0800)
committer  (2017-11-14 01:36:28 -0800)
commit     75122c23578e24417dcf64081c737571a9fc2dbc (patch)
tree       f4b8491964ec0508a5826490628c9f87b82c3326 /src/core
parent     36cd68f0d543b9024c84eff82319890a791de7f6 (diff)
Address some PR comments
Diffstat (limited to 'src/core')
82 files changed, 375 insertions, 494 deletions
diff --git a/src/core/ext/filters/client_channel/backup_poller.cc b/src/core/ext/filters/client_channel/backup_poller.cc
index 1b42f5d6a0..dbdcd53ef5 100644
--- a/src/core/ext/filters/client_channel/backup_poller.cc
+++ b/src/core/ext/filters/client_channel/backup_poller.cc
@@ -112,10 +112,10 @@ static void run_poller(void* arg, grpc_error* error) {
     backup_poller_shutdown_unref(p);
     return;
   }
-  grpc_error* err = grpc_pollset_work(p->pollset, NULL, grpc_exec_ctx_now());
+  grpc_error* err = grpc_pollset_work(p->pollset, NULL, ExecCtx::Get()->Now());
   gpr_mu_unlock(p->pollset_mu);
   GRPC_LOG_IF_ERROR("Run client channel backup poller", err);
-  grpc_timer_init(&p->polling_timer, grpc_exec_ctx_now() + g_poll_interval_ms,
+  grpc_timer_init(&p->polling_timer, ExecCtx::Get()->Now() + g_poll_interval_ms,
                   &p->run_poller_closure);
 }
@@ -137,7 +137,7 @@ void grpc_client_channel_start_backup_polling(
   GRPC_CLOSURE_INIT(&g_poller->run_poller_closure, run_poller, g_poller,
                     grpc_schedule_on_exec_ctx);
   grpc_timer_init(&g_poller->polling_timer,
-                  grpc_exec_ctx_now() + g_poll_interval_ms,
+                  ExecCtx::Get()->Now() + g_poll_interval_ms,
                   &g_poller->run_poller_closure);
 }
diff --git a/src/core/ext/filters/client_channel/channel_connectivity.cc b/src/core/ext/filters/client_channel/channel_connectivity.cc
index 3069e66775..0ceedb9f86 100644
--- a/src/core/ext/filters/client_channel/channel_connectivity.cc
+++ b/src/core/ext/filters/client_channel/channel_connectivity.cc
@@ -41,14 +41,14 @@ grpc_connectivity_state grpc_channel_check_connectivity_state(
   if (client_channel_elem->filter == &grpc_client_channel_filter) {
     state = grpc_client_channel_check_connectivity_state(client_channel_elem,
                                                          try_to_connect);
-    grpc_exec_ctx_finish();
+
     return state;
   }
   gpr_log(GPR_ERROR,
           "grpc_channel_check_connectivity_state called on something that is "
           "not a client channel, but '%s'",
           client_channel_elem->filter->name);
-  grpc_exec_ctx_finish();
+
   return GRPC_CHANNEL_SHUTDOWN;
 }
@@ -241,6 +241,4 @@ void grpc_channel_watch_connectivity_state(
   } else {
     abort();
   }
-
-  grpc_exec_ctx_finish();
 }
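The change that repeats through nearly every hunk in this patch is mechanical: the C-style helpers that operated on an implicit exec_ctx pointer (grpc_exec_ctx_now(), grpc_exec_ctx_flush(), grpc_exec_ctx_invalidate_now(), ...) become methods on a thread-local ExecCtx object reached via ExecCtx::Get(). The following is a minimal standalone model of that pattern, not the gRPC sources; the members are simplified and std::chrono stands in for gpr_now():

```cpp
#include <chrono>
#include <cstdint>

class ExecCtx {
 public:
  ExecCtx() : prev_(current_) { current_ = this; }  // push on construction
  ~ExecCtx() { current_ = prev_; }                  // pop on destruction
  static ExecCtx* Get() { return current_; }

  // Cached clock read, mirroring the old grpc_exec_ctx_now(): the first call
  // in a scope reads the monotonic clock, later calls reuse the cached value
  // until InvalidateNow() is called (e.g. after a blocking region).
  int64_t Now() {
    if (!now_is_valid_) {
      now_ = std::chrono::duration_cast<std::chrono::milliseconds>(
                 std::chrono::steady_clock::now().time_since_epoch())
                 .count();
      now_is_valid_ = true;
    }
    return now_;
  }
  void InvalidateNow() { now_is_valid_ = false; }

 private:
  static thread_local ExecCtx* current_;
  ExecCtx* prev_;
  bool now_is_valid_ = false;
  int64_t now_ = 0;
};
thread_local ExecCtx* ExecCtx::current_ = nullptr;

// A call site then reads exactly like the rewritten lines above, e.g.:
//   grpc_timer_init(&timer, ExecCtx::Get()->Now() + poll_interval_ms, &cb);
```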
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index d4c58fb1e0..63cf417c4e 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -1122,7 +1122,7 @@ static void start_picking_locked(glb_lb_policy* glb_policy) {
   if (glb_policy->lb_fallback_timeout_ms > 0 && glb_policy->serverlist == NULL &&
       !glb_policy->fallback_timer_active) {
     grpc_millis deadline =
-        grpc_exec_ctx_now() + glb_policy->lb_fallback_timeout_ms;
+        ExecCtx::Get()->Now() + glb_policy->lb_fallback_timeout_ms;
     GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_fallback_timer");
     GRPC_CLOSURE_INIT(&glb_policy->lb_on_fallback, lb_on_fallback_timer_locked,
                       glb_policy,
@@ -1271,7 +1271,7 @@ static void maybe_restart_lb_call(glb_lb_policy* glb_policy) {
     if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
       gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...",
               glb_policy);
-      grpc_millis timeout = next_try - grpc_exec_ctx_now();
+      grpc_millis timeout = next_try - ExecCtx::Get()->Now();
       if (timeout > 0) {
         gpr_log(GPR_DEBUG, "[grpclb %p] ... retry_timer_active in %" PRIuPTR "ms.",
@@ -1297,7 +1297,7 @@ static void send_client_load_report_locked(void* arg, grpc_error* error);
 static void schedule_next_client_load_report(glb_lb_policy* glb_policy) {
   const grpc_millis next_client_load_report_time =
-      grpc_exec_ctx_now() + glb_policy->client_stats_report_interval;
+      ExecCtx::Get()->Now() + glb_policy->client_stats_report_interval;
   GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure,
                     send_client_load_report_locked, glb_policy,
                     grpc_combiner_scheduler(glb_policy->base.combiner));
@@ -1392,7 +1392,7 @@ static void lb_call_init_locked(glb_lb_policy* glb_policy) {
   grpc_millis deadline =
       glb_policy->lb_call_timeout_ms == 0
           ? GRPC_MILLIS_INF_FUTURE
-          : grpc_exec_ctx_now() + glb_policy->lb_call_timeout_ms;
+          : ExecCtx::Get()->Now() + glb_policy->lb_call_timeout_ms;
   glb_policy->lb_call = grpc_channel_create_pollset_set_call(
       glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
       glb_policy->base.interested_parties,
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
index 77d790aa38..f0543964ae 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
@@ -265,7 +265,7 @@ static void dns_ares_on_resolved_locked(void* arg, grpc_error* error) {
     gpr_log(GPR_DEBUG, "dns resolution failed: %s", msg);
     grpc_millis next_try =
         grpc_backoff_step(&r->backoff_state).next_attempt_start_time;
-    grpc_millis timeout = next_try - grpc_exec_ctx_now();
+    grpc_millis timeout = next_try - ExecCtx::Get()->Now();
     gpr_log(GPR_INFO, "dns resolution failed (will retry): %s",
             grpc_error_string(error));
     GPR_ASSERT(!r->have_retry_timer);
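The resolver and LB-policy hunks above all compute a relative retry delay from the absolute next-attempt time produced by the backoff state. A sketch of that arithmetic, reusing the model ExecCtx from the earlier annotation (names illustrative):

```cpp
// timeout may come out negative when the backoff deadline has already
// passed; grpc_timer_init treats a past deadline as "fire immediately",
// so call sites only branch on the sign for logging purposes.
int64_t RetryDelayMs(int64_t next_attempt_start_time_ms) {
  return next_attempt_start_time_ms - ExecCtx::Get()->Now();
}
```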
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
index c57fac61a4..925223d189 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
@@ -101,18 +101,7 @@ static void grpc_ares_request_unref(grpc_ares_request* r) {
      request */
   if (gpr_unref(&r->pending_queries)) {
     /* TODO(zyc): Sort results with RFC6724 before invoking on_done. */
-    if (exec_ctx == NULL) {
-      /* A new exec_ctx is created here, as the c-ares interface does not
-         provide one in ares_host_callback. It's safe to schedule on_done with
-         the newly created exec_ctx, since the caller has been warned not to
-         acquire locks in on_done. ares_dns_resolver is using combiner to
-         protect resources needed by on_done. */
-      ExecCtx _local_exec_ctx;
-      GRPC_CLOSURE_SCHED(r->on_done, r->error);
-      grpc_exec_ctx_finish();
-    } else {
-      GRPC_CLOSURE_SCHED(r->on_done, r->error);
-    }
+    GRPC_CLOSURE_SCHED(r->on_done, r->error);
     gpr_mu_destroy(&r->mu);
     grpc_ares_ev_driver_destroy(r->ev_driver);
     gpr_free(r);
@@ -263,7 +252,6 @@ static void on_srv_query_done_cb(void* arg, int status, int timeouts,
     }
   }
   grpc_ares_request_unref(r);
-  grpc_exec_ctx_finish();
 }
 
 static const char g_service_config_attribute_prefix[] = "grpc_config=";
diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
index 4463673e1f..10404ec4ef 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
@@ -163,7 +163,7 @@ static void dns_on_resolved_locked(void* arg, grpc_error* error) {
   } else {
     grpc_millis next_try =
         grpc_backoff_step(&r->backoff_state).next_attempt_start_time;
-    grpc_millis timeout = next_try - grpc_exec_ctx_now();
+    grpc_millis timeout = next_try - ExecCtx::Get()->Now();
     gpr_log(GPR_INFO, "dns resolution failed (will retry): %s",
             grpc_error_string(error));
     GPR_ASSERT(!r->have_retry_timer);
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index c8583687d5..98f96b5750 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -458,7 +458,7 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) {
     GPR_ASSERT(!c->have_alarm);
     c->have_alarm = true;
     const grpc_millis time_til_next =
-        c->backoff_result.next_attempt_start_time - grpc_exec_ctx_now();
+        c->backoff_result.next_attempt_start_time - ExecCtx::Get()->Now();
     if (time_til_next <= 0) {
       gpr_log(GPR_INFO, "Retry immediately");
     } else {
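The subchannel index below threads ExecCtx::Get() through wherever the avl previously received the explicit exec_ctx argument: gpr_avl's ref/unref/get/add/remove take an opaque void* user_data that is forwarded to the tree's vtable callbacks. A minimal model of that shape, with an assumed vtable layout for illustration only:

```cpp
// Hypothetical slice of a gpr_avl-style vtable; the real structure lives in
// gpr_avl's headers and may differ. The point is only that user_data flows
// from the call site to the callback unchanged.
struct avl_vtable_model {
  void (*destroy_value)(void* value, void* user_data);
};

void destroy_subchannel_value(void* value, void* user_data) {
  // user_data arrives as whatever the call site passed, e.g.
  // gpr_avl_unref(index, ExecCtx::Get()) in the hunks below.
  ExecCtx* ctx = static_cast<ExecCtx*>(user_data);
  (void)ctx;    // a real callback could schedule cleanup closures on it
  (void)value;
}
```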
diff --git a/src/core/ext/filters/client_channel/subchannel_index.cc b/src/core/ext/filters/client_channel/subchannel_index.cc
index cb8e480734..fbab57769c 100644
--- a/src/core/ext/filters/client_channel/subchannel_index.cc
+++ b/src/core/ext/filters/client_channel/subchannel_index.cc
@@ -132,10 +132,8 @@ void grpc_subchannel_index_shutdown(void) {
 void grpc_subchannel_index_unref(void) {
   if (gpr_unref(&g_refcount)) {
-    ExecCtx _local_exec_ctx;
     gpr_mu_destroy(&g_mu);
-    gpr_avl_unref(g_subchannel_index, exec_ctx);
-    grpc_exec_ctx_finish();
+    gpr_avl_unref(g_subchannel_index, ExecCtx::Get());
   }
 }
@@ -145,12 +143,12 @@ grpc_subchannel* grpc_subchannel_index_find(grpc_subchannel_key* key) {
   // Lock, and take a reference to the subchannel index.
   // We don't need to do the search under a lock as avl's are immutable.
   gpr_mu_lock(&g_mu);
-  gpr_avl index = gpr_avl_ref(g_subchannel_index, exec_ctx);
+  gpr_avl index = gpr_avl_ref(g_subchannel_index, ExecCtx::Get());
   gpr_mu_unlock(&g_mu);
 
   grpc_subchannel* c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(
-      (grpc_subchannel*)gpr_avl_get(index, key, exec_ctx), "index_find");
-  gpr_avl_unref(index, exec_ctx);
+      (grpc_subchannel*)gpr_avl_get(index, key, ExecCtx::Get()), "index_find");
+  gpr_avl_unref(index, ExecCtx::Get());
 
   return c;
 }
@@ -166,11 +164,11 @@ grpc_subchannel* grpc_subchannel_index_register(grpc_subchannel_key* key,
   // Compare and swap loop:
   // - take a reference to the current index
   gpr_mu_lock(&g_mu);
-  gpr_avl index = gpr_avl_ref(g_subchannel_index, exec_ctx);
+  gpr_avl index = gpr_avl_ref(g_subchannel_index, ExecCtx::Get());
   gpr_mu_unlock(&g_mu);
 
   // - Check to see if a subchannel already exists
-  c = (grpc_subchannel*)gpr_avl_get(index, key, exec_ctx);
+  c = (grpc_subchannel*)gpr_avl_get(index, key, ExecCtx::Get());
   if (c != NULL) {
     c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(c, "index_register");
   }
@@ -180,8 +178,9 @@ grpc_subchannel* grpc_subchannel_index_register(grpc_subchannel_key* key,
   } else {
     // no -> update the avl and compare/swap
     gpr_avl updated = gpr_avl_add(
-        gpr_avl_ref(index, exec_ctx), subchannel_key_copy(key),
-        GRPC_SUBCHANNEL_WEAK_REF(constructed, "index_register"), exec_ctx);
+        gpr_avl_ref(index, ExecCtx::Get()), subchannel_key_copy(key),
+        GRPC_SUBCHANNEL_WEAK_REF(constructed, "index_register"),
+        ExecCtx::Get());
 
     // it may happen (but it's expected to be unlikely)
     // that some other thread has changed the index:
@@ -193,9 +192,9 @@ grpc_subchannel* grpc_subchannel_index_register(grpc_subchannel_key* key,
     }
     gpr_mu_unlock(&g_mu);
 
-    gpr_avl_unref(updated, exec_ctx);
+    gpr_avl_unref(updated, ExecCtx::Get());
   }
-  gpr_avl_unref(index, exec_ctx);
+  gpr_avl_unref(index, ExecCtx::Get());
   }
 
   if (need_to_unref_constructed) {
@@ -212,21 +211,22 @@ void grpc_subchannel_index_unregister(grpc_subchannel_key* key,
   // Compare and swap loop:
   // - take a reference to the current index
   gpr_mu_lock(&g_mu);
-  gpr_avl index = gpr_avl_ref(g_subchannel_index, exec_ctx);
+  gpr_avl index = gpr_avl_ref(g_subchannel_index, ExecCtx::Get());
   gpr_mu_unlock(&g_mu);
 
   // Check to see if this key still refers to the previously
   // registered subchannel
-  grpc_subchannel* c = (grpc_subchannel*)gpr_avl_get(index, key, exec_ctx);
+  grpc_subchannel* c =
+      (grpc_subchannel*)gpr_avl_get(index, key, ExecCtx::Get());
   if (c != constructed) {
-    gpr_avl_unref(index, exec_ctx);
+    gpr_avl_unref(index, ExecCtx::Get());
     break;
   }
 
   // compare and swap the update (some other thread may have
   // mutated the index behind us)
   gpr_avl updated =
-      gpr_avl_remove(gpr_avl_ref(index, exec_ctx), key, exec_ctx);
+      gpr_avl_remove(gpr_avl_ref(index, ExecCtx::Get()), key, ExecCtx::Get());
 
   gpr_mu_lock(&g_mu);
   if (index.root == g_subchannel_index.root) {
@@ -235,8 +235,8 @@ void grpc_subchannel_index_unregister(grpc_subchannel_key* key,
   }
   gpr_mu_unlock(&g_mu);
 
-  gpr_avl_unref(updated, exec_ctx);
-  gpr_avl_unref(index, exec_ctx);
+  gpr_avl_unref(updated, ExecCtx::Get());
+  gpr_avl_unref(index, ExecCtx::Get());
   }
 }
diff --git a/src/core/ext/filters/max_age/max_age_filter.cc b/src/core/ext/filters/max_age/max_age_filter.cc
index 1b9ce3b996..015a3ce124 100644
--- a/src/core/ext/filters/max_age/max_age_filter.cc
+++ b/src/core/ext/filters/max_age/max_age_filter.cc
@@ -100,7 +100,7 @@ static void decrease_call_count(channel_data* chand) {
   if (gpr_atm_full_fetch_add(&chand->call_count, -1) == 1) {
     GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_idle_timer");
     grpc_timer_init(&chand->max_idle_timer,
-                    grpc_exec_ctx_now() + chand->max_connection_idle,
+                    ExecCtx::Get()->Now() + chand->max_connection_idle,
                     &chand->close_max_idle_channel);
   }
 }
@@ -121,7 +121,7 @@ static void start_max_age_timer_after_init(void* arg, grpc_error* error) {
   chand->max_age_timer_pending = true;
   GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_timer");
   grpc_timer_init(&chand->max_age_timer,
-                  grpc_exec_ctx_now() + chand->max_connection_age,
+                  ExecCtx::Get()->Now() + chand->max_connection_age,
                   &chand->close_max_age_channel);
   gpr_mu_unlock(&chand->max_age_timer_mu);
   grpc_transport_op* op = grpc_make_transport_op(NULL);
@@ -141,7 +141,7 @@ static void start_max_age_grace_timer_after_goaway_op(void* arg,
   grpc_timer_init(&chand->max_age_grace_timer,
                   chand->max_connection_age_grace == GRPC_MILLIS_INF_FUTURE
                       ? GRPC_MILLIS_INF_FUTURE
-                      : grpc_exec_ctx_now() + chand->max_connection_age_grace,
+                      : ExecCtx::Get()->Now() + chand->max_connection_age_grace,
                   &chand->force_close_max_age_channel);
   gpr_mu_unlock(&chand->max_age_timer_mu);
   GRPC_CHANNEL_STACK_UNREF(chand->channel_stack,
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create.cc
index e7741f97d4..3afca884ca 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create.cc
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.cc
@@ -95,7 +95,7 @@ grpc_channel* grpc_insecure_channel_create(const char* target,
                                 new_args);
   // Clean up.
   grpc_channel_args_destroy(new_args);
-  grpc_exec_ctx_finish();
+
   return channel != NULL ? channel
                          : grpc_lame_client_channel_create(
                                target, GRPC_STATUS_INTERNAL,
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
index 37e6f1f30d..b0eff1c992 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
@@ -60,8 +60,6 @@ grpc_channel* grpc_insecure_channel_create_from_fd(
   grpc_channel_args_destroy(final_args);
   grpc_chttp2_transport_start_reading(transport, NULL);
 
-  grpc_exec_ctx_finish();
-
   return channel != NULL ? channel
                          : grpc_lame_client_channel_create(
                                target, GRPC_STATUS_INTERNAL,
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc
index a5da71a67c..bebc38c248 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc
@@ -211,7 +211,6 @@ grpc_channel* grpc_secure_channel_create(grpc_channel_credentials* creds,
                                   new_args);
     // Clean up.
     grpc_channel_args_destroy(new_args);
-    grpc_exec_ctx_finish();
   }
   return channel != NULL ? channel
                          : grpc_lame_client_channel_create(
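The channel-creation hunks above simply delete grpc_exec_ctx_finish() at function exits. That is safe because, per the exec_ctx.cc hunk later in this patch, ~ExecCtx() now sets the IS_FINISHED flag and flushes. A sketch of the resulting call-site shape, with a hypothetical entry point for illustration:

```cpp
// Not a real gRPC function; illustrates why the explicit finish is gone.
grpc_channel* create_channel_model() {
  ExecCtx _local_exec_ctx;         // scoped context at the API boundary
  grpc_channel* channel = nullptr;
  // ... build the channel, schedule closures on ExecCtx::Get() ...
  return channel;                  // ~ExecCtx() flushes pending work here
}
```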
diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.cc b/src/core/ext/transport/chttp2/server/chttp2_server.cc
index 6eb1e3491c..bbcfb1b195 100644
--- a/src/core/ext/transport/chttp2/server/chttp2_server.cc
+++ b/src/core/ext/transport/chttp2/server/chttp2_server.cc
@@ -132,7 +132,7 @@ static void on_accept(void* arg, grpc_endpoint* tcp,
                       connection_state->handshake_mgr);
   // TODO(roth): We should really get this timeout value from channel
   // args instead of hard-coding it.
-  const grpc_millis deadline = grpc_exec_ctx_now() + 120 * GPR_MS_PER_SEC;
+  const grpc_millis deadline = ExecCtx::Get()->Now() + 120 * GPR_MS_PER_SEC;
   grpc_handshake_manager_do_handshake(connection_state->handshake_mgr, tcp,
                                       state->args, deadline, acceptor,
                                       on_handshake_done, connection_state);
@@ -161,10 +161,10 @@ static void tcp_server_shutdown_complete(void* arg, grpc_error* error) {
   gpr_mu_unlock(&state->mu);
   // Flush queued work before destroying handshaker factory, since that
   // may do a synchronous unref.
-  grpc_exec_ctx_flush();
+  ExecCtx::Get()->Flush();
   if (destroy_done != NULL) {
     destroy_done->cb(destroy_done->cb_arg, GRPC_ERROR_REF(error));
-    grpc_exec_ctx_flush();
+    ExecCtx::Get()->Flush();
   }
   grpc_channel_args_destroy(state->args);
   gpr_mu_destroy(&state->mu);
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc
index d7aad110b9..6cbb26a349 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc
@@ -39,6 +39,6 @@ int grpc_server_add_insecure_http2_port(grpc_server* server,
                                         const char* addr) {
     GRPC_ERROR_UNREF(err);
   }
-  grpc_exec_ctx_finish();
+
   return port_num;
 }
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
index 57a8316a58..e5419e5e6e 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
@@ -61,7 +61,6 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server,
   grpc_server_setup_transport(server, transport, NULL, server_args);
   grpc_chttp2_transport_start_reading(transport, NULL);
-  grpc_exec_ctx_finish();
 }
 
 #else  // !GPR_SUPPORT_CHANNELS_FROM_FD
diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.cc b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.cc
index dc7da96210..aeae8f42e3 100644
--- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.cc
+++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.cc
@@ -76,7 +76,7 @@ done:
   if (sc != NULL) {
     GRPC_SECURITY_CONNECTOR_UNREF(&sc->base, "server");
   }
-  grpc_exec_ctx_finish();
+
   if (err != GRPC_ERROR_NONE) {
     const char* msg = grpc_error_string(err);
     gpr_log(GPR_ERROR, "%s", msg);
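The tcp_server_shutdown_complete hunk leans on Flush() running everything queued so far before a dependent object (the handshaker factory) is destroyed. A minimal standalone model of that drain loop, mirroring the structure of ExecCtx::Flush() in the exec_ctx.cc hunk further down (simplified: std::function stands in for grpc_closure):

```cpp
#include <functional>
#include <utility>
#include <vector>

// Runs queued closures until the queue stays empty; closures may enqueue
// more closures, hence the outer loop. Returns whether anything ran, which
// callers below (e.g. ev_poll_posix.cc) use to detect new work.
bool FlushModel(std::vector<std::function<void()>>* queue) {
  bool did_something = false;
  while (!queue->empty()) {
    std::vector<std::function<void()>> closures = std::move(*queue);
    queue->clear();
    for (auto& c : closures) {
      c();
      did_something = true;
    }
  }
  return did_something;
}
```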
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index ecf3cf69ef..e49e26fc35 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -530,7 +530,7 @@ static void init_transport(grpc_chttp2_transport* t,
     t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
     GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
     grpc_timer_init(&t->keepalive_ping_timer,
-                    grpc_exec_ctx_now() + t->keepalive_time,
+                    ExecCtx::Get()->Now() + t->keepalive_time,
                     &t->init_keepalive_ping_locked);
   } else {
     /* Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no
@@ -2585,14 +2585,14 @@ static void init_keepalive_ping_locked(void* arg, grpc_error* error) {
     } else {
       GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
       grpc_timer_init(&t->keepalive_ping_timer,
-                      grpc_exec_ctx_now() + t->keepalive_time,
+                      ExecCtx::Get()->Now() + t->keepalive_time,
                       &t->init_keepalive_ping_locked);
     }
   } else if (error == GRPC_ERROR_CANCELLED) {
     /* The keepalive ping timer may be cancelled by bdp */
     GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
     grpc_timer_init(&t->keepalive_ping_timer,
-                    grpc_exec_ctx_now() + t->keepalive_time,
+                    ExecCtx::Get()->Now() + t->keepalive_time,
                     &t->init_keepalive_ping_locked);
   }
   GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping");
@@ -2602,7 +2602,7 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) {
   grpc_chttp2_transport* t = (grpc_chttp2_transport*)arg;
   GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog");
   grpc_timer_init(&t->keepalive_watchdog_timer,
-                  grpc_exec_ctx_now() + t->keepalive_time,
+                  ExecCtx::Get()->Now() + t->keepalive_time,
                   &t->keepalive_watchdog_fired_locked);
 }
@@ -2614,7 +2614,7 @@ static void finish_keepalive_ping_locked(void* arg, grpc_error* error) {
       grpc_timer_cancel(&t->keepalive_watchdog_timer);
       GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
       grpc_timer_init(&t->keepalive_ping_timer,
-                      grpc_exec_ctx_now() + t->keepalive_time,
+                      ExecCtx::Get()->Now() + t->keepalive_time,
                       &t->init_keepalive_ping_locked);
     }
   }
diff --git a/src/core/ext/transport/chttp2/transport/flow_control.cc b/src/core/ext/transport/chttp2/transport/flow_control.cc
index 1609fa0532..e54d59b5fa 100644
--- a/src/core/ext/transport/chttp2/transport/flow_control.cc
+++ b/src/core/ext/transport/chttp2/transport/flow_control.cc
@@ -160,7 +160,7 @@ TransportFlowControl::TransportFlowControl(const grpc_chttp2_transport* t,
                          .set_min_control_value(-1)
                          .set_max_control_value(25)
                          .set_integral_range(10)),
-      last_pid_update_(grpc_exec_ctx_now()) {}
+      last_pid_update_(ExecCtx::Get()->Now()) {}
 
 uint32_t TransportFlowControl::MaybeSendUpdate(bool writing_anyway) {
   FlowControlTrace trace("t updt sent", this, nullptr);
@@ -306,7 +306,7 @@ double TransportFlowControl::TargetLogBdp() {
 }
 
 double TransportFlowControl::SmoothLogBdp(double value) {
-  grpc_millis now = grpc_exec_ctx_now();
+  grpc_millis now = ExecCtx::Get()->Now();
   double bdp_error = value - pid_controller_.last_control_value();
   const double dt = (double)(now - last_pid_update_) * 1e-3;
   last_pid_update_ = now;
diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.cc b/src/core/ext/transport/chttp2/transport/frame_ping.cc
index 1b0dc0dfaa..60172be9cb 100644
--- a/src/core/ext/transport/chttp2/transport/frame_ping.cc
+++ b/src/core/ext/transport/chttp2/transport/frame_ping.cc
@@ -89,7 +89,7 @@ grpc_error* grpc_chttp2_ping_parser_parse(void* parser,
     grpc_chttp2_ack_ping(t, p->opaque_8bytes);
   } else {
     if (!t->is_client) {
-      grpc_millis now = grpc_exec_ctx_now();
+      grpc_millis now = ExecCtx::Get()->Now();
       grpc_millis next_allowed_ping =
           t->ping_recv_state.last_ping_recv_time +
           t->ping_policy.min_recv_ping_interval_without_data;
diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.cc b/src/core/ext/transport/chttp2/transport/hpack_encoder.cc
index e225a0244a..efb6e54ce7 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_encoder.cc
+++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.cc
@@ -570,7 +570,7 @@ static void deadline_enc(grpc_chttp2_hpack_compressor* c, grpc_millis deadline,
                          framer_state* st) {
   char timeout_str[GRPC_HTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE];
   grpc_mdelem mdelem;
-  grpc_http2_encode_timeout(deadline - grpc_exec_ctx_now(), timeout_str);
+  grpc_http2_encode_timeout(deadline - ExecCtx::Get()->Now(), timeout_str);
   mdelem = grpc_mdelem_from_slices(GRPC_MDSTR_GRPC_TIMEOUT,
                                    grpc_slice_from_copied_string(timeout_str));
   hpack_enc(c, mdelem, st);
diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc
index 5731e9ff78..f7f83c9aee 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.cc
+++ b/src/core/ext/transport/chttp2/transport/parsing.cc
@@ -436,7 +436,7 @@ static void on_initial_header(void* tp, grpc_mdelem md) {
     }
     if (timeout != GRPC_MILLIS_INF_FUTURE) {
       grpc_chttp2_incoming_metadata_buffer_set_deadline(
-          &s->metadata_buffer[0], grpc_exec_ctx_now() + timeout);
+          &s->metadata_buffer[0], ExecCtx::Get()->Now() + timeout);
     }
     GRPC_MDELEM_UNREF(md);
   } else {
diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc
index ec83633472..4f76c2eb23 100644
--- a/src/core/ext/transport/chttp2/transport/writing.cc
+++ b/src/core/ext/transport/chttp2/transport/writing.cc
@@ -68,7 +68,7 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) {
     }
     return;
   }
-  grpc_millis now = grpc_exec_ctx_now();
+  grpc_millis now = ExecCtx::Get()->Now();
   grpc_millis next_allowed_ping =
       t->ping_state.last_ping_sent_time +
       t->ping_policy.min_sent_ping_interval_without_data;
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc
index bff0fb36ad..3411acc563 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.cc
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc
@@ -417,7 +417,6 @@ static void on_failed(bidirectional_stream* stream, int net_error) {
   gpr_mu_unlock(&s->mu);
   execute_from_storage(s);
   GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
-  grpc_exec_ctx_finish();
 }
 
 /*
@@ -444,7 +443,6 @@ static void on_canceled(bidirectional_stream* stream) {
   gpr_mu_unlock(&s->mu);
   execute_from_storage(s);
   GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
-  grpc_exec_ctx_finish();
 }
 
 /*
@@ -463,7 +461,6 @@ static void on_succeeded(bidirectional_stream* stream) {
   gpr_mu_unlock(&s->mu);
   execute_from_storage(s);
   GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
-  grpc_exec_ctx_finish();
 }
 
 /*
@@ -492,7 +489,6 @@ static void on_stream_ready(bidirectional_stream* stream) {
   }
   gpr_mu_unlock(&s->mu);
   execute_from_storage(s);
-  grpc_exec_ctx_finish();
 }
 
 /*
@@ -548,7 +544,6 @@ static void on_response_headers_received(
   }
   gpr_mu_unlock(&s->mu);
   execute_from_storage(s);
-  grpc_exec_ctx_finish();
 }
 
 /*
@@ -566,7 +561,6 @@ static void on_write_completed(bidirectional_stream* stream, const char* data) {
   s->state.state_callback_received[OP_SEND_MESSAGE] = true;
   gpr_mu_unlock(&s->mu);
   execute_from_storage(s);
-  grpc_exec_ctx_finish();
 }
 
 /*
@@ -608,7 +602,6 @@ static void on_read_completed(bidirectional_stream* stream, char* data,
     gpr_mu_unlock(&s->mu);
     execute_from_storage(s);
   }
-  grpc_exec_ctx_finish();
 }
 
 /*
@@ -666,7 +659,6 @@ static void on_response_trailers_received(
     gpr_mu_unlock(&s->mu);
     execute_from_storage(s);
   }
-  grpc_exec_ctx_finish();
 }
 
 /*
diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc
index 1c7f2873d1..a79b2b26b0 100644
--- a/src/core/ext/transport/inproc/inproc_transport.cc
+++ b/src/core/ext/transport/inproc/inproc_transport.cc
@@ -1107,7 +1107,6 @@ void grpc_inproc_transport_init(void) {
   grpc_slice_unref_internal(auth_tmp);
 
   g_fake_auth_value = grpc_slice_from_static_string("inproc-fail");
-  grpc_exec_ctx_finish();
 }
 
 static const grpc_transport_vtable inproc_vtable = {
@@ -1182,7 +1181,6 @@ grpc_channel* grpc_inproc_channel_create(grpc_server* server,
   grpc_channel_args_destroy(client_args);
 
   // Now finish scheduled operations
-  grpc_exec_ctx_finish();
 
   return channel;
 }
@@ -1194,5 +1192,4 @@ void grpc_inproc_transport_shutdown(void) {
   grpc_slice_unref_internal(g_fake_path_value);
   grpc_slice_unref_internal(g_fake_auth_key);
   grpc_slice_unref_internal(g_fake_auth_value);
-  grpc_exec_ctx_finish();
 }
diff --git a/src/core/lib/backoff/backoff.cc b/src/core/lib/backoff/backoff.cc
index e6c55f420d..b75ce79d46 100644
--- a/src/core/lib/backoff/backoff.cc
+++ b/src/core/lib/backoff/backoff.cc
@@ -36,7 +36,7 @@ grpc_backoff_result grpc_backoff_begin(grpc_backoff* backoff) {
   backoff->current_backoff = backoff->initial_backoff;
   const grpc_millis initial_timeout =
       GPR_MAX(backoff->initial_backoff, backoff->min_connect_timeout);
-  const grpc_millis now = grpc_exec_ctx_now();
+  const grpc_millis now = ExecCtx::Get()->Now();
   const grpc_backoff_result result = {now + initial_timeout,
                                       now + backoff->current_backoff};
   return result;
@@ -67,7 +67,7 @@ grpc_backoff_result grpc_backoff_step(grpc_backoff* backoff) {
                                        backoff->min_connect_timeout);
   const grpc_millis next_timeout = GPR_MIN(
       (grpc_millis)(backoff->current_backoff + jitter), backoff->max_backoff);
-  const grpc_millis now = grpc_exec_ctx_now();
+  const grpc_millis now = ExecCtx::Get()->Now();
   const grpc_backoff_result result = {now + current_timeout,
                                       now + next_timeout};
   return result;
diff --git a/src/core/lib/compression/stream_compression_gzip.cc b/src/core/lib/compression/stream_compression_gzip.cc
index 4d5d0955ce..3fae3490ce 100644
--- a/src/core/lib/compression/stream_compression_gzip.cc
+++ b/src/core/lib/compression/stream_compression_gzip.cc
@@ -58,7 +58,7 @@ static bool gzip_flate(grpc_stream_compression_context_gzip* ctx,
       if (r < 0 && r != Z_BUF_ERROR) {
         gpr_log(GPR_ERROR, "zlib error (%d)", r);
         grpc_slice_unref_internal(slice_out);
-        grpc_exec_ctx_finish();
+
         return false;
       } else if (r == Z_STREAM_END && ctx->flate == inflate) {
         eoc = true;
@@ -89,7 +89,7 @@ static bool gzip_flate(grpc_stream_compression_context_gzip* ctx,
           default:
             gpr_log(GPR_ERROR, "zlib error (%d)", r);
             grpc_slice_unref_internal(slice_out);
-            grpc_exec_ctx_finish();
+
             return false;
         }
       } else if (flush == Z_FINISH) {
@@ -105,7 +105,7 @@ static bool gzip_flate(grpc_stream_compression_context_gzip* ctx,
           default:
             gpr_log(GPR_ERROR, "zlib error (%d)", r);
             grpc_slice_unref_internal(slice_out);
-            grpc_exec_ctx_finish();
+
             return false;
         }
       }
@@ -121,7 +121,7 @@ static bool gzip_flate(grpc_stream_compression_context_gzip* ctx,
     }
     max_output_size -= (slice_size - ctx->zs.avail_out);
   }
-  grpc_exec_ctx_finish();
+
   if (end_of_context) {
     *end_of_context = eoc;
   }
diff --git a/src/core/lib/debug/stats.h b/src/core/lib/debug/stats.h
index 1c2826506c..24b0084130 100644
--- a/src/core/lib/debug/stats.h
+++ b/src/core/lib/debug/stats.h
@@ -35,7 +35,7 @@ typedef struct grpc_stats_data {
 extern grpc_stats_data* grpc_stats_per_cpu_storage;
 
 #define GRPC_THREAD_STATS_DATA() \
-  (&grpc_stats_per_cpu_storage[(exec_ctx)->starting_cpu])
+  (&grpc_stats_per_cpu_storage[ExecCtx::Get()->starting_cpu()])
 
 #define GRPC_STATS_INC_COUNTER(ctr) \
   (gpr_atm_no_barrier_fetch_add(&GRPC_THREAD_STATS_DATA()->counters[(ctr)], 1))
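The stats.h hunk shows how stats sharding follows the context: the shard index is the CPU sampled at ExecCtx construction, now reached through the starting_cpu() accessor introduced in the exec_ctx.h hunk below rather than through a public struct field. A standalone model of that lookup, assuming an ExecCtx with the starting_cpu() accessor and with illustrative storage types:

```cpp
#include <atomic>

struct StatsShard {
  std::atomic<long> counters[64];
};
// Assumed to be allocated with one shard per CPU, as
// grpc_stats_per_cpu_storage is in the real code.
extern StatsShard* g_per_cpu_stats;

inline void IncCounterModel(int which) {
  // Sharding by the creating CPU keeps hot counters mostly core-local
  // without requiring any locking beyond the relaxed atomic add.
  g_per_cpu_stats[ExecCtx::Get()->starting_cpu()].counters[which].fetch_add(
      1, std::memory_order_relaxed);
}
```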
"src/core/lib/iomgr/exec_ctx.h" + #ifdef __cplusplus extern "C" { #endif @@ -39,25 +41,18 @@ void gpr_thd_end_blocking_region(); do { \ gpr_thd_start_blocking_region(); \ } while (0) -#define GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX \ - do { \ - gpr_thd_end_blocking_region(); \ - } while (0) -#define GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX() \ - do { \ - gpr_thd_end_blocking_region(); \ - grpc_exec_ctx_invalidate_now(); \ +#define GRPC_SCHEDULING_END_BLOCKING_REGION \ + do { \ + gpr_thd_end_blocking_region(); \ + ExecCtx::Get()->InvalidateNow(); \ } while (0) #else #define GRPC_SCHEDULING_START_BLOCKING_REGION \ do { \ } while (0) -#define GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX \ - do { \ - } while (0) -#define GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX() \ - do { \ - grpc_exec_ctx_invalidate_now(); \ +#define GRPC_SCHEDULING_END_BLOCKING_REGION \ + do { \ + ExecCtx::Get()->InvalidateNow(); \ } while (0) #endif diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc index b1b8fffdca..c9f5448630 100644 --- a/src/core/lib/iomgr/combiner.cc +++ b/src/core/lib/iomgr/combiner.cc @@ -128,20 +128,24 @@ grpc_combiner* grpc_combiner_ref(grpc_combiner* lock GRPC_COMBINER_DEBUG_ARGS) { } static void push_last_on_exec_ctx(grpc_combiner* lock) { - lock->next_combiner_on_this_exec_ctx = NULL; - if (exec_ctx->active_combiner == NULL) { - exec_ctx->active_combiner = exec_ctx->last_combiner = lock; + lock->next_combiner_on_this_exec_ctx = nullptr; + if (ExecCtx::Get()->combiner_data()->active_combiner == nullptr) { + ExecCtx::Get()->combiner_data()->active_combiner = + ExecCtx::Get()->combiner_data()->last_combiner = lock; } else { - exec_ctx->last_combiner->next_combiner_on_this_exec_ctx = lock; - exec_ctx->last_combiner = lock; + ExecCtx::Get() + ->combiner_data() + ->last_combiner->next_combiner_on_this_exec_ctx = lock; + ExecCtx::Get()->combiner_data()->last_combiner = lock; } } static void push_first_on_exec_ctx(grpc_combiner* lock) { - lock->next_combiner_on_this_exec_ctx = exec_ctx->active_combiner; - exec_ctx->active_combiner = lock; + lock->next_combiner_on_this_exec_ctx = + ExecCtx::Get()->combiner_data()->active_combiner; + ExecCtx::Get()->combiner_data()->active_combiner = lock; if (lock->next_combiner_on_this_exec_ctx == NULL) { - exec_ctx->last_combiner = lock; + ExecCtx::Get()->combiner_data()->last_combiner = lock; } } @@ -161,7 +165,7 @@ static void combiner_exec(grpc_closure* cl, grpc_error* error) { GRPC_STATS_INC_COMBINER_LOCKS_INITIATED(); GPR_TIMER_MARK("combiner.initiated", 0); gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null, - (gpr_atm)exec_ctx); + (gpr_atm)ExecCtx::Get()); // first element on this list: add it to the list of combiner locks // executing within this exec_ctx push_last_on_exec_ctx(lock); @@ -170,7 +174,7 @@ static void combiner_exec(grpc_closure* cl, grpc_error* error) { // offload for one or two actions, and that's fine gpr_atm initiator = gpr_atm_no_barrier_load(&lock->initiating_exec_ctx_or_null); - if (initiator != 0 && initiator != (gpr_atm)exec_ctx) { + if (initiator != 0 && initiator != (gpr_atm)ExecCtx::Get()) { gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null, 0); } } @@ -182,10 +186,12 @@ static void combiner_exec(grpc_closure* cl, grpc_error* error) { } static void move_next() { - exec_ctx->active_combiner = - exec_ctx->active_combiner->next_combiner_on_this_exec_ctx; - if (exec_ctx->active_combiner == NULL) { - exec_ctx->last_combiner = NULL; + 
diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc
index b1b8fffdca..c9f5448630 100644
--- a/src/core/lib/iomgr/combiner.cc
+++ b/src/core/lib/iomgr/combiner.cc
@@ -128,20 +128,24 @@ grpc_combiner* grpc_combiner_ref(grpc_combiner* lock GRPC_COMBINER_DEBUG_ARGS) {
 }
 
 static void push_last_on_exec_ctx(grpc_combiner* lock) {
-  lock->next_combiner_on_this_exec_ctx = NULL;
-  if (exec_ctx->active_combiner == NULL) {
-    exec_ctx->active_combiner = exec_ctx->last_combiner = lock;
+  lock->next_combiner_on_this_exec_ctx = nullptr;
+  if (ExecCtx::Get()->combiner_data()->active_combiner == nullptr) {
+    ExecCtx::Get()->combiner_data()->active_combiner =
+        ExecCtx::Get()->combiner_data()->last_combiner = lock;
   } else {
-    exec_ctx->last_combiner->next_combiner_on_this_exec_ctx = lock;
-    exec_ctx->last_combiner = lock;
+    ExecCtx::Get()
+        ->combiner_data()
+        ->last_combiner->next_combiner_on_this_exec_ctx = lock;
+    ExecCtx::Get()->combiner_data()->last_combiner = lock;
   }
 }
 
 static void push_first_on_exec_ctx(grpc_combiner* lock) {
-  lock->next_combiner_on_this_exec_ctx = exec_ctx->active_combiner;
-  exec_ctx->active_combiner = lock;
+  lock->next_combiner_on_this_exec_ctx =
+      ExecCtx::Get()->combiner_data()->active_combiner;
+  ExecCtx::Get()->combiner_data()->active_combiner = lock;
   if (lock->next_combiner_on_this_exec_ctx == NULL) {
-    exec_ctx->last_combiner = lock;
+    ExecCtx::Get()->combiner_data()->last_combiner = lock;
   }
 }
@@ -161,7 +165,7 @@ static void combiner_exec(grpc_closure* cl, grpc_error* error) {
       GRPC_STATS_INC_COMBINER_LOCKS_INITIATED();
       GPR_TIMER_MARK("combiner.initiated", 0);
       gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null,
-                               (gpr_atm)exec_ctx);
+                               (gpr_atm)ExecCtx::Get());
       // first element on this list: add it to the list of combiner locks
       // executing within this exec_ctx
       push_last_on_exec_ctx(lock);
@@ -170,7 +174,7 @@ static void combiner_exec(grpc_closure* cl, grpc_error* error) {
       // offload for one or two actions, and that's fine
       gpr_atm initiator =
           gpr_atm_no_barrier_load(&lock->initiating_exec_ctx_or_null);
-      if (initiator != 0 && initiator != (gpr_atm)exec_ctx) {
+      if (initiator != 0 && initiator != (gpr_atm)ExecCtx::Get()) {
         gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null, 0);
       }
     }
@@ -182,10 +186,12 @@ static void combiner_exec(grpc_closure* cl, grpc_error* error) {
 }
 
 static void move_next() {
-  exec_ctx->active_combiner =
-      exec_ctx->active_combiner->next_combiner_on_this_exec_ctx;
-  if (exec_ctx->active_combiner == NULL) {
-    exec_ctx->last_combiner = NULL;
+  ExecCtx::Get()->combiner_data()->active_combiner =
+      ExecCtx::Get()
+          ->combiner_data()
+          ->active_combiner->next_combiner_on_this_exec_ctx;
+  if (ExecCtx::Get()->combiner_data()->active_combiner == NULL) {
+    ExecCtx::Get()->combiner_data()->last_combiner = NULL;
   }
 }
@@ -203,7 +209,7 @@ static void queue_offload(grpc_combiner* lock) {
 
 bool grpc_combiner_continue_exec_ctx() {
   GPR_TIMER_BEGIN("combiner.continue_exec_ctx", 0);
-  grpc_combiner* lock = exec_ctx->active_combiner;
+  grpc_combiner* lock = ExecCtx::Get()->combiner_data()->active_combiner;
   if (lock == NULL) {
     GPR_TIMER_END("combiner.continue_exec_ctx", 0);
     return false;
@@ -217,10 +223,11 @@ bool grpc_combiner_continue_exec_ctx() {
                               "contended=%d "
                               "exec_ctx_ready_to_finish=%d "
                               "time_to_execute_final_list=%d",
-                              lock, contended, grpc_exec_ctx_ready_to_finish(),
+                              lock, contended,
+                              ExecCtx::Get()->IsReadyToFinish(),
                               lock->time_to_execute_final_list));
 
-  if (contended && grpc_exec_ctx_ready_to_finish() &&
+  if (contended && ExecCtx::Get()->IsReadyToFinish() &&
       grpc_executor_is_threaded()) {
     GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0);
     // this execution context wants to move on: schedule remaining work to be
@@ -326,11 +333,11 @@ static void combiner_finally_exec(grpc_closure* closure, grpc_error* error) {
   GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS();
   grpc_combiner* lock =
       COMBINER_FROM_CLOSURE_SCHEDULER(closure, finally_scheduler);
-  GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG,
-                              "C:%p grpc_combiner_execute_finally c=%p; ac=%p",
-                              lock, closure, exec_ctx->active_combiner));
+  GRPC_COMBINER_TRACE(
+      gpr_log(GPR_DEBUG, "C:%p grpc_combiner_execute_finally c=%p; ac=%p", lock,
+              closure, ExecCtx::Get()->combiner_data()->active_combiner));
   GPR_TIMER_BEGIN("combiner.execute_finally", 0);
-  if (exec_ctx->active_combiner != lock) {
+  if (ExecCtx::Get()->combiner_data()->active_combiner != lock) {
     GPR_TIMER_MARK("slowpath", 0);
     GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(enqueue_finally, closure,
                                            grpc_combiner_scheduler(lock)),
diff --git a/src/core/lib/iomgr/endpoint_pair_posix.cc b/src/core/lib/iomgr/endpoint_pair_posix.cc
index 696ac6942f..1a281322a8 100644
--- a/src/core/lib/iomgr/endpoint_pair_posix.cc
+++ b/src/core/lib/iomgr/endpoint_pair_posix.cc
@@ -65,7 +65,6 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char* name,
                              "socketpair-client");
 
   gpr_free(final_name);
-  grpc_exec_ctx_finish();
 
   return p;
 }
diff --git a/src/core/lib/iomgr/endpoint_pair_windows.cc b/src/core/lib/iomgr/endpoint_pair_windows.cc
index d464617097..e0f211cdf9 100644
--- a/src/core/lib/iomgr/endpoint_pair_windows.cc
+++ b/src/core/lib/iomgr/endpoint_pair_windows.cc
@@ -77,7 +77,7 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(
                              channel_args, "endpoint:server");
   p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server"),
                              channel_args, "endpoint:client");
-  grpc_exec_ctx_finish();
+
   return p;
 }
diff --git a/src/core/lib/iomgr/error.cc b/src/core/lib/iomgr/error.cc
index ce8b538773..157f12a36d 100644
--- a/src/core/lib/iomgr/error.cc
+++ b/src/core/lib/iomgr/error.cc
@@ -157,11 +157,7 @@ static void unref_errs(grpc_error* err) {
   }
 }
 
-static void unref_slice(grpc_slice slice) {
-  ExecCtx _local_exec_ctx;
-  grpc_slice_unref_internal(slice);
-  grpc_exec_ctx_finish();
-}
+static void unref_slice(grpc_slice slice) { grpc_slice_unref_internal(slice); }
 
 static void unref_strs(grpc_error* err) {
   for (size_t which = 0; which < GRPC_ERROR_STR_MAX; ++which) {
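The combiner hunks swap direct field access (exec_ctx->active_combiner) for a narrow accessor, so only combiner.cc needs to know about the queue fields. A minimal model of the encapsulation being introduced, paralleling the CombinerData struct declared in the exec_ctx.h hunk below:

```cpp
struct grpc_combiner;  // opaque here

class ExecCtxCombinerModel {  // stand-in for the real ExecCtx
 public:
  struct CombinerData {
    grpc_combiner* active_combiner = nullptr;  // head of the per-ctx queue
    grpc_combiner* last_combiner = nullptr;    // tail of the per-ctx queue
  };
  // Per the header comment, only grpc-combiner code should touch this.
  CombinerData* combiner_data() { return &combiner_data_; }

 private:
  CombinerData combiner_data_;
};

// Call sites then read/write through the pointer, e.g.:
//   ExecCtx::Get()->combiner_data()->active_combiner = lock;
```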
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc
index 31f51df15d..2b486887b8 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.cc
+++ b/src/core/lib/iomgr/ev_epoll1_linux.cc
@@ -554,7 +554,7 @@ static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
 
 static int poll_deadline_to_millis_timeout(grpc_millis millis) {
   if (millis == GRPC_MILLIS_INF_FUTURE) return -1;
-  grpc_millis delta = millis - grpc_exec_ctx_now();
+  grpc_millis delta = millis - ExecCtx::Get()->Now();
   if (delta > INT_MAX) {
     return INT_MAX;
   } else if (delta < 0) {
@@ -630,7 +630,7 @@ static grpc_error* do_epoll_wait(grpc_pollset* ps, grpc_millis deadline) {
                    timeout);
   } while (r < 0 && errno == EINTR);
   if (timeout != 0) {
-    GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX();
+    GRPC_SCHEDULING_END_BLOCKING_REGION;
   }
 
   if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
@@ -743,7 +743,7 @@ static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
         SET_KICK_STATE(worker, KICKED);
       }
     }
-    grpc_exec_ctx_invalidate_now();
+    ExecCtx::Get()->InvalidateNow();
   }
 
   if (GRPC_TRACER_ON(grpc_polling_trace)) {
@@ -848,7 +848,7 @@ static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
   /* Make sure we appear kicked */
   SET_KICK_STATE(worker, KICKED);
   grpc_closure_list_move(&worker->schedule_on_end_work,
-                         &exec_ctx->closure_list);
+                         ExecCtx::Get()->closure_list());
   if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) {
     if (worker->next != worker && worker->next->state == UNKICKED) {
       if (GRPC_TRACER_ON(grpc_polling_trace)) {
@@ -859,9 +859,9 @@ static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
       SET_KICK_STATE(worker->next, DESIGNATED_POLLER);
       GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
       gpr_cv_signal(&worker->next->cv);
-      if (grpc_exec_ctx_has_work()) {
+      if (ExecCtx::Get()->HasWork()) {
         gpr_mu_unlock(&pollset->mu);
-        grpc_exec_ctx_flush();
+        ExecCtx::Get()->Flush();
         gpr_mu_lock(&pollset->mu);
       }
     } else {
@@ -892,12 +892,12 @@ static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
         found_worker = check_neighborhood_for_available_poller(neighborhood);
         gpr_mu_unlock(&neighborhood->mu);
       }
-      grpc_exec_ctx_flush();
+      ExecCtx::Get()->Flush();
       gpr_mu_lock(&pollset->mu);
     }
-  } else if (grpc_exec_ctx_has_work()) {
+  } else if (ExecCtx::Get()->HasWork()) {
     gpr_mu_unlock(&pollset->mu);
-    grpc_exec_ctx_flush();
+    ExecCtx::Get()->Flush();
     gpr_mu_lock(&pollset->mu);
   }
   if (worker->initialized_cv) {
@@ -948,9 +948,9 @@ static grpc_error* pollset_work(grpc_pollset* ps,
      process_epoll_events() returns very quickly: It just queues the work on
      exec_ctx but does not execute it (the actual exectution or more
-     accurately grpc_exec_ctx_flush() happens in end_worker() AFTER selecting
-     a designated poller). So we are not waiting long periods without a
-     designated poller */
+     accurately ExecCtx::Get()->Flush() happens in end_worker() AFTER
+     selecting a designated poller). So we are not waiting long periods
+     without a designated poller */
   if (gpr_atm_acq_load(&g_epoll_set.cursor) ==
       gpr_atm_acq_load(&g_epoll_set.num_events)) {
     append_error(&error, do_epoll_wait(ps, deadline), err_desc);
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index 0979a45270..385b5f68d0 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -682,7 +682,7 @@ static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
 
 static int poll_deadline_to_millis_timeout(grpc_millis millis) {
   if (millis == GRPC_MILLIS_INF_FUTURE) return -1;
-  grpc_millis delta = millis - grpc_exec_ctx_now();
+  grpc_millis delta = millis - ExecCtx::Get()->Now();
   if (delta > INT_MAX)
     return INT_MAX;
   else if (delta < 0)
@@ -804,7 +804,7 @@ static grpc_error* pollable_epoll(pollable* p, grpc_millis deadline) {
     r = epoll_wait(p->epfd, p->events, MAX_EPOLL_EVENTS, timeout);
   } while (r < 0 && errno == EINTR);
   if (timeout != 0) {
-    GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX();
+    GRPC_SCHEDULING_END_BLOCKING_REGION;
   }
 
   if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
@@ -902,7 +902,7 @@ static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
               worker->pollable_obj, worker);
       }
     }
-    grpc_exec_ctx_invalidate_now();
+    ExecCtx::Get()->InvalidateNow();
   } else {
     gpr_mu_unlock(&pollset->mu);
   }
@@ -970,7 +970,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
     gpr_log(GPR_DEBUG,
             "PS:%p work hdl=%p worker=%p now=%" PRIdPTR " deadline=%" PRIdPTR
             " kwp=%d pollable=%p",
-            pollset, worker_hdl, WORKER_PTR, grpc_exec_ctx_now(), deadline,
+            pollset, worker_hdl, WORKER_PTR, ExecCtx::Get()->Now(), deadline,
            pollset->kicked_without_poller, pollset->active_pollable);
   }
   static const char* err_desc = "pollset_work";
@@ -990,7 +990,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
           &error,
           pollable_process_events(pollset, WORKER_PTR->pollable_obj, false),
           err_desc);
-      grpc_exec_ctx_flush();
+      ExecCtx::Get()->Flush();
       gpr_tls_set(&g_current_thread_pollset, 0);
       gpr_tls_set(&g_current_thread_worker, 0);
     }
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc
index c42a609fee..a9b094a2fa 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.cc
+++ b/src/core/lib/iomgr/ev_epollsig_linux.cc
@@ -1090,7 +1090,7 @@ static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
 
 static int poll_deadline_to_millis_timeout(grpc_millis millis) {
   if (millis == GRPC_MILLIS_INF_FUTURE) return -1;
-  grpc_millis delta = millis - grpc_exec_ctx_now();
+  grpc_millis delta = millis - ExecCtx::Get()->Now();
   if (delta > INT_MAX)
     return INT_MAX;
   else if (delta < 0)
@@ -1220,7 +1220,7 @@ static void pollset_work_and_unlock(grpc_pollset* pollset,
     GRPC_STATS_INC_SYSCALL_POLL();
     ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
                         sig_mask);
-    GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX();
+    GRPC_SCHEDULING_END_BLOCKING_REGION;
     if (ep_rv < 0) {
       if (errno != EINTR) {
         gpr_asprintf(&err_msg,
@@ -1350,7 +1350,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
     pollset_work_and_unlock(pollset, &worker, timeout_ms, &g_orig_sigmask,
                             &error);
-    grpc_exec_ctx_flush();
+    ExecCtx::Get()->Flush();
 
     gpr_mu_lock(&pollset->po.mu);
 
@@ -1373,7 +1373,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
       finish_shutdown_locked(pollset);
       gpr_mu_unlock(&pollset->po.mu);
-      grpc_exec_ctx_flush();
+      ExecCtx::Get()->Flush();
       gpr_mu_lock(&pollset->po.mu);
     }
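The worker hand-off pattern that recurs through all three event engines above is: before sleeping or handing off the designated-poller role, drain closures queued on this thread's ExecCtx, dropping the pollset lock while running them. A simplified sketch of that shape (gpr_mu and the lock helpers are the real gRPC sync primitives; the function itself is illustrative):

```cpp
void DrainOutsideLock(gpr_mu* pollset_mu) {
  if (ExecCtx::Get()->HasWork()) {
    gpr_mu_unlock(pollset_mu);
    ExecCtx::Get()->Flush();  // run closures while another worker can poll
    gpr_mu_lock(pollset_mu);
  }
}
```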
diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc
index c7189950f0..cab4f7547c 100644
--- a/src/core/lib/iomgr/ev_poll_posix.cc
+++ b/src/core/lib/iomgr/ev_poll_posix.cc
@@ -976,7 +976,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
       GRPC_SCHEDULING_START_BLOCKING_REGION;
       GRPC_STATS_INC_SYSCALL_POLL();
       r = grpc_poll_function(pfds, pfd_count, timeout);
-      GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX();
+      GRPC_SCHEDULING_END_BLOCKING_REGION;
 
       if (GRPC_TRACER_ON(grpc_polling_trace)) {
         gpr_log(GPR_DEBUG, "%p poll=%d", pollset, r);
@@ -1040,7 +1040,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
        worker list, which means nobody could ask us to re-evaluate polling). */
   done:
     if (!locked) {
-      queued_work |= grpc_exec_ctx_flush();
+      queued_work |= ExecCtx::Get()->Flush();
       gpr_mu_lock(&pollset->mu);
       locked = 1;
     }
@@ -1074,7 +1074,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
       pollset->called_shutdown = 1;
       gpr_mu_unlock(&pollset->mu);
       finish_shutdown(pollset);
-      grpc_exec_ctx_flush();
+      ExecCtx::Get()->Flush();
       /* Continuing to access pollset here is safe -- it is the caller's
        * responsibility to not destroy when it has outstanding calls to
        * pollset_work.
@@ -1083,7 +1083,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
     } else if (!grpc_closure_list_empty(pollset->idle_jobs)) {
       GRPC_CLOSURE_LIST_SCHED(&pollset->idle_jobs);
       gpr_mu_unlock(&pollset->mu);
-      grpc_exec_ctx_flush();
+      ExecCtx::Get()->Flush();
       gpr_mu_lock(&pollset->mu);
     }
   }
@@ -1110,7 +1110,7 @@ static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
 static int poll_deadline_to_millis_timeout(grpc_millis deadline) {
   if (deadline == GRPC_MILLIS_INF_FUTURE) return -1;
   if (deadline == 0) return 0;
-  grpc_millis n = deadline - grpc_exec_ctx_now();
+  grpc_millis n = deadline - ExecCtx::Get()->Now();
   if (n < 0) return 0;
   if (n > INT_MAX) return -1;
   return (int)n;
 }
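Essentially the same deadline-to-timeout conversion recurs in every poller touched above. Written once against the new API, the version from the epoll engines is:

```cpp
// Converts an absolute grpc_millis deadline into the relative millisecond
// timeout that poll()/epoll_wait() expect, clamped to the int range.
static int poll_deadline_to_millis_timeout(grpc_millis millis) {
  if (millis == GRPC_MILLIS_INF_FUTURE) return -1;  // block indefinitely
  grpc_millis delta = millis - ExecCtx::Get()->Now();
  if (delta > INT_MAX) return INT_MAX;
  if (delta < 0) return 0;  // deadline already passed: don't block
  return (int)delta;
}
```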
diff --git a/src/core/lib/iomgr/exec_ctx.cc b/src/core/lib/iomgr/exec_ctx.cc
index c10d1e60f0..fe5a0e7e2d 100644
--- a/src/core/lib/iomgr/exec_ctx.cc
+++ b/src/core/lib/iomgr/exec_ctx.cc
@@ -27,45 +27,19 @@
 
 thread_local ExecCtx* exec_ctx = nullptr;
 
-ExecCtx::ExecCtx()
-    : closure_list(GRPC_CLOSURE_LIST_INIT),
-      active_combiner(nullptr),
-      last_combiner(nullptr),
-      flags(GRPC_EXEC_CTX_FLAG_IS_FINISHED),
-      starting_cpu(gpr_cpu_current_cpu()),
-      check_ready_to_finish_arg(nullptr),
-      check_ready_to_finish(nullptr),
-      now_is_valid(false),
-      now(0),
-      last_exec_ctx(exec_ctx) {
-  exec_ctx = this;
-}
-
-ExecCtx::ExecCtx(uintptr_t fl, bool (*finish_check)(void* arg),
-                 void* finish_check_arg)
-    : closure_list(GRPC_CLOSURE_LIST_INIT),
-      active_combiner(nullptr),
-      last_combiner(nullptr),
-      flags(fl),
-      starting_cpu(gpr_cpu_current_cpu()),
-      check_ready_to_finish_arg(finish_check_arg),
-      check_ready_to_finish(finish_check),
-      now_is_valid(false),
-      now(0),
-      last_exec_ctx(exec_ctx) {
-  exec_ctx = this;
-}
-
+ExecCtx::ExecCtx() : flags_(GRPC_EXEC_CTX_FLAG_IS_FINISHED) { exec_ctx = this; }
+ExecCtx::ExecCtx(uintptr_t fl) : flags_(fl) { exec_ctx = this; }
 ExecCtx::~ExecCtx() {
   GPR_ASSERT(exec_ctx == this);
-  grpc_exec_ctx_finish();
-  exec_ctx = last_exec_ctx;
+  flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED;
+  Flush();
+  exec_ctx = last_exec_ctx_;
 }
 
-bool grpc_exec_ctx_ready_to_finish() {
-  if ((exec_ctx->flags & GRPC_EXEC_CTX_FLAG_IS_FINISHED) == 0) {
-    if (exec_ctx->check_ready_to_finish(exec_ctx->check_ready_to_finish_arg)) {
-      exec_ctx->flags |= GRPC_EXEC_CTX_FLAG_IS_FINISHED;
+bool ExecCtx::IsReadyToFinish() {
+  if ((flags_ & GRPC_EXEC_CTX_FLAG_IS_FINISHED) == 0) {
+    if (CheckReadyToFinish()) {
+      flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED;
       return true;
     }
     return false;
@@ -74,21 +48,7 @@ bool ExecCtx::IsReadyToFinish() {
   }
 }
 
-bool grpc_never_ready_to_finish(void* arg_ignored) { return false; }
-
-bool grpc_always_ready_to_finish(void* arg_ignored) { return true; }
-
-bool grpc_exec_ctx_has_work() {
-  return exec_ctx->active_combiner != NULL ||
-         !grpc_closure_list_empty(exec_ctx->closure_list);
-}
-
-void grpc_exec_ctx_finish() {
-  exec_ctx->flags |= GRPC_EXEC_CTX_FLAG_IS_FINISHED;
-  grpc_exec_ctx_flush();
-}
-
-static void exec_ctx_run(grpc_closure* closure, grpc_error* error) {
+void exec_ctx_run(grpc_closure* closure, grpc_error* error) {
 #ifndef NDEBUG
   closure->scheduled = false;
   if (GRPC_TRACER_ON(grpc_trace_closure)) {
@@ -107,13 +67,13 @@ static void exec_ctx_run(grpc_closure* closure, grpc_error* error) {
   GRPC_ERROR_UNREF(error);
 }
 
-bool grpc_exec_ctx_flush() {
+bool ExecCtx::Flush() {
   bool did_something = 0;
   GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0);
   for (;;) {
-    if (!grpc_closure_list_empty(exec_ctx->closure_list)) {
-      grpc_closure* c = exec_ctx->closure_list.head;
-      exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL;
+    if (!grpc_closure_list_empty(closure_list_)) {
+      grpc_closure* c = closure_list_.head;
+      closure_list_.head = closure_list_.tail = NULL;
       while (c != NULL) {
         grpc_closure* next = c->next_data.next;
         grpc_error* error = c->error_data.error;
@@ -125,13 +85,13 @@ bool ExecCtx::Flush() {
       break;
     }
   }
-  GPR_ASSERT(exec_ctx->active_combiner == NULL);
+  GPR_ASSERT(combiner_data_.active_combiner == nullptr);
   GPR_TIMER_END("grpc_exec_ctx_flush", 0);
   return did_something;
 }
 
-static void exec_ctx_sched(grpc_closure* closure, grpc_error* error) {
-  grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
+void exec_ctx_sched(grpc_closure* closure, grpc_error* error) {
+  grpc_closure_list_append(exec_ctx->closure_list(), closure, error);
 }
 
 static gpr_timespec
@@ -139,7 +99,7 @@ static gpr_timespec
                                      // last enum value in
                                      // gpr_clock_type
 
-void grpc_exec_ctx_global_init(void) {
+void ExecCtx::GlobalInit(void) {
   for (int i = 0; i < GPR_TIMESPAN; i++) {
     g_start_time[i] = gpr_now((gpr_clock_type)i);
   }
@@ -147,7 +107,7 @@ void ExecCtx::GlobalInit(void) {
   g_start_time[GPR_TIMESPAN] = gpr_time_0(GPR_TIMESPAN);
 }
 
-void grpc_exec_ctx_global_shutdown(void) {}
+void ExecCtx::GlobalShutdown(void) {}
 
 static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) {
   ts = gpr_time_sub(ts, g_start_time[ts.clock_type]);
@@ -168,16 +128,6 @@ static gpr_atm timespec_to_atm_round_up(gpr_timespec ts) {
   return (gpr_atm)x;
 }
 
-grpc_millis grpc_exec_ctx_now() {
-  if (!exec_ctx->now_is_valid) {
-    exec_ctx->now = timespec_to_atm_round_down(gpr_now(GPR_CLOCK_MONOTONIC));
-    exec_ctx->now_is_valid = true;
-  }
-  return exec_ctx->now;
-}
-
-void grpc_exec_ctx_invalidate_now() { exec_ctx->now_is_valid = false; }
-
 gpr_timespec grpc_millis_to_timespec(grpc_millis millis,
                                      gpr_clock_type clock_type) {
   // special-case infinities as grpc_millis can be 32bit on some platforms
@@ -204,6 +154,16 @@ grpc_millis grpc_timespec_to_millis_round_up(gpr_timespec ts) {
   return timespec_to_atm_round_up(ts);
 }
 
+grpc_millis ExecCtx::Now() {
+  if (!now_is_valid_) {
+    now_ = timespec_to_atm_round_down(gpr_now(GPR_CLOCK_MONOTONIC));
+    now_is_valid_ = true;
+  }
+  return now_;
+}
+
+ExecCtx* ExecCtx::Get() { return exec_ctx; }
+
 static const grpc_closure_scheduler_vtable exec_ctx_scheduler_vtable = {
     exec_ctx_run, exec_ctx_sched, "exec_ctx"};
 static grpc_closure_scheduler exec_ctx_scheduler = {&exec_ctx_scheduler_vtable};
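With the function-pointer finish checks gone (grpc_never_ready_to_finish and grpc_always_ready_to_finish are deleted above), custom finish behavior comes from overriding the virtual hook declared in the header below. A sketch of the analogue of the old "always ready" check; the base class default returns false, which is why executor.cc can now construct a plain ExecCtx instead of passing grpc_never_ready_to_finish:

```cpp
class AlwaysReadyExecCtx : public ExecCtx {
 protected:
  // Consulted by IsReadyToFinish() until the IS_FINISHED flag latches.
  bool CheckReadyToFinish() override { return true; }
};
```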
diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h
index a80bcbbc0d..a71e43e178 100644
--- a/src/core/lib/iomgr/exec_ctx.h
+++ b/src/core/lib/iomgr/exec_ctx.h
@@ -66,95 +66,81 @@ typedef struct grpc_combiner grpc_combiner;
  *  - Instances are always passed as the first argument to a function that
  *    takes it, and always as a pointer (grpc_exec_ctx is never copied).
  */
-struct grpc_exec_ctx {
-  grpc_closure_list closure_list;
-  /** currently active combiner: updated only via combiner.c */
-  grpc_combiner* active_combiner;
-  /** last active combiner in the active combiner list */
-  grpc_combiner* last_combiner;
-  uintptr_t flags;
-  unsigned starting_cpu;
-  void* check_ready_to_finish_arg;
-  bool (*check_ready_to_finish)(void* arg);
-
-  bool now_is_valid;
-  grpc_millis now;
-  const char* creator;
-};
-
-extern grpc_closure_scheduler* grpc_schedule_on_exec_ctx;
-
-bool grpc_exec_ctx_has_work();
-
-/** Flush any work that has been enqueued onto this grpc_exec_ctx.
- *  Caller must guarantee that no interfering locks are held.
- *  Returns true if work was performed, false otherwise. */
-bool grpc_exec_ctx_flush();
-/** Finish any pending work for a grpc_exec_ctx. Must be called before
- *  the instance is destroyed, or work may be lost. */
-void grpc_exec_ctx_finish();
-/** Returns true if we'd like to leave this execution context as soon as
-    possible: useful for deciding whether to do something more or not depending
-    on outside context */
-bool grpc_exec_ctx_ready_to_finish();
-/** A finish check that is never ready to finish */
-bool grpc_never_ready_to_finish(void* arg_ignored);
-/** A finish check that is always ready to finish */
-bool grpc_always_ready_to_finish(void* arg_ignored);
-
-void grpc_exec_ctx_global_init(void);
-
-void grpc_exec_ctx_global_init(void);
-void grpc_exec_ctx_global_shutdown(void);
-
-grpc_millis grpc_exec_ctx_now();
-void grpc_exec_ctx_invalidate_now();
-gpr_timespec grpc_millis_to_timespec(grpc_millis millis, gpr_clock_type clock);
-grpc_millis grpc_timespec_to_millis_round_down(gpr_timespec timespec);
-grpc_millis grpc_timespec_to_millis_round_up(gpr_timespec timespec);
-
-inline grpc_exec_ctx make_exec_ctx(grpc_exec_ctx r) {
-  grpc_exec_ctx_flush();
-  return r;
-}
-
 class ExecCtx {
  public:
   ExecCtx();
-  ExecCtx(uintptr_t fl, bool (*finish_check)(void* arg),
-          void* finish_check_arg);
+  ExecCtx(uintptr_t fl);
   ~ExecCtx();
 
-  grpc_closure_list closure_list;
-  /** currently active combiner: updated only via combiner.c */
-  grpc_combiner* active_combiner;
-  /** last active combiner in the active combiner list */
-  grpc_combiner* last_combiner;
-  uintptr_t flags;
-  unsigned starting_cpu;
-  void* check_ready_to_finish_arg;
-  bool (*check_ready_to_finish)(void* arg);
-
-  bool now_is_valid;
-  grpc_millis now;
-
- private:
-  ExecCtx* last_exec_ctx;
-};
+  unsigned starting_cpu() const { return starting_cpu_; }
+
+  struct CombinerData {
+    /* currently active combiner: updated only via combiner.c */
+    grpc_combiner* active_combiner;
+    /* last active combiner in the active combiner list */
+    grpc_combiner* last_combiner;
+  };
+
+  /** Only to be used by grpc-combiner code */
+  CombinerData* combiner_data() { return &combiner_data_; }
+
+  grpc_closure_list* closure_list() { return &closure_list_; }
+
+  bool HasWork() {
+    return combiner_data_.active_combiner != NULL ||
+           !grpc_closure_list_empty(closure_list_);
+  }
+
+  /** Flush any work that has been enqueued onto this grpc_exec_ctx.
+   *  Caller must guarantee that no interfering locks are held.
+   *  Returns true if work was performed, false otherwise. */
+  bool Flush();
+
+  /** Returns true if we'd like to leave this execution context as soon as
+possible: useful for deciding whether to do something more or not depending
+on outside context */
+  bool IsReadyToFinish();
 
-extern thread_local ExecCtx* exec_ctx;
+  grpc_millis Now();
 
-/* initializer for grpc_exec_ctx:
- * prefer to use GRPC_EXEC_CTX_INIT whenever possible */
-#define GRPC_EXEC_CTX_INITIALIZER(flags, finish_check, finish_check_arg) \
-  make_exec_ctx(grpc_exec_ctx{GRPC_CLOSURE_LIST_INIT, NULL, NULL, flags, \
-                              gpr_cpu_current_cpu(), finish_check_arg,   \
-                              finish_check, false, 0, __PRETTY_FUNCTION__})
+  void InvalidateNow() { now_is_valid_ = false; }
 
-/* initialize an execution context at the top level of an API call into grpc
-   (this is safe to use elsewhere, though possibly not as efficient) */
-#define GRPC_EXEC_CTX_INIT \
-  GRPC_EXEC_CTX_INITIALIZER(GRPC_EXEC_CTX_FLAG_IS_FINISHED, NULL, NULL)
+  void SetNow(grpc_millis new_val) {
+    now_ = new_val;
+    now_is_valid_ = true;
+  }
+
+  uintptr_t flags() { return flags_; }
+
+  /** Finish any pending work for a grpc_exec_ctx. Must be called before
+   *  the instance is destroyed, or work may be lost. */
+  void Finish();
+
+  static void GlobalInit(void);
+
+  static void GlobalShutdown(void);
+
+  static ExecCtx* Get();
+
+ protected:
+  virtual bool CheckReadyToFinish() { return false; }
+
+  grpc_closure_list closure_list_ = GRPC_CLOSURE_LIST_INIT;
+  CombinerData combiner_data_ = {nullptr, nullptr};
+  uintptr_t flags_;
+  unsigned starting_cpu_ = gpr_cpu_current_cpu();
+
+  bool now_is_valid_ = false;
+  grpc_millis now_ = 0;
+
+  ExecCtx* last_exec_ctx_ = Get();
+};
+
+extern grpc_closure_scheduler* grpc_schedule_on_exec_ctx;
+
+gpr_timespec grpc_millis_to_timespec(grpc_millis millis, gpr_clock_type clock);
+grpc_millis grpc_timespec_to_millis_round_down(gpr_timespec timespec);
+grpc_millis grpc_timespec_to_millis_round_up(gpr_timespec timespec);
 
 #ifdef __cplusplus
 }
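With GRPC_EXEC_CTX_INIT and make_exec_ctx removed from the header, an entry point into core is now just a stack-constructed ExecCtx, matching what executor.cc below does in executor_thread. A hypothetical entry point for illustration:

```cpp
// Not a real gRPC function; shows the replacement for the old macro family.
void enter_grpc_core_model() {
  ExecCtx _local_exec_ctx;  // was: grpc_exec_ctx ctx = GRPC_EXEC_CTX_INIT;
  // ... closures scheduled anywhere on this thread land on this context ...
}                           // ~ExecCtx() marks it finished and flushes
```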
diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc
index f764d915ff..bf8805a2cd 100644
--- a/src/core/lib/iomgr/executor.cc
+++ b/src/core/lib/iomgr/executor.cc
@@ -78,7 +78,7 @@ static size_t run_closures(grpc_closure_list list) {
     GRPC_ERROR_UNREF(error);
     c = next;
     n++;
-    grpc_exec_ctx_flush();
+    ExecCtx::Get()->Flush();
   }

   return n;
@@ -145,7 +145,7 @@ static void executor_thread(void* arg) {
   thread_state* ts = (thread_state*)arg;
   gpr_tls_set(&g_this_thread_state, (intptr_t)ts);

-  ExecCtx _local_exec_ctx(0, grpc_never_ready_to_finish, NULL);
+  ExecCtx _local_exec_ctx;

   size_t subtract_depth = 0;
   for (;;) {
@@ -175,10 +175,9 @@ static void executor_thread(void* arg) {
       gpr_log(GPR_DEBUG, "EXECUTOR[%d]: execute", (int)(ts - g_thread_state));
     }

-    grpc_exec_ctx_invalidate_now();
+    ExecCtx::Get()->InvalidateNow();
     subtract_depth = run_closures(exec);
   }
-  grpc_exec_ctx_finish();
 }

 static void executor_push(grpc_closure* closure, grpc_error* error,
@@ -201,12 +200,12 @@ static void executor_push(grpc_closure* closure, grpc_error* error,
       gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p inline", closure);
 #endif
     }
-    grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
+    grpc_closure_list_append(ExecCtx::Get()->closure_list(), closure, error);
     return;
   }
   thread_state* ts = (thread_state*)gpr_tls_get(&g_this_thread_state);
   if (ts == NULL) {
-    ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)];
+    ts = &g_thread_state[GPR_HASH_POINTER(ExecCtx::Get(), cur_thread_count)];
   } else {
     GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF();
   }
diff --git a/src/core/lib/iomgr/iocp_windows.cc b/src/core/lib/iomgr/iocp_windows.cc
index 8f15f5e6cc..1686bf2872 100644
--- a/src/core/lib/iomgr/iocp_windows.cc
+++ b/src/core/lib/iomgr/iocp_windows.cc
@@ -46,7 +46,7 @@ static DWORD deadline_to_millis_timeout(grpc_millis deadline) {
   if (deadline == GRPC_MILLIS_INF_FUTURE) {
     return INFINITE;
   }
-  grpc_millis now = grpc_exec_ctx_now();
+  grpc_millis now = ExecCtx::Get()->Now();
   if (deadline < now) return 0;
   grpc_millis timeout = deadline - now;
   if (timeout > std::numeric_limits<DWORD>::max()) return INFINITE;
@@ -65,7 +65,7 @@ grpc_iocp_work_status grpc_iocp_work(grpc_millis deadline) {
   success =
       GetQueuedCompletionStatus(g_iocp, &bytes, &completion_key, &overlapped,
                                 deadline_to_millis_timeout(deadline));
-  grpc_exec_ctx_invalidate_now();
+  ExecCtx::Get()->InvalidateNow();
   if (success == 0 && overlapped == NULL) {
     return GRPC_IOCP_WORK_TIMEOUT;
   }
@@ -118,16 +118,16 @@ void grpc_iocp_flush(void) {

   do {
     work_status = grpc_iocp_work(GRPC_MILLIS_INF_PAST);
-  } while (work_status == GRPC_IOCP_WORK_KICK || grpc_exec_ctx_flush());
+  } while (work_status == GRPC_IOCP_WORK_KICK || ExecCtx::Get()->Flush());
 }

 void grpc_iocp_shutdown(void) {
   ExecCtx _local_exec_ctx;
   while (gpr_atm_acq_load(&g_custom_events)) {
     grpc_iocp_work(GRPC_MILLIS_INF_FUTURE);
-    grpc_exec_ctx_flush();
+    ExecCtx::Get()->Flush();
   }
-  grpc_exec_ctx_finish();
+
   GPR_ASSERT(CloseHandle(g_iocp));
 }
diff --git a/src/core/lib/iomgr/iomgr.cc b/src/core/lib/iomgr/iomgr.cc
index 01d9964cc8..a1add4a303 100644
--- a/src/core/lib/iomgr/iomgr.cc
+++ b/src/core/lib/iomgr/iomgr.cc
@@ -49,7 +49,7 @@ void grpc_iomgr_init() {
   g_shutdown = 0;
   gpr_mu_init(&g_mu);
   gpr_cv_init(&g_rcv);
-  grpc_exec_ctx_global_init();
+  ExecCtx::GlobalInit();
   grpc_executor_init();
   grpc_timer_list_init();
   g_root_object.next = g_root_object.prev = &g_root_object;
@@ -98,11 +98,10 @@ void grpc_iomgr_shutdown() {
         }
         last_warning_time = gpr_now(GPR_CLOCK_REALTIME);
       }
-      exec_ctx->now_is_valid = true;
-      exec_ctx->now = GRPC_MILLIS_INF_FUTURE;
+      ExecCtx::Get()->SetNow(GRPC_MILLIS_INF_FUTURE);
       if (grpc_timer_check(NULL) == GRPC_TIMERS_FIRED) {
         gpr_mu_unlock(&g_mu);
-        grpc_exec_ctx_flush();
+        ExecCtx::Get()->Flush();
         grpc_iomgr_platform_flush();
         gpr_mu_lock(&g_mu);
         continue;
@@ -137,14 +136,14 @@ void grpc_iomgr_shutdown() {
   gpr_mu_unlock(&g_mu);

   grpc_timer_list_shutdown();
-  grpc_exec_ctx_flush();
+  ExecCtx::Get()->Flush();

   /* ensure all threads have left g_mu */
   gpr_mu_lock(&g_mu);
   gpr_mu_unlock(&g_mu);

   grpc_iomgr_platform_shutdown();
-  grpc_exec_ctx_global_shutdown();
+  ExecCtx::GlobalShutdown();
   grpc_network_status_shutdown();
   gpr_mu_destroy(&g_mu);
   gpr_cv_destroy(&g_rcv);
diff --git a/src/core/lib/iomgr/iomgr_uv.cc b/src/core/lib/iomgr/iomgr_uv.cc
index 4dda970286..2ab414252a 100644
--- a/src/core/lib/iomgr/iomgr_uv.cc
+++ b/src/core/lib/iomgr/iomgr_uv.cc
@@ -34,7 +34,6 @@ void grpc_iomgr_platform_init(void) {
   grpc_register_tracer(&grpc_tcp_trace);
   grpc_executor_set_threading(false);
   g_init_thread = gpr_thd_currentid();
-  grpc_exec_ctx_finish();
 }
 void grpc_iomgr_platform_flush(void) {}
 void grpc_iomgr_platform_shutdown(void) { grpc_pollset_global_shutdown(); }
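The Now()/InvalidateNow() pairing above follows a cached-clock pattern: Now() appears to memoize the timestamp until invalidated, so pollers drop the cache after any call that can block. A sketch of the discipline (variable names illustrative):

    // Take a deadline against the cached clock.
    grpc_millis deadline = ExecCtx::Get()->Now() + 100;

    /* ... block in GetQueuedCompletionStatus()/poll() for an unknown time ... */

    // The cached timestamp is stale; invalidate so the next Now() re-reads
    // the real clock instead of returning the pre-block value.
    ExecCtx::Get()->InvalidateNow();
    if (ExecCtx::Get()->Now() >= deadline) {
      /* handle timeout */
    }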
diff --git a/src/core/lib/iomgr/load_file.cc b/src/core/lib/iomgr/load_file.cc
index 97e448fb32..feef65cc34 100644
--- a/src/core/lib/iomgr/load_file.cc
+++ b/src/core/lib/iomgr/load_file.cc
@@ -73,6 +73,6 @@ end:
     GRPC_ERROR_UNREF(error);
     error = error_out;
   }
-  GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX;
+  GRPC_SCHEDULING_END_BLOCKING_REGION;
   return error;
 }
diff --git a/src/core/lib/iomgr/pollset_uv.cc b/src/core/lib/iomgr/pollset_uv.cc
index 7028876297..a68ad4a6e3 100644
--- a/src/core/lib/iomgr/pollset_uv.cc
+++ b/src/core/lib/iomgr/pollset_uv.cc
@@ -124,7 +124,7 @@ grpc_error* grpc_pollset_work(grpc_pollset* pollset,
   GRPC_UV_ASSERT_SAME_THREAD();
   gpr_mu_unlock(&grpc_polling_mu);
   if (grpc_pollset_work_run_loop) {
-    grpc_millis now = grpc_exec_ctx_now();
+    grpc_millis now = ExecCtx::Get()->Now();
     if (deadline >= now) {
       timeout = deadline - now;
     } else {
@@ -143,7 +143,7 @@ grpc_error* grpc_pollset_work(grpc_pollset* pollset,
     }
   }
   if (!grpc_closure_list_empty(exec_ctx->closure_list)) {
-    grpc_exec_ctx_flush();
+    ExecCtx::Get()->Flush();
   }
   gpr_mu_lock(&grpc_polling_mu);
   return GRPC_ERROR_NONE;
diff --git a/src/core/lib/iomgr/pollset_windows.cc b/src/core/lib/iomgr/pollset_windows.cc
index dd019b1ec3..5ff3e7cb3a 100644
--- a/src/core/lib/iomgr/pollset_windows.cc
+++ b/src/core/lib/iomgr/pollset_windows.cc
@@ -129,7 +129,7 @@ grpc_error* grpc_pollset_work(grpc_pollset* pollset,
       g_active_poller = &worker;
       gpr_mu_unlock(&grpc_polling_mu);
       grpc_iocp_work(deadline);
-      grpc_exec_ctx_flush();
+      ExecCtx::Get()->Flush();
       gpr_mu_lock(&grpc_polling_mu);
       pollset->is_iocp_worker = 0;
       g_active_poller = NULL;
@@ -160,10 +160,10 @@ grpc_error* grpc_pollset_work(grpc_pollset* pollset,
       while (!worker.kicked) {
         if (gpr_cv_wait(&worker.cv, &grpc_polling_mu,
                         grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) {
-          grpc_exec_ctx_invalidate_now();
+          ExecCtx::Get()->InvalidateNow();
           break;
         }
-        grpc_exec_ctx_invalidate_now();
+        ExecCtx::Get()->InvalidateNow();
       }
     } else {
       pollset->kicked_without_pollers = 0;
@@ -171,7 +171,7 @@ grpc_error* grpc_pollset_work(grpc_pollset* pollset,
 done:
   if (!grpc_closure_list_empty(exec_ctx->closure_list)) {
     gpr_mu_unlock(&grpc_polling_mu);
-    grpc_exec_ctx_flush();
+    ExecCtx::Get()->Flush();
     gpr_mu_lock(&grpc_polling_mu);
   }
   if (added_worker) {
diff --git a/src/core/lib/iomgr/resolve_address_posix.cc b/src/core/lib/iomgr/resolve_address_posix.cc
index 3795c3f75e..80a1a45cc0 100644
--- a/src/core/lib/iomgr/resolve_address_posix.cc
+++ b/src/core/lib/iomgr/resolve_address_posix.cc
@@ -81,7 +81,7 @@ static grpc_error* blocking_resolve_address_impl(

   GRPC_SCHEDULING_START_BLOCKING_REGION;
   s = getaddrinfo(host, port, &hints, &result);
-  GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX;
+  GRPC_SCHEDULING_END_BLOCKING_REGION;

   if (s != 0) {
     /* Retry if well-known service name is recognized */
@@ -90,7 +90,7 @@ static grpc_error* blocking_resolve_address_impl(
       if (strcmp(port, svc[i][0]) == 0) {
         GRPC_SCHEDULING_START_BLOCKING_REGION;
         s = getaddrinfo(host, svc[i][1], &hints, &result);
-        GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX;
+        GRPC_SCHEDULING_END_BLOCKING_REGION;
         break;
       }
     }
diff --git a/src/core/lib/iomgr/resolve_address_uv.cc b/src/core/lib/iomgr/resolve_address_uv.cc
index 038926af07..ffd70c4f35 100644
--- a/src/core/lib/iomgr/resolve_address_uv.cc
+++ b/src/core/lib/iomgr/resolve_address_uv.cc
@@ -131,7 +131,7 @@ static void getaddrinfo_callback(uv_getaddrinfo_t* req, int status,
      original error probably has more interesting information */
   error = handle_addrinfo_result(status, res, r->addresses);
   GRPC_CLOSURE_SCHED(r->on_done, error);
-  grpc_exec_ctx_finish();
+
   gpr_free(r->hints);
   gpr_free(r->host);
   gpr_free(r->port);
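Both pollset implementations above share one discipline: the closure list is flushed only while grpc_polling_mu is not held, matching the header's warning that Flush() callers "must guarantee that no interfering locks are held". A condensed sketch of that shape:

    gpr_mu_unlock(&grpc_polling_mu);   // drop the pollset lock first
    ExecCtx::Get()->Flush();           // closures may acquire arbitrary locks
    gpr_mu_lock(&grpc_polling_mu);     // then re-acquire and continue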
diff --git a/src/core/lib/iomgr/resolve_address_windows.cc b/src/core/lib/iomgr/resolve_address_windows.cc
index 15f3fd9f1a..4e2bc7b5ca 100644
--- a/src/core/lib/iomgr/resolve_address_windows.cc
+++ b/src/core/lib/iomgr/resolve_address_windows.cc
@@ -87,7 +87,7 @@ static grpc_error* blocking_resolve_address_impl(

   GRPC_SCHEDULING_START_BLOCKING_REGION;
   s = getaddrinfo(host, port, &hints, &result);
-  GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX;
+  GRPC_SCHEDULING_END_BLOCKING_REGION;
   if (s != 0) {
     error = GRPC_WSA_ERROR(WSAGetLastError(), "getaddrinfo");
     goto done;
diff --git a/src/core/lib/iomgr/resource_quota.cc b/src/core/lib/iomgr/resource_quota.cc
index b58ae1cb21..8fee585f4b 100644
--- a/src/core/lib/iomgr/resource_quota.cc
+++ b/src/core/lib/iomgr/resource_quota.cc
@@ -624,7 +624,6 @@ void grpc_resource_quota_unref_internal(grpc_resource_quota* resource_quota) {
 void grpc_resource_quota_unref(grpc_resource_quota* resource_quota) {
   ExecCtx _local_exec_ctx;
   grpc_resource_quota_unref_internal(resource_quota);
-  grpc_exec_ctx_finish();
 }

 grpc_resource_quota* grpc_resource_quota_ref_internal(
@@ -656,7 +655,6 @@ void grpc_resource_quota_resize(grpc_resource_quota* resource_quota,
                            (gpr_atm)GPR_MIN((size_t)GPR_ATM_MAX, size));
   GRPC_CLOSURE_INIT(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx);
   GRPC_CLOSURE_SCHED(&a->closure, GRPC_ERROR_NONE);
-  grpc_exec_ctx_finish();
 }

 size_t grpc_resource_quota_peek_size(grpc_resource_quota* resource_quota) {
diff --git a/src/core/lib/iomgr/tcp_client_uv.cc b/src/core/lib/iomgr/tcp_client_uv.cc
index 2a127993a2..7454b01445 100644
--- a/src/core/lib/iomgr/tcp_client_uv.cc
+++ b/src/core/lib/iomgr/tcp_client_uv.cc
@@ -105,11 +105,10 @@ static void uv_tc_on_connect(uv_connect_t* req, int status) {
   }
   done = (--connect->refs == 0);
   if (done) {
-    grpc_exec_ctx_flush();
+    ExecCtx::Get()->Flush();
     uv_tcp_connect_cleanup(connect);
   }
   GRPC_CLOSURE_SCHED(closure, error);
-  grpc_exec_ctx_finish();
 }

 static void tcp_client_connect_impl(grpc_closure* closure, grpc_endpoint** ep,
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc
index 697281d8b1..de3dabd7fc 100644
--- a/src/core/lib/iomgr/tcp_posix.cc
+++ b/src/core/lib/iomgr/tcp_posix.cc
@@ -130,7 +130,7 @@ static void run_poller(void* bp, grpc_error* error_ignored) {
     gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p run", p);
   }
   gpr_mu_lock(p->pollset_mu);
-  grpc_millis deadline = grpc_exec_ctx_now() + 13 * GPR_MS_PER_SEC;
+  grpc_millis deadline = ExecCtx::Get()->Now() + 13 * GPR_MS_PER_SEC;
   GRPC_STATS_INC_TCP_BACKUP_POLLER_POLLS();
   GRPC_LOG_IF_ERROR(
       "backup_poller:pollset_work",
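As in run_poller() above, grpc_pollset_work() deadlines are absolute grpc_millis values derived from the context clock; a sketch of a caller (the 13-second figure mirrors the hunk, the rest is illustrative):

    grpc_millis deadline = ExecCtx::Get()->Now() + 13 * GPR_MS_PER_SEC;
    GRPC_LOG_IF_ERROR("pollset_work",
                      grpc_pollset_work(pollset, NULL, deadline));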
diff --git a/src/core/lib/iomgr/tcp_server_uv.cc b/src/core/lib/iomgr/tcp_server_uv.cc
index 8d5387fe6e..9db2cbe58d 100644
--- a/src/core/lib/iomgr/tcp_server_uv.cc
+++ b/src/core/lib/iomgr/tcp_server_uv.cc
@@ -142,7 +142,6 @@ static void handle_close_callback(uv_handle_t* handle) {
   if (sp->server->open_ports == 0 && sp->server->shutdown) {
     finish_shutdown(sp->server);
   }
-  grpc_exec_ctx_finish();
 }

 static void close_listener(grpc_tcp_listener* sp) {
@@ -177,14 +176,8 @@ void grpc_tcp_server_unref(grpc_tcp_server* s) {
     /* Complete shutdown_starting work before destroying. */
     ExecCtx _local_exec_ctx;
     GRPC_CLOSURE_LIST_SCHED(&s->shutdown_starting);
-    if (exec_ctx == NULL) {
-      grpc_exec_ctx_flush();
-      tcp_server_destroy(s);
-      grpc_exec_ctx_finish();
-    } else {
-      grpc_exec_ctx_finish();
-      tcp_server_destroy(s);
-    }
+    ExecCtx::Get()->Flush();
+    tcp_server_destroy(s);
   }
 }

@@ -255,7 +248,6 @@ static void on_connect(uv_stream_t* server, int status) {
   } else {
     sp->has_pending_connection = true;
   }
-  grpc_exec_ctx_finish();
 }

 static grpc_error* add_socket_to_server(grpc_tcp_server* s, uv_tcp_t* handle,
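The grpc_tcp_server_unref() hunk is a good summary of what the class buys: the old code had to branch on whether a grpc_exec_ctx already existed on the thread, while a thread-local ExecCtx is now always installed, so the shutdown ordering collapses to one unconditional sequence:

    GRPC_CLOSURE_LIST_SCHED(&s->shutdown_starting);
    ExecCtx::Get()->Flush();   // run everything just scheduled...
    tcp_server_destroy(s);     // ...so nothing still references s here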
diff --git a/src/core/lib/iomgr/tcp_uv.cc b/src/core/lib/iomgr/tcp_uv.cc
index 571c1d6f9a..3ea9674840 100644
--- a/src/core/lib/iomgr/tcp_uv.cc
+++ b/src/core/lib/iomgr/tcp_uv.cc
@@ -115,7 +115,6 @@ static void uv_close_callback(uv_handle_t* handle) {
   ExecCtx _local_exec_ctx;
   grpc_tcp* tcp = (grpc_tcp*)handle->data;
   TCP_UNREF(tcp, "destroy");
-  grpc_exec_ctx_finish();
 }

 static grpc_slice alloc_read_slice(grpc_resource_user* resource_user) {
@@ -130,7 +129,6 @@ static void alloc_uv_buf(uv_handle_t* handle, size_t suggested_size,
   (void)suggested_size;
   buf->base = (char*)GRPC_SLICE_START_PTR(tcp->read_slice);
   buf->len = GRPC_SLICE_LENGTH(tcp->read_slice);
-  grpc_exec_ctx_finish();
 }

 static void read_callback(uv_stream_t* stream, ssize_t nread,
@@ -174,7 +172,6 @@ static void read_callback(uv_stream_t* stream, ssize_t nread,
     error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Read failed");
   }
   GRPC_CLOSURE_SCHED(cb, error);
-  grpc_exec_ctx_finish();
 }

 static void uv_endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
@@ -224,7 +221,6 @@ static void write_callback(uv_write_t* req, int status) {
   grpc_resource_user_free(tcp->resource_user,
                           sizeof(uv_buf_t) * tcp->write_slices->count);
   GRPC_CLOSURE_SCHED(cb, error);
-  grpc_exec_ctx_finish();
 }

 static void uv_endpoint_write(grpc_endpoint* ep,
@@ -384,7 +380,6 @@ grpc_endpoint* grpc_tcp_create(uv_tcp_t* handle,
   uv_unref((uv_handle_t*)handle);
 #endif

-  grpc_exec_ctx_finish();
   return &tcp->base;
 }
diff --git a/src/core/lib/iomgr/timer_generic.cc b/src/core/lib/iomgr/timer_generic.cc
index e7fe12b07b..d5e6066f35 100644
--- a/src/core/lib/iomgr/timer_generic.cc
+++ b/src/core/lib/iomgr/timer_generic.cc
@@ -249,7 +249,7 @@ void grpc_timer_list_init() {
   g_shared_mutables.initialized = true;
   g_shared_mutables.checker_mu = GPR_SPINLOCK_INITIALIZER;
   gpr_mu_init(&g_shared_mutables.mu);
-  g_shared_mutables.min_timer = grpc_exec_ctx_now();
+  g_shared_mutables.min_timer = ExecCtx::Get()->Now();
   gpr_tls_init(&g_last_seen_min_timer);
   gpr_tls_set(&g_last_seen_min_timer, 0);
   grpc_register_tracer(&grpc_timer_trace);
@@ -341,7 +341,7 @@ void grpc_timer_init(grpc_timer* timer, grpc_millis deadline,
   if (GRPC_TRACER_ON(grpc_timer_trace)) {
     gpr_log(GPR_DEBUG,
             "TIMER %p: SET %" PRIdPTR " now %" PRIdPTR " call %p[%p]", timer,
-            deadline, grpc_exec_ctx_now(), closure, closure->cb);
+            deadline, ExecCtx::Get()->Now(), closure, closure->cb);
   }

   if (!g_shared_mutables.initialized) {
@@ -354,7 +354,7 @@ void grpc_timer_init(grpc_timer* timer, grpc_millis deadline,

   gpr_mu_lock(&shard->mu);
   timer->pending = true;
-  grpc_millis now = grpc_exec_ctx_now();
+  grpc_millis now = ExecCtx::Get()->Now();
   if (deadline <= now) {
     timer->pending = false;
     GRPC_CLOSURE_SCHED(timer->closure, GRPC_ERROR_NONE);
@@ -607,7 +607,7 @@ static grpc_timer_check_result run_some_expired_timers(gpr_atm now,

 grpc_timer_check_result grpc_timer_check(grpc_millis* next) {
   // prelude
-  grpc_millis now = grpc_exec_ctx_now();
+  grpc_millis now = ExecCtx::Get()->Now();

   /* fetch from a thread-local first: this avoids contention on a globally
      mutable cacheline in the common case */
diff --git a/src/core/lib/iomgr/timer_manager.cc b/src/core/lib/iomgr/timer_manager.cc
index 499fc73748..69adb673d8 100644
--- a/src/core/lib/iomgr/timer_manager.cc
+++ b/src/core/lib/iomgr/timer_manager.cc
@@ -101,7 +101,6 @@ void grpc_timer_manager_tick() {
   ExecCtx _local_exec_ctx;
   grpc_millis next = GRPC_MILLIS_INF_FUTURE;
   grpc_timer_check(&next);
-  grpc_exec_ctx_finish();
 }

 static void run_some_timers() {
@@ -126,7 +125,7 @@ static void run_some_timers() {
     if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
       gpr_log(GPR_DEBUG, "flush exec_ctx");
     }
-    grpc_exec_ctx_flush();
+    ExecCtx::Get()->Flush();
     gpr_mu_lock(&g_mu);
     // garbage collect any threads hanging out that are dead
     gc_completed_threads();
@@ -179,7 +178,7 @@ static bool wait_until(grpc_millis next) {
       g_timed_waiter_deadline = next;

       if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
-        grpc_millis wait_time = next - grpc_exec_ctx_now();
+        grpc_millis wait_time = next - ExecCtx::Get()->Now();
         gpr_log(GPR_DEBUG, "sleep for a %" PRIdPTR " milliseconds", wait_time);
       }
@@ -224,7 +223,7 @@ static bool wait_until(grpc_millis next) {
 static void timer_main_loop() {
   for (;;) {
     grpc_millis next = GRPC_MILLIS_INF_FUTURE;
-    grpc_exec_ctx_invalidate_now();
+    ExecCtx::Get()->InvalidateNow();

     // check timer state, updates next to the next time to run a check
     switch (grpc_timer_check(&next)) {
       case GRPC_TIMERS_FIRED:
@@ -274,9 +273,9 @@ static void timer_thread_cleanup(completed_thread* ct) {
 static void timer_thread(void* completed_thread_ptr) {
   // this threads exec_ctx: we try to run things through to completion here
   // since it's easy to spin up new threads
-  ExecCtx _local_exec_ctx(0, grpc_never_ready_to_finish, NULL);
+  ExecCtx _local_exec_ctx;
   timer_main_loop();
-  grpc_exec_ctx_finish();
+
   timer_thread_cleanup((completed_thread*)completed_thread_ptr);
 }
diff --git a/src/core/lib/iomgr/timer_uv.cc b/src/core/lib/iomgr/timer_uv.cc
index 94601d99af..6edd4169f1 100644
--- a/src/core/lib/iomgr/timer_uv.cc
+++ b/src/core/lib/iomgr/timer_uv.cc
@@ -51,7 +51,6 @@ void run_expired_timer(uv_timer_t* handle) {
   timer->pending = 0;
   GRPC_CLOSURE_SCHED(timer->closure, GRPC_ERROR_NONE);
   stop_uv_timer(handle);
-  grpc_exec_ctx_finish();
 }

 void grpc_timer_init(grpc_timer* timer, grpc_millis deadline,
@@ -60,13 +59,13 @@ void grpc_timer_init(grpc_timer* timer, grpc_millis deadline,
   uv_timer_t* uv_timer;
   GRPC_UV_ASSERT_SAME_THREAD();
   timer->closure = closure;
-  if (deadline <= grpc_exec_ctx_now()) {
+  if (deadline <= ExecCtx::Get()->Now()) {
     timer->pending = 0;
     GRPC_CLOSURE_SCHED(timer->closure, GRPC_ERROR_NONE);
     return;
   }
   timer->pending = 1;
-  timeout = (uint64_t)(deadline - grpc_exec_ctx_now());
+  timeout = (uint64_t)(deadline - ExecCtx::Get()->Now());
   uv_timer = (uv_timer_t*)gpr_malloc(sizeof(uv_timer_t));
   uv_timer_init(uv_default_loop(), uv_timer);
   uv_timer->data = timer;
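grpc_timer_init() takes an absolute grpc_millis deadline on both the generic and libuv paths, so callers derive it from the context clock; a brief sketch (timer and closure names hypothetical):

    grpc_timer timer;
    grpc_timer_init(&timer, ExecCtx::Get()->Now() + 30 * GPR_MS_PER_SEC,
                    &on_timeout_closure);
    // A deadline at or before Now() fires immediately, as both hunks above show.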
diff --git a/src/core/lib/security/context/security_context.cc b/src/core/lib/security/context/security_context.cc
index 1d708c997c..9b58b3657f 100644
--- a/src/core/lib/security/context/security_context.cc
+++ b/src/core/lib/security/context/security_context.cc
@@ -57,7 +57,7 @@ grpc_call_error grpc_call_set_credentials(grpc_call* call,
     grpc_call_credentials_unref(ctx->creds);
     ctx->creds = grpc_call_credentials_ref(creds);
   }
-  grpc_exec_ctx_finish();
+
   return GRPC_CALL_OK;
 }

@@ -95,7 +95,6 @@ void grpc_client_security_context_destroy(void* ctx) {
     c->extension.destroy(c->extension.instance);
   }
   gpr_free(ctx);
-  grpc_exec_ctx_finish();
 }

 /* --- grpc_server_security_context --- */
diff --git a/src/core/lib/security/credentials/credentials.cc b/src/core/lib/security/credentials/credentials.cc
index 20c4ae70fb..6a272653f8 100644
--- a/src/core/lib/security/credentials/credentials.cc
+++ b/src/core/lib/security/credentials/credentials.cc
@@ -74,7 +74,6 @@ void grpc_channel_credentials_release(grpc_channel_credentials* creds) {
   GRPC_API_TRACE("grpc_channel_credentials_release(creds=%p)", 1, (creds));
   ExecCtx _local_exec_ctx;
   grpc_channel_credentials_unref(creds);
-  grpc_exec_ctx_finish();
 }

 grpc_call_credentials* grpc_call_credentials_ref(grpc_call_credentials* creds) {
@@ -97,7 +96,6 @@ void grpc_call_credentials_release(grpc_call_credentials* creds) {
   GRPC_API_TRACE("grpc_call_credentials_release(creds=%p)", 1, (creds));
   ExecCtx _local_exec_ctx;
   grpc_call_credentials_unref(creds);
-  grpc_exec_ctx_finish();
 }

 bool grpc_call_credentials_get_request_metadata(
@@ -213,7 +211,6 @@ void grpc_server_credentials_release(grpc_server_credentials* creds) {
   GRPC_API_TRACE("grpc_server_credentials_release(creds=%p)", 1, (creds));
   ExecCtx _local_exec_ctx;
   grpc_server_credentials_unref(creds);
-  grpc_exec_ctx_finish();
 }

 grpc_security_status grpc_server_credentials_create_security_connector(
diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.cc b/src/core/lib/security/credentials/google_default/google_default_credentials.cc
index 897b9d7520..03ec4bc3b3 100644
--- a/src/core/lib/security/credentials/google_default/google_default_credentials.cc
+++ b/src/core/lib/security/credentials/google_default/google_default_credentials.cc
@@ -114,13 +114,13 @@ static int is_stack_running_on_compute_engine() {
       grpc_resource_quota_create("google_default_credentials");
   grpc_httpcli_get(
       &context, &detector.pollent, resource_quota, &request,
-      grpc_exec_ctx_now() + max_detection_delay,
+      ExecCtx::Get()->Now() + max_detection_delay,
       GRPC_CLOSURE_CREATE(on_compute_engine_detection_http_response, &detector,
                           grpc_schedule_on_exec_ctx),
       &detector.response);
   grpc_resource_quota_unref_internal(resource_quota);

-  grpc_exec_ctx_flush();
+  ExecCtx::Get()->Flush();

   /* Block until we get the response. This is not ideal but this should only be
      called once for the lifetime of the process by the default credentials. */
@@ -144,7 +144,7 @@ static int is_stack_running_on_compute_engine() {
   grpc_pollset_shutdown(grpc_polling_entity_pollset(&detector.pollent),
                         &destroy_closure);
   g_polling_mu = NULL;
-  grpc_exec_ctx_flush();
+  ExecCtx::Get()->Flush();

   gpr_free(grpc_polling_entity_pollset(&detector.pollent));
   grpc_http_response_destroy(&detector.response);
@@ -285,7 +285,7 @@ end:
   } else {
     GRPC_ERROR_UNREF(error);
   }
-  grpc_exec_ctx_finish();
+
   return result;
 }

@@ -299,7 +299,6 @@ void grpc_flush_cached_google_default_credentials(void) {
   }
   compute_engine_detection_done = 0;
   gpr_mu_unlock(&g_state_mu);
-  grpc_exec_ctx_finish();
 }

 /* -- Well known credentials path. -- */
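The credentials hunks above all reduce to the same recurring shape for C-surface entry points; a generic sketch (the function and callee are placeholders, not real API):

    void grpc_something_release(grpc_something* obj) {
      ExecCtx _local_exec_ctx;             // install the per-thread context
      grpc_something_unref_internal(obj);  // may enqueue closures on it
    }                                      // context unwinds at end of scope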
diff --git a/src/core/lib/security/credentials/iam/iam_credentials.cc b/src/core/lib/security/credentials/iam/iam_credentials.cc
index 07938ec67e..4d9da0cbe3 100644
--- a/src/core/lib/security/credentials/iam/iam_credentials.cc
+++ b/src/core/lib/security/credentials/iam/iam_credentials.cc
@@ -77,6 +77,6 @@ grpc_call_credentials* grpc_google_iam_credentials_create(
                      grpc_slice_from_copied_string(authority_selector));
   grpc_credentials_mdelem_array_add(&c->md_array, md);
   GRPC_MDELEM_UNREF(md);
-  grpc_exec_ctx_finish();
+
   return &c->base;
 }
diff --git a/src/core/lib/security/credentials/jwt/jwt_credentials.cc b/src/core/lib/security/credentials/jwt/jwt_credentials.cc
index 1d43ee6e03..ccc3f4aeed 100644
--- a/src/core/lib/security/credentials/jwt/jwt_credentials.cc
+++ b/src/core/lib/security/credentials/jwt/jwt_credentials.cc
@@ -185,6 +185,6 @@ grpc_call_credentials* grpc_service_account_jwt_access_credentials_create(
   grpc_call_credentials* creds =
       grpc_service_account_jwt_access_credentials_create_from_auth_json_key(
           grpc_auth_json_key_create_from_string(json_key), token_lifetime);
-  grpc_exec_ctx_finish();
+
   return creds;
 }
diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.cc b/src/core/lib/security/credentials/jwt/jwt_verifier.cc
index eaa2078787..5246e1f985 100644
--- a/src/core/lib/security/credentials/jwt/jwt_verifier.cc
+++ b/src/core/lib/security/credentials/jwt/jwt_verifier.cc
@@ -358,7 +358,7 @@ static verifier_cb_ctx* verifier_cb_ctx_create(
   ctx->signed_data = grpc_slice_from_copied_buffer(signed_jwt, signed_jwt_len);
   ctx->user_data = user_data;
   ctx->user_cb = cb;
-  grpc_exec_ctx_finish();
+
   return ctx;
 }

@@ -702,7 +702,7 @@ static void on_openid_config_retrieved(void* user_data, grpc_error* error) {
   resource_quota = grpc_resource_quota_create("jwt_verifier");
   grpc_httpcli_get(
       &ctx->verifier->http_ctx, &ctx->pollent, resource_quota, &req,
-      grpc_exec_ctx_now() + grpc_jwt_verifier_max_delay,
+      ExecCtx::Get()->Now() + grpc_jwt_verifier_max_delay,
       GRPC_CLOSURE_CREATE(on_keys_retrieved, ctx, grpc_schedule_on_exec_ctx),
       &ctx->responses[HTTP_RESPONSE_KEYS]);
   grpc_resource_quota_unref_internal(resource_quota);
@@ -828,7 +828,7 @@ static void retrieve_key_and_verify(verifier_cb_ctx* ctx) {
      extreme memory pressure. */
   resource_quota = grpc_resource_quota_create("jwt_verifier");
   grpc_httpcli_get(&ctx->verifier->http_ctx, &ctx->pollent, resource_quota,
-                   &req, grpc_exec_ctx_now() + grpc_jwt_verifier_max_delay,
+                   &req, ExecCtx::Get()->Now() + grpc_jwt_verifier_max_delay,
                    http_cb, &ctx->responses[rsp_idx]);
   grpc_resource_quota_unref_internal(resource_quota);
   gpr_free(req.host);
diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc b/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc
index bae9692938..b653705609 100644
--- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc
+++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc
@@ -216,8 +216,9 @@ static void on_oauth2_token_fetcher_http_response(void* user_data,
   gpr_mu_lock(&c->mu);
   c->token_fetch_pending = false;
   c->access_token_md = GRPC_MDELEM_REF(access_token_md);
-  c->token_expiration =
-      status == GRPC_CREDENTIALS_OK ? grpc_exec_ctx_now() + token_lifetime : 0;
+  c->token_expiration = status == GRPC_CREDENTIALS_OK
+                            ? ExecCtx::Get()->Now() + token_lifetime
+                            : 0;
   grpc_oauth2_pending_get_request_metadata* pending_request =
       c->pending_requests;
   c->pending_requests = NULL;
@@ -255,7 +256,7 @@ static bool oauth2_token_fetcher_get_request_metadata(
   grpc_mdelem cached_access_token_md = GRPC_MDNULL;
   gpr_mu_lock(&c->mu);
   if (!GRPC_MDISNULL(c->access_token_md) &&
-      (c->token_expiration - grpc_exec_ctx_now() > refresh_threshold)) {
+      (c->token_expiration - ExecCtx::Get()->Now() > refresh_threshold)) {
     cached_access_token_md = GRPC_MDELEM_REF(c->access_token_md);
   }
   if (!GRPC_MDISNULL(cached_access_token_md)) {
@@ -287,7 +288,7 @@ static bool oauth2_token_fetcher_get_request_metadata(
     c->fetch_func(grpc_credentials_metadata_request_create(creds),
                   &c->httpcli_context, &c->pollent,
                   on_oauth2_token_fetcher_http_response,
-                  grpc_exec_ctx_now() + refresh_threshold);
+                  ExecCtx::Get()->Now() + refresh_threshold);
   }
   return false;
 }
@@ -517,7 +518,7 @@ grpc_call_credentials* grpc_access_token_credentials_create(
   c->access_token_md = grpc_mdelem_from_slices(
       grpc_slice_from_static_string(GRPC_AUTHORIZATION_METADATA_KEY),
       grpc_slice_from_copied_string(token_md_value));
-  grpc_exec_ctx_finish();
+
   gpr_free(token_md_value);
   return &c->base;
 }
diff --git a/src/core/lib/security/credentials/plugin/plugin_credentials.cc b/src/core/lib/security/credentials/plugin/plugin_credentials.cc
index 064666a7d0..025d024617 100644
--- a/src/core/lib/security/credentials/plugin/plugin_credentials.cc
+++ b/src/core/lib/security/credentials/plugin/plugin_credentials.cc
@@ -116,9 +116,8 @@ static void plugin_md_request_metadata_ready(void* request,
                                              grpc_status_code status,
                                              const char* error_details) {
   /* called from application code */
-  ExecCtx _local_exec_ctx(
-      GRPC_EXEC_CTX_FLAG_IS_FINISHED | GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP,
-      NULL, NULL);
+  ExecCtx _local_exec_ctx(GRPC_EXEC_CTX_FLAG_IS_FINISHED |
+                          GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP);
   grpc_plugin_credentials_pending_request* r =
       (grpc_plugin_credentials_pending_request*)request;
   if (GRPC_TRACER_ON(grpc_plugin_credentials_trace)) {
@@ -141,7 +140,6 @@ static void plugin_md_request_metadata_ready(void* request,
             r->creds, r);
   }
   gpr_free(r);
-  grpc_exec_ctx_finish();
 }

 static bool plugin_get_request_metadata(grpc_call_credentials* creds,
diff --git a/src/core/lib/security/transport/security_handshaker.cc b/src/core/lib/security/transport/security_handshaker.cc
index cb20401409..4ed2ec55bd 100644
--- a/src/core/lib/security/transport/security_handshaker.cc
+++ b/src/core/lib/security/transport/security_handshaker.cc
@@ -266,7 +266,6 @@ static void on_handshake_next_done_grpc_wrapper(
   } else {
     gpr_mu_unlock(&h->mu);
   }
-  grpc_exec_ctx_finish();
 }

 static grpc_error* do_handshaker_next_locked(
diff --git a/src/core/lib/security/transport/server_auth_filter.cc b/src/core/lib/security/transport/server_auth_filter.cc
index 5f21a2933b..86817076f8 100644
--- a/src/core/lib/security/transport/server_auth_filter.cc
+++ b/src/core/lib/security/transport/server_auth_filter.cc
@@ -141,7 +141,6 @@ static void on_md_processing_done(
   }
   grpc_metadata_array_destroy(&calld->md);
   GRPC_CALL_STACK_UNREF(calld->owning_call, "server_auth_metadata");
-  grpc_exec_ctx_finish();
 }

 static void cancel_call(void* arg, grpc_error* error) {
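The oauth2 changes keep an absolute expiry time so token freshness is a single comparison against the shared clock; an illustration of the two sides of that bookkeeping (the threshold value is assumed, not taken from this diff):

    // On fetch success (mirrors the first hunk):
    //   c->token_expiration = ExecCtx::Get()->Now() + token_lifetime;
    // On lookup, the cached token is reusable while comfortably unexpired:
    const grpc_millis refresh_threshold = 60 * GPR_MS_PER_SEC;  // assumed
    bool fresh =
        !GRPC_MDISNULL(c->access_token_md) &&
        (c->token_expiration - ExecCtx::Get()->Now() > refresh_threshold);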
diff --git a/src/core/lib/slice/slice.cc b/src/core/lib/slice/slice.cc
index 3b3b2e4f05..6e1554d471 100644
--- a/src/core/lib/slice/slice.cc
+++ b/src/core/lib/slice/slice.cc
@@ -69,7 +69,6 @@ grpc_slice grpc_slice_ref(grpc_slice slice) {
 void grpc_slice_unref(grpc_slice slice) {
   ExecCtx _local_exec_ctx;
   grpc_slice_unref_internal(slice);
-  grpc_exec_ctx_finish();
 }

 /* grpc_slice_from_static_string support structure - a refcount that does
diff --git a/src/core/lib/slice/slice_buffer.cc b/src/core/lib/slice/slice_buffer.cc
index 6774269972..4bc54c303f 100644
--- a/src/core/lib/slice/slice_buffer.cc
+++ b/src/core/lib/slice/slice_buffer.cc
@@ -75,7 +75,6 @@ void grpc_slice_buffer_destroy_internal(grpc_slice_buffer* sb) {
 void grpc_slice_buffer_destroy(grpc_slice_buffer* sb) {
   ExecCtx _local_exec_ctx;
   grpc_slice_buffer_destroy_internal(sb);
-  grpc_exec_ctx_finish();
 }

 uint8_t* grpc_slice_buffer_tiny_add(grpc_slice_buffer* sb, size_t n) {
@@ -175,7 +174,6 @@ void grpc_slice_buffer_reset_and_unref_internal(grpc_slice_buffer* sb) {
 void grpc_slice_buffer_reset_and_unref(grpc_slice_buffer* sb) {
   ExecCtx _local_exec_ctx;
   grpc_slice_buffer_reset_and_unref_internal(sb);
-  grpc_exec_ctx_finish();
 }

 void grpc_slice_buffer_swap(grpc_slice_buffer* a, grpc_slice_buffer* b) {
diff --git a/src/core/lib/surface/alarm.cc b/src/core/lib/surface/alarm.cc
index 395ffd393c..7aee100f3f 100644
--- a/src/core/lib/surface/alarm.cc
+++ b/src/core/lib/surface/alarm.cc
@@ -51,7 +51,7 @@ static void alarm_unref(grpc_alarm* alarm) {
     if (alarm->cq != NULL) {
       GRPC_CQ_INTERNAL_UNREF(alarm->cq, "alarm");
     }
-    grpc_exec_ctx_finish();
+
     gpr_free(alarm);
   }
 }
@@ -126,13 +126,11 @@ void grpc_alarm_set(grpc_alarm* alarm, grpc_completion_queue* cq,
   GPR_ASSERT(grpc_cq_begin_op(cq, tag));
   grpc_timer_init(&alarm->alarm, grpc_timespec_to_millis_round_up(deadline),
                   &alarm->on_alarm);
-  grpc_exec_ctx_finish();
 }

 void grpc_alarm_cancel(grpc_alarm* alarm, void* reserved) {
   ExecCtx _local_exec_ctx;
   grpc_timer_cancel(&alarm->alarm);
-  grpc_exec_ctx_finish();
 }

 void grpc_alarm_destroy(grpc_alarm* alarm, void* reserved) {
diff --git a/src/core/lib/surface/byte_buffer.cc b/src/core/lib/surface/byte_buffer.cc
index f3c10797f3..6a9b13bb41 100644
--- a/src/core/lib/surface/byte_buffer.cc
+++ b/src/core/lib/surface/byte_buffer.cc
@@ -78,7 +78,6 @@ void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) {
       break;
   }
   gpr_free(bb);
-  grpc_exec_ctx_finish();
 }

 size_t grpc_byte_buffer_length(grpc_byte_buffer* bb) {
diff --git a/src/core/lib/surface/byte_buffer_reader.cc b/src/core/lib/surface/byte_buffer_reader.cc
index fb66829baa..9a9e26ecdc 100644
--- a/src/core/lib/surface/byte_buffer_reader.cc
+++ b/src/core/lib/surface/byte_buffer_reader.cc
@@ -70,7 +70,7 @@ int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,
       reader->current.index = 0;
       break;
   }
-  grpc_exec_ctx_finish();
+
   return 1;
 }

@@ -118,6 +118,6 @@ grpc_slice grpc_byte_buffer_reader_readall(grpc_byte_buffer_reader* reader) {
     grpc_slice_unref_internal(in_slice);
     GPR_ASSERT(bytes_read <= input_size);
   }
-  grpc_exec_ctx_finish();
+
   return out_slice;
 }
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index 5e1c0badd0..bbb7a39e29 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -595,7 +595,7 @@ void grpc_call_unref(grpc_call* c) {
     grpc_call_combiner_set_notify_on_cancel(&c->call_combiner, NULL);
   }
   GRPC_CALL_INTERNAL_UNREF(c, "destroy");
-  grpc_exec_ctx_finish();
+
   GPR_TIMER_END("grpc_call_unref", 0);
 }

@@ -604,7 +604,7 @@ grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
   GPR_ASSERT(!reserved);
   ExecCtx _local_exec_ctx;
   cancel_with_error(call, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED);
-  grpc_exec_ctx_finish();
+
   return GRPC_CALL_OK;
 }

@@ -659,7 +659,7 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
       4, (c, (int)status, description, reserved));
   GPR_ASSERT(reserved == NULL);
   cancel_with_status(c, STATUS_FROM_API_OVERRIDE, status, description);
-  grpc_exec_ctx_finish();
+
   return GRPC_CALL_OK;
 }

@@ -2048,7 +2048,6 @@ grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
     err = call_start_batch(call, ops, nops, tag, 0);
   }

-  grpc_exec_ctx_finish();
   return err;
 }

diff --git a/src/core/lib/surface/call_details.cc b/src/core/lib/surface/call_details.cc
index 01b19abefb..03ce7f88fb 100644
--- a/src/core/lib/surface/call_details.cc
+++ b/src/core/lib/surface/call_details.cc
@@ -37,5 +37,4 @@ void grpc_call_details_destroy(grpc_call_details* cd) {
   ExecCtx _local_exec_ctx;
   grpc_slice_unref_internal(cd->method);
   grpc_slice_unref_internal(cd->host);
-  grpc_exec_ctx_finish();
 }
diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc
index dc1fc1632e..7725351f74 100644
--- a/src/core/lib/surface/channel.cc
+++ b/src/core/lib/surface/channel.cc
@@ -252,7 +252,6 @@ void grpc_channel_get_info(grpc_channel* channel,
   grpc_channel_element* elem =
       grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0);
   elem->filter->get_channel_info(elem, channel_info);
-  grpc_exec_ctx_finish();
 }

 static grpc_call* grpc_channel_create_call_internal(
@@ -305,7 +304,7 @@ grpc_call* grpc_channel_create_call(grpc_channel* channel,
                      grpc_slice_ref_internal(*host))
               : GRPC_MDNULL,
       grpc_timespec_to_millis_round_up(deadline));
-  grpc_exec_ctx_finish();
+
   return call;
 }

@@ -344,7 +343,7 @@ void* grpc_channel_register_call(grpc_channel* channel, const char* method,
   rc->next = channel->registered_calls;
   channel->registered_calls = rc;
   gpr_mu_unlock(&channel->registered_call_mu);
-  grpc_exec_ctx_finish();
+
   return rc;
 }

@@ -370,7 +369,7 @@ grpc_call* grpc_channel_create_registered_call(
       channel, parent_call, propagation_mask, completion_queue, NULL,
       GRPC_MDELEM_REF(rc->path), GRPC_MDELEM_REF(rc->authority),
       grpc_timespec_to_millis_round_up(deadline));
-  grpc_exec_ctx_finish();
+
   return call;
 }

@@ -416,8 +415,6 @@ void grpc_channel_destroy(grpc_channel* channel) {
   elem->filter->start_transport_op(elem, op);

   GRPC_CHANNEL_INTERNAL_UNREF(channel, "channel");
-
-  grpc_exec_ctx_finish();
 }

 grpc_channel_stack* grpc_channel_get_channel_stack(grpc_channel* channel) {
diff --git a/src/core/lib/surface/channel_ping.cc b/src/core/lib/surface/channel_ping.cc
index 0966a8d967..06cdbf6c73 100644
--- a/src/core/lib/surface/channel_ping.cc
+++ b/src/core/lib/surface/channel_ping.cc
@@ -60,5 +60,4 @@ void grpc_channel_ping(grpc_channel* channel, grpc_completion_queue* cq,
   op->bind_pollset = grpc_cq_pollset(cq);
   GPR_ASSERT(grpc_cq_begin_op(cq, tag));
   top_elem->filter->start_transport_op(top_elem, op);
-  grpc_exec_ctx_finish();
 }
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc
index b69d40534d..0b0a8d070d 100644
--- a/src/core/lib/surface/completion_queue.cc
+++ b/src/core/lib/surface/completion_queue.cc
@@ -124,7 +124,7 @@ static grpc_error* non_polling_poller_work(grpc_pollset* pollset,
     while (!npp->shutdown && !w.kicked &&
            !gpr_cv_wait(&w.cv, &npp->mu, deadline_ts))
       ;
-    grpc_exec_ctx_invalidate_now();
+    ExecCtx::Get()->InvalidateNow();
    if (&w == npp->root) {
      npp->root = w.next;
      if (&w == npp->root) {
@@ -371,7 +371,6 @@ int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq,
       gpr_mu_unlock(cq->mu);
       GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
     }
-    grpc_exec_ctx_finish();
   }
   gpr_tls_set(&g_cached_event, (intptr_t)0);
   gpr_tls_set(&g_cached_cq, (intptr_t)0);
@@ -412,8 +411,6 @@ static grpc_cq_completion* cq_event_queue_pop(grpc_cq_event_queue* q) {
     GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES();
   }

-  grpc_exec_ctx_finish();
-
   if (c) {
     gpr_atm_no_barrier_fetch_add(&q->num_queue_items, -1);
   }
@@ -445,7 +442,6 @@ grpc_completion_queue* grpc_completion_queue_create_internal(

   ExecCtx _local_exec_ctx;
   GRPC_STATS_INC_CQS_CREATED();
-  grpc_exec_ctx_finish();

   cq = (grpc_completion_queue*)gpr_zalloc(sizeof(grpc_completion_queue) +
                                           vtable->data_size +
@@ -639,9 +635,9 @@ static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
                  error != GRPC_ERROR_NONE)) {
     const char* errmsg = grpc_error_string(error);
     GRPC_API_TRACE(
-        "cq_end_op_for_next(=%p, cq=%p, tag=%p, error=%s, "
+        "cq_end_op_for_next(cq=%p, tag=%p, error=%s, "
         "done=%p, done_arg=%p, storage=%p)",
-        7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage));
+        6, (cq, tag, errmsg, done, done_arg, storage));
     if (GRPC_TRACER_ON(grpc_trace_operation_failures) &&
         error != GRPC_ERROR_NONE) {
       gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
@@ -726,9 +722,9 @@ static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag,
                  error != GRPC_ERROR_NONE)) {
     const char* errmsg = grpc_error_string(error);
     GRPC_API_TRACE(
-        "cq_end_op_for_pluck(=%p, cq=%p, tag=%p, error=%s, "
+        "cq_end_op_for_pluck(cq=%p, tag=%p, error=%s, "
         "done=%p, done_arg=%p, storage=%p)",
-        7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage));
+        6, (cq, tag, errmsg, done, done_arg, storage));
     if (GRPC_TRACER_ON(grpc_trace_operation_failures) &&
         error != GRPC_ERROR_NONE) {
       gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
@@ -794,31 +790,40 @@ typedef struct {
   bool first_loop;
 } cq_is_finished_arg;

-static bool cq_is_next_finished(void* arg) {
-  cq_is_finished_arg* a = (cq_is_finished_arg*)arg;
-  grpc_completion_queue* cq = a->cq;
-  cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
-  GPR_ASSERT(a->stolen_completion == NULL);
+class ExecCtxNext : public ExecCtx {
+ public:
+  ExecCtxNext(void* arg) : ExecCtx(0), check_ready_to_finish_arg_(arg) {}

-  gpr_atm current_last_seen_things_queued_ever =
-      gpr_atm_no_barrier_load(&cqd->things_queued_ever);
+  bool CheckReadyToFinish() override {
+    cq_is_finished_arg* a = (cq_is_finished_arg*)check_ready_to_finish_arg_;
+    grpc_completion_queue* cq = a->cq;
+    cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
+    GPR_ASSERT(a->stolen_completion == NULL);

-  if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) {
-    a->last_seen_things_queued_ever =
+    gpr_atm current_last_seen_things_queued_ever =
         gpr_atm_no_barrier_load(&cqd->things_queued_ever);
-    /* Pop a cq_completion from the queue. Returns NULL if the queue is empty
-     * might return NULL in some cases even if the queue is not empty; but
-     * that
-     * is ok and doesn't affect correctness. Might effect the tail latencies a
-     * bit) */
-    a->stolen_completion = cq_event_queue_pop(&cqd->queue);
-    if (a->stolen_completion != NULL) {
-      return true;
+    if (current_last_seen_things_queued_ever !=
+        a->last_seen_things_queued_ever) {
+      a->last_seen_things_queued_ever =
+          gpr_atm_no_barrier_load(&cqd->things_queued_ever);
+
Might effect the tail latencies a + * bit) */ + a->stolen_completion = cq_event_queue_pop(&cqd->queue); + if (a->stolen_completion != NULL) { + return true; + } } + return !a->first_loop && a->deadline < ExecCtx::Get()->Now(); } - return !a->first_loop && a->deadline < grpc_exec_ctx_now(); -} + + private: + void* check_ready_to_finish_arg_; +}; #ifndef NDEBUG static void dump_pending_tags(grpc_completion_queue* cq) { @@ -873,7 +878,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, NULL, NULL, true}; - ExecCtx _local_exec_ctx(0, cq_is_next_finished, &is_finished_arg); + ExecCtxNext _local_exec_ctx(&is_finished_arg); for (;;) { grpc_millis iteration_deadline = deadline_millis; @@ -923,7 +928,8 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, break; } - if (!is_finished_arg.first_loop && grpc_exec_ctx_now() >= deadline_millis) { + if (!is_finished_arg.first_loop && + ExecCtx::Get()->Now() >= deadline_millis) { memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; dump_pending_tags(cq); @@ -959,7 +965,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret); GRPC_CQ_INTERNAL_UNREF(cq, "next"); - grpc_exec_ctx_finish(); + GPR_ASSERT(is_finished_arg.stolen_completion == NULL); GPR_TIMER_END("grpc_completion_queue_next", 0); @@ -1039,37 +1045,46 @@ static void del_plucker(grpc_completion_queue* cq, void* tag, GPR_UNREACHABLE_CODE(return ); } -static bool cq_is_pluck_finished(void* arg) { - cq_is_finished_arg* a = (cq_is_finished_arg*)arg; - grpc_completion_queue* cq = a->cq; - cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq); +class ExecCtxPluck : public ExecCtx { + public: + ExecCtxPluck(void* arg) : ExecCtx(0), check_ready_to_finish_arg_(arg) {} - GPR_ASSERT(a->stolen_completion == NULL); - gpr_atm current_last_seen_things_queued_ever = - gpr_atm_no_barrier_load(&cqd->things_queued_ever); - if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) { - gpr_mu_lock(cq->mu); - a->last_seen_things_queued_ever = + bool CheckReadyToFinish() override { + cq_is_finished_arg* a = (cq_is_finished_arg*)check_ready_to_finish_arg_; + grpc_completion_queue* cq = a->cq; + cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq); + + GPR_ASSERT(a->stolen_completion == NULL); + gpr_atm current_last_seen_things_queued_ever = gpr_atm_no_barrier_load(&cqd->things_queued_ever); - grpc_cq_completion* c; - grpc_cq_completion* prev = &cqd->completed_head; - while ((c = (grpc_cq_completion*)(prev->next & ~(uintptr_t)1)) != - &cqd->completed_head) { - if (c->tag == a->tag) { - prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1); - if (c == cqd->completed_tail) { - cqd->completed_tail = prev; + if (current_last_seen_things_queued_ever != + a->last_seen_things_queued_ever) { + gpr_mu_lock(cq->mu); + a->last_seen_things_queued_ever = + gpr_atm_no_barrier_load(&cqd->things_queued_ever); + grpc_cq_completion* c; + grpc_cq_completion* prev = &cqd->completed_head; + while ((c = (grpc_cq_completion*)(prev->next & ~(uintptr_t)1)) != + &cqd->completed_head) { + if (c->tag == a->tag) { + prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1); + if (c == cqd->completed_tail) { + cqd->completed_tail = prev; + } + gpr_mu_unlock(cq->mu); + a->stolen_completion = c; + return true; } - gpr_mu_unlock(cq->mu); - a->stolen_completion = c; - return true; + prev = c; } - prev = c; + gpr_mu_unlock(cq->mu); } - gpr_mu_unlock(cq->mu); + return !a->first_loop 
+    return !a->first_loop && a->deadline < ExecCtx::Get()->Now();
   }
-  return !a->first_loop && a->deadline < grpc_exec_ctx_now();
-}
+
+ private:
+  void* check_ready_to_finish_arg_;
+};

 static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
                            gpr_timespec deadline, void* reserved) {
@@ -1106,7 +1121,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
                                NULL,
                                tag,
                                true};
-  ExecCtx _local_exec_ctx(0, cq_is_pluck_finished, &is_finished_arg);
+  ExecCtxPluck _local_exec_ctx(&is_finished_arg);
   for (;;) {
     if (is_finished_arg.stolen_completion != NULL) {
       gpr_mu_unlock(cq->mu);
@@ -1153,7 +1168,8 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
       dump_pending_tags(cq);
       break;
     }
-    if (!is_finished_arg.first_loop && grpc_exec_ctx_now() >= deadline_millis) {
+    if (!is_finished_arg.first_loop &&
+        ExecCtx::Get()->Now() >= deadline_millis) {
       del_plucker(cq, tag, &worker);
       gpr_mu_unlock(cq->mu);
       memset(&ret, 0, sizeof(ret));
@@ -1182,7 +1198,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
 done:
   GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
   GRPC_CQ_INTERNAL_UNREF(cq, "pluck");
-  grpc_exec_ctx_finish();
+
   GPR_ASSERT(is_finished_arg.stolen_completion == NULL);

   GPR_TIMER_END("grpc_completion_queue_pluck", 0);
@@ -1238,7 +1254,7 @@ void grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
   GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0);
   GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq));
   cq->vtable->shutdown(cq);
-  grpc_exec_ctx_finish();
+
   GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
 }

@@ -1249,7 +1265,7 @@ void grpc_completion_queue_destroy(grpc_completion_queue* cq) {

   ExecCtx _local_exec_ctx;
   GRPC_CQ_INTERNAL_UNREF(cq, "destroy");
-  grpc_exec_ctx_finish();
+
   GPR_TIMER_END("grpc_completion_queue_destroy", 0);
 }
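ExecCtxNext and ExecCtxPluck replace the old finish_check function-pointer-plus-arg with a virtual method; the same pattern is open to any caller that wants a poll loop to unwind early. A stripped-down sketch (the deadline predicate here is hypothetical, and the ExecCtx(0) flag constructor is taken from the classes above):

    class ExecCtxUntil : public ExecCtx {
     public:
      explicit ExecCtxUntil(grpc_millis deadline)
          : ExecCtx(0), deadline_(deadline) {}

      bool CheckReadyToFinish() override {
        // Consulted via IsReadyToFinish(); returning true asks pollers to
        // wind down rather than keep blocking.
        return ExecCtx::Get()->Now() >= deadline_;
      }

     private:
      grpc_millis deadline_;
    };

Declared on the stack in place of a plain ExecCtx, exactly as cq_next()/cq_pluck() do with their variants above.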
diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc
index 66c8c3b6da..20e17a7f60 100644
--- a/src/core/lib/surface/init.cc
+++ b/src/core/lib/surface/init.cc
@@ -168,14 +168,14 @@ void grpc_init(void) {
     grpc_iomgr_start();
   }
   gpr_mu_unlock(&g_init_mu);
-  grpc_exec_ctx_finish();
+
   GRPC_API_TRACE("grpc_init(void)", 0, ());
 }

 void grpc_shutdown(void) {
   int i;
   GRPC_API_TRACE("grpc_shutdown(void)", 0, ());
-  ExecCtx _local_exec_ctx(0, grpc_never_ready_to_finish, NULL);
+  ExecCtx _local_exec_ctx;
   gpr_mu_lock(&g_init_mu);
   if (--g_initializations == 0) {
     grpc_executor_shutdown();
@@ -194,7 +194,6 @@ void grpc_shutdown(void) {
     grpc_stats_shutdown();
   }
   gpr_mu_unlock(&g_init_mu);
-  grpc_exec_ctx_finish();
 }

 int grpc_is_initialized(void) {
diff --git a/src/core/lib/surface/lame_client.cc b/src/core/lib/surface/lame_client.cc
index 5cd8c1fd89..da081e68cb 100644
--- a/src/core/lib/surface/lame_client.cc
+++ b/src/core/lib/surface/lame_client.cc
@@ -169,6 +169,6 @@ grpc_channel* grpc_lame_client_channel_create(const char* target,
   auto chand = reinterpret_cast<grpc_core::ChannelData*>(elem->channel_data);
   chand->error_code = error_code;
   chand->error_message = error_message;
-  grpc_exec_ctx_finish();
+
   return channel;
 }
diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc
index 6e3ce005a2..0d4435d556 100644
--- a/src/core/lib/surface/server.cc
+++ b/src/core/lib/surface/server.cc
@@ -1047,8 +1047,6 @@ void grpc_server_start(grpc_server* server) {
       GRPC_CLOSURE_CREATE(start_listeners, server,
                           grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)),
       GRPC_ERROR_NONE);
-
-  grpc_exec_ctx_finish();
 }

 void grpc_server_get_pollsets(grpc_server* server, grpc_pollset*** pollsets,
@@ -1188,7 +1186,7 @@ void grpc_server_shutdown_and_notify(grpc_server* server,
     grpc_cq_end_op(cq, tag, GRPC_ERROR_NONE, done_published_shutdown, NULL,
                    (grpc_cq_completion*)gpr_malloc(sizeof(grpc_cq_completion)));
     gpr_mu_unlock(&server->mu_global);
-    goto done;
+    return;
   }
   server->shutdown_tags = (shutdown_tag*)gpr_realloc(
       server->shutdown_tags,
@@ -1198,7 +1196,7 @@ void grpc_server_shutdown_and_notify(grpc_server* server,
   sdt->cq = cq;
   if (gpr_atm_acq_load(&server->shutdown_flag)) {
     gpr_mu_unlock(&server->mu_global);
-    goto done;
+    return;
   }
   server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
@@ -1225,9 +1223,6 @@ void grpc_server_shutdown_and_notify(grpc_server* server,

   channel_broadcaster_shutdown(&broadcaster, true /* send_goaway */,
                                GRPC_ERROR_NONE);
-
-done:
-  grpc_exec_ctx_finish();
 }

 void grpc_server_cancel_all_calls(grpc_server* server) {
@@ -1243,7 +1238,6 @@ void grpc_server_cancel_all_calls(grpc_server* server) {
   channel_broadcaster_shutdown(
       &broadcaster, false /* send_goaway */,
       GRPC_ERROR_CREATE_FROM_STATIC_STRING("Cancelling all calls"));
-  grpc_exec_ctx_finish();
 }

 void grpc_server_destroy(grpc_server* server) {
@@ -1265,7 +1259,6 @@ void grpc_server_destroy(grpc_server* server) {
   gpr_mu_unlock(&server->mu_global);

   server_unref(server);
-  grpc_exec_ctx_finish();
 }

 void grpc_server_add_listener(grpc_server* server, void* arg,
@@ -1368,7 +1361,7 @@ grpc_call_error grpc_server_request_call(
   rc->initial_metadata = initial_metadata;
   error = queue_call_request(server, cq_idx, rc);
 done:
-  grpc_exec_ctx_finish();
+
   return error;
 }

@@ -1425,7 +1418,7 @@ grpc_call_error grpc_server_request_registered_call(
   rc->data.registered.optional_payload = optional_payload;
   error = queue_call_request(server, cq_idx, rc);
 done:
-  grpc_exec_ctx_finish();
+
   return error;
 }
diff --git a/src/core/lib/transport/bdp_estimator.cc b/src/core/lib/transport/bdp_estimator.cc
index 47d65870d1..4e279b4d94 100644
--- a/src/core/lib/transport/bdp_estimator.cc
+++ b/src/core/lib/transport/bdp_estimator.cc
@@ -79,7 +79,7 @@ grpc_millis BdpEstimator::CompletePing() {
   }
   ping_state_ = PingState::UNSCHEDULED;
   accumulator_ = 0;
-  return grpc_exec_ctx_now() + inter_ping_delay_;
+  return ExecCtx::Get()->Now() + inter_ping_delay_;
 }

 }  // namespace grpc_core
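BdpEstimator::CompletePing() now returns an absolute grpc_millis computed from the shared clock; a sketch of how a transport might consume it to arm the next ping (names hypothetical; the real wiring lives in chttp2, outside this diff):

    grpc_millis next_ping = bdp_estimator->CompletePing();
    grpc_timer_init(&ping_timer, next_ping, &start_next_ping_closure);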
diff --git a/src/core/lib/transport/status_conversion.cc b/src/core/lib/transport/status_conversion.cc
index fd7764f2db..61470b8c78 100644
--- a/src/core/lib/transport/status_conversion.cc
+++ b/src/core/lib/transport/status_conversion.cc
@@ -46,8 +46,8 @@ grpc_status_code grpc_http2_error_to_grpc_status(grpc_http2_error_code error,
     case GRPC_HTTP2_CANCEL:
       /* http2 cancel translates to STATUS_CANCELLED iff deadline hasn't been
        * exceeded */
-      return grpc_exec_ctx_now() > deadline ? GRPC_STATUS_DEADLINE_EXCEEDED
-                                            : GRPC_STATUS_CANCELLED;
+      return ExecCtx::Get()->Now() > deadline ? GRPC_STATUS_DEADLINE_EXCEEDED
+                                              : GRPC_STATUS_CANCELLED;
     case GRPC_HTTP2_ENHANCE_YOUR_CALM:
       return GRPC_STATUS_RESOURCE_EXHAUSTED;
     case GRPC_HTTP2_INADEQUATE_SECURITY:
diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc
index 6f31bd07f9..ca80a7404d 100644
--- a/src/core/lib/transport/transport.cc
+++ b/src/core/lib/transport/transport.cc
@@ -62,7 +62,7 @@ void grpc_stream_unref(grpc_stream_refcount* refcount, const char* reason) {
 void grpc_stream_unref(grpc_stream_refcount* refcount) {
 #endif
   if (gpr_unref(&refcount->refs)) {
-    if (exec_ctx->flags & GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP) {
+    if (ExecCtx::Get()->flags() & GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP) {
       /* Ick. The thread we're running on MAY be owned (indirectly) by a
          call-stack. If that's the case, destroying the call-stack MAY try to
          destroy the |