diff options
Diffstat (limited to 'src')
48 files changed, 577 insertions, 1709 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c index 013096d46b..3ec539884b 100644 --- a/src/core/ext/filters/client_channel/client_channel.c +++ b/src/core/ext/filters/client_channel/client_channel.c @@ -276,7 +276,7 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand, w->chand = chand; grpc_closure_init(&w->on_changed, on_lb_policy_state_changed_locked, w, - grpc_combiner_scheduler(chand->combiner, false)); + grpc_combiner_scheduler(chand->combiner)); w->state = current_state; w->lb_policy = lb_policy; grpc_lb_policy_notify_on_state_change_locked(exec_ctx, lb_policy, &w->state, @@ -637,7 +637,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_closure_sched( exec_ctx, grpc_closure_init(&op->handler_private.closure, start_transport_op_locked, - op, grpc_combiner_scheduler(chand->combiner, false)), + op, grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); } @@ -668,7 +668,7 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, GPR_ASSERT(args->is_last); GPR_ASSERT(elem->filter == &grpc_client_channel_filter); // Initialize data members. - chand->combiner = grpc_combiner_create(NULL); + chand->combiner = grpc_combiner_create(); gpr_mu_init(&chand->info_mu); gpr_mu_init(&chand->external_connectivity_watcher_list_mu); @@ -679,7 +679,7 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, chand->owning_stack = args->channel_stack; grpc_closure_init(&chand->on_resolver_result_changed, on_resolver_result_changed_locked, chand, - grpc_combiner_scheduler(chand->combiner, false)); + grpc_combiner_scheduler(chand->combiner)); chand->interested_parties = grpc_pollset_set_create(); grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); @@ -738,9 +738,8 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, channel_data *chand = elem->channel_data; if (chand->resolver != NULL) { grpc_closure_sched( - exec_ctx, - grpc_closure_create(shutdown_resolver_locked, chand->resolver, - grpc_combiner_scheduler(chand->combiner, false)), + exec_ctx, grpc_closure_create(shutdown_resolver_locked, chand->resolver, + grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); } if (chand->client_channel_factory != NULL) { @@ -771,11 +770,6 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, * PER-CALL FUNCTIONS */ -#define GET_CALL(call_data) \ - ((grpc_subchannel_call *)(gpr_atm_acq_load(&(call_data)->subchannel_call))) - -#define CANCELLED_CALL ((grpc_subchannel_call *)1) - /** Call data. Holds a pointer to grpc_subchannel_call and the associated machinery to create such a pointer. Handles queueing of stream ops until a call object is ready, waiting @@ -796,11 +790,9 @@ typedef struct client_channel_call_data { grpc_server_retry_throttle_data *retry_throttle_data; method_parameters *method_params; - grpc_error *cancel_error; - - /** either 0 for no call, 1 for cancelled, or a pointer to a - grpc_subchannel_call */ - gpr_atm subchannel_call; + /** either 0 for no call, a pointer to a grpc_subchannel_call (if the lowest + bit is 0), or a pointer to an error (if the lowest bit is 1) */ + gpr_atm subchannel_call_or_error; gpr_arena *arena; bool pick_pending; @@ -822,10 +814,43 @@ typedef struct client_channel_call_data { grpc_closure *original_on_complete; } call_data; +typedef struct { + grpc_subchannel_call *subchannel_call; + grpc_error *error; +} call_or_error; + +static call_or_error get_call_or_error(call_data *p) { + gpr_atm c = gpr_atm_acq_load(&p->subchannel_call_or_error); + if (c == 0) + return (call_or_error){NULL, NULL}; + else if (c & 1) + return (call_or_error){NULL, (grpc_error *)((c) & ~(gpr_atm)1)}; + else + return (call_or_error){(grpc_subchannel_call *)c, NULL}; +} + +static bool set_call_or_error(call_data *p, call_or_error coe) { + // this should always be under a lock + call_or_error existing = get_call_or_error(p); + if (existing.error != GRPC_ERROR_NONE) { + GRPC_ERROR_UNREF(coe.error); + return false; + } + GPR_ASSERT(existing.subchannel_call == NULL); + if (coe.error != GRPC_ERROR_NONE) { + GPR_ASSERT(coe.subchannel_call == NULL); + gpr_atm_rel_store(&p->subchannel_call_or_error, 1 | (gpr_atm)coe.error); + } else { + GPR_ASSERT(coe.subchannel_call != NULL); + gpr_atm_rel_store(&p->subchannel_call_or_error, + (gpr_atm)coe.subchannel_call); + } + return true; +} + grpc_subchannel_call *grpc_client_channel_get_subchannel_call( grpc_call_element *call_elem) { - grpc_subchannel_call *scc = GET_CALL((call_data *)call_elem->call_data); - return scc == CANCELLED_CALL ? NULL : scc; + return get_call_or_error(call_elem->call_data).subchannel_call; } static void add_waiting_locked(call_data *calld, @@ -857,18 +882,18 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { return; } - grpc_subchannel_call *call = GET_CALL(calld); + call_or_error call = get_call_or_error(calld); grpc_transport_stream_op_batch **ops = calld->waiting_ops; size_t nops = calld->waiting_ops_count; - if (call == CANCELLED_CALL) { - fail_locked(exec_ctx, calld, GRPC_ERROR_CANCELLED); + if (call.error != GRPC_ERROR_NONE) { + fail_locked(exec_ctx, calld, GRPC_ERROR_REF(call.error)); return; } calld->waiting_ops = NULL; calld->waiting_ops_count = 0; calld->waiting_ops_capacity = 0; for (size_t i = 0; i < nops; i++) { - grpc_subchannel_call_process_op(exec_ctx, call, ops[i]); + grpc_subchannel_call_process_op(exec_ctx, call.subchannel_call, ops[i]); } gpr_free(ops); } @@ -929,19 +954,23 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, calld->pick_pending = false; grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent, chand->interested_parties); + call_or_error coe = get_call_or_error(calld); if (calld->connected_subchannel == NULL) { - gpr_atm_no_barrier_store(&calld->subchannel_call, (gpr_atm)CANCELLED_CALL); - fail_locked(exec_ctx, calld, - error == GRPC_ERROR_NONE - ? GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Call dropped by load balancing policy") - : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Failed to create subchannel", &error, 1)); - } else if (GET_CALL(calld) == CANCELLED_CALL) { + grpc_error *failure = + error == GRPC_ERROR_NONE + ? GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Call dropped by load balancing policy") + : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Failed to create subchannel", &error, 1); + set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(failure)}); + fail_locked(exec_ctx, calld, failure); + } else if (coe.error != GRPC_ERROR_NONE) { /* already cancelled before subchannel became ready */ + grpc_error *child_errors[] = {error, coe.error}; grpc_error *cancellation_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Cancelled before creating subchannel", &error, 1); + "Cancelled before creating subchannel", child_errors, + GPR_ARRAY_SIZE(child_errors)); /* if due to deadline, attach the deadline exceeded status to the error */ if (gpr_time_cmp(calld->deadline, gpr_now(GPR_CLOCK_MONOTONIC)) < 0) { cancellation_error = @@ -961,8 +990,8 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, .context = calld->subchannel_call_context}; grpc_error *new_error = grpc_connected_subchannel_create_call( exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); - gpr_atm_rel_store(&calld->subchannel_call, - (gpr_atm)(uintptr_t)subchannel_call); + GPR_ASSERT(set_call_or_error( + calld, (call_or_error){.subchannel_call = subchannel_call})); if (new_error != GRPC_ERROR_NONE) { new_error = grpc_error_add_child(new_error, error); fail_locked(exec_ctx, calld, new_error); @@ -975,8 +1004,9 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { call_data *calld = elem->call_data; - grpc_subchannel_call *subchannel_call = GET_CALL(calld); - if (subchannel_call == NULL || subchannel_call == CANCELLED_CALL) { + grpc_subchannel_call *subchannel_call = + get_call_or_error(calld).subchannel_call; + if (subchannel_call == NULL) { return NULL; } else { return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call); @@ -1118,7 +1148,7 @@ static bool pick_subchannel_locked( cpa->on_ready = on_ready; cpa->elem = elem; grpc_closure_init(&cpa->closure, continue_picking_locked, cpa, - grpc_combiner_scheduler(chand->combiner, true)); + grpc_combiner_scheduler(chand->combiner)); grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure, GRPC_ERROR_NONE); } else { @@ -1135,58 +1165,45 @@ static void start_transport_stream_op_batch_locked_inner( grpc_call_element *elem) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - grpc_subchannel_call *call; /* need to recheck that another thread hasn't set the call */ - call = GET_CALL(calld); - if (call == CANCELLED_CALL) { + call_or_error coe = get_call_or_error(calld); + if (coe.error != GRPC_ERROR_NONE) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); + exec_ctx, op, GRPC_ERROR_REF(coe.error)); /* early out */ return; } - if (call != NULL) { - grpc_subchannel_call_process_op(exec_ctx, call, op); + if (coe.subchannel_call != NULL) { + grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op); /* early out */ return; } /* if this is a cancellation, then we can raise our cancelled flag */ if (op->cancel_stream) { - if (!gpr_atm_rel_cas(&calld->subchannel_call, 0, - (gpr_atm)(uintptr_t)CANCELLED_CALL)) { - /* recurse to retry */ - start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem); - /* early out */ - return; + grpc_error *error = op->payload->cancel_stream.cancel_error; + /* Stash a copy of cancel_error in our call data, so that we can use + it for subsequent operations. This ensures that if the call is + cancelled before any ops are passed down (e.g., if the deadline + is in the past when the call starts), we can return the right + error to the caller when the first op does get passed down. */ + set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(error)}); + if (calld->pick_pending) { + cancel_pick_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); } else { - /* Stash a copy of cancel_error in our call data, so that we can use - it for subsequent operations. This ensures that if the call is - cancelled before any ops are passed down (e.g., if the deadline - is in the past when the call starts), we can return the right - error to the caller when the first op does get passed down. */ - calld->cancel_error = - GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error); - if (calld->pick_pending) { - cancel_pick_locked( - exec_ctx, elem, - GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); - } else { - fail_locked(exec_ctx, calld, - GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); - } - grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, op, - GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); - /* early out */ - return; + fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); } + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, + GRPC_ERROR_REF(error)); + /* early out */ + return; } /* if we don't have a subchannel, try to get one */ if (!calld->pick_pending && calld->connected_subchannel == NULL && op->send_initial_metadata) { calld->pick_pending = true; grpc_closure_init(&calld->next_step, subchannel_ready_locked, elem, - grpc_combiner_scheduler(chand->combiner, true)); + grpc_combiner_scheduler(chand->combiner)); GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); /* If a subchannel is not available immediately, the polling entity from call_data should be provided to channel_data's interested_parties, so @@ -1200,10 +1217,10 @@ static void start_transport_stream_op_batch_locked_inner( calld->pick_pending = false; GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); if (calld->connected_subchannel == NULL) { - gpr_atm_no_barrier_store(&calld->subchannel_call, - (gpr_atm)CANCELLED_CALL); grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Call dropped by load balancing policy"); + set_call_or_error(calld, + (call_or_error){.error = GRPC_ERROR_REF(error)}); fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); return; // Early out. @@ -1225,8 +1242,8 @@ static void start_transport_stream_op_batch_locked_inner( .context = calld->subchannel_call_context}; grpc_error *error = grpc_connected_subchannel_create_call( exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); - gpr_atm_rel_store(&calld->subchannel_call, - (gpr_atm)(uintptr_t)subchannel_call); + GPR_ASSERT(set_call_or_error( + calld, (call_or_error){.subchannel_call = subchannel_call})); if (error != GRPC_ERROR_NONE) { fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); @@ -1305,17 +1322,17 @@ static void cc_start_transport_stream_op_batch( op); } /* try to (atomically) get the call */ - grpc_subchannel_call *call = GET_CALL(calld); + call_or_error coe = get_call_or_error(calld); GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0); - if (call == CANCELLED_CALL) { + if (coe.error != GRPC_ERROR_NONE) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); + exec_ctx, op, GRPC_ERROR_REF(coe.error)); GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); /* early out */ return; } - if (call != NULL) { - grpc_subchannel_call_process_op(exec_ctx, call, op); + if (coe.subchannel_call != NULL) { + grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op); GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); /* early out */ return; @@ -1324,10 +1341,9 @@ static void cc_start_transport_stream_op_batch( GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op_batch"); op->handler_private.extra_arg = elem; grpc_closure_sched( - exec_ctx, - grpc_closure_init(&op->handler_private.closure, - start_transport_stream_op_batch_locked, op, - grpc_combiner_scheduler(chand->combiner, false)), + exec_ctx, grpc_closure_init(&op->handler_private.closure, + start_transport_stream_op_batch_locked, op, + grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); } @@ -1364,12 +1380,14 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, if (calld->method_params != NULL) { method_parameters_unref(calld->method_params); } - GRPC_ERROR_UNREF(calld->cancel_error); - grpc_subchannel_call *call = GET_CALL(calld); - if (call != NULL && call != CANCELLED_CALL) { - grpc_subchannel_call_set_cleanup_closure(call, then_schedule_closure); + call_or_error coe = get_call_or_error(calld); + GRPC_ERROR_UNREF(coe.error); + if (coe.subchannel_call != NULL) { + grpc_subchannel_call_set_cleanup_closure(coe.subchannel_call, + then_schedule_closure); then_schedule_closure = NULL; - GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call"); + GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, coe.subchannel_call, + "client_channel_destroy_call"); } GPR_ASSERT(!calld->pick_pending); GPR_ASSERT(calld->waiting_ops_count == 0); @@ -1439,9 +1457,8 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( if (out == GRPC_CHANNEL_IDLE && try_to_connect) { GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect"); grpc_closure_sched( - exec_ctx, - grpc_closure_create(try_to_connect_locked, chand, - grpc_combiner_scheduler(chand->combiner, false)), + exec_ctx, grpc_closure_create(try_to_connect_locked, chand, + grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); } return out; @@ -1578,6 +1595,6 @@ void grpc_client_channel_watch_connectivity_state( grpc_closure_sched( exec_ctx, grpc_closure_init(&w->my_closure, watch_connectivity_state_locked, w, - grpc_combiner_scheduler(chand->combiner, true)), + grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); } diff --git a/src/core/ext/filters/client_channel/lb_policy.c b/src/core/ext/filters/client_channel/lb_policy.c index e3efb735df..0c8e179925 100644 --- a/src/core/ext/filters/client_channel/lb_policy.c +++ b/src/core/ext/filters/client_channel/lb_policy.c @@ -74,11 +74,10 @@ void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, gpr_atm mask = ~(gpr_atm)((1 << WEAK_REF_BITS) - 1); gpr_atm check = 1 << WEAK_REF_BITS; if ((old_val & mask) == check) { - grpc_closure_sched( - exec_ctx, - grpc_closure_create(shutdown_locked, policy, - grpc_combiner_scheduler(policy->combiner, false)), - GRPC_ERROR_NONE); + grpc_closure_sched(exec_ctx, grpc_closure_create( + shutdown_locked, policy, + grpc_combiner_scheduler(policy->combiner)), + GRPC_ERROR_NONE); } else { grpc_lb_policy_weak_unref(exec_ctx, policy REF_FUNC_PASS_ARGS("strong-unref")); diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c index 5ecbd3ba93..d3182f35fe 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c @@ -741,7 +741,7 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, gpr_zalloc(sizeof(rr_connectivity_data)); grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed_locked, rr_connectivity, - grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_combiner_scheduler(glb_policy->base.combiner)); rr_connectivity->glb_policy = glb_policy; rr_connectivity->state = rr_state; @@ -1006,7 +1006,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, grpc_closure_init(&glb_policy->lb_channel_on_connectivity_changed, glb_lb_channel_on_connectivity_changed_cb, glb_policy, - grpc_combiner_scheduler(args->combiner, false)); + grpc_combiner_scheduler(args->combiner)); grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner); grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE, "grpclb"); @@ -1252,7 +1252,7 @@ static void schedule_next_client_load_report(grpc_exec_ctx *exec_ctx, gpr_time_add(now, glb_policy->client_stats_report_interval); grpc_closure_init(&glb_policy->client_load_report_closure, send_client_load_report_locked, glb_policy, - grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_combiner_scheduler(glb_policy->base.combiner)); grpc_timer_init(exec_ctx, &glb_policy->client_load_report_timer, next_client_load_report_time, &glb_policy->client_load_report_closure, now); @@ -1280,7 +1280,7 @@ static void do_send_client_load_report_locked(grpc_exec_ctx *exec_ctx, op.data.send_message.send_message = glb_policy->client_load_report_payload; grpc_closure_init(&glb_policy->client_load_report_closure, client_load_report_done_locked, glb_policy, - grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_combiner_scheduler(glb_policy->base.combiner)); grpc_call_error call_error = grpc_call_start_batch_and_execute( exec_ctx, glb_policy->lb_call, &op, 1, &glb_policy->client_load_report_closure); @@ -1386,13 +1386,13 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, grpc_closure_init(&glb_policy->lb_on_sent_initial_request, lb_on_sent_initial_request_locked, glb_policy, - grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_combiner_scheduler(glb_policy->base.combiner)); grpc_closure_init(&glb_policy->lb_on_server_status_received, lb_on_server_status_received_locked, glb_policy, - grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_combiner_scheduler(glb_policy->base.combiner)); grpc_closure_init(&glb_policy->lb_on_response_received, lb_on_response_received_locked, glb_policy, - grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_combiner_scheduler(glb_policy->base.combiner)); gpr_backoff_init(&glb_policy->lb_call_backoff_state, GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS, @@ -1693,9 +1693,9 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, } } GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer"); - grpc_closure_init( - &glb_policy->lb_on_call_retry, lb_call_on_retry_timer_locked, - glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_closure_init(&glb_policy->lb_on_call_retry, + lb_call_on_retry_timer_locked, glb_policy, + grpc_combiner_scheduler(glb_policy->base.combiner)); glb_policy->retry_timer_active = true; grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try, &glb_policy->lb_on_call_retry, now); diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c index 51cc632649..0e2b4131f1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c @@ -655,7 +655,7 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, pf_update_locked(exec_ctx, &p->base, args); grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner); grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed_locked, p, - grpc_combiner_scheduler(args->combiner, false)); + grpc_combiner_scheduler(args->combiner)); return &p->base; } diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index 33d9522380..9ecb0a39da 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -749,7 +749,7 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, sd->subchannel = subchannel; grpc_closure_init(&sd->connectivity_changed_closure, rr_connectivity_changed_locked, sd, - grpc_combiner_scheduler(args->combiner, false)); + grpc_combiner_scheduler(args->combiner)); /* use some sentinel value outside of the range of * grpc_connectivity_state to signal an undefined previous state. We * won't be referring to this value again and it'll be overwritten after diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c index 2bf44a4f1b..92f060b422 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c @@ -268,10 +268,10 @@ static grpc_resolver *dns_ares_create(grpc_exec_ctx *exec_ctx, GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); grpc_closure_init(&r->dns_ares_on_retry_timer_locked, dns_ares_on_retry_timer_locked, r, - grpc_combiner_scheduler(r->base.combiner, false)); + grpc_combiner_scheduler(r->base.combiner)); grpc_closure_init(&r->dns_ares_on_resolved_locked, dns_ares_on_resolved_locked, r, - grpc_combiner_scheduler(r->base.combiner, false)); + grpc_combiner_scheduler(r->base.combiner)); return &r->base; } diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c index afa978ff01..6c263c32f9 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c @@ -179,7 +179,7 @@ static void dns_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg, gpr_log(GPR_DEBUG, "retrying immediately"); } grpc_closure_init(&r->on_retry, dns_on_retry_timer_locked, r, - grpc_combiner_scheduler(r->base.combiner, false)); + grpc_combiner_scheduler(r->base.combiner)); grpc_timer_init(exec_ctx, &r->retry_timer, next_try, &r->on_retry, now); } if (r->resolved_result != NULL) { @@ -201,7 +201,7 @@ static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx, grpc_resolve_address( exec_ctx, r->name_to_resolve, r->default_port, r->interested_parties, grpc_closure_create(dns_on_resolved_locked, r, - grpc_combiner_scheduler(r->base.combiner, false)), + grpc_combiner_scheduler(r->base.combiner)), &r->addresses); } diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c index 59560e068d..cbb0e628ed 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c @@ -158,10 +158,9 @@ void grpc_fake_resolver_response_generator_set_response( GPR_ASSERT(generator->resolver != NULL); generator->next_response = grpc_channel_args_copy(next_response); grpc_closure_sched( - exec_ctx, - grpc_closure_create( - set_response_cb, generator, - grpc_combiner_scheduler(generator->resolver->base.combiner, false)), + exec_ctx, grpc_closure_create(set_response_cb, generator, + grpc_combiner_scheduler( + generator->resolver->base.combiner)), GRPC_ERROR_NONE); } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 0428cf4e5b..bdebc8eeec 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -35,7 +35,6 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/http/parser.h" #include "src/core/lib/iomgr/timer.h" -#include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" @@ -252,7 +251,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->ep = ep; /* one ref is for destroy */ gpr_ref_init(&t->refs, 1); - t->combiner = grpc_combiner_create(grpc_endpoint_get_workqueue(ep)); + t->combiner = grpc_combiner_create(); t->peer_string = grpc_endpoint_get_peer(ep); t->endpoint_reading = 1; t->next_stream_id = is_client ? 1 : 2; @@ -273,29 +272,29 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_closure_init(&t->write_action, write_action, t, grpc_schedule_on_exec_ctx); grpc_closure_init(&t->read_action_locked, read_action_locked, t, - grpc_combiner_scheduler(t->combiner, false)); + grpc_combiner_scheduler(t->combiner)); grpc_closure_init(&t->benign_reclaimer_locked, benign_reclaimer_locked, t, - grpc_combiner_scheduler(t->combiner, false)); + grpc_combiner_scheduler(t->combiner)); grpc_closure_init(&t->destructive_reclaimer_locked, destructive_reclaimer_locked, t, - grpc_combiner_scheduler(t->combiner, false)); + grpc_combiner_scheduler(t->combiner)); grpc_closure_init(&t->retry_initiate_ping_locked, retry_initiate_ping_locked, - t, grpc_combiner_scheduler(t->combiner, false)); + t, grpc_combiner_scheduler(t->combiner)); grpc_closure_init(&t->start_bdp_ping_locked, start_bdp_ping_locked, t, - grpc_combiner_scheduler(t->combiner, false)); + grpc_combiner_scheduler(t->combiner)); grpc_closure_init(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t, - grpc_combiner_scheduler(t->combiner, false)); + grpc_combiner_scheduler(t->combiner)); grpc_closure_init(&t->init_keepalive_ping_locked, init_keepalive_ping_locked, - t, grpc_combiner_scheduler(t->combiner, false)); + t, grpc_combiner_scheduler(t->combiner)); grpc_closure_init(&t->start_keepalive_ping_locked, start_keepalive_ping_locked, t, - grpc_combiner_scheduler(t->combiner, false)); + grpc_combiner_scheduler(t->combiner)); grpc_closure_init(&t->finish_keepalive_ping_locked, finish_keepalive_ping_locked, t, - grpc_combiner_scheduler(t->combiner, false)); + grpc_combiner_scheduler(t->combiner)); grpc_closure_init(&t->keepalive_watchdog_fired_locked, keepalive_watchdog_fired_locked, t, - grpc_combiner_scheduler(t->combiner, false)); + grpc_combiner_scheduler(t->combiner)); grpc_bdp_estimator_init(&t->bdp_estimator, t->peer_string); t->last_pid_update = gpr_now(GPR_CLOCK_MONOTONIC); @@ -338,7 +337,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, if (is_client) { grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string( GRPC_CHTTP2_CLIENT_CONNECT_STRING)); - grpc_chttp2_initiate_write(exec_ctx, t, false, "initial_write"); + grpc_chttp2_initiate_write(exec_ctx, t, "initial_write"); } /* configure http2 the way we like it */ @@ -550,7 +549,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED; } - grpc_chttp2_initiate_write(exec_ctx, t, false, "init"); + grpc_chttp2_initiate_write(exec_ctx, t, "init"); post_benign_reclaimer(exec_ctx, t); } @@ -568,9 +567,9 @@ static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp, static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - grpc_closure_sched(exec_ctx, grpc_closure_create( - destroy_transport_locked, t, - grpc_combiner_scheduler(t->combiner, false)), + grpc_closure_sched(exec_ctx, + grpc_closure_create(destroy_transport_locked, t, + grpc_combiner_scheduler(t->combiner)), GRPC_ERROR_NONE); } @@ -663,7 +662,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_slice_buffer_init(&s->frame_storage); s->pending_byte_stream = false; grpc_closure_init(&s->reset_byte_stream, reset_byte_stream, s, - grpc_combiner_scheduler(t->combiner, false)); + grpc_combiner_scheduler(t->combiner)); GRPC_CHTTP2_REF_TRANSPORT(t, "stream"); @@ -747,7 +746,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->destroy_stream_arg = then_schedule_closure; grpc_closure_sched( exec_ctx, grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s, - grpc_combiner_scheduler(t->combiner, false)), + grpc_combiner_scheduler(t->combiner)), GRPC_ERROR_NONE); GPR_TIMER_END("destroy_stream", 0); } @@ -785,8 +784,6 @@ static const char *write_state_name(grpc_chttp2_write_state st) { return "WRITING"; case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE: return "WRITING+MORE"; - case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER: - return "WRITING+MORE+COVERED"; } GPR_UNREACHABLE_CODE(return "UNKNOWN"); } @@ -809,8 +806,7 @@ static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - bool covered_by_poller, const char *reason) { + grpc_chttp2_transport *t, const char *reason) { GPR_TIMER_BEGIN("grpc_chttp2_initiate_write", 0); switch (t->write_state) { @@ -819,28 +815,16 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); grpc_closure_sched( exec_ctx, - grpc_closure_init( - &t->write_action_begin_locked, write_action_begin_locked, t, - grpc_combiner_finally_scheduler(t->combiner, covered_by_poller)), + grpc_closure_init(&t->write_action_begin_locked, + write_action_begin_locked, t, + grpc_combiner_finally_scheduler(t->combiner)), GRPC_ERROR_NONE); break; case GRPC_CHTTP2_WRITE_STATE_WRITING: - set_write_state( - exec_ctx, t, - covered_by_poller - ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER - : GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE, - reason); + set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE, + reason); break; case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE: - if (covered_by_poller) { - set_write_state( - exec_ctx, t, - GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER, - reason); - } - break; - case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER: break; } GPR_TIMER_END("grpc_chttp2_initiate_write", 0); @@ -856,10 +840,10 @@ void grpc_chttp2_become_writable( case GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK: break; case GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED: - grpc_chttp2_initiate_write(exec_ctx, t, true, reason); + grpc_chttp2_initiate_write(exec_ctx, t, reason); break; case GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED: - grpc_chttp2_initiate_write(exec_ctx, t, false, reason); + grpc_chttp2_initiate_write(exec_ctx, t, reason); break; } } @@ -896,7 +880,7 @@ static void write_action(grpc_exec_ctx *exec_ctx, void *gt, grpc_error *error) { grpc_endpoint_write( exec_ctx, t->ep, &t->outbuf, grpc_closure_init(&t->write_action_end_locked, write_action_end_locked, t, - grpc_combiner_scheduler(t->combiner, false))); + grpc_combiner_scheduler(t->combiner))); GPR_TIMER_END("write_action", 0); } @@ -932,21 +916,9 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp, GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); grpc_closure_run( exec_ctx, - grpc_closure_init( - &t->write_action_begin_locked, write_action_begin_locked, t, - grpc_combiner_finally_scheduler(t->combiner, false)), - GRPC_ERROR_NONE); - break; - case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER: - GPR_TIMER_MARK("state=writing_stale_with_poller", 0); - set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, - "continue writing [covered]"); - GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); - grpc_closure_run( - exec_ctx, grpc_closure_init(&t->write_action_begin_locked, write_action_begin_locked, t, - grpc_combiner_finally_scheduler(t->combiner, true)), + grpc_combiner_finally_scheduler(t->combiner)), GRPC_ERROR_NONE); break; } @@ -969,7 +941,7 @@ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, if (use_value != t->settings[GRPC_LOCAL_SETTINGS][id]) { t->settings[GRPC_LOCAL_SETTINGS][id] = use_value; t->dirtied_local_settings = 1; - grpc_chttp2_initiate_write(exec_ctx, t, false, "push_setting"); + grpc_chttp2_initiate_write(exec_ctx, t, "push_setting"); } } @@ -1365,7 +1337,6 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, s->next_message_end_offset = s->flow_controlled_bytes_written + (int64_t)s->flow_controlled_buffer.length + (int64_t)len; - s->complete_fetch_covered_by_poller = op->covered_by_poller; if (flags & GRPC_WRITE_BUFFER_HINT) { s->next_message_end_offset -= t->write_buffer_size; s->write_buffering = true; @@ -1487,9 +1458,8 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op"); grpc_closure_sched( exec_ctx, - grpc_closure_init( - &op->handler_private.closure, perform_stream_op_locked, op, - grpc_combiner_scheduler(t->combiner, op->covered_by_poller)), + grpc_closure_init(&op->handler_private.closure, perform_stream_op_locked, + op, grpc_combiner_scheduler(t->combiner)), GRPC_ERROR_NONE); GPR_TIMER_END("perform_stream_op", 0); } @@ -1516,7 +1486,7 @@ static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GRPC_ERROR_NONE); if (grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack, GRPC_ERROR_NONE)) { - grpc_chttp2_initiate_write(exec_ctx, t, false, "send_ping"); + grpc_chttp2_initiate_write(exec_ctx, t, "send_ping"); } } @@ -1524,7 +1494,7 @@ static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error) { grpc_chttp2_transport *t = tp; t->ping_state.is_delayed_ping_timer_set = false; - grpc_chttp2_initiate_write(exec_ctx, t, false, "retry_send_ping"); + grpc_chttp2_initiate_write(exec_ctx, t, "retry_send_ping"); } void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, @@ -1539,7 +1509,7 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } grpc_closure_list_sched(exec_ctx, &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]); if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) { - grpc_chttp2_initiate_write(exec_ctx, t, false, "continue_pings"); + grpc_chttp2_initiate_write(exec_ctx, t, "continue_pings"); } } @@ -1552,7 +1522,7 @@ static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, &slice, &http_error); grpc_chttp2_goaway_append(t->last_new_stream_id, (uint32_t)http_error, grpc_slice_ref_internal(slice), &t->qbuf); - grpc_chttp2_initiate_write(exec_ctx, t, false, "goaway_sent"); + grpc_chttp2_initiate_write(exec_ctx, t, "goaway_sent"); GRPC_ERROR_UNREF(error); } @@ -1578,12 +1548,6 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t = op->handler_private.extra_arg; grpc_error *close_transport = op->disconnect_with_error; - if (op->on_connectivity_state_change != NULL) { - grpc_connectivity_state_notify_on_state_change( - exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state, - op->on_connectivity_state_change); - } - if (op->goaway_error) { send_goaway(exec_ctx, t, op->goaway_error); } @@ -1607,6 +1571,12 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, op->send_ping); } + if (op->on_connectivity_state_change != NULL) { + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state, + op->on_connectivity_state_change); + } + if (close_transport != GRPC_ERROR_NONE) { close_transport_locked(exec_ctx, t, close_transport); } @@ -1623,11 +1593,11 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, gpr_free(msg); op->handler_private.extra_arg = gt; GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op"); - grpc_closure_sched( - exec_ctx, grpc_closure_init(&op->handler_private.closure, - perform_transport_op_locked, op, - grpc_combiner_scheduler(t->combiner, false)), - GRPC_ERROR_NONE); + grpc_closure_sched(exec_ctx, + grpc_closure_init(&op->handler_private.closure, + perform_transport_op_locked, op, + grpc_combiner_scheduler(t->combiner)), + GRPC_ERROR_NONE); } /******************************************************************************* @@ -1782,7 +1752,7 @@ void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx, grpc_slice_buffer_add( &t->qbuf, grpc_chttp2_rst_stream_create(s->id, (uint32_t)http_error, &s->stats.outgoing)); - grpc_chttp2_initiate_write(exec_ctx, t, false, "rst_stream"); + grpc_chttp2_initiate_write(exec_ctx, t, "rst_stream"); } } if (due_to_error != GRPC_ERROR_NONE && !s->seen_error) { @@ -2095,7 +2065,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, &s->stats.outgoing)); grpc_chttp2_mark_stream_closed(exec_ctx, t, s, 1, 1, error); - grpc_chttp2_initiate_write(exec_ctx, t, false, "close_from_api"); + grpc_chttp2_initiate_write(exec_ctx, t, "close_from_api"); } typedef struct { @@ -2626,9 +2596,9 @@ static bool incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, bs->next_action.on_complete = on_complete; grpc_closure_sched( exec_ctx, - grpc_closure_init( - &bs->next_action.closure, incoming_byte_stream_next_locked, bs, - grpc_combiner_scheduler(bs->transport->combiner, false)), + grpc_closure_init(&bs->next_action.closure, + incoming_byte_stream_next_locked, bs, + grpc_combiner_scheduler(bs->transport->combiner)), GRPC_ERROR_NONE); GPR_TIMER_END("incoming_byte_stream_next", 0); return false; @@ -2683,10 +2653,9 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs = (grpc_chttp2_incoming_byte_stream *)byte_stream; grpc_closure_sched( - exec_ctx, - grpc_closure_init( - &bs->destroy_action, incoming_byte_stream_destroy_locked, bs, - grpc_combiner_scheduler(bs->transport->combiner, false)), + exec_ctx, grpc_closure_init( + &bs->destroy_action, incoming_byte_stream_destroy_locked, + bs, grpc_combiner_scheduler(bs->transport->combiner)), GRPC_ERROR_NONE); GPR_TIMER_END("incoming_byte_stream_destroy", 0); } diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.c b/src/core/ext/transport/chttp2/transport/frame_ping.c index 04354e0dc2..3d7c6fbfad 100644 --- a/src/core/ext/transport/chttp2/transport/frame_ping.c +++ b/src/core/ext/transport/chttp2/transport/frame_ping.c @@ -117,7 +117,7 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, t->ping_acks, t->ping_ack_capacity * sizeof(*t->ping_acks)); } t->ping_acks[t->ping_ack_count++] = p->opaque_8bytes; - grpc_chttp2_initiate_write(exec_ctx, t, false, "ping response"); + grpc_chttp2_initiate_write(exec_ctx, t, "ping response"); } } } diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.c b/src/core/ext/transport/chttp2/transport/frame_window_update.c index 815a87cbe3..682be2c89b 100644 --- a/src/core/ext/transport/chttp2/transport/frame_window_update.c +++ b/src/core/ext/transport/chttp2/transport/frame_window_update.c @@ -109,8 +109,7 @@ grpc_error *grpc_chttp2_window_update_parser_parse( received_update); bool is_zero = t->outgoing_window <= 0; if (was_zero && !is_zero) { - grpc_chttp2_initiate_write(exec_ctx, t, false, - "new_global_flow_control"); + grpc_chttp2_initiate_write(exec_ctx, t, "new_global_flow_control"); } } } diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c index 7476ff6188..ab5f3288ac 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.c +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c @@ -1649,7 +1649,7 @@ static void force_client_rst_stream(grpc_exec_ctx *exec_ctx, void *sp, grpc_slice_buffer_add( &t->qbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_HTTP2_NO_ERROR, &s->stats.outgoing)); - grpc_chttp2_initiate_write(exec_ctx, t, false, "force_rst_stream"); + grpc_chttp2_initiate_write(exec_ctx, t, "force_rst_stream"); grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, true, GRPC_ERROR_NONE); } GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "final_rst"); @@ -1697,9 +1697,9 @@ grpc_error *grpc_chttp2_header_parser_parse(grpc_exec_ctx *exec_ctx, and can avoid the extra write */ GRPC_CHTTP2_STREAM_REF(s, "final_rst"); grpc_closure_sched( - exec_ctx, grpc_closure_create(force_client_rst_stream, s, - grpc_combiner_finally_scheduler( - t->combiner, false)), + exec_ctx, + grpc_closure_create(force_client_rst_stream, s, + grpc_combiner_finally_scheduler(t->combiner)), GRPC_ERROR_NONE); } grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false, diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 933f254bde..4041b29fec 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -58,7 +58,6 @@ typedef enum { GRPC_CHTTP2_WRITE_STATE_IDLE, GRPC_CHTTP2_WRITE_STATE_WRITING, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE, - GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER, } grpc_chttp2_write_state; typedef enum { @@ -443,7 +442,6 @@ struct grpc_chttp2_stream { grpc_slice fetching_slice; int64_t next_message_end_offset; int64_t flow_controlled_bytes_written; - bool complete_fetch_covered_by_poller; grpc_closure complete_fetch_locked; grpc_closure *fetching_send_message_finished; @@ -534,8 +532,7 @@ struct grpc_chttp2_stream { The actual call chain is documented in the implementation of this function. */ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - bool covered_by_poller, const char *reason); + grpc_chttp2_transport *t, const char *reason); typedef enum { GRPC_CHTTP2_NOTHING_TO_WRITE, diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index 8bbd9c24b8..941260be9a 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -418,7 +418,7 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx, GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("parse", t, incoming_window, incoming_frame_size); if (t->incoming_window <= target_incoming_window / 2) { - grpc_chttp2_initiate_write(exec_ctx, t, false, "flow_control"); + grpc_chttp2_initiate_write(exec_ctx, t, "flow_control"); } return GRPC_ERROR_NONE; diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index 5c3b9f5ada..02cf42283a 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -191,8 +191,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( while (grpc_chttp2_list_pop_stalled_by_transport(t, &s)) { if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s) && stream_ref_if_not_destroyed(&s->refcount->refs)) { - grpc_chttp2_initiate_write(exec_ctx, t, false, - "transport.read_flow_control"); + grpc_chttp2_initiate_write(exec_ctx, t, "transport.read_flow_control"); } } } diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index 4b8102ea54..f6976c5650 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -24,7 +24,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include "src/core/lib/iomgr/workqueue.h" +#include "src/core/lib/iomgr/executor.h" #include "src/core/lib/profiling/timers.h" grpc_tracer_flag grpc_combiner_trace = GRPC_TRACER_INITIALIZER(false); @@ -41,93 +41,47 @@ grpc_tracer_flag grpc_combiner_trace = GRPC_TRACER_INITIALIZER(false); struct grpc_combiner { grpc_combiner *next_combiner_on_this_exec_ctx; - grpc_workqueue *optional_workqueue; - grpc_closure_scheduler uncovered_scheduler; - grpc_closure_scheduler covered_scheduler; - grpc_closure_scheduler uncovered_finally_scheduler; - grpc_closure_scheduler covered_finally_scheduler; + grpc_closure_scheduler scheduler; + grpc_closure_scheduler finally_scheduler; gpr_mpscq queue; + // either: + // a pointer to the initiating exec ctx if that is the only exec_ctx that has + // ever queued to this combiner, or NULL. If this is non-null, it's not + // dereferencable (since the initiating exec_ctx may have gone out of scope) + gpr_atm initiating_exec_ctx_or_null; // state is: // lower bit - zero if orphaned (STATE_UNORPHANED) // other bits - number of items queued on the lock (STATE_ELEM_COUNT_LOW_BIT) gpr_atm state; - // number of elements in the list that are covered by a poller: if >0, we can - // offload safely - gpr_atm elements_covered_by_poller; bool time_to_execute_final_list; - bool final_list_covered_by_poller; grpc_closure_list final_list; grpc_closure offload; gpr_refcount refs; }; -static void combiner_exec_uncovered(grpc_exec_ctx *exec_ctx, - grpc_closure *closure, grpc_error *error); -static void combiner_exec_covered(grpc_exec_ctx *exec_ctx, +static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error); +static void combiner_finally_exec(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_error *error); -static void combiner_finally_exec_uncovered(grpc_exec_ctx *exec_ctx, - grpc_closure *closure, - grpc_error *error); -static void combiner_finally_exec_covered(grpc_exec_ctx *exec_ctx, - grpc_closure *closure, - grpc_error *error); - -static const grpc_closure_scheduler_vtable scheduler_uncovered = { - combiner_exec_uncovered, combiner_exec_uncovered, - "combiner:immediately:uncovered"}; -static const grpc_closure_scheduler_vtable scheduler_covered = { - combiner_exec_covered, combiner_exec_covered, - "combiner:immediately:covered"}; -static const grpc_closure_scheduler_vtable finally_scheduler_uncovered = { - combiner_finally_exec_uncovered, combiner_finally_exec_uncovered, - "combiner:finally:uncovered"}; -static const grpc_closure_scheduler_vtable finally_scheduler_covered = { - combiner_finally_exec_covered, combiner_finally_exec_covered, - "combiner:finally:covered"}; -static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); - -typedef struct { - grpc_error *error; - bool covered_by_poller; -} error_data; - -static uintptr_t pack_error_data(error_data d) { - return ((uintptr_t)d.error) | (d.covered_by_poller ? 1 : 0); -} - -static error_data unpack_error_data(uintptr_t p) { - return (error_data){(grpc_error *)(p & ~(uintptr_t)1), p & 1}; -} +static const grpc_closure_scheduler_vtable scheduler = { + combiner_exec, combiner_exec, "combiner:immediately"}; +static const grpc_closure_scheduler_vtable finally_scheduler = { + combiner_finally_exec, combiner_finally_exec, "combiner:finally"}; -static bool is_covered_by_poller(grpc_combiner *lock) { - return lock->final_list_covered_by_poller || - gpr_atm_acq_load(&lock->elements_covered_by_poller) > 0; -} - -#define IS_COVERED_BY_POLLER_FMT "(final=%d elems=%" PRIdPTR ")->%d" -#define IS_COVERED_BY_POLLER_ARGS(lock) \ - (lock)->final_list_covered_by_poller, \ - gpr_atm_acq_load(&(lock)->elements_covered_by_poller), \ - is_covered_by_poller((lock)) +static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) { +grpc_combiner *grpc_combiner_create(void) { grpc_combiner *lock = gpr_malloc(sizeof(*lock)); gpr_ref_init(&lock->refs, 1); lock->next_combiner_on_this_exec_ctx = NULL; lock->time_to_execute_final_list = false; - lock->optional_workqueue = optional_workqueue; - lock->final_list_covered_by_poller = false; - lock->uncovered_scheduler.vtable = &scheduler_uncovered; - lock->covered_scheduler.vtable = &scheduler_covered; - lock->uncovered_finally_scheduler.vtable = &finally_scheduler_uncovered; - lock->covered_finally_scheduler.vtable = &finally_scheduler_covered; + lock->scheduler.vtable = &scheduler; + lock->finally_scheduler.vtable = &finally_scheduler; gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED); - gpr_atm_no_barrier_store(&lock->elements_covered_by_poller, 0); gpr_mpscq_init(&lock->queue); grpc_closure_list_init(&lock->final_list); - grpc_closure_init(&lock->offload, offload, lock, - grpc_workqueue_scheduler(lock->optional_workqueue)); + grpc_closure_init(&lock->offload, offload, lock, grpc_executor_scheduler); GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock)); return lock; } @@ -136,7 +90,6 @@ static void really_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p really_destroy", lock)); GPR_ASSERT(gpr_atm_no_barrier_load(&lock->state) == 0); gpr_mpscq_destroy(&lock->queue); - GRPC_WORKQUEUE_UNREF(exec_ctx, lock->optional_workqueue, "combiner"); gpr_free(lock); } @@ -193,48 +146,40 @@ static void push_first_on_exec_ctx(grpc_exec_ctx *exec_ctx, } } -static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, - grpc_closure *cl, grpc_error *error, - bool covered_by_poller) { +#define COMBINER_FROM_CLOSURE_SCHEDULER(closure, scheduler_name) \ + ((grpc_combiner *)(((char *)((closure)->scheduler)) - \ + offsetof(grpc_combiner, scheduler_name))) + +static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_closure *cl, + grpc_error *error) { GPR_TIMER_BEGIN("combiner.execute", 0); + grpc_combiner *lock = COMBINER_FROM_CLOSURE_SCHEDULER(cl, scheduler); gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT); - GRPC_COMBINER_TRACE(gpr_log( - GPR_DEBUG, "C:%p grpc_combiner_execute c=%p cov=%d last=%" PRIdPTR, lock, - cl, covered_by_poller, last)); - GPR_ASSERT(last & STATE_UNORPHANED); // ensure lock has not been destroyed - assert(cl->cb); - cl->error_data.scratch = - pack_error_data((error_data){error, covered_by_poller}); - if (covered_by_poller) { - gpr_atm_no_barrier_fetch_add(&lock->elements_covered_by_poller, 1); - } - gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next); + GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, + "C:%p grpc_combiner_execute c=%p last=%" PRIdPTR, + lock, cl, last)); if (last == 1) { + gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null, + (gpr_atm)exec_ctx); // first element on this list: add it to the list of combiner locks // executing within this exec_ctx push_last_on_exec_ctx(exec_ctx, lock); + } else { + // there may be a race with setting here: if that happens, we may delay + // offload for one or two actions, and that's fine + gpr_atm initiator = + gpr_atm_no_barrier_load(&lock->initiating_exec_ctx_or_null); + if (initiator != 0 && initiator != (gpr_atm)exec_ctx) { + gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null, 0); + } } + GPR_ASSERT(last & STATE_UNORPHANED); // ensure lock has not been destroyed + assert(cl->cb); + cl->error_data.error = error; + gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next); GPR_TIMER_END("combiner.execute", 0); } -#define COMBINER_FROM_CLOSURE_SCHEDULER(closure, scheduler_name) \ - ((grpc_combiner *)(((char *)((closure)->scheduler)) - \ - offsetof(grpc_combiner, scheduler_name))) - -static void combiner_exec_uncovered(grpc_exec_ctx *exec_ctx, grpc_closure *cl, - grpc_error *error) { - combiner_exec(exec_ctx, - COMBINER_FROM_CLOSURE_SCHEDULER(cl, uncovered_scheduler), cl, - error, false); -} - -static void combiner_exec_covered(grpc_exec_ctx *exec_ctx, grpc_closure *cl, - grpc_error *error) { - combiner_exec(exec_ctx, - COMBINER_FROM_CLOSURE_SCHEDULER(cl, covered_scheduler), cl, - error, true); -} - static void move_next(grpc_exec_ctx *exec_ctx) { exec_ctx->active_combiner = exec_ctx->active_combiner->next_combiner_on_this_exec_ctx; @@ -250,8 +195,7 @@ static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { static void queue_offload(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { move_next(exec_ctx); - GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p queue_offload --> %p", lock, - lock->optional_workqueue)); + GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p queue_offload", lock)); grpc_closure_sched(exec_ctx, &lock->offload, GRPC_ERROR_NONE); } @@ -263,22 +207,23 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { return false; } - GRPC_COMBINER_TRACE( - gpr_log(GPR_DEBUG, - "C:%p grpc_combiner_continue_exec_ctx workqueue=%p " - "is_covered_by_poller=" IS_COVERED_BY_POLLER_FMT - " exec_ctx_ready_to_finish=%d " - "time_to_execute_final_list=%d", - lock, lock->optional_workqueue, IS_COVERED_BY_POLLER_ARGS(lock), - grpc_exec_ctx_ready_to_finish(exec_ctx), - lock->time_to_execute_final_list)); - - if (lock->optional_workqueue != NULL && is_covered_by_poller(lock) && - grpc_exec_ctx_ready_to_finish(exec_ctx)) { + bool contended = + gpr_atm_no_barrier_load(&lock->initiating_exec_ctx_or_null) == 0; + + GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, + "C:%p grpc_combiner_continue_exec_ctx " + "contended=%d " + "exec_ctx_ready_to_finish=%d " + "time_to_execute_final_list=%d", + lock, contended, + grpc_exec_ctx_ready_to_finish(exec_ctx), + lock->time_to_execute_final_list)); + + if (contended && grpc_exec_ctx_ready_to_finish(exec_ctx) && + grpc_executor_is_threaded()) { GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0); - // this execution context wants to move on, and we have a workqueue (and - // so can help the execution context out): schedule remaining work to be - // picked up on the workqueue + // this execution context wants to move on: schedule remaining work to be + // picked up on the executor queue_offload(exec_ctx, lock); GPR_TIMER_END("combiner.continue_exec_ctx", 0); return true; @@ -295,29 +240,23 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { // queue is in an inconsistent state: use this as a cue that we should // go off and do something else for a while (and come back later) GPR_TIMER_MARK("delay_busy", 0); - if (lock->optional_workqueue != NULL && is_covered_by_poller(lock)) { - queue_offload(exec_ctx, lock); - } + queue_offload(exec_ctx, lock); GPR_TIMER_END("combiner.continue_exec_ctx", 0); return true; } GPR_TIMER_BEGIN("combiner.exec1", 0); grpc_closure *cl = (grpc_closure *)n; - error_data err = unpack_error_data(cl->error_data.scratch); + grpc_error *cl_err = cl->error_data.error; #ifndef NDEBUG cl->scheduled = false; #endif - cl->cb(exec_ctx, cl->cb_arg, err.error); - if (err.covered_by_poller) { - gpr_atm_no_barrier_fetch_add(&lock->elements_covered_by_poller, -1); - } - GRPC_ERROR_UNREF(err.error); + cl->cb(exec_ctx, cl->cb_arg, cl_err); + GRPC_ERROR_UNREF(cl_err); GPR_TIMER_END("combiner.exec1", 0); } else { grpc_closure *c = lock->final_list.head; GPR_ASSERT(c != NULL); grpc_closure_list_init(&lock->final_list); - lock->final_list_covered_by_poller = false; int loops = 0; while (c != NULL) { GPR_TIMER_BEGIN("combiner.exec_1final", 0); @@ -383,20 +322,20 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure, grpc_error *error); -static void combiner_execute_finally(grpc_exec_ctx *exec_ctx, - grpc_combiner *lock, grpc_closure *closure, - grpc_error *error, - bool covered_by_poller) { - GRPC_COMBINER_TRACE(gpr_log( - GPR_DEBUG, "C:%p grpc_combiner_execute_finally c=%p; ac=%p; cov=%d", lock, - closure, exec_ctx->active_combiner, covered_by_poller)); +static void combiner_finally_exec(grpc_exec_ctx *exec_ctx, + grpc_closure *closure, grpc_error *error) { + grpc_combiner *lock = + COMBINER_FROM_CLOSURE_SCHEDULER(closure, finally_scheduler); + GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, + "C:%p grpc_combiner_execute_finally c=%p; ac=%p", + lock, closure, exec_ctx->active_combiner)); GPR_TIMER_BEGIN("combiner.execute_finally", 0); if (exec_ctx->active_combiner != lock) { GPR_TIMER_MARK("slowpath", 0); - grpc_closure_sched( - exec_ctx, grpc_closure_create(enqueue_finally, closure, - grpc_combiner_scheduler(lock, false)), - error); + grpc_closure_sched(exec_ctx, + grpc_closure_create(enqueue_finally, closure, + grpc_combiner_scheduler(lock)), + error); GPR_TIMER_END("combiner.execute_finally", 0); return; } @@ -404,42 +343,20 @@ static void combiner_execute_finally(grpc_exec_ctx *exec_ctx, if (grpc_closure_list_empty(lock->final_list)) { gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT); } - if (covered_by_poller) { - lock->final_list_covered_by_poller = true; - } grpc_closure_list_append(&lock->final_list, closure, error); GPR_TIMER_END("combiner.execute_finally", 0); } static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure, grpc_error *error) { - combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure, - GRPC_ERROR_REF(error), false); -} - -static void combiner_finally_exec_uncovered(grpc_exec_ctx *exec_ctx, - grpc_closure *cl, - grpc_error *error) { - combiner_execute_finally(exec_ctx, COMBINER_FROM_CLOSURE_SCHEDULER( - cl, uncovered_finally_scheduler), - cl, error, false); -} - -static void combiner_finally_exec_covered(grpc_exec_ctx *exec_ctx, - grpc_closure *cl, grpc_error *error) { - combiner_execute_finally( - exec_ctx, COMBINER_FROM_CLOSURE_SCHEDULER(cl, covered_finally_scheduler), - cl, error, true); + combiner_finally_exec(exec_ctx, closure, GRPC_ERROR_REF(error)); } -grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *combiner, - bool covered_by_poller) { - return covered_by_poller ? &combiner->covered_scheduler - : &combiner->uncovered_scheduler; +grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *combiner) { + return &combiner->scheduler; } grpc_closure_scheduler *grpc_combiner_finally_scheduler( - grpc_combiner *combiner, bool covered_by_poller) { - return covered_by_poller ? &combiner->covered_finally_scheduler - : &combiner->uncovered_finally_scheduler; + grpc_combiner *combiner) { + return &combiner->finally_scheduler; } diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h index d6571ad4c6..a616113ca0 100644 --- a/src/core/lib/iomgr/combiner.h +++ b/src/core/lib/iomgr/combiner.h @@ -33,7 +33,7 @@ // Initialize the lock, with an optional workqueue to shift load to when // necessary -grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue); +grpc_combiner *grpc_combiner_create(void); //#define GRPC_COMBINER_REFCOUNT_DEBUG #ifdef GRPC_COMBINER_REFCOUNT_DEBUG @@ -56,11 +56,9 @@ grpc_combiner *grpc_combiner_ref(grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS); void grpc_combiner_unref(grpc_exec_ctx *exec_ctx, grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS); // Fetch a scheduler to schedule closures against -grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *lock, - bool covered_by_poller); +grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *lock); // Scheduler to execute \a action within the lock just prior to unlocking. -grpc_closure_scheduler *grpc_combiner_finally_scheduler(grpc_combiner *lock, - bool covered_by_poller); +grpc_closure_scheduler *grpc_combiner_finally_scheduler(grpc_combiner *lock); bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx); diff --git a/src/core/lib/iomgr/endpoint.c b/src/core/lib/iomgr/endpoint.c index 116f18424c..37cce335ca 100644 --- a/src/core/lib/iomgr/endpoint.c +++ b/src/core/lib/iomgr/endpoint.c @@ -54,10 +54,6 @@ char* grpc_endpoint_get_peer(grpc_endpoint* ep) { int grpc_endpoint_get_fd(grpc_endpoint* ep) { return ep->vtable->get_fd(ep); } -grpc_workqueue* grpc_endpoint_get_workqueue(grpc_endpoint* ep) { - return ep->vtable->get_workqueue(ep); -} - grpc_resource_user* grpc_endpoint_get_resource_user(grpc_endpoint* ep) { return ep->vtable->get_resource_user(ep); } diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h index ec355d413a..8f0523a981 100644 --- a/src/core/lib/iomgr/endpoint.h +++ b/src/core/lib/iomgr/endpoint.h @@ -37,7 +37,6 @@ struct grpc_endpoint_vtable { grpc_slice_buffer *slices, grpc_closure *cb); void (*write)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_slice_buffer *slices, grpc_closure *cb); - grpc_workqueue *(*get_workqueue)(grpc_endpoint *ep); void (*add_to_pollset)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset *pollset); void (*add_to_pollset_set)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, @@ -63,9 +62,6 @@ char *grpc_endpoint_get_peer(grpc_endpoint *ep); */ int grpc_endpoint_get_fd(grpc_endpoint *ep); -/* Retrieve a reference to the workqueue associated with this endpoint */ -grpc_workqueue *grpc_endpoint_get_workqueue(grpc_endpoint *ep); - /* Write slices out to the socket. If the connection is ready for more data after the end of the call, it diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c index 7d7aa44912..63f6962ede 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.c +++ b/src/core/lib/iomgr/ev_epoll1_linux.c @@ -43,7 +43,6 @@ #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/lockfree_event.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" -#include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" @@ -115,7 +114,9 @@ struct grpc_pollset { * Pollset-set Declarations */ -struct grpc_pollset_set {}; +struct grpc_pollset_set { + char unused; +}; /******************************************************************************* * Common helpers @@ -268,10 +269,6 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure); } -static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { - return (grpc_workqueue *)0xb0b51ed; -} - static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_pollset *notifier) { grpc_lfev_set_ready(exec_ctx, &fd->read_closure); @@ -298,8 +295,6 @@ GPR_TLS_DECL(g_current_thread_worker); static gpr_atm g_active_poller; static pollset_neighbourhood *g_neighbourhoods; static size_t g_num_neighbourhoods; -static gpr_mu g_wq_mu; -static grpc_closure_list g_wq_items; /* Return true if first in list */ static bool worker_insert(grpc_pollset *pollset, grpc_pollset_worker *worker) { @@ -348,8 +343,6 @@ static grpc_error *pollset_global_init(void) { gpr_atm_no_barrier_store(&g_active_poller, 0); global_wakeup_fd.read_fd = -1; grpc_error *err = grpc_wakeup_fd_init(&global_wakeup_fd); - gpr_mu_init(&g_wq_mu); - g_wq_items = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT; if (err != GRPC_ERROR_NONE) return err; struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET), .data.ptr = &global_wakeup_fd}; @@ -368,7 +361,6 @@ static grpc_error *pollset_global_init(void) { static void pollset_global_shutdown(void) { gpr_tls_destroy(&g_current_thread_pollset); gpr_tls_destroy(&g_current_thread_worker); - gpr_mu_destroy(&g_wq_mu); if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd); for (size_t i = 0; i < g_num_neighbourhoods; i++) { gpr_mu_destroy(&g_neighbourhoods[i].mu); @@ -492,9 +484,6 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, for (int i = 0; i < r; i++) { void *data_ptr = events[i].data.ptr; if (data_ptr == &global_wakeup_fd) { - gpr_mu_lock(&g_wq_mu); - grpc_closure_list_move(&g_wq_items, &exec_ctx->closure_list); - gpr_mu_unlock(&g_wq_mu); append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd), err_desc); } else { @@ -777,84 +766,6 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd) {} /******************************************************************************* - * Workqueue Definitions - */ - -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG -static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue, - const char *file, int line, - const char *reason) { - return workqueue; -} - -static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - const char *file, int line, const char *reason) {} -#else -static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) { - return workqueue; -} - -static void workqueue_unref(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue) {} -#endif - -static void wq_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error) { - // find a neighbourhood to wakeup - bool scheduled = false; - size_t initial_neighbourhood = choose_neighbourhood(); - for (size_t i = 0; !scheduled && i < g_num_neighbourhoods; i++) { - pollset_neighbourhood *neighbourhood = - &g_neighbourhoods[(initial_neighbourhood + i) % g_num_neighbourhoods]; - if (gpr_mu_trylock(&neighbourhood->mu)) { - if (neighbourhood->active_root != NULL) { - grpc_pollset *inspect = neighbourhood->active_root; - do { - if (gpr_mu_trylock(&inspect->mu)) { - if (inspect->root_worker != NULL) { - grpc_pollset_worker *inspect_worker = inspect->root_worker; - do { - if (inspect_worker->kick_state == UNKICKED) { - inspect_worker->kick_state = KICKED; - grpc_closure_list_append( - &inspect_worker->schedule_on_end_work, closure, error); - if (inspect_worker->initialized_cv) { - gpr_cv_signal(&inspect_worker->cv); - } - scheduled = true; - } - inspect_worker = inspect_worker->next; - } while (!scheduled && inspect_worker != inspect->root_worker); - } - gpr_mu_unlock(&inspect->mu); - } - inspect = inspect->next; - } while (!scheduled && inspect != neighbourhood->active_root); - } - gpr_mu_unlock(&neighbourhood->mu); - } - } - if (!scheduled) { - gpr_mu_lock(&g_wq_mu); - grpc_closure_list_append(&g_wq_items, closure, error); - gpr_mu_unlock(&g_wq_mu); - GRPC_LOG_IF_ERROR("workqueue_scheduler", - grpc_wakeup_fd_wakeup(&global_wakeup_fd)); - } -} - -static const grpc_closure_scheduler_vtable - singleton_workqueue_scheduler_vtable = {wq_sched, wq_sched, - "epoll1_workqueue"}; - -static grpc_closure_scheduler singleton_workqueue_scheduler = { - &singleton_workqueue_scheduler_vtable}; - -static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) { - return &singleton_workqueue_scheduler; -} - -/******************************************************************************* * Pollset-set Definitions */ @@ -905,7 +816,6 @@ static const grpc_event_engine_vtable vtable = { .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, - .fd_get_workqueue = fd_get_workqueue, .pollset_init = pollset_init, .pollset_shutdown = pollset_shutdown, @@ -923,10 +833,6 @@ static const grpc_event_engine_vtable vtable = { .pollset_set_add_fd = pollset_set_add_fd, .pollset_set_del_fd = pollset_set_del_fd, - .workqueue_ref = workqueue_ref, - .workqueue_unref = workqueue_unref, - .workqueue_scheduler = workqueue_scheduler, - .shutdown_engine = shutdown_engine, }; diff --git a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c index 0304d97fd8..d7ae0d371e 100644 --- a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c +++ b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c @@ -46,7 +46,6 @@ #include "src/core/lib/iomgr/lockfree_event.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" -#include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/env.h" @@ -169,13 +168,15 @@ static void fd_global_shutdown(void); * Polling island Declarations */ -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG +//#define PI_REFCOUNT_DEBUG + +#ifdef PI_REFCOUNT_DEBUG #define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__) #define PI_UNREF(exec_ctx, p, r) \ pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__) -#else /* defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */ +#else /* defined(PI_REFCOUNT_DEBUG) */ #define PI_ADD_REF(p, r) pi_add_ref((p)) #define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p)) @@ -189,8 +190,6 @@ typedef struct worker_node { /* This is also used as grpc_workqueue (by directly casing it) */ typedef struct polling_island { - grpc_closure_scheduler workqueue_scheduler; - gpr_mu mu; /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement the refcount. @@ -211,15 +210,6 @@ typedef struct polling_island { /* Number of threads currently polling on this island */ gpr_atm poller_count; - /* Mutex guarding the read end of the workqueue (must be held to pop from - * workqueue_items) */ - gpr_mu workqueue_read_mu; - /* Queue of closures to be executed */ - gpr_mpscq workqueue_items; - /* Count of items in workqueue_items */ - gpr_atm workqueue_item_count; - /* Wakeup fd used to wake pollers to check the contents of workqueue_items */ - grpc_wakeup_fd workqueue_wakeup_fd; /* The list of workers waiting to do polling on this polling island */ gpr_mu worker_list_mu; @@ -308,8 +298,6 @@ static __thread polling_island *g_current_thread_polling_island; /* Forward declaration */ static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi); -static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error); #ifdef GRPC_TSAN /* Currently TSAN may incorrectly flag data races between epoll_ctl and @@ -322,13 +310,10 @@ static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, gpr_atm g_epoll_sync; #endif /* defined(GRPC_TSAN) */ -static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = { - workqueue_enqueue, workqueue_enqueue, "workqueue"}; - static void pi_add_ref(polling_island *pi); static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi); -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG +#ifdef PI_REFCOUNT_DEBUG static void pi_add_ref_dbg(polling_island *pi, const char *reason, const char *file, int line) { long old_cnt = gpr_atm_acq_load(&pi->ref_count); @@ -344,36 +329,6 @@ static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi, gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)", (void *)pi, old_cnt, (old_cnt - 1), reason, file, line); } - -static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue, - const char *file, int line, - const char *reason) { - if (workqueue != NULL) { - pi_add_ref_dbg((polling_island *)workqueue, reason, file, line); - } - return workqueue; -} - -static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - const char *file, int line, const char *reason) { - if (workqueue != NULL) { - pi_unref_dbg(exec_ctx, (polling_island *)workqueue, reason, file, line); - } -} -#else -static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) { - if (workqueue != NULL) { - pi_add_ref((polling_island *)workqueue); - } - return workqueue; -} - -static void workqueue_unref(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue) { - if (workqueue != NULL) { - pi_unref(exec_ctx, (polling_island *)workqueue); - } -} #endif static void pi_add_ref(polling_island *pi) { @@ -577,17 +532,12 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx, *error = GRPC_ERROR_NONE; pi = gpr_malloc(sizeof(*pi)); - pi->workqueue_scheduler.vtable = &workqueue_scheduler_vtable; gpr_mu_init(&pi->mu); pi->fd_cnt = 0; pi->fd_capacity = 0; pi->fds = NULL; pi->epoll_fd = -1; - gpr_mu_init(&pi->workqueue_read_mu); - gpr_mpscq_init(&pi->workqueue_items); - gpr_atm_rel_store(&pi->workqueue_item_count, 0); - gpr_atm_rel_store(&pi->ref_count, 0); gpr_atm_rel_store(&pi->poller_count, 0); gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL); @@ -595,11 +545,6 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx, gpr_mu_init(&pi->worker_list_mu); worker_node_init(&pi->worker_list_head); - if (!append_error(error, grpc_wakeup_fd_init(&pi->workqueue_wakeup_fd), - err_desc)) { - goto done; - } - pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC); if (pi->epoll_fd < 0) { @@ -607,8 +552,6 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx, goto done; } - polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error); - if (initial_fd != NULL) { polling_island_add_fds_locked(pi, &initial_fd, 1, true, error); } @@ -627,11 +570,7 @@ static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) { if (pi->epoll_fd >= 0) { close(pi->epoll_fd); } - GPR_ASSERT(gpr_atm_no_barrier_load(&pi->workqueue_item_count) == 0); - gpr_mu_destroy(&pi->workqueue_read_mu); - gpr_mpscq_destroy(&pi->workqueue_items); gpr_mu_destroy(&pi->mu); - grpc_wakeup_fd_destroy(&pi->workqueue_wakeup_fd); gpr_mu_destroy(&pi->worker_list_mu); GPR_ASSERT(is_worker_node_detached(&pi->worker_list_head)); @@ -779,45 +718,6 @@ static void polling_island_unlock_pair(polling_island *p, polling_island *q) { } } -static void workqueue_maybe_wakeup(polling_island *pi) { - /* If this thread is the current poller, then it may be that it's about to - decrement the current poller count, so we need to look past this thread */ - bool is_current_poller = (g_current_thread_polling_island == pi); - gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0; - gpr_atm current_pollers = gpr_atm_no_barrier_load(&pi->poller_count); - /* Only issue a wakeup if it's likely that some poller could come in and take - it right now. Note that since we do an anticipatory mpscq_pop every poll - loop, it's ok if we miss the wakeup here, as we'll get the work item when - the next poller enters anyway. */ - if (current_pollers > min_current_pollers_for_wakeup) { - GRPC_LOG_IF_ERROR("workqueue_wakeup_fd", - grpc_wakeup_fd_wakeup(&pi->workqueue_wakeup_fd)); - } -} - -static void workqueue_move_items_to_parent(polling_island *q) { - polling_island *p = (polling_island *)gpr_atm_no_barrier_load(&q->merged_to); - if (p == NULL) { - return; - } - gpr_mu_lock(&q->workqueue_read_mu); - int num_added = 0; - while (gpr_atm_no_barrier_load(&q->workqueue_item_count) > 0) { - gpr_mpscq_node *n = gpr_mpscq_pop(&q->workqueue_items); - if (n != NULL) { - gpr_atm_no_barrier_fetch_add(&q->workqueue_item_count, -1); - gpr_atm_no_barrier_fetch_add(&p->workqueue_item_count, 1); - gpr_mpscq_push(&p->workqueue_items, n); - num_added++; - } - } - gpr_mu_unlock(&q->workqueue_read_mu); - if (num_added > 0) { - workqueue_maybe_wakeup(p); - } - workqueue_move_items_to_parent(p); -} - static polling_island *polling_island_merge(polling_island *p, polling_island *q, grpc_error **error) { @@ -842,8 +742,6 @@ static polling_island *polling_island_merge(polling_island *p, /* Add the 'merged_to' link from p --> q */ gpr_atm_rel_store(&p->merged_to, (gpr_atm)q); PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */ - - workqueue_move_items_to_parent(p); } /* else if p == q, nothing needs to be done */ @@ -854,32 +752,6 @@ static polling_island *polling_island_merge(polling_island *p, return q; } -static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error) { - GPR_TIMER_BEGIN("workqueue.enqueue", 0); - grpc_workqueue *workqueue = (grpc_workqueue *)closure->scheduler; - /* take a ref to the workqueue: otherwise it can happen that whatever events - * this kicks off ends up destroying the workqueue before this function - * completes */ - GRPC_WORKQUEUE_REF(workqueue, "enqueue"); - polling_island *pi = (polling_island *)workqueue; - gpr_atm last = gpr_atm_no_barrier_fetch_add(&pi->workqueue_item_count, 1); - closure->error_data.error = error; - gpr_mpscq_push(&pi->workqueue_items, &closure->next_data.atm_next); - if (last == 0) { - workqueue_maybe_wakeup(pi); - } - workqueue_move_items_to_parent(pi); - GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue"); - GPR_TIMER_END("workqueue.enqueue", 0); -} - -static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) { - polling_island *pi = (polling_island *)workqueue; - return workqueue == NULL ? grpc_schedule_on_exec_ctx - : &pi->workqueue_scheduler; -} - static grpc_error *polling_island_global_init() { grpc_error *error = GRPC_ERROR_NONE; @@ -1138,14 +1010,6 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure); } -static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { - gpr_mu_lock(&fd->po.mu); - grpc_workqueue *workqueue = - GRPC_WORKQUEUE_REF((grpc_workqueue *)fd->po.pi, "fd_get_workqueue"); - gpr_mu_unlock(&fd->po.mu); - return workqueue; -} - /******************************************************************************* * Pollset Definitions */ @@ -1417,33 +1281,6 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { gpr_mu_destroy(&pollset->po.mu); } -static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx, - polling_island *pi) { - if (gpr_mu_trylock(&pi->workqueue_read_mu)) { - gpr_mpscq_node *n = gpr_mpscq_pop(&pi->workqueue_items); - gpr_mu_unlock(&pi->workqueue_read_mu); - if (n != NULL) { - if (gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) > 1) { - workqueue_maybe_wakeup(pi); - } - grpc_closure *c = (grpc_closure *)n; - grpc_error *error = c->error_data.error; -#ifndef NDEBUG - c->scheduled = false; -#endif - c->cb(exec_ctx, c->cb_arg, error); - GRPC_ERROR_UNREF(error); - return true; - } else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) { - /* n == NULL might mean there's work but it's not available to be popped - * yet - try to ensure another workqueue wakes up to check shortly if so - */ - workqueue_maybe_wakeup(pi); - } - } - return false; -} - /* NOTE: This function may modify 'now' */ static bool acquire_polling_lease(grpc_pollset_worker *worker, polling_island *pi, gpr_timespec deadline, @@ -1579,12 +1416,7 @@ static void pollset_do_epoll_pwait(grpc_exec_ctx *exec_ctx, int epoll_fd, for (int i = 0; i < ep_rv; ++i) { void *data_ptr = ep_ev[i].data.ptr; - if (data_ptr == &pi->workqueue_wakeup_fd) { - append_error(error, - grpc_wakeup_fd_consume_wakeup(&pi->workqueue_wakeup_fd), - err_desc); - maybe_do_workqueue_work(exec_ctx, pi); - } else if (data_ptr == &polling_island_wakeup_fd) { + if (data_ptr == &polling_island_wakeup_fd) { GRPC_POLLING_TRACE( "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: " "%d) got merged", @@ -1660,15 +1492,10 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, PI_ADD_REF(pi, "ps_work"); gpr_mu_unlock(&pollset->po.mu); - /* If we get some workqueue work to do, it might end up completing an item on - the completion queue, so there's no need to poll... so we skip that and - redo the complete loop to verify */ - if (!maybe_do_workqueue_work(exec_ctx, pi)) { - g_current_thread_polling_island = pi; - pollset_do_epoll_pwait(exec_ctx, epoll_fd, pollset, pi, worker, now, - deadline, sig_mask, error); - g_current_thread_polling_island = NULL; - } + g_current_thread_polling_island = pi; + pollset_do_epoll_pwait(exec_ctx, epoll_fd, pollset, pi, worker, now, deadline, + sig_mask, error); + g_current_thread_polling_island = NULL; GPR_ASSERT(pi != NULL); @@ -2021,7 +1848,6 @@ static const grpc_event_engine_vtable vtable = { .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, - .fd_get_workqueue = fd_get_workqueue, .pollset_init = pollset_init, .pollset_shutdown = pollset_shutdown, @@ -2039,10 +1865,6 @@ static const grpc_event_engine_vtable vtable = { .pollset_set_add_fd = pollset_set_add_fd, .pollset_set_del_fd = pollset_set_del_fd, - .workqueue_ref = workqueue_ref, - .workqueue_unref = workqueue_unref, - .workqueue_scheduler = workqueue_scheduler, - .shutdown_engine = shutdown_engine, }; diff --git a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c index 23f0aa68d9..8692b5b3c3 100644 --- a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c +++ b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c @@ -46,7 +46,6 @@ #include "src/core/lib/iomgr/lockfree_event.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" -#include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" @@ -94,23 +93,22 @@ static void fd_global_shutdown(void); * epoll set Declarations */ -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG +//#define EPS_REFCOUNT_DEBUG + +#ifdef EPS_REFCOUNT_DEBUG #define EPS_ADD_REF(p, r) eps_add_ref_dbg((p), (r), __FILE__, __LINE__) #define EPS_UNREF(exec_ctx, p, r) \ eps_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__) -#else /* defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */ +#else /* defined(EPS_REFCOUNT_DEBUG) */ #define EPS_ADD_REF(p, r) eps_add_ref((p)) #define EPS_UNREF(exec_ctx, p, r) eps_unref((exec_ctx), (p)) #endif /* !defined(GRPC_EPS_REF_COUNT_DEBUG) */ -/* This is also used as grpc_workqueue (by directly casting it) */ typedef struct epoll_set { - grpc_closure_scheduler workqueue_scheduler; - /* Mutex poller should acquire to poll this. This enforces that only one * poller can be polling on epoll_set at any time */ gpr_mu mu; @@ -124,15 +122,6 @@ typedef struct epoll_set { /* Number of threads currently polling on this epoll set*/ gpr_atm poller_count; - /* Mutex guarding the read end of the workqueue (must be held to pop from - * workqueue_items) */ - gpr_mu workqueue_read_mu; - /* Queue of closures to be executed */ - gpr_mpscq workqueue_items; - /* Count of items in workqueue_items */ - gpr_atm workqueue_item_count; - /* Wakeup fd used to wake pollers to check the contents of workqueue_items */ - grpc_wakeup_fd workqueue_wakeup_fd; /* Is the epoll set shutdown */ gpr_atm is_shutdown; @@ -166,7 +155,9 @@ struct grpc_pollset { /******************************************************************************* * Pollset-set Declarations */ -struct grpc_pollset_set {}; +struct grpc_pollset_set { + char unused; +}; /***************************************************************************** * Dedicated polling threads and pollsets - Declarations @@ -220,8 +211,6 @@ static __thread epoll_set *g_current_thread_epoll_set; /* Forward declaration */ static void epoll_set_delete(epoll_set *eps); -static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error); #ifdef GRPC_TSAN /* Currently TSAN may incorrectly flag data races between epoll_ctl and @@ -234,13 +223,10 @@ static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, gpr_atm g_epoll_sync; #endif /* defined(GRPC_TSAN) */ -static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = { - workqueue_enqueue, workqueue_enqueue, "workqueue"}; - static void eps_add_ref(epoll_set *eps); static void eps_unref(grpc_exec_ctx *exec_ctx, epoll_set *eps); -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG +#ifdef EPS_REFCOUNT_DEBUG static void eps_add_ref_dbg(epoll_set *eps, const char *reason, const char *file, int line) { long old_cnt = gpr_atm_acq_load(&eps->ref_count); @@ -256,36 +242,6 @@ static void eps_unref_dbg(grpc_exec_ctx *exec_ctx, epoll_set *eps, gpr_log(GPR_DEBUG, "Unref eps: %p, old:%ld -> new:%ld (%s) - (%s, %d)", (void *)eps, old_cnt, (old_cnt - 1), reason, file, line); } - -static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue, - const char *file, int line, - const char *reason) { - if (workqueue != NULL) { - eps_add_ref_dbg((epoll_set *)workqueue, reason, file, line); - } - return workqueue; -} - -static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - const char *file, int line, const char *reason) { - if (workqueue != NULL) { - eps_unref_dbg(exec_ctx, (epoll_set *)workqueue, reason, file, line); - } -} -#else -static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) { - if (workqueue != NULL) { - eps_add_ref((epoll_set *)workqueue); - } - return workqueue; -} - -static void workqueue_unref(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue) { - if (workqueue != NULL) { - eps_unref(exec_ctx, (epoll_set *)workqueue); - } -} #endif static void eps_add_ref(epoll_set *eps) { @@ -379,24 +335,15 @@ static epoll_set *epoll_set_create(grpc_error **error) { *error = GRPC_ERROR_NONE; eps = gpr_malloc(sizeof(*eps)); - eps->workqueue_scheduler.vtable = &workqueue_scheduler_vtable; eps->epoll_fd = -1; gpr_mu_init(&eps->mu); - gpr_mu_init(&eps->workqueue_read_mu); - gpr_mpscq_init(&eps->workqueue_items); - gpr_atm_rel_store(&eps->workqueue_item_count, 0); gpr_atm_rel_store(&eps->ref_count, 0); gpr_atm_rel_store(&eps->poller_count, 0); gpr_atm_rel_store(&eps->is_shutdown, false); - if (!append_error(error, grpc_wakeup_fd_init(&eps->workqueue_wakeup_fd), - err_desc)) { - goto done; - } - eps->epoll_fd = epoll_create1(EPOLL_CLOEXEC); if (eps->epoll_fd < 0) { @@ -404,8 +351,6 @@ static epoll_set *epoll_set_create(grpc_error **error) { goto done; } - epoll_set_add_wakeup_fd_locked(eps, &eps->workqueue_wakeup_fd, error); - done: if (*error != GRPC_ERROR_NONE) { epoll_set_delete(eps); @@ -419,57 +364,11 @@ static void epoll_set_delete(epoll_set *eps) { close(eps->epoll_fd); } - GPR_ASSERT(gpr_atm_no_barrier_load(&eps->workqueue_item_count) == 0); gpr_mu_destroy(&eps->mu); - gpr_mu_destroy(&eps->workqueue_read_mu); - gpr_mpscq_destroy(&eps->workqueue_items); - grpc_wakeup_fd_destroy(&eps->workqueue_wakeup_fd); gpr_free(eps); } -static void workqueue_maybe_wakeup(epoll_set *eps) { - /* If this thread is the current poller, then it may be that it's about to - decrement the current poller count, so we need to look past this thread */ - bool is_current_poller = (g_current_thread_epoll_set == eps); - gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0; - gpr_atm current_pollers = gpr_atm_no_barrier_load(&eps->poller_count); - /* Only issue a wakeup if it's likely that some poller could come in and take - it right now. Note that since we do an anticipatory mpscq_pop every poll - loop, it's ok if we miss the wakeup here, as we'll get the work item when - the next poller enters anyway. */ - if (current_pollers > min_current_pollers_for_wakeup) { - GRPC_LOG_IF_ERROR("workqueue_wakeup_fd", - grpc_wakeup_fd_wakeup(&eps->workqueue_wakeup_fd)); - } -} - -static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error) { - GPR_TIMER_BEGIN("workqueue.enqueue", 0); - grpc_workqueue *workqueue = (grpc_workqueue *)closure->scheduler; - /* take a ref to the workqueue: otherwise it can happen that whatever events - * this kicks off ends up destroying the workqueue before this function - * completes */ - GRPC_WORKQUEUE_REF(workqueue, "enqueue"); - epoll_set *eps = (epoll_set *)workqueue; - gpr_atm last = gpr_atm_no_barrier_fetch_add(&eps->workqueue_item_count, 1); - closure->error_data.error = error; - gpr_mpscq_push(&eps->workqueue_items, &closure->next_data.atm_next); - if (last == 0) { - workqueue_maybe_wakeup(eps); - } - - GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue"); - GPR_TIMER_END("workqueue.enqueue", 0); -} - -static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) { - epoll_set *eps = (epoll_set *)workqueue; - return workqueue == NULL ? grpc_schedule_on_exec_ctx - : &eps->workqueue_scheduler; -} - static grpc_error *epoll_set_global_init() { grpc_error *error = GRPC_ERROR_NONE; @@ -665,8 +564,6 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure); } -static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; } - /******************************************************************************* * Pollset Definitions */ @@ -850,32 +747,6 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { gpr_mu_destroy(&pollset->mu); } -static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx, epoll_set *eps) { - if (gpr_mu_trylock(&eps->workqueue_read_mu)) { - gpr_mpscq_node *n = gpr_mpscq_pop(&eps->workqueue_items); - gpr_mu_unlock(&eps->workqueue_read_mu); - if (n != NULL) { - if (gpr_atm_full_fetch_add(&eps->workqueue_item_count, -1) > 1) { - workqueue_maybe_wakeup(eps); - } - grpc_closure *c = (grpc_closure *)n; - grpc_error *error = c->error_data.error; -#ifndef NDEBUG - c->scheduled = false; -#endif - c->cb(exec_ctx, c->cb_arg, error); - GRPC_ERROR_UNREF(error); - return true; - } else if (gpr_atm_no_barrier_load(&eps->workqueue_item_count) > 0) { - /* n == NULL might mean there's work but it's not available to be popped - * yet - try to ensure another workqueue wakes up to check shortly if so - */ - workqueue_maybe_wakeup(eps); - } - } - return false; -} - /* Blocking call */ static void acquire_epoll_lease(epoll_set *eps) { if (g_num_threads_per_eps > 1) { @@ -919,12 +790,7 @@ static void do_epoll_wait(grpc_exec_ctx *exec_ctx, int epoll_fd, epoll_set *eps, for (int i = 0; i < ep_rv; ++i) { void *data_ptr = ep_ev[i].data.ptr; - if (data_ptr == &eps->workqueue_wakeup_fd) { - append_error(error, - grpc_wakeup_fd_consume_wakeup(&eps->workqueue_wakeup_fd), - err_desc); - maybe_do_workqueue_work(exec_ctx, eps); - } else if (data_ptr == &epoll_set_wakeup_fd) { + if (data_ptr == &epoll_set_wakeup_fd) { gpr_atm_rel_store(&eps->is_shutdown, 1); gpr_log(GPR_INFO, "pollset poller: shutdown set"); } else { @@ -951,18 +817,13 @@ static void epoll_set_work(grpc_exec_ctx *exec_ctx, epoll_set *eps, epoll set. */ epoll_fd = eps->epoll_fd; - /* If we get some workqueue work to do, it might end up completing an item on - the completion queue, so there's no need to poll... so we skip that and - redo the complete loop to verify */ - if (!maybe_do_workqueue_work(exec_ctx, eps)) { - gpr_atm_no_barrier_fetch_add(&eps->poller_count, 1); - g_current_thread_epoll_set = eps; + gpr_atm_no_barrier_fetch_add(&eps->poller_count, 1); + g_current_thread_epoll_set = eps; - do_epoll_wait(exec_ctx, epoll_fd, eps, error); + do_epoll_wait(exec_ctx, epoll_fd, eps, error); - g_current_thread_epoll_set = NULL; - gpr_atm_no_barrier_fetch_add(&eps->poller_count, -1); - } + g_current_thread_epoll_set = NULL; + gpr_atm_no_barrier_fetch_add(&eps->poller_count, -1); GPR_TIMER_END("epoll_set_work", 0); } @@ -1105,7 +966,6 @@ static const grpc_event_engine_vtable vtable = { .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, - .fd_get_workqueue = fd_get_workqueue, .pollset_init = pollset_init, .pollset_shutdown = pollset_shutdown, @@ -1123,10 +983,6 @@ static const grpc_event_engine_vtable vtable = { .pollset_set_add_fd = pollset_set_add_fd, .pollset_set_del_fd = pollset_set_del_fd, - .workqueue_ref = workqueue_ref, - .workqueue_unref = workqueue_unref, - .workqueue_scheduler = workqueue_scheduler, - .shutdown_engine = shutdown_engine, }; diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index 096edb7fb9..c3627dc914 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -44,7 +44,6 @@ #include "src/core/lib/iomgr/sys_epoll_wrapper.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" -#include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/spinlock.h" @@ -124,17 +123,6 @@ struct grpc_fd { Ref/Unref by two to avoid altering the orphaned bit */ gpr_atm refst; - /* Wakeup fd used to wake pollers to check the contents of workqueue_items */ - grpc_wakeup_fd workqueue_wakeup_fd; - grpc_closure_scheduler workqueue_scheduler; - /* Spinlock guarding the read end of the workqueue (must be held to pop from - * workqueue_items) */ - gpr_spinlock workqueue_read_mu; - /* Queue of closures to be executed */ - gpr_mpscq workqueue_items; - /* Count of items in workqueue_items */ - gpr_atm workqueue_item_count; - /* The fd is either closed or we relinquished control of it. In either cases, this indicates that the 'fd' on this structure is no longer valid */ @@ -157,12 +145,6 @@ struct grpc_fd { static void fd_global_init(void); static void fd_global_shutdown(void); -static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error); - -static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = { - workqueue_enqueue, workqueue_enqueue, "workqueue"}; - /******************************************************************************* * Pollset Declarations */ @@ -332,13 +314,6 @@ static grpc_fd *fd_create(int fd, const char *name) { grpc_lfev_init(&new_fd->write_closure); gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); - GRPC_LOG_IF_ERROR("fd_create", - grpc_wakeup_fd_init(&new_fd->workqueue_wakeup_fd)); - new_fd->workqueue_scheduler.vtable = &workqueue_scheduler_vtable; - new_fd->workqueue_read_mu = GPR_SPINLOCK_INITIALIZER; - gpr_mpscq_init(&new_fd->workqueue_items); - gpr_atm_no_barrier_store(&new_fd->workqueue_item_count, 0); - new_fd->freelist_next = NULL; new_fd->on_done_closure = NULL; @@ -431,91 +406,6 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure); } -static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { - REF_BY(fd, 2, "return_workqueue"); - return (grpc_workqueue *)fd; -} - -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG -static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue, - const char *file, int line, - const char *reason) { - if (workqueue != NULL) { - ref_by((grpc_fd *)workqueue, 2, file, line, reason); - } - return workqueue; -} - -static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - const char *file, int line, const char *reason) { - if (workqueue != NULL) { - unref_by(exec_ctx, (grpc_fd *)workqueue, 2, file, line, reason); - } -} -#else -static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) { - if (workqueue != NULL) { - ref_by((grpc_fd *)workqueue, 2); - } - return workqueue; -} - -static void workqueue_unref(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue) { - if (workqueue != NULL) { - unref_by(exec_ctx, (grpc_fd *)workqueue, 2); - } -} -#endif - -static void workqueue_wakeup(grpc_fd *fd) { - GRPC_LOG_IF_ERROR("workqueue_enqueue", - grpc_wakeup_fd_wakeup(&fd->workqueue_wakeup_fd)); -} - -static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error) { - GPR_TIMER_BEGIN("workqueue.enqueue", 0); - grpc_fd *fd = (grpc_fd *)(((char *)closure->scheduler) - - offsetof(grpc_fd, workqueue_scheduler)); - REF_BY(fd, 2, "workqueue_enqueue"); - gpr_atm last = gpr_atm_no_barrier_fetch_add(&fd->workqueue_item_count, 1); - closure->error_data.error = error; - gpr_mpscq_push(&fd->workqueue_items, &closure->next_data.atm_next); - if (last == 0) { - workqueue_wakeup(fd); - } - UNREF_BY(exec_ctx, fd, 2, "workqueue_enqueue"); -} - -static void fd_invoke_workqueue(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - /* handle spurious wakeups */ - if (!gpr_spinlock_trylock(&fd->workqueue_read_mu)) return; - gpr_mpscq_node *n = gpr_mpscq_pop(&fd->workqueue_items); - gpr_spinlock_unlock(&fd->workqueue_read_mu); - if (n != NULL) { - if (gpr_atm_full_fetch_add(&fd->workqueue_item_count, -1) > 1) { - workqueue_wakeup(fd); - } - grpc_closure *c = (grpc_closure *)n; - grpc_error *error = c->error_data.error; -#ifndef NDEBUG - c->scheduled = false; -#endif - c->cb(exec_ctx, c->cb_arg, error); - GRPC_ERROR_UNREF(error); - } else if (gpr_atm_no_barrier_load(&fd->workqueue_item_count) > 0) { - /* n == NULL might mean there's work but it's not available to be popped - * yet - try to ensure another workqueue wakes up to check shortly if so - */ - workqueue_wakeup(fd); - } -} - -static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) { - return &((grpc_fd *)workqueue)->workqueue_scheduler; -} - /******************************************************************************* * Pollable Definitions */ @@ -581,22 +471,7 @@ static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) { .data.ptr = fd}; if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->fd, &ev_fd) != 0) { switch (errno) { - case EEXIST: /* if this fd is already in the epoll set, the workqueue fd - must also be - just return */ - gpr_mu_unlock(&fd->orphaned_mu); - return GRPC_ERROR_NONE; - default: - append_error(&error, GRPC_OS_ERROR(errno, "epoll_ctl"), err_desc); - } - } - struct epoll_event ev_wq = { - .events = (uint32_t)(EPOLLET | EPOLLIN | EPOLLEXCLUSIVE), - .data.ptr = (void *)(1 + (intptr_t)fd)}; - if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->workqueue_wakeup_fd.read_fd, &ev_wq) != - 0) { - switch (errno) { - case EEXIST: /* if the workqueue fd is already in the epoll set we're ok - - no need to do anything special */ + case EEXIST: break; default: append_error(&error, GRPC_OS_ERROR(errno, "epoll_ctl"), err_desc); @@ -859,29 +734,21 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } append_error(&error, grpc_wakeup_fd_consume_wakeup(&p->wakeup), err_desc); } else { - grpc_fd *fd = (grpc_fd *)(((intptr_t)data_ptr) & ~(intptr_t)1); - bool is_workqueue = (((intptr_t)data_ptr) & 1) != 0; + grpc_fd *fd = (grpc_fd *)data_ptr; bool cancel = (events[i].events & (EPOLLERR | EPOLLHUP)) != 0; bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0; bool write_ev = (events[i].events & EPOLLOUT) != 0; if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, - "PS:%p poll %p got fd %p: is_wq=%d cancel=%d read=%d " + "PS:%p poll %p got fd %p: cancel=%d read=%d " "write=%d", - pollset, p, fd, is_workqueue, cancel, read_ev, write_ev); + pollset, p, fd, cancel, read_ev, write_ev); } - if (is_workqueue) { - append_error(&error, - grpc_wakeup_fd_consume_wakeup(&fd->workqueue_wakeup_fd), - err_desc); - fd_invoke_workqueue(exec_ctx, fd); - } else { - if (read_ev || cancel) { - fd_become_readable(exec_ctx, fd, pollset); - } - if (write_ev || cancel) { - fd_become_writable(exec_ctx, fd); - } + if (read_ev || cancel) { + fd_become_readable(exec_ctx, fd, pollset); + } + if (write_ev || cancel) { + fd_become_writable(exec_ctx, fd); } } } @@ -1434,7 +1301,6 @@ static const grpc_event_engine_vtable vtable = { .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, - .fd_get_workqueue = fd_get_workqueue, .pollset_init = pollset_init, .pollset_shutdown = pollset_shutdown, @@ -1452,10 +1318,6 @@ static const grpc_event_engine_vtable vtable = { .pollset_set_add_fd = pollset_set_add_fd, .pollset_set_del_fd = pollset_set_del_fd, - .workqueue_ref = workqueue_ref, - .workqueue_unref = workqueue_unref, - .workqueue_scheduler = workqueue_scheduler, - .shutdown_engine = shutdown_engine, }; diff --git a/src/core/lib/iomgr/ev_epollsig_linux.c b/src/core/lib/iomgr/ev_epollsig_linux.c index a58218bae3..29a86f73ff 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.c +++ b/src/core/lib/iomgr/ev_epollsig_linux.c @@ -44,7 +44,6 @@ #include "src/core/lib/iomgr/lockfree_event.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" -#include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" @@ -162,7 +161,9 @@ static void fd_global_shutdown(void); * Polling island Declarations */ -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG +//#define PI_REFCOUNT_DEBUG + +#ifdef PI_REFCOUNT_DEBUG #define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__) #define PI_UNREF(exec_ctx, p, r) \ @@ -177,8 +178,6 @@ static void fd_global_shutdown(void); /* This is also used as grpc_workqueue (by directly casing it) */ typedef struct polling_island { - grpc_closure_scheduler workqueue_scheduler; - gpr_mu mu; /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement the refcount. @@ -199,15 +198,6 @@ typedef struct polling_island { /* Number of threads currently polling on this island */ gpr_atm poller_count; - /* Mutex guarding the read end of the workqueue (must be held to pop from - * workqueue_items) */ - gpr_mu workqueue_read_mu; - /* Queue of closures to be executed */ - gpr_mpscq workqueue_items; - /* Count of items in workqueue_items */ - gpr_atm workqueue_item_count; - /* Wakeup fd used to wake pollers to check the contents of workqueue_items */ - grpc_wakeup_fd workqueue_wakeup_fd; /* The fd of the underlying epoll set */ int epoll_fd; @@ -282,8 +272,6 @@ static __thread polling_island *g_current_thread_polling_island; /* Forward declaration */ static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi); -static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error); #ifdef GRPC_TSAN /* Currently TSAN may incorrectly flag data races between epoll_ctl and @@ -296,13 +284,10 @@ static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, gpr_atm g_epoll_sync; #endif /* defined(GRPC_TSAN) */ -static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = { - workqueue_enqueue, workqueue_enqueue, "workqueue"}; - static void pi_add_ref(polling_island *pi); static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi); -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG +#ifdef PI_REFCOUNT_DEBUG static void pi_add_ref_dbg(polling_island *pi, const char *reason, const char *file, int line) { long old_cnt = gpr_atm_acq_load(&pi->ref_count); @@ -318,36 +303,6 @@ static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi, gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)", (void *)pi, old_cnt, (old_cnt - 1), reason, file, line); } - -static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue, - const char *file, int line, - const char *reason) { - if (workqueue != NULL) { - pi_add_ref_dbg((polling_island *)workqueue, reason, file, line); - } - return workqueue; -} - -static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - const char *file, int line, const char *reason) { - if (workqueue != NULL) { - pi_unref_dbg(exec_ctx, (polling_island *)workqueue, reason, file, line); - } -} -#else -static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) { - if (workqueue != NULL) { - pi_add_ref((polling_island *)workqueue); - } - return workqueue; -} - -static void workqueue_unref(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue) { - if (workqueue != NULL) { - pi_unref(exec_ctx, (polling_island *)workqueue); - } -} #endif static void pi_add_ref(polling_island *pi) { @@ -511,26 +466,16 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx, *error = GRPC_ERROR_NONE; pi = gpr_malloc(sizeof(*pi)); - pi->workqueue_scheduler.vtable = &workqueue_scheduler_vtable; gpr_mu_init(&pi->mu); pi->fd_cnt = 0; pi->fd_capacity = 0; pi->fds = NULL; pi->epoll_fd = -1; - gpr_mu_init(&pi->workqueue_read_mu); - gpr_mpscq_init(&pi->workqueue_items); - gpr_atm_rel_store(&pi->workqueue_item_count, 0); - gpr_atm_rel_store(&pi->ref_count, 0); gpr_atm_rel_store(&pi->poller_count, 0); gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL); - if (!append_error(error, grpc_wakeup_fd_init(&pi->workqueue_wakeup_fd), - err_desc)) { - goto done; - } - pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC); if (pi->epoll_fd < 0) { @@ -538,8 +483,6 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx, goto done; } - polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error); - if (initial_fd != NULL) { polling_island_add_fds_locked(pi, &initial_fd, 1, true, error); } @@ -558,11 +501,7 @@ static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) { if (pi->epoll_fd >= 0) { close(pi->epoll_fd); } - GPR_ASSERT(gpr_atm_no_barrier_load(&pi->workqueue_item_count) == 0); - gpr_mu_destroy(&pi->workqueue_read_mu); - gpr_mpscq_destroy(&pi->workqueue_items); gpr_mu_destroy(&pi->mu); - grpc_wakeup_fd_destroy(&pi->workqueue_wakeup_fd); gpr_free(pi->fds); gpr_free(pi); } @@ -707,45 +646,6 @@ static void polling_island_unlock_pair(polling_island *p, polling_island *q) { } } -static void workqueue_maybe_wakeup(polling_island *pi) { - /* If this thread is the current poller, then it may be that it's about to - decrement the current poller count, so we need to look past this thread */ - bool is_current_poller = (g_current_thread_polling_island == pi); - gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0; - gpr_atm current_pollers = gpr_atm_no_barrier_load(&pi->poller_count); - /* Only issue a wakeup if it's likely that some poller could come in and take - it right now. Note that since we do an anticipatory mpscq_pop every poll - loop, it's ok if we miss the wakeup here, as we'll get the work item when - the next poller enters anyway. */ - if (current_pollers >= min_current_pollers_for_wakeup) { - GRPC_LOG_IF_ERROR("workqueue_wakeup_fd", - grpc_wakeup_fd_wakeup(&pi->workqueue_wakeup_fd)); - } -} - -static void workqueue_move_items_to_parent(polling_island *q) { - polling_island *p = (polling_island *)gpr_atm_no_barrier_load(&q->merged_to); - if (p == NULL) { - return; - } - gpr_mu_lock(&q->workqueue_read_mu); - int num_added = 0; - while (gpr_atm_no_barrier_load(&q->workqueue_item_count) > 0) { - gpr_mpscq_node *n = gpr_mpscq_pop(&q->workqueue_items); - if (n != NULL) { - gpr_atm_no_barrier_fetch_add(&q->workqueue_item_count, -1); - gpr_atm_no_barrier_fetch_add(&p->workqueue_item_count, 1); - gpr_mpscq_push(&p->workqueue_items, n); - num_added++; - } - } - gpr_mu_unlock(&q->workqueue_read_mu); - if (num_added > 0) { - workqueue_maybe_wakeup(p); - } - workqueue_move_items_to_parent(p); -} - static polling_island *polling_island_merge(polling_island *p, polling_island *q, grpc_error **error) { @@ -770,8 +670,6 @@ static polling_island *polling_island_merge(polling_island *p, /* Add the 'merged_to' link from p --> q */ gpr_atm_rel_store(&p->merged_to, (gpr_atm)q); PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */ - - workqueue_move_items_to_parent(p); } /* else if p == q, nothing needs to be done */ @@ -782,32 +680,6 @@ static polling_island *polling_island_merge(polling_island *p, return q; } -static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error) { - GPR_TIMER_BEGIN("workqueue.enqueue", 0); - grpc_workqueue *workqueue = (grpc_workqueue *)closure->scheduler; - /* take a ref to the workqueue: otherwise it can happen that whatever events - * this kicks off ends up destroying the workqueue before this function - * completes */ - GRPC_WORKQUEUE_REF(workqueue, "enqueue"); - polling_island *pi = (polling_island *)workqueue; - gpr_atm last = gpr_atm_no_barrier_fetch_add(&pi->workqueue_item_count, 1); - closure->error_data.error = error; - gpr_mpscq_push(&pi->workqueue_items, &closure->next_data.atm_next); - if (last == 0) { - workqueue_maybe_wakeup(pi); - } - workqueue_move_items_to_parent(pi); - GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue"); - GPR_TIMER_END("workqueue.enqueue", 0); -} - -static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) { - polling_island *pi = (polling_island *)workqueue; - return workqueue == NULL ? grpc_schedule_on_exec_ctx - : &pi->workqueue_scheduler; -} - static grpc_error *polling_island_global_init() { grpc_error *error = GRPC_ERROR_NONE; @@ -1069,14 +941,6 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure); } -static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { - gpr_mu_lock(&fd->po.mu); - grpc_workqueue *workqueue = - GRPC_WORKQUEUE_REF((grpc_workqueue *)fd->po.pi, "fd_get_workqueue"); - gpr_mu_unlock(&fd->po.mu); - return workqueue; -} - /******************************************************************************* * Pollset Definitions */ @@ -1314,44 +1178,6 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { gpr_mu_destroy(&pollset->po.mu); } -static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx, - polling_island *pi) { - if (gpr_mu_trylock(&pi->workqueue_read_mu)) { - gpr_mpscq_node *n = gpr_mpscq_pop(&pi->workqueue_items); - gpr_mu_unlock(&pi->workqueue_read_mu); - if (n != NULL) { - gpr_atm remaining = - gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) - 1; - GRPC_POLLING_TRACE( - "maybe_do_workqueue_work: pi: %p: got closure %p, remaining = " - "%" PRIdPTR, - pi, n, remaining); - if (remaining > 0) { - workqueue_maybe_wakeup(pi); - } - grpc_closure *c = (grpc_closure *)n; - grpc_error *error = c->error_data.error; -#ifndef NDEBUG - c->scheduled = false; -#endif - c->cb(exec_ctx, c->cb_arg, error); - GRPC_ERROR_UNREF(error); - return true; - } else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) { - /* n == NULL might mean there's work but it's not available to be popped - * yet - try to ensure another workqueue wakes up to check shortly if so - */ - GRPC_POLLING_TRACE( - "maybe_do_workqueue_work: pi: %p: more to do, but not yet", pi); - workqueue_maybe_wakeup(pi); - } - } else { - GRPC_POLLING_TRACE("maybe_do_workqueue_work: pi: %p: read already locked", - pi); - } - return false; -} - #define GRPC_EPOLL_MAX_EVENTS 100 /* Note: sig_mask contains the signal mask to use *during* epoll_wait() */ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, @@ -1407,76 +1233,61 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, PI_ADD_REF(pi, "ps_work"); gpr_mu_unlock(&pollset->po.mu); - /* If we get some workqueue work to do, it might end up completing an item on - the completion queue, so there's no need to poll... so we skip that and - redo the complete loop to verify */ - GRPC_POLLING_TRACE("pollset_work: pollset: %p, worker %p, pi %p", pollset, - worker, pi); - if (!maybe_do_workqueue_work(exec_ctx, pi)) { - GRPC_POLLING_TRACE("pollset_work: begins"); - gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1); - g_current_thread_polling_island = pi; - - GRPC_SCHEDULING_START_BLOCKING_REGION; - ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, - sig_mask); - GRPC_SCHEDULING_END_BLOCKING_REGION; - if (ep_rv < 0) { - if (errno != EINTR) { - gpr_asprintf(&err_msg, - "epoll_wait() epoll fd: %d failed with error: %d (%s)", - epoll_fd, errno, strerror(errno)); - append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc); - } else { - /* We were interrupted. Save an interation by doing a zero timeout - epoll_wait to see if there are any other events of interest */ - GRPC_POLLING_TRACE( - "pollset_work: pollset: %p, worker: %p received kick", - (void *)pollset, (void *)worker); - ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0); - } + gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1); + g_current_thread_polling_island = pi; + + GRPC_SCHEDULING_START_BLOCKING_REGION; + ep_rv = + epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, sig_mask); + GRPC_SCHEDULING_END_BLOCKING_REGION; + if (ep_rv < 0) { + if (errno != EINTR) { + gpr_asprintf(&err_msg, + "epoll_wait() epoll fd: %d failed with error: %d (%s)", + epoll_fd, errno, strerror(errno)); + append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc); + } else { + /* We were interrupted. Save an interation by doing a zero timeout + epoll_wait to see if there are any other events of interest */ + GRPC_POLLING_TRACE("pollset_work: pollset: %p, worker: %p received kick", + (void *)pollset, (void *)worker); + ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0); } + } #ifdef GRPC_TSAN - /* See the definition of g_poll_sync for more details */ - gpr_atm_acq_load(&g_epoll_sync); + /* See the definition of g_poll_sync for more details */ + gpr_atm_acq_load(&g_epoll_sync); #endif /* defined(GRPC_TSAN) */ - for (int i = 0; i < ep_rv; ++i) { - void *data_ptr = ep_ev[i].data.ptr; - if (data_ptr == &pi->workqueue_wakeup_fd) { - append_error(error, - grpc_wakeup_fd_consume_wakeup(&pi->workqueue_wakeup_fd), - err_desc); - maybe_do_workqueue_work(exec_ctx, pi); - } else if (data_ptr == &polling_island_wakeup_fd) { - GRPC_POLLING_TRACE( - "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: " - "%d) got merged", - (void *)pollset, (void *)worker, epoll_fd); - /* This means that our polling island is merged with a different - island. We do not have to do anything here since the subsequent call - to the function pollset_work_and_unlock() will pick up the correct - epoll_fd */ - } else { - grpc_fd *fd = data_ptr; - int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP); - int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI); - int write_ev = ep_ev[i].events & EPOLLOUT; - if (read_ev || cancel) { - fd_become_readable(exec_ctx, fd, pollset); - } - if (write_ev || cancel) { - fd_become_writable(exec_ctx, fd); - } + for (int i = 0; i < ep_rv; ++i) { + void *data_ptr = ep_ev[i].data.ptr; + if (data_ptr == &polling_island_wakeup_fd) { + GRPC_POLLING_TRACE( + "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: " + "%d) got merged", + (void *)pollset, (void *)worker, epoll_fd); + /* This means that our polling island is merged with a different + island. We do not have to do anything here since the subsequent call + to the function pollset_work_and_unlock() will pick up the correct + epoll_fd */ + } else { + grpc_fd *fd = data_ptr; + int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP); + int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI); + int write_ev = ep_ev[i].events & EPOLLOUT; + if (read_ev || cancel) { + fd_become_readable(exec_ctx, fd, pollset); + } + if (write_ev || cancel) { + fd_become_writable(exec_ctx, fd); } } - - g_current_thread_polling_island = NULL; - gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1); - GRPC_POLLING_TRACE("pollset_work: ends"); } + g_current_thread_polling_island = NULL; + gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1); + GPR_ASSERT(pi != NULL); /* Before leaving, release the extra ref we added to the polling island. It @@ -1867,7 +1678,6 @@ static const grpc_event_engine_vtable vtable = { .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, - .fd_get_workqueue = fd_get_workqueue, .pollset_init = pollset_init, .pollset_shutdown = pollset_shutdown, @@ -1885,10 +1695,6 @@ static const grpc_event_engine_vtable vtable = { .pollset_set_add_fd = pollset_set_add_fd, .pollset_set_del_fd = pollset_set_del_fd, - .workqueue_ref = workqueue_ref, - .workqueue_unref = workqueue_unref, - .workqueue_scheduler = workqueue_scheduler, - .shutdown_engine = shutdown_engine, }; diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 806f985229..6145c9ec3f 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -633,8 +633,6 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, GRPC_FD_UNREF(fd, "poll"); } -static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; } - /******************************************************************************* * pollset_posix.c */ @@ -1274,30 +1272,6 @@ static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, } /******************************************************************************* - * workqueue stubs - */ - -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG -static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue, - const char *file, int line, - const char *reason) { - return workqueue; -} -static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - const char *file, int line, const char *reason) {} -#else -static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) { - return workqueue; -} -static void workqueue_unref(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue) {} -#endif - -static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) { - return grpc_schedule_on_exec_ctx; -} - -/******************************************************************************* * Condition Variable polling extensions */ @@ -1514,7 +1488,6 @@ static const grpc_event_engine_vtable vtable = { .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, - .fd_get_workqueue = fd_get_workqueue, .pollset_init = pollset_init, .pollset_shutdown = pollset_shutdown, @@ -1532,10 +1505,6 @@ static const grpc_event_engine_vtable vtable = { .pollset_set_add_fd = pollset_set_add_fd, .pollset_set_del_fd = pollset_set_del_fd, - .workqueue_ref = workqueue_ref, - .workqueue_unref = workqueue_unref, - .workqueue_scheduler = workqueue_scheduler, - .shutdown_engine = shutdown_engine, }; diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index 4377b2a0f6..54960d1ecc 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -156,10 +156,6 @@ grpc_fd *grpc_fd_create(int fd, const char *name) { return g_event_engine->fd_create(fd, name); } -grpc_workqueue *grpc_fd_get_workqueue(grpc_fd *fd) { - return g_event_engine->fd_get_workqueue(fd); -} - int grpc_fd_wrapped_fd(grpc_fd *fd) { return g_event_engine->fd_wrapped_fd(fd); } @@ -261,26 +257,4 @@ void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx, g_event_engine->pollset_set_del_fd(exec_ctx, pollset_set, fd); } -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG -grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, - int line, const char *reason) { - return g_event_engine->workqueue_ref(workqueue, file, line, reason); -} -void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - const char *file, int line, const char *reason) { - g_event_engine->workqueue_unref(exec_ctx, workqueue, file, line, reason); -} -#else -grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue) { - return g_event_engine->workqueue_ref(workqueue); -} -void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { - g_event_engine->workqueue_unref(exec_ctx, workqueue); -} -#endif - -grpc_closure_scheduler *grpc_workqueue_scheduler(grpc_workqueue *workqueue) { - return g_event_engine->workqueue_scheduler(workqueue); -} - #endif // GRPC_POSIX_SOCKET diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index f87fe16901..54c4f2ee11 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -26,7 +26,6 @@ #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" -#include "src/core/lib/iomgr/workqueue.h" extern grpc_tracer_flag grpc_polling_trace; /* Disabled by default */ @@ -45,7 +44,6 @@ typedef struct grpc_event_engine_vtable { void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure); bool (*fd_is_shutdown)(grpc_fd *fd); - grpc_workqueue *(*fd_get_workqueue)(grpc_fd *fd); grpc_pollset *(*fd_get_read_notifier_pollset)(grpc_exec_ctx *exec_ctx, grpc_fd *fd); @@ -82,17 +80,6 @@ typedef struct grpc_event_engine_vtable { grpc_pollset_set *pollset_set, grpc_fd *fd); void (*shutdown_engine)(void); - -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG - grpc_workqueue *(*workqueue_ref)(grpc_workqueue *workqueue, const char *file, - int line, const char *reason); - void (*workqueue_unref)(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - const char *file, int line, const char *reason); -#else - grpc_workqueue *(*workqueue_ref)(grpc_workqueue *workqueue); - void (*workqueue_unref)(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue); -#endif - grpc_closure_scheduler *(*workqueue_scheduler)(grpc_workqueue *workqueue); } grpc_event_engine_vtable; void grpc_event_engine_init(void); @@ -106,9 +93,6 @@ const char *grpc_get_poll_strategy_name(); This takes ownership of closing fd. */ grpc_fd *grpc_fd_create(int fd, const char *name); -/* Get a workqueue that's associated with this fd */ -grpc_workqueue *grpc_fd_get_workqueue(grpc_fd *fd); - /* Return the wrapped fd, or -1 if it has been released or closed. */ int grpc_fd_wrapped_fd(grpc_fd *fd); diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c index 7cef32adce..6699be3646 100644 --- a/src/core/lib/iomgr/exec_ctx.c +++ b/src/core/lib/iomgr/exec_ctx.c @@ -23,7 +23,6 @@ #include <grpc/support/thd.h> #include "src/core/lib/iomgr/combiner.h" -#include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx *exec_ctx) { diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c index 4ca23a2407..7621a7fe75 100644 --- a/src/core/lib/iomgr/executor.c +++ b/src/core/lib/iomgr/executor.c @@ -21,132 +21,172 @@ #include <string.h> #include <grpc/support/alloc.h> +#include <grpc/support/cpu.h> #include <grpc/support/log.h> #include <grpc/support/sync.h> #include <grpc/support/thd.h> +#include <grpc/support/tls.h> +#include <grpc/support/useful.h> + #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/support/spinlock.h" + +#define MAX_DEPTH 2 -typedef struct grpc_executor_data { - int busy; /**< is the thread currently running? */ - int shutting_down; /**< has \a grpc_shutdown() been invoked? */ - int pending_join; /**< has the thread finished but not been joined? */ - grpc_closure_list closures; /**< collection of pending work */ - gpr_thd_id tid; /**< thread id of the thread, only valid if \a busy or \a - pending_join are true */ - gpr_thd_options options; +typedef struct { gpr_mu mu; -} grpc_executor; + gpr_cv cv; + grpc_closure_list elems; + size_t depth; + bool shutdown; + gpr_thd_id id; +} thread_state; -static grpc_executor g_executor; +static thread_state *g_thread_state; +static size_t g_max_threads; +static gpr_atm g_cur_threads; +static gpr_spinlock g_adding_thread_lock = GPR_SPINLOCK_STATIC_INITIALIZER; -void grpc_executor_init() { - memset(&g_executor, 0, sizeof(grpc_executor)); - gpr_mu_init(&g_executor.mu); - g_executor.options = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&g_executor.options); -} +GPR_TLS_DECL(g_this_thread_state); -/* thread body */ -static void closure_exec_thread_func(void *ignored) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - while (1) { - gpr_mu_lock(&g_executor.mu); - if (g_executor.shutting_down != 0) { - gpr_mu_unlock(&g_executor.mu); - break; - } - if (grpc_closure_list_empty(g_executor.closures)) { - /* no more work, time to die */ - GPR_ASSERT(g_executor.busy == 1); - g_executor.busy = 0; - gpr_mu_unlock(&g_executor.mu); - break; - } else { - grpc_closure *c = g_executor.closures.head; - grpc_closure_list_init(&g_executor.closures); - gpr_mu_unlock(&g_executor.mu); - while (c != NULL) { - grpc_closure *next = c->next_data.next; - grpc_error *error = c->error_data.error; +static void executor_thread(void *arg); + +static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) { + size_t n = 0; + + grpc_closure *c = list.head; + while (c != NULL) { + grpc_closure *next = c->next_data.next; + grpc_error *error = c->error_data.error; #ifndef NDEBUG - c->scheduled = false; + c->scheduled = false; #endif - c->cb(&exec_ctx, c->cb_arg, error); - GRPC_ERROR_UNREF(error); - c = next; - } - grpc_exec_ctx_flush(&exec_ctx); - } + c->cb(exec_ctx, c->cb_arg, error); + GRPC_ERROR_UNREF(error); + c = next; + n++; } - grpc_exec_ctx_finish(&exec_ctx); + + return n; } -/* Spawn the thread if new work has arrived a no thread is up */ -static void maybe_spawn_locked() { - if (grpc_closure_list_empty(g_executor.closures) == 1) { - return; - } - if (g_executor.shutting_down == 1) { - return; - } +bool grpc_executor_is_threaded() { + return gpr_atm_no_barrier_load(&g_cur_threads) > 0; +} - if (g_executor.busy != 0) { - /* Thread still working. New work will be picked up by already running - * thread. Not spawning anything. */ - return; - } else if (g_executor.pending_join != 0) { - /* Pickup the remains of the previous incarnations of the thread. */ - gpr_thd_join(g_executor.tid); - g_executor.pending_join = 0; +void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool threading) { + gpr_atm cur_threads = gpr_atm_no_barrier_load(&g_cur_threads); + if (threading) { + if (cur_threads > 0) return; + g_max_threads = GPR_MAX(1, 2 * gpr_cpu_num_cores()); + gpr_atm_no_barrier_store(&g_cur_threads, 1); + gpr_tls_init(&g_this_thread_state); + g_thread_state = gpr_zalloc(sizeof(thread_state) * g_max_threads); + for (size_t i = 0; i < g_max_threads; i++) { + gpr_mu_init(&g_thread_state[i].mu); + gpr_cv_init(&g_thread_state[i].cv); + g_thread_state[i].elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT; + } + + gpr_thd_options opt = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&opt); + gpr_thd_new(&g_thread_state[0].id, executor_thread, &g_thread_state[0], + &opt); + } else { + if (cur_threads == 0) return; + for (size_t i = 0; i < g_max_threads; i++) { + gpr_mu_lock(&g_thread_state[i].mu); + g_thread_state[i].shutdown = true; + gpr_cv_signal(&g_thread_state[i].cv); + gpr_mu_unlock(&g_thread_state[i].mu); + } + /* ensure no thread is adding a new thread... once this is past, then + no thread will try to add a new one either (since shutdown is true) */ + gpr_spinlock_lock(&g_adding_thread_lock); + gpr_spinlock_unlock(&g_adding_thread_lock); + for (gpr_atm i = 0; i < g_cur_threads; i++) { + gpr_thd_join(g_thread_state[i].id); + } + gpr_atm_no_barrier_store(&g_cur_threads, 0); + for (size_t i = 0; i < g_max_threads; i++) { + gpr_mu_destroy(&g_thread_state[i].mu); + gpr_cv_destroy(&g_thread_state[i].cv); + run_closures(exec_ctx, g_thread_state[i].elems); + } + gpr_free(g_thread_state); + gpr_tls_destroy(&g_this_thread_state); } +} - /* All previous instances of the thread should have been joined at this point. - * Spawn time! */ - g_executor.busy = 1; - GPR_ASSERT(gpr_thd_new(&g_executor.tid, closure_exec_thread_func, NULL, - &g_executor.options)); - g_executor.pending_join = 1; +void grpc_executor_init(grpc_exec_ctx *exec_ctx) { + gpr_atm_no_barrier_store(&g_cur_threads, 0); + grpc_executor_set_threading(exec_ctx, true); } -static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error) { - gpr_mu_lock(&g_executor.mu); - if (g_executor.shutting_down == 0) { - grpc_closure_list_append(&g_executor.closures, closure, error); - maybe_spawn_locked(); +void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx) { + grpc_executor_set_threading(exec_ctx, false); +} + +static void executor_thread(void *arg) { + thread_state *ts = arg; + gpr_tls_set(&g_this_thread_state, (intptr_t)ts); + + grpc_exec_ctx exec_ctx = + GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL); + + size_t subtract_depth = 0; + for (;;) { + gpr_mu_lock(&ts->mu); + ts->depth -= subtract_depth; + while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) { + gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME)); + } + if (ts->shutdown) { + gpr_mu_unlock(&ts->mu); + break; + } + grpc_closure_list exec = ts->elems; + ts->elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT; + gpr_mu_unlock(&ts->mu); + + subtract_depth = run_closures(&exec_ctx, exec); + grpc_exec_ctx_flush(&exec_ctx); } - gpr_mu_unlock(&g_executor.mu); + grpc_exec_ctx_finish(&exec_ctx); } -void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx) { - int pending_join; - - gpr_mu_lock(&g_executor.mu); - pending_join = g_executor.pending_join; - g_executor.shutting_down = 1; - gpr_mu_unlock(&g_executor.mu); - /* we can release the lock at this point despite the access to the closure - * list below because we aren't accepting new work */ - - /* Execute pending callbacks, some may be performing cleanups */ - grpc_closure *c = g_executor.closures.head; - grpc_closure_list_init(&g_executor.closures); - while (c != NULL) { - grpc_closure *next = c->next_data.next; - grpc_error *error = c->error_data.error; -#ifndef NDEBUG - c->scheduled = false; -#endif - c->cb(exec_ctx, c->cb_arg, error); - GRPC_ERROR_UNREF(error); - c = next; +static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error) { + size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads); + if (cur_thread_count == 0) { + grpc_closure_list_append(&exec_ctx->closure_list, closure, error); + return; + } + thread_state *ts = (thread_state *)gpr_tls_get(&g_this_thread_state); + if (ts == NULL) { + ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)]; } - grpc_exec_ctx_flush(exec_ctx); - GPR_ASSERT(grpc_closure_list_empty(g_executor.closures)); - if (pending_join) { - gpr_thd_join(g_executor.tid); + gpr_mu_lock(&ts->mu); + if (grpc_closure_list_empty(ts->elems)) { + gpr_cv_signal(&ts->cv); + } + grpc_closure_list_append(&ts->elems, closure, error); + ts->depth++; + bool try_new_thread = ts->depth > MAX_DEPTH && + cur_thread_count < g_max_threads && !ts->shutdown; + gpr_mu_unlock(&ts->mu); + if (try_new_thread && gpr_spinlock_trylock(&g_adding_thread_lock)) { + cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads); + if (cur_thread_count < g_max_threads) { + gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1); + + gpr_thd_options opt = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&opt); + gpr_thd_new(&g_thread_state[cur_thread_count].id, executor_thread, + &g_thread_state[cur_thread_count], &opt); + } + gpr_spinlock_unlock(&g_adding_thread_lock); } - gpr_mu_destroy(&g_executor.mu); } static const grpc_closure_scheduler_vtable executor_vtable = { diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h index ace9d80b06..c3382a0a12 100644 --- a/src/core/lib/iomgr/executor.h +++ b/src/core/lib/iomgr/executor.h @@ -26,11 +26,18 @@ * This mechanism is meant to outsource work (grpc_closure instances) to a * thread, for those cases where blocking isn't an option but there isn't a * non-blocking solution available. */ -void grpc_executor_init(); +void grpc_executor_init(grpc_exec_ctx *exec_ctx); extern grpc_closure_scheduler *grpc_executor_scheduler; /** Shutdown the executor, running all pending work as part of the call */ void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx); +/** Is the executor multi-threaded? */ +bool grpc_executor_is_threaded(); + +/* enable/disable threading - must be called after grpc_executor_init and before + grpc_executor_shutdown */ +void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool enable); + #endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_H */ diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c index c8b784e4c0..3d19953eeb 100644 --- a/src/core/lib/iomgr/iomgr.c +++ b/src/core/lib/iomgr/iomgr.c @@ -29,6 +29,7 @@ #include <grpc/support/useful.h> #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/network_status_tracker.h" #include "src/core/lib/iomgr/timer.h" @@ -41,11 +42,12 @@ static gpr_cv g_rcv; static int g_shutdown; static grpc_iomgr_object g_root_object; -void grpc_iomgr_init(void) { +void grpc_iomgr_init(grpc_exec_ctx *exec_ctx) { g_shutdown = 0; gpr_mu_init(&g_mu); gpr_cv_init(&g_rcv); grpc_exec_ctx_global_init(); + grpc_executor_init(exec_ctx); grpc_timer_list_init(gpr_now(GPR_CLOCK_MONOTONIC)); g_root_object.next = g_root_object.prev = &g_root_object; g_root_object.name = "root"; @@ -53,7 +55,7 @@ void grpc_iomgr_init(void) { grpc_iomgr_platform_init(); } -void grpc_iomgr_start(void) { grpc_timer_manager_init(); } +void grpc_iomgr_start(grpc_exec_ctx *exec_ctx) { grpc_timer_manager_init(); } static size_t count_objects(void) { grpc_iomgr_object *obj; @@ -78,6 +80,7 @@ void grpc_iomgr_shutdown(grpc_exec_ctx *exec_ctx) { grpc_timer_manager_shutdown(); grpc_iomgr_platform_flush(); + grpc_executor_shutdown(exec_ctx); gpr_mu_lock(&g_mu); g_shutdown = 1; diff --git a/src/core/lib/iomgr/iomgr.h b/src/core/lib/iomgr/iomgr.h index 45dedc2b0a..e3cd6ebe79 100644 --- a/src/core/lib/iomgr/iomgr.h +++ b/src/core/lib/iomgr/iomgr.h @@ -23,10 +23,10 @@ #include "src/core/lib/iomgr/port.h" /** Initializes the iomgr. */ -void grpc_iomgr_init(void); +void grpc_iomgr_init(grpc_exec_ctx *exec_ctx); /** Starts any background threads for iomgr. */ -void grpc_iomgr_start(void); +void grpc_iomgr_start(grpc_exec_ctx *exec_ctx); /** Signals the intention to shutdown the iomgr. Expects to be able to flush * exec_ctx. */ diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c index d3632c4cba..7c3fb93279 100644 --- a/src/core/lib/iomgr/resource_quota.c +++ b/src/core/lib/iomgr/resource_quota.c @@ -566,7 +566,7 @@ static void rq_reclamation_done(grpc_exec_ctx *exec_ctx, void *rq, grpc_resource_quota *grpc_resource_quota_create(const char *name) { grpc_resource_quota *resource_quota = gpr_malloc(sizeof(*resource_quota)); gpr_ref_init(&resource_quota->refs, 1); - resource_quota->combiner = grpc_combiner_create(NULL); + resource_quota->combiner = grpc_combiner_create(); resource_quota->free_pool = INT64_MAX; resource_quota->size = INT64_MAX; gpr_atm_no_barrier_store(&resource_quota->last_size, GPR_ATM_MAX); @@ -579,12 +579,11 @@ grpc_resource_quota *grpc_resource_quota_create(const char *name) { gpr_asprintf(&resource_quota->name, "anonymous_pool_%" PRIxPTR, (intptr_t)resource_quota); } - grpc_closure_init( - &resource_quota->rq_step_closure, rq_step, resource_quota, - grpc_combiner_finally_scheduler(resource_quota->combiner, true)); + grpc_closure_init(&resource_quota->rq_step_closure, rq_step, resource_quota, + grpc_combiner_finally_scheduler(resource_quota->combiner)); grpc_closure_init(&resource_quota->rq_reclamation_done_closure, rq_reclamation_done, resource_quota, - grpc_combiner_scheduler(resource_quota->combiner, false)); + grpc_combiner_scheduler(resource_quota->combiner)); for (int i = 0; i < GRPC_RULIST_COUNT; i++) { resource_quota->roots[i] = NULL; } @@ -689,18 +688,18 @@ grpc_resource_user *grpc_resource_user_create( grpc_resource_quota_ref_internal(resource_quota); grpc_closure_init(&resource_user->allocate_closure, &ru_allocate, resource_user, - grpc_combiner_scheduler(resource_quota->combiner, false)); + grpc_combiner_scheduler(resource_quota->combiner)); grpc_closure_init(&resource_user->add_to_free_pool_closure, &ru_add_to_free_pool, resource_user, - grpc_combiner_scheduler(resource_quota->combiner, false)); + grpc_combiner_scheduler(resource_quota->combiner)); grpc_closure_init(&resource_user->post_reclaimer_closure[0], &ru_post_benign_reclaimer, resource_user, - grpc_combiner_scheduler(resource_quota->combiner, false)); + grpc_combiner_scheduler(resource_quota->combiner)); grpc_closure_init(&resource_user->post_reclaimer_closure[1], &ru_post_destructive_reclaimer, resource_user, - grpc_combiner_scheduler(resource_quota->combiner, false)); + grpc_combiner_scheduler(resource_quota->combiner)); grpc_closure_init(&resource_user->destroy_closure, &ru_destroy, resource_user, - grpc_combiner_scheduler(resource_quota->combiner, false)); + grpc_combiner_scheduler(resource_quota->combiner)); gpr_mu_init(&resource_user->mu); gpr_atm_rel_store(&resource_user->refs, 1); gpr_atm_rel_store(&resource_user->shutdown, 0); @@ -757,12 +756,12 @@ void grpc_resource_user_unref(grpc_exec_ctx *exec_ctx, void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx, grpc_resource_user *resource_user) { if (gpr_atm_full_fetch_add(&resource_user->shutdown, 1) == 0) { - grpc_closure_sched(exec_ctx, - grpc_closure_create( - ru_shutdown, resource_user, - grpc_combiner_scheduler( - resource_user->resource_quota->combiner, false)), - GRPC_ERROR_NONE); + grpc_closure_sched( + exec_ctx, + grpc_closure_create( + ru_shutdown, resource_user, + grpc_combiner_scheduler(resource_user->resource_quota->combiner)), + GRPC_ERROR_NONE); } } diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 36091803bb..a2404f97c1 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -543,26 +543,15 @@ static int tcp_get_fd(grpc_endpoint *ep) { return tcp->fd; } -static grpc_workqueue *tcp_get_workqueue(grpc_endpoint *ep) { - grpc_tcp *tcp = (grpc_tcp *)ep; - return grpc_fd_get_workqueue(tcp->em_fd); -} - static grpc_resource_user *tcp_get_resource_user(grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; return tcp->resource_user; } -static const grpc_endpoint_vtable vtable = {tcp_read, - tcp_write, - tcp_get_workqueue, - tcp_add_to_pollset, - tcp_add_to_pollset_set, - tcp_shutdown, - tcp_destroy, - tcp_get_resource_user, - tcp_get_peer, - tcp_get_fd}; +static const grpc_endpoint_vtable vtable = { + tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set, + tcp_shutdown, tcp_destroy, tcp_get_resource_user, tcp_get_peer, + tcp_get_fd}; #define MAX_CHUNK_SIZE 32 * 1024 * 1024 diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c index 5d3d315234..a37a75c8fa 100644 --- a/src/core/lib/iomgr/tcp_uv.c +++ b/src/core/lib/iomgr/tcp_uv.c @@ -318,15 +318,12 @@ static grpc_resource_user *uv_get_resource_user(grpc_endpoint *ep) { return tcp->resource_user; } -static grpc_workqueue *uv_get_workqueue(grpc_endpoint *ep) { return NULL; } - static int uv_get_fd(grpc_endpoint *ep) { return -1; } static grpc_endpoint_vtable vtable = { - uv_endpoint_read, uv_endpoint_write, uv_get_workqueue, - uv_add_to_pollset, uv_add_to_pollset_set, uv_endpoint_shutdown, - uv_destroy, uv_get_resource_user, uv_get_peer, - uv_get_fd}; + uv_endpoint_read, uv_endpoint_write, uv_add_to_pollset, + uv_add_to_pollset_set, uv_endpoint_shutdown, uv_destroy, + uv_get_resource_user, uv_get_peer, uv_get_fd}; grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, grpc_resource_quota *resource_quota, diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index a0a2563956..dbae42e936 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -395,8 +395,6 @@ static char *win_get_peer(grpc_endpoint *ep) { return gpr_strdup(tcp->peer_string); } -static grpc_workqueue *win_get_workqueue(grpc_endpoint *ep) { return NULL; } - static grpc_resource_user *win_get_resource_user(grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; return tcp->resource_user; @@ -404,16 +402,10 @@ static grpc_resource_user *win_get_resource_user(grpc_endpoint *ep) { static int win_get_fd(grpc_endpoint *ep) { return -1; } -static grpc_endpoint_vtable vtable = {win_read, - win_write, - win_get_workqueue, - win_add_to_pollset, - win_add_to_pollset_set, - win_shutdown, - win_destroy, - win_get_resource_user, - win_get_peer, - win_get_fd}; +static grpc_endpoint_vtable vtable = { + win_read, win_write, win_add_to_pollset, win_add_to_pollset_set, + win_shutdown, win_destroy, win_get_resource_user, win_get_peer, + win_get_fd}; grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket, grpc_channel_args *channel_args, diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h deleted file mode 100644 index 558d4955d3..0000000000 --- a/src/core/lib/iomgr/workqueue.h +++ /dev/null @@ -1,72 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#ifndef GRPC_CORE_LIB_IOMGR_WORKQUEUE_H -#define GRPC_CORE_LIB_IOMGR_WORKQUEUE_H - -#include "src/core/lib/iomgr/closure.h" -#include "src/core/lib/iomgr/exec_ctx.h" -#include "src/core/lib/iomgr/iomgr.h" -#include "src/core/lib/iomgr/pollset.h" -#include "src/core/lib/iomgr/pollset_set.h" -#include "src/core/lib/iomgr/port.h" - -#ifdef GPR_WINDOWS -#include "src/core/lib/iomgr/workqueue_windows.h" -#endif - -/* grpc_workqueue is forward declared in exec_ctx.h */ - -/* Reference counting functions. Use the macro's always - (GRPC_WORKQUEUE_{REF,UNREF}). - - Pass in a descriptive reason string for reffing/unreffing as the last - argument to each macro. When GRPC_WORKQUEUE_REFCOUNT_DEBUG is defined, that - string will be printed alongside the refcount. When it is not defined, the - string will be discarded at compilation time. */ - -/*#define GRPC_WORKQUEUE_REFCOUNT_DEBUG*/ -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG -#define GRPC_WORKQUEUE_REF(p, r) \ - grpc_workqueue_ref((p), __FILE__, __LINE__, (r)) -#define GRPC_WORKQUEUE_UNREF(exec_ctx, p, r) \ - grpc_workqueue_unref((exec_ctx), (p), __FILE__, __LINE__, (r)) -grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, - int line, const char *reason); -void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - const char *file, int line, const char *reason); -#else -#define GRPC_WORKQUEUE_REF(p, r) grpc_workqueue_ref((p)) -#define GRPC_WORKQUEUE_UNREF(cl, p, r) grpc_workqueue_unref((cl), (p)) -grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue); -void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue); -#endif - -/** Fetch the workqueue closure scheduler. Items added to a work queue will be - started in approximately the order they were enqueued, on some thread that - may or may not be the current thread. Successive closures enqueued onto a - workqueue MAY be executed concurrently. - - It is generally more expensive to add a closure to a workqueue than to the - execution context, both in terms of CPU work and in execution latency. - - Use work queues when it's important that other threads be given a chance to - tackle some workload. */ -grpc_closure_scheduler *grpc_workqueue_scheduler(grpc_workqueue *workqueue); - -#endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_H */ diff --git a/src/core/lib/iomgr/workqueue_uv.c b/src/core/lib/iomgr/workqueue_uv.c deleted file mode 100644 index 1900aa1ca6..0000000000 --- a/src/core/lib/iomgr/workqueue_uv.c +++ /dev/null @@ -1,50 +0,0 @@ -/* - * - * Copyright 2016 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "src/core/lib/iomgr/port.h" - -#ifdef GRPC_UV - -#include "src/core/lib/iomgr/workqueue.h" - -// Minimal implementation of grpc_workqueue for libuv -// Works by directly enqueuing workqueue items onto the current execution -// context, which is at least correct, if not performant or in the spirit of -// workqueues. - -void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {} - -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG -grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, - int line, const char *reason) { - return workqueue; -} -void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - const char *file, int line, const char *reason) {} -#else -grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue) { - return workqueue; -} -void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {} -#endif - -grpc_closure_scheduler *grpc_workqueue_scheduler(grpc_workqueue *workqueue) { - return grpc_schedule_on_exec_ctx; -} - -#endif /* GPR_UV */ diff --git a/src/core/lib/iomgr/workqueue_uv.h b/src/core/lib/iomgr/workqueue_uv.h deleted file mode 100644 index 5cac45ea2a..0000000000 --- a/src/core/lib/iomgr/workqueue_uv.h +++ /dev/null @@ -1,22 +0,0 @@ -/* - * - * Copyright 2016 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#ifndef GRPC_CORE_LIB_IOMGR_WORKQUEUE_UV_H -#define GRPC_CORE_LIB_IOMGR_WORKQUEUE_UV_H - -#endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_UV_H */ diff --git a/src/core/lib/iomgr/workqueue_windows.c b/src/core/lib/iomgr/workqueue_windows.c deleted file mode 100644 index c7d650c767..0000000000 --- a/src/core/lib/iomgr/workqueue_windows.c +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <grpc/support/port_platform.h> - -#ifdef GPR_WINDOWS - -#include "src/core/lib/iomgr/workqueue.h" - -// Minimal implementation of grpc_workqueue for Windows -// Works by directly enqueuing workqueue items onto the current execution -// context, which is at least correct, if not performant or in the spirit of -// workqueues. - -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG -grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, - int line, const char *reason) { - return workqueue; -} -void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - const char *file, int line, const char *reason) {} -#else -grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue) { - return workqueue; -} -void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {} -#endif - -grpc_closure_scheduler *grpc_workqueue_scheduler(grpc_workqueue *workqueue) { - return grpc_schedule_on_exec_ctx; -} - -#endif /* GPR_WINDOWS */ diff --git a/src/core/lib/iomgr/workqueue_windows.h b/src/core/lib/iomgr/workqueue_windows.h deleted file mode 100644 index 47ceed1110..0000000000 --- a/src/core/lib/iomgr/workqueue_windows.h +++ /dev/null @@ -1,22 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#ifndef GRPC_CORE_LIB_IOMGR_WORKQUEUE_WINDOWS_H -#define GRPC_CORE_LIB_IOMGR_WORKQUEUE_WINDOWS_H - -#endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_WINDOWS_H */ diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c index 537f6f922a..58112b04b4 100644 --- a/src/core/lib/security/transport/client_auth_filter.c +++ b/src/core/lib/security/transport/client_auth_filter.c @@ -50,7 +50,8 @@ typedef struct { */ grpc_polling_entity *pollent; grpc_transport_stream_op_batch op; - uint8_t security_context_set; + gpr_atm security_context_set; + gpr_mu security_context_mu; grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT]; grpc_auth_metadata_context auth_md_context; } call_data; @@ -238,19 +239,26 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_linked_mdelem *l; grpc_client_security_context *sec_ctx = NULL; - if (!op->cancel_stream && calld->security_context_set == 0) { - calld->security_context_set = 1; - GPR_ASSERT(op->payload->context != NULL); - if (op->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) { - op->payload->context[GRPC_CONTEXT_SECURITY].value = - grpc_client_security_context_create(); - op->payload->context[GRPC_CONTEXT_SECURITY].destroy = - grpc_client_security_context_destroy; + if (!op->cancel_stream) { + /* double checked lock over security context to ensure it's set once */ + if (gpr_atm_acq_load(&calld->security_context_set) == 0) { + gpr_mu_lock(&calld->security_context_mu); + if (gpr_atm_acq_load(&calld->security_context_set) == 0) { + GPR_ASSERT(op->payload->context != NULL); + if (op->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) { + op->payload->context[GRPC_CONTEXT_SECURITY].value = + grpc_client_security_context_create(); + op->payload->context[GRPC_CONTEXT_SECURITY].destroy = + grpc_client_security_context_destroy; + } + sec_ctx = op->payload->context[GRPC_CONTEXT_SECURITY].value; + GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter"); + sec_ctx->auth_context = + GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter"); + gpr_atm_rel_store(&calld->security_context_set, 1); + } + gpr_mu_unlock(&calld->security_context_mu); } - sec_ctx = op->payload->context[GRPC_CONTEXT_SECURITY].value; - GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter"); - sec_ctx->auth_context = - GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter"); } if (op->send_initial_metadata) { @@ -297,6 +305,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, const grpc_call_element_args *args) { call_data *calld = elem->call_data; memset(calld, 0, sizeof(*calld)); + gpr_mu_init(&calld->security_context_mu); return GRPC_ERROR_NONE; } @@ -320,6 +329,7 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_slice_unref_internal(exec_ctx, calld->method); } reset_auth_metadata_context(&calld->auth_md_context); + gpr_mu_destroy(&calld->security_context_mu); } /* Constructor for channel_data */ diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c index 8823458da5..643e057229 100644 --- a/src/core/lib/security/transport/secure_endpoint.c +++ b/src/core/lib/security/transport/secure_endpoint.c @@ -365,11 +365,6 @@ static int endpoint_get_fd(grpc_endpoint *secure_ep) { return grpc_endpoint_get_fd(ep->wrapped_ep); } -static grpc_workqueue *endpoint_get_workqueue(grpc_endpoint *secure_ep) { - secure_endpoint *ep = (secure_endpoint *)secure_ep; - return grpc_endpoint_get_workqueue(ep->wrapped_ep); -} - static grpc_resource_user *endpoint_get_resource_user( grpc_endpoint *secure_ep) { secure_endpoint *ep = (secure_endpoint *)secure_ep; @@ -378,7 +373,6 @@ static grpc_resource_user *endpoint_get_resource_user( static const grpc_endpoint_vtable vtable = {endpoint_read, endpoint_write, - endpoint_get_workqueue, endpoint_add_to_pollset, endpoint_add_to_pollset_set, endpoint_shutdown, diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index e4d27ffbf6..fd4ed726b8 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -1453,7 +1453,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *stream_op = &bctl->op; grpc_transport_stream_op_batch_payload *stream_op_payload = &call->stream_op_payload; - stream_op->covered_by_poller = true; /* rewrite batch ops into a transport op */ for (i = 0; i < nops; i++) { @@ -1642,10 +1641,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } - /* IF this is a server, then GRPC_OP_RECV_INITIAL_METADATA *must* come - from server.c. In that case, it's coming from accept_stream, and in - that case we're not necessarily covered by a poller. */ - stream_op->covered_by_poller = call->is_client; call->received_initial_metadata = true; call->buffered_metadata[0] = op->data.recv_initial_metadata.recv_initial_metadata; diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index 3575c5495c..01a1f33db2 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -113,6 +113,7 @@ void grpc_init(void) { int i; gpr_once_init(&g_basic_init, do_basic_init); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_mu_lock(&g_init_mu); if (++g_initializations == 1) { gpr_time_init(); @@ -139,8 +140,7 @@ void grpc_init(void) { grpc_register_tracer("pending_tags", &grpc_trace_pending_tags); #endif grpc_security_pre_init(); - grpc_iomgr_init(); - grpc_executor_init(); + grpc_iomgr_init(&exec_ctx); gpr_timers_global_init(); grpc_handshaker_factory_registry_init(); grpc_security_init(); @@ -156,19 +156,20 @@ void grpc_init(void) { grpc_tracer_init("GRPC_TRACE"); /* no more changes to channel init pipelines */ grpc_channel_init_finalize(); - grpc_iomgr_start(); + grpc_iomgr_start(&exec_ctx); } gpr_mu_unlock(&g_init_mu); + grpc_exec_ctx_finish(&exec_ctx); GRPC_API_TRACE("grpc_init(void)", 0, ()); } void grpc_shutdown(void) { int i; GRPC_API_TRACE("grpc_shutdown(void)", 0, ()); - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_exec_ctx exec_ctx = + GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL); gpr_mu_lock(&g_init_mu); if (--g_initializations == 0) { - grpc_executor_shutdown(&exec_ctx); grpc_iomgr_shutdown(&exec_ctx); gpr_timers_global_destroy(); grpc_tracer_shutdown(); diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index 16f12c345b..d231157c87 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -112,10 +112,6 @@ typedef struct grpc_transport_stream_op_batch { /** Values for the stream op (fields set are determined by flags above) */ grpc_transport_stream_op_batch_payload *payload; - /** Is the completion of this op covered by a poller (if false: the op should - complete independently of some pollset being polled) */ - bool covered_by_poller : 1; - /** Send initial metadata to the peer, from the provided metadata batch. */ bool send_initial_metadata : 1; diff --git a/src/core/lib/transport/transport_op_string.c b/src/core/lib/transport/transport_op_string.c index b47b1e8306..7b18229ba6 100644 --- a/src/core/lib/transport/transport_op_string.c +++ b/src/core/lib/transport/transport_op_string.c @@ -64,9 +64,6 @@ char *grpc_transport_stream_op_batch_string( gpr_strvec b; gpr_strvec_init(&b); - gpr_strvec_add( - &b, gpr_strdup(op->covered_by_poller ? "[COVERED]" : "[UNCOVERED]")); - if (op->send_initial_metadata) { gpr_strvec_add(&b, gpr_strdup(" ")); gpr_strvec_add(&b, gpr_strdup("SEND_INITIAL_METADATA{")); diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index e3b2ffaf9e..48782174a7 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -143,8 +143,6 @@ CORE_SOURCE_FILES = [ 'src/core/lib/iomgr/wakeup_fd_nospecial.c', 'src/core/lib/iomgr/wakeup_fd_pipe.c', 'src/core/lib/iomgr/wakeup_fd_posix.c', - 'src/core/lib/iomgr/workqueue_uv.c', - 'src/core/lib/iomgr/workqueue_windows.c', 'src/core/lib/json/json.c', 'src/core/lib/json/json_reader.c', 'src/core/lib/json/json_string.c', |