diff options
author | Yash Tibrewal <yashkt@google.com> | 2017-11-14 18:11:22 -0800 |
---|---|---|
committer | Yash Tibrewal <yashkt@google.com> | 2017-11-14 18:11:22 -0800 |
commit | 6c26b16fe06b1cc75b4dac372f4f51f6b7d1bfc0 (patch) | |
tree | 5d9b7bbdcf125bfc103b67778349ea075b049bb4 /src | |
parent | 75122c23578e24417dcf64081c737571a9fc2dbc (diff) |
Move ExecCtx to grpc_core namespace. Make exec_ctx a private static in ExecCtx and some minor changes
Diffstat (limited to 'src')
80 files changed, 373 insertions, 323 deletions
diff --git a/src/core/ext/filters/client_channel/backup_poller.cc b/src/core/ext/filters/client_channel/backup_poller.cc index dbdcd53ef5..ea9fb2cae1 100644 --- a/src/core/ext/filters/client_channel/backup_poller.cc +++ b/src/core/ext/filters/client_channel/backup_poller.cc @@ -112,10 +112,12 @@ static void run_poller(void* arg, grpc_error* error) { backup_poller_shutdown_unref(p); return; } - grpc_error* err = grpc_pollset_work(p->pollset, NULL, ExecCtx::Get()->Now()); + grpc_error* err = + grpc_pollset_work(p->pollset, NULL, grpc_core::ExecCtx::Get()->Now()); gpr_mu_unlock(p->pollset_mu); GRPC_LOG_IF_ERROR("Run client channel backup poller", err); - grpc_timer_init(&p->polling_timer, ExecCtx::Get()->Now() + g_poll_interval_ms, + grpc_timer_init(&p->polling_timer, + grpc_core::ExecCtx::Get()->Now() + g_poll_interval_ms, &p->run_poller_closure); } @@ -137,7 +139,7 @@ void grpc_client_channel_start_backup_polling( GRPC_CLOSURE_INIT(&g_poller->run_poller_closure, run_poller, g_poller, grpc_schedule_on_exec_ctx); grpc_timer_init(&g_poller->polling_timer, - ExecCtx::Get()->Now() + g_poll_interval_ms, + grpc_core::ExecCtx::Get()->Now() + g_poll_interval_ms, &g_poller->run_poller_closure); } diff --git a/src/core/ext/filters/client_channel/channel_connectivity.cc b/src/core/ext/filters/client_channel/channel_connectivity.cc index 0ceedb9f86..c949e52bd4 100644 --- a/src/core/ext/filters/client_channel/channel_connectivity.cc +++ b/src/core/ext/filters/client_channel/channel_connectivity.cc @@ -33,7 +33,7 @@ grpc_connectivity_state grpc_channel_check_connectivity_state( /* forward through to the underlying client channel */ grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel)); - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; grpc_connectivity_state state; GRPC_API_TRACE( "grpc_channel_check_connectivity_state(channel=%p, try_to_connect=%d)", 2, @@ -198,7 +198,7 @@ void grpc_channel_watch_connectivity_state( gpr_timespec deadline, grpc_completion_queue* cq, void* tag) { grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel)); - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; state_watcher* w = (state_watcher*)gpr_malloc(sizeof(*w)); GRPC_API_TRACE( diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 63cf417c4e..e757777aec 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -1122,7 +1122,7 @@ static void start_picking_locked(glb_lb_policy* glb_policy) { if (glb_policy->lb_fallback_timeout_ms > 0 && glb_policy->serverlist == NULL && !glb_policy->fallback_timer_active) { grpc_millis deadline = - ExecCtx::Get()->Now() + glb_policy->lb_fallback_timeout_ms; + grpc_core::ExecCtx::Get()->Now() + glb_policy->lb_fallback_timeout_ms; GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_fallback_timer"); GRPC_CLOSURE_INIT(&glb_policy->lb_on_fallback, lb_on_fallback_timer_locked, glb_policy, @@ -1271,7 +1271,7 @@ static void maybe_restart_lb_call(glb_lb_policy* glb_policy) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...", glb_policy); - grpc_millis timeout = next_try - ExecCtx::Get()->Now(); + grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now(); if (timeout > 0) { gpr_log(GPR_DEBUG, "[grpclb %p] ... retry_timer_active in %" PRIuPTR "ms.", @@ -1297,7 +1297,8 @@ static void send_client_load_report_locked(void* arg, grpc_error* error); static void schedule_next_client_load_report(glb_lb_policy* glb_policy) { const grpc_millis next_client_load_report_time = - ExecCtx::Get()->Now() + glb_policy->client_stats_report_interval; + grpc_core::ExecCtx::Get()->Now() + + glb_policy->client_stats_report_interval; GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure, send_client_load_report_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); @@ -1392,7 +1393,7 @@ static void lb_call_init_locked(glb_lb_policy* glb_policy) { grpc_millis deadline = glb_policy->lb_call_timeout_ms == 0 ? GRPC_MILLIS_INF_FUTURE - : ExecCtx::Get()->Now() + glb_policy->lb_call_timeout_ms; + : grpc_core::ExecCtx::Get()->Now() + glb_policy->lb_call_timeout_ms; glb_policy->lb_call = grpc_channel_create_pollset_set_call( glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS, glb_policy->base.interested_parties, diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index f0543964ae..5ac5d37a54 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -265,7 +265,7 @@ static void dns_ares_on_resolved_locked(void* arg, grpc_error* error) { gpr_log(GPR_DEBUG, "dns resolution failed: %s", msg); grpc_millis next_try = grpc_backoff_step(&r->backoff_state).next_attempt_start_time; - grpc_millis timeout = next_try - ExecCtx::Get()->Now(); + grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now(); gpr_log(GPR_INFO, "dns resolution failed (will retry): %s", grpc_error_string(error)); GPR_ASSERT(!r->have_retry_timer); diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc index 925223d189..06af5bf0b2 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc @@ -213,7 +213,7 @@ static void on_hostbyname_done_cb(void* arg, int status, int timeouts, static void on_srv_query_done_cb(void* arg, int status, int timeouts, unsigned char* abuf, int alen) { grpc_ares_request* r = (grpc_ares_request*)arg; - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; gpr_log(GPR_DEBUG, "on_query_srv_done_cb"); if (status == ARES_SUCCESS) { gpr_log(GPR_DEBUG, "on_query_srv_done_cb ARES_SUCCESS"); diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc index 10404ec4ef..82cd28fb2b 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc @@ -163,7 +163,7 @@ static void dns_on_resolved_locked(void* arg, grpc_error* error) { } else { grpc_millis next_try = grpc_backoff_step(&r->backoff_state).next_attempt_start_time; - grpc_millis timeout = next_try - ExecCtx::Get()->Now(); + grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now(); gpr_log(GPR_INFO, "dns resolution failed (will retry): %s", grpc_error_string(error)); GPR_ASSERT(!r->have_retry_timer); diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 98f96b5750..61adb984f3 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -458,7 +458,8 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) { GPR_ASSERT(!c->have_alarm); c->have_alarm = true; const grpc_millis time_til_next = - c->backoff_result.next_attempt_start_time - ExecCtx::Get()->Now(); + c->backoff_result.next_attempt_start_time - + grpc_core::ExecCtx::Get()->Now(); if (time_til_next <= 0) { gpr_log(GPR_INFO, "Retry immediately"); } else { diff --git a/src/core/ext/filters/client_channel/subchannel_index.cc b/src/core/ext/filters/client_channel/subchannel_index.cc index fbab57769c..b7b7472aff 100644 --- a/src/core/ext/filters/client_channel/subchannel_index.cc +++ b/src/core/ext/filters/client_channel/subchannel_index.cc @@ -133,7 +133,7 @@ void grpc_subchannel_index_shutdown(void) { void grpc_subchannel_index_unref(void) { if (gpr_unref(&g_refcount)) { gpr_mu_destroy(&g_mu); - gpr_avl_unref(g_subchannel_index, ExecCtx::Get()); + gpr_avl_unref(g_subchannel_index, grpc_core::ExecCtx::Get()); } } @@ -143,12 +143,13 @@ grpc_subchannel* grpc_subchannel_index_find(grpc_subchannel_key* key) { // Lock, and take a reference to the subchannel index. // We don't need to do the search under a lock as avl's are immutable. gpr_mu_lock(&g_mu); - gpr_avl index = gpr_avl_ref(g_subchannel_index, ExecCtx::Get()); + gpr_avl index = gpr_avl_ref(g_subchannel_index, grpc_core::ExecCtx::Get()); gpr_mu_unlock(&g_mu); grpc_subchannel* c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF( - (grpc_subchannel*)gpr_avl_get(index, key, ExecCtx::Get()), "index_find"); - gpr_avl_unref(index, ExecCtx::Get()); + (grpc_subchannel*)gpr_avl_get(index, key, grpc_core::ExecCtx::Get()), + "index_find"); + gpr_avl_unref(index, grpc_core::ExecCtx::Get()); return c; } @@ -164,11 +165,11 @@ grpc_subchannel* grpc_subchannel_index_register(grpc_subchannel_key* key, // Compare and swap loop: // - take a reference to the current index gpr_mu_lock(&g_mu); - gpr_avl index = gpr_avl_ref(g_subchannel_index, ExecCtx::Get()); + gpr_avl index = gpr_avl_ref(g_subchannel_index, grpc_core::ExecCtx::Get()); gpr_mu_unlock(&g_mu); // - Check to see if a subchannel already exists - c = (grpc_subchannel*)gpr_avl_get(index, key, ExecCtx::Get()); + c = (grpc_subchannel*)gpr_avl_get(index, key, grpc_core::ExecCtx::Get()); if (c != NULL) { c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(c, "index_register"); } @@ -177,10 +178,11 @@ grpc_subchannel* grpc_subchannel_index_register(grpc_subchannel_key* key, need_to_unref_constructed = true; } else { // no -> update the avl and compare/swap - gpr_avl updated = gpr_avl_add( - gpr_avl_ref(index, ExecCtx::Get()), subchannel_key_copy(key), - GRPC_SUBCHANNEL_WEAK_REF(constructed, "index_register"), - ExecCtx::Get()); + gpr_avl updated = + gpr_avl_add(gpr_avl_ref(index, grpc_core::ExecCtx::Get()), + subchannel_key_copy(key), + GRPC_SUBCHANNEL_WEAK_REF(constructed, "index_register"), + grpc_core::ExecCtx::Get()); // it may happen (but it's expected to be unlikely) // that some other thread has changed the index: @@ -192,9 +194,9 @@ grpc_subchannel* grpc_subchannel_index_register(grpc_subchannel_key* key, } gpr_mu_unlock(&g_mu); - gpr_avl_unref(updated, ExecCtx::Get()); + gpr_avl_unref(updated, grpc_core::ExecCtx::Get()); } - gpr_avl_unref(index, ExecCtx::Get()); + gpr_avl_unref(index, grpc_core::ExecCtx::Get()); } if (need_to_unref_constructed) { @@ -211,22 +213,23 @@ void grpc_subchannel_index_unregister(grpc_subchannel_key* key, // Compare and swap loop: // - take a reference to the current index gpr_mu_lock(&g_mu); - gpr_avl index = gpr_avl_ref(g_subchannel_index, ExecCtx::Get()); + gpr_avl index = gpr_avl_ref(g_subchannel_index, grpc_core::ExecCtx::Get()); gpr_mu_unlock(&g_mu); // Check to see if this key still refers to the previously // registered subchannel grpc_subchannel* c = - (grpc_subchannel*)gpr_avl_get(index, key, ExecCtx::Get()); + (grpc_subchannel*)gpr_avl_get(index, key, grpc_core::ExecCtx::Get()); if (c != constructed) { - gpr_avl_unref(index, ExecCtx::Get()); + gpr_avl_unref(index, grpc_core::ExecCtx::Get()); break; } // compare and swap the update (some other thread may have // mutated the index behind us) gpr_avl updated = - gpr_avl_remove(gpr_avl_ref(index, ExecCtx::Get()), key, ExecCtx::Get()); + gpr_avl_remove(gpr_avl_ref(index, grpc_core::ExecCtx::Get()), key, + grpc_core::ExecCtx::Get()); gpr_mu_lock(&g_mu); if (index.root == g_subchannel_index.root) { @@ -235,8 +238,8 @@ void grpc_subchannel_index_unregister(grpc_subchannel_key* key, } gpr_mu_unlock(&g_mu); - gpr_avl_unref(updated, ExecCtx::Get()); - gpr_avl_unref(index, ExecCtx::Get()); + gpr_avl_unref(updated, grpc_core::ExecCtx::Get()); + gpr_avl_unref(index, grpc_core::ExecCtx::Get()); } } diff --git a/src/core/ext/filters/max_age/max_age_filter.cc b/src/core/ext/filters/max_age/max_age_filter.cc index 015a3ce124..f89e8c730d 100644 --- a/src/core/ext/filters/max_age/max_age_filter.cc +++ b/src/core/ext/filters/max_age/max_age_filter.cc @@ -99,9 +99,10 @@ static void increase_call_count(channel_data* chand) { static void decrease_call_count(channel_data* chand) { if (gpr_atm_full_fetch_add(&chand->call_count, -1) == 1) { GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_idle_timer"); - grpc_timer_init(&chand->max_idle_timer, - ExecCtx::Get()->Now() + chand->max_connection_idle, - &chand->close_max_idle_channel); + grpc_timer_init( + &chand->max_idle_timer, + grpc_core::ExecCtx::Get()->Now() + chand->max_connection_idle, + &chand->close_max_idle_channel); } } @@ -121,7 +122,7 @@ static void start_max_age_timer_after_init(void* arg, grpc_error* error) { chand->max_age_timer_pending = true; GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_timer"); grpc_timer_init(&chand->max_age_timer, - ExecCtx::Get()->Now() + chand->max_connection_age, + grpc_core::ExecCtx::Get()->Now() + chand->max_connection_age, &chand->close_max_age_channel); gpr_mu_unlock(&chand->max_age_timer_mu); grpc_transport_op* op = grpc_make_transport_op(NULL); @@ -138,11 +139,12 @@ static void start_max_age_grace_timer_after_goaway_op(void* arg, gpr_mu_lock(&chand->max_age_timer_mu); chand->max_age_grace_timer_pending = true; GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_grace_timer"); - grpc_timer_init(&chand->max_age_grace_timer, - chand->max_connection_age_grace == GRPC_MILLIS_INF_FUTURE - ? GRPC_MILLIS_INF_FUTURE - : ExecCtx::Get()->Now() + chand->max_connection_age_grace, - &chand->force_close_max_age_channel); + grpc_timer_init( + &chand->max_age_grace_timer, + chand->max_connection_age_grace == GRPC_MILLIS_INF_FUTURE + ? GRPC_MILLIS_INF_FUTURE + : grpc_core::ExecCtx::Get()->Now() + chand->max_connection_age_grace, + &chand->force_close_max_age_channel); gpr_mu_unlock(&chand->max_age_timer_mu); GRPC_CHANNEL_STACK_UNREF(chand->channel_stack, "max_age start_max_age_grace_timer_after_goaway_op"); diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create.cc index 3afca884ca..fec61281c3 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.cc +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.cc @@ -80,7 +80,7 @@ static grpc_client_channel_factory client_channel_factory = { grpc_channel* grpc_insecure_channel_create(const char* target, const grpc_channel_args* args, void* reserved) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; GRPC_API_TRACE( "grpc_insecure_channel_create(target=%s, args=%p, reserved=%p)", 3, (target, args, reserved)); diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc index b0eff1c992..c713ce1960 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc @@ -37,7 +37,7 @@ grpc_channel* grpc_insecure_channel_create_from_fd( const char* target, int fd, const grpc_channel_args* args) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; GRPC_API_TRACE("grpc_insecure_channel_create(target=%p, fd=%d, args=%p)", 3, (target, fd, args)); diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc index bebc38c248..a328126263 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc @@ -190,7 +190,7 @@ grpc_channel* grpc_secure_channel_create(grpc_channel_credentials* creds, const char* target, const grpc_channel_args* args, void* reserved) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; GRPC_API_TRACE( "grpc_secure_channel_create(creds=%p, target=%s, args=%p, " "reserved=%p)", diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.cc b/src/core/ext/transport/chttp2/server/chttp2_server.cc index bbcfb1b195..97b6ad38de 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.cc +++ b/src/core/ext/transport/chttp2/server/chttp2_server.cc @@ -132,7 +132,8 @@ static void on_accept(void* arg, grpc_endpoint* tcp, connection_state->handshake_mgr); // TODO(roth): We should really get this timeout value from channel // args instead of hard-coding it. - const grpc_millis deadline = ExecCtx::Get()->Now() + 120 * GPR_MS_PER_SEC; + const grpc_millis deadline = + grpc_core::ExecCtx::Get()->Now() + 120 * GPR_MS_PER_SEC; grpc_handshake_manager_do_handshake(connection_state->handshake_mgr, tcp, state->args, deadline, acceptor, on_handshake_done, connection_state); @@ -161,10 +162,10 @@ static void tcp_server_shutdown_complete(void* arg, grpc_error* error) { gpr_mu_unlock(&state->mu); // Flush queued work before destroying handshaker factory, since that // may do a synchronous unref. - ExecCtx::Get()->Flush(); + grpc_core::ExecCtx::Get()->Flush(); if (destroy_done != NULL) { destroy_done->cb(destroy_done->cb_arg, GRPC_ERROR_REF(error)); - ExecCtx::Get()->Flush(); + grpc_core::ExecCtx::Get()->Flush(); } grpc_channel_args_destroy(state->args); gpr_mu_destroy(&state->mu); diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc index 6cbb26a349..826886c961 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc @@ -26,7 +26,7 @@ #include "src/core/lib/surface/server.h" int grpc_server_add_insecure_http2_port(grpc_server* server, const char* addr) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; int port_num = 0; GRPC_API_TRACE("grpc_server_add_insecure_http2_port(server=%p, addr=%s)", 2, (server, addr)); diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc index e5419e5e6e..11b3d710df 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc @@ -38,7 +38,7 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server, void* reserved, int fd) { GPR_ASSERT(reserved == NULL); - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; char* name; gpr_asprintf(&name, "fd:%d", fd); diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.cc b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.cc index aeae8f42e3..827f0020bb 100644 --- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.cc +++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.cc @@ -36,7 +36,7 @@ int grpc_server_add_secure_http2_port(grpc_server* server, const char* addr, grpc_server_credentials* creds) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; grpc_error* err = GRPC_ERROR_NONE; grpc_server_security_connector* sc = NULL; int port_num = 0; diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index e49e26fc35..a2861dfd6f 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -530,7 +530,7 @@ static void init_transport(grpc_chttp2_transport* t, t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); grpc_timer_init(&t->keepalive_ping_timer, - ExecCtx::Get()->Now() + t->keepalive_time, + grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, &t->init_keepalive_ping_locked); } else { /* Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no @@ -2585,14 +2585,14 @@ static void init_keepalive_ping_locked(void* arg, grpc_error* error) { } else { GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); grpc_timer_init(&t->keepalive_ping_timer, - ExecCtx::Get()->Now() + t->keepalive_time, + grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, &t->init_keepalive_ping_locked); } } else if (error == GRPC_ERROR_CANCELLED) { /* The keepalive ping timer may be cancelled by bdp */ GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); grpc_timer_init(&t->keepalive_ping_timer, - ExecCtx::Get()->Now() + t->keepalive_time, + grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, &t->init_keepalive_ping_locked); } GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping"); @@ -2602,7 +2602,7 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) { grpc_chttp2_transport* t = (grpc_chttp2_transport*)arg; GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog"); grpc_timer_init(&t->keepalive_watchdog_timer, - ExecCtx::Get()->Now() + t->keepalive_time, + grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, &t->keepalive_watchdog_fired_locked); } @@ -2614,7 +2614,7 @@ static void finish_keepalive_ping_locked(void* arg, grpc_error* error) { grpc_timer_cancel(&t->keepalive_watchdog_timer); GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); grpc_timer_init(&t->keepalive_ping_timer, - ExecCtx::Get()->Now() + t->keepalive_time, + grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, &t->init_keepalive_ping_locked); } } diff --git a/src/core/ext/transport/chttp2/transport/flow_control.cc b/src/core/ext/transport/chttp2/transport/flow_control.cc index e54d59b5fa..14f089859e 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.cc +++ b/src/core/ext/transport/chttp2/transport/flow_control.cc @@ -160,7 +160,7 @@ TransportFlowControl::TransportFlowControl(const grpc_chttp2_transport* t, .set_min_control_value(-1) .set_max_control_value(25) .set_integral_range(10)), - last_pid_update_(ExecCtx::Get()->Now()) {} + last_pid_update_(grpc_core::ExecCtx::Get()->Now()) {} uint32_t TransportFlowControl::MaybeSendUpdate(bool writing_anyway) { FlowControlTrace trace("t updt sent", this, nullptr); @@ -306,7 +306,7 @@ double TransportFlowControl::TargetLogBdp() { } double TransportFlowControl::SmoothLogBdp(double value) { - grpc_millis now = ExecCtx::Get()->Now(); + grpc_millis now = grpc_core::ExecCtx::Get()->Now(); double bdp_error = value - pid_controller_.last_control_value(); const double dt = (double)(now - last_pid_update_) * 1e-3; last_pid_update_ = now; diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.cc b/src/core/ext/transport/chttp2/transport/frame_ping.cc index 60172be9cb..298a56721a 100644 --- a/src/core/ext/transport/chttp2/transport/frame_ping.cc +++ b/src/core/ext/transport/chttp2/transport/frame_ping.cc @@ -89,7 +89,7 @@ grpc_error* grpc_chttp2_ping_parser_parse(void* parser, grpc_chttp2_ack_ping(t, p->opaque_8bytes); } else { if (!t->is_client) { - grpc_millis now = ExecCtx::Get()->Now(); + grpc_millis now = grpc_core::ExecCtx::Get()->Now(); grpc_millis next_allowed_ping = t->ping_recv_state.last_ping_recv_time + t->ping_policy.min_recv_ping_interval_without_data; diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.cc b/src/core/ext/transport/chttp2/transport/hpack_encoder.cc index efb6e54ce7..2f9849c09a 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_encoder.cc +++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.cc @@ -570,7 +570,8 @@ static void deadline_enc(grpc_chttp2_hpack_compressor* c, grpc_millis deadline, framer_state* st) { char timeout_str[GRPC_HTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE]; grpc_mdelem mdelem; - grpc_http2_encode_timeout(deadline - ExecCtx::Get()->Now(), timeout_str); + grpc_http2_encode_timeout(deadline - grpc_core::ExecCtx::Get()->Now(), + timeout_str); mdelem = grpc_mdelem_from_slices(GRPC_MDSTR_GRPC_TIMEOUT, grpc_slice_from_copied_string(timeout_str)); hpack_enc(c, mdelem, st); diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc index f7f83c9aee..f73b498d40 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.cc +++ b/src/core/ext/transport/chttp2/transport/parsing.cc @@ -436,7 +436,7 @@ static void on_initial_header(void* tp, grpc_mdelem md) { } if (timeout != GRPC_MILLIS_INF_FUTURE) { grpc_chttp2_incoming_metadata_buffer_set_deadline( - &s->metadata_buffer[0], ExecCtx::Get()->Now() + timeout); + &s->metadata_buffer[0], grpc_core::ExecCtx::Get()->Now() + timeout); } GRPC_MDELEM_UNREF(md); } else { diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index 4f76c2eb23..ddcac45d83 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -68,7 +68,7 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) { } return; } - grpc_millis now = ExecCtx::Get()->Now(); + grpc_millis now = grpc_core::ExecCtx::Get()->Now(); grpc_millis next_allowed_ping = t->ping_state.last_ping_sent_time + t->ping_policy.min_sent_ping_interval_without_data; diff --git a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.cc b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.cc index f634627f03..8e1dcc542e 100644 --- a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.cc +++ b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.cc @@ -49,6 +49,6 @@ GRPCAPI grpc_channel* grpc_cronet_secure_channel_create( grpc_transport* ct = grpc_create_cronet_transport(engine, target, args, reserved); - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; return grpc_channel_create(target, args, GRPC_CLIENT_DIRECT_CHANNEL, ct); } diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc index 3411acc563..971071e387 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.cc +++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc @@ -398,7 +398,7 @@ static void execute_from_storage(stream_obj* s) { */ static void on_failed(bidirectional_stream* stream, int net_error) { CRONET_LOG(GPR_DEBUG, "on_failed(%p, %d)", stream, net_error); - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; stream_obj* s = (stream_obj*)stream->annotation; gpr_mu_lock(&s->mu); @@ -424,7 +424,7 @@ static void on_failed(bidirectional_stream* stream, int net_error) { */ static void on_canceled(bidirectional_stream* stream) { CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream); - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; stream_obj* s = (stream_obj*)stream->annotation; gpr_mu_lock(&s->mu); @@ -450,7 +450,7 @@ static void on_canceled(bidirectional_stream* stream) { */ static void on_succeeded(bidirectional_stream* stream) { CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream); - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; stream_obj* s = (stream_obj*)stream->annotation; gpr_mu_lock(&s->mu); @@ -468,7 +468,7 @@ static void on_succeeded(bidirectional_stream* stream) { */ static void on_stream_ready(bidirectional_stream* stream) { CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream); - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; stream_obj* s = (stream_obj*)stream->annotation; grpc_cronet_transport* t = (grpc_cronet_transport*)s->curr_ct; gpr_mu_lock(&s->mu); @@ -498,7 +498,7 @@ static void on_response_headers_received( bidirectional_stream* stream, const bidirectional_stream_header_array* headers, const char* negotiated_protocol) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream, headers, negotiated_protocol); stream_obj* s = (stream_obj*)stream->annotation; @@ -550,7 +550,7 @@ static void on_response_headers_received( Cronet callback */ static void on_write_completed(bidirectional_stream* stream, const char* data) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; stream_obj* s = (stream_obj*)stream->annotation; CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data); gpr_mu_lock(&s->mu); @@ -568,7 +568,7 @@ static void on_write_completed(bidirectional_stream* stream, const char* data) { */ static void on_read_completed(bidirectional_stream* stream, char* data, int count) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; stream_obj* s = (stream_obj*)stream->annotation; CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data, count); @@ -610,7 +610,7 @@ static void on_read_completed(bidirectional_stream* stream, char* data, static void on_response_trailers_received( bidirectional_stream* stream, const bidirectional_stream_header_array* trailers) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream, trailers); stream_obj* s = (stream_obj*)stream->annotation; diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc index a79b2b26b0..e303f5ac0d 100644 --- a/src/core/ext/transport/inproc/inproc_transport.cc +++ b/src/core/ext/transport/inproc/inproc_transport.cc @@ -1091,7 +1091,7 @@ static grpc_endpoint* get_endpoint(grpc_transport* t) { return NULL; } static void do_nothing(void* arg, grpc_error* error) {} void grpc_inproc_transport_init(void) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; GRPC_CLOSURE_INIT(&do_nothing_closure, do_nothing, NULL, grpc_schedule_on_exec_ctx); g_empty_slice = grpc_slice_from_static_buffer(NULL, 0); @@ -1155,7 +1155,7 @@ grpc_channel* grpc_inproc_channel_create(grpc_server* server, GRPC_API_TRACE("grpc_inproc_channel_create(server=%p, args=%p)", 2, (server, args)); - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; const grpc_channel_args* server_args = grpc_server_get_channel_args(server); @@ -1186,7 +1186,7 @@ grpc_channel* grpc_inproc_channel_create(grpc_server* server, } void grpc_inproc_transport_shutdown(void) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; grpc_slice_unref_internal(g_empty_slice); grpc_slice_unref_internal(g_fake_path_key); grpc_slice_unref_internal(g_fake_path_value); diff --git a/src/core/lib/backoff/backoff.cc b/src/core/lib/backoff/backoff.cc index b75ce79d46..da3b9b1b2d 100644 --- a/src/core/lib/backoff/backoff.cc +++ b/src/core/lib/backoff/backoff.cc @@ -36,7 +36,7 @@ grpc_backoff_result grpc_backoff_begin(grpc_backoff* backoff) { backoff->current_backoff = backoff->initial_backoff; const grpc_millis initial_timeout = GPR_MAX(backoff->initial_backoff, backoff->min_connect_timeout); - const grpc_millis now = ExecCtx::Get()->Now(); + const grpc_millis now = grpc_core::ExecCtx::Get()->Now(); const grpc_backoff_result result = {now + initial_timeout, now + backoff->current_backoff}; return result; @@ -67,7 +67,7 @@ grpc_backoff_result grpc_backoff_step(grpc_backoff* backoff) { backoff->min_connect_timeout); const grpc_millis next_timeout = GPR_MIN( (grpc_millis)(backoff->current_backoff + jitter), backoff->max_backoff); - const grpc_millis now = ExecCtx::Get()->Now(); + const grpc_millis now = grpc_core::ExecCtx::Get()->Now(); const grpc_backoff_result result = {now + current_timeout, now + next_timeout}; return result; diff --git a/src/core/lib/compression/stream_compression_gzip.cc b/src/core/lib/compression/stream_compression_gzip.cc index 3fae3490ce..4aaef4e128 100644 --- a/src/core/lib/compression/stream_compression_gzip.cc +++ b/src/core/lib/compression/stream_compression_gzip.cc @@ -40,7 +40,7 @@ static bool gzip_flate(grpc_stream_compression_context_gzip* ctx, /* Full flush is not allowed when inflating. */ GPR_ASSERT(!(ctx->flate == inflate && (flush == Z_FINISH))); - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; int r; bool eoc = false; size_t original_max_output_size = max_output_size; diff --git a/src/core/lib/debug/stats.h b/src/core/lib/debug/stats.h index 24b0084130..6b36f67172 100644 --- a/src/core/lib/debug/stats.h +++ b/src/core/lib/debug/stats.h @@ -35,7 +35,7 @@ typedef struct grpc_stats_data { extern grpc_stats_data* grpc_stats_per_cpu_storage; #define GRPC_THREAD_STATS_DATA() \ - (&grpc_stats_per_cpu_storage[ExecCtx::Get()->starting_cpu()]) + (&grpc_stats_per_cpu_storage[grpc_core::ExecCtx::Get()->starting_cpu()]) #define GRPC_STATS_INC_COUNTER(ctr) \ (gpr_atm_no_barrier_fetch_add(&GRPC_THREAD_STATS_DATA()->counters[(ctr)], 1)) diff --git a/src/core/lib/iomgr/block_annotate.h b/src/core/lib/iomgr/block_annotate.h index 7783da0c14..55bde3eaac 100644 --- a/src/core/lib/iomgr/block_annotate.h +++ b/src/core/lib/iomgr/block_annotate.h @@ -41,18 +41,18 @@ void gpr_thd_end_blocking_region(); do { \ gpr_thd_start_blocking_region(); \ } while (0) -#define GRPC_SCHEDULING_END_BLOCKING_REGION \ - do { \ - gpr_thd_end_blocking_region(); \ - ExecCtx::Get()->InvalidateNow(); \ +#define GRPC_SCHEDULING_END_BLOCKING_REGION \ + do { \ + gpr_thd_end_blocking_region(); \ + grpc_core::ExecCtx::Get()->InvalidateNow(); \ } while (0) #else #define GRPC_SCHEDULING_START_BLOCKING_REGION \ do { \ } while (0) -#define GRPC_SCHEDULING_END_BLOCKING_REGION \ - do { \ - ExecCtx::Get()->InvalidateNow(); \ +#define GRPC_SCHEDULING_END_BLOCKING_REGION \ + do { \ + grpc_core::ExecCtx::Get()->InvalidateNow(); \ } while (0) #endif diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc index c9f5448630..6cc4eef1d8 100644 --- a/src/core/lib/iomgr/combiner.cc +++ b/src/core/lib/iomgr/combiner.cc @@ -129,23 +129,23 @@ grpc_combiner* grpc_combiner_ref(grpc_combiner* lock GRPC_COMBINER_DEBUG_ARGS) { static void push_last_on_exec_ctx(grpc_combiner* lock) { lock->next_combiner_on_this_exec_ctx = nullptr; - if (ExecCtx::Get()->combiner_data()->active_combiner == nullptr) { - ExecCtx::Get()->combiner_data()->active_combiner = - ExecCtx::Get()->combiner_data()->last_combiner = lock; + if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner == nullptr) { + grpc_core::ExecCtx::Get()->combiner_data()->active_combiner = + grpc_core::ExecCtx::Get()->combiner_data()->last_combiner = lock; } else { - ExecCtx::Get() + grpc_core::ExecCtx::Get() ->combiner_data() ->last_combiner->next_combiner_on_this_exec_ctx = lock; - ExecCtx::Get()->combiner_data()->last_combiner = lock; + grpc_core::ExecCtx::Get()->combiner_data()->last_combiner = lock; } } static void push_first_on_exec_ctx(grpc_combiner* lock) { lock->next_combiner_on_this_exec_ctx = - ExecCtx::Get()->combiner_data()->active_combiner; - ExecCtx::Get()->combiner_data()->active_combiner = lock; + grpc_core::ExecCtx::Get()->combiner_data()->active_combiner; + grpc_core::ExecCtx::Get()->combiner_data()->active_combiner = lock; if (lock->next_combiner_on_this_exec_ctx == NULL) { - ExecCtx::Get()->combiner_data()->last_combiner = lock; + grpc_core::ExecCtx::Get()->combiner_data()->last_combiner = lock; } } @@ -165,7 +165,7 @@ static void combiner_exec(grpc_closure* cl, grpc_error* error) { GRPC_STATS_INC_COMBINER_LOCKS_INITIATED(); GPR_TIMER_MARK("combiner.initiated", 0); gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null, - (gpr_atm)ExecCtx::Get()); + (gpr_atm)grpc_core::ExecCtx::Get()); // first element on this list: add it to the list of combiner locks // executing within this exec_ctx push_last_on_exec_ctx(lock); @@ -174,7 +174,7 @@ static void combiner_exec(grpc_closure* cl, grpc_error* error) { // offload for one or two actions, and that's fine gpr_atm initiator = gpr_atm_no_barrier_load(&lock->initiating_exec_ctx_or_null); - if (initiator != 0 && initiator != (gpr_atm)ExecCtx::Get()) { + if (initiator != 0 && initiator != (gpr_atm)grpc_core::ExecCtx::Get()) { gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null, 0); } } @@ -186,12 +186,12 @@ static void combiner_exec(grpc_closure* cl, grpc_error* error) { } static void move_next() { - ExecCtx::Get()->combiner_data()->active_combiner = - ExecCtx::Get() + grpc_core::ExecCtx::Get()->combiner_data()->active_combiner = + grpc_core::ExecCtx::Get() ->combiner_data() ->active_combiner->next_combiner_on_this_exec_ctx; - if (ExecCtx::Get()->combiner_data()->active_combiner == NULL) { - ExecCtx::Get()->combiner_data()->last_combiner = NULL; + if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner == NULL) { + grpc_core::ExecCtx::Get()->combiner_data()->last_combiner = NULL; } } @@ -209,7 +209,8 @@ static void queue_offload(grpc_combiner* lock) { bool grpc_combiner_continue_exec_ctx() { GPR_TIMER_BEGIN("combiner.continue_exec_ctx", 0); - grpc_combiner* lock = ExecCtx::Get()->combiner_data()->active_combiner; + grpc_combiner* lock = + grpc_core::ExecCtx::Get()->combiner_data()->active_combiner; if (lock == NULL) { GPR_TIMER_END("combiner.continue_exec_ctx", 0); return false; @@ -224,10 +225,10 @@ bool grpc_combiner_continue_exec_ctx() { "exec_ctx_ready_to_finish=%d " "time_to_execute_final_list=%d", lock, contended, - ExecCtx::Get()->IsReadyToFinish(), + grpc_core::ExecCtx::Get()->IsReadyToFinish(), lock->time_to_execute_final_list)); - if (contended && ExecCtx::Get()->IsReadyToFinish() && + if (contended && grpc_core::ExecCtx::Get()->IsReadyToFinish() && grpc_executor_is_threaded()) { GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0); // this execution context wants to move on: schedule remaining work to be @@ -333,11 +334,11 @@ static void combiner_finally_exec(grpc_closure* closure, grpc_error* error) { GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS(); grpc_combiner* lock = COMBINER_FROM_CLOSURE_SCHEDULER(closure, finally_scheduler); - GRPC_COMBINER_TRACE( - gpr_log(GPR_DEBUG, "C:%p grpc_combiner_execute_finally c=%p; ac=%p", lock, - closure, ExecCtx::Get()->combiner_data()->active_combiner)); + GRPC_COMBINER_TRACE(gpr_log( + GPR_DEBUG, "C:%p grpc_combiner_execute_finally c=%p; ac=%p", lock, + closure, grpc_core::ExecCtx::Get()->combiner_data()->active_combiner)); GPR_TIMER_BEGIN("combiner.execute_finally", 0); - if (ExecCtx::Get()->combiner_data()->active_combiner != lock) { + if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner != lock) { GPR_TIMER_MARK("slowpath", 0); GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(enqueue_finally, closure, grpc_combiner_scheduler(lock)), diff --git a/src/core/lib/iomgr/endpoint_pair_posix.cc b/src/core/lib/iomgr/endpoint_pair_posix.cc index 1a281322a8..65db4a9675 100644 --- a/src/core/lib/iomgr/endpoint_pair_posix.cc +++ b/src/core/lib/iomgr/endpoint_pair_posix.cc @@ -54,7 +54,7 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char* name, char* final_name; create_sockets(sv); - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; gpr_asprintf(&final_name, "%s:client", name); p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), args, diff --git a/src/core/lib/iomgr/endpoint_pair_windows.cc b/src/core/lib/iomgr/endpoint_pair_windows.cc index e0f211cdf9..afd91c9932 100644 --- a/src/core/lib/iomgr/endpoint_pair_windows.cc +++ b/src/core/lib/iomgr/endpoint_pair_windows.cc @@ -72,7 +72,7 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair( SOCKET sv[2]; grpc_endpoint_pair p; create_sockets(sv); - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client"), channel_args, "endpoint:server"); p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server"), diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc index 2b486887b8..f22fb82797 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.cc +++ b/src/core/lib/iomgr/ev_epoll1_linux.cc @@ -554,7 +554,7 @@ static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) { static int poll_deadline_to_millis_timeout(grpc_millis millis) { if (millis == GRPC_MILLIS_INF_FUTURE) return -1; - grpc_millis delta = millis - ExecCtx::Get()->Now(); + grpc_millis delta = millis - grpc_core::ExecCtx::Get()->Now(); if (delta > INT_MAX) { return INT_MAX; } else if (delta < 0) { @@ -743,7 +743,7 @@ static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker, SET_KICK_STATE(worker, KICKED); } } - ExecCtx::Get()->InvalidateNow(); + grpc_core::ExecCtx::Get()->InvalidateNow(); } if (GRPC_TRACER_ON(grpc_polling_trace)) { @@ -848,7 +848,7 @@ static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker, /* Make sure we appear kicked */ SET_KICK_STATE(worker, KICKED); grpc_closure_list_move(&worker->schedule_on_end_work, - ExecCtx::Get()->closure_list()); + grpc_core::ExecCtx::Get()->closure_list()); if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) { if (worker->next != worker && worker->next->state == UNKICKED) { if (GRPC_TRACER_ON(grpc_polling_trace)) { @@ -859,9 +859,9 @@ static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker, SET_KICK_STATE(worker->next, DESIGNATED_POLLER); GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV(); gpr_cv_signal(&worker->next->cv); - if (ExecCtx::Get()->HasWork()) { + if (grpc_core::ExecCtx::Get()->HasWork()) { gpr_mu_unlock(&pollset->mu); - ExecCtx::Get()->Flush(); + grpc_core::ExecCtx::Get()->Flush(); gpr_mu_lock(&pollset->mu); } } else { @@ -892,12 +892,12 @@ static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker, found_worker = check_neighborhood_for_available_poller(neighborhood); gpr_mu_unlock(&neighborhood->mu); } - ExecCtx::Get()->Flush(); + grpc_core::ExecCtx::Get()->Flush(); gpr_mu_lock(&pollset->mu); } - } else if (ExecCtx::Get()->HasWork()) { + } else if (grpc_core::ExecCtx::Get()->HasWork()) { gpr_mu_unlock(&pollset->mu); - ExecCtx::Get()->Flush(); + grpc_core::ExecCtx::Get()->Flush(); gpr_mu_lock(&pollset->mu); } if (worker->initialized_cv) { @@ -948,8 +948,8 @@ static grpc_error* pollset_work(grpc_pollset* ps, process_epoll_events() returns very quickly: It just queues the work on exec_ctx but does not execute it (the actual exectution or more - accurately ExecCtx::Get()->Flush() happens in end_worker() AFTER - selecting a designated poller). So we are not waiting long periods + accurately grpc_core::ExecCtx::Get()->Flush() happens in end_worker() + AFTER selecting a designated poller). So we are not waiting long periods without a designated poller */ if (gpr_atm_acq_load(&g_epoll_set.cursor) == gpr_atm_acq_load(&g_epoll_set.num_events)) { diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 385b5f68d0..5b4edd1e74 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -682,7 +682,7 @@ static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) { static int poll_deadline_to_millis_timeout(grpc_millis millis) { if (millis == GRPC_MILLIS_INF_FUTURE) return -1; - grpc_millis delta = millis - ExecCtx::Get()->Now(); + grpc_millis delta = millis - grpc_core::ExecCtx::Get()->Now(); if (delta > INT_MAX) return INT_MAX; else if (delta < 0) @@ -902,7 +902,7 @@ static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker, worker->pollable_obj, worker); } } - ExecCtx::Get()->InvalidateNow(); + grpc_core::ExecCtx::Get()->InvalidateNow(); } else { gpr_mu_unlock(&pollset->mu); } @@ -970,8 +970,8 @@ static grpc_error* pollset_work(grpc_pollset* pollset, gpr_log(GPR_DEBUG, "PS:%p work hdl=%p worker=%p now=%" PRIdPTR " deadline=%" PRIdPTR " kwp=%d pollable=%p", - pollset, worker_hdl, WORKER_PTR, ExecCtx::Get()->Now(), deadline, - pollset->kicked_without_poller, pollset->active_pollable); + pollset, worker_hdl, WORKER_PTR, grpc_core::ExecCtx::Get()->Now(), + deadline, pollset->kicked_without_poller, pollset->active_pollable); } static const char* err_desc = "pollset_work"; grpc_error* error = GRPC_ERROR_NONE; @@ -990,7 +990,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset, &error, pollable_process_events(pollset, WORKER_PTR->pollable_obj, false), err_desc); - ExecCtx::Get()->Flush(); + grpc_core::ExecCtx::Get()->Flush(); gpr_tls_set(&g_current_thread_pollset, 0); gpr_tls_set(&g_current_thread_worker, 0); } diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc index a9b094a2fa..4ded7c0211 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.cc +++ b/src/core/lib/iomgr/ev_epollsig_linux.cc @@ -1090,7 +1090,7 @@ static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) { static int poll_deadline_to_millis_timeout(grpc_millis millis) { if (millis == GRPC_MILLIS_INF_FUTURE) return -1; - grpc_millis delta = millis - ExecCtx::Get()->Now(); + grpc_millis delta = millis - grpc_core::ExecCtx::Get()->Now(); if (delta > INT_MAX) return INT_MAX; else if (delta < 0) @@ -1350,7 +1350,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset, pollset_work_and_unlock(pollset, &worker, timeout_ms, &g_orig_sigmask, &error); - ExecCtx::Get()->Flush(); + grpc_core::ExecCtx::Get()->Flush(); gpr_mu_lock(&pollset->po.mu); @@ -1373,7 +1373,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset, finish_shutdown_locked(pollset); gpr_mu_unlock(&pollset->po.mu); - ExecCtx::Get()->Flush(); + grpc_core::ExecCtx::Get()->Flush(); gpr_mu_lock(&pollset->po.mu); } diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index cab4f7547c..b6546aa4b4 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -1040,7 +1040,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset, worker list, which means nobody could ask us to re-evaluate polling). */ done: if (!locked) { - queued_work |= ExecCtx::Get()->Flush(); + queued_work |= grpc_core::ExecCtx::Get()->Flush(); gpr_mu_lock(&pollset->mu); locked = 1; } @@ -1074,7 +1074,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset, pollset->called_shutdown = 1; gpr_mu_unlock(&pollset->mu); finish_shutdown(pollset); - ExecCtx::Get()->Flush(); + grpc_core::ExecCtx::Get()->Flush(); /* Continuing to access pollset here is safe -- it is the caller's * responsibility to not destroy when it has outstanding calls to * pollset_work. @@ -1083,7 +1083,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset, } else if (!grpc_closure_list_empty(pollset->idle_jobs)) { GRPC_CLOSURE_LIST_SCHED(&pollset->idle_jobs); gpr_mu_unlock(&pollset->mu); - ExecCtx::Get()->Flush(); + grpc_core::ExecCtx::Get()->Flush(); gpr_mu_lock(&pollset->mu); } } @@ -1110,7 +1110,7 @@ static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) { static int poll_deadline_to_millis_timeout(grpc_millis deadline) { if (deadline == GRPC_MILLIS_INF_FUTURE) return -1; if (deadline == 0) return 0; - grpc_millis n = deadline - ExecCtx::Get()->Now(); + grpc_millis n = deadline - grpc_core::ExecCtx::Get()->Now(); if (n < 0) return 0; if (n > INT_MAX) return -1; return (int)n; diff --git a/src/core/lib/iomgr/exec_ctx.cc b/src/core/lib/iomgr/exec_ctx.cc index fe5a0e7e2d..de71c1cf9c 100644 --- a/src/core/lib/iomgr/exec_ctx.cc +++ b/src/core/lib/iomgr/exec_ctx.cc @@ -25,29 +25,6 @@ #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/profiling/timers.h" -thread_local ExecCtx* exec_ctx = nullptr; - -ExecCtx::ExecCtx() : flags_(GRPC_EXEC_CTX_FLAG_IS_FINISHED) { exec_ctx = this; } -ExecCtx::ExecCtx(uintptr_t fl) : flags_(fl) { exec_ctx = this; } -ExecCtx::~ExecCtx() { - GPR_ASSERT(exec_ctx == this); - flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED; - Flush(); - exec_ctx = last_exec_ctx_; -} - -bool ExecCtx::IsReadyToFinish() { - if ((flags_ & GRPC_EXEC_CTX_FLAG_IS_FINISHED) == 0) { - if (CheckReadyToFinish()) { - flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED; - return true; - } - return false; - } else { - return true; - } -} - void exec_ctx_run(grpc_closure* closure, grpc_error* error) { #ifndef NDEBUG closure->scheduled = false; @@ -67,48 +44,16 @@ void exec_ctx_run(grpc_closure* closure, grpc_error* error) { GRPC_ERROR_UNREF(error); } -bool ExecCtx::Flush() { - bool did_something = 0; - GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0); - for (;;) { - if (!grpc_closure_list_empty(closure_list_)) { - grpc_closure* c = closure_list_.head; - closure_list_.head = closure_list_.tail = NULL; - while (c != NULL) { - grpc_closure* next = c->next_data.next; - grpc_error* error = c->error_data.error; - did_something = true; - exec_ctx_run(c, error); - c = next; - } - } else if (!grpc_combiner_continue_exec_ctx()) { - break; - } - } - GPR_ASSERT(combiner_data_.active_combiner == nullptr); - GPR_TIMER_END("grpc_exec_ctx_flush", 0); - return did_something; -} - -void exec_ctx_sched(grpc_closure* closure, grpc_error* error) { - grpc_closure_list_append(exec_ctx->closure_list(), closure, error); -} - static gpr_timespec g_start_time[GPR_TIMESPAN + 1]; // assumes GPR_TIMESPAN is the // last enum value in // gpr_clock_type -void ExecCtx::GlobalInit(void) { - for (int i = 0; i < GPR_TIMESPAN; i++) { - g_start_time[i] = gpr_now((gpr_clock_type)i); - } - // allows uniform treatment in conversion functions - g_start_time[GPR_TIMESPAN] = gpr_time_0(GPR_TIMESPAN); +void exec_ctx_sched(grpc_closure* closure, grpc_error* error) { + grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(), closure, + error); } -void ExecCtx::GlobalShutdown(void) {} - static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) { ts = gpr_time_sub(ts, g_start_time[ts.clock_type]); double x = @@ -154,6 +99,47 @@ grpc_millis grpc_timespec_to_millis_round_up(gpr_timespec ts) { return timespec_to_atm_round_up(ts); } +static const grpc_closure_scheduler_vtable exec_ctx_scheduler_vtable = { + exec_ctx_run, exec_ctx_sched, "exec_ctx"}; +static grpc_closure_scheduler exec_ctx_scheduler = {&exec_ctx_scheduler_vtable}; +grpc_closure_scheduler* grpc_schedule_on_exec_ctx = &exec_ctx_scheduler; + +namespace grpc_core { +thread_local ExecCtx* ExecCtx::exec_ctx_ = nullptr; + +bool ExecCtx::Flush() { + bool did_something = 0; + GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0); + for (;;) { + if (!grpc_closure_list_empty(closure_list_)) { + grpc_closure* c = closure_list_.head; + closure_list_.head = closure_list_.tail = NULL; + while (c != NULL) { + grpc_closure* next = c->next_data.next; + grpc_error* error = c->error_data.error; + did_something = true; + exec_ctx_run(c, error); + c = next; + } + } else if (!grpc_combiner_continue_exec_ctx()) { + break; + } + } + GPR_ASSERT(combiner_data_.active_combiner == nullptr); + GPR_TIMER_END("grpc_exec_ctx_flush", 0); + return did_something; +} + +void ExecCtx::GlobalInit(void) { + for (int i = 0; i < GPR_TIMESPAN; i++) { + g_start_time[i] = gpr_now((gpr_clock_type)i); + } + // allows uniform treatment in conversion functions + g_start_time[GPR_TIMESPAN] = gpr_time_0(GPR_TIMESPAN); +} + +void ExecCtx::GlobalShutdown(void) {} + grpc_millis ExecCtx::Now() { if (!now_is_valid_) { now_ = timespec_to_atm_round_down(gpr_now(GPR_CLOCK_MONOTONIC)); @@ -162,9 +148,4 @@ grpc_millis ExecCtx::Now() { return now_; } -ExecCtx* ExecCtx::Get() { return exec_ctx; } - -static const grpc_closure_scheduler_vtable exec_ctx_scheduler_vtable = { - exec_ctx_run, exec_ctx_sched, "exec_ctx"}; -static grpc_closure_scheduler exec_ctx_scheduler = {&exec_ctx_scheduler_vtable}; -grpc_closure_scheduler* grpc_schedule_on_exec_ctx = &exec_ctx_scheduler; +} // namespace grpc_core diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index a71e43e178..7b8da2f0af 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -46,6 +46,7 @@ typedef struct grpc_combiner grpc_combiner; should be given to not delete said call/channel from this exec_ctx */ #define GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP 2 +namespace grpc_core { /** Execution context. * A bag of data that collects information along a callstack. * Generally created at public API entry points, and passed down as @@ -68,10 +69,25 @@ typedef struct grpc_combiner grpc_combiner; */ class ExecCtx { public: - ExecCtx(); - ExecCtx(uintptr_t fl); - ~ExecCtx(); + /** Default Constructor */ + ExecCtx() : flags_(GRPC_EXEC_CTX_FLAG_IS_FINISHED) { exec_ctx_ = this; } + + /** Parameterised Constructor */ + ExecCtx(uintptr_t fl) : flags_(fl) { exec_ctx_ = this; } + + /** Destructor */ + ~ExecCtx() { + GPR_ASSERT(exec_ctx_ == this); + flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED; + Flush(); + exec_ctx_ = last_exec_ctx_; + } + + /** Disallow copy and assignment operators */ + ExecCtx(const ExecCtx&) = delete; + ExecCtx& operator=(const ExecCtx&) = delete; + /** Return starting_cpu */ unsigned starting_cpu() const { return starting_cpu_; } struct CombinerData { @@ -84,8 +100,13 @@ class ExecCtx { /** Only to be used by grpc-combiner code */ CombinerData* combiner_data() { return &combiner_data_; } + /** Return pointer to grpc_closure_list */ grpc_closure_list* closure_list() { return &closure_list_; } + /** Return flags */ + uintptr_t flags() { return flags_; } + + /** Checks if there is work to be done */ bool HasWork() { return combiner_data_.active_combiner != NULL || !grpc_closure_list_empty(closure_list_); @@ -99,32 +120,59 @@ class ExecCtx { /** Returns true if we'd like to leave this execution context as soon as possible: useful for deciding whether to do something more or not depending on outside context */ - bool IsReadyToFinish(); + bool IsReadyToFinish() { + if ((flags_ & GRPC_EXEC_CTX_FLAG_IS_FINISHED) == 0) { + if (CheckReadyToFinish()) { + flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED; + return true; + } + return false; + } else { + return true; + } + } + /** Returns the stored current time relative to start if valid, + * otherwise refreshes the stored time, sets it valid and returns the new + * value */ grpc_millis Now(); + /** Invalidates the stored time value. A new time value will be set on calling + * Now() */ void InvalidateNow() { now_is_valid_ = false; } - void SetNow(grpc_millis new_val) { - now_ = new_val; + /** To be used only by shutdown code in iomgr */ + void SetNowIomgrShutdown() { + now_ = GRPC_MILLIS_INF_FUTURE; now_is_valid_ = true; } - uintptr_t flags() { return flags_; } + /** To be used only for testing. + * Sets the now value + */ + void TestOnlySetNow(grpc_millis new_val) { + now_ = new_val; + now_is_valid_ = true; + } /** Finish any pending work for a grpc_exec_ctx. Must be called before * the instance is destroyed, or work may be lost. */ void Finish(); + /** Global initialization for ExecCtx. Called by iomgr */ static void GlobalInit(void); + /** Global shutdown for ExecCtx. Called by iomgr */ static void GlobalShutdown(void); - static ExecCtx* Get(); + /** Gets pointer to current exec_ctx */ + static ExecCtx* Get() { return exec_ctx_; } protected: + /** Check if ready to finish */ virtual bool CheckReadyToFinish() { return false; } + private: grpc_closure_list closure_list_ = GRPC_CLOSURE_LIST_INIT; CombinerData combiner_data_ = {nullptr, nullptr}; uintptr_t flags_; @@ -133,8 +181,10 @@ on outside context */ bool now_is_valid_ = false; grpc_millis now_ = 0; - ExecCtx* last_exec_ctx_ = Get(); + static thread_local ExecCtx* exec_ctx_; + ExecCtx* last_exec_ctx_ = exec_ctx_; }; +} // namespace grpc_core extern grpc_closure_scheduler* grpc_schedule_on_exec_ctx; diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc index bf8805a2cd..4db298c1c5 100644 --- a/src/core/lib/iomgr/executor.cc +++ b/src/core/lib/iomgr/executor.cc @@ -78,7 +78,7 @@ static size_t run_closures(grpc_closure_list list) { GRPC_ERROR_UNREF(error); c = next; n++; - ExecCtx::Get()->Flush(); + grpc_core::ExecCtx::Get()->Flush(); } return n; @@ -145,7 +145,7 @@ static void executor_thread(void* arg) { thread_state* ts = (thread_state*)arg; gpr_tls_set(&g_this_thread_state, (intptr_t)ts); - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; size_t subtract_depth = 0; for (;;) { @@ -175,7 +175,7 @@ static void executor_thread(void* arg) { gpr_log(GPR_DEBUG, "EXECUTOR[%d]: execute", (int)(ts - g_thread_state)); } - ExecCtx::Get()->InvalidateNow(); + grpc_core::ExecCtx::Get()->InvalidateNow(); subtract_depth = run_closures(exec); } } @@ -200,12 +200,14 @@ static void executor_push(grpc_closure* closure, grpc_error* error, gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p inline", closure); #endif } - grpc_closure_list_append(ExecCtx::Get()->closure_list(), closure, error); + grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(), + closure, error); return; } thread_state* ts = (thread_state*)gpr_tls_get(&g_this_thread_state); if (ts == NULL) { - ts = &g_thread_state[GPR_HASH_POINTER(ExecCtx::Get(), cur_thread_count)]; + ts = &g_thread_state[GPR_HASH_POINTER(grpc_core::ExecCtx::Get(), + cur_thread_count)]; } else { GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF(); } diff --git a/src/core/lib/iomgr/iocp_windows.cc b/src/core/lib/iomgr/iocp_windows.cc index 1686bf2872..f5c6297438 100644 --- a/src/core/lib/iomgr/iocp_windows.cc +++ b/src/core/lib/iomgr/iocp_windows.cc @@ -46,7 +46,7 @@ static DWORD deadline_to_millis_timeout(grpc_millis deadline) { if (deadline == GRPC_MILLIS_INF_FUTURE) { return INFINITE; } - grpc_millis now = ExecCtx::Get()->Now(); + grpc_millis now = grpc_core::ExecCtx::Get()->Now(); if (deadline < now) return 0; grpc_millis timeout = deadline - now; if (timeout > std::numeric_limits<DWORD>::max()) return INFINITE; @@ -65,7 +65,7 @@ grpc_iocp_work_status grpc_iocp_work(grpc_millis deadline) { success = GetQueuedCompletionStatus(g_iocp, &bytes, &completion_key, &overlapped, deadline_to_millis_timeout(deadline)); - ExecCtx::Get()->InvalidateNow(); + grpc_core::ExecCtx::Get()->InvalidateNow(); if (success == 0 && overlapped == NULL) { return GRPC_IOCP_WORK_TIMEOUT; } @@ -113,19 +113,20 @@ void grpc_iocp_kick(void) { } void grpc_iocp_flush(void) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; grpc_iocp_work_status work_status; do { work_status = grpc_iocp_work(GRPC_MILLIS_INF_PAST); - } while (work_status == GRPC_IOCP_WORK_KICK || ExecCtx::Get()->Flush()); + } while (work_status == GRPC_IOCP_WORK_KICK || + grpc_core::ExecCtx::Get()->Flush()); } void grpc_iocp_shutdown(void) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; while (gpr_atm_acq_load(&g_custom_events)) { grpc_iocp_work(GRPC_MILLIS_INF_FUTURE); - ExecCtx::Get()->Flush(); + grpc_core::ExecCtx::Get()->Flush(); } GPR_ASSERT(CloseHandle(g_iocp)); diff --git a/src/core/lib/iomgr/iomgr.cc b/src/core/lib/iomgr/iomgr.cc index a1add4a303..9c74b5d1c3 100644 --- a/src/core/lib/iomgr/iomgr.cc +++ b/src/core/lib/iomgr/iomgr.cc @@ -49,7 +49,7 @@ void grpc_iomgr_init() { g_shutdown = 0; gpr_mu_init(&g_mu); gpr_cv_init(&g_rcv); - ExecCtx::GlobalInit(); + grpc_core::ExecCtx::GlobalInit(); grpc_executor_init(); grpc_timer_list_init(); g_root_object.next = g_root_object.prev = &g_root_object; @@ -98,10 +98,10 @@ void grpc_iomgr_shutdown() { } last_warning_time = gpr_now(GPR_CLOCK_REALTIME); } - ExecCtx::Get()->SetNow(GRPC_MILLIS_INF_FUTURE); + grpc_core::ExecCtx::Get()->SetNowIomgrShutdown(); if (grpc_timer_check(NULL) == GRPC_TIMERS_FIRED) { gpr_mu_unlock(&g_mu); - ExecCtx::Get()->Flush(); + grpc_core::ExecCtx::Get()->Flush(); grpc_iomgr_platform_flush(); gpr_mu_lock(&g_mu); continue; @@ -136,14 +136,14 @@ void grpc_iomgr_shutdown() { gpr_mu_unlock(&g_mu); grpc_timer_list_shutdown(); - ExecCtx::Get()->Flush(); + grpc_core::ExecCtx::Get()->Flush(); /* ensure all threads have left g_mu */ gpr_mu_lock(&g_mu); gpr_mu_unlock(&g_mu); grpc_iomgr_platform_shutdown(); - ExecCtx::GlobalShutdown(); + grpc_core::ExecCtx::GlobalShutdown(); grpc_network_status_shutdown(); gpr_mu_destroy(&g_mu); gpr_cv_destroy(&g_rcv); diff --git a/src/core/lib/iomgr/iomgr_uv.cc b/src/core/lib/iomgr/iomgr_uv.cc index 2ab414252a..5823bb7ec2 100644 --- a/src/core/lib/iomgr/iomgr_uv.cc +++ b/src/core/lib/iomgr/iomgr_uv.cc @@ -29,7 +29,7 @@ gpr_thd_id g_init_thread; void grpc_iomgr_platform_init(void) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; grpc_pollset_global_init(); grpc_register_tracer(&grpc_tcp_trace); grpc_executor_set_threading(false); diff --git a/src/core/lib/iomgr/pollset_uv.cc b/src/core/lib/iomgr/pollset_uv.cc index a68ad4a6e3..89d28a0f9d 100644 --- a/src/core/lib/iomgr/pollset_uv.cc +++ b/src/core/lib/iomgr/pollset_uv.cc @@ -124,7 +124,7 @@ grpc_error* grpc_pollset_work(grpc_pollset* pollset, GRPC_UV_ASSERT_SAME_THREAD(); gpr_mu_unlock(&grpc_polling_mu); if (grpc_pollset_work_run_loop) { - grpc_millis now = ExecCtx::Get()->Now(); + grpc_millis now = grpc_core::ExecCtx::Get()->Now(); if (deadline >= now) { timeout = deadline - now; } else { @@ -143,7 +143,7 @@ grpc_error* grpc_pollset_work(grpc_pollset* pollset, } } if (!grpc_closure_list_empty(exec_ctx->closure_list)) { - ExecCtx::Get()->Flush(); + grpc_core::ExecCtx::Get()->Flush(); } gpr_mu_lock(&grpc_polling_mu); return GRPC_ERROR_NONE; diff --git a/src/core/lib/iomgr/pollset_windows.cc b/src/core/lib/iomgr/pollset_windows.cc index 5ff3e7cb3a..81e1d009ca 100644 --- a/src/core/lib/iomgr/pollset_windows.cc +++ b/src/core/lib/iomgr/pollset_windows.cc @@ -129,7 +129,7 @@ grpc_error* grpc_pollset_work(grpc_pollset* pollset, g_active_poller = &worker; gpr_mu_unlock(&grpc_polling_mu); grpc_iocp_work(deadline); - ExecCtx::Get()->Flush(); + grpc_core::ExecCtx::Get()->Flush(); gpr_mu_lock(&grpc_polling_mu); pollset->is_iocp_worker = 0; g_active_poller = NULL; @@ -160,10 +160,10 @@ grpc_error* grpc_pollset_work(grpc_pollset* pollset, while (!worker.kicked) { if (gpr_cv_wait(&worker.cv, &grpc_polling_mu, grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) { - ExecCtx::Get()->InvalidateNow(); + grpc_core::ExecCtx::Get()->InvalidateNow(); break; } - ExecCtx::Get()->InvalidateNow(); + grpc_core::ExecCtx::Get()->InvalidateNow(); } } else { pollset->kicked_without_pollers = 0; @@ -171,7 +171,7 @@ grpc_error* grpc_pollset_work(grpc_pollset* pollset, done: if (!grpc_closure_list_empty(exec_ctx->closure_list)) { gpr_mu_unlock(&grpc_polling_mu); - ExecCtx::Get()->Flush(); + grpc_core::ExecCtx::Get()->Flush(); gpr_mu_lock(&grpc_polling_mu); } if (added_worker) { diff --git a/src/core/lib/iomgr/resolve_address_uv.cc b/src/core/lib/iomgr/resolve_address_uv.cc index ffd70c4f35..54adf9b9f6 100644 --- a/src/core/lib/iomgr/resolve_address_uv.cc +++ b/src/core/lib/iomgr/resolve_address_uv.cc @@ -114,7 +114,7 @@ static grpc_error* handle_addrinfo_result(int status, struct addrinfo* result, static void getaddrinfo_callback(uv_getaddrinfo_t* req, int status, struct addrinfo* res) { request* r = (request*)req->data; - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; grpc_error* error; int retry_status; char* port = r->port; diff --git a/src/core/lib/iomgr/resource_quota.cc b/src/core/lib/iomgr/resource_quota.cc index 8fee585f4b..11cb5ddbee 100644 --- a/src/core/lib/iomgr/resource_quota.cc +++ b/src/core/lib/iomgr/resource_quota.cc @@ -622,7 +622,7 @@ void grpc_resource_quota_unref_internal(grpc_resource_quota* resource_quota) { /* Public API */ void grpc_resource_quota_unref(grpc_resource_quota* resource_quota) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; grpc_resource_quota_unref_internal(resource_quota); } @@ -647,7 +647,7 @@ double grpc_resource_quota_get_memory_pressure( /* Public API */ void grpc_resource_quota_resize(grpc_resource_quota* resource_quota, size_t size) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; rq_resize_args* a = (rq_resize_args*)gpr_malloc(sizeof(*a)); a->resource_quota = grpc_resource_quota_ref_internal(resource_quota); a->size = (int64_t)size; diff --git a/src/core/lib/iomgr/tcp_client_uv.cc b/src/core/lib/iomgr/tcp_client_uv.cc index 7454b01445..0bf3a043df 100644 --- a/src/core/lib/iomgr/tcp_client_uv.cc +++ b/src/core/lib/iomgr/tcp_client_uv.cc @@ -76,7 +76,7 @@ static void uv_tc_on_alarm(void* acp, grpc_error* error) { static void uv_tc_on_connect(uv_connect_t* req, int status) { grpc_uv_tcp_connect* connect = (grpc_uv_tcp_connect*)req->data; - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; grpc_error* error = GRPC_ERROR_NONE; int done; grpc_closure* closure = connect->closure; @@ -105,7 +105,7 @@ static void uv_tc_on_connect(uv_connect_t* req, int status) { } done = (--connect->refs == 0); if (done) { - ExecCtx::Get()->Flush(); + grpc_core::ExecCtx::Get()->Flush(); uv_tcp_connect_cleanup(connect); } GRPC_CLOSURE_SCHED(closure, error); diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index de3dabd7fc..04d4440f9e 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -130,7 +130,7 @@ static void run_poller(void* bp, grpc_error* error_ignored) { gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p run", p); } gpr_mu_lock(p->pollset_mu); - grpc_millis deadline = ExecCtx::Get()->Now() + 13 * GPR_MS_PER_SEC; + grpc_millis deadline = grpc_core::ExecCtx::Get()->Now() + 13 * GPR_MS_PER_SEC; GRPC_STATS_INC_TCP_BACKUP_POLLER_POLLS(); GRPC_LOG_IF_ERROR( "backup_poller:pollset_work", diff --git a/src/core/lib/iomgr/tcp_server_uv.cc b/src/core/lib/iomgr/tcp_server_uv.cc index 9db2cbe58d..daa7afe95f 100644 --- a/src/core/lib/iomgr/tcp_server_uv.cc +++ b/src/core/lib/iomgr/tcp_server_uv.cc @@ -137,7 +137,7 @@ static void finish_shutdown(grpc_tcp_server* s) { static void handle_close_callback(uv_handle_t* handle) { grpc_tcp_listener* sp = (grpc_tcp_listener*)handle->data; - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; sp->server->open_ports--; if (sp->server->open_ports == 0 && sp->server->shutdown) { finish_shutdown(sp->server); @@ -174,9 +174,9 @@ void grpc_tcp_server_unref(grpc_tcp_server* s) { GRPC_UV_ASSERT_SAME_THREAD(); if (gpr_unref(&s->refs)) { /* Complete shutdown_starting work before destroying. */ - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; GRPC_CLOSURE_LIST_SCHED(&s->shutdown_starting); - ExecCtx::Get()->Flush(); + grpc_core::ExecCtx::Get()->Flush(); tcp_server_destroy(s); } } @@ -223,7 +223,7 @@ static void finish_accept(grpc_tcp_listener* sp) { static void on_connect(uv_stream_t* server, int status) { grpc_tcp_listener* sp = (grpc_tcp_listener*)server->data; - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; if (status < 0) { switch (status) { diff --git a/src/core/lib/iomgr/tcp_uv.cc b/src/core/lib/iomgr/tcp_uv.cc index 3ea9674840..742ab9a754 100644 --- a/src/core/lib/iomgr/tcp_uv.cc +++ b/src/core/lib/iomgr/tcp_uv.cc @@ -112,7 +112,7 @@ static void tcp_ref(grpc_tcp* tcp) { gpr_ref(&tcp->refcount); } #endif static void uv_close_callback(uv_handle_t* handle) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; grpc_tcp* tcp = (grpc_tcp*)handle->data; TCP_UNREF(tcp, "destroy"); } @@ -124,7 +124,7 @@ static grpc_slice alloc_read_slice(grpc_resource_user* resource_user) { static void alloc_uv_buf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; grpc_tcp* tcp = (grpc_tcp*)handle->data; (void)suggested_size; buf->base = (char*)GRPC_SLICE_START_PTR(tcp->read_slice); @@ -135,7 +135,7 @@ static void read_callback(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { grpc_slice sub; grpc_error* error; - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; grpc_tcp* tcp = (grpc_tcp*)stream->data; grpc_closure* cb = tcp->read_cb; if (nread == 0) { @@ -204,7 +204,7 @@ static void uv_endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices, static void write_callback(uv_write_t* req, int status) { grpc_tcp* tcp = (grpc_tcp*)req->data; grpc_error* error; - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; grpc_closure* cb = tcp->write_cb; tcp->write_cb = NULL; TCP_UNREF(tcp, "write"); @@ -355,7 +355,7 @@ grpc_endpoint* grpc_tcp_create(uv_tcp_t* handle, grpc_resource_quota* resource_quota, char* peer_string) { grpc_tcp* tcp = (grpc_tcp*)gpr_malloc(sizeof(grpc_tcp)); - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "Creating TCP endpoint %p", tcp); diff --git a/src/core/lib/iomgr/timer_generic.cc b/src/core/lib/iomgr/timer_generic.cc index d5e6066f35..ae82701676 100644 --- a/src/core/lib/iomgr/timer_generic.cc +++ b/src/core/lib/iomgr/timer_generic.cc @@ -249,7 +249,7 @@ void grpc_timer_list_init() { g_shared_mutables.initialized = true; g_shared_mutables.checker_mu = GPR_SPINLOCK_INITIALIZER; gpr_mu_init(&g_shared_mutables.mu); - g_shared_mutables.min_timer = ExecCtx::Get()->Now(); + g_shared_mutables.min_timer = grpc_core::ExecCtx::Get()->Now(); gpr_tls_init(&g_last_seen_min_timer); gpr_tls_set(&g_last_seen_min_timer, 0); grpc_register_tracer(&grpc_timer_trace); @@ -341,7 +341,7 @@ void grpc_timer_init(grpc_timer* timer, grpc_millis deadline, if (GRPC_TRACER_ON(grpc_timer_trace)) { gpr_log(GPR_DEBUG, "TIMER %p: SET %" PRIdPTR " now %" PRIdPTR " call %p[%p]", timer, - deadline, ExecCtx::Get()->Now(), closure, closure->cb); + deadline, grpc_core::ExecCtx::Get()->Now(), closure, closure->cb); } if (!g_shared_mutables.initialized) { @@ -354,7 +354,7 @@ void grpc_timer_init(grpc_timer* timer, grpc_millis deadline, gpr_mu_lock(&shard->mu); timer->pending = true; - grpc_millis now = ExecCtx::Get()->Now(); + grpc_millis now = grpc_core::ExecCtx::Get()->Now(); if (deadline <= now) { timer->pending = false; GRPC_CLOSURE_SCHED(timer->closure, GRPC_ERROR_NONE); @@ -607,7 +607,7 @@ static grpc_timer_check_result run_some_expired_timers(gpr_atm now, grpc_timer_check_result grpc_timer_check(grpc_millis* next) { // prelude - grpc_millis now = ExecCtx::Get()->Now(); + grpc_millis now = grpc_core::ExecCtx::Get()->Now(); /* fetch from a thread-local first: this avoids contention on a globally mutable cacheline in the common case */ diff --git a/src/core/lib/iomgr/timer_manager.cc b/src/core/lib/iomgr/timer_manager.cc index 69adb673d8..6a43f4fadb 100644 --- a/src/core/lib/iomgr/timer_manager.cc +++ b/src/core/lib/iomgr/timer_manager.cc @@ -98,7 +98,7 @@ static void start_timer_thread_and_unlock(void) { } void grpc_timer_manager_tick() { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; grpc_millis next = GRPC_MILLIS_INF_FUTURE; grpc_timer_check(&next); } @@ -125,7 +125,7 @@ static void run_some_timers() { if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, "flush exec_ctx"); } - ExecCtx::Get()->Flush(); + grpc_core::ExecCtx::Get()->Flush(); gpr_mu_lock(&g_mu); // garbage collect any threads hanging out that are dead gc_completed_threads(); @@ -178,7 +178,7 @@ static bool wait_until(grpc_millis next) { g_timed_waiter_deadline = next; if (GRPC_TRACER_ON(grpc_timer_check_trace)) { - grpc_millis wait_time = next - ExecCtx::Get()->Now(); + grpc_millis wait_time = next - grpc_core::ExecCtx::Get()->Now(); gpr_log(GPR_DEBUG, "sleep for a %" PRIdPTR " milliseconds", wait_time); } @@ -223,7 +223,7 @@ static bool wait_until(grpc_millis next) { static void timer_main_loop() { for (;;) { grpc_millis next = GRPC_MILLIS_INF_FUTURE; - ExecCtx::Get()->InvalidateNow(); + grpc_core::ExecCtx::Get()->InvalidateNow(); // check timer state, updates next to the next time to run a check switch (grpc_timer_check(&next)) { case GRPC_TIMERS_FIRED: @@ -273,7 +273,7 @@ static void timer_thread_cleanup(completed_thread* ct) { static void timer_thread(void* completed_thread_ptr) { // this threads exec_ctx: we try to run things through to completion here // since it's easy to spin up new threads - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; timer_main_loop(); timer_thread_cleanup((completed_thread*)completed_thread_ptr); diff --git a/src/core/lib/iomgr/timer_uv.cc b/src/core/lib/iomgr/timer_uv.cc index 6edd4169f1..1432eba51e 100644 --- a/src/core/lib/iomgr/timer_uv.cc +++ b/src/core/lib/iomgr/timer_uv.cc @@ -45,7 +45,7 @@ static void stop_uv_timer(uv_timer_t* handle) { void run_expired_timer(uv_timer_t* handle) { grpc_timer* timer = (grpc_timer*)handle->data; - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; GRPC_UV_ASSERT_SAME_THREAD(); GPR_ASSERT(timer->pending); timer->pending = 0; @@ -59,13 +59,13 @@ void grpc_timer_init(grpc_timer* timer, grpc_millis deadline, uv_timer_t* uv_timer; GRPC_UV_ASSERT_SAME_THREAD(); timer->closure = closure; - if (deadline <= ExecCtx::Get()->Now()) { + if (deadline <= grpc_core::ExecCtx::Get()->Now()) { timer->pending = 0; GRPC_CLOSURE_SCHED(timer->closure, GRPC_ERROR_NONE); return; } timer->pending = 1; - timeout = (uint64_t)(deadline - ExecCtx::Get()->Now()); + timeout = (uint64_t)(deadline - grpc_core::ExecCtx::Get()->Now()); uv_timer = (uv_timer_t*)gpr_malloc(sizeof(uv_timer_t)); uv_timer_init(uv_default_loop(), uv_timer); uv_timer->data = timer; diff --git a/src/core/lib/security/context/security_context.cc b/src/core/lib/security/context/security_context.cc index 9b58b3657f..570db78882 100644 --- a/src/core/lib/security/context/security_context.cc +++ b/src/core/lib/security/context/security_context.cc @@ -38,7 +38,7 @@ grpc_tracer_flag grpc_trace_auth_context_refcount = grpc_call_error grpc_call_set_credentials(grpc_call* call, grpc_call_credentials* creds) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; grpc_client_security_context* ctx = NULL; GRPC_API_TRACE("grpc_call_set_credentials(call=%p, creds=%p)", 2, (call, creds)); @@ -87,7 +87,7 @@ grpc_client_security_context* grpc_client_security_context_create(void) { } void grpc_client_security_context_destroy(void* ctx) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; grpc_client_security_context* c = (grpc_client_security_context*)ctx; grpc_call_credentials_unref(c->creds); GRPC_AUTH_CONTEXT_UNREF(c->auth_context, "client_security_context"); diff --git a/src/core/lib/security/credentials/credentials.cc b/src/core/lib/security/credentials/credentials.cc index 6a272653f8..5181f7f260 100644 --- a/src/core/lib/security/credentials/credentials.cc +++ b/src/core/lib/security/credentials/credentials.cc @@ -72,7 +72,7 @@ void grpc_channel_credentials_unref(grpc_channel_credentials* creds) { void grpc_channel_credentials_release(grpc_channel_credentials* creds) { GRPC_API_TRACE("grpc_channel_credentials_release(creds=%p)", 1, (creds)); - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; grpc_channel_credentials_unref(creds); } @@ -94,7 +94,7 @@ void grpc_call_credentials_unref(grpc_call_credentials* creds) { void grpc_call_credentials_release(grpc_call_credentials* creds) { GRPC_API_TRACE("grpc_call_credentials_release(creds=%p)", 1, (creds)); - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; grpc_call_credentials_unref(creds); } @@ -209,7 +209,7 @@ void grpc_server_credentials_unref(grpc_server_credentials* creds) { void grpc_server_credentials_release(grpc_server_credentials* creds) { GRPC_API_TRACE("grpc_server_credentials_release(creds=%p)", 1, (creds)); - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; grpc_server_credentials_unref(creds); } diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.cc b/src/core/lib/security/credentials/google_default/google_default_credentials.cc index 03ec4bc3b3..a8991943b0 100644 --- a/src/core/lib/security/credentials/google_default/google_default_credentials.cc +++ b/src/core/lib/security/credentials/google_default/google_default_credentials.cc @@ -114,13 +114,13 @@ static int is_stack_running_on_compute_engine() { grpc_resource_quota_create("google_default_credentials"); grpc_httpcli_get( &context, &detector.pollent, resource_quota, &request, - ExecCtx::Get()->Now() + max_detection_delay, + grpc_core::ExecCtx::Get()->Now() + max_detection_delay, GRPC_CLOSURE_CREATE(on_compute_engine_detection_http_response, &detector, grpc_schedule_on_exec_ctx), &detector.response); grpc_resource_quota_unref_internal(resource_quota); - ExecCtx::Get()->Flush(); + grpc_core::ExecCtx::Get()->Flush(); /* Block until we get the response. This is not ideal but this should only be called once for the lifetime of the process by the default credentials. */ @@ -144,7 +144,7 @@ static int is_stack_running_on_compute_engine() { grpc_pollset_shutdown(grpc_polling_entity_pollset(&detector.pollent), &destroy_closure); g_polling_mu = NULL; - ExecCtx::Get()->Flush(); + grpc_core::ExecCtx::Get()->Flush(); gpr_free(grpc_polling_entity_pollset(&detector.pollent)); grpc_http_response_destroy(&detector.response); @@ -220,7 +220,7 @@ grpc_channel_credentials* grpc_google_default_credentials_create(void) { grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Failed to create Google credentials"); grpc_error* err; - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; GRPC_API_TRACE("grpc_google_default_credentials_create(void)", 0, ()); @@ -290,7 +290,7 @@ end: } void grpc_flush_cached_google_default_credentials(void) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; gpr_once_init(&g_once, init_default_credentials); gpr_mu_lock(&g_state_mu); if (default_credentials != NULL) { diff --git a/src/core/lib/security/credentials/iam/iam_credentials.cc b/src/core/lib/security/credentials/iam/iam_credentials.cc index 4d9da0cbe3..9f3a86877b 100644 --- a/src/core/lib/security/credentials/iam/iam_credentials.cc +++ b/src/core/lib/security/credentials/iam/iam_credentials.cc @@ -54,7 +54,7 @@ static grpc_call_credentials_vtable iam_vtable = { grpc_call_credentials* grpc_google_iam_credentials_create( const char* token, const char* authority_selector, void* reserved) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; GRPC_API_TRACE( "grpc_iam_credentials_create(token=%s, authority_selector=%s, " "reserved=%p)", diff --git a/src/core/lib/security/credentials/jwt/jwt_credentials.cc b/src/core/lib/security/credentials/jwt/jwt_credentials.cc index ccc3f4aeed..3facce1798 100644 --- a/src/core/lib/security/credentials/jwt/jwt_credentials.cc +++ b/src/core/lib/security/credentials/jwt/jwt_credentials.cc @@ -181,7 +181,7 @@ grpc_call_credentials* grpc_service_account_jwt_access_credentials_create( gpr_free(clean_json); } GPR_ASSERT(reserved == NULL); - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; grpc_call_credentials* creds = grpc_service_account_jwt_access_credentials_create_from_auth_json_key( grpc_auth_json_key_create_from_string(json_key), token_lifetime); diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.cc b/src/core/lib/security/credentials/jwt/jwt_verifier.cc index 5246e1f985..dd0d206b01 100644 --- a/src/core/lib/security/credentials/jwt/jwt_verifier.cc +++ b/src/core/lib/security/credentials/jwt/jwt_verifier.cc @@ -347,7 +347,7 @@ static verifier_cb_ctx* verifier_cb_ctx_create( grpc_jwt_claims* claims, const char* audience, grpc_slice signature, const char* signed_jwt, size_t signed_jwt_len, void* user_data, grpc_jwt_verification_done_cb cb) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; verifier_cb_ctx* ctx = (verifier_cb_ctx*)gpr_zalloc(sizeof(verifier_cb_ctx)); ctx->verifier = verifier; ctx->pollent = grpc_polling_entity_create_from_pollset(pollset); @@ -702,7 +702,7 @@ static void on_openid_config_retrieved(void* user_data, grpc_error* error) { resource_quota = grpc_resource_quota_create("jwt_verifier"); grpc_httpcli_get( &ctx->verifier->http_ctx, &ctx->pollent, resource_quota, &req, - ExecCtx::Get()->Now() + grpc_jwt_verifier_max_delay, + grpc_core::ExecCtx::Get()->Now() + grpc_jwt_verifier_max_delay, GRPC_CLOSURE_CREATE(on_keys_retrieved, ctx, grpc_schedule_on_exec_ctx), &ctx->responses[HTTP_RESPONSE_KEYS]); grpc_resource_quota_unref_internal(resource_quota); @@ -827,9 +827,10 @@ static void retrieve_key_and_verify(verifier_cb_ctx* ctx) { channel. This would allow us to cancel an authentication query when under extreme memory pressure. */ resource_quota = grpc_resource_quota_create("jwt_verifier"); - grpc_httpcli_get(&ctx->verifier->http_ctx, &ctx->pollent, resource_quota, - &req, ExecCtx::Get()->Now() + grpc_jwt_verifier_max_delay, - http_cb, &ctx->responses[rsp_idx]); + grpc_httpcli_get( + &ctx->verifier->http_ctx, &ctx->pollent, resource_quota, &req, + grpc_core::ExecCtx::Get()->Now() + grpc_jwt_verifier_max_delay, http_cb, + &ctx->responses[rsp_idx]); grpc_resource_quota_unref_internal(resource_quota); gpr_free(req.host); gpr_free(req.http.path); diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc b/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc index b653705609..71be15a46e 100644 --- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc +++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc @@ -217,7 +217,7 @@ static void on_oauth2_token_fetcher_http_response(void* user_data, c->token_fetch_pending = false; c->access_token_md = GRPC_MDELEM_REF(access_token_md); c->token_expiration = status == GRPC_CREDENTIALS_OK - ? ExecCtx::Get()->Now() + token_lifetime + ? grpc_core::ExecCtx::Get()->Now() + token_lifetime : 0; grpc_oauth2_pending_get_request_metadata* pending_request = c->pending_requests; @@ -256,7 +256,8 @@ static bool oauth2_token_fetcher_get_request_metadata( grpc_mdelem cached_access_token_md = GRPC_MDNULL; gpr_mu_lock(&c->mu); if (!GRPC_MDISNULL(c->access_token_md) && - (c->token_expiration - ExecCtx::Get()->Now() > refresh_threshold)) { + (c->token_expiration - grpc_core::ExecCtx::Get()->Now() > + refresh_threshold)) { cached_access_token_md = GRPC_MDELEM_REF(c->access_token_md); } if (!GRPC_MDISNULL(cached_access_token_md)) { @@ -288,7 +289,7 @@ static bool oauth2_token_fetcher_get_request_metadata( c->fetch_func(grpc_credentials_metadata_request_create(creds), &c->httpcli_context, &c->pollent, on_oauth2_token_fetcher_http_response, - ExecCtx::Get()->Now() + refresh_threshold); + grpc_core::ExecCtx::Get()->Now() + refresh_threshold); } return false; } @@ -514,7 +515,7 @@ grpc_call_credentials* grpc_access_token_credentials_create( gpr_ref_init(&c->base.refcount, 1); char* token_md_value; gpr_asprintf(&token_md_value, "Bearer %s", access_token); - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; c->access_token_md = grpc_mdelem_from_slices( grpc_slice_from_static_string(GRPC_AUTHORIZATION_METADATA_KEY), grpc_slice_from_copied_string(token_md_value)); diff --git a/src/core/lib/security/credentials/plugin/plugin_credentials.cc b/src/core/lib/security/credentials/plugin/plugin_credentials.cc index 025d024617..7634cadc3a 100644 --- a/src/core/lib/security/credentials/plugin/plugin_credentials.cc +++ b/src/core/lib/security/credentials/plugin/plugin_credentials.cc @@ -116,8 +116,8 @@ static void plugin_md_request_metadata_ready(void* request, grpc_status_code status, const char* error_details) { /* called from application code */ - ExecCtx _local_exec_ctx(GRPC_EXEC_CTX_FLAG_IS_FINISHED | - GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP); + grpc_core::ExecCtx _local_exec_ctx(GRPC_EXEC_CTX_FLAG_IS_FINISHED | + GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP); grpc_plugin_credentials_pending_request* r = (grpc_plugin_credentials_pending_request*)request; if (GRPC_TRACER_ON(grpc_plugin_credentials_trace)) { diff --git a/src/core/lib/security/transport/security_handshaker.cc b/src/core/lib/security/transport/security_handshaker.cc index 4ed2ec55bd..0df3375d34 100644 --- a/src/core/lib/security/transport/security_handshaker.cc +++ b/src/core/lib/security/transport/security_handshaker.cc @@ -255,7 +255,7 @@ static void on_handshake_next_done_grpc_wrapper( security_handshaker* h = (security_handshaker*)user_data; // This callback will be invoked by TSI in a non-grpc thread, so it's // safe to create our own exec_ctx here. - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; gpr_mu_lock(&h->mu); grpc_error* error = on_handshake_next_done_locked( h, result, bytes_to_send, bytes_to_send_size, handshaker_result); diff --git a/src/core/lib/security/transport/server_auth_filter.cc b/src/core/lib/security/transport/server_auth_filter.cc index 86817076f8..5f5ff7c09c 100644 --- a/src/core/lib/security/transport/server_auth_filter.cc +++ b/src/core/lib/security/transport/server_auth_filter.cc @@ -118,7 +118,7 @@ static void on_md_processing_done( grpc_status_code status, const char* error_details) { grpc_call_element* elem = (grpc_call_element*)user_data; call_data* calld = (call_data*)elem->call_data; - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; // If the call was not cancelled while we were in flight, process the result. if (gpr_atm_full_cas(&calld->state, (gpr_atm)STATE_INIT, (gpr_atm)STATE_DONE)) { diff --git a/src/core/lib/slice/slice.cc b/src/core/lib/slice/slice.cc index 6e1554d471..3604bb77a8 100644 --- a/src/core/lib/slice/slice.cc +++ b/src/core/lib/slice/slice.cc @@ -67,7 +67,7 @@ grpc_slice grpc_slice_ref(grpc_slice slice) { /* Public API */ void grpc_slice_unref(grpc_slice slice) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; grpc_slice_unref_internal(slice); } diff --git a/src/core/lib/slice/slice_buffer.cc b/src/core/lib/slice/slice_buffer.cc index 4bc54c303f..0a4d48f114 100644 --- a/src/core/lib/slice/slice_buffer.cc +++ b/src/core/lib/slice/slice_buffer.cc @@ -73,7 +73,7 @@ void grpc_slice_buffer_destroy_internal(grpc_slice_buffer* sb) { } void grpc_slice_buffer_destroy(grpc_slice_buffer* sb) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; grpc_slice_buffer_destroy_internal(sb); } @@ -172,7 +172,7 @@ void grpc_slice_buffer_reset_and_unref_internal(grpc_slice_buffer* sb) { } void grpc_slice_buffer_reset_and_unref(grpc_slice_buffer* sb) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; grpc_slice_buffer_reset_and_unref_internal(sb); } diff --git a/src/core/lib/surface/alarm.cc b/src/core/lib/surface/alarm.cc index 7aee100f3f..8dcfb1ddb5 100644 --- a/src/core/lib/surface/alarm.cc +++ b/src/core/lib/surface/alarm.cc @@ -47,7 +47,7 @@ static void alarm_ref(grpc_alarm* alarm) { gpr_ref(&alarm->refs); } static void alarm_unref(grpc_alarm* alarm) { if (gpr_unref(&alarm->refs)) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; if (alarm->cq != NULL) { GRPC_CQ_INTERNAL_UNREF(alarm->cq, "alarm"); } @@ -117,7 +117,7 @@ grpc_alarm* grpc_alarm_create(void* reserved) { void grpc_alarm_set(grpc_alarm* alarm, grpc_completion_queue* cq, gpr_timespec deadline, void* tag, void* reserved) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; GRPC_CQ_INTERNAL_REF(cq, "alarm"); alarm->cq = cq; @@ -129,7 +129,7 @@ void grpc_alarm_set(grpc_alarm* alarm, grpc_completion_queue* cq, } void grpc_alarm_cancel(grpc_alarm* alarm, void* reserved) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; grpc_timer_cancel(&alarm->alarm); } diff --git a/src/core/lib/surface/byte_buffer.cc b/src/core/lib/surface/byte_buffer.cc index 6a9b13bb41..03097c6896 100644 --- a/src/core/lib/surface/byte_buffer.cc +++ b/src/core/lib/surface/byte_buffer.cc @@ -71,7 +71,7 @@ grpc_byte_buffer* grpc_byte_buffer_copy(grpc_byte_buffer* bb) { void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) { if (!bb) return; - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; switch (bb->type) { case GRPC_BB_RAW: grpc_slice_buffer_destroy_internal(&bb->data.raw.slice_buffer); diff --git a/src/core/lib/surface/byte_buffer_reader.cc b/src/core/lib/surface/byte_buffer_reader.cc index 9a9e26ecdc..c5f8df3dda 100644 --- a/src/core/lib/surface/byte_buffer_reader.cc +++ b/src/core/lib/surface/byte_buffer_reader.cc @@ -42,7 +42,7 @@ static int is_compressed(grpc_byte_buffer* buffer) { int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader, grpc_byte_buffer* buffer) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; grpc_slice_buffer decompressed_slices_buffer; reader->buffer_in = buffer; switch (reader->buffer_in->type) { @@ -110,7 +110,7 @@ grpc_slice grpc_byte_buffer_reader_readall(grpc_byte_buffer_reader* reader) { grpc_slice out_slice = GRPC_SLICE_MALLOC(input_size); uint8_t* const outbuf = GRPC_SLICE_START_PTR(out_slice); /* just an alias */ - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; while (grpc_byte_buffer_reader_next(reader, &in_slice) != 0) { const size_t slice_length = GRPC_SLICE_LENGTH(in_slice); memcpy(&(outbuf[bytes_read]), GRPC_SLICE_START_PTR(in_slice), slice_length); diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index bbb7a39e29..cb858785f2 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -561,7 +561,7 @@ void grpc_call_unref(grpc_call* c) { if (!gpr_unref(&c->ext_ref)) return; child_call* cc = c->child; - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; GPR_TIMER_BEGIN("grpc_call_unref", 0); GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c)); @@ -602,7 +602,7 @@ void grpc_call_unref(grpc_call* c) { grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) { GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved)); GPR_ASSERT(!reserved); - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; cancel_with_error(call, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED); return GRPC_CALL_OK; @@ -652,7 +652,7 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call* c, grpc_status_code status, const char* description, void* reserved) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; GRPC_API_TRACE( "grpc_call_cancel_with_status(" "c=%p, status=%d, description=%s, reserved=%p)", @@ -2034,7 +2034,7 @@ done_with_error: grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops, size_t nops, void* tag, void* reserved) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; grpc_call_error err; GRPC_API_TRACE( diff --git a/src/core/lib/surface/call_details.cc b/src/core/lib/surface/call_details.cc index 03ce7f88fb..7d81ba9e22 100644 --- a/src/core/lib/surface/call_details.cc +++ b/src/core/lib/surface/call_details.cc @@ -34,7 +34,7 @@ void grpc_call_details_init(grpc_call_details* cd) { void grpc_call_details_destroy(grpc_call_details* cd) { GRPC_API_TRACE("grpc_call_details_destroy(cd=%p)", 1, (cd)); - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; grpc_slice_unref_internal(cd->method); grpc_slice_unref_internal(cd->host); } diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc index 7725351f74..80ba47676e 100644 --- a/src/core/lib/surface/channel.cc +++ b/src/core/lib/surface/channel.cc @@ -248,7 +248,7 @@ char* grpc_channel_get_target(grpc_channel* channel) { void grpc_channel_get_info(grpc_channel* channel, const grpc_channel_info* channel_info) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; grpc_channel_element* elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0); elem->filter->get_channel_info(elem, channel_info); @@ -296,7 +296,7 @@ grpc_call* grpc_channel_create_call(grpc_channel* channel, grpc_slice method, const grpc_slice* host, gpr_timespec deadline, void* reserved) { GPR_ASSERT(!reserved); - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; grpc_call* call = grpc_channel_create_call_internal( channel, parent_call, propagation_mask, cq, NULL, grpc_mdelem_from_slices(GRPC_MDSTR_PATH, grpc_slice_ref_internal(method)), @@ -329,7 +329,7 @@ void* grpc_channel_register_call(grpc_channel* channel, const char* method, "grpc_channel_register_call(channel=%p, method=%s, host=%s, reserved=%p)", 4, (channel, method, host, reserved)); GPR_ASSERT(!reserved); - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; rc->path = grpc_mdelem_from_slices( GRPC_MDSTR_PATH, @@ -364,7 +364,7 @@ grpc_call* grpc_channel_create_registered_call( registered_call_handle, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type, reserved)); GPR_ASSERT(!reserved); - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; grpc_call* call = grpc_channel_create_call_internal( channel, parent_call, propagation_mask, completion_queue, NULL, GRPC_MDELEM_REF(rc->path), GRPC_MDELEM_REF(rc->authority), @@ -407,7 +407,7 @@ static void destroy_channel(void* arg, grpc_error* error) { void grpc_channel_destroy(grpc_channel* channel) { grpc_transport_op* op = grpc_make_transport_op(NULL); grpc_channel_element* elem; - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; GRPC_API_TRACE("grpc_channel_destroy(channel=%p)", 1, (channel)); op->disconnect_with_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Destroyed"); diff --git a/src/core/lib/surface/channel_ping.cc b/src/core/lib/surface/channel_ping.cc index 06cdbf6c73..545b8fe6ee 100644 --- a/src/core/lib/surface/channel_ping.cc +++ b/src/core/lib/surface/channel_ping.cc @@ -51,7 +51,7 @@ void grpc_channel_ping(grpc_channel* channel, grpc_completion_queue* cq, ping_result* pr = (ping_result*)gpr_malloc(sizeof(*pr)); grpc_channel_element* top_elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; GPR_ASSERT(reserved == NULL); pr->tag = tag; pr->cq = cq; diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 0b0a8d070d..24c502881a 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -124,7 +124,7 @@ static grpc_error* non_polling_poller_work(grpc_pollset* pollset, while (!npp->shutdown && !w.kicked && !gpr_cv_wait(&w.cv, &npp->mu, deadline_ts)) ; - ExecCtx::Get()->InvalidateNow(); + grpc_core::ExecCtx::Get()->InvalidateNow(); if (&w == npp->root) { npp->root = w.next; if (&w == npp->root) { @@ -359,7 +359,7 @@ int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq, if (storage != NULL && (grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == cq) { *tag = storage->tag; - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; *ok = (storage->next & (uintptr_t)(1)) == 1; storage->done(storage->done_arg, storage); ret = 1; @@ -395,7 +395,7 @@ static bool cq_event_queue_push(grpc_cq_event_queue* q, grpc_cq_completion* c) { static grpc_cq_completion* cq_event_queue_pop(grpc_cq_event_queue* q) { grpc_cq_completion* c = NULL; - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; if (gpr_spinlock_trylock(&q->queue_lock)) { GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(); @@ -440,7 +440,7 @@ grpc_completion_queue* grpc_completion_queue_create_internal( const cq_poller_vtable* poller_vtable = &g_poller_vtable_by_poller_type[polling_type]; - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; GRPC_STATS_INC_CQS_CREATED(); cq = (grpc_completion_queue*)gpr_zalloc(sizeof(grpc_completion_queue) + @@ -790,7 +790,7 @@ typedef struct { bool first_loop; } cq_is_finished_arg; -class ExecCtxNext : public ExecCtx { +class ExecCtxNext : public grpc_core::ExecCtx { public: ExecCtxNext(void* arg) : ExecCtx(0), check_ready_to_finish_arg_(arg) {} @@ -818,7 +818,7 @@ class ExecCtxNext : public ExecCtx { return true; } } - return !a->first_loop && a->deadline < ExecCtx::Get()->Now(); + return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now(); } private: @@ -929,7 +929,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, } if (!is_finished_arg.first_loop && - ExecCtx::Get()->Now() >= deadline_millis) { + grpc_core::ExecCtx::Get()->Now() >= deadline_millis) { memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; dump_pending_tags(cq); @@ -1045,7 +1045,7 @@ static void del_plucker(grpc_completion_queue* cq, void* tag, GPR_UNREACHABLE_CODE(return ); } -class ExecCtxPluck : public ExecCtx { +class ExecCtxPluck : public grpc_core::ExecCtx { public: ExecCtxPluck(void* arg) : ExecCtx(0), check_ready_to_finish_arg_(arg) {} @@ -1079,7 +1079,7 @@ class ExecCtxPluck : public ExecCtx { } gpr_mu_unlock(cq->mu); } - return !a->first_loop && a->deadline < ExecCtx::Get()->Now(); + return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now(); } private: @@ -1169,7 +1169,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, break; } if (!is_finished_arg.first_loop && - ExecCtx::Get()->Now() >= deadline_millis) { + grpc_core::ExecCtx::Get()->Now() >= deadline_millis) { del_plucker(cq, tag, &worker); gpr_mu_unlock(cq->mu); memset(&ret, 0, sizeof(ret)); @@ -1250,7 +1250,7 @@ static void cq_shutdown_pluck(grpc_completion_queue* cq) { /* Shutdown simply drops a ref that we reserved at creation time; if we drop to zero here, then enter shutdown mode and wake up any waiters */ void grpc_completion_queue_shutdown(grpc_completion_queue* cq) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0); GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq)); cq->vtable->shutdown(cq); @@ -1263,7 +1263,7 @@ void grpc_completion_queue_destroy(grpc_completion_queue* cq) { GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0); grpc_completion_queue_shutdown(cq); - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; GRPC_CQ_INTERNAL_UNREF(cq, "destroy"); GPR_TIMER_END("grpc_completion_queue_destroy", 0); diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc index 20e17a7f60..fdbf926f77 100644 --- a/src/core/lib/surface/init.cc +++ b/src/core/lib/surface/init.cc @@ -116,7 +116,7 @@ void grpc_init(void) { int i; gpr_once_init(&g_basic_init, do_basic_init); - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; gpr_mu_lock(&g_init_mu); if (++g_initializations == 1) { gpr_time_init(); @@ -175,7 +175,7 @@ void grpc_init(void) { void grpc_shutdown(void) { int i; GRPC_API_TRACE("grpc_shutdown(void)", 0, ()); - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; gpr_mu_lock(&g_init_mu); if (--g_initializations == 0) { grpc_executor_shutdown(); diff --git a/src/core/lib/surface/lame_client.cc b/src/core/lib/surface/lame_client.cc index da081e68cb..3bbdd21285 100644 --- a/src/core/lib/surface/lame_client.cc +++ b/src/core/lib/surface/lame_client.cc @@ -156,7 +156,7 @@ extern "C" const grpc_channel_filter grpc_lame_filter = { grpc_channel* grpc_lame_client_channel_create(const char* target, grpc_status_code error_code, const char* error_message) { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; grpc_channel_element* elem; grpc_channel* channel = grpc_channel_create(target, NULL, GRPC_CLIENT_LAME_CHANNEL, NULL); diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 0d4435d556..835e495495 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -1022,7 +1022,7 @@ static void start_listeners(void* s, grpc_error* error) { void grpc_server_start(grpc_server* server) { size_t i; - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; GRPC_API_TRACE("grpc_server_start(server=%p)", 1, (server)); @@ -1168,7 +1168,7 @@ void grpc_server_shutdown_and_notify(grpc_server* server, listener* l; shutdown_tag* sdt; channel_broadcaster broadcaster; - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3, (server, cq, tag)); @@ -1227,7 +1227,7 @@ void grpc_server_shutdown_and_notify(grpc_server* server, void grpc_server_cancel_all_calls(grpc_server* server) { channel_broadcaster broadcaster; - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; GRPC_API_TRACE("grpc_server_cancel_all_calls(server=%p)", 1, (server)); @@ -1242,7 +1242,7 @@ void grpc_server_cancel_all_calls(grpc_server* server) { void grpc_server_destroy(grpc_server* server) { listener* l; - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; GRPC_API_TRACE("grpc_server_destroy(server=%p)", 1, (server)); @@ -1324,7 +1324,7 @@ grpc_call_error grpc_server_request_call( grpc_completion_queue* cq_bound_to_call, grpc_completion_queue* cq_for_notification, void* tag) { grpc_call_error error; - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; requested_call* rc = (requested_call*)gpr_malloc(sizeof(*rc)); GRPC_STATS_INC_SERVER_REQUESTED_CALLS(); GRPC_API_TRACE( @@ -1371,7 +1371,7 @@ grpc_call_error grpc_server_request_registered_call( grpc_completion_queue* cq_bound_to_call, grpc_completion_queue* cq_for_notification, void* tag) { grpc_call_error error; - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; requested_call* rc = (requested_call*)gpr_malloc(sizeof(*rc)); registered_method* rm = (registered_method*)rmp; GRPC_STATS_INC_SERVER_REQUESTED_CALLS(); diff --git a/src/core/lib/transport/bdp_estimator.cc b/src/core/lib/transport/bdp_estimator.cc index 4e279b4d94..d2b6e5db25 100644 --- a/src/core/lib/transport/bdp_estimator.cc +++ b/src/core/lib/transport/bdp_estimator.cc @@ -79,7 +79,7 @@ grpc_millis BdpEstimator::CompletePing() { } ping_state_ = PingState::UNSCHEDULED; accumulator_ = 0; - return ExecCtx::Get()->Now() + inter_ping_delay_; + return grpc_core::ExecCtx::Get()->Now() + inter_ping_delay_; } } // namespace grpc_core diff --git a/src/core/lib/transport/status_conversion.cc b/src/core/lib/transport/status_conversion.cc index 61470b8c78..46cba4292b 100644 --- a/src/core/lib/transport/status_conversion.cc +++ b/src/core/lib/transport/status_conversion.cc @@ -46,8 +46,9 @@ grpc_status_code grpc_http2_error_to_grpc_status(grpc_http2_error_code error, case GRPC_HTTP2_CANCEL: /* http2 cancel translates to STATUS_CANCELLED iff deadline hasn't been * exceeded */ - return ExecCtx::Get()->Now() > deadline ? GRPC_STATUS_DEADLINE_EXCEEDED - : GRPC_STATUS_CANCELLED; + return grpc_core::ExecCtx::Get()->Now() > deadline + ? GRPC_STATUS_DEADLINE_EXCEEDED + : GRPC_STATUS_CANCELLED; case GRPC_HTTP2_ENHANCE_YOUR_CALM: return GRPC_STATUS_RESOURCE_EXHAUSTED; case GRPC_HTTP2_INADEQUATE_SECURITY: diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc index ca80a7404d..ed2f02de55 100644 --- a/src/core/lib/transport/transport.cc +++ b/src/core/lib/transport/transport.cc @@ -62,7 +62,8 @@ void grpc_stream_unref(grpc_stream_refcount* refcount, const char* reason) { void grpc_stream_unref(grpc_stream_refcount* refcount) { #endif if (gpr_unref(&refcount->refs)) { - if (ExecCtx::Get()->flags() & GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP) { + if (grpc_core::ExecCtx::Get()->flags() & + GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP) { /* Ick. The thread we're running on MAY be owned (indirectly) by a call-stack. If that's the case, destroying the call-stack MAY try to destroy the diff --git a/src/cpp/common/channel_arguments.cc b/src/cpp/common/channel_arguments.cc index 0bcfac2845..82d687c530 100644 --- a/src/cpp/common/channel_arguments.cc +++ b/src/cpp/common/channel_arguments.cc @@ -67,7 +67,7 @@ ChannelArguments::ChannelArguments(const ChannelArguments& other) } ChannelArguments::~ChannelArguments() { - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; for (auto it = args_.begin(); it != args_.end(); ++it) { if (it->type == GRPC_ARG_POINTER) { it->value.pointer.vtable->destroy(it->value.pointer.p); @@ -95,7 +95,7 @@ void ChannelArguments::SetSocketMutator(grpc_socket_mutator* mutator) { } grpc_arg mutator_arg = grpc_socket_mutator_to_arg(mutator); bool replaced = false; - ExecCtx _local_exec_ctx; + grpc_core::ExecCtx _local_exec_ctx; for (auto it = args_.begin(); it != args_.end(); ++it) { if (it->type == mutator_arg.type && grpc::string(it->key) == grpc::string(mutator_arg.key)) { |