diff options
Diffstat (limited to 'src')
52 files changed, 733 insertions, 1877 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c index ab71467d73..2ec9b15f76 100644 --- a/src/core/ext/filters/client_channel/client_channel.c +++ b/src/core/ext/filters/client_channel/client_channel.c @@ -291,7 +291,7 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand, w->chand = chand; grpc_closure_init(&w->on_changed, on_lb_policy_state_changed_locked, w, - grpc_combiner_scheduler(chand->combiner, false)); + grpc_combiner_scheduler(chand->combiner)); w->state = current_state; w->lb_policy = lb_policy; grpc_lb_policy_notify_on_state_change_locked(exec_ctx, lb_policy, &w->state, @@ -652,7 +652,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_closure_sched( exec_ctx, grpc_closure_init(&op->handler_private.closure, start_transport_op_locked, - op, grpc_combiner_scheduler(chand->combiner, false)), + op, grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); } @@ -683,7 +683,7 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, GPR_ASSERT(args->is_last); GPR_ASSERT(elem->filter == &grpc_client_channel_filter); // Initialize data members. - chand->combiner = grpc_combiner_create(NULL); + chand->combiner = grpc_combiner_create(); gpr_mu_init(&chand->info_mu); gpr_mu_init(&chand->external_connectivity_watcher_list_mu); @@ -694,7 +694,7 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, chand->owning_stack = args->channel_stack; grpc_closure_init(&chand->on_resolver_result_changed, on_resolver_result_changed_locked, chand, - grpc_combiner_scheduler(chand->combiner, false)); + grpc_combiner_scheduler(chand->combiner)); chand->interested_parties = grpc_pollset_set_create(); grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); @@ -753,9 +753,8 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, channel_data *chand = elem->channel_data; if (chand->resolver != NULL) { grpc_closure_sched( - exec_ctx, - grpc_closure_create(shutdown_resolver_locked, chand->resolver, - grpc_combiner_scheduler(chand->combiner, false)), + exec_ctx, grpc_closure_create(shutdown_resolver_locked, chand->resolver, + grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); } if (chand->client_channel_factory != NULL) { @@ -786,11 +785,6 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, * PER-CALL FUNCTIONS */ -#define GET_CALL(call_data) \ - ((grpc_subchannel_call *)(gpr_atm_acq_load(&(call_data)->subchannel_call))) - -#define CANCELLED_CALL ((grpc_subchannel_call *)1) - /** Call data. Holds a pointer to grpc_subchannel_call and the associated machinery to create such a pointer. Handles queueing of stream ops until a call object is ready, waiting @@ -811,11 +805,9 @@ typedef struct client_channel_call_data { grpc_server_retry_throttle_data *retry_throttle_data; method_parameters *method_params; - grpc_error *cancel_error; - - /** either 0 for no call, 1 for cancelled, or a pointer to a - grpc_subchannel_call */ - gpr_atm subchannel_call; + /** either 0 for no call, a pointer to a grpc_subchannel_call (if the lowest + bit is 0), or a pointer to an error (if the lowest bit is 1) */ + gpr_atm subchannel_call_or_error; gpr_arena *arena; bool pick_pending; @@ -837,10 +829,43 @@ typedef struct client_channel_call_data { grpc_closure *original_on_complete; } call_data; +typedef struct { + grpc_subchannel_call *subchannel_call; + grpc_error *error; +} call_or_error; + +static call_or_error get_call_or_error(call_data *p) { + gpr_atm c = gpr_atm_acq_load(&p->subchannel_call_or_error); + if (c == 0) + return (call_or_error){NULL, NULL}; + else if (c & 1) + return (call_or_error){NULL, (grpc_error *)((c) & ~(gpr_atm)1)}; + else + return (call_or_error){(grpc_subchannel_call *)c, NULL}; +} + +static bool set_call_or_error(call_data *p, call_or_error coe) { + // this should always be under a lock + call_or_error existing = get_call_or_error(p); + if (existing.error != GRPC_ERROR_NONE) { + GRPC_ERROR_UNREF(coe.error); + return false; + } + GPR_ASSERT(existing.subchannel_call == NULL); + if (coe.error != GRPC_ERROR_NONE) { + GPR_ASSERT(coe.subchannel_call == NULL); + gpr_atm_rel_store(&p->subchannel_call_or_error, 1 | (gpr_atm)coe.error); + } else { + GPR_ASSERT(coe.subchannel_call != NULL); + gpr_atm_rel_store(&p->subchannel_call_or_error, + (gpr_atm)coe.subchannel_call); + } + return true; +} + grpc_subchannel_call *grpc_client_channel_get_subchannel_call( grpc_call_element *call_elem) { - grpc_subchannel_call *scc = GET_CALL((call_data *)call_elem->call_data); - return scc == CANCELLED_CALL ? NULL : scc; + return get_call_or_error(call_elem->call_data).subchannel_call; } static void add_waiting_locked(call_data *calld, @@ -872,18 +897,18 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { return; } - grpc_subchannel_call *call = GET_CALL(calld); + call_or_error call = get_call_or_error(calld); grpc_transport_stream_op_batch **ops = calld->waiting_ops; size_t nops = calld->waiting_ops_count; - if (call == CANCELLED_CALL) { - fail_locked(exec_ctx, calld, GRPC_ERROR_CANCELLED); + if (call.error != GRPC_ERROR_NONE) { + fail_locked(exec_ctx, calld, GRPC_ERROR_REF(call.error)); return; } calld->waiting_ops = NULL; calld->waiting_ops_count = 0; calld->waiting_ops_capacity = 0; for (size_t i = 0; i < nops; i++) { - grpc_subchannel_call_process_op(exec_ctx, call, ops[i]); + grpc_subchannel_call_process_op(exec_ctx, call.subchannel_call, ops[i]); } gpr_free(ops); } @@ -944,19 +969,23 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, calld->pick_pending = false; grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent, chand->interested_parties); + call_or_error coe = get_call_or_error(calld); if (calld->connected_subchannel == NULL) { - gpr_atm_no_barrier_store(&calld->subchannel_call, (gpr_atm)CANCELLED_CALL); - fail_locked(exec_ctx, calld, - error == GRPC_ERROR_NONE - ? GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Call dropped by load balancing policy") - : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Failed to create subchannel", &error, 1)); - } else if (GET_CALL(calld) == CANCELLED_CALL) { + grpc_error *failure = + error == GRPC_ERROR_NONE + ? GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Call dropped by load balancing policy") + : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Failed to create subchannel", &error, 1); + set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(failure)}); + fail_locked(exec_ctx, calld, failure); + } else if (coe.error != GRPC_ERROR_NONE) { /* already cancelled before subchannel became ready */ + grpc_error *child_errors[] = {error, coe.error}; grpc_error *cancellation_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Cancelled before creating subchannel", &error, 1); + "Cancelled before creating subchannel", child_errors, + GPR_ARRAY_SIZE(child_errors)); /* if due to deadline, attach the deadline exceeded status to the error */ if (gpr_time_cmp(calld->deadline, gpr_now(GPR_CLOCK_MONOTONIC)) < 0) { cancellation_error = @@ -976,8 +1005,8 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, .context = calld->subchannel_call_context}; grpc_error *new_error = grpc_connected_subchannel_create_call( exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); - gpr_atm_rel_store(&calld->subchannel_call, - (gpr_atm)(uintptr_t)subchannel_call); + GPR_ASSERT(set_call_or_error( + calld, (call_or_error){.subchannel_call = subchannel_call})); if (new_error != GRPC_ERROR_NONE) { new_error = grpc_error_add_child(new_error, error); fail_locked(exec_ctx, calld, new_error); @@ -990,8 +1019,9 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { call_data *calld = elem->call_data; - grpc_subchannel_call *subchannel_call = GET_CALL(calld); - if (subchannel_call == NULL || subchannel_call == CANCELLED_CALL) { + grpc_subchannel_call *subchannel_call = + get_call_or_error(calld).subchannel_call; + if (subchannel_call == NULL) { return NULL; } else { return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call); @@ -1133,7 +1163,7 @@ static bool pick_subchannel_locked( cpa->on_ready = on_ready; cpa->elem = elem; grpc_closure_init(&cpa->closure, continue_picking_locked, cpa, - grpc_combiner_scheduler(chand->combiner, true)); + grpc_combiner_scheduler(chand->combiner)); grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure, GRPC_ERROR_NONE); } else { @@ -1150,58 +1180,45 @@ static void start_transport_stream_op_batch_locked_inner( grpc_call_element *elem) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - grpc_subchannel_call *call; /* need to recheck that another thread hasn't set the call */ - call = GET_CALL(calld); - if (call == CANCELLED_CALL) { + call_or_error coe = get_call_or_error(calld); + if (coe.error != GRPC_ERROR_NONE) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); + exec_ctx, op, GRPC_ERROR_REF(coe.error)); /* early out */ return; } - if (call != NULL) { - grpc_subchannel_call_process_op(exec_ctx, call, op); + if (coe.subchannel_call != NULL) { + grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op); /* early out */ return; } /* if this is a cancellation, then we can raise our cancelled flag */ if (op->cancel_stream) { - if (!gpr_atm_rel_cas(&calld->subchannel_call, 0, - (gpr_atm)(uintptr_t)CANCELLED_CALL)) { - /* recurse to retry */ - start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem); - /* early out */ - return; + grpc_error *error = op->payload->cancel_stream.cancel_error; + /* Stash a copy of cancel_error in our call data, so that we can use + it for subsequent operations. This ensures that if the call is + cancelled before any ops are passed down (e.g., if the deadline + is in the past when the call starts), we can return the right + error to the caller when the first op does get passed down. */ + set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(error)}); + if (calld->pick_pending) { + cancel_pick_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); } else { - /* Stash a copy of cancel_error in our call data, so that we can use - it for subsequent operations. This ensures that if the call is - cancelled before any ops are passed down (e.g., if the deadline - is in the past when the call starts), we can return the right - error to the caller when the first op does get passed down. */ - calld->cancel_error = - GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error); - if (calld->pick_pending) { - cancel_pick_locked( - exec_ctx, elem, - GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); - } else { - fail_locked(exec_ctx, calld, - GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); - } - grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, op, - GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); - /* early out */ - return; + fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); } + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, + GRPC_ERROR_REF(error)); + /* early out */ + return; } /* if we don't have a subchannel, try to get one */ if (!calld->pick_pending && calld->connected_subchannel == NULL && op->send_initial_metadata) { calld->pick_pending = true; grpc_closure_init(&calld->next_step, subchannel_ready_locked, elem, - grpc_combiner_scheduler(chand->combiner, true)); + grpc_combiner_scheduler(chand->combiner)); GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); /* If a subchannel is not available immediately, the polling entity from call_data should be provided to channel_data's interested_parties, so @@ -1215,12 +1232,13 @@ static void start_transport_stream_op_batch_locked_inner( calld->pick_pending = false; GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); if (calld->connected_subchannel == NULL) { - gpr_atm_no_barrier_store(&calld->subchannel_call, - (gpr_atm)CANCELLED_CALL); grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Call dropped by load balancing policy"); + set_call_or_error(calld, + (call_or_error){.error = GRPC_ERROR_REF(error)}); fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); + grpc_transport_stream_op_batch_finish_with_failure( + exec_ctx, op, GRPC_ERROR_REF(error)); return; // Early out. } } else { @@ -1240,8 +1258,8 @@ static void start_transport_stream_op_batch_locked_inner( .context = calld->subchannel_call_context}; grpc_error *error = grpc_connected_subchannel_create_call( exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); - gpr_atm_rel_store(&calld->subchannel_call, - (gpr_atm)(uintptr_t)subchannel_call); + GPR_ASSERT(set_call_or_error( + calld, (call_or_error){.subchannel_call = subchannel_call})); if (error != GRPC_ERROR_NONE) { fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); @@ -1320,17 +1338,17 @@ static void cc_start_transport_stream_op_batch( op); } /* try to (atomically) get the call */ - grpc_subchannel_call *call = GET_CALL(calld); + call_or_error coe = get_call_or_error(calld); GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0); - if (call == CANCELLED_CALL) { + if (coe.error != GRPC_ERROR_NONE) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); + exec_ctx, op, GRPC_ERROR_REF(coe.error)); GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); /* early out */ return; } - if (call != NULL) { - grpc_subchannel_call_process_op(exec_ctx, call, op); + if (coe.subchannel_call != NULL) { + grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op); GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); /* early out */ return; @@ -1339,10 +1357,9 @@ static void cc_start_transport_stream_op_batch( GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op_batch"); op->handler_private.extra_arg = elem; grpc_closure_sched( - exec_ctx, - grpc_closure_init(&op->handler_private.closure, - start_transport_stream_op_batch_locked, op, - grpc_combiner_scheduler(chand->combiner, false)), + exec_ctx, grpc_closure_init(&op->handler_private.closure, + start_transport_stream_op_batch_locked, op, + grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); } @@ -1379,12 +1396,14 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, if (calld->method_params != NULL) { method_parameters_unref(calld->method_params); } - GRPC_ERROR_UNREF(calld->cancel_error); - grpc_subchannel_call *call = GET_CALL(calld); - if (call != NULL && call != CANCELLED_CALL) { - grpc_subchannel_call_set_cleanup_closure(call, then_schedule_closure); + call_or_error coe = get_call_or_error(calld); + GRPC_ERROR_UNREF(coe.error); + if (coe.subchannel_call != NULL) { + grpc_subchannel_call_set_cleanup_closure(coe.subchannel_call, + then_schedule_closure); then_schedule_closure = NULL; - GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call"); + GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, coe.subchannel_call, + "client_channel_destroy_call"); } GPR_ASSERT(!calld->pick_pending); GPR_ASSERT(calld->waiting_ops_count == 0); @@ -1454,9 +1473,8 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( if (out == GRPC_CHANNEL_IDLE && try_to_connect) { GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect"); grpc_closure_sched( - exec_ctx, - grpc_closure_create(try_to_connect_locked, chand, - grpc_combiner_scheduler(chand->combiner, false)), + exec_ctx, grpc_closure_create(try_to_connect_locked, chand, + grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); } return out; @@ -1593,6 +1611,6 @@ void grpc_client_channel_watch_connectivity_state( grpc_closure_sched( exec_ctx, grpc_closure_init(&w->my_closure, watch_connectivity_state_locked, w, - grpc_combiner_scheduler(chand->combiner, true)), + grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); } diff --git a/src/core/ext/filters/client_channel/lb_policy.c b/src/core/ext/filters/client_channel/lb_policy.c index 7ac2cb1775..0083a8044f 100644 --- a/src/core/ext/filters/client_channel/lb_policy.c +++ b/src/core/ext/filters/client_channel/lb_policy.c @@ -89,11 +89,10 @@ void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, gpr_atm mask = ~(gpr_atm)((1 << WEAK_REF_BITS) - 1); gpr_atm check = 1 << WEAK_REF_BITS; if ((old_val & mask) == check) { - grpc_closure_sched( - exec_ctx, - grpc_closure_create(shutdown_locked, policy, - grpc_combiner_scheduler(policy->combiner, false)), - GRPC_ERROR_NONE); + grpc_closure_sched(exec_ctx, grpc_closure_create( + shutdown_locked, policy, + grpc_combiner_scheduler(policy->combiner)), + GRPC_ERROR_NONE); } else { grpc_lb_policy_weak_unref(exec_ctx, policy REF_FUNC_PASS_ARGS("strong-unref")); diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c index 6eb0a9ef21..a23a56c052 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c @@ -756,7 +756,7 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, gpr_zalloc(sizeof(rr_connectivity_data)); grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed_locked, rr_connectivity, - grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_combiner_scheduler(glb_policy->base.combiner)); rr_connectivity->glb_policy = glb_policy; rr_connectivity->state = rr_state; @@ -1267,7 +1267,7 @@ static void schedule_next_client_load_report(grpc_exec_ctx *exec_ctx, gpr_time_add(now, glb_policy->client_stats_report_interval); grpc_closure_init(&glb_policy->client_load_report_closure, send_client_load_report_locked, glb_policy, - grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_combiner_scheduler(glb_policy->base.combiner)); grpc_timer_init(exec_ctx, &glb_policy->client_load_report_timer, next_client_load_report_time, &glb_policy->client_load_report_closure, now); @@ -1295,7 +1295,7 @@ static void do_send_client_load_report_locked(grpc_exec_ctx *exec_ctx, op.data.send_message.send_message = glb_policy->client_load_report_payload; grpc_closure_init(&glb_policy->client_load_report_closure, client_load_report_done_locked, glb_policy, - grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_combiner_scheduler(glb_policy->base.combiner)); grpc_call_error call_error = grpc_call_start_batch_and_execute( exec_ctx, glb_policy->lb_call, &op, 1, &glb_policy->client_load_report_closure); @@ -1401,13 +1401,13 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, grpc_closure_init(&glb_policy->lb_on_sent_initial_request, lb_on_sent_initial_request_locked, glb_policy, - grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_combiner_scheduler(glb_policy->base.combiner)); grpc_closure_init(&glb_policy->lb_on_server_status_received, lb_on_server_status_received_locked, glb_policy, - grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_combiner_scheduler(glb_policy->base.combiner)); grpc_closure_init(&glb_policy->lb_on_response_received, lb_on_response_received_locked, glb_policy, - grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_combiner_scheduler(glb_policy->base.combiner)); gpr_backoff_init(&glb_policy->lb_call_backoff_state, GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS, @@ -1708,9 +1708,9 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, } } GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer"); - grpc_closure_init( - &glb_policy->lb_on_call_retry, lb_call_on_retry_timer_locked, - glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_closure_init(&glb_policy->lb_on_call_retry, + lb_call_on_retry_timer_locked, glb_policy, + grpc_combiner_scheduler(glb_policy->base.combiner)); glb_policy->retry_timer_active = true; grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try, &glb_policy->lb_on_call_retry, now); diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c index b6f86255df..691c07cb7f 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c @@ -670,7 +670,7 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, pf_update_locked(exec_ctx, &p->base, args); grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner); grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed_locked, p, - grpc_combiner_scheduler(args->combiner, false)); + grpc_combiner_scheduler(args->combiner)); return &p->base; } diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index c9758eef88..2e9f029fc8 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -764,7 +764,7 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, sd->subchannel = subchannel; grpc_closure_init(&sd->connectivity_changed_closure, rr_connectivity_changed_locked, sd, - grpc_combiner_scheduler(args->combiner, false)); + grpc_combiner_scheduler(args->combiner)); /* use some sentinel value outside of the range of * grpc_connectivity_state to signal an undefined previous state. We * won't be referring to this value again and it'll be overwritten after diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c index 578e8d697f..861b809b19 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c @@ -284,10 +284,10 @@ static grpc_resolver *dns_ares_create(grpc_exec_ctx *exec_ctx, GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); grpc_closure_init(&r->dns_ares_on_retry_timer_locked, dns_ares_on_retry_timer_locked, r, - grpc_combiner_scheduler(r->base.combiner, false)); + grpc_combiner_scheduler(r->base.combiner)); grpc_closure_init(&r->dns_ares_on_resolved_locked, dns_ares_on_resolved_locked, r, - grpc_combiner_scheduler(r->base.combiner, false)); + grpc_combiner_scheduler(r->base.combiner)); return &r->base; } diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c index ebe6a07215..f6d7176f5d 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c @@ -194,7 +194,7 @@ static void dns_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg, gpr_log(GPR_DEBUG, "retrying immediately"); } grpc_closure_init(&r->on_retry, dns_on_retry_timer_locked, r, - grpc_combiner_scheduler(r->base.combiner, false)); + grpc_combiner_scheduler(r->base.combiner)); grpc_timer_init(exec_ctx, &r->retry_timer, next_try, &r->on_retry, now); } if (r->resolved_result != NULL) { @@ -216,7 +216,7 @@ static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx, grpc_resolve_address( exec_ctx, r->name_to_resolve, r->default_port, r->interested_parties, grpc_closure_create(dns_on_resolved_locked, r, - grpc_combiner_scheduler(r->base.combiner, false)), + grpc_combiner_scheduler(r->base.combiner)), &r->addresses); } diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c index cb9b1f07c2..a93ba3fa96 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c @@ -173,10 +173,9 @@ void grpc_fake_resolver_response_generator_set_response( GPR_ASSERT(generator->resolver != NULL); generator->next_response = grpc_channel_args_copy(next_response); grpc_closure_sched( - exec_ctx, - grpc_closure_create( - set_response_cb, generator, - grpc_combiner_scheduler(generator->resolver->base.combiner, false)), + exec_ctx, grpc_closure_create(set_response_cb, generator, + grpc_combiner_scheduler( + generator->resolver->base.combiner)), GRPC_ERROR_NONE); } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index f3268bcfca..f112ce1b76 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -50,7 +50,6 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/http/parser.h" #include "src/core/lib/iomgr/timer.h" -#include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" @@ -267,7 +266,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->ep = ep; /* one ref is for destroy */ gpr_ref_init(&t->refs, 1); - t->combiner = grpc_combiner_create(grpc_endpoint_get_workqueue(ep)); + t->combiner = grpc_combiner_create(); t->peer_string = grpc_endpoint_get_peer(ep); t->endpoint_reading = 1; t->next_stream_id = is_client ? 1 : 2; @@ -288,29 +287,29 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_closure_init(&t->write_action, write_action, t, grpc_schedule_on_exec_ctx); grpc_closure_init(&t->read_action_locked, read_action_locked, t, - grpc_combiner_scheduler(t->combiner, false)); + grpc_combiner_scheduler(t->combiner)); grpc_closure_init(&t->benign_reclaimer_locked, benign_reclaimer_locked, t, - grpc_combiner_scheduler(t->combiner, false)); + grpc_combiner_scheduler(t->combiner)); grpc_closure_init(&t->destructive_reclaimer_locked, destructive_reclaimer_locked, t, - grpc_combiner_scheduler(t->combiner, false)); + grpc_combiner_scheduler(t->combiner)); grpc_closure_init(&t->retry_initiate_ping_locked, retry_initiate_ping_locked, - t, grpc_combiner_scheduler(t->combiner, false)); + t, grpc_combiner_scheduler(t->combiner)); grpc_closure_init(&t->start_bdp_ping_locked, start_bdp_ping_locked, t, - grpc_combiner_scheduler(t->combiner, false)); + grpc_combiner_scheduler(t->combiner)); grpc_closure_init(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t, - grpc_combiner_scheduler(t->combiner, false)); + grpc_combiner_scheduler(t->combiner)); grpc_closure_init(&t->init_keepalive_ping_locked, init_keepalive_ping_locked, - t, grpc_combiner_scheduler(t->combiner, false)); + t, grpc_combiner_scheduler(t->combiner)); grpc_closure_init(&t->start_keepalive_ping_locked, start_keepalive_ping_locked, t, - grpc_combiner_scheduler(t->combiner, false)); + grpc_combiner_scheduler(t->combiner)); grpc_closure_init(&t->finish_keepalive_ping_locked, finish_keepalive_ping_locked, t, - grpc_combiner_scheduler(t->combiner, false)); + grpc_combiner_scheduler(t->combiner)); grpc_closure_init(&t->keepalive_watchdog_fired_locked, keepalive_watchdog_fired_locked, t, - grpc_combiner_scheduler(t->combiner, false)); + grpc_combiner_scheduler(t->combiner)); grpc_bdp_estimator_init(&t->bdp_estimator, t->peer_string); t->last_pid_update = gpr_now(GPR_CLOCK_MONOTONIC); @@ -353,7 +352,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, if (is_client) { grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string( GRPC_CHTTP2_CLIENT_CONNECT_STRING)); - grpc_chttp2_initiate_write(exec_ctx, t, false, "initial_write"); + grpc_chttp2_initiate_write(exec_ctx, t, "initial_write"); } /* configure http2 the way we like it */ @@ -565,7 +564,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED; } - grpc_chttp2_initiate_write(exec_ctx, t, false, "init"); + grpc_chttp2_initiate_write(exec_ctx, t, "init"); post_benign_reclaimer(exec_ctx, t); } @@ -583,9 +582,9 @@ static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp, static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - grpc_closure_sched(exec_ctx, grpc_closure_create( - destroy_transport_locked, t, - grpc_combiner_scheduler(t->combiner, false)), + grpc_closure_sched(exec_ctx, + grpc_closure_create(destroy_transport_locked, t, + grpc_combiner_scheduler(t->combiner)), GRPC_ERROR_NONE); } @@ -678,7 +677,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_slice_buffer_init(&s->frame_storage); s->pending_byte_stream = false; grpc_closure_init(&s->reset_byte_stream, reset_byte_stream, s, - grpc_combiner_scheduler(t->combiner, false)); + grpc_combiner_scheduler(t->combiner)); GRPC_CHTTP2_REF_TRANSPORT(t, "stream"); @@ -762,7 +761,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->destroy_stream_arg = then_schedule_closure; grpc_closure_sched( exec_ctx, grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s, - grpc_combiner_scheduler(t->combiner, false)), + grpc_combiner_scheduler(t->combiner)), GRPC_ERROR_NONE); GPR_TIMER_END("destroy_stream", 0); } @@ -800,8 +799,6 @@ static const char *write_state_name(grpc_chttp2_write_state st) { return "WRITING"; case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE: return "WRITING+MORE"; - case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER: - return "WRITING+MORE+COVERED"; } GPR_UNREACHABLE_CODE(return "UNKNOWN"); } @@ -824,8 +821,7 @@ static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - bool covered_by_poller, const char *reason) { + grpc_chttp2_transport *t, const char *reason) { GPR_TIMER_BEGIN("grpc_chttp2_initiate_write", 0); switch (t->write_state) { @@ -834,28 +830,16 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); grpc_closure_sched( exec_ctx, - grpc_closure_init( - &t->write_action_begin_locked, write_action_begin_locked, t, - grpc_combiner_finally_scheduler(t->combiner, covered_by_poller)), + grpc_closure_init(&t->write_action_begin_locked, + write_action_begin_locked, t, + grpc_combiner_finally_scheduler(t->combiner)), GRPC_ERROR_NONE); break; case GRPC_CHTTP2_WRITE_STATE_WRITING: - set_write_state( - exec_ctx, t, - covered_by_poller - ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER - : GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE, - reason); + set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE, + reason); break; case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE: - if (covered_by_poller) { - set_write_state( - exec_ctx, t, - GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER, - reason); - } - break; - case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER: break; } GPR_TIMER_END("grpc_chttp2_initiate_write", 0); @@ -871,10 +855,10 @@ void grpc_chttp2_become_writable( case GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK: break; case GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED: - grpc_chttp2_initiate_write(exec_ctx, t, true, reason); + grpc_chttp2_initiate_write(exec_ctx, t, reason); break; case GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED: - grpc_chttp2_initiate_write(exec_ctx, t, false, reason); + grpc_chttp2_initiate_write(exec_ctx, t, reason); break; } } @@ -911,7 +895,7 @@ static void write_action(grpc_exec_ctx *exec_ctx, void *gt, grpc_error *error) { grpc_endpoint_write( exec_ctx, t->ep, &t->outbuf, grpc_closure_init(&t->write_action_end_locked, write_action_end_locked, t, - grpc_combiner_scheduler(t->combiner, false))); + grpc_combiner_scheduler(t->combiner))); GPR_TIMER_END("write_action", 0); } @@ -947,21 +931,9 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp, GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); grpc_closure_run( exec_ctx, - grpc_closure_init( - &t->write_action_begin_locked, write_action_begin_locked, t, - grpc_combiner_finally_scheduler(t->combiner, false)), - GRPC_ERROR_NONE); - break; - case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER: - GPR_TIMER_MARK("state=writing_stale_with_poller", 0); - set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, - "continue writing [covered]"); - GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); - grpc_closure_run( - exec_ctx, grpc_closure_init(&t->write_action_begin_locked, write_action_begin_locked, t, - grpc_combiner_finally_scheduler(t->combiner, true)), + grpc_combiner_finally_scheduler(t->combiner)), GRPC_ERROR_NONE); break; } @@ -984,7 +956,7 @@ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, if (use_value != t->settings[GRPC_LOCAL_SETTINGS][id]) { t->settings[GRPC_LOCAL_SETTINGS][id] = use_value; t->dirtied_local_settings = 1; - grpc_chttp2_initiate_write(exec_ctx, t, false, "push_setting"); + grpc_chttp2_initiate_write(exec_ctx, t, "push_setting"); } } @@ -1380,7 +1352,6 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, s->next_message_end_offset = s->flow_controlled_bytes_written + (int64_t)s->flow_controlled_buffer.length + (int64_t)len; - s->complete_fetch_covered_by_poller = op->covered_by_poller; if (flags & GRPC_WRITE_BUFFER_HINT) { s->next_message_end_offset -= t->write_buffer_size; s->write_buffering = true; @@ -1502,9 +1473,8 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op"); grpc_closure_sched( exec_ctx, - grpc_closure_init( - &op->handler_private.closure, perform_stream_op_locked, op, - grpc_combiner_scheduler(t->combiner, op->covered_by_poller)), + grpc_closure_init(&op->handler_private.closure, perform_stream_op_locked, + op, grpc_combiner_scheduler(t->combiner)), GRPC_ERROR_NONE); GPR_TIMER_END("perform_stream_op", 0); } @@ -1531,7 +1501,7 @@ static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GRPC_ERROR_NONE); if (grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack, GRPC_ERROR_NONE)) { - grpc_chttp2_initiate_write(exec_ctx, t, false, "send_ping"); + grpc_chttp2_initiate_write(exec_ctx, t, "send_ping"); } } @@ -1539,7 +1509,7 @@ static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error) { grpc_chttp2_transport *t = tp; t->ping_state.is_delayed_ping_timer_set = false; - grpc_chttp2_initiate_write(exec_ctx, t, false, "retry_send_ping"); + grpc_chttp2_initiate_write(exec_ctx, t, "retry_send_ping"); } void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, @@ -1554,7 +1524,7 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } grpc_closure_list_sched(exec_ctx, &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]); if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) { - grpc_chttp2_initiate_write(exec_ctx, t, false, "continue_pings"); + grpc_chttp2_initiate_write(exec_ctx, t, "continue_pings"); } } @@ -1567,7 +1537,7 @@ static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, &slice, &http_error); grpc_chttp2_goaway_append(t->last_new_stream_id, (uint32_t)http_error, grpc_slice_ref_internal(slice), &t->qbuf); - grpc_chttp2_initiate_write(exec_ctx, t, false, "goaway_sent"); + grpc_chttp2_initiate_write(exec_ctx, t, "goaway_sent"); GRPC_ERROR_UNREF(error); } @@ -1593,12 +1563,6 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t = op->handler_private.extra_arg; grpc_error *close_transport = op->disconnect_with_error; - if (op->on_connectivity_state_change != NULL) { - grpc_connectivity_state_notify_on_state_change( - exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state, - op->on_connectivity_state_change); - } - if (op->goaway_error) { send_goaway(exec_ctx, t, op->goaway_error); } @@ -1622,6 +1586,12 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, op->send_ping); } + if (op->on_connectivity_state_change != NULL) { + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state, + op->on_connectivity_state_change); + } + if (close_transport != GRPC_ERROR_NONE) { close_transport_locked(exec_ctx, t, close_transport); } @@ -1638,11 +1608,11 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, gpr_free(msg); op->handler_private.extra_arg = gt; GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op"); - grpc_closure_sched( - exec_ctx, grpc_closure_init(&op->handler_private.closure, - perform_transport_op_locked, op, - grpc_combiner_scheduler(t->combiner, false)), - GRPC_ERROR_NONE); + grpc_closure_sched(exec_ctx, + grpc_closure_init(&op->handler_private.closure, + perform_transport_op_locked, op, + grpc_combiner_scheduler(t->combiner)), + GRPC_ERROR_NONE); } /******************************************************************************* @@ -1797,7 +1767,7 @@ void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx, grpc_slice_buffer_add( &t->qbuf, grpc_chttp2_rst_stream_create(s->id, (uint32_t)http_error, &s->stats.outgoing)); - grpc_chttp2_initiate_write(exec_ctx, t, false, "rst_stream"); + grpc_chttp2_initiate_write(exec_ctx, t, "rst_stream"); } } if (due_to_error != GRPC_ERROR_NONE && !s->seen_error) { @@ -2110,7 +2080,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, &s->stats.outgoing)); grpc_chttp2_mark_stream_closed(exec_ctx, t, s, 1, 1, error); - grpc_chttp2_initiate_write(exec_ctx, t, false, "close_from_api"); + grpc_chttp2_initiate_write(exec_ctx, t, "close_from_api"); } typedef struct { @@ -2641,9 +2611,9 @@ static bool incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, bs->next_action.on_complete = on_complete; grpc_closure_sched( exec_ctx, - grpc_closure_init( - &bs->next_action.closure, incoming_byte_stream_next_locked, bs, - grpc_combiner_scheduler(bs->transport->combiner, false)), + grpc_closure_init(&bs->next_action.closure, + incoming_byte_stream_next_locked, bs, + grpc_combiner_scheduler(bs->transport->combiner)), GRPC_ERROR_NONE); GPR_TIMER_END("incoming_byte_stream_next", 0); return false; @@ -2698,10 +2668,9 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs = (grpc_chttp2_incoming_byte_stream *)byte_stream; grpc_closure_sched( - exec_ctx, - grpc_closure_init( - &bs->destroy_action, incoming_byte_stream_destroy_locked, bs, - grpc_combiner_scheduler(bs->transport->combiner, false)), + exec_ctx, grpc_closure_init( + &bs->destroy_action, incoming_byte_stream_destroy_locked, + bs, grpc_combiner_scheduler(bs->transport->combiner)), GRPC_ERROR_NONE); GPR_TIMER_END("incoming_byte_stream_destroy", 0); } diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.c b/src/core/ext/transport/chttp2/transport/frame_ping.c index f09ca60739..c6269e29f9 100644 --- a/src/core/ext/transport/chttp2/transport/frame_ping.c +++ b/src/core/ext/transport/chttp2/transport/frame_ping.c @@ -132,7 +132,7 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, t->ping_acks, t->ping_ack_capacity * sizeof(*t->ping_acks)); } t->ping_acks[t->ping_ack_count++] = p->opaque_8bytes; - grpc_chttp2_initiate_write(exec_ctx, t, false, "ping response"); + grpc_chttp2_initiate_write(exec_ctx, t, "ping response"); } } } diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.c b/src/core/ext/transport/chttp2/transport/frame_window_update.c index 8ed72dddca..579eb6d0c0 100644 --- a/src/core/ext/transport/chttp2/transport/frame_window_update.c +++ b/src/core/ext/transport/chttp2/transport/frame_window_update.c @@ -124,8 +124,7 @@ grpc_error *grpc_chttp2_window_update_parser_parse( received_update); bool is_zero = t->outgoing_window <= 0; if (was_zero && !is_zero) { - grpc_chttp2_initiate_write(exec_ctx, t, false, - "new_global_flow_control"); + grpc_chttp2_initiate_write(exec_ctx, t, "new_global_flow_control"); } } } diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c index bb98bc4a79..0576cd1e79 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.c +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c @@ -1664,7 +1664,7 @@ static void force_client_rst_stream(grpc_exec_ctx *exec_ctx, void *sp, grpc_slice_buffer_add( &t->qbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_HTTP2_NO_ERROR, &s->stats.outgoing)); - grpc_chttp2_initiate_write(exec_ctx, t, false, "force_rst_stream"); + grpc_chttp2_initiate_write(exec_ctx, t, "force_rst_stream"); grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, true, GRPC_ERROR_NONE); } GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "final_rst"); @@ -1712,9 +1712,9 @@ grpc_error *grpc_chttp2_header_parser_parse(grpc_exec_ctx *exec_ctx, and can avoid the extra write */ GRPC_CHTTP2_STREAM_REF(s, "final_rst"); grpc_closure_sched( - exec_ctx, grpc_closure_create(force_client_rst_stream, s, - grpc_combiner_finally_scheduler( - t->combiner, false)), + exec_ctx, + grpc_closure_create(force_client_rst_stream, s, + grpc_combiner_finally_scheduler(t->combiner)), GRPC_ERROR_NONE); } grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false, diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 8d66e396ee..211d6aacf1 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -73,7 +73,6 @@ typedef enum { GRPC_CHTTP2_WRITE_STATE_IDLE, GRPC_CHTTP2_WRITE_STATE_WRITING, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE, - GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER, } grpc_chttp2_write_state; typedef enum { @@ -458,7 +457,6 @@ struct grpc_chttp2_stream { grpc_slice fetching_slice; int64_t next_message_end_offset; int64_t flow_controlled_bytes_written; - bool complete_fetch_covered_by_poller; grpc_closure complete_fetch_locked; grpc_closure *fetching_send_message_finished; @@ -549,8 +547,7 @@ struct grpc_chttp2_stream { The actual call chain is documented in the implementation of this function. */ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - bool covered_by_poller, const char *reason); + grpc_chttp2_transport *t, const char *reason); typedef enum { GRPC_CHTTP2_NOTHING_TO_WRITE, diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index 1a676e2259..0b2891d546 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -433,7 +433,7 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx, GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("parse", t, incoming_window, incoming_frame_size); if (t->incoming_window <= target_incoming_window / 2) { - grpc_chttp2_initiate_write(exec_ctx, t, false, "flow_control"); + grpc_chttp2_initiate_write(exec_ctx, t, "flow_control"); } return GRPC_ERROR_NONE; diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index 5be1092946..048fe2ed32 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -206,8 +206,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( while (grpc_chttp2_list_pop_stalled_by_transport(t, &s)) { if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s) && stream_ref_if_not_destroyed(&s->refcount->refs)) { - grpc_chttp2_initiate_write(exec_ctx, t, false, - "transport.read_flow_control"); + grpc_chttp2_initiate_write(exec_ctx, t, "transport.read_flow_control"); } } } diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index 863f22c614..075f4b2092 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -39,7 +39,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include "src/core/lib/iomgr/workqueue.h" +#include "src/core/lib/iomgr/executor.h" #include "src/core/lib/profiling/timers.h" grpc_tracer_flag grpc_combiner_trace = GRPC_TRACER_INITIALIZER(false); @@ -56,93 +56,47 @@ grpc_tracer_flag grpc_combiner_trace = GRPC_TRACER_INITIALIZER(false); struct grpc_combiner { grpc_combiner *next_combiner_on_this_exec_ctx; - grpc_workqueue *optional_workqueue; - grpc_closure_scheduler uncovered_scheduler; - grpc_closure_scheduler covered_scheduler; - grpc_closure_scheduler uncovered_finally_scheduler; - grpc_closure_scheduler covered_finally_scheduler; + grpc_closure_scheduler scheduler; + grpc_closure_scheduler finally_scheduler; gpr_mpscq queue; + // either: + // a pointer to the initiating exec ctx if that is the only exec_ctx that has + // ever queued to this combiner, or NULL. If this is non-null, it's not + // dereferencable (since the initiating exec_ctx may have gone out of scope) + gpr_atm initiating_exec_ctx_or_null; // state is: // lower bit - zero if orphaned (STATE_UNORPHANED) // other bits - number of items queued on the lock (STATE_ELEM_COUNT_LOW_BIT) gpr_atm state; - // number of elements in the list that are covered by a poller: if >0, we can - // offload safely - gpr_atm elements_covered_by_poller; bool time_to_execute_final_list; - bool final_list_covered_by_poller; grpc_closure_list final_list; grpc_closure offload; gpr_refcount refs; }; -static void combiner_exec_uncovered(grpc_exec_ctx *exec_ctx, - grpc_closure *closure, grpc_error *error); -static void combiner_exec_covered(grpc_exec_ctx *exec_ctx, +static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error); +static void combiner_finally_exec(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_error *error); -static void combiner_finally_exec_uncovered(grpc_exec_ctx *exec_ctx, - grpc_closure *closure, - grpc_error *error); -static void combiner_finally_exec_covered(grpc_exec_ctx *exec_ctx, - grpc_closure *closure, - grpc_error *error); - -static const grpc_closure_scheduler_vtable scheduler_uncovered = { - combiner_exec_uncovered, combiner_exec_uncovered, - "combiner:immediately:uncovered"}; -static const grpc_closure_scheduler_vtable scheduler_covered = { - combiner_exec_covered, combiner_exec_covered, - "combiner:immediately:covered"}; -static const grpc_closure_scheduler_vtable finally_scheduler_uncovered = { - combiner_finally_exec_uncovered, combiner_finally_exec_uncovered, - "combiner:finally:uncovered"}; -static const grpc_closure_scheduler_vtable finally_scheduler_covered = { - combiner_finally_exec_covered, combiner_finally_exec_covered, - "combiner:finally:covered"}; -static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); - -typedef struct { - grpc_error *error; - bool covered_by_poller; -} error_data; - -static uintptr_t pack_error_data(error_data d) { - return ((uintptr_t)d.error) | (d.covered_by_poller ? 1 : 0); -} - -static error_data unpack_error_data(uintptr_t p) { - return (error_data){(grpc_error *)(p & ~(uintptr_t)1), p & 1}; -} +static const grpc_closure_scheduler_vtable scheduler = { + combiner_exec, combiner_exec, "combiner:immediately"}; +static const grpc_closure_scheduler_vtable finally_scheduler = { + combiner_finally_exec, combiner_finally_exec, "combiner:finally"}; -static bool is_covered_by_poller(grpc_combiner *lock) { - return lock->final_list_covered_by_poller || - gpr_atm_acq_load(&lock->elements_covered_by_poller) > 0; -} - -#define IS_COVERED_BY_POLLER_FMT "(final=%d elems=%" PRIdPTR ")->%d" -#define IS_COVERED_BY_POLLER_ARGS(lock) \ - (lock)->final_list_covered_by_poller, \ - gpr_atm_acq_load(&(lock)->elements_covered_by_poller), \ - is_covered_by_poller((lock)) +static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) { +grpc_combiner *grpc_combiner_create(void) { grpc_combiner *lock = gpr_malloc(sizeof(*lock)); gpr_ref_init(&lock->refs, 1); lock->next_combiner_on_this_exec_ctx = NULL; lock->time_to_execute_final_list = false; - lock->optional_workqueue = optional_workqueue; - lock->final_list_covered_by_poller = false; - lock->uncovered_scheduler.vtable = &scheduler_uncovered; - lock->covered_scheduler.vtable = &scheduler_covered; - lock->uncovered_finally_scheduler.vtable = &finally_scheduler_uncovered; - lock->covered_finally_scheduler.vtable = &finally_scheduler_covered; + lock->scheduler.vtable = &scheduler; + lock->finally_scheduler.vtable = &finally_scheduler; gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED); - gpr_atm_no_barrier_store(&lock->elements_covered_by_poller, 0); gpr_mpscq_init(&lock->queue); grpc_closure_list_init(&lock->final_list); - grpc_closure_init(&lock->offload, offload, lock, - grpc_workqueue_scheduler(lock->optional_workqueue)); + grpc_closure_init(&lock->offload, offload, lock, grpc_executor_scheduler); GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock)); return lock; } @@ -151,7 +105,6 @@ static void really_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p really_destroy", lock)); GPR_ASSERT(gpr_atm_no_barrier_load(&lock->state) == 0); gpr_mpscq_destroy(&lock->queue); - GRPC_WORKQUEUE_UNREF(exec_ctx, lock->optional_workqueue, "combiner"); gpr_free(lock); } @@ -208,48 +161,40 @@ static void push_first_on_exec_ctx(grpc_exec_ctx *exec_ctx, } } -static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, - grpc_closure *cl, grpc_error *error, - bool covered_by_poller) { +#define COMBINER_FROM_CLOSURE_SCHEDULER(closure, scheduler_name) \ + ((grpc_combiner *)(((char *)((closure)->scheduler)) - \ + offsetof(grpc_combiner, scheduler_name))) + +static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_closure *cl, + grpc_error *error) { GPR_TIMER_BEGIN("combiner.execute", 0); + grpc_combiner *lock = COMBINER_FROM_CLOSURE_SCHEDULER(cl, scheduler); gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT); - GRPC_COMBINER_TRACE(gpr_log( - GPR_DEBUG, "C:%p grpc_combiner_execute c=%p cov=%d last=%" PRIdPTR, lock, - cl, covered_by_poller, last)); - GPR_ASSERT(last & STATE_UNORPHANED); // ensure lock has not been destroyed - assert(cl->cb); - cl->error_data.scratch = - pack_error_data((error_data){error, covered_by_poller}); - if (covered_by_poller) { - gpr_atm_no_barrier_fetch_add(&lock->elements_covered_by_poller, 1); - } - gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next); + GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, + "C:%p grpc_combiner_execute c=%p last=%" PRIdPTR, + lock, cl, last)); if (last == 1) { + gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null, + (gpr_atm)exec_ctx); // first element on this list: add it to the list of combiner locks // executing within this exec_ctx push_last_on_exec_ctx(exec_ctx, lock); + } else { + // there may be a race with setting here: if that happens, we may delay + // offload for one or two actions, and that's fine + gpr_atm initiator = + gpr_atm_no_barrier_load(&lock->initiating_exec_ctx_or_null); + if (initiator != 0 && initiator != (gpr_atm)exec_ctx) { + gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null, 0); + } } + GPR_ASSERT(last & STATE_UNORPHANED); // ensure lock has not been destroyed + assert(cl->cb); + cl->error_data.error = error; + gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next); GPR_TIMER_END("combiner.execute", 0); } -#define COMBINER_FROM_CLOSURE_SCHEDULER(closure, scheduler_name) \ - ((grpc_combiner *)(((char *)((closure)->scheduler)) - \ - offsetof(grpc_combiner, scheduler_name))) - -static void combiner_exec_uncovered(grpc_exec_ctx *exec_ctx, grpc_closure *cl, - grpc_error *error) { - combiner_exec(exec_ctx, - COMBINER_FROM_CLOSURE_SCHEDULER(cl, uncovered_scheduler), cl, - error, false); -} - -static void combiner_exec_covered(grpc_exec_ctx *exec_ctx, grpc_closure *cl, - grpc_error *error) { - combiner_exec(exec_ctx, - COMBINER_FROM_CLOSURE_SCHEDULER(cl, covered_scheduler), cl, - error, true); -} - static void move_next(grpc_exec_ctx *exec_ctx) { exec_ctx->active_combiner = exec_ctx->active_combiner->next_combiner_on_this_exec_ctx; @@ -265,8 +210,7 @@ static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { static void queue_offload(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { move_next(exec_ctx); - GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p queue_offload --> %p", lock, - lock->optional_workqueue)); + GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p queue_offload", lock)); grpc_closure_sched(exec_ctx, &lock->offload, GRPC_ERROR_NONE); } @@ -278,18 +222,20 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { return false; } - GRPC_COMBINER_TRACE( - gpr_log(GPR_DEBUG, - "C:%p grpc_combiner_continue_exec_ctx workqueue=%p " - "is_covered_by_poller=" IS_COVERED_BY_POLLER_FMT - " exec_ctx_ready_to_finish=%d " - "time_to_execute_final_list=%d", - lock, lock->optional_workqueue, IS_COVERED_BY_POLLER_ARGS(lock), - grpc_exec_ctx_ready_to_finish(exec_ctx), - lock->time_to_execute_final_list)); - - if (lock->optional_workqueue != NULL && is_covered_by_poller(lock) && - grpc_exec_ctx_ready_to_finish(exec_ctx)) { + bool contended = + gpr_atm_no_barrier_load(&lock->initiating_exec_ctx_or_null) == 0; + + GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, + "C:%p grpc_combiner_continue_exec_ctx " + "contended=%d " + "exec_ctx_ready_to_finish=%d " + "time_to_execute_final_list=%d", + lock, contended, + grpc_exec_ctx_ready_to_finish(exec_ctx), + lock->time_to_execute_final_list)); + + if (contended && grpc_exec_ctx_ready_to_finish(exec_ctx) && + grpc_executor_is_threaded()) { GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0); // this execution context wants to move on, and we have a workqueue (and // so can help the execution context out): schedule remaining work to be @@ -310,29 +256,23 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { // queue is in an inconsistent state: use this as a cue that we should // go off and do something else for a while (and come back later) GPR_TIMER_MARK("delay_busy", 0); - if (lock->optional_workqueue != NULL && is_covered_by_poller(lock)) { - queue_offload(exec_ctx, lock); - } + queue_offload(exec_ctx, lock); GPR_TIMER_END("combiner.continue_exec_ctx", 0); return true; } GPR_TIMER_BEGIN("combiner.exec1", 0); grpc_closure *cl = (grpc_closure *)n; - error_data err = unpack_error_data(cl->error_data.scratch); + grpc_error *cl_err = cl->error_data.error; #ifndef NDEBUG cl->scheduled = false; #endif - cl->cb(exec_ctx, cl->cb_arg, err.error); - if (err.covered_by_poller) { - gpr_atm_no_barrier_fetch_add(&lock->elements_covered_by_poller, -1); - } - GRPC_ERROR_UNREF(err.error); + cl->cb(exec_ctx, cl->cb_arg, cl_err); + GRPC_ERROR_UNREF(cl_err); GPR_TIMER_END("combiner.exec1", 0); } else { grpc_closure *c = lock->final_list.head; GPR_ASSERT(c != NULL); grpc_closure_list_init(&lock->final_list); - lock->final_list_covered_by_poller = false; int loops = 0; while (c != NULL) { GPR_TIMER_BEGIN("combiner.exec_1final", 0); @@ -398,20 +338,20 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure, grpc_error *error); -static void combiner_execute_finally(grpc_exec_ctx *exec_ctx, - grpc_combiner *lock, grpc_closure *closure, - grpc_error *error, - bool covered_by_poller) { - GRPC_COMBINER_TRACE(gpr_log( - GPR_DEBUG, "C:%p grpc_combiner_execute_finally c=%p; ac=%p; cov=%d", lock, - closure, exec_ctx->active_combiner, covered_by_poller)); +static void combiner_finally_exec(grpc_exec_ctx *exec_ctx, + grpc_closure *closure, grpc_error *error) { + grpc_combiner *lock = + COMBINER_FROM_CLOSURE_SCHEDULER(closure, finally_scheduler); + GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, + "C:%p grpc_combiner_execute_finally c=%p; ac=%p", + lock, closure, exec_ctx->active_combiner)); GPR_TIMER_BEGIN("combiner.execute_finally", 0); if (exec_ctx->active_combiner != lock) { GPR_TIMER_MARK("slowpath", 0); - grpc_closure_sched( - exec_ctx, grpc_closure_create(enqueue_finally, closure, - grpc_combiner_scheduler(lock, false)), - error); + grpc_closure_sched(exec_ctx, + grpc_closure_create(enqueue_finally, closure, + grpc_combiner_scheduler(lock)), + error); GPR_TIMER_END("combiner.execute_finally", 0); return; } @@ -419,42 +359,20 @@ static void combiner_execute_finally(grpc_exec_ctx *exec_ctx, if (grpc_closure_list_empty(lock->final_list)) { gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT); } - if (covered_by_poller) { - lock->final_list_covered_by_poller = true; - } grpc_closure_list_append(&lock->final_list, closure, error); GPR_TIMER_END("combiner.execute_finally", 0); } static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure, grpc_error *error) { - combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure, - GRPC_ERROR_REF(error), false); -} - -static void combiner_finally_exec_uncovered(grpc_exec_ctx *exec_ctx, - grpc_closure *cl, - grpc_error *error) { - combiner_execute_finally(exec_ctx, COMBINER_FROM_CLOSURE_SCHEDULER( - cl, uncovered_finally_scheduler), - cl, error, false); -} - -static void combiner_finally_exec_covered(grpc_exec_ctx *exec_ctx, - grpc_closure *cl, grpc_error *error) { - combiner_execute_finally( - exec_ctx, COMBINER_FROM_CLOSURE_SCHEDULER(cl, covered_finally_scheduler), - cl, error, true); + combiner_finally_exec(exec_ctx, closure, GRPC_ERROR_REF(error)); } -grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *combiner, - bool covered_by_poller) { - return covered_by_poller ? &combiner->covered_scheduler - : &combiner->uncovered_scheduler; +grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *combiner) { + return &combiner->scheduler; } grpc_closure_scheduler *grpc_combiner_finally_scheduler( - grpc_combiner *combiner, bool covered_by_poller) { - return covered_by_poller ? &combiner->covered_finally_scheduler - : &combiner->uncovered_finally_scheduler; + grpc_combiner *combiner) { + return &combiner->finally_scheduler; } diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h index 6ab7a2b26b..5aa8443667 100644 --- a/src/core/lib/iomgr/combiner.h +++ b/src/core/lib/iomgr/combiner.h @@ -48,7 +48,7 @@ // Initialize the lock, with an optional workqueue to shift load to when // necessary -grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue); +grpc_combiner *grpc_combiner_create(void); //#define GRPC_COMBINER_REFCOUNT_DEBUG #ifdef GRPC_COMBINER_REFCOUNT_DEBUG @@ -71,11 +71,9 @@ grpc_combiner *grpc_combiner_ref(grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS); void grpc_combiner_unref(grpc_exec_ctx *exec_ctx, grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS); // Fetch a scheduler to schedule closures against -grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *lock, - bool covered_by_poller); +grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *lock); // Scheduler to execute \a action within the lock just prior to unlocking. -grpc_closure_scheduler *grpc_combiner_finally_scheduler(grpc_combiner *lock, - bool covered_by_poller); +grpc_closure_scheduler *grpc_combiner_finally_scheduler(grpc_combiner *lock); bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx); diff --git a/src/core/lib/iomgr/endpoint.c b/src/core/lib/iomgr/endpoint.c index bf6e98146a..60b8410a45 100644 --- a/src/core/lib/iomgr/endpoint.c +++ b/src/core/lib/iomgr/endpoint.c @@ -69,10 +69,6 @@ char* grpc_endpoint_get_peer(grpc_endpoint* ep) { int grpc_endpoint_get_fd(grpc_endpoint* ep) { return ep->vtable->get_fd(ep); } -grpc_workqueue* grpc_endpoint_get_workqueue(grpc_endpoint* ep) { - return ep->vtable->get_workqueue(ep); -} - grpc_resource_user* grpc_endpoint_get_resource_user(grpc_endpoint* ep) { return ep->vtable->get_resource_user(ep); } diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h index 740357ecc5..f8cee0158b 100644 --- a/src/core/lib/iomgr/endpoint.h +++ b/src/core/lib/iomgr/endpoint.h @@ -52,7 +52,6 @@ struct grpc_endpoint_vtable { grpc_slice_buffer *slices, grpc_closure *cb); void (*write)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_slice_buffer *slices, grpc_closure *cb); - grpc_workqueue *(*get_workqueue)(grpc_endpoint *ep); void (*add_to_pollset)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset *pollset); void (*add_to_pollset_set)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, @@ -78,9 +77,6 @@ char *grpc_endpoint_get_peer(grpc_endpoint *ep); */ int grpc_endpoint_get_fd(grpc_endpoint *ep); -/* Retrieve a reference to the workqueue associated with this endpoint */ -grpc_workqueue *grpc_endpoint_get_workqueue(grpc_endpoint *ep); - /* Write slices out to the socket. If the connection is ready for more data after the end of the call, it diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c index ad69f808cd..39785f6d56 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.c +++ b/src/core/lib/iomgr/ev_epoll1_linux.c @@ -58,7 +58,6 @@ #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/lockfree_event.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" -#include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" @@ -130,7 +129,9 @@ struct grpc_pollset { * Pollset-set Declarations */ -struct grpc_pollset_set {}; +struct grpc_pollset_set { + char unused; +}; /******************************************************************************* * Common helpers @@ -283,10 +284,6 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure); } -static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { - return (grpc_workqueue *)0xb0b51ed; -} - static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_pollset *notifier) { grpc_lfev_set_ready(exec_ctx, &fd->read_closure); @@ -313,8 +310,6 @@ GPR_TLS_DECL(g_current_thread_worker); static gpr_atm g_active_poller; static pollset_neighbourhood *g_neighbourhoods; static size_t g_num_neighbourhoods; -static gpr_mu g_wq_mu; -static grpc_closure_list g_wq_items; /* Return true if first in list */ static bool worker_insert(grpc_pollset *pollset, grpc_pollset_worker *worker) { @@ -363,8 +358,6 @@ static grpc_error *pollset_global_init(void) { gpr_atm_no_barrier_store(&g_active_poller, 0); global_wakeup_fd.read_fd = -1; grpc_error *err = grpc_wakeup_fd_init(&global_wakeup_fd); - gpr_mu_init(&g_wq_mu); - g_wq_items = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT; if (err != GRPC_ERROR_NONE) return err; struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET), .data.ptr = &global_wakeup_fd}; @@ -383,7 +376,6 @@ static grpc_error *pollset_global_init(void) { static void pollset_global_shutdown(void) { gpr_tls_destroy(&g_current_thread_pollset); gpr_tls_destroy(&g_current_thread_worker); - gpr_mu_destroy(&g_wq_mu); if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd); for (size_t i = 0; i < g_num_neighbourhoods; i++) { gpr_mu_destroy(&g_neighbourhoods[i].mu); @@ -507,9 +499,6 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, for (int i = 0; i < r; i++) { void *data_ptr = events[i].data.ptr; if (data_ptr == &global_wakeup_fd) { - gpr_mu_lock(&g_wq_mu); - grpc_closure_list_move(&g_wq_items, &exec_ctx->closure_list); - gpr_mu_unlock(&g_wq_mu); append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd), err_desc); } else { @@ -792,84 +781,6 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd) {} /******************************************************************************* - * Workqueue Definitions - */ - -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG -static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue, - const char *file, int line, - const char *reason) { - return workqueue; -} - -static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - const char *file, int line, const char *reason) {} -#else -static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) { - return workqueue; -} - -static void workqueue_unref(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue) {} -#endif - -static void wq_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error) { - // find a neighbourhood to wakeup - bool scheduled = false; - size_t initial_neighbourhood = choose_neighbourhood(); - for (size_t i = 0; !scheduled && i < g_num_neighbourhoods; i++) { - pollset_neighbourhood *neighbourhood = - &g_neighbourhoods[(initial_neighbourhood + i) % g_num_neighbourhoods]; - if (gpr_mu_trylock(&neighbourhood->mu)) { - if (neighbourhood->active_root != NULL) { - grpc_pollset *inspect = neighbourhood->active_root; - do { - if (gpr_mu_trylock(&inspect->mu)) { - if (inspect->root_worker != NULL) { - grpc_pollset_worker *inspect_worker = inspect->root_worker; - do { - if (inspect_worker->kick_state == UNKICKED) { - inspect_worker->kick_state = KICKED; - grpc_closure_list_append( - &inspect_worker->schedule_on_end_work, closure, error); - if (inspect_worker->initialized_cv) { - gpr_cv_signal(&inspect_worker->cv); - } - scheduled = true; - } - inspect_worker = inspect_worker->next; - } while (!scheduled && inspect_worker != inspect->root_worker); - } - gpr_mu_unlock(&inspect->mu); - } - inspect = inspect->next; - } while (!scheduled && inspect != neighbourhood->active_root); - } - gpr_mu_unlock(&neighbourhood->mu); - } - } - if (!scheduled) { - gpr_mu_lock(&g_wq_mu); - grpc_closure_list_append(&g_wq_items, closure, error); - gpr_mu_unlock(&g_wq_mu); - GRPC_LOG_IF_ERROR("workqueue_scheduler", - grpc_wakeup_fd_wakeup(&global_wakeup_fd)); - } -} - -static const grpc_closure_scheduler_vtable - singleton_workqueue_scheduler_vtable = {wq_sched, wq_sched, - "epoll1_workqueue"}; - -static grpc_closure_scheduler singleton_workqueue_scheduler = { - &singleton_workqueue_scheduler_vtable}; - -static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) { - return &singleton_workqueue_scheduler; -} - -/******************************************************************************* * Pollset-set Definitions */ @@ -920,7 +831,6 @@ static const grpc_event_engine_vtable vtable = { .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, - .fd_get_workqueue = fd_get_workqueue, .pollset_init = pollset_init, .pollset_shutdown = pollset_shutdown, @@ -938,10 +848,6 @@ static const grpc_event_engine_vtable vtable = { .pollset_set_add_fd = pollset_set_add_fd, .pollset_set_del_fd = pollset_set_del_fd, - .workqueue_ref = workqueue_ref, - .workqueue_unref = workqueue_unref, - .workqueue_scheduler = workqueue_scheduler, - .shutdown_engine = shutdown_engine, }; diff --git a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c index d23bf6c06c..d7a2d35484 100644 --- a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c +++ b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c @@ -61,7 +61,6 @@ #include "src/core/lib/iomgr/lockfree_event.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" -#include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/env.h" @@ -184,13 +183,15 @@ static void fd_global_shutdown(void); * Polling island Declarations */ -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG +//#define PI_REFCOUNT_DEBUG + +#ifdef PI_REFCOUNT_DEBUG #define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__) #define PI_UNREF(exec_ctx, p, r) \ pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__) -#else /* defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */ +#else /* defined(PI_REFCOUNT_DEBUG) */ #define PI_ADD_REF(p, r) pi_add_ref((p)) #define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p)) @@ -204,8 +205,6 @@ typedef struct worker_node { /* This is also used as grpc_workqueue (by directly casing it) */ typedef struct polling_island { - grpc_closure_scheduler workqueue_scheduler; - gpr_mu mu; /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement the refcount. @@ -226,15 +225,6 @@ typedef struct polling_island { /* Number of threads currently polling on this island */ gpr_atm poller_count; - /* Mutex guarding the read end of the workqueue (must be held to pop from - * workqueue_items) */ - gpr_mu workqueue_read_mu; - /* Queue of closures to be executed */ - gpr_mpscq workqueue_items; - /* Count of items in workqueue_items */ - gpr_atm workqueue_item_count; - /* Wakeup fd used to wake pollers to check the contents of workqueue_items */ - grpc_wakeup_fd workqueue_wakeup_fd; /* The list of workers waiting to do polling on this polling island */ gpr_mu worker_list_mu; @@ -323,8 +313,6 @@ static __thread polling_island *g_current_thread_polling_island; /* Forward declaration */ static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi); -static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error); #ifdef GRPC_TSAN /* Currently TSAN may incorrectly flag data races between epoll_ctl and @@ -337,13 +325,10 @@ static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, gpr_atm g_epoll_sync; #endif /* defined(GRPC_TSAN) */ -static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = { - workqueue_enqueue, workqueue_enqueue, "workqueue"}; - static void pi_add_ref(polling_island *pi); static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi); -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG +#ifdef PI_REFCOUNT_DEBUG static void pi_add_ref_dbg(polling_island *pi, const char *reason, const char *file, int line) { long old_cnt = gpr_atm_acq_load(&pi->ref_count); @@ -359,36 +344,6 @@ static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi, gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)", (void *)pi, old_cnt, (old_cnt - 1), reason, file, line); } - -static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue, - const char *file, int line, - const char *reason) { - if (workqueue != NULL) { - pi_add_ref_dbg((polling_island *)workqueue, reason, file, line); - } - return workqueue; -} - -static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - const char *file, int line, const char *reason) { - if (workqueue != NULL) { - pi_unref_dbg(exec_ctx, (polling_island *)workqueue, reason, file, line); - } -} -#else -static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) { - if (workqueue != NULL) { - pi_add_ref((polling_island *)workqueue); - } - return workqueue; -} - -static void workqueue_unref(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue) { - if (workqueue != NULL) { - pi_unref(exec_ctx, (polling_island *)workqueue); - } -} #endif static void pi_add_ref(polling_island *pi) { @@ -592,17 +547,12 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx, *error = GRPC_ERROR_NONE; pi = gpr_malloc(sizeof(*pi)); - pi->workqueue_scheduler.vtable = &workqueue_scheduler_vtable; gpr_mu_init(&pi->mu); pi->fd_cnt = 0; pi->fd_capacity = 0; pi->fds = NULL; pi->epoll_fd = -1; - gpr_mu_init(&pi->workqueue_read_mu); - gpr_mpscq_init(&pi->workqueue_items); - gpr_atm_rel_store(&pi->workqueue_item_count, 0); - gpr_atm_rel_store(&pi->ref_count, 0); gpr_atm_rel_store(&pi->poller_count, 0); gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL); @@ -610,11 +560,6 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx, gpr_mu_init(&pi->worker_list_mu); worker_node_init(&pi->worker_list_head); - if (!append_error(error, grpc_wakeup_fd_init(&pi->workqueue_wakeup_fd), - err_desc)) { - goto done; - } - pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC); if (pi->epoll_fd < 0) { @@ -622,8 +567,6 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx, goto done; } - polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error); - if (initial_fd != NULL) { polling_island_add_fds_locked(pi, &initial_fd, 1, true, error); } @@ -642,11 +585,7 @@ static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) { if (pi->epoll_fd >= 0) { close(pi->epoll_fd); } - GPR_ASSERT(gpr_atm_no_barrier_load(&pi->workqueue_item_count) == 0); - gpr_mu_destroy(&pi->workqueue_read_mu); - gpr_mpscq_destroy(&pi->workqueue_items); gpr_mu_destroy(&pi->mu); - grpc_wakeup_fd_destroy(&pi->workqueue_wakeup_fd); gpr_mu_destroy(&pi->worker_list_mu); GPR_ASSERT(is_worker_node_detached(&pi->worker_list_head)); @@ -794,45 +733,6 @@ static void polling_island_unlock_pair(polling_island *p, polling_island *q) { } } -static void workqueue_maybe_wakeup(polling_island *pi) { - /* If this thread is the current poller, then it may be that it's about to - decrement the current poller count, so we need to look past this thread */ - bool is_current_poller = (g_current_thread_polling_island == pi); - gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0; - gpr_atm current_pollers = gpr_atm_no_barrier_load(&pi->poller_count); - /* Only issue a wakeup if it's likely that some poller could come in and take - it right now. Note that since we do an anticipatory mpscq_pop every poll - loop, it's ok if we miss the wakeup here, as we'll get the work item when - the next poller enters anyway. */ - if (current_pollers > min_current_pollers_for_wakeup) { - GRPC_LOG_IF_ERROR("workqueue_wakeup_fd", - grpc_wakeup_fd_wakeup(&pi->workqueue_wakeup_fd)); - } -} - -static void workqueue_move_items_to_parent(polling_island *q) { - polling_island *p = (polling_island *)gpr_atm_no_barrier_load(&q->merged_to); - if (p == NULL) { - return; - } - gpr_mu_lock(&q->workqueue_read_mu); - int num_added = 0; - while (gpr_atm_no_barrier_load(&q->workqueue_item_count) > 0) { - gpr_mpscq_node *n = gpr_mpscq_pop(&q->workqueue_items); - if (n != NULL) { - gpr_atm_no_barrier_fetch_add(&q->workqueue_item_count, -1); - gpr_atm_no_barrier_fetch_add(&p->workqueue_item_count, 1); - gpr_mpscq_push(&p->workqueue_items, n); - num_added++; - } - } - gpr_mu_unlock(&q->workqueue_read_mu); - if (num_added > 0) { - workqueue_maybe_wakeup(p); - } - workqueue_move_items_to_parent(p); -} - static polling_island *polling_island_merge(polling_island *p, polling_island *q, grpc_error **error) { @@ -857,8 +757,6 @@ static polling_island *polling_island_merge(polling_island *p, /* Add the 'merged_to' link from p --> q */ gpr_atm_rel_store(&p->merged_to, (gpr_atm)q); PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */ - - workqueue_move_items_to_parent(p); } /* else if p == q, nothing needs to be done */ @@ -869,32 +767,6 @@ static polling_island *polling_island_merge(polling_island *p, return q; } -static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error) { - GPR_TIMER_BEGIN("workqueue.enqueue", 0); - grpc_workqueue *workqueue = (grpc_workqueue *)closure->scheduler; - /* take a ref to the workqueue: otherwise it can happen that whatever events - * this kicks off ends up destroying the workqueue before this function - * completes */ - GRPC_WORKQUEUE_REF(workqueue, "enqueue"); - polling_island *pi = (polling_island *)workqueue; - gpr_atm last = gpr_atm_no_barrier_fetch_add(&pi->workqueue_item_count, 1); - closure->error_data.error = error; - gpr_mpscq_push(&pi->workqueue_items, &closure->next_data.atm_next); - if (last == 0) { - workqueue_maybe_wakeup(pi); - } - workqueue_move_items_to_parent(pi); - GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue"); - GPR_TIMER_END("workqueue.enqueue", 0); -} - -static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) { - polling_island *pi = (polling_island *)workqueue; - return workqueue == NULL ? grpc_schedule_on_exec_ctx - : &pi->workqueue_scheduler; -} - static grpc_error *polling_island_global_init() { grpc_error *error = GRPC_ERROR_NONE; @@ -1153,14 +1025,6 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure); } -static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { - gpr_mu_lock(&fd->po.mu); - grpc_workqueue *workqueue = - GRPC_WORKQUEUE_REF((grpc_workqueue *)fd->po.pi, "fd_get_workqueue"); - gpr_mu_unlock(&fd->po.mu); - return workqueue; -} - /******************************************************************************* * Pollset Definitions */ @@ -1432,33 +1296,6 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { gpr_mu_destroy(&pollset->po.mu); } -static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx, - polling_island *pi) { - if (gpr_mu_trylock(&pi->workqueue_read_mu)) { - gpr_mpscq_node *n = gpr_mpscq_pop(&pi->workqueue_items); - gpr_mu_unlock(&pi->workqueue_read_mu); - if (n != NULL) { - if (gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) > 1) { - workqueue_maybe_wakeup(pi); - } - grpc_closure *c = (grpc_closure *)n; - grpc_error *error = c->error_data.error; -#ifndef NDEBUG - c->scheduled = false; -#endif - c->cb(exec_ctx, c->cb_arg, error); - GRPC_ERROR_UNREF(error); - return true; - } else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) { - /* n == NULL might mean there's work but it's not available to be popped - * yet - try to ensure another workqueue wakes up to check shortly if so - */ - workqueue_maybe_wakeup(pi); - } - } - return false; -} - /* NOTE: This function may modify 'now' */ static bool acquire_polling_lease(grpc_pollset_worker *worker, polling_island *pi, gpr_timespec deadline, @@ -1594,12 +1431,7 @@ static void pollset_do_epoll_pwait(grpc_exec_ctx *exec_ctx, int epoll_fd, for (int i = 0; i < ep_rv; ++i) { void *data_ptr = ep_ev[i].data.ptr; - if (data_ptr == &pi->workqueue_wakeup_fd) { - append_error(error, - grpc_wakeup_fd_consume_wakeup(&pi->workqueue_wakeup_fd), - err_desc); - maybe_do_workqueue_work(exec_ctx, pi); - } else if (data_ptr == &polling_island_wakeup_fd) { + if (data_ptr == &polling_island_wakeup_fd) { GRPC_POLLING_TRACE( "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: " "%d) got merged", @@ -1675,15 +1507,10 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, PI_ADD_REF(pi, "ps_work"); gpr_mu_unlock(&pollset->po.mu); - /* If we get some workqueue work to do, it might end up completing an item on - the completion queue, so there's no need to poll... so we skip that and - redo the complete loop to verify */ - if (!maybe_do_workqueue_work(exec_ctx, pi)) { - g_current_thread_polling_island = pi; - pollset_do_epoll_pwait(exec_ctx, epoll_fd, pollset, pi, worker, now, - deadline, sig_mask, error); - g_current_thread_polling_island = NULL; - } + g_current_thread_polling_island = pi; + pollset_do_epoll_pwait(exec_ctx, epoll_fd, pollset, pi, worker, now, deadline, + sig_mask, error); + g_current_thread_polling_island = NULL; GPR_ASSERT(pi != NULL); @@ -2036,7 +1863,6 @@ static const grpc_event_engine_vtable vtable = { .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, - .fd_get_workqueue = fd_get_workqueue, .pollset_init = pollset_init, .pollset_shutdown = pollset_shutdown, @@ -2054,10 +1880,6 @@ static const grpc_event_engine_vtable vtable = { .pollset_set_add_fd = pollset_set_add_fd, .pollset_set_del_fd = pollset_set_del_fd, - .workqueue_ref = workqueue_ref, - .workqueue_unref = workqueue_unref, - .workqueue_scheduler = workqueue_scheduler, - .shutdown_engine = shutdown_engine, }; diff --git a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c index bb44321922..c56b47be11 100644 --- a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c +++ b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c @@ -61,7 +61,6 @@ #include "src/core/lib/iomgr/lockfree_event.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" -#include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" @@ -109,23 +108,22 @@ static void fd_global_shutdown(void); * epoll set Declarations */ -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG +//#define EPS_REFCOUNT_DEBUG + +#ifdef EPS_REFCOUNT_DEBUG #define EPS_ADD_REF(p, r) eps_add_ref_dbg((p), (r), __FILE__, __LINE__) #define EPS_UNREF(exec_ctx, p, r) \ eps_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__) -#else /* defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */ +#else /* defined(EPS_REFCOUNT_DEBUG) */ #define EPS_ADD_REF(p, r) eps_add_ref((p)) #define EPS_UNREF(exec_ctx, p, r) eps_unref((exec_ctx), (p)) #endif /* !defined(GRPC_EPS_REF_COUNT_DEBUG) */ -/* This is also used as grpc_workqueue (by directly casting it) */ typedef struct epoll_set { - grpc_closure_scheduler workqueue_scheduler; - /* Mutex poller should acquire to poll this. This enforces that only one * poller can be polling on epoll_set at any time */ gpr_mu mu; @@ -139,15 +137,6 @@ typedef struct epoll_set { /* Number of threads currently polling on this epoll set*/ gpr_atm poller_count; - /* Mutex guarding the read end of the workqueue (must be held to pop from - * workqueue_items) */ - gpr_mu workqueue_read_mu; - /* Queue of closures to be executed */ - gpr_mpscq workqueue_items; - /* Count of items in workqueue_items */ - gpr_atm workqueue_item_count; - /* Wakeup fd used to wake pollers to check the contents of workqueue_items */ - grpc_wakeup_fd workqueue_wakeup_fd; /* Is the epoll set shutdown */ gpr_atm is_shutdown; @@ -181,7 +170,9 @@ struct grpc_pollset { /******************************************************************************* * Pollset-set Declarations */ -struct grpc_pollset_set {}; +struct grpc_pollset_set { + char unused; +}; /***************************************************************************** * Dedicated polling threads and pollsets - Declarations @@ -235,8 +226,6 @@ static __thread epoll_set *g_current_thread_epoll_set; /* Forward declaration */ static void epoll_set_delete(epoll_set *eps); -static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error); #ifdef GRPC_TSAN /* Currently TSAN may incorrectly flag data races between epoll_ctl and @@ -249,13 +238,10 @@ static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, gpr_atm g_epoll_sync; #endif /* defined(GRPC_TSAN) */ -static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = { - workqueue_enqueue, workqueue_enqueue, "workqueue"}; - static void eps_add_ref(epoll_set *eps); static void eps_unref(grpc_exec_ctx *exec_ctx, epoll_set *eps); -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG +#ifdef EPS_REFCOUNT_DEBUG static void eps_add_ref_dbg(epoll_set *eps, const char *reason, const char *file, int line) { long old_cnt = gpr_atm_acq_load(&eps->ref_count); @@ -271,36 +257,6 @@ static void eps_unref_dbg(grpc_exec_ctx *exec_ctx, epoll_set *eps, gpr_log(GPR_DEBUG, "Unref eps: %p, old:%ld -> new:%ld (%s) - (%s, %d)", (void *)eps, old_cnt, (old_cnt - 1), reason, file, line); } - -static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue, - const char *file, int line, - const char *reason) { - if (workqueue != NULL) { - eps_add_ref_dbg((epoll_set *)workqueue, reason, file, line); - } - return workqueue; -} - -static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - const char *file, int line, const char *reason) { - if (workqueue != NULL) { - eps_unref_dbg(exec_ctx, (epoll_set *)workqueue, reason, file, line); - } -} -#else -static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) { - if (workqueue != NULL) { - eps_add_ref((epoll_set *)workqueue); - } - return workqueue; -} - -static void workqueue_unref(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue) { - if (workqueue != NULL) { - eps_unref(exec_ctx, (epoll_set *)workqueue); - } -} #endif static void eps_add_ref(epoll_set *eps) { @@ -394,24 +350,15 @@ static epoll_set *epoll_set_create(grpc_error **error) { *error = GRPC_ERROR_NONE; eps = gpr_malloc(sizeof(*eps)); - eps->workqueue_scheduler.vtable = &workqueue_scheduler_vtable; eps->epoll_fd = -1; gpr_mu_init(&eps->mu); - gpr_mu_init(&eps->workqueue_read_mu); - gpr_mpscq_init(&eps->workqueue_items); - gpr_atm_rel_store(&eps->workqueue_item_count, 0); gpr_atm_rel_store(&eps->ref_count, 0); gpr_atm_rel_store(&eps->poller_count, 0); gpr_atm_rel_store(&eps->is_shutdown, false); - if (!append_error(error, grpc_wakeup_fd_init(&eps->workqueue_wakeup_fd), - err_desc)) { - goto done; - } - eps->epoll_fd = epoll_create1(EPOLL_CLOEXEC); if (eps->epoll_fd < 0) { @@ -419,8 +366,6 @@ static epoll_set *epoll_set_create(grpc_error **error) { goto done; } - epoll_set_add_wakeup_fd_locked(eps, &eps->workqueue_wakeup_fd, error); - done: if (*error != GRPC_ERROR_NONE) { epoll_set_delete(eps); @@ -434,57 +379,11 @@ static void epoll_set_delete(epoll_set *eps) { close(eps->epoll_fd); } - GPR_ASSERT(gpr_atm_no_barrier_load(&eps->workqueue_item_count) == 0); gpr_mu_destroy(&eps->mu); - gpr_mu_destroy(&eps->workqueue_read_mu); - gpr_mpscq_destroy(&eps->workqueue_items); - grpc_wakeup_fd_destroy(&eps->workqueue_wakeup_fd); gpr_free(eps); } -static void workqueue_maybe_wakeup(epoll_set *eps) { - /* If this thread is the current poller, then it may be that it's about to - decrement the current poller count, so we need to look past this thread */ - bool is_current_poller = (g_current_thread_epoll_set == eps); - gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0; - gpr_atm current_pollers = gpr_atm_no_barrier_load(&eps->poller_count); - /* Only issue a wakeup if it's likely that some poller could come in and take - it right now. Note that since we do an anticipatory mpscq_pop every poll - loop, it's ok if we miss the wakeup here, as we'll get the work item when - the next poller enters anyway. */ - if (current_pollers > min_current_pollers_for_wakeup) { - GRPC_LOG_IF_ERROR("workqueue_wakeup_fd", - grpc_wakeup_fd_wakeup(&eps->workqueue_wakeup_fd)); - } -} - -static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error) { - GPR_TIMER_BEGIN("workqueue.enqueue", 0); - grpc_workqueue *workqueue = (grpc_workqueue *)closure->scheduler; - /* take a ref to the workqueue: otherwise it can happen that whatever events - * this kicks off ends up destroying the workqueue before this function - * completes */ - GRPC_WORKQUEUE_REF(workqueue, "enqueue"); - epoll_set *eps = (epoll_set *)workqueue; - gpr_atm last = gpr_atm_no_barrier_fetch_add(&eps->workqueue_item_count, 1); - closure->error_data.error = error; - gpr_mpscq_push(&eps->workqueue_items, &closure->next_data.atm_next); - if (last == 0) { - workqueue_maybe_wakeup(eps); - } - - GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue"); - GPR_TIMER_END("workqueue.enqueue", 0); -} - -static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) { - epoll_set *eps = (epoll_set *)workqueue; - return workqueue == NULL ? grpc_schedule_on_exec_ctx - : &eps->workqueue_scheduler; -} - static grpc_error *epoll_set_global_init() { grpc_error *error = GRPC_ERROR_NONE; @@ -680,8 +579,6 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure); } -static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; } - /******************************************************************************* * Pollset Definitions */ @@ -865,32 +762,6 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { gpr_mu_destroy(&pollset->mu); } -static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx, epoll_set *eps) { - if (gpr_mu_trylock(&eps->workqueue_read_mu)) { - gpr_mpscq_node *n = gpr_mpscq_pop(&eps->workqueue_items); - gpr_mu_unlock(&eps->workqueue_read_mu); - if (n != NULL) { - if (gpr_atm_full_fetch_add(&eps->workqueue_item_count, -1) > 1) { - workqueue_maybe_wakeup(eps); - } - grpc_closure *c = (grpc_closure *)n; - grpc_error *error = c->error_data.error; -#ifndef NDEBUG - c->scheduled = false; -#endif - c->cb(exec_ctx, c->cb_arg, error); - GRPC_ERROR_UNREF(error); - return true; - } else if (gpr_atm_no_barrier_load(&eps->workqueue_item_count) > 0) { - /* n == NULL might mean there's work but it's not available to be popped - * yet - try to ensure another workqueue wakes up to check shortly if so - */ - workqueue_maybe_wakeup(eps); - } - } - return false; -} - /* Blocking call */ static void acquire_epoll_lease(epoll_set *eps) { if (g_num_threads_per_eps > 1) { @@ -934,12 +805,7 @@ static void do_epoll_wait(grpc_exec_ctx *exec_ctx, int epoll_fd, epoll_set *eps, for (int i = 0; i < ep_rv; ++i) { void *data_ptr = ep_ev[i].data.ptr; - if (data_ptr == &eps->workqueue_wakeup_fd) { - append_error(error, - grpc_wakeup_fd_consume_wakeup(&eps->workqueue_wakeup_fd), - err_desc); - maybe_do_workqueue_work(exec_ctx, eps); - } else if (data_ptr == &epoll_set_wakeup_fd) { + if (data_ptr == &epoll_set_wakeup_fd) { gpr_atm_rel_store(&eps->is_shutdown, 1); gpr_log(GPR_INFO, "pollset poller: shutdown set"); } else { @@ -966,18 +832,13 @@ static void epoll_set_work(grpc_exec_ctx *exec_ctx, epoll_set *eps, epoll set. */ epoll_fd = eps->epoll_fd; - /* If we get some workqueue work to do, it might end up completing an item on - the completion queue, so there's no need to poll... so we skip that and - redo the complete loop to verify */ - if (!maybe_do_workqueue_work(exec_ctx, eps)) { - gpr_atm_no_barrier_fetch_add(&eps->poller_count, 1); - g_current_thread_epoll_set = eps; + gpr_atm_no_barrier_fetch_add(&eps->poller_count, 1); + g_current_thread_epoll_set = eps; - do_epoll_wait(exec_ctx, epoll_fd, eps, error); + do_epoll_wait(exec_ctx, epoll_fd, eps, error); - g_current_thread_epoll_set = NULL; - gpr_atm_no_barrier_fetch_add(&eps->poller_count, -1); - } + g_current_thread_epoll_set = NULL; + gpr_atm_no_barrier_fetch_add(&eps->poller_count, -1); GPR_TIMER_END("epoll_set_work", 0); } @@ -1120,7 +981,6 @@ static const grpc_event_engine_vtable vtable = { .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, - .fd_get_workqueue = fd_get_workqueue, .pollset_init = pollset_init, .pollset_shutdown = pollset_shutdown, @@ -1138,10 +998,6 @@ static const grpc_event_engine_vtable vtable = { .pollset_set_add_fd = pollset_set_add_fd, .pollset_set_del_fd = pollset_set_del_fd, - .workqueue_ref = workqueue_ref, - .workqueue_unref = workqueue_unref, - .workqueue_scheduler = workqueue_scheduler, - .shutdown_engine = shutdown_engine, }; diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index 7cb6085e25..b1dad14ba4 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -59,7 +59,6 @@ #include "src/core/lib/iomgr/sys_epoll_wrapper.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" -#include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/spinlock.h" @@ -139,17 +138,6 @@ struct grpc_fd { Ref/Unref by two to avoid altering the orphaned bit */ gpr_atm refst; - /* Wakeup fd used to wake pollers to check the contents of workqueue_items */ - grpc_wakeup_fd workqueue_wakeup_fd; - grpc_closure_scheduler workqueue_scheduler; - /* Spinlock guarding the read end of the workqueue (must be held to pop from - * workqueue_items) */ - gpr_spinlock workqueue_read_mu; - /* Queue of closures to be executed */ - gpr_mpscq workqueue_items; - /* Count of items in workqueue_items */ - gpr_atm workqueue_item_count; - /* The fd is either closed or we relinquished control of it. In either cases, this indicates that the 'fd' on this structure is no longer valid */ @@ -172,12 +160,6 @@ struct grpc_fd { static void fd_global_init(void); static void fd_global_shutdown(void); -static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error); - -static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = { - workqueue_enqueue, workqueue_enqueue, "workqueue"}; - /******************************************************************************* * Pollset Declarations */ @@ -347,13 +329,6 @@ static grpc_fd *fd_create(int fd, const char *name) { grpc_lfev_init(&new_fd->write_closure); gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); - GRPC_LOG_IF_ERROR("fd_create", - grpc_wakeup_fd_init(&new_fd->workqueue_wakeup_fd)); - new_fd->workqueue_scheduler.vtable = &workqueue_scheduler_vtable; - new_fd->workqueue_read_mu = GPR_SPINLOCK_INITIALIZER; - gpr_mpscq_init(&new_fd->workqueue_items); - gpr_atm_no_barrier_store(&new_fd->workqueue_item_count, 0); - new_fd->freelist_next = NULL; new_fd->on_done_closure = NULL; @@ -446,91 +421,6 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure); } -static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { - REF_BY(fd, 2, "return_workqueue"); - return (grpc_workqueue *)fd; -} - -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG -static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue, - const char *file, int line, - const char *reason) { - if (workqueue != NULL) { - ref_by((grpc_fd *)workqueue, 2, file, line, reason); - } - return workqueue; -} - -static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - const char *file, int line, const char *reason) { - if (workqueue != NULL) { - unref_by(exec_ctx, (grpc_fd *)workqueue, 2, file, line, reason); - } -} -#else -static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) { - if (workqueue != NULL) { - ref_by((grpc_fd *)workqueue, 2); - } - return workqueue; -} - -static void workqueue_unref(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue) { - if (workqueue != NULL) { - unref_by(exec_ctx, (grpc_fd *)workqueue, 2); - } -} -#endif - -static void workqueue_wakeup(grpc_fd *fd) { - GRPC_LOG_IF_ERROR("workqueue_enqueue", - grpc_wakeup_fd_wakeup(&fd->workqueue_wakeup_fd)); -} - -static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error) { - GPR_TIMER_BEGIN("workqueue.enqueue", 0); - grpc_fd *fd = (grpc_fd *)(((char *)closure->scheduler) - - offsetof(grpc_fd, workqueue_scheduler)); - REF_BY(fd, 2, "workqueue_enqueue"); - gpr_atm last = gpr_atm_no_barrier_fetch_add(&fd->workqueue_item_count, 1); - closure->error_data.error = error; - gpr_mpscq_push(&fd->workqueue_items, &closure->next_data.atm_next); - if (last == 0) { - workqueue_wakeup(fd); - } - UNREF_BY(exec_ctx, fd, 2, "workqueue_enqueue"); -} - -static void fd_invoke_workqueue(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - /* handle spurious wakeups */ - if (!gpr_spinlock_trylock(&fd->workqueue_read_mu)) return; - gpr_mpscq_node *n = gpr_mpscq_pop(&fd->workqueue_items); - gpr_spinlock_unlock(&fd->workqueue_read_mu); - if (n != NULL) { - if (gpr_atm_full_fetch_add(&fd->workqueue_item_count, -1) > 1) { - workqueue_wakeup(fd); - } - grpc_closure *c = (grpc_closure *)n; - grpc_error *error = c->error_data.error; -#ifndef NDEBUG - c->scheduled = false; -#endif - c->cb(exec_ctx, c->cb_arg, error); - GRPC_ERROR_UNREF(error); - } else if (gpr_atm_no_barrier_load(&fd->workqueue_item_count) > 0) { - /* n == NULL might mean there's work but it's not available to be popped - * yet - try to ensure another workqueue wakes up to check shortly if so - */ - workqueue_wakeup(fd); - } -} - -static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) { - return &((grpc_fd *)workqueue)->workqueue_scheduler; -} - /******************************************************************************* * Pollable Definitions */ @@ -596,22 +486,7 @@ static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) { .data.ptr = fd}; if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->fd, &ev_fd) != 0) { switch (errno) { - case EEXIST: /* if this fd is already in the epoll set, the workqueue fd - must also be - just return */ - gpr_mu_unlock(&fd->orphaned_mu); - return GRPC_ERROR_NONE; - default: - append_error(&error, GRPC_OS_ERROR(errno, "epoll_ctl"), err_desc); - } - } - struct epoll_event ev_wq = { - .events = (uint32_t)(EPOLLET | EPOLLIN | EPOLLEXCLUSIVE), - .data.ptr = (void *)(1 + (intptr_t)fd)}; - if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->workqueue_wakeup_fd.read_fd, &ev_wq) != - 0) { - switch (errno) { - case EEXIST: /* if the workqueue fd is already in the epoll set we're ok - - no need to do anything special */ + case EEXIST: break; default: append_error(&error, GRPC_OS_ERROR(errno, "epoll_ctl"), err_desc); @@ -874,29 +749,21 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } append_error(&error, grpc_wakeup_fd_consume_wakeup(&p->wakeup), err_desc); } else { - grpc_fd *fd = (grpc_fd *)(((intptr_t)data_ptr) & ~(intptr_t)1); - bool is_workqueue = (((intptr_t)data_ptr) & 1) != 0; + grpc_fd *fd = (grpc_fd *)data_ptr; bool cancel = (events[i].events & (EPOLLERR | EPOLLHUP)) != 0; bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0; bool write_ev = (events[i].events & EPOLLOUT) != 0; if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, - "PS:%p poll %p got fd %p: is_wq=%d cancel=%d read=%d " + "PS:%p poll %p got fd %p: cancel=%d read=%d " "write=%d", - pollset, p, fd, is_workqueue, cancel, read_ev, write_ev); + pollset, p, fd, cancel, read_ev, write_ev); } - if (is_workqueue) { - append_error(&error, - grpc_wakeup_fd_consume_wakeup(&fd->workqueue_wakeup_fd), - err_desc); - fd_invoke_workqueue(exec_ctx, fd); - } else { - if (read_ev || cancel) { - fd_become_readable(exec_ctx, fd, pollset); - } - if (write_ev || cancel) { - fd_become_writable(exec_ctx, fd); - } + if (read_ev || cancel) { + fd_become_readable(exec_ctx, fd, pollset); + } + if (write_ev || cancel) { + fd_become_writable(exec_ctx, fd); } } } @@ -1449,7 +1316,6 @@ static const grpc_event_engine_vtable vtable = { .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, - .fd_get_workqueue = fd_get_workqueue, .pollset_init = pollset_init, .pollset_shutdown = pollset_shutdown, @@ -1467,10 +1333,6 @@ static const grpc_event_engine_vtable vtable = { .pollset_set_add_fd = pollset_set_add_fd, .pollset_set_del_fd = pollset_set_del_fd, - .workqueue_ref = workqueue_ref, - .workqueue_unref = workqueue_unref, - .workqueue_scheduler = workqueue_scheduler, - .shutdown_engine = shutdown_engine, }; diff --git a/src/core/lib/iomgr/ev_epollsig_linux.c b/src/core/lib/iomgr/ev_epollsig_linux.c index 92c555b7ea..aba83a6da9 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.c +++ b/src/core/lib/iomgr/ev_epollsig_linux.c @@ -59,7 +59,6 @@ #include "src/core/lib/iomgr/lockfree_event.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" -#include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" @@ -177,7 +176,9 @@ static void fd_global_shutdown(void); * Polling island Declarations */ -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG +//#define PI_REFCOUNT_DEBUG + +#ifdef PI_REFCOUNT_DEBUG #define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__) #define PI_UNREF(exec_ctx, p, r) \ @@ -192,8 +193,6 @@ static void fd_global_shutdown(void); /* This is also used as grpc_workqueue (by directly casing it) */ typedef struct polling_island { - grpc_closure_scheduler workqueue_scheduler; - gpr_mu mu; /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement the refcount. @@ -214,15 +213,6 @@ typedef struct polling_island { /* Number of threads currently polling on this island */ gpr_atm poller_count; - /* Mutex guarding the read end of the workqueue (must be held to pop from - * workqueue_items) */ - gpr_mu workqueue_read_mu; - /* Queue of closures to be executed */ - gpr_mpscq workqueue_items; - /* Count of items in workqueue_items */ - gpr_atm workqueue_item_count; - /* Wakeup fd used to wake pollers to check the contents of workqueue_items */ - grpc_wakeup_fd workqueue_wakeup_fd; /* The fd of the underlying epoll set */ int epoll_fd; @@ -297,8 +287,6 @@ static __thread polling_island *g_current_thread_polling_island; /* Forward declaration */ static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi); -static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error); #ifdef GRPC_TSAN /* Currently TSAN may incorrectly flag data races between epoll_ctl and @@ -311,13 +299,10 @@ static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, gpr_atm g_epoll_sync; #endif /* defined(GRPC_TSAN) */ -static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = { - workqueue_enqueue, workqueue_enqueue, "workqueue"}; - static void pi_add_ref(polling_island *pi); static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi); -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG +#ifdef PI_REFCOUNT_DEBUG static void pi_add_ref_dbg(polling_island *pi, const char *reason, const char *file, int line) { long old_cnt = gpr_atm_acq_load(&pi->ref_count); @@ -333,36 +318,6 @@ static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi, gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)", (void *)pi, old_cnt, (old_cnt - 1), reason, file, line); } - -static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue, - const char *file, int line, - const char *reason) { - if (workqueue != NULL) { - pi_add_ref_dbg((polling_island *)workqueue, reason, file, line); - } - return workqueue; -} - -static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - const char *file, int line, const char *reason) { - if (workqueue != NULL) { - pi_unref_dbg(exec_ctx, (polling_island *)workqueue, reason, file, line); - } -} -#else -static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) { - if (workqueue != NULL) { - pi_add_ref((polling_island *)workqueue); - } - return workqueue; -} - -static void workqueue_unref(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue) { - if (workqueue != NULL) { - pi_unref(exec_ctx, (polling_island *)workqueue); - } -} #endif static void pi_add_ref(polling_island *pi) { @@ -526,26 +481,16 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx, *error = GRPC_ERROR_NONE; pi = gpr_malloc(sizeof(*pi)); - pi->workqueue_scheduler.vtable = &workqueue_scheduler_vtable; gpr_mu_init(&pi->mu); pi->fd_cnt = 0; pi->fd_capacity = 0; pi->fds = NULL; pi->epoll_fd = -1; - gpr_mu_init(&pi->workqueue_read_mu); - gpr_mpscq_init(&pi->workqueue_items); - gpr_atm_rel_store(&pi->workqueue_item_count, 0); - gpr_atm_rel_store(&pi->ref_count, 0); gpr_atm_rel_store(&pi->poller_count, 0); gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL); - if (!append_error(error, grpc_wakeup_fd_init(&pi->workqueue_wakeup_fd), - err_desc)) { - goto done; - } - pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC); if (pi->epoll_fd < 0) { @@ -553,8 +498,6 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx, goto done; } - polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error); - if (initial_fd != NULL) { polling_island_add_fds_locked(pi, &initial_fd, 1, true, error); } @@ -573,11 +516,7 @@ static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) { if (pi->epoll_fd >= 0) { close(pi->epoll_fd); } - GPR_ASSERT(gpr_atm_no_barrier_load(&pi->workqueue_item_count) == 0); - gpr_mu_destroy(&pi->workqueue_read_mu); - gpr_mpscq_destroy(&pi->workqueue_items); gpr_mu_destroy(&pi->mu); - grpc_wakeup_fd_destroy(&pi->workqueue_wakeup_fd); gpr_free(pi->fds); gpr_free(pi); } @@ -722,45 +661,6 @@ static void polling_island_unlock_pair(polling_island *p, polling_island *q) { } } -static void workqueue_maybe_wakeup(polling_island *pi) { - /* If this thread is the current poller, then it may be that it's about to - decrement the current poller count, so we need to look past this thread */ - bool is_current_poller = (g_current_thread_polling_island == pi); - gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0; - gpr_atm current_pollers = gpr_atm_no_barrier_load(&pi->poller_count); - /* Only issue a wakeup if it's likely that some poller could come in and take - it right now. Note that since we do an anticipatory mpscq_pop every poll - loop, it's ok if we miss the wakeup here, as we'll get the work item when - the next poller enters anyway. */ - if (current_pollers >= min_current_pollers_for_wakeup) { - GRPC_LOG_IF_ERROR("workqueue_wakeup_fd", - grpc_wakeup_fd_wakeup(&pi->workqueue_wakeup_fd)); - } -} - -static void workqueue_move_items_to_parent(polling_island *q) { - polling_island *p = (polling_island *)gpr_atm_no_barrier_load(&q->merged_to); - if (p == NULL) { - return; - } - gpr_mu_lock(&q->workqueue_read_mu); - int num_added = 0; - while (gpr_atm_no_barrier_load(&q->workqueue_item_count) > 0) { - gpr_mpscq_node *n = gpr_mpscq_pop(&q->workqueue_items); - if (n != NULL) { - gpr_atm_no_barrier_fetch_add(&q->workqueue_item_count, -1); - gpr_atm_no_barrier_fetch_add(&p->workqueue_item_count, 1); - gpr_mpscq_push(&p->workqueue_items, n); - num_added++; - } - } - gpr_mu_unlock(&q->workqueue_read_mu); - if (num_added > 0) { - workqueue_maybe_wakeup(p); - } - workqueue_move_items_to_parent(p); -} - static polling_island *polling_island_merge(polling_island *p, polling_island *q, grpc_error **error) { @@ -785,8 +685,6 @@ static polling_island *polling_island_merge(polling_island *p, /* Add the 'merged_to' link from p --> q */ gpr_atm_rel_store(&p->merged_to, (gpr_atm)q); PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */ - - workqueue_move_items_to_parent(p); } /* else if p == q, nothing needs to be done */ @@ -797,32 +695,6 @@ static polling_island *polling_island_merge(polling_island *p, return q; } -static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error) { - GPR_TIMER_BEGIN("workqueue.enqueue", 0); - grpc_workqueue *workqueue = (grpc_workqueue *)closure->scheduler; - /* take a ref to the workqueue: otherwise it can happen that whatever events - * this kicks off ends up destroying the workqueue before this function - * completes */ - GRPC_WORKQUEUE_REF(workqueue, "enqueue"); - polling_island *pi = (polling_island *)workqueue; - gpr_atm last = gpr_atm_no_barrier_fetch_add(&pi->workqueue_item_count, 1); - closure->error_data.error = error; - gpr_mpscq_push(&pi->workqueue_items, &closure->next_data.atm_next); - if (last == 0) { - workqueue_maybe_wakeup(pi); - } - workqueue_move_items_to_parent(pi); - GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue"); - GPR_TIMER_END("workqueue.enqueue", 0); -} - -static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) { - polling_island *pi = (polling_island *)workqueue; - return workqueue == NULL ? grpc_schedule_on_exec_ctx - : &pi->workqueue_scheduler; -} - static grpc_error *polling_island_global_init() { grpc_error *error = GRPC_ERROR_NONE; @@ -1081,14 +953,6 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure); } -static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { - gpr_mu_lock(&fd->po.mu); - grpc_workqueue *workqueue = - GRPC_WORKQUEUE_REF((grpc_workqueue *)fd->po.pi, "fd_get_workqueue"); - gpr_mu_unlock(&fd->po.mu); - return workqueue; -} - /******************************************************************************* * Pollset Definitions */ @@ -1326,44 +1190,6 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { gpr_mu_destroy(&pollset->po.mu); } -static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx, - polling_island *pi) { - if (gpr_mu_trylock(&pi->workqueue_read_mu)) { - gpr_mpscq_node *n = gpr_mpscq_pop(&pi->workqueue_items); - gpr_mu_unlock(&pi->workqueue_read_mu); - if (n != NULL) { - gpr_atm remaining = - gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) - 1; - GRPC_POLLING_TRACE( - "maybe_do_workqueue_work: pi: %p: got closure %p, remaining = " - "%" PRIdPTR, - pi, n, remaining); - if (remaining > 0) { - workqueue_maybe_wakeup(pi); - } - grpc_closure *c = (grpc_closure *)n; - grpc_error *error = c->error_data.error; -#ifndef NDEBUG - c->scheduled = false; -#endif - c->cb(exec_ctx, c->cb_arg, error); - GRPC_ERROR_UNREF(error); - return true; - } else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) { - /* n == NULL might mean there's work but it's not available to be popped - * yet - try to ensure another workqueue wakes up to check shortly if so - */ - GRPC_POLLING_TRACE( - "maybe_do_workqueue_work: pi: %p: more to do, but not yet", pi); - workqueue_maybe_wakeup(pi); - } - } else { - GRPC_POLLING_TRACE("maybe_do_workqueue_work: pi: %p: read already locked", - pi); - } - return false; -} - #define GRPC_EPOLL_MAX_EVENTS 100 /* Note: sig_mask contains the signal mask to use *during* epoll_wait() */ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, @@ -1419,76 +1245,61 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, PI_ADD_REF(pi, "ps_work"); gpr_mu_unlock(&pollset->po.mu); - /* If we get some workqueue work to do, it might end up completing an item on - the completion queue, so there's no need to poll... so we skip that and - redo the complete loop to verify */ - GRPC_POLLING_TRACE("pollset_work: pollset: %p, worker %p, pi %p", pollset, - worker, pi); - if (!maybe_do_workqueue_work(exec_ctx, pi)) { - GRPC_POLLING_TRACE("pollset_work: begins"); - gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1); - g_current_thread_polling_island = pi; - - GRPC_SCHEDULING_START_BLOCKING_REGION; - ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, - sig_mask); - GRPC_SCHEDULING_END_BLOCKING_REGION; - if (ep_rv < 0) { - if (errno != EINTR) { - gpr_asprintf(&err_msg, - "epoll_wait() epoll fd: %d failed with error: %d (%s)", - epoll_fd, errno, strerror(errno)); - append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc); - } else { - /* We were interrupted. Save an interation by doing a zero timeout - epoll_wait to see if there are any other events of interest */ - GRPC_POLLING_TRACE( - "pollset_work: pollset: %p, worker: %p received kick", - (void *)pollset, (void *)worker); - ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0); - } + gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1); + g_current_thread_polling_island = pi; + + GRPC_SCHEDULING_START_BLOCKING_REGION; + ep_rv = + epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, sig_mask); + GRPC_SCHEDULING_END_BLOCKING_REGION; + if (ep_rv < 0) { + if (errno != EINTR) { + gpr_asprintf(&err_msg, + "epoll_wait() epoll fd: %d failed with error: %d (%s)", + epoll_fd, errno, strerror(errno)); + append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc); + } else { + /* We were interrupted. Save an interation by doing a zero timeout + epoll_wait to see if there are any other events of interest */ + GRPC_POLLING_TRACE("pollset_work: pollset: %p, worker: %p received kick", + (void *)pollset, (void *)worker); + ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0); } + } #ifdef GRPC_TSAN - /* See the definition of g_poll_sync for more details */ - gpr_atm_acq_load(&g_epoll_sync); + /* See the definition of g_poll_sync for more details */ + gpr_atm_acq_load(&g_epoll_sync); #endif /* defined(GRPC_TSAN) */ - for (int i = 0; i < ep_rv; ++i) { - void *data_ptr = ep_ev[i].data.ptr; - if (data_ptr == &pi->workqueue_wakeup_fd) { - append_error(error, - grpc_wakeup_fd_consume_wakeup(&pi->workqueue_wakeup_fd), - err_desc); - maybe_do_workqueue_work(exec_ctx, pi); - } else if (data_ptr == &polling_island_wakeup_fd) { - GRPC_POLLING_TRACE( - "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: " - "%d) got merged", - (void *)pollset, (void *)worker, epoll_fd); - /* This means that our polling island is merged with a different - island. We do not have to do anything here since the subsequent call - to the function pollset_work_and_unlock() will pick up the correct - epoll_fd */ - } else { - grpc_fd *fd = data_ptr; - int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP); - int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI); - int write_ev = ep_ev[i].events & EPOLLOUT; - if (read_ev || cancel) { - fd_become_readable(exec_ctx, fd, pollset); - } - if (write_ev || cancel) { - fd_become_writable(exec_ctx, fd); - } + for (int i = 0; i < ep_rv; ++i) { + void *data_ptr = ep_ev[i].data.ptr; + if (data_ptr == &polling_island_wakeup_fd) { + GRPC_POLLING_TRACE( + "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: " + "%d) got merged", + (void *)pollset, (void *)worker, epoll_fd); + /* This means that our polling island is merged with a different + island. We do not have to do anything here since the subsequent call + to the function pollset_work_and_unlock() will pick up the correct + epoll_fd */ + } else { + grpc_fd *fd = data_ptr; + int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP); + int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI); + int write_ev = ep_ev[i].events & EPOLLOUT; + if (read_ev || cancel) { + fd_become_readable(exec_ctx, fd, pollset); + } + if (write_ev || cancel) { + fd_become_writable(exec_ctx, fd); } } - - g_current_thread_polling_island = NULL; - gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1); - GRPC_POLLING_TRACE("pollset_work: ends"); } + g_current_thread_polling_island = NULL; + gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1); + GPR_ASSERT(pi != NULL); /* Before leaving, release the extra ref we added to the polling island. It @@ -1879,7 +1690,6 @@ static const grpc_event_engine_vtable vtable = { .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, - .fd_get_workqueue = fd_get_workqueue, .pollset_init = pollset_init, .pollset_shutdown = pollset_shutdown, @@ -1897,10 +1707,6 @@ static const grpc_event_engine_vtable vtable = { .pollset_set_add_fd = pollset_set_add_fd, .pollset_set_del_fd = pollset_set_del_fd, - .workqueue_ref = workqueue_ref, - .workqueue_unref = workqueue_unref, - .workqueue_scheduler = workqueue_scheduler, - .shutdown_engine = shutdown_engine, }; diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 3a7648ac32..acf425751d 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -648,8 +648,6 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, GRPC_FD_UNREF(fd, "poll"); } -static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; } - /******************************************************************************* * pollset_posix.c */ @@ -1289,30 +1287,6 @@ static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, } /******************************************************************************* - * workqueue stubs - */ - -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG -static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue, - const char *file, int line, - const char *reason) { - return workqueue; -} -static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - const char *file, int line, const char *reason) {} -#else -static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) { - return workqueue; -} -static void workqueue_unref(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue) {} -#endif - -static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) { - return grpc_schedule_on_exec_ctx; -} - -/******************************************************************************* * Condition Variable polling extensions */ @@ -1529,7 +1503,6 @@ static const grpc_event_engine_vtable vtable = { .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, - .fd_get_workqueue = fd_get_workqueue, .pollset_init = pollset_init, .pollset_shutdown = pollset_shutdown, @@ -1547,10 +1520,6 @@ static const grpc_event_engine_vtable vtable = { .pollset_set_add_fd = pollset_set_add_fd, .pollset_set_del_fd = pollset_set_del_fd, - .workqueue_ref = workqueue_ref, - .workqueue_unref = workqueue_unref, - .workqueue_scheduler = workqueue_scheduler, - .shutdown_engine = shutdown_engine, }; diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index c4d2f23e29..c633aec5ac 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -171,10 +171,6 @@ grpc_fd *grpc_fd_create(int fd, const char *name) { return g_event_engine->fd_create(fd, name); } -grpc_workqueue *grpc_fd_get_workqueue(grpc_fd *fd) { - return g_event_engine->fd_get_workqueue(fd); -} - int grpc_fd_wrapped_fd(grpc_fd *fd) { return g_event_engine->fd_wrapped_fd(fd); } @@ -276,26 +272,4 @@ void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx, g_event_engine->pollset_set_del_fd(exec_ctx, pollset_set, fd); } -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG -grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, - int line, const char *reason) { - return g_event_engine->workqueue_ref(workqueue, file, line, reason); -} -void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - const char *file, int line, const char *reason) { - g_event_engine->workqueue_unref(exec_ctx, workqueue, file, line, reason); -} -#else -grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue) { - return g_event_engine->workqueue_ref(workqueue); -} -void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { - g_event_engine->workqueue_unref(exec_ctx, workqueue); -} -#endif - -grpc_closure_scheduler *grpc_workqueue_scheduler(grpc_workqueue *workqueue) { - return g_event_engine->workqueue_scheduler(workqueue); -} - #endif // GRPC_POSIX_SOCKET diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 80619aab5f..cbe5862372 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -41,7 +41,6 @@ #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" -#include "src/core/lib/iomgr/workqueue.h" extern grpc_tracer_flag grpc_polling_trace; /* Disabled by default */ @@ -60,7 +59,6 @@ typedef struct grpc_event_engine_vtable { void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure); bool (*fd_is_shutdown)(grpc_fd *fd); - grpc_workqueue *(*fd_get_workqueue)(grpc_fd *fd); grpc_pollset *(*fd_get_read_notifier_pollset)(grpc_exec_ctx *exec_ctx, grpc_fd *fd); @@ -97,17 +95,6 @@ typedef struct grpc_event_engine_vtable { grpc_pollset_set *pollset_set, grpc_fd *fd); void (*shutdown_engine)(void); - -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG - grpc_workqueue *(*workqueue_ref)(grpc_workqueue *workqueue, const char *file, - int line, const char *reason); - void (*workqueue_unref)(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - const char *file, int line, const char *reason); -#else - grpc_workqueue *(*workqueue_ref)(grpc_workqueue *workqueue); - void (*workqueue_unref)(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue); -#endif - grpc_closure_scheduler *(*workqueue_scheduler)(grpc_workqueue *workqueue); } grpc_event_engine_vtable; void grpc_event_engine_init(void); @@ -121,9 +108,6 @@ const char *grpc_get_poll_strategy_name(); This takes ownership of closing fd. */ grpc_fd *grpc_fd_create(int fd, const char *name); -/* Get a workqueue that's associated with this fd */ -grpc_workqueue *grpc_fd_get_workqueue(grpc_fd *fd); - /* Return the wrapped fd, or -1 if it has been released or closed. */ int grpc_fd_wrapped_fd(grpc_fd *fd); diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c index 318bb2b713..44e2676c3d 100644 --- a/src/core/lib/iomgr/exec_ctx.c +++ b/src/core/lib/iomgr/exec_ctx.c @@ -38,7 +38,6 @@ #include <grpc/support/thd.h> #include "src/core/lib/iomgr/combiner.h" -#include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx *exec_ctx) { diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c index 75fd5b1c50..8edc9224c7 100644 --- a/src/core/lib/iomgr/executor.c +++ b/src/core/lib/iomgr/executor.c @@ -36,132 +36,173 @@ #include <string.h> #include <grpc/support/alloc.h> +#include <grpc/support/cpu.h> #include <grpc/support/log.h> #include <grpc/support/sync.h> #include <grpc/support/thd.h> +#include <grpc/support/tls.h> +#include <grpc/support/useful.h> + #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/support/spinlock.h" + +#define MAX_DEPTH 2 -typedef struct grpc_executor_data { - int busy; /**< is the thread currently running? */ - int shutting_down; /**< has \a grpc_shutdown() been invoked? */ - int pending_join; /**< has the thread finished but not been joined? */ - grpc_closure_list closures; /**< collection of pending work */ - gpr_thd_id tid; /**< thread id of the thread, only valid if \a busy or \a - pending_join are true */ - gpr_thd_options options; +typedef struct { gpr_mu mu; -} grpc_executor; + gpr_cv cv; + grpc_closure_list elems; + size_t depth; + bool shutdown; + gpr_thd_id id; +} thread_state; -static grpc_executor g_executor; +static thread_state *g_thread_state; +static size_t g_max_threads; +static gpr_atm g_cur_threads; +static gpr_spinlock g_adding_thread_lock = GPR_SPINLOCK_STATIC_INITIALIZER; -void grpc_executor_init() { - memset(&g_executor, 0, sizeof(grpc_executor)); - gpr_mu_init(&g_executor.mu); - g_executor.options = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&g_executor.options); -} +GPR_TLS_DECL(g_this_thread_state); -/* thread body */ -static void closure_exec_thread_func(void *ignored) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - while (1) { - gpr_mu_lock(&g_executor.mu); - if (g_executor.shutting_down != 0) { - gpr_mu_unlock(&g_executor.mu); - break; - } - if (grpc_closure_list_empty(g_executor.closures)) { - /* no more work, time to die */ - GPR_ASSERT(g_executor.busy == 1); - g_executor.busy = 0; - gpr_mu_unlock(&g_executor.mu); - break; - } else { - grpc_closure *c = g_executor.closures.head; - grpc_closure_list_init(&g_executor.closures); - gpr_mu_unlock(&g_executor.mu); - while (c != NULL) { - grpc_closure *next = c->next_data.next; - grpc_error *error = c->error_data.error; +static void executor_thread(void *arg); + +static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) { + size_t n = 0; + + grpc_closure *c = list.head; + while (c != NULL) { + grpc_closure *next = c->next_data.next; + grpc_error *error = c->error_data.error; #ifndef NDEBUG - c->scheduled = false; + c->scheduled = false; #endif - c->cb(&exec_ctx, c->cb_arg, error); - GRPC_ERROR_UNREF(error); - c = next; - } - grpc_exec_ctx_flush(&exec_ctx); - } + c->cb(exec_ctx, c->cb_arg, error); + GRPC_ERROR_UNREF(error); + c = next; } - grpc_exec_ctx_finish(&exec_ctx); + + return n; } -/* Spawn the thread if new work has arrived a no thread is up */ -static void maybe_spawn_locked() { - if (grpc_closure_list_empty(g_executor.closures) == 1) { - return; - } - if (g_executor.shutting_down == 1) { - return; - } +bool grpc_executor_is_threaded() { + return gpr_atm_no_barrier_load(&g_cur_threads) > 0; +} - if (g_executor.busy != 0) { - /* Thread still working. New work will be picked up by already running - * thread. Not spawning anything. */ - return; - } else if (g_executor.pending_join != 0) { - /* Pickup the remains of the previous incarnations of the thread. */ - gpr_thd_join(g_executor.tid); - g_executor.pending_join = 0; +void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool threading) { + gpr_atm cur_threads = gpr_atm_no_barrier_load(&g_cur_threads); + if (threading) { + if (cur_threads > 0) return; + g_max_threads = GPR_MAX(1, 2 * gpr_cpu_num_cores()); + gpr_atm_no_barrier_store(&g_cur_threads, 1); + gpr_tls_init(&g_this_thread_state); + g_thread_state = gpr_zalloc(sizeof(thread_state) * g_max_threads); + for (size_t i = 0; i < g_max_threads; i++) { + gpr_mu_init(&g_thread_state[i].mu); + gpr_cv_init(&g_thread_state[i].cv); + g_thread_state[i].elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT; + } + + gpr_thd_options opt = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&opt); + gpr_thd_new(&g_thread_state[0].id, executor_thread, &g_thread_state[0], + &opt); + } else { + if (cur_threads == 0) return; + for (size_t i = 0; i < g_max_threads; i++) { + gpr_mu_lock(&g_thread_state[i].mu); + g_thread_state[i].shutdown = true; + gpr_cv_signal(&g_thread_state[i].cv); + gpr_mu_unlock(&g_thread_state[i].mu); + } + /* ensure no thread is adding a new thread... once this is past, then + no thread will try to add a new one either (since shutdown is true) */ + gpr_spinlock_lock(&g_adding_thread_lock); + gpr_spinlock_unlock(&g_adding_thread_lock); + for (gpr_atm i = 0; i < g_cur_threads; i++) { + gpr_thd_join(g_thread_state[i].id); + } + gpr_atm_no_barrier_store(&g_cur_threads, 0); + for (size_t i = 0; i < g_max_threads; i++) { + gpr_mu_destroy(&g_thread_state[i].mu); + gpr_cv_destroy(&g_thread_state[i].cv); + run_closures(exec_ctx, g_thread_state[i].elems); + } + gpr_free(g_thread_state); + gpr_tls_destroy(&g_this_thread_state); } +} - /* All previous instances of the thread should have been joined at this point. - * Spawn time! */ - g_executor.busy = 1; - GPR_ASSERT(gpr_thd_new(&g_executor.tid, closure_exec_thread_func, NULL, - &g_executor.options)); - g_executor.pending_join = 1; +void grpc_executor_init(grpc_exec_ctx *exec_ctx) { + gpr_atm_no_barrier_store(&g_cur_threads, 0); + grpc_executor_set_threading(exec_ctx, true); } -static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error) { - gpr_mu_lock(&g_executor.mu); - if (g_executor.shutting_down == 0) { - grpc_closure_list_append(&g_executor.closures, closure, error); - maybe_spawn_locked(); +void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx) { + grpc_executor_set_threading(exec_ctx, false); +} + +static void executor_thread(void *arg) { + thread_state *ts = arg; + gpr_tls_set(&g_this_thread_state, (intptr_t)ts); + + grpc_exec_ctx exec_ctx = + GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL); + + size_t subtract_depth = 0; + for (;;) { + gpr_mu_lock(&ts->mu); + ts->depth -= subtract_depth; + while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) { + gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME)); + } + if (ts->shutdown) { + gpr_mu_unlock(&ts->mu); + break; + } + grpc_closure_list exec = ts->elems; + ts->elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT; + gpr_mu_unlock(&ts->mu); + + subtract_depth = run_closures(&exec_ctx, exec); + grpc_exec_ctx_flush(&exec_ctx); } - gpr_mu_unlock(&g_executor.mu); + grpc_exec_ctx_finish(&exec_ctx); } -void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx) { - int pending_join; - - gpr_mu_lock(&g_executor.mu); - pending_join = g_executor.pending_join; - g_executor.shutting_down = 1; - gpr_mu_unlock(&g_executor.mu); - /* we can release the lock at this point despite the access to the closure - * list below because we aren't accepting new work */ - - /* Execute pending callbacks, some may be performing cleanups */ - grpc_closure *c = g_executor.closures.head; - grpc_closure_list_init(&g_executor.closures); - while (c != NULL) { - grpc_closure *next = c->next_data.next; - grpc_error *error = c->error_data.error; -#ifndef NDEBUG - c->scheduled = false; -#endif - c->cb(exec_ctx, c->cb_arg, error); - GRPC_ERROR_UNREF(error); - c = next; +static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error) { + size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads); + if (cur_thread_count == 0) { + grpc_closure_list_append(&exec_ctx->closure_list, closure, error); + return; + } + thread_state *ts = (thread_state *)gpr_tls_get(&g_this_thread_state); + if (ts == NULL) { + ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)]; } - grpc_exec_ctx_flush(exec_ctx); - GPR_ASSERT(grpc_closure_list_empty(g_executor.closures)); - if (pending_join) { - gpr_thd_join(g_executor.tid); + gpr_mu_lock(&ts->mu); + if (grpc_closure_list_empty(ts->elems)) { + gpr_cv_signal(&ts->cv); + } + grpc_closure_list_append(&ts->elems, closure, error); + ts->depth++; + bool try_new_thread = ts->depth > MAX_DEPTH && + cur_thread_count < g_max_threads && !ts->shutdown; + if (try_new_thread && gpr_spinlock_trylock(&g_adding_thread_lock)) { + gpr_mu_unlock(&ts->mu); + cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads); + if (cur_thread_count < g_max_threads) { + gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1); + + gpr_thd_options opt = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&opt); + gpr_thd_new(&g_thread_state[cur_thread_count].id, executor_thread, + &g_thread_state[cur_thread_count], &opt); + } + gpr_spinlock_unlock(&g_adding_thread_lock); + } else { + gpr_mu_unlock(&ts->mu); } - gpr_mu_destroy(&g_executor.mu); } static const grpc_closure_scheduler_vtable executor_vtable = { diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h index 1213016383..792d5056cb 100644 --- a/src/core/lib/iomgr/executor.h +++ b/src/core/lib/iomgr/executor.h @@ -41,11 +41,18 @@ * This mechanism is meant to outsource work (grpc_closure instances) to a * thread, for those cases where blocking isn't an option but there isn't a * non-blocking solution available. */ -void grpc_executor_init(); +void grpc_executor_init(grpc_exec_ctx *exec_ctx); extern grpc_closure_scheduler *grpc_executor_scheduler; /** Shutdown the executor, running all pending work as part of the call */ void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx); +/** Is the executor multi-threaded? */ +bool grpc_executor_is_threaded(); + +/* enable/disable threading - must be called after grpc_executor_init and before + grpc_executor_shutdown */ +void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool enable); + #endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_H */ diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c index 1fd41c2f88..391449136e 100644 --- a/src/core/lib/iomgr/iomgr.c +++ b/src/core/lib/iomgr/iomgr.c @@ -44,6 +44,7 @@ #include <grpc/support/useful.h> #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/network_status_tracker.h" #include "src/core/lib/iomgr/timer.h" @@ -56,11 +57,12 @@ static gpr_cv g_rcv; static int g_shutdown; static grpc_iomgr_object g_root_object; -void grpc_iomgr_init(void) { +void grpc_iomgr_init(grpc_exec_ctx *exec_ctx) { g_shutdown = 0; gpr_mu_init(&g_mu); gpr_cv_init(&g_rcv); grpc_exec_ctx_global_init(); + grpc_executor_init(exec_ctx); grpc_timer_list_init(gpr_now(GPR_CLOCK_MONOTONIC)); g_root_object.next = g_root_object.prev = &g_root_object; g_root_object.name = "root"; @@ -68,7 +70,7 @@ void grpc_iomgr_init(void) { grpc_iomgr_platform_init(); } -void grpc_iomgr_start(void) { grpc_timer_manager_init(); } +void grpc_iomgr_start(grpc_exec_ctx *exec_ctx) { grpc_timer_manager_init(); } static size_t count_objects(void) { grpc_iomgr_object *obj; @@ -93,6 +95,7 @@ void grpc_iomgr_shutdown(grpc_exec_ctx *exec_ctx) { grpc_timer_manager_shutdown(); grpc_iomgr_platform_flush(); + grpc_executor_shutdown(exec_ctx); gpr_mu_lock(&g_mu); g_shutdown = 1; @@ -107,7 +110,8 @@ void grpc_iomgr_shutdown(grpc_exec_ctx *exec_ctx) { } last_warning_time = gpr_now(GPR_CLOCK_REALTIME); } - if (grpc_timer_check(exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL)) { + if (grpc_timer_check(exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL) == + GRPC_TIMERS_FIRED) { gpr_mu_unlock(&g_mu); grpc_exec_ctx_flush(exec_ctx); grpc_iomgr_platform_flush(); diff --git a/src/core/lib/iomgr/iomgr.h b/src/core/lib/iomgr/iomgr.h index 6e2e023615..bd6ca4a0b8 100644 --- a/src/core/lib/iomgr/iomgr.h +++ b/src/core/lib/iomgr/iomgr.h @@ -38,10 +38,10 @@ #include "src/core/lib/iomgr/port.h" /** Initializes the iomgr. */ -void grpc_iomgr_init(void); +void grpc_iomgr_init(grpc_exec_ctx *exec_ctx); /** Starts any background threads for iomgr. */ -void grpc_iomgr_start(void); +void grpc_iomgr_start(grpc_exec_ctx *exec_ctx); /** Signals the intention to shutdown the iomgr. Expects to be able to flush * exec_ctx. */ diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c index 6b2b85cce0..3c6a6ea1a5 100644 --- a/src/core/lib/iomgr/resource_quota.c +++ b/src/core/lib/iomgr/resource_quota.c @@ -581,7 +581,7 @@ static void rq_reclamation_done(grpc_exec_ctx *exec_ctx, void *rq, grpc_resource_quota *grpc_resource_quota_create(const char *name) { grpc_resource_quota *resource_quota = gpr_malloc(sizeof(*resource_quota)); gpr_ref_init(&resource_quota->refs, 1); - resource_quota->combiner = grpc_combiner_create(NULL); + resource_quota->combiner = grpc_combiner_create(); resource_quota->free_pool = INT64_MAX; resource_quota->size = INT64_MAX; gpr_atm_no_barrier_store(&resource_quota->last_size, GPR_ATM_MAX); @@ -594,12 +594,11 @@ grpc_resource_quota *grpc_resource_quota_create(const char *name) { gpr_asprintf(&resource_quota->name, "anonymous_pool_%" PRIxPTR, (intptr_t)resource_quota); } - grpc_closure_init( - &resource_quota->rq_step_closure, rq_step, resource_quota, - grpc_combiner_finally_scheduler(resource_quota->combiner, true)); + grpc_closure_init(&resource_quota->rq_step_closure, rq_step, resource_quota, + grpc_combiner_finally_scheduler(resource_quota->combiner)); grpc_closure_init(&resource_quota->rq_reclamation_done_closure, rq_reclamation_done, resource_quota, - grpc_combiner_scheduler(resource_quota->combiner, false)); + grpc_combiner_scheduler(resource_quota->combiner)); for (int i = 0; i < GRPC_RULIST_COUNT; i++) { resource_quota->roots[i] = NULL; } @@ -704,18 +703,18 @@ grpc_resource_user *grpc_resource_user_create( grpc_resource_quota_ref_internal(resource_quota); grpc_closure_init(&resource_user->allocate_closure, &ru_allocate, resource_user, - grpc_combiner_scheduler(resource_quota->combiner, false)); + grpc_combiner_scheduler(resource_quota->combiner)); grpc_closure_init(&resource_user->add_to_free_pool_closure, &ru_add_to_free_pool, resource_user, - grpc_combiner_scheduler(resource_quota->combiner, false)); + grpc_combiner_scheduler(resource_quota->combiner)); grpc_closure_init(&resource_user->post_reclaimer_closure[0], &ru_post_benign_reclaimer, resource_user, - grpc_combiner_scheduler(resource_quota->combiner, false)); + grpc_combiner_scheduler(resource_quota->combiner)); grpc_closure_init(&resource_user->post_reclaimer_closure[1], &ru_post_destructive_reclaimer, resource_user, - grpc_combiner_scheduler(resource_quota->combiner, false)); + grpc_combiner_scheduler(resource_quota->combiner)); grpc_closure_init(&resource_user->destroy_closure, &ru_destroy, resource_user, - grpc_combiner_scheduler(resource_quota->combiner, false)); + grpc_combiner_scheduler(resource_quota->combiner)); gpr_mu_init(&resource_user->mu); gpr_atm_rel_store(&resource_user->refs, 1); gpr_atm_rel_store(&resource_user->shutdown, 0); @@ -772,12 +771,12 @@ void grpc_resource_user_unref(grpc_exec_ctx *exec_ctx, void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx, grpc_resource_user *resource_user) { if (gpr_atm_full_fetch_add(&resource_user->shutdown, 1) == 0) { - grpc_closure_sched(exec_ctx, - grpc_closure_create( - ru_shutdown, resource_user, - grpc_combiner_scheduler( - resource_user->resource_quota->combiner, false)), - GRPC_ERROR_NONE); + grpc_closure_sched( + exec_ctx, + grpc_closure_create( + ru_shutdown, resource_user, + grpc_combiner_scheduler(resource_user->resource_quota->combiner)), + GRPC_ERROR_NONE); } } diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 5d360b0b80..f066881237 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -558,26 +558,15 @@ static int tcp_get_fd(grpc_endpoint *ep) { return tcp->fd; } -static grpc_workqueue *tcp_get_workqueue(grpc_endpoint *ep) { - grpc_tcp *tcp = (grpc_tcp *)ep; - return grpc_fd_get_workqueue(tcp->em_fd); -} - static grpc_resource_user *tcp_get_resource_user(grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; return tcp->resource_user; } -static const grpc_endpoint_vtable vtable = {tcp_read, - tcp_write, - tcp_get_workqueue, - tcp_add_to_pollset, - tcp_add_to_pollset_set, - tcp_shutdown, - tcp_destroy, - tcp_get_resource_user, - tcp_get_peer, - tcp_get_fd}; +static const grpc_endpoint_vtable vtable = { + tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set, + tcp_shutdown, tcp_destroy, tcp_get_resource_user, tcp_get_peer, + tcp_get_fd}; #define MAX_CHUNK_SIZE 32 * 1024 * 1024 diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c index dc23e4f521..31e779af75 100644 --- a/src/core/lib/iomgr/tcp_uv.c +++ b/src/core/lib/iomgr/tcp_uv.c @@ -338,10 +338,9 @@ static grpc_workqueue *uv_get_workqueue(grpc_endpoint *ep) { return NULL; } static int uv_get_fd(grpc_endpoint *ep) { return -1; } static grpc_endpoint_vtable vtable = { - uv_endpoint_read, uv_endpoint_write, uv_get_workqueue, - uv_add_to_pollset, uv_add_to_pollset_set, uv_endpoint_shutdown, - uv_destroy, uv_get_resource_user, uv_get_peer, - uv_get_fd}; + uv_endpoint_read, uv_endpoint_write, uv_add_to_pollset, + uv_add_to_pollset_set, uv_endpoint_shutdown, uv_destroy, + uv_get_resource_user, uv_get_peer, uv_get_fd}; grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, grpc_resource_quota *resource_quota, diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h index e0338f93c7..d6758f7c3b 100644 --- a/src/core/lib/iomgr/timer.h +++ b/src/core/lib/iomgr/timer.h @@ -89,6 +89,12 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer); /* iomgr internal api for dealing with timers */ +typedef enum { + GRPC_TIMERS_NOT_CHECKED, + GRPC_TIMERS_CHECKED_AND_EMPTY, + GRPC_TIMERS_FIRED, +} grpc_timer_check_result; + /* Check for timers to be run, and run them. Return true if timer callbacks were executed. If next is non-null, TRY to update *next with the next running timer @@ -96,8 +102,8 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer); *next is never guaranteed to be updated on any given execution; however, with high probability at least one thread in the system will see an update at any time slice. */ -bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, - gpr_timespec *next); +grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx, + gpr_timespec now, gpr_timespec *next); void grpc_timer_list_init(gpr_timespec now); void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx); diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index b28340b71c..019cd8f309 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -101,8 +101,10 @@ static gpr_atm saturating_add(gpr_atm a, gpr_atm b) { return a + b; } -static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, - gpr_atm *next, grpc_error *error); +static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx *exec_ctx, + gpr_atm now, + gpr_atm *next, + grpc_error *error); static gpr_timespec dbl_to_ts(double d) { gpr_timespec ts; @@ -421,19 +423,22 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard, return n; } -static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, - gpr_atm *next, grpc_error *error) { - size_t n = 0; +static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx *exec_ctx, + gpr_atm now, + gpr_atm *next, + grpc_error *error) { + grpc_timer_check_result result = GRPC_TIMERS_NOT_CHECKED; gpr_atm min_timer = gpr_atm_no_barrier_load(&g_shared_mutables.min_timer); gpr_tls_set(&g_last_seen_min_timer, min_timer); if (now < min_timer) { if (next != NULL) *next = GPR_MIN(*next, min_timer); - return 0; + return GRPC_TIMERS_CHECKED_AND_EMPTY; } if (gpr_spinlock_trylock(&g_shared_mutables.checker_mu)) { gpr_mu_lock(&g_shared_mutables.mu); + result = GRPC_TIMERS_CHECKED_AND_EMPTY; if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, " .. shard[%d]->min_deadline = %" PRIdPTR, @@ -448,14 +453,17 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, /* For efficiency, we pop as many available timers as we can from the shard. This may violate perfect timer deadline ordering, but that shouldn't be a big deal because we don't make ordering guarantees. */ - n += - pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline, error); + if (pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline, + error) > 0) { + result = GRPC_TIMERS_FIRED; + } if (GRPC_TRACER_ON(grpc_timer_check_trace)) { - gpr_log(GPR_DEBUG, " .. popped --> %" PRIdPTR - ", shard[%d]->min_deadline %" PRIdPTR - " --> %" PRIdPTR ", now=%" PRIdPTR, - n, (int)(g_shard_queue[0] - g_shards), + gpr_log(GPR_DEBUG, + " .. result --> %d" + ", shard[%d]->min_deadline %" PRIdPTR " --> %" PRIdPTR + ", now=%" PRIdPTR, + result, (int)(g_shard_queue[0] - g_shards), g_shard_queue[0]->min_deadline, new_min_deadline, now); } @@ -476,26 +484,15 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, g_shard_queue[0]->min_deadline); gpr_mu_unlock(&g_shared_mutables.mu); gpr_spinlock_unlock(&g_shared_mutables.checker_mu); - } else if (next != NULL) { - /* TODO(ctiller): this forces calling code to do an short poll, and - then retry the timer check (because this time through the timer list was - contended). - - We could reduce the cost here dramatically by keeping a count of how - many currently active pollers got through the uncontended case above - successfully, and waking up other pollers IFF that count drops to zero. - - Once that count is in place, this entire else branch could disappear. */ - *next = GPR_MIN(*next, now + 1); } GRPC_ERROR_UNREF(error); - return (int)n; + return result; } -bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, - gpr_timespec *next) { +grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx, + gpr_timespec now, gpr_timespec *next) { // prelude GPR_ASSERT(now.clock_type == g_clock_type); gpr_atm now_atm = timespec_to_atm_round_down(now); @@ -513,7 +510,7 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, "TIMER CHECK SKIP: now_atm=%" PRIdPTR " min_timer=%" PRIdPTR, now_atm, min_timer); } - return 0; + return GRPC_TIMERS_CHECKED_AND_EMPTY; } grpc_error *shutdown_error = @@ -538,7 +535,7 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, gpr_free(next_str); } // actual code - bool r; + grpc_timer_check_result r; gpr_atm next_atm; if (next == NULL) { r = run_some_expired_timers(exec_ctx, now_atm, NULL, shutdown_error); @@ -556,11 +553,10 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, gpr_asprintf(&next_str, "%" PRId64 ".%09d [%" PRIdPTR "]", next->tv_sec, next->tv_nsec, next_atm); } - gpr_log(GPR_DEBUG, "TIMER CHECK END: %d timers triggered; next=%s", r, - next_str); + gpr_log(GPR_DEBUG, "TIMER CHECK END: r=%d; next=%s", r, next_str); gpr_free(next_str); } - return r > 0; + return r; } #endif /* GRPC_TIMER_USE_GENERIC */ diff --git a/src/core/lib/iomgr/timer_manager.c b/src/core/lib/iomgr/timer_manager.c index 24085093e7..082dfe8299 100644 --- a/src/core/lib/iomgr/timer_manager.c +++ b/src/core/lib/iomgr/timer_manager.c @@ -107,86 +107,124 @@ void grpc_timer_manager_tick() { grpc_exec_ctx_finish(&exec_ctx); } -static void timer_thread(void *unused) { - // this threads exec_ctx: we try to run things through to completion here - // since it's easy to spin up new threads - grpc_exec_ctx exec_ctx = - GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL); +static void run_some_timers(grpc_exec_ctx *exec_ctx) { + // if there's something to execute... + gpr_mu_lock(&g_mu); + // remove a waiter from the pool, and start another thread if necessary + --g_waiter_count; + if (g_waiter_count == 0 && g_threaded) { + start_timer_thread_and_unlock(); + } else { + // if there's no thread waiting with a timeout, kick an existing + // waiter + // so that the next deadline is not missed + if (!g_has_timed_waiter) { + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_log(GPR_DEBUG, "kick untimed waiter"); + } + gpr_cv_signal(&g_cv_wait); + } + gpr_mu_unlock(&g_mu); + } + // without our lock, flush the exec_ctx + grpc_exec_ctx_flush(exec_ctx); + gpr_mu_lock(&g_mu); + // garbage collect any threads hanging out that are dead + gc_completed_threads(); + // get ready to wait again + ++g_waiter_count; + gpr_mu_unlock(&g_mu); +} + +// wait until 'next' (or forever if there is already a timed waiter in the pool) +// returns true if the thread should continue executing (false if it should +// shutdown) +static bool wait_until(gpr_timespec next) { + const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC); + gpr_mu_lock(&g_mu); + // if we're not threaded anymore, leave + if (!g_threaded) { + gpr_mu_unlock(&g_mu); + return false; + } + // if there's no timed waiter, we should become one: that waiter waits + // only until the next timer should expire + // all other timers wait forever + uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1; + if (!g_has_timed_waiter && gpr_time_cmp(next, inf_future) != 0) { + g_has_timed_waiter = true; + // we use a generation counter to track the timed waiter so we can + // cancel an existing one quickly (and when it actually times out it'll + // figure stuff out instead of incurring a wakeup) + my_timed_waiter_generation = ++g_timed_waiter_generation; + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_timespec wait_time = gpr_time_sub(next, gpr_now(GPR_CLOCK_MONOTONIC)); + gpr_log(GPR_DEBUG, "sleep for a %" PRId64 ".%09d seconds", + wait_time.tv_sec, wait_time.tv_nsec); + } + } else { + next = inf_future; + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_log(GPR_DEBUG, "sleep until kicked"); + } + } + gpr_cv_wait(&g_cv_wait, &g_mu, next); + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d", + my_timed_waiter_generation == g_timed_waiter_generation, g_kicked); + } + // if this was the timed waiter, then we need to check timers, and flag + // that there's now no timed waiter... we'll look for a replacement if + // there's work to do after checking timers (code above) + if (my_timed_waiter_generation == g_timed_waiter_generation) { + g_has_timed_waiter = false; + } + // if this was a kick from the timer system, consume it (and don't stop + // this thread yet) + if (g_kicked) { + grpc_timer_consume_kick(); + g_kicked = false; + } + gpr_mu_unlock(&g_mu); + return true; +} + +static void timer_main_loop(grpc_exec_ctx *exec_ctx) { const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC); for (;;) { gpr_timespec next = inf_future; gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); // check timer state, updates next to the next time to run a check - if (grpc_timer_check(&exec_ctx, now, &next)) { - // if there's something to execute... - gpr_mu_lock(&g_mu); - // remove a waiter from the pool, and start another thread if necessary - --g_waiter_count; - if (g_waiter_count == 0 && g_threaded) { - start_timer_thread_and_unlock(); - } else { - // if there's no thread waiting with a timeout, kick an existing waiter - // so that the next deadline is not missed - if (!g_has_timed_waiter) { - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { - gpr_log(GPR_DEBUG, "kick untimed waiter"); - } - gpr_cv_signal(&g_cv_wait); - } - gpr_mu_unlock(&g_mu); - } - // without our lock, flush the exec_ctx - grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(&g_mu); - // garbage collect any threads hanging out that are dead - gc_completed_threads(); - // get ready to wait again - ++g_waiter_count; - gpr_mu_unlock(&g_mu); - } else { - gpr_mu_lock(&g_mu); - // if we're not threaded anymore, leave - if (!g_threaded) break; - // if there's no timed waiter, we should become one: that waiter waits - // only until the next timer should expire - // all other timers wait forever - uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1; - if (!g_has_timed_waiter) { - g_has_timed_waiter = true; - // we use a generation counter to track the timed waiter so we can - // cancel an existing one quickly (and when it actually times out it'll - // figure stuff out instead of incurring a wakeup) - my_timed_waiter_generation = ++g_timed_waiter_generation; + switch (grpc_timer_check(exec_ctx, now, &next)) { + case GRPC_TIMERS_FIRED: + run_some_timers(exec_ctx); + break; + case GRPC_TIMERS_NOT_CHECKED: + /* This case only happens under contention, meaning more than one timer + manager thread checked timers concurrently. + + If that happens, we're guaranteed that some other thread has just + checked timers, and this will avalanche into some other thread seeing + empty timers and doing a timed sleep. + + Consequently, we can just sleep forever here and be happy at some + saved wakeup cycles. */ if (GRPC_TRACER_ON(grpc_timer_check_trace)) { - gpr_log(GPR_DEBUG, "sleep for a while"); + gpr_log(GPR_DEBUG, "timers not checked: expect another thread to"); } - } else { next = inf_future; - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { - gpr_log(GPR_DEBUG, "sleep until kicked"); + /* fall through */ + case GRPC_TIMERS_CHECKED_AND_EMPTY: + if (!wait_until(next)) { + return; } - } - gpr_cv_wait(&g_cv_wait, &g_mu, next); - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { - gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d", - my_timed_waiter_generation == g_timed_waiter_generation, - g_kicked); - } - // if this was the timed waiter, then we need to check timers, and flag - // that there's now no timed waiter... we'll look for a replacement if - // there's work to do after checking timers (code above) - if (my_timed_waiter_generation == g_timed_waiter_generation) { - g_has_timed_waiter = false; - } - // if this was a kick from the timer system, consume it (and don't stop - // this thread yet) - if (g_kicked) { - grpc_timer_consume_kick(); - g_kicked = false; - } - gpr_mu_unlock(&g_mu); + break; } } +} + +static void timer_thread_cleanup(void) { + gpr_mu_lock(&g_mu); // terminate the thread: drop the waiter count, thread count, and let whomever // stopped the threading stuff know that we're done --g_waiter_count; @@ -199,12 +237,21 @@ static void timer_thread(void *unused) { ct->next = g_completed_threads; g_completed_threads = ct; gpr_mu_unlock(&g_mu); - grpc_exec_ctx_finish(&exec_ctx); if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, "End timer thread"); } } +static void timer_thread(void *unused) { + // this threads exec_ctx: we try to run things through to completion here + // since it's easy to spin up new threads + grpc_exec_ctx exec_ctx = + GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL); + timer_main_loop(&exec_ctx); + grpc_exec_ctx_finish(&exec_ctx); + timer_thread_cleanup(); +} + static void start_threads(void) { gpr_mu_lock(&g_mu); if (!g_threaded) { diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c index 2952e44b58..967e84eb14 100644 --- a/src/core/lib/iomgr/timer_uv.c +++ b/src/core/lib/iomgr/timer_uv.c @@ -96,9 +96,9 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { } } -bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, - gpr_timespec *next) { - return false; +grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx, + gpr_timespec now, gpr_timespec *next) { + return GRPC_TIMERS_NOT_CHECKED; } void grpc_timer_list_init(gpr_timespec now) {} diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h deleted file mode 100644 index 371b0f55dc..0000000000 --- a/src/core/lib/iomgr/workqueue.h +++ /dev/null @@ -1,87 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_CORE_LIB_IOMGR_WORKQUEUE_H -#define GRPC_CORE_LIB_IOMGR_WORKQUEUE_H - -#include "src/core/lib/iomgr/closure.h" -#include "src/core/lib/iomgr/exec_ctx.h" -#include "src/core/lib/iomgr/iomgr.h" -#include "src/core/lib/iomgr/pollset.h" -#include "src/core/lib/iomgr/pollset_set.h" -#include "src/core/lib/iomgr/port.h" - -#ifdef GPR_WINDOWS -#include "src/core/lib/iomgr/workqueue_windows.h" -#endif - -/* grpc_workqueue is forward declared in exec_ctx.h */ - -/* Reference counting functions. Use the macro's always - (GRPC_WORKQUEUE_{REF,UNREF}). - - Pass in a descriptive reason string for reffing/unreffing as the last - argument to each macro. When GRPC_WORKQUEUE_REFCOUNT_DEBUG is defined, that - string will be printed alongside the refcount. When it is not defined, the - string will be discarded at compilation time. */ - -/*#define GRPC_WORKQUEUE_REFCOUNT_DEBUG*/ -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG -#define GRPC_WORKQUEUE_REF(p, r) \ - grpc_workqueue_ref((p), __FILE__, __LINE__, (r)) -#define GRPC_WORKQUEUE_UNREF(exec_ctx, p, r) \ - grpc_workqueue_unref((exec_ctx), (p), __FILE__, __LINE__, (r)) -grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, - int line, const char *reason); -void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - const char *file, int line, const char *reason); -#else -#define GRPC_WORKQUEUE_REF(p, r) grpc_workqueue_ref((p)) -#define GRPC_WORKQUEUE_UNREF(cl, p, r) grpc_workqueue_unref((cl), (p)) -grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue); -void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue); -#endif - -/** Fetch the workqueue closure scheduler. Items added to a work queue will be - started in approximately the order they were enqueued, on some thread that - may or may not be the current thread. Successive closures enqueued onto a - workqueue MAY be executed concurrently. - - It is generally more expensive to add a closure to a workqueue than to the - execution context, both in terms of CPU work and in execution latency. - - Use work queues when it's important that other threads be given a chance to - tackle some workload. */ -grpc_closure_scheduler *grpc_workqueue_scheduler(grpc_workqueue *workqueue); - -#endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_H */ diff --git a/src/core/lib/iomgr/workqueue_uv.c b/src/core/lib/iomgr/workqueue_uv.c deleted file mode 100644 index 4d61b40912..0000000000 --- a/src/core/lib/iomgr/workqueue_uv.c +++ /dev/null @@ -1,65 +0,0 @@ -/* - * - * Copyright 2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "src/core/lib/iomgr/port.h" - -#ifdef GRPC_UV - -#include "src/core/lib/iomgr/workqueue.h" - -// Minimal implementation of grpc_workqueue for libuv -// Works by directly enqueuing workqueue items onto the current execution -// context, which is at least correct, if not performant or in the spirit of -// workqueues. - -void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {} - -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG -grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, - int line, const char *reason) { - return workqueue; -} -void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - const char *file, int line, const char *reason) {} -#else -grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue) { - return workqueue; -} -void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {} -#endif - -grpc_closure_scheduler *grpc_workqueue_scheduler(grpc_workqueue *workqueue) { - return grpc_schedule_on_exec_ctx; -} - -#endif /* GPR_UV */ diff --git a/src/core/lib/iomgr/workqueue_uv.h b/src/core/lib/iomgr/workqueue_uv.h deleted file mode 100644 index be3f8e4d93..0000000000 --- a/src/core/lib/iomgr/workqueue_uv.h +++ /dev/null @@ -1,37 +0,0 @@ -/* - * - * Copyright 2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_CORE_LIB_IOMGR_WORKQUEUE_UV_H -#define GRPC_CORE_LIB_IOMGR_WORKQUEUE_UV_H - -#endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_UV_H */ diff --git a/src/core/lib/iomgr/workqueue_windows.c b/src/core/lib/iomgr/workqueue_windows.c deleted file mode 100644 index 234b47cdf5..0000000000 --- a/src/core/lib/iomgr/workqueue_windows.c +++ /dev/null @@ -1,63 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include <grpc/support/port_platform.h> - -#ifdef GPR_WINDOWS - -#include "src/core/lib/iomgr/workqueue.h" - -// Minimal implementation of grpc_workqueue for Windows -// Works by directly enqueuing workqueue items onto the current execution -// context, which is at least correct, if not performant or in the spirit of -// workqueues. - -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG -grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, - int line, const char *reason) { - return workqueue; -} -void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - const char *file, int line, const char *reason) {} -#else -grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue) { - return workqueue; -} -void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {} -#endif - -grpc_closure_scheduler *grpc_workqueue_scheduler(grpc_workqueue *workqueue) { - return grpc_schedule_on_exec_ctx; -} - -#endif /* GPR_WINDOWS */ diff --git a/src/core/lib/iomgr/workqueue_windows.h b/src/core/lib/iomgr/workqueue_windows.h deleted file mode 100644 index e5d59130bb..0000000000 --- a/src/core/lib/iomgr/workqueue_windows.h +++ /dev/null @@ -1,37 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_CORE_LIB_IOMGR_WORKQUEUE_WINDOWS_H -#define GRPC_CORE_LIB_IOMGR_WORKQUEUE_WINDOWS_H - -#endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_WINDOWS_H */ diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c index dff05633ec..ac1cce722f 100644 --- a/src/core/lib/security/transport/client_auth_filter.c +++ b/src/core/lib/security/transport/client_auth_filter.c @@ -65,7 +65,8 @@ typedef struct { */ grpc_polling_entity *pollent; grpc_transport_stream_op_batch op; - uint8_t security_context_set; + gpr_atm security_context_set; + gpr_mu security_context_mu; grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT]; grpc_auth_metadata_context auth_md_context; } call_data; @@ -253,19 +254,26 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_linked_mdelem *l; grpc_client_security_context *sec_ctx = NULL; - if (!op->cancel_stream && calld->security_context_set == 0) { - calld->security_context_set = 1; - GPR_ASSERT(op->payload->context != NULL); - if (op->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) { - op->payload->context[GRPC_CONTEXT_SECURITY].value = - grpc_client_security_context_create(); - op->payload->context[GRPC_CONTEXT_SECURITY].destroy = - grpc_client_security_context_destroy; + if (!op->cancel_stream) { + /* double checked lock over security context to ensure it's set once */ + if (gpr_atm_acq_load(&calld->security_context_set) == 0) { + gpr_mu_lock(&calld->security_context_mu); + if (gpr_atm_acq_load(&calld->security_context_set) == 0) { + GPR_ASSERT(op->payload->context != NULL); + if (op->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) { + op->payload->context[GRPC_CONTEXT_SECURITY].value = + grpc_client_security_context_create(); + op->payload->context[GRPC_CONTEXT_SECURITY].destroy = + grpc_client_security_context_destroy; + } + sec_ctx = op->payload->context[GRPC_CONTEXT_SECURITY].value; + GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter"); + sec_ctx->auth_context = + GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter"); + gpr_atm_rel_store(&calld->security_context_set, 1); + } + gpr_mu_unlock(&calld->security_context_mu); } - sec_ctx = op->payload->context[GRPC_CONTEXT_SECURITY].value; - GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter"); - sec_ctx->auth_context = - GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter"); } if (op->send_initial_metadata) { @@ -312,6 +320,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, const grpc_call_element_args *args) { call_data *calld = elem->call_data; memset(calld, 0, sizeof(*calld)); + gpr_mu_init(&calld->security_context_mu); return GRPC_ERROR_NONE; } @@ -335,6 +344,7 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_slice_unref_internal(exec_ctx, calld->method); } reset_auth_metadata_context(&calld->auth_md_context); + gpr_mu_destroy(&calld->security_context_mu); } /* Constructor for channel_data */ diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c index 48d368a2a7..39b39379a6 100644 --- a/src/core/lib/security/transport/secure_endpoint.c +++ b/src/core/lib/security/transport/secure_endpoint.c @@ -380,11 +380,6 @@ static int endpoint_get_fd(grpc_endpoint *secure_ep) { return grpc_endpoint_get_fd(ep->wrapped_ep); } -static grpc_workqueue *endpoint_get_workqueue(grpc_endpoint *secure_ep) { - secure_endpoint *ep = (secure_endpoint *)secure_ep; - return grpc_endpoint_get_workqueue(ep->wrapped_ep); -} - static grpc_resource_user *endpoint_get_resource_user( grpc_endpoint *secure_ep) { secure_endpoint *ep = (secure_endpoint *)secure_ep; @@ -393,7 +388,6 @@ static grpc_resource_user *endpoint_get_resource_user( static const grpc_endpoint_vtable vtable = {endpoint_read, endpoint_write, - endpoint_get_workqueue, endpoint_add_to_pollset, endpoint_add_to_pollset_set, endpoint_shutdown, diff --git a/src/core/lib/support/log.c b/src/core/lib/support/log.c index af1651dae5..4d15553768 100644 --- a/src/core/lib/support/log.c +++ b/src/core/lib/support/log.c @@ -43,7 +43,7 @@ #include <string.h> extern void gpr_default_log(gpr_log_func_args *args); -static gpr_log_func g_log_func = gpr_default_log; +static gpr_atm g_log_func = (gpr_atm)gpr_default_log; static gpr_atm g_min_severity_to_print = GPR_LOG_VERBOSITY_UNSET; const char *gpr_log_severity_string(gpr_log_severity severity) { @@ -70,7 +70,7 @@ void gpr_log_message(const char *file, int line, gpr_log_severity severity, lfargs.line = line; lfargs.severity = severity; lfargs.message = message; - g_log_func(&lfargs); + ((gpr_log_func)gpr_atm_no_barrier_load(&g_log_func))(&lfargs); } void gpr_set_log_verbosity(gpr_log_severity min_severity_to_print) { @@ -99,5 +99,5 @@ void gpr_log_verbosity_init() { } void gpr_set_log_function(gpr_log_func f) { - g_log_func = f ? f : gpr_default_log; + gpr_atm_no_barrier_store(&g_log_func, (gpr_atm)(f ? f : gpr_default_log)); } diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 201969cd45..7e25a5142f 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -1468,7 +1468,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *stream_op = &bctl->op; grpc_transport_stream_op_batch_payload *stream_op_payload = &call->stream_op_payload; - stream_op->covered_by_poller = true; /* rewrite batch ops into a transport op */ for (i = 0; i < nops; i++) { @@ -1657,10 +1656,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } - /* IF this is a server, then GRPC_OP_RECV_INITIAL_METADATA *must* come - from server.c. In that case, it's coming from accept_stream, and in - that case we're not necessarily covered by a poller. */ - stream_op->covered_by_poller = call->is_client; call->received_initial_metadata = true; call->buffered_metadata[0] = op->data.recv_initial_metadata.recv_initial_metadata; diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index 6163776152..86ce4bde61 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -128,6 +128,7 @@ void grpc_init(void) { int i; gpr_once_init(&g_basic_init, do_basic_init); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_mu_lock(&g_init_mu); if (++g_initializations == 1) { gpr_time_init(); @@ -154,8 +155,7 @@ void grpc_init(void) { grpc_register_tracer("pending_tags", &grpc_trace_pending_tags); #endif grpc_security_pre_init(); - grpc_iomgr_init(); - grpc_executor_init(); + grpc_iomgr_init(&exec_ctx); gpr_timers_global_init(); grpc_handshaker_factory_registry_init(); grpc_security_init(); @@ -171,19 +171,20 @@ void grpc_init(void) { grpc_tracer_init("GRPC_TRACE"); /* no more changes to channel init pipelines */ grpc_channel_init_finalize(); - grpc_iomgr_start(); + grpc_iomgr_start(&exec_ctx); } gpr_mu_unlock(&g_init_mu); + grpc_exec_ctx_finish(&exec_ctx); GRPC_API_TRACE("grpc_init(void)", 0, ()); } void grpc_shutdown(void) { int i; GRPC_API_TRACE("grpc_shutdown(void)", 0, ()); - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_exec_ctx exec_ctx = + GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL); gpr_mu_lock(&g_init_mu); if (--g_initializations == 0) { - grpc_executor_shutdown(&exec_ctx); grpc_iomgr_shutdown(&exec_ctx); gpr_timers_global_destroy(); grpc_tracer_shutdown(); diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index 93369cc689..dd3a987eff 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -127,10 +127,6 @@ typedef struct grpc_transport_stream_op_batch { /** Values for the stream op (fields set are determined by flags above) */ grpc_transport_stream_op_batch_payload *payload; - /** Is the completion of this op covered by a poller (if false: the op should - complete independently of some pollset being polled) */ - bool covered_by_poller : 1; - /** Send initial metadata to the peer, from the provided metadata batch. */ bool send_initial_metadata : 1; diff --git a/src/core/lib/transport/transport_op_string.c b/src/core/lib/transport/transport_op_string.c index 3a2a793e01..5e7ac5b626 100644 --- a/src/core/lib/transport/transport_op_string.c +++ b/src/core/lib/transport/transport_op_string.c @@ -79,9 +79,6 @@ char *grpc_transport_stream_op_batch_string( gpr_strvec b; gpr_strvec_init(&b); - gpr_strvec_add( - &b, gpr_strdup(op->covered_by_poller ? "[COVERED]" : "[UNCOVERED]")); - if (op->send_initial_metadata) { gpr_strvec_add(&b, gpr_strdup(" ")); gpr_strvec_add(&b, gpr_strdup("SEND_INITIAL_METADATA{")); diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 8a1a58bb86..30529c70b1 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -158,8 +158,6 @@ CORE_SOURCE_FILES = [ 'src/core/lib/iomgr/wakeup_fd_nospecial.c', 'src/core/lib/iomgr/wakeup_fd_pipe.c', 'src/core/lib/iomgr/wakeup_fd_posix.c', - 'src/core/lib/iomgr/workqueue_uv.c', - 'src/core/lib/iomgr/workqueue_windows.c', 'src/core/lib/json/json.c', 'src/core/lib/json/json_reader.c', 'src/core/lib/json/json_string.c', |