diff options
Diffstat (limited to 'src/core')
24 files changed, 109 insertions, 75 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel_factory.cc b/src/core/ext/filters/client_channel/client_channel_factory.cc index 172e9f03c7..130bbe0418 100644 --- a/src/core/ext/filters/client_channel/client_channel_factory.cc +++ b/src/core/ext/filters/client_channel/client_channel_factory.cc @@ -30,7 +30,7 @@ void grpc_client_channel_factory_unref(grpc_client_channel_factory* factory) { } grpc_subchannel* grpc_client_channel_factory_create_subchannel( - grpc_client_channel_factory* factory, const grpc_subchannel_args* args) { + grpc_client_channel_factory* factory, const grpc_channel_args* args) { return factory->vtable->create_subchannel(factory, args); } diff --git a/src/core/ext/filters/client_channel/client_channel_factory.h b/src/core/ext/filters/client_channel/client_channel_factory.h index 601ec46b2a..91dec12282 100644 --- a/src/core/ext/filters/client_channel/client_channel_factory.h +++ b/src/core/ext/filters/client_channel/client_channel_factory.h @@ -49,7 +49,7 @@ struct grpc_client_channel_factory_vtable { void (*ref)(grpc_client_channel_factory* factory); void (*unref)(grpc_client_channel_factory* factory); grpc_subchannel* (*create_subchannel)(grpc_client_channel_factory* factory, - const grpc_subchannel_args* args); + const grpc_channel_args* args); grpc_channel* (*create_client_channel)(grpc_client_channel_factory* factory, const char* target, grpc_client_channel_type type, @@ -61,7 +61,7 @@ void grpc_client_channel_factory_unref(grpc_client_channel_factory* factory); /** Create a new grpc_subchannel */ grpc_subchannel* grpc_client_channel_factory_create_subchannel( - grpc_client_channel_factory* factory, const grpc_subchannel_args* args); + grpc_client_channel_factory* factory, const grpc_channel_args* args); /** Create a new grpc_channel */ grpc_channel* grpc_client_channel_factory_create_channel( diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 6f31a643c1..1d0ecbe3f6 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -509,12 +509,10 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList( GRPC_ARG_SERVER_ADDRESS_LIST, GRPC_ARG_INHIBIT_HEALTH_CHECKING}; // Create a subchannel for each address. - grpc_subchannel_args sc_args; for (size_t i = 0; i < addresses.size(); i++) { // If there were any balancer addresses, we would have chosen grpclb // policy, which does not use a SubchannelList. GPR_ASSERT(!addresses[i].IsBalancer()); - memset(&sc_args, 0, sizeof(grpc_subchannel_args)); InlinedVector<grpc_arg, 4> args_to_add; args_to_add.emplace_back( grpc_create_subchannel_address_arg(&addresses[i].address())); @@ -527,9 +525,8 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList( &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add.data(), args_to_add.size()); gpr_free(args_to_add[0].value.string); - sc_args.args = new_args; grpc_subchannel* subchannel = grpc_client_channel_factory_create_subchannel( - client_channel_factory, &sc_args); + client_channel_factory, new_args); grpc_channel_args_destroy(new_args); if (subchannel == nullptr) { // Subchannel could not be created. diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index dff213efc6..640a052e91 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -537,7 +537,7 @@ struct HealthCheckParams { } // namespace grpc_core grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, - const grpc_subchannel_args* args) { + const grpc_channel_args* args) { grpc_subchannel_key* key = grpc_subchannel_key_create(args); grpc_subchannel* c = grpc_subchannel_index_find(key); if (c) { @@ -554,11 +554,10 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, c->pollset_set = grpc_pollset_set_create(); grpc_resolved_address* addr = static_cast<grpc_resolved_address*>(gpr_malloc(sizeof(*addr))); - grpc_get_subchannel_address_arg(args->args, addr); + grpc_get_subchannel_address_arg(args, addr); grpc_resolved_address* new_address = nullptr; grpc_channel_args* new_args = nullptr; - if (grpc_proxy_mappers_map_address(addr, args->args, &new_address, - &new_args)) { + if (grpc_proxy_mappers_map_address(addr, args, &new_address, &new_args)) { GPR_ASSERT(new_address != nullptr); gpr_free(addr); addr = new_address; @@ -567,7 +566,7 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, grpc_arg new_arg = grpc_create_subchannel_address_arg(addr); gpr_free(addr); c->args = grpc_channel_args_copy_and_add_and_remove( - new_args != nullptr ? new_args : args->args, keys_to_remove, + new_args != nullptr ? new_args : args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &new_arg, 1); gpr_free(new_arg.value.string); if (new_args != nullptr) grpc_channel_args_destroy(new_args); @@ -580,7 +579,7 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, grpc_connectivity_state_init(&c->state_and_health_tracker, GRPC_CHANNEL_IDLE, "subchannel"); grpc_core::BackOff::Options backoff_options; - parse_args_for_backoff_values(args->args, &backoff_options, + parse_args_for_backoff_values(args, &backoff_options, &c->min_connect_timeout_ms); c->backoff.Init(backoff_options); gpr_mu_init(&c->mu); diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index d0c0a672fa..8c994c64f5 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -38,7 +38,6 @@ address. Provides a target for load balancing. */ typedef struct grpc_subchannel grpc_subchannel; typedef struct grpc_subchannel_call grpc_subchannel_call; -typedef struct grpc_subchannel_args grpc_subchannel_args; typedef struct grpc_subchannel_key grpc_subchannel_key; #ifndef NDEBUG @@ -186,16 +185,9 @@ void grpc_subchannel_call_set_cleanup_closure( grpc_call_stack* grpc_subchannel_call_get_call_stack( grpc_subchannel_call* subchannel_call); -struct grpc_subchannel_args { - /* When updating this struct, also update subchannel_index.c */ - - /** Channel arguments to be supplied to the newly created channel */ - const grpc_channel_args* args; -}; - /** create a subchannel given a connector */ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, - const grpc_subchannel_args* args); + const grpc_channel_args* args); /// Sets \a addr from \a args. void grpc_get_subchannel_address_arg(const grpc_channel_args* args, diff --git a/src/core/ext/filters/client_channel/subchannel_index.cc b/src/core/ext/filters/client_channel/subchannel_index.cc index 0ae7898c5a..d0ceda8312 100644 --- a/src/core/ext/filters/client_channel/subchannel_index.cc +++ b/src/core/ext/filters/client_channel/subchannel_index.cc @@ -39,38 +39,37 @@ static gpr_mu g_mu; static gpr_refcount g_refcount; struct grpc_subchannel_key { - grpc_subchannel_args args; + grpc_channel_args* args; }; static bool g_force_creation = false; static grpc_subchannel_key* create_key( - const grpc_subchannel_args* args, + const grpc_channel_args* args, grpc_channel_args* (*copy_channel_args)(const grpc_channel_args* args)) { grpc_subchannel_key* k = static_cast<grpc_subchannel_key*>(gpr_malloc(sizeof(*k))); - k->args.args = copy_channel_args(args->args); + k->args = copy_channel_args(args); return k; } -grpc_subchannel_key* grpc_subchannel_key_create( - const grpc_subchannel_args* args) { +grpc_subchannel_key* grpc_subchannel_key_create(const grpc_channel_args* args) { return create_key(args, grpc_channel_args_normalize); } static grpc_subchannel_key* subchannel_key_copy(grpc_subchannel_key* k) { - return create_key(&k->args, grpc_channel_args_copy); + return create_key(k->args, grpc_channel_args_copy); } int grpc_subchannel_key_compare(const grpc_subchannel_key* a, const grpc_subchannel_key* b) { // To pretend the keys are different, return a non-zero value. if (GPR_UNLIKELY(g_force_creation)) return 1; - return grpc_channel_args_compare(a->args.args, b->args.args); + return grpc_channel_args_compare(a->args, b->args); } void grpc_subchannel_key_destroy(grpc_subchannel_key* k) { - grpc_channel_args_destroy(const_cast<grpc_channel_args*>(k->args.args)); + grpc_channel_args_destroy(k->args); gpr_free(k); } diff --git a/src/core/ext/filters/client_channel/subchannel_index.h b/src/core/ext/filters/client_channel/subchannel_index.h index c135613d26..429634bd54 100644 --- a/src/core/ext/filters/client_channel/subchannel_index.h +++ b/src/core/ext/filters/client_channel/subchannel_index.h @@ -27,8 +27,7 @@ shared amongst channels */ /** Create a key that can be used to uniquely identify a subchannel */ -grpc_subchannel_key* grpc_subchannel_key_create( - const grpc_subchannel_args* args); +grpc_subchannel_key* grpc_subchannel_key_create(const grpc_channel_args* args); /** Destroy a subchannel key */ void grpc_subchannel_key_destroy(grpc_subchannel_key* key); diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create.cc index e6c8c38260..a5bf1bf21d 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.cc +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.cc @@ -40,14 +40,12 @@ static void client_channel_factory_unref( grpc_client_channel_factory* cc_factory) {} static grpc_subchannel* client_channel_factory_create_subchannel( - grpc_client_channel_factory* cc_factory, const grpc_subchannel_args* args) { - grpc_subchannel_args final_sc_args; - memcpy(&final_sc_args, args, sizeof(*args)); - final_sc_args.args = grpc_default_authority_add_if_not_present(args->args); + grpc_client_channel_factory* cc_factory, const grpc_channel_args* args) { + grpc_channel_args* new_args = grpc_default_authority_add_if_not_present(args); grpc_connector* connector = grpc_chttp2_connector_create(); - grpc_subchannel* s = grpc_subchannel_create(connector, &final_sc_args); + grpc_subchannel* s = grpc_subchannel_create(connector, new_args); grpc_connector_unref(connector); - grpc_channel_args_destroy(const_cast<grpc_channel_args*>(final_sc_args.args)); + grpc_channel_args_destroy(new_args); return s; } diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc index 9612698e96..ddd538faa8 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc @@ -46,10 +46,10 @@ static void client_channel_factory_ref( static void client_channel_factory_unref( grpc_client_channel_factory* cc_factory) {} -static grpc_subchannel_args* get_secure_naming_subchannel_args( - const grpc_subchannel_args* args) { +static grpc_channel_args* get_secure_naming_channel_args( + const grpc_channel_args* args) { grpc_channel_credentials* channel_credentials = - grpc_channel_credentials_find_in_args(args->args); + grpc_channel_credentials_find_in_args(args); if (channel_credentials == nullptr) { gpr_log(GPR_ERROR, "Can't create subchannel: channel credentials missing for secure " @@ -57,7 +57,7 @@ static grpc_subchannel_args* get_secure_naming_subchannel_args( return nullptr; } // Make sure security connector does not already exist in args. - if (grpc_security_connector_find_in_args(args->args) != nullptr) { + if (grpc_security_connector_find_in_args(args) != nullptr) { gpr_log(GPR_ERROR, "Can't create subchannel: security connector already present in " "channel args."); @@ -65,19 +65,18 @@ static grpc_subchannel_args* get_secure_naming_subchannel_args( } // To which address are we connecting? By default, use the server URI. const grpc_arg* server_uri_arg = - grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI); + grpc_channel_args_find(args, GRPC_ARG_SERVER_URI); const char* server_uri_str = grpc_channel_arg_get_string(server_uri_arg); GPR_ASSERT(server_uri_str != nullptr); grpc_uri* server_uri = grpc_uri_parse(server_uri_str, true /* supress errors */); GPR_ASSERT(server_uri != nullptr); const grpc_core::TargetAuthorityTable* target_authority_table = - grpc_core::FindTargetAuthorityTableInArgs(args->args); + grpc_core::FindTargetAuthorityTableInArgs(args); grpc_core::UniquePtr<char> authority; if (target_authority_table != nullptr) { // Find the authority for the target. - const char* target_uri_str = - grpc_get_subchannel_address_uri_arg(args->args); + const char* target_uri_str = grpc_get_subchannel_address_uri_arg(args); grpc_uri* target_uri = grpc_uri_parse(target_uri_str, false /* suppress errors */); GPR_ASSERT(target_uri != nullptr); @@ -100,15 +99,14 @@ static grpc_subchannel_args* get_secure_naming_subchannel_args( } grpc_arg args_to_add[2]; size_t num_args_to_add = 0; - if (grpc_channel_args_find(args->args, GRPC_ARG_DEFAULT_AUTHORITY) == - nullptr) { + if (grpc_channel_args_find(args, GRPC_ARG_DEFAULT_AUTHORITY) == nullptr) { // If the channel args don't already contain GRPC_ARG_DEFAULT_AUTHORITY, add // the arg, setting it to the value just obtained. args_to_add[num_args_to_add++] = grpc_channel_arg_string_create( const_cast<char*>(GRPC_ARG_DEFAULT_AUTHORITY), authority.get()); } grpc_channel_args* args_with_authority = - grpc_channel_args_copy_and_add(args->args, args_to_add, num_args_to_add); + grpc_channel_args_copy_and_add(args, args_to_add, num_args_to_add); grpc_uri_destroy(server_uri); // Create the security connector using the credentials and target name. grpc_channel_args* new_args_from_connector = nullptr; @@ -137,29 +135,21 @@ static grpc_subchannel_args* get_secure_naming_subchannel_args( grpc_channel_args_destroy(new_args_from_connector); } grpc_channel_args_destroy(args_with_authority); - grpc_subchannel_args* final_sc_args = - static_cast<grpc_subchannel_args*>(gpr_malloc(sizeof(*final_sc_args))); - memcpy(final_sc_args, args, sizeof(*args)); - final_sc_args->args = new_args; - return final_sc_args; + return new_args; } static grpc_subchannel* client_channel_factory_create_subchannel( - grpc_client_channel_factory* cc_factory, const grpc_subchannel_args* args) { - grpc_subchannel_args* subchannel_args = - get_secure_naming_subchannel_args(args); - if (subchannel_args == nullptr) { - gpr_log( - GPR_ERROR, - "Failed to create subchannel arguments during subchannel creation."); + grpc_client_channel_factory* cc_factory, const grpc_channel_args* args) { + grpc_channel_args* new_args = get_secure_naming_channel_args(args); + if (new_args == nullptr) { + gpr_log(GPR_ERROR, + "Failed to create channel args during subchannel creation."); return nullptr; } grpc_connector* connector = grpc_chttp2_connector_create(); - grpc_subchannel* s = grpc_subchannel_create(connector, subchannel_args); + grpc_subchannel* s = grpc_subchannel_create(connector, new_args); grpc_connector_unref(connector); - grpc_channel_args_destroy( - const_cast<grpc_channel_args*>(subchannel_args->args)); - gpr_free(subchannel_args); + grpc_channel_args_destroy(new_args); return s; } diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc index 349d8681d5..ade88da4cb 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.cc +++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc @@ -335,6 +335,9 @@ static void add_to_storage(struct stream_obj* s, /* add new op at the beginning of the linked list. The memory is freed in remove_from_storage */ op_and_state* new_op = grpc_core::New<op_and_state>(s, *op); + // Pontential fix to crash on GPR_ASSERT(!curr->done) + // TODO (mxyan): check if this is indeed necessary. + new_op->done = false; gpr_mu_lock(&s->mu); storage->head = new_op; storage->num_pending_ops++; @@ -391,7 +394,7 @@ static void execute_from_storage(stream_obj* s) { gpr_mu_lock(&s->mu); for (struct op_and_state* curr = s->storage.head; curr != nullptr;) { CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done); - GPR_ASSERT(curr->done == 0); + GPR_ASSERT(!curr->done); enum e_op_result result = execute_stream_op(curr); CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr, op_result_string(result)); @@ -400,13 +403,12 @@ static void execute_from_storage(stream_obj* s) { struct op_and_state* next = curr->next; remove_from_storage(s, curr); curr = next; - } - /* continue processing the same op if ACTION_TAKEN_WITHOUT_CALLBACK */ - if (result == NO_ACTION_POSSIBLE) { + } else if (result == NO_ACTION_POSSIBLE) { curr = curr->next; } else if (result == ACTION_TAKEN_WITH_CALLBACK) { + /* wait for the callback */ break; - } + } /* continue processing the same op if ACTION_TAKEN_WITHOUT_CALLBACK */ } gpr_mu_unlock(&s->mu); } diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc index 7c0062eb4e..402f8904ea 100644 --- a/src/core/lib/iomgr/combiner.cc +++ b/src/core/lib/iomgr/combiner.cc @@ -29,6 +29,7 @@ #include "src/core/lib/debug/stats.h" #include "src/core/lib/iomgr/executor.h" +#include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/profiling/timers.h" grpc_core::DebugOnlyTraceFlag grpc_combiner_trace(false, "combiner"); @@ -228,8 +229,14 @@ bool grpc_combiner_continue_exec_ctx() { grpc_core::ExecCtx::Get()->IsReadyToFinish(), lock->time_to_execute_final_list)); + // offload only if all the following conditions are true: + // 1. the combiner is contended and has more than one closure to execute + // 2. the current execution context needs to finish as soon as possible + // 3. the DEFAULT executor is threaded + // 4. the current thread is not a worker for any background poller if (contended && grpc_core::ExecCtx::Get()->IsReadyToFinish() && - grpc_executor_is_threaded()) { + grpc_executor_is_threaded() && + !grpc_iomgr_is_any_background_poller_thread()) { GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0); // this execution context wants to move on: schedule remaining work to be // picked up on the executor diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc index 4b8c891e9b..9eb4c089d8 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.cc +++ b/src/core/lib/iomgr/ev_epoll1_linux.cc @@ -1242,6 +1242,8 @@ static void pollset_set_del_pollset_set(grpc_pollset_set* bag, * Event engine binding */ +static bool is_any_background_poller_thread(void) { return false; } + static void shutdown_background_closure(void) {} static void shutdown_engine(void) { @@ -1287,6 +1289,7 @@ static const grpc_event_engine_vtable vtable = { pollset_set_add_fd, pollset_set_del_fd, + is_any_background_poller_thread, shutdown_background_closure, shutdown_engine, }; diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 7a4870db78..0a0891013a 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -1604,6 +1604,8 @@ static void pollset_set_del_pollset_set(grpc_pollset_set* bag, * Event engine binding */ +static bool is_any_background_poller_thread(void) { return false; } + static void shutdown_background_closure(void) {} static void shutdown_engine(void) { @@ -1644,6 +1646,7 @@ static const grpc_event_engine_vtable vtable = { pollset_set_add_fd, pollset_set_del_fd, + is_any_background_poller_thread, shutdown_background_closure, shutdown_engine, }; diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index 67cbfbbd02..c479206410 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -1782,6 +1782,8 @@ static void global_cv_fd_table_shutdown() { * event engine binding */ +static bool is_any_background_poller_thread(void) { return false; } + static void shutdown_background_closure(void) {} static void shutdown_engine(void) { @@ -1828,6 +1830,7 @@ static const grpc_event_engine_vtable vtable = { pollset_set_add_fd, pollset_set_del_fd, + is_any_background_poller_thread, shutdown_background_closure, shutdown_engine, }; diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc index 32d1b6c43e..fb2e70eee4 100644 --- a/src/core/lib/iomgr/ev_posix.cc +++ b/src/core/lib/iomgr/ev_posix.cc @@ -399,6 +399,10 @@ void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) { g_event_engine->pollset_set_del_fd(pollset_set, fd); } +bool grpc_is_any_background_poller_thread(void) { + return g_event_engine->is_any_background_poller_thread(); +} + void grpc_shutdown_background_closure(void) { g_event_engine->shutdown_background_closure(); } diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 812c7a0f0f..94ac9fdba6 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -80,6 +80,7 @@ typedef struct grpc_event_engine_vtable { void (*pollset_set_add_fd)(grpc_pollset_set* pollset_set, grpc_fd* fd); void (*pollset_set_del_fd)(grpc_pollset_set* pollset_set, grpc_fd* fd); + bool (*is_any_background_poller_thread)(void); void (*shutdown_background_closure)(void); void (*shutdown_engine)(void); } grpc_event_engine_vtable; @@ -181,6 +182,9 @@ void grpc_pollset_add_fd(grpc_pollset* pollset, struct grpc_fd* fd); void grpc_pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd); void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd); +/* Returns true if the caller is a worker thread for any background poller. */ +bool grpc_is_any_background_poller_thread(); + /* Shut down all the closures registered in the background poller. */ void grpc_shutdown_background_closure(); diff --git a/src/core/lib/iomgr/iomgr.cc b/src/core/lib/iomgr/iomgr.cc index eb29973514..a492146857 100644 --- a/src/core/lib/iomgr/iomgr.cc +++ b/src/core/lib/iomgr/iomgr.cc @@ -161,6 +161,10 @@ void grpc_iomgr_shutdown_background_closure() { grpc_iomgr_platform_shutdown_background_closure(); } +bool grpc_iomgr_is_any_background_poller_thread() { + return grpc_iomgr_platform_is_any_background_poller_thread(); +} + void grpc_iomgr_register_object(grpc_iomgr_object* obj, const char* name) { obj->name = gpr_strdup(name); gpr_mu_lock(&g_mu); diff --git a/src/core/lib/iomgr/iomgr.h b/src/core/lib/iomgr/iomgr.h index 8ea9289e06..6261aa550c 100644 --- a/src/core/lib/iomgr/iomgr.h +++ b/src/core/lib/iomgr/iomgr.h @@ -39,6 +39,9 @@ void grpc_iomgr_shutdown(); * background poller. */ void grpc_iomgr_shutdown_background_closure(); +/** Returns true if the caller is a worker thread for any background poller. */ +bool grpc_iomgr_is_any_background_poller_thread(); + /* Exposed only for testing */ size_t grpc_iomgr_count_objects_for_testing(); diff --git a/src/core/lib/iomgr/iomgr_custom.cc b/src/core/lib/iomgr/iomgr_custom.cc index 4b112c9097..e1cd8f7310 100644 --- a/src/core/lib/iomgr/iomgr_custom.cc +++ b/src/core/lib/iomgr/iomgr_custom.cc @@ -41,10 +41,14 @@ static void iomgr_platform_init(void) { static void iomgr_platform_flush(void) {} static void iomgr_platform_shutdown(void) { grpc_pollset_global_shutdown(); } static void iomgr_platform_shutdown_background_closure(void) {} +static bool iomgr_platform_is_any_background_poller_thread(void) { + return false; +} static grpc_iomgr_platform_vtable vtable = { iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown, - iomgr_platform_shutdown_background_closure}; + iomgr_platform_shutdown_background_closure, + iomgr_platform_is_any_background_poller_thread}; void grpc_custom_iomgr_init(grpc_socket_vtable* socket, grpc_custom_resolver_vtable* resolver, diff --git a/src/core/lib/iomgr/iomgr_internal.cc b/src/core/lib/iomgr/iomgr_internal.cc index b6c9211865..e68b1cf581 100644 --- a/src/core/lib/iomgr/iomgr_internal.cc +++ b/src/core/lib/iomgr/iomgr_internal.cc @@ -45,3 +45,7 @@ void grpc_iomgr_platform_shutdown() { iomgr_platform_vtable->shutdown(); } void grpc_iomgr_platform_shutdown_background_closure() { iomgr_platform_vtable->shutdown_background_closure(); } + +bool grpc_iomgr_platform_is_any_background_poller_thread() { + return iomgr_platform_vtable->is_any_background_poller_thread(); +} diff --git a/src/core/lib/iomgr/iomgr_internal.h b/src/core/lib/iomgr/iomgr_internal.h index bca7409907..2250ad9a18 100644 --- a/src/core/lib/iomgr/iomgr_internal.h +++ b/src/core/lib/iomgr/iomgr_internal.h @@ -36,6 +36,7 @@ typedef struct grpc_iomgr_platform_vtable { void (*flush)(void); void (*shutdown)(void); void (*shutdown_background_closure)(void); + bool (*is_any_background_poller_thread)(void); } grpc_iomgr_platform_vtable; void grpc_iomgr_register_object(grpc_iomgr_object* obj, const char* name); @@ -56,6 +57,9 @@ void grpc_iomgr_platform_shutdown(void); /** shut down all the closures registered in the background poller */ void grpc_iomgr_platform_shutdown_background_closure(void); +/** return true is the caller is a worker thread for any background poller */ +bool grpc_iomgr_platform_is_any_background_poller_thread(void); + bool grpc_iomgr_abort_on_leaks(void); #endif /* GRPC_CORE_LIB_IOMGR_IOMGR_INTERNAL_H */ diff --git a/src/core/lib/iomgr/iomgr_posix.cc b/src/core/lib/iomgr/iomgr_posix.cc index 9386adf060..278c8de688 100644 --- a/src/core/lib/iomgr/iomgr_posix.cc +++ b/src/core/lib/iomgr/iomgr_posix.cc @@ -55,9 +55,14 @@ static void iomgr_platform_shutdown_background_closure(void) { grpc_shutdown_background_closure(); } +static bool iomgr_platform_is_any_background_poller_thread(void) { + return grpc_is_any_background_poller_thread(); +} + static grpc_iomgr_platform_vtable vtable = { iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown, - iomgr_platform_shutdown_background_closure}; + iomgr_platform_shutdown_background_closure, + iomgr_platform_is_any_background_poller_thread}; void grpc_set_default_iomgr_platform() { grpc_set_tcp_client_impl(&grpc_posix_tcp_client_vtable); diff --git a/src/core/lib/iomgr/iomgr_posix_cfstream.cc b/src/core/lib/iomgr/iomgr_posix_cfstream.cc index 552ef4309c..462ac41fcd 100644 --- a/src/core/lib/iomgr/iomgr_posix_cfstream.cc +++ b/src/core/lib/iomgr/iomgr_posix_cfstream.cc @@ -58,9 +58,14 @@ static void iomgr_platform_shutdown_background_closure(void) { grpc_shutdown_background_closure(); } +static bool iomgr_platform_is_any_background_poller_thread(void) { + return grpc_is_any_background_poller_thread(); +} + static grpc_iomgr_platform_vtable vtable = { iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown, - iomgr_platform_shutdown_background_closure}; + iomgr_platform_shutdown_background_closure, + iomgr_platform_is_any_background_poller_thread}; void grpc_set_default_iomgr_platform() { char* enable_cfstream = getenv(grpc_cfstream_env_var); diff --git a/src/core/lib/iomgr/iomgr_windows.cc b/src/core/lib/iomgr/iomgr_windows.cc index 24ef0dba7b..0579e16aa7 100644 --- a/src/core/lib/iomgr/iomgr_windows.cc +++ b/src/core/lib/iomgr/iomgr_windows.cc @@ -73,9 +73,14 @@ static void iomgr_platform_shutdown(void) { static void iomgr_platform_shutdown_background_closure(void) {} +static bool iomgr_platform_is_any_background_poller_thread(void) { + return false; +} + static grpc_iomgr_platform_vtable vtable = { iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown, - iomgr_platform_shutdown_background_closure}; + iomgr_platform_shutdown_background_closure, + iomgr_platform_is_any_background_poller_thread}; void grpc_set_default_iomgr_platform() { grpc_set_tcp_client_impl(&grpc_windows_tcp_client_vtable); |