diff options
Diffstat (limited to 'src/core')
30 files changed, 1614 insertions, 1419 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 22c2bc8880..ea5e076c3b 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -1205,6 +1205,9 @@ static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx *exec_ctx, "Pick cancelled", &error, 1)); } +static void pick_after_resolver_result_start_locked(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem); + static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { @@ -1228,7 +1231,7 @@ static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx, chand, calld); } async_pick_done_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); - } else { + } else if (chand->lb_policy != NULL) { if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver returned, doing pick", chand, calld); @@ -1242,6 +1245,30 @@ static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx, async_pick_done_locked(exec_ctx, elem, GRPC_ERROR_NONE); } } + // TODO(roth): It should be impossible for chand->lb_policy to be NULL + // here, so the rest of this code should never actually be executed. + // However, we have reports of a crash on iOS that triggers this case, + // so we are temporarily adding this to restore branches that were + // removed in https://github.com/grpc/grpc/pull/12297. Need to figure + // out what is actually causing this to occur and then figure out the + // right way to deal with it. + else if (chand->resolver != NULL) { + // No LB policy, so try again. + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: resolver returned but no LB policy, " + "trying again", + chand, calld); + } + pick_after_resolver_result_start_locked(exec_ctx, elem); + } else { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver disconnected", chand, + calld); + } + async_pick_done_locked( + exec_ctx, elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); + } } static void pick_after_resolver_result_start_locked(grpc_exec_ctx *exec_ctx, @@ -1599,8 +1626,8 @@ int grpc_client_channel_num_external_connectivity_watchers( return count; } -static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void on_external_watch_complete_locked(grpc_exec_ctx *exec_ctx, + void *arg, grpc_error *error) { external_connectivity_watcher *w = (external_connectivity_watcher *)arg; grpc_closure *follow_up = w->on_complete; grpc_polling_entity_del_from_pollset_set(exec_ctx, &w->pollent, @@ -1619,8 +1646,8 @@ static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg, if (w->state != NULL) { external_connectivity_watcher_list_append(w->chand, w); GRPC_CLOSURE_RUN(exec_ctx, w->watcher_timer_init, GRPC_ERROR_NONE); - GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete, w, - grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w, + grpc_combiner_scheduler(w->chand->combiner)); grpc_connectivity_state_notify_on_state_change( exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure); } else { diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index ffd58129c6..d6bdc13ba9 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -1027,15 +1027,19 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; - GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED( + exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown")); + gpr_free(pp); pp = next; } while (pping != NULL) { pending_ping *next = pping->next; - GRPC_CLOSURE_SCHED(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure, - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED( + exec_ctx, &pping->wrapped_notify_arg.wrapper_closure, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown")); + gpr_free(pping); pping = next; } } @@ -1803,9 +1807,8 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx, break; } case GRPC_CHANNEL_IDLE: - // lb channel inactive (probably shutdown prior to update). Restart lb - // call to kick the lb channel into gear. - GPR_ASSERT(glb_policy->lb_call == NULL); + // lb channel inactive (probably shutdown prior to update). Restart lb + // call to kick the lb channel into gear. /* fallthrough */ case GRPC_CHANNEL_READY: if (glb_policy->lb_call != NULL) { diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index b07fc3b720..56c261ba34 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -102,12 +102,23 @@ static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } } +static void fail_pending_picks_for_shutdown(grpc_exec_ctx *exec_ctx, + pick_first_lb_policy *p) { + pending_pick *pp; + while ((pp = p->pending_picks) != NULL) { + p->pending_picks = pp->next; + *pp->target = NULL; + GRPC_CLOSURE_SCHED( + exec_ctx, pp->on_complete, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown")); + gpr_free(pp); + } +} + static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - pending_pick *pp; p->shutdown = true; - pp = p->pending_picks; - p->pending_picks = NULL; + fail_pending_picks_for_shutdown(exec_ctx, p); grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"), "shutdown"); @@ -120,13 +131,6 @@ static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL, &p->connectivity_changed); } - while (pp != NULL) { - pending_pick *next = pp->next; - *pp->target = NULL; - GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); - gpr_free(pp); - pp = next; - } } static void pf_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, @@ -637,12 +641,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick first exhausted channels", &error, 1), "no_more_channels"); - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = NULL; - GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); - gpr_free(pp); - } + fail_pending_picks_for_shutdown(exec_ctx, p); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); } else { diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 6812bb50cd..de163f6300 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -315,15 +315,10 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { gpr_free(p); } -static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { - round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "[RR %p] Shutting down Round Robin policy at %p", - (void *)pol, (void *)pol); - } - p->shutdown = true; +static void fail_pending_picks_for_shutdown(grpc_exec_ctx *exec_ctx, + round_robin_lb_policy *p) { pending_pick *pp; - while ((pp = p->pending_picks)) { + while ((pp = p->pending_picks) != NULL) { p->pending_picks = pp->next; *pp->target = NULL; GRPC_CLOSURE_SCHED( @@ -331,6 +326,16 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown")); gpr_free(pp); } +} + +static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { + round_robin_lb_policy *p = (round_robin_lb_policy *)pol; + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, "[RR %p] Shutting down Round Robin policy at %p", + (void *)pol, (void *)pol); + } + p->shutdown = true; + fail_pending_picks_for_shutdown(exec_ctx, p); grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "rr_shutdown"); @@ -621,14 +626,8 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, sd->user_data = NULL; } if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { - // the policy is shutting down. Flush all the pending picks... - pending_pick *pp; - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = NULL; - GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); - gpr_free(pp); - } + // The policy is shutting down. Fail all of the pending picks. + fail_pending_picks_for_shutdown(exec_ctx, p); } rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sd_shutdown+started_picking"); diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.cc b/src/core/ext/transport/chttp2/client/chttp2_connector.cc index 202bcd47f5..74839f2156 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.cc +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.cc @@ -115,6 +115,8 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, } memset(c->result, 0, sizeof(*c->result)); } else { + grpc_endpoint_delete_from_pollset_set(exec_ctx, args->endpoint, + c->args.interested_parties); c->result->transport = grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 1); GPR_ASSERT(c->result->transport); @@ -136,6 +138,8 @@ static void start_handshake_locked(grpc_exec_ctx *exec_ctx, c->handshake_mgr = grpc_handshake_manager_create(); grpc_handshakers_add(exec_ctx, HANDSHAKER_CLIENT, c->args.channel_args, c->handshake_mgr); + grpc_endpoint_add_to_pollset_set(exec_ctx, c->endpoint, + c->args.interested_parties); grpc_handshake_manager_do_handshake( exec_ctx, c->handshake_mgr, c->endpoint, c->args.channel_args, c->args.deadline, NULL /* acceptor */, on_handshake_done, c); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 9462d1085e..02fc53122d 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -54,7 +54,6 @@ #include "src/core/lib/transport/transport.h" #include "src/core/lib/transport/transport_impl.h" -#define DEFAULT_WINDOW 65535 #define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024) #define MAX_WINDOW 0x7fffffffu #define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024) @@ -222,7 +221,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, t->write_cb_pool = next; } - t->flow_control.bdp_estimator.Destroy(); + t->flow_control.Destroy(); GRPC_ERROR_UNREF(t->closed_with_error); gpr_free(t->ping_acks); @@ -282,10 +281,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->endpoint_reading = 1; t->next_stream_id = is_client ? 1 : 2; t->is_client = is_client; - t->flow_control.remote_window = DEFAULT_WINDOW; - t->flow_control.announced_window = DEFAULT_WINDOW; - t->flow_control.target_initial_window_size = DEFAULT_WINDOW; - t->flow_control.t = t; t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; t->is_first_frame = true; grpc_connectivity_state_init( @@ -325,8 +320,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, keepalive_watchdog_fired_locked, t, grpc_combiner_scheduler(t->combiner)); - t->flow_control.bdp_estimator.Init(t->peer_string); - grpc_chttp2_goaway_parser_init(&t->goaway_parser); grpc_chttp2_hpack_parser_init(exec_ctx, &t->hpack_parser); @@ -350,8 +343,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, window -- this should by rights be 0 */ t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; t->sent_local_settings = 0; - t->write_buffer_size = DEFAULT_WINDOW; - t->flow_control.enable_bdp_probe = true; + t->write_buffer_size = grpc_core::chttp2::kDefaultWindow; if (is_client) { grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string( @@ -396,6 +388,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; + bool enable_bdp = true; + if (channel_args) { for (i = 0; i < channel_args->num_args; i++) { if (0 == strcmp(channel_args->args[i].key, @@ -456,8 +450,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, &channel_args->args[i], {0, 0, MAX_WRITE_BUFFER_SIZE}); } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) { - t->flow_control.enable_bdp_probe = - grpc_channel_arg_get_integer(&channel_args->args[i], {1, 0, 1}); + enable_bdp = grpc_channel_arg_get_bool(&channel_args->args[i], true); } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) { const int value = grpc_channel_arg_get_integer( @@ -552,6 +545,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } } + t->flow_control.Init(exec_ctx, t, enable_bdp); + /* No pings allowed before receiving a header or data frame. */ t->ping_state.pings_before_data_required = 0; t->ping_state.is_delayed_ping_timer_set = false; @@ -572,15 +567,13 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED; } - if (t->flow_control.enable_bdp_probe) { + if (enable_bdp) { GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping"); schedule_bdp_ping_locked(exec_ctx, t); - } - grpc_chttp2_act_on_flowctl_action( - exec_ctx, - grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, NULL), t, - NULL); + grpc_chttp2_act_on_flowctl_action( + exec_ctx, t->flow_control->PeriodicUpdate(exec_ctx), t, NULL); + } grpc_chttp2_initiate_write(exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE); @@ -718,7 +711,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, post_destructive_reclaimer(exec_ctx, t); } - s->flow_control.s = s; + s->flow_control.Init(t->flow_control.get(), s); GPR_TIMER_END("init_stream", 0); return 0; @@ -769,7 +762,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, GRPC_ERROR_UNREF(s->write_closed_error); GRPC_ERROR_UNREF(s->byte_stream_error); - grpc_chttp2_flowctl_destroy_stream(&t->flow_control, &s->flow_control); + s->flow_control.Destroy(); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "stream"); @@ -1638,13 +1631,10 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, if (s->id != 0) { if (!s->read_closed) { already_received = s->frame_storage.length; - grpc_chttp2_flowctl_incoming_bs_update( - &t->flow_control, &s->flow_control, GRPC_HEADER_SIZE_IN_BYTES, - already_received); - grpc_chttp2_act_on_flowctl_action( - exec_ctx, grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, - &s->flow_control), - t, s); + s->flow_control->IncomingByteStreamUpdate(GRPC_HEADER_SIZE_IN_BYTES, + already_received); + grpc_chttp2_act_on_flowctl_action(exec_ctx, + s->flow_control->MakeAction(), t, s); } } grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); @@ -2420,49 +2410,44 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, * INPUT PROCESSING - PARSING */ -void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx, - grpc_chttp2_flowctl_action action, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s) { - switch (action.send_stream_update) { - case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED: - break; - case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY: - grpc_chttp2_mark_stream_writable(exec_ctx, t, s); - grpc_chttp2_initiate_write( - exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL); +template <class F> +static void WithUrgency(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_core::chttp2::FlowControlAction::Urgency urgency, + grpc_chttp2_initiate_write_reason reason, F action) { + switch (urgency) { + case grpc_core::chttp2::FlowControlAction::Urgency::NO_ACTION_NEEDED: break; - case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE: - grpc_chttp2_mark_stream_writable(exec_ctx, t, s); + case grpc_core::chttp2::FlowControlAction::Urgency::UPDATE_IMMEDIATELY: + grpc_chttp2_initiate_write(exec_ctx, t, reason); + // fallthrough + case grpc_core::chttp2::FlowControlAction::Urgency::QUEUE_UPDATE: + action(); break; } - switch (action.send_transport_update) { - case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED: - break; - case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY: - grpc_chttp2_initiate_write( - exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL); - break; - // this is the same as no action b/c every time the transport enters the - // writing path it will maybe do an update - case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE: - break; - } - if (action.send_setting_update != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) { - if (action.initial_window_size > 0) { - queue_setting_update(exec_ctx, t, - GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, - (uint32_t)action.initial_window_size); - } - if (action.max_frame_size > 0) { - queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, - (uint32_t)action.max_frame_size); - } - if (action.send_setting_update == GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY) { - grpc_chttp2_initiate_write(exec_ctx, t, - GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS); - } - } +} + +void grpc_chttp2_act_on_flowctl_action( + grpc_exec_ctx *exec_ctx, const grpc_core::chttp2::FlowControlAction &action, + grpc_chttp2_transport *t, grpc_chttp2_stream *s) { + WithUrgency( + exec_ctx, t, action.send_stream_update(), + GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL, + [exec_ctx, t, s]() { grpc_chttp2_mark_stream_writable(exec_ctx, t, s); }); + WithUrgency(exec_ctx, t, action.send_transport_update(), + GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, []() {}); + WithUrgency(exec_ctx, t, action.send_initial_window_update(), + GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, + [exec_ctx, t, &action]() { + queue_setting_update(exec_ctx, t, + GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, + action.initial_window_size()); + }); + WithUrgency( + exec_ctx, t, action.send_max_frame_size_update(), + GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [exec_ctx, t, &action]() { + queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, + action.max_frame_size()); + }); } static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, @@ -2518,7 +2503,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE, GRPC_ERROR_NONE}; for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) { - t->flow_control.bdp_estimator->AddIncomingBytes( + t->flow_control->bdp_estimator()->AddIncomingBytes( (int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i])); errors[1] = grpc_chttp2_perform_read(exec_ctx, t, t->read_buffer.slices[i]); @@ -2535,8 +2520,8 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, GPR_TIMER_END("reading_action.parse", 0); GPR_TIMER_BEGIN("post_parse_locked", 0); - if (t->flow_control.initial_window_update != 0) { - if (t->flow_control.initial_window_update > 0) { + if (t->initial_window_update != 0) { + if (t->initial_window_update > 0) { grpc_chttp2_stream *s; while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) { grpc_chttp2_mark_stream_writable(exec_ctx, t, s); @@ -2545,7 +2530,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING); } } - t->flow_control.initial_window_update = 0; + t->initial_window_update = 0; } GPR_TIMER_END("post_parse_locked", 0); } @@ -2568,10 +2553,8 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, if (keep_reading) { grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->read_action_locked); - grpc_chttp2_act_on_flowctl_action( - exec_ctx, - grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, NULL), t, - NULL); + grpc_chttp2_act_on_flowctl_action(exec_ctx, t->flow_control->MakeAction(), + t, NULL); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading"); } else { GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "reading_action"); @@ -2588,7 +2571,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, // that kicks off finishes, it's unreffed static void schedule_bdp_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { - t->flow_control.bdp_estimator->SchedulePing(); + t->flow_control->bdp_estimator()->SchedulePing(); send_ping_locked(exec_ctx, t, &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked); } @@ -2604,7 +2587,7 @@ static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) { grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer); } - t->flow_control.bdp_estimator->StartPing(); + t->flow_control->bdp_estimator()->StartPing(); } static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, @@ -2618,7 +2601,10 @@ static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); return; } - grpc_millis next_ping = t->flow_control.bdp_estimator->CompletePing(exec_ctx); + grpc_millis next_ping = + t->flow_control->bdp_estimator()->CompletePing(exec_ctx); + grpc_chttp2_act_on_flowctl_action( + exec_ctx, t->flow_control->PeriodicUpdate(exec_ctx), t, nullptr); GPR_ASSERT(!t->have_next_bdp_ping_timer); t->have_next_bdp_ping_timer = true; grpc_timer_init(exec_ctx, &t->next_bdp_ping_timer, next_ping, @@ -2844,13 +2830,10 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, size_t cur_length = s->frame_storage.length; if (!s->read_closed) { - grpc_chttp2_flowctl_incoming_bs_update(&t->flow_control, &s->flow_control, - bs->next_action.max_size_hint, - cur_length); - grpc_chttp2_act_on_flowctl_action( - exec_ctx, grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, - &s->flow_control), - t, s); + s->flow_control->IncomingByteStreamUpdate(bs->next_action.max_size_hint, + cur_length); + grpc_chttp2_act_on_flowctl_action(exec_ctx, s->flow_control->MakeAction(), + t, s); } GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0); if (s->frame_storage.length > 0) { diff --git a/src/core/ext/transport/chttp2/transport/flow_control.cc b/src/core/ext/transport/chttp2/transport/flow_control.cc index 716cd71490..40545bc74b 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.cc +++ b/src/core/ext/transport/chttp2/transport/flow_control.cc @@ -16,7 +16,7 @@ * */ -#include "src/core/ext/transport/chttp2/transport/internal.h" +#include "src/core/ext/transport/chttp2/transport/flow_control.h" #include <inttypes.h> #include <limits.h> @@ -28,38 +28,15 @@ #include <grpc/support/string_util.h> #include <grpc/support/useful.h> +#include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/lib/support/string.h" -static uint32_t grpc_chttp2_target_announced_window( - const grpc_chttp2_transport_flowctl* tfc); - -#ifndef NDEBUG - -typedef struct { - int64_t remote_window; - int64_t target_window; - int64_t announced_window; - int64_t remote_window_delta; - int64_t local_window_delta; - int64_t announced_window_delta; - uint32_t local_init_window; - uint32_t local_max_frame; -} shadow_flow_control; - -static void pretrace(shadow_flow_control* shadow_fc, - grpc_chttp2_transport_flowctl* tfc, - grpc_chttp2_stream_flowctl* sfc) { - shadow_fc->remote_window = tfc->remote_window; - shadow_fc->target_window = grpc_chttp2_target_announced_window(tfc); - shadow_fc->announced_window = tfc->announced_window; - if (sfc != NULL) { - shadow_fc->remote_window_delta = sfc->remote_window_delta; - shadow_fc->local_window_delta = sfc->local_window_delta; - shadow_fc->announced_window_delta = sfc->announced_window_delta; - } -} +namespace grpc_core { +namespace chttp2 { + +namespace { -#define TRACE_PADDING 30 +static constexpr const int kTracePadding = 30; static char* fmt_int64_diff_str(int64_t old_val, int64_t new_val) { char* str; @@ -68,7 +45,7 @@ static char* fmt_int64_diff_str(int64_t old_val, int64_t new_val) { } else { gpr_asprintf(&str, "%" PRId64 "", old_val); } - char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING); + char* str_lp = gpr_leftpad(str, ' ', kTracePadding); gpr_free(str); return str_lp; } @@ -80,47 +57,58 @@ static char* fmt_uint32_diff_str(uint32_t old_val, uint32_t new_val) { } else { gpr_asprintf(&str, "%" PRIu32 "", old_val); } - char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING); + char* str_lp = gpr_leftpad(str, ' ', kTracePadding); gpr_free(str); return str_lp; } +} // namespace + +void FlowControlTrace::Init(const char* reason, TransportFlowControl* tfc, + StreamFlowControl* sfc) { + tfc_ = tfc; + sfc_ = sfc; + reason_ = reason; + remote_window_ = tfc->remote_window(); + target_window_ = tfc->target_window(); + announced_window_ = tfc->announced_window(); + if (sfc != nullptr) { + remote_window_delta_ = sfc->remote_window_delta(); + local_window_delta_ = sfc->local_window_delta(); + announced_window_delta_ = sfc->announced_window_delta(); + } +} -static void posttrace(shadow_flow_control* shadow_fc, - grpc_chttp2_transport_flowctl* tfc, - grpc_chttp2_stream_flowctl* sfc, const char* reason) { +void FlowControlTrace::Finish() { uint32_t acked_local_window = - tfc->t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + tfc_->transport()->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; uint32_t remote_window = - tfc->t->settings[GRPC_PEER_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - char* trw_str = - fmt_int64_diff_str(shadow_fc->remote_window, tfc->remote_window); - char* tlw_str = fmt_int64_diff_str(shadow_fc->target_window, - grpc_chttp2_target_announced_window(tfc)); + tfc_->transport()->settings[GRPC_PEER_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + char* trw_str = fmt_int64_diff_str(remote_window_, tfc_->remote_window()); + char* tlw_str = fmt_int64_diff_str(target_window_, tfc_->target_window()); char* taw_str = - fmt_int64_diff_str(shadow_fc->announced_window, tfc->announced_window); + fmt_int64_diff_str(announced_window_, tfc_->announced_window()); char* srw_str; char* slw_str; char* saw_str; - if (sfc != NULL) { - srw_str = fmt_int64_diff_str(shadow_fc->remote_window_delta + remote_window, - sfc->remote_window_delta + remote_window); - slw_str = - fmt_int64_diff_str(shadow_fc->local_window_delta + acked_local_window, - sfc->local_window_delta + acked_local_window); - saw_str = fmt_int64_diff_str( - shadow_fc->announced_window_delta + acked_local_window, - sfc->announced_window_delta + acked_local_window); + if (sfc_ != nullptr) { + srw_str = fmt_int64_diff_str(remote_window_delta_ + remote_window, + sfc_->remote_window_delta() + remote_window); + slw_str = fmt_int64_diff_str(local_window_delta_ + acked_local_window, + local_window_delta_ + acked_local_window); + saw_str = fmt_int64_diff_str(announced_window_delta_ + acked_local_window, + announced_window_delta_ + acked_local_window); } else { - srw_str = gpr_leftpad("", ' ', TRACE_PADDING); - slw_str = gpr_leftpad("", ' ', TRACE_PADDING); - saw_str = gpr_leftpad("", ' ', TRACE_PADDING); + srw_str = gpr_leftpad("", ' ', kTracePadding); + slw_str = gpr_leftpad("", ' ', kTracePadding); + saw_str = gpr_leftpad("", ' ', kTracePadding); } gpr_log(GPR_DEBUG, "%p[%u][%s] | %s | trw:%s, ttw:%s, taw:%s, srw:%s, slw:%s, saw:%s", - tfc, sfc != NULL ? sfc->s->id : 0, tfc->t->is_client ? "cli" : "svr", - reason, trw_str, tlw_str, taw_str, srw_str, slw_str, saw_str); + tfc_, sfc_ != nullptr ? sfc_->stream()->id : 0, + tfc_->transport()->is_client ? "cli" : "svr", reason_, trw_str, + tlw_str, taw_str, srw_str, slw_str, saw_str); gpr_free(trw_str); gpr_free(tlw_str); gpr_free(taw_str); @@ -129,13 +117,13 @@ static void posttrace(shadow_flow_control* shadow_fc, gpr_free(saw_str); } -static const char* urgency_to_string(grpc_chttp2_flowctl_urgency urgency) { - switch (urgency) { - case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED: +const char* FlowControlAction::UrgencyString(Urgency u) { + switch (u) { + case Urgency::NO_ACTION_NEEDED: return "no action"; - case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY: + case Urgency::UPDATE_IMMEDIATELY: return "update immediately"; - case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE: + case Urgency::QUEUE_UPDATE: return "queue update"; default: GPR_UNREACHABLE_CODE(return "unknown"); @@ -143,209 +131,132 @@ static const char* urgency_to_string(grpc_chttp2_flowctl_urgency urgency) { GPR_UNREACHABLE_CODE(return "unknown"); } -static void trace_action(grpc_chttp2_transport_flowctl* tfc, - grpc_chttp2_flowctl_action action) { +void FlowControlAction::Trace(grpc_chttp2_transport* t) const { char* iw_str = fmt_uint32_diff_str( - tfc->t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], - action.initial_window_size); + t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + initial_window_size_); char* mf_str = fmt_uint32_diff_str( - tfc->t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], - action.max_frame_size); - gpr_log(GPR_DEBUG, "t[%s], s[%s], settings[%s] iw:%s mf:%s", - urgency_to_string(action.send_transport_update), - urgency_to_string(action.send_stream_update), - urgency_to_string(action.send_setting_update), iw_str, mf_str); + t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], + max_frame_size_); + gpr_log(GPR_DEBUG, "t[%s], s[%s], iw:%s:%s mf:%s:%s", + UrgencyString(send_transport_update_), + UrgencyString(send_stream_update_), + UrgencyString(send_initial_window_update_), iw_str, + UrgencyString(send_max_frame_size_update_), mf_str); gpr_free(iw_str); gpr_free(mf_str); } -#define PRETRACE(tfc, sfc) \ - shadow_flow_control shadow_fc; \ - GRPC_FLOW_CONTROL_IF_TRACING(pretrace(&shadow_fc, tfc, sfc)) -#define POSTTRACE(tfc, sfc, reason) \ - GRPC_FLOW_CONTROL_IF_TRACING(posttrace(&shadow_fc, tfc, sfc, reason)) -#define TRACEACTION(tfc, action) \ - GRPC_FLOW_CONTROL_IF_TRACING(trace_action(tfc, action)) -#else -#define PRETRACE(tfc, sfc) -#define POSTTRACE(tfc, sfc, reason) -#define TRACEACTION(tfc, action) -#endif - -/* How many bytes of incoming flow control would we like to advertise */ -static uint32_t grpc_chttp2_target_announced_window( - const grpc_chttp2_transport_flowctl* tfc) { - return (uint32_t)GPR_MIN((int64_t)((1u << 31) - 1), - tfc->announced_stream_total_over_incoming_window + - tfc->target_initial_window_size); -} - -// we have sent data on the wire, we must track this in our bookkeeping for the -// remote peer's flow control. -void grpc_chttp2_flowctl_sent_data(grpc_chttp2_transport_flowctl* tfc, - grpc_chttp2_stream_flowctl* sfc, - int64_t size) { - PRETRACE(tfc, sfc); - tfc->remote_window -= size; - sfc->remote_window_delta -= size; - POSTTRACE(tfc, sfc, " data sent"); -} - -static void announced_window_delta_preupdate(grpc_chttp2_transport_flowctl* tfc, - grpc_chttp2_stream_flowctl* sfc) { - if (sfc->announced_window_delta > 0) { - tfc->announced_stream_total_over_incoming_window -= - sfc->announced_window_delta; - } else { - tfc->announced_stream_total_under_incoming_window += - -sfc->announced_window_delta; - } -} - -static void announced_window_delta_postupdate( - grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc) { - if (sfc->announced_window_delta > 0) { - tfc->announced_stream_total_over_incoming_window += - sfc->announced_window_delta; - } else { - tfc->announced_stream_total_under_incoming_window -= - -sfc->announced_window_delta; +TransportFlowControl::TransportFlowControl(grpc_exec_ctx* exec_ctx, + const grpc_chttp2_transport* t, + bool enable_bdp_probe) + : t_(t), + enable_bdp_probe_(enable_bdp_probe), + bdp_estimator_(t->peer_string), + pid_controller_(grpc_core::PidController::Args() + .set_gain_p(4) + .set_gain_i(8) + .set_gain_d(0) + .set_initial_control_value(TargetLogBdp()) + .set_min_control_value(-1) + .set_max_control_value(25) + .set_integral_range(10)), + last_pid_update_(grpc_exec_ctx_now(exec_ctx)) {} + +uint32_t TransportFlowControl::MaybeSendUpdate(bool writing_anyway) { + FlowControlTrace trace("t updt sent", this, nullptr); + const uint32_t target_announced_window = (const uint32_t)target_window(); + if ((writing_anyway || announced_window_ <= target_announced_window / 2) && + announced_window_ != target_announced_window) { + const uint32_t announce = (uint32_t)GPR_CLAMP( + target_announced_window - announced_window_, 0, UINT32_MAX); + announced_window_ += announce; + return announce; } + return 0; } -// We have received data from the wire. We must track this in our own flow -// control bookkeeping. -// Returns an error if the incoming frame violates our flow control. -grpc_error* grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport_flowctl* tfc, - grpc_chttp2_stream_flowctl* sfc, - int64_t incoming_frame_size) { - uint32_t sent_init_window = - tfc->t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - uint32_t acked_init_window = - tfc->t->settings[GRPC_ACKED_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - PRETRACE(tfc, sfc); - if (incoming_frame_size > tfc->announced_window) { +grpc_error* TransportFlowControl::ValidateRecvData( + int64_t incoming_frame_size) { + if (incoming_frame_size > announced_window_) { char* msg; gpr_asprintf(&msg, "frame of size %" PRId64 " overflows local window of %" PRId64, - incoming_frame_size, tfc->announced_window); + incoming_frame_size, announced_window_); grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); gpr_free(msg); return err; } + return GRPC_ERROR_NONE; +} - if (sfc != NULL) { - int64_t acked_stream_window = - sfc->announced_window_delta + acked_init_window; - int64_t sent_stream_window = sfc->announced_window_delta + sent_init_window; - if (incoming_frame_size > acked_stream_window) { - if (incoming_frame_size <= sent_stream_window) { - gpr_log( - GPR_ERROR, - "Incoming frame of size %" PRId64 - " exceeds local window size of %" PRId64 - ".\n" - "The (un-acked, future) window size would be %" PRId64 - " which is not exceeded.\n" - "This would usually cause a disconnection, but allowing it due to" - "broken HTTP2 implementations in the wild.\n" - "See (for example) https://github.com/netty/netty/issues/6520.", - incoming_frame_size, acked_stream_window, sent_stream_window); - } else { - char* msg; - gpr_asprintf(&msg, "frame of size %" PRId64 - " overflows local window of %" PRId64, - incoming_frame_size, acked_stream_window); - grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); - gpr_free(msg); - return err; - } - } +StreamFlowControl::StreamFlowControl(TransportFlowControl* tfc, + const grpc_chttp2_stream* s) + : tfc_(tfc), s_(s) {} - announced_window_delta_preupdate(tfc, sfc); - sfc->announced_window_delta -= incoming_frame_size; - announced_window_delta_postupdate(tfc, sfc); - sfc->local_window_delta -= incoming_frame_size; - } +grpc_error* StreamFlowControl::RecvData(int64_t incoming_frame_size) { + FlowControlTrace trace(" data recv", tfc_, this); - tfc->announced_window -= incoming_frame_size; + grpc_error* error = GRPC_ERROR_NONE; + error = tfc_->ValidateRecvData(incoming_frame_size); + if (error != GRPC_ERROR_NONE) return error; - POSTTRACE(tfc, sfc, " data recv"); - return GRPC_ERROR_NONE; -} - -// Returns a non zero announce integer if we should send a transport window -// update -uint32_t grpc_chttp2_flowctl_maybe_send_transport_update( - grpc_chttp2_transport_flowctl* tfc, bool writing_anyway) { - PRETRACE(tfc, NULL); - uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc); - uint32_t threshold_to_send_transport_window_update = - tfc->t->outbuf.count > 0 ? 3 * target_announced_window / 4 - : target_announced_window / 2; - if ((writing_anyway || - tfc->announced_window <= threshold_to_send_transport_window_update) && - tfc->announced_window != target_announced_window) { - uint32_t announce = (uint32_t)GPR_CLAMP( - target_announced_window - tfc->announced_window, 0, UINT32_MAX); - tfc->announced_window += announce; - POSTTRACE(tfc, NULL, "t updt sent"); - return announce; + uint32_t sent_init_window = + tfc_->transport()->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + uint32_t acked_init_window = + tfc_->transport()->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + + int64_t acked_stream_window = announced_window_delta_ + acked_init_window; + int64_t sent_stream_window = announced_window_delta_ + sent_init_window; + if (incoming_frame_size > acked_stream_window) { + if (incoming_frame_size <= sent_stream_window) { + gpr_log(GPR_ERROR, + "Incoming frame of size %" PRId64 + " exceeds local window size of %" PRId64 + ".\n" + "The (un-acked, future) window size would be %" PRId64 + " which is not exceeded.\n" + "This would usually cause a disconnection, but allowing it due to" + "broken HTTP2 implementations in the wild.\n" + "See (for example) https://github.com/netty/netty/issues/6520.", + incoming_frame_size, acked_stream_window, sent_stream_window); + } else { + char* msg; + gpr_asprintf(&msg, "frame of size %" PRId64 + " overflows local window of %" PRId64, + incoming_frame_size, acked_stream_window); + grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); + gpr_free(msg); + return err; + } } - GRPC_FLOW_CONTROL_IF_TRACING( - gpr_log(GPR_DEBUG, "%p[0][%s] will not send transport update", tfc, - tfc->t->is_client ? "cli" : "svr")); - return 0; + + UpdateAnnouncedWindowDelta(tfc_, -incoming_frame_size); + local_window_delta_ -= incoming_frame_size; + tfc_->CommitRecvData(incoming_frame_size); + return GRPC_ERROR_NONE; } -// Returns a non zero announce integer if we should send a stream window update -uint32_t grpc_chttp2_flowctl_maybe_send_stream_update( - grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc) { - PRETRACE(tfc, sfc); - if (sfc->local_window_delta > sfc->announced_window_delta) { +uint32_t StreamFlowControl::MaybeSendUpdate() { + FlowControlTrace trace("s updt sent", tfc_, this); + if (local_window_delta_ > announced_window_delta_) { uint32_t announce = (uint32_t)GPR_CLAMP( - sfc->local_window_delta - sfc->announced_window_delta, 0, UINT32_MAX); - announced_window_delta_preupdate(tfc, sfc); - sfc->announced_window_delta += announce; - announced_window_delta_postupdate(tfc, sfc); - POSTTRACE(tfc, sfc, "s updt sent"); + local_window_delta_ - announced_window_delta_, 0, UINT32_MAX); + UpdateAnnouncedWindowDelta(tfc_, announce); return announce; } - GRPC_FLOW_CONTROL_IF_TRACING( - gpr_log(GPR_DEBUG, "%p[%u][%s] will not send stream update", tfc, - sfc->s->id, tfc->t->is_client ? "cli" : "svr")); return 0; } -// we have received a WINDOW_UPDATE frame for a transport -void grpc_chttp2_flowctl_recv_transport_update( - grpc_chttp2_transport_flowctl* tfc, uint32_t size) { - PRETRACE(tfc, NULL); - tfc->remote_window += size; - POSTTRACE(tfc, NULL, "t updt recv"); -} - -// we have received a WINDOW_UPDATE frame for a stream -void grpc_chttp2_flowctl_recv_stream_update(grpc_chttp2_transport_flowctl* tfc, - grpc_chttp2_stream_flowctl* sfc, - uint32_t size) { - PRETRACE(tfc, sfc); - sfc->remote_window_delta += size; - POSTTRACE(tfc, sfc, "s updt recv"); -} - -void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport_flowctl* tfc, - grpc_chttp2_stream_flowctl* sfc, - size_t max_size_hint, - size_t have_already) { - PRETRACE(tfc, sfc); +void StreamFlowControl::IncomingByteStreamUpdate(size_t max_size_hint, + size_t have_already) { + FlowControlTrace trace("app st recv", tfc_, this); uint32_t max_recv_bytes; uint32_t sent_init_window = - tfc->t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + tfc_->transport()->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; /* clamp max recv hint to an allowable size */ if (max_size_hint >= UINT32_MAX - sent_init_window) { @@ -363,65 +274,18 @@ void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport_flowctl* tfc, /* add some small lookahead to keep pipelines flowing */ GPR_ASSERT(max_recv_bytes <= UINT32_MAX - sent_init_window); - if (sfc->local_window_delta < max_recv_bytes) { + if (local_window_delta_ < max_recv_bytes) { uint32_t add_max_recv_bytes = - (uint32_t)(max_recv_bytes - sfc->local_window_delta); - sfc->local_window_delta += add_max_recv_bytes; - } - POSTTRACE(tfc, sfc, "app st recv"); -} - -void grpc_chttp2_flowctl_destroy_stream(grpc_chttp2_transport_flowctl* tfc, - grpc_chttp2_stream_flowctl* sfc) { - announced_window_delta_preupdate(tfc, sfc); -} - -// Returns an urgency with which to make an update -static grpc_chttp2_flowctl_urgency delta_is_significant( - const grpc_chttp2_transport_flowctl* tfc, int32_t value, - grpc_chttp2_setting_id setting_id) { - int64_t delta = (int64_t)value - - (int64_t)tfc->t->settings[GRPC_LOCAL_SETTINGS][setting_id]; - // TODO(ncteisen): tune this - if (delta != 0 && (delta <= -value / 5 || delta >= value / 5)) { - return GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE; - } else { - return GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED; - } -} - -// Takes in a target and uses the pid controller to return a stabilized -// guess at the new bdp. -static double get_pid_controller_guess(grpc_exec_ctx* exec_ctx, - grpc_chttp2_transport_flowctl* tfc, - double target) { - grpc_millis now = grpc_exec_ctx_now(exec_ctx); - if (!tfc->pid_controller_initialized) { - tfc->last_pid_update = now; - tfc->pid_controller_initialized = true; - tfc->pid_controller.Init(grpc_core::PidController::Args() - .set_gain_p(4) - .set_gain_i(8) - .set_gain_d(0) - .set_initial_control_value(target) - .set_min_control_value(-1) - .set_max_control_value(25) - .set_integral_range(10)); - return pow(2, target); + (uint32_t)(max_recv_bytes - local_window_delta_); + local_window_delta_ += add_max_recv_bytes; } - double bdp_error = target - tfc->pid_controller->last_control_value(); - double dt = (double)(now - tfc->last_pid_update) * 1e-3; - double log2_bdp_guess = tfc->pid_controller->Update(bdp_error, dt); - tfc->last_pid_update = now; - return pow(2, log2_bdp_guess); } // Take in a target and modifies it based on the memory pressure of the system -static double get_target_under_memory_pressure( - grpc_chttp2_transport_flowctl* tfc, double target) { +static double AdjustForMemoryPressure(grpc_resource_quota* quota, + double target) { // do not increase window under heavy memory pressure. - double memory_pressure = grpc_resource_quota_get_memory_pressure( - grpc_resource_user_quota(grpc_endpoint_get_resource_user(tfc->t->ep))); + double memory_pressure = grpc_resource_quota_get_memory_pressure(quota); static const double kLowMemPressure = 0.1; static const double kZeroTarget = 22; static const double kHighMemPressure = 0.8; @@ -436,75 +300,82 @@ static double get_target_under_memory_pressure( return target; } -grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action( - grpc_exec_ctx* exec_ctx, grpc_chttp2_transport_flowctl* tfc, - grpc_chttp2_stream_flowctl* sfc) { - grpc_chttp2_flowctl_action action; - memset(&action, 0, sizeof(action)); +double TransportFlowControl::TargetLogBdp() { + return AdjustForMemoryPressure( + grpc_resource_user_quota(grpc_endpoint_get_resource_user(t_->ep)), + 1 + log2(bdp_estimator_.EstimateBdp())); +} + +double TransportFlowControl::SmoothLogBdp(grpc_exec_ctx* exec_ctx, + double value) { + grpc_millis now = grpc_exec_ctx_now(exec_ctx); + double bdp_error = value - pid_controller_.last_control_value(); + const double dt = (double)(now - last_pid_update_) * 1e-3; + last_pid_update_ = now; + return pid_controller_.Update(bdp_error, dt); +} + +FlowControlAction::Urgency TransportFlowControl::DeltaUrgency( + int32_t value, grpc_chttp2_setting_id setting_id) { + int64_t delta = + (int64_t)value - (int64_t)t_->settings[GRPC_LOCAL_SETTINGS][setting_id]; // TODO(ncteisen): tune this - if (sfc != NULL && !sfc->s->read_closed) { - uint32_t sent_init_window = - tfc->t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - if ((int64_t)sfc->local_window_delta > - (int64_t)sfc->announced_window_delta && - (int64_t)sfc->announced_window_delta + sent_init_window <= - sent_init_window / 2) { - action.send_stream_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY; - } else if (sfc->local_window_delta > sfc->announced_window_delta) { - action.send_stream_update = GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE; - } + if (delta != 0 && (delta <= -value / 5 || delta >= value / 5)) { + return FlowControlAction::Urgency::QUEUE_UPDATE; + } else { + return FlowControlAction::Urgency::NO_ACTION_NEEDED; } - if (tfc->enable_bdp_probe) { +} + +FlowControlAction TransportFlowControl::PeriodicUpdate( + grpc_exec_ctx* exec_ctx) { + FlowControlAction action; + if (enable_bdp_probe_) { // get bdp estimate and update initial_window accordingly. - int64_t estimate = -1; - if (tfc->bdp_estimator->EstimateBdp(&estimate)) { - double target = 1 + log2((double)estimate); - - // target might change based on how much memory pressure we are under - // TODO(ncteisen): experiment with setting target to be huge under low - // memory pressure. - target = get_target_under_memory_pressure(tfc, target); - - // run our target through the pid controller to stabilize change. - // TODO(ncteisen): experiment with other controllers here. - double bdp_guess = get_pid_controller_guess(exec_ctx, tfc, target); - - // Though initial window 'could' drop to 0, we keep the floor at 128 - tfc->target_initial_window_size = - (int32_t)GPR_CLAMP(bdp_guess, 128, INT32_MAX); - - grpc_chttp2_flowctl_urgency init_window_update_urgency = - delta_is_significant(tfc, tfc->target_initial_window_size, - GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE); - if (init_window_update_urgency != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) { - action.send_setting_update = init_window_update_urgency; - action.initial_window_size = (uint32_t)tfc->target_initial_window_size; - } - } + // target might change based on how much memory pressure we are under + // TODO(ncteisen): experiment with setting target to be huge under low + // memory pressure. + const double target = pow(2, SmoothLogBdp(exec_ctx, TargetLogBdp())); + + // Though initial window 'could' drop to 0, we keep the floor at 128 + target_initial_window_size_ = (int32_t)GPR_CLAMP(target, 128, INT32_MAX); + + action.set_send_initial_window_update( + DeltaUrgency(target_initial_window_size_, + GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE), + target_initial_window_size_); // get bandwidth estimate and update max_frame accordingly. - double bw_dbl = -1; - if (tfc->bdp_estimator->EstimateBandwidth(&bw_dbl)) { - // we target the max of BDP or bandwidth in microseconds. - int32_t frame_size = (int32_t)GPR_CLAMP( - GPR_MAX((int32_t)GPR_CLAMP(bw_dbl, 0, INT_MAX) / 1000, - tfc->target_initial_window_size), - 16384, 16777215); - grpc_chttp2_flowctl_urgency frame_size_urgency = delta_is_significant( - tfc, frame_size, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE); - if (frame_size_urgency != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) { - if (frame_size_urgency > action.send_setting_update) { - action.send_setting_update = frame_size_urgency; - } - action.max_frame_size = (uint32_t)frame_size; - } - } + double bw_dbl = bdp_estimator_.EstimateBandwidth(); + // we target the max of BDP or bandwidth in microseconds. + int32_t frame_size = (int32_t)GPR_CLAMP( + GPR_MAX((int32_t)GPR_CLAMP(bw_dbl, 0, INT_MAX) / 1000, + target_initial_window_size_), + 16384, 16777215); + action.set_send_max_frame_size_update( + DeltaUrgency(frame_size, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE), + frame_size); } - uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc); - if (tfc->announced_window < target_announced_window / 2) { - action.send_transport_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY; + return UpdateAction(action); +} + +FlowControlAction StreamFlowControl::UpdateAction(FlowControlAction action) { + // TODO(ncteisen): tune this + if (!s_->read_closed) { + uint32_t sent_init_window = + tfc_->transport()->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + if (local_window_delta_ > announced_window_delta_ && + announced_window_delta_ + sent_init_window <= sent_init_window / 2) { + action.set_send_stream_update( + FlowControlAction::Urgency::UPDATE_IMMEDIATELY); + } else if (local_window_delta_ > announced_window_delta_) { + action.set_send_stream_update(FlowControlAction::Urgency::QUEUE_UPDATE); + } } - TRACEACTION(tfc, action); + return action; } + +} // namespace chttp2 +} // namespace grpc_core diff --git a/src/core/ext/transport/chttp2/transport/flow_control.h b/src/core/ext/transport/chttp2/transport/flow_control.h new file mode 100644 index 0000000000..d5107d467b --- /dev/null +++ b/src/core/ext/transport/chttp2/transport/flow_control.h @@ -0,0 +1,328 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H +#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H + +#include <stdint.h> + +#include <grpc/support/useful.h> +#include "src/core/ext/transport/chttp2/transport/http2_settings.h" +#include "src/core/lib/support/manual_constructor.h" +#include "src/core/lib/transport/bdp_estimator.h" +#include "src/core/lib/transport/pid_controller.h" + +struct grpc_chttp2_transport; +struct grpc_chttp2_stream; + +extern "C" grpc_tracer_flag grpc_flowctl_trace; + +namespace grpc_core { +namespace chttp2 { + +static constexpr uint32_t kDefaultWindow = 65535; + +class TransportFlowControl; +class StreamFlowControl; + +class FlowControlAction { + public: + enum class Urgency : uint8_t { + // Nothing to be done. + NO_ACTION_NEEDED = 0, + // Initiate a write to update the initial window immediately. + UPDATE_IMMEDIATELY, + // Push the flow control update into a send buffer, to be sent + // out the next time a write is initiated. + QUEUE_UPDATE, + }; + + Urgency send_stream_update() const { return send_stream_update_; } + Urgency send_transport_update() const { return send_transport_update_; } + Urgency send_initial_window_update() const { + return send_initial_window_update_; + } + Urgency send_max_frame_size_update() const { + return send_max_frame_size_update_; + } + uint32_t initial_window_size() const { return initial_window_size_; } + uint32_t max_frame_size() const { return max_frame_size_; } + + FlowControlAction& set_send_stream_update(Urgency u) { + send_stream_update_ = u; + return *this; + } + FlowControlAction& set_send_transport_update(Urgency u) { + send_transport_update_ = u; + return *this; + } + FlowControlAction& set_send_initial_window_update(Urgency u, + uint32_t update) { + send_initial_window_update_ = u; + initial_window_size_ = update; + return *this; + } + FlowControlAction& set_send_max_frame_size_update(Urgency u, + uint32_t update) { + send_max_frame_size_update_ = u; + max_frame_size_ = update; + return *this; + } + + static const char* UrgencyString(Urgency u); + void Trace(grpc_chttp2_transport* t) const; + + private: + Urgency send_stream_update_ = Urgency::NO_ACTION_NEEDED; + Urgency send_transport_update_ = Urgency::NO_ACTION_NEEDED; + Urgency send_initial_window_update_ = Urgency::NO_ACTION_NEEDED; + Urgency send_max_frame_size_update_ = Urgency::NO_ACTION_NEEDED; + uint32_t initial_window_size_ = 0; + uint32_t max_frame_size_ = 0; +}; + +class FlowControlTrace { + public: + FlowControlTrace(const char* reason, TransportFlowControl* tfc, + StreamFlowControl* sfc) { + if (enabled_) Init(reason, tfc, sfc); + } + + ~FlowControlTrace() { + if (enabled_) Finish(); + } + + private: + void Init(const char* reason, TransportFlowControl* tfc, + StreamFlowControl* sfc); + void Finish(); + + const bool enabled_ = GRPC_TRACER_ON(grpc_flowctl_trace); + + TransportFlowControl* tfc_; + StreamFlowControl* sfc_; + const char* reason_; + int64_t remote_window_; + int64_t target_window_; + int64_t announced_window_; + int64_t remote_window_delta_; + int64_t local_window_delta_; + int64_t announced_window_delta_; +}; + +class TransportFlowControl { + public: + TransportFlowControl(grpc_exec_ctx* exec_ctx, const grpc_chttp2_transport* t, + bool enable_bdp_probe); + ~TransportFlowControl() {} + + bool bdp_probe() const { return enable_bdp_probe_; } + + // returns an announce if we should send a transport update to our peer, + // else returns zero; writing_anyway indicates if a write would happen + // regardless of the send - if it is false and this function returns non-zero, + // this announce will cause a write to occur + uint32_t MaybeSendUpdate(bool writing_anyway); + + // Reads the flow control data and returns and actionable struct that will + // tell chttp2 exactly what it needs to do + FlowControlAction MakeAction() { return UpdateAction(FlowControlAction()); } + + // Call periodically (at a low-ish rate, 100ms - 10s makes sense) + // to perform more complex flow control calculations and return an action + // to let chttp2 change its parameters + FlowControlAction PeriodicUpdate(grpc_exec_ctx* exec_ctx); + + void StreamSentData(int64_t size) { remote_window_ -= size; } + + grpc_error* ValidateRecvData(int64_t incoming_frame_size); + void CommitRecvData(int64_t incoming_frame_size) { + announced_window_ -= incoming_frame_size; + } + + grpc_error* RecvData(int64_t incoming_frame_size) { + FlowControlTrace trace(" data recv", this, nullptr); + grpc_error* error = ValidateRecvData(incoming_frame_size); + if (error != GRPC_ERROR_NONE) return error; + CommitRecvData(incoming_frame_size); + return GRPC_ERROR_NONE; + } + + // we have received a WINDOW_UPDATE frame for a transport + void RecvUpdate(uint32_t size) { + FlowControlTrace trace("t updt recv", this, nullptr); + remote_window_ += size; + } + + int64_t remote_window() const { return remote_window_; } + int64_t target_window() const { + return (uint32_t)GPR_MIN((int64_t)((1u << 31) - 1), + announced_stream_total_over_incoming_window_ + + target_initial_window_size_); + } + int64_t announced_window() const { return announced_window_; } + + const grpc_chttp2_transport* transport() const { return t_; } + + void PreUpdateAnnouncedWindowOverIncomingWindow(int64_t delta) { + if (delta > 0) { + announced_stream_total_over_incoming_window_ -= delta; + } else { + announced_stream_total_under_incoming_window_ += -delta; + } + } + + void PostUpdateAnnouncedWindowOverIncomingWindow(int64_t delta) { + if (delta > 0) { + announced_stream_total_over_incoming_window_ += delta; + } else { + announced_stream_total_under_incoming_window_ -= -delta; + } + } + + BdpEstimator* bdp_estimator() { return &bdp_estimator_; } + + void TestOnlyForceHugeWindow() { + announced_window_ = 1024 * 1024 * 1024; + remote_window_ = 1024 * 1024 * 1024; + } + + private: + double TargetLogBdp(); + double SmoothLogBdp(grpc_exec_ctx* exec_ctx, double value); + FlowControlAction::Urgency DeltaUrgency(int32_t value, + grpc_chttp2_setting_id setting_id); + + FlowControlAction UpdateAction(FlowControlAction action) { + if (announced_window_ < target_window() / 2) { + action.set_send_transport_update( + FlowControlAction::Urgency::UPDATE_IMMEDIATELY); + } + return action; + } + + const grpc_chttp2_transport* const t_; + + /** Our bookkeeping for the remote peer's available window */ + int64_t remote_window_ = kDefaultWindow; + + /** calculating what we should give for local window: + we track the total amount of flow control over initial window size + across all streams: this is data that we want to receive right now (it + has an outstanding read) + and the total amount of flow control under initial window size across all + streams: this is data we've read early + we want to adjust incoming_window such that: + incoming_window = total_over - max(bdp - total_under, 0) */ + int64_t announced_stream_total_over_incoming_window_ = 0; + int64_t announced_stream_total_under_incoming_window_ = 0; + + /** This is out window according to what we have sent to our remote peer. The + * difference between this and target window is what we use to decide when + * to send WINDOW_UPDATE frames. */ + int64_t announced_window_ = kDefaultWindow; + + int32_t target_initial_window_size_ = kDefaultWindow; + + /** should we probe bdp? */ + const bool enable_bdp_probe_; + + /* bdp estimation */ + grpc_core::BdpEstimator bdp_estimator_; + + /* pid controller */ + grpc_core::PidController pid_controller_; + grpc_millis last_pid_update_ = 0; +}; + +class StreamFlowControl { + public: + StreamFlowControl(TransportFlowControl* tfc, const grpc_chttp2_stream* s); + ~StreamFlowControl() { + tfc_->PreUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_); + } + + FlowControlAction UpdateAction(FlowControlAction action); + FlowControlAction MakeAction() { return UpdateAction(tfc_->MakeAction()); } + + // we have sent data on the wire, we must track this in our bookkeeping for + // the remote peer's flow control. + void SentData(int64_t outgoing_frame_size) { + FlowControlTrace tracer(" data sent", tfc_, this); + tfc_->StreamSentData(outgoing_frame_size); + remote_window_delta_ -= outgoing_frame_size; + } + + // we have received data from the wire + grpc_error* RecvData(int64_t incoming_frame_size); + + // returns an announce if we should send a stream update to our peer, else + // returns zero + uint32_t MaybeSendUpdate(); + + // we have received a WINDOW_UPDATE frame for a stream + void RecvUpdate(uint32_t size) { + FlowControlTrace trace("s updt recv", tfc_, this); + remote_window_delta_ += size; + } + + // the application is asking for a certain amount of bytes + void IncomingByteStreamUpdate(size_t max_size_hint, size_t have_already); + + int64_t remote_window_delta() const { return remote_window_delta_; } + int64_t local_window_delta() const { return local_window_delta_; } + int64_t announced_window_delta() const { return announced_window_delta_; } + + const grpc_chttp2_stream* stream() const { return s_; } + + void TestOnlyForceHugeWindow() { + announced_window_delta_ = 1024 * 1024 * 1024; + local_window_delta_ = 1024 * 1024 * 1024; + remote_window_delta_ = 1024 * 1024 * 1024; + } + + private: + TransportFlowControl* const tfc_; + const grpc_chttp2_stream* const s_; + + void UpdateAnnouncedWindowDelta(TransportFlowControl* tfc, int64_t change) { + tfc->PreUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_); + announced_window_delta_ += change; + tfc->PostUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_); + } + + /** window available for us to send to peer, over or under the initial + * window + * size of the transport... ie: + * remote_window = remote_window_delta + transport.initial_window_size */ + int64_t remote_window_delta_ = 0; + + /** window available for peer to send to us (as a delta on + * transport.initial_window_size) + * local_window = local_window_delta + transport.initial_window_size */ + int64_t local_window_delta_ = 0; + + /** window available for peer to send to us over this stream that we have + * announced to the peer */ + int64_t announced_window_delta_ = 0; +}; + +} // namespace chttp2 +} // namespace grpc_core + +#endif diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.cc b/src/core/ext/transport/chttp2/transport/frame_settings.cc index 2995bf7310..db0245bb57 100644 --- a/src/core/ext/transport/chttp2/transport/frame_settings.cc +++ b/src/core/ext/transport/chttp2/transport/frame_settings.cc @@ -202,13 +202,13 @@ grpc_error *grpc_chttp2_settings_parser_parse(grpc_exec_ctx *exec_ctx, void *p, } if (id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE && parser->incoming_settings[id] != parser->value) { - t->flow_control.initial_window_update += + t->initial_window_update += (int64_t)parser->value - parser->incoming_settings[id]; if (GRPC_TRACER_ON(grpc_http_trace) || GRPC_TRACER_ON(grpc_flowctl_trace)) { gpr_log(GPR_DEBUG, "%p[%s] adding %d for initial_window change", t, t->is_client ? "cli" : "svr", - (int)t->flow_control.initial_window_update); + (int)t->initial_window_update); } } parser->incoming_settings[id] = parser->value; diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.cc b/src/core/ext/transport/chttp2/transport/frame_window_update.cc index c9ab8d1b50..15eaf59285 100644 --- a/src/core/ext/transport/chttp2/transport/frame_window_update.cc +++ b/src/core/ext/transport/chttp2/transport/frame_window_update.cc @@ -96,8 +96,7 @@ grpc_error *grpc_chttp2_window_update_parser_parse( if (t->incoming_stream_id != 0) { if (s != NULL) { - grpc_chttp2_flowctl_recv_stream_update( - &t->flow_control, &s->flow_control, received_update); + s->flow_control->RecvUpdate(received_update); if (grpc_chttp2_list_remove_stalled_by_stream(t, s)) { grpc_chttp2_mark_stream_writable(exec_ctx, t, s); grpc_chttp2_initiate_write( @@ -106,10 +105,9 @@ grpc_error *grpc_chttp2_window_update_parser_parse( } } } else { - bool was_zero = t->flow_control.remote_window <= 0; - grpc_chttp2_flowctl_recv_transport_update(&t->flow_control, - received_update); - bool is_zero = t->flow_control.remote_window <= 0; + bool was_zero = t->flow_control->remote_window() <= 0; + t->flow_control->RecvUpdate(received_update); + bool is_zero = t->flow_control->remote_window() <= 0; if (was_zero && !is_zero) { grpc_chttp2_initiate_write( exec_ctx, t, diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.cc b/src/core/ext/transport/chttp2/transport/hpack_parser.cc index 3d1df19bc3..7c17229122 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.cc +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.cc @@ -33,6 +33,7 @@ #include "src/core/lib/debug/stats.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/http2_errors.h" @@ -650,9 +651,14 @@ static const uint8_t inverse_base64[256] = { /* emission helpers */ static grpc_error *on_hdr(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_parser *p, grpc_mdelem md, int add_to_table) { - if (GRPC_TRACER_ON(grpc_http_trace) && !GRPC_MDELEM_IS_INTERNED(md)) { + if (GRPC_TRACER_ON(grpc_http_trace)) { char *k = grpc_slice_to_c_string(GRPC_MDKEY(md)); - char *v = grpc_slice_to_c_string(GRPC_MDVALUE(md)); + char *v = NULL; + if (grpc_is_binary_header(GRPC_MDKEY(md))) { + v = grpc_dump_slice(GRPC_MDVALUE(md), GPR_DUMP_HEX); + } else { + v = grpc_slice_to_c_string(GRPC_MDVALUE(md)); + } gpr_log( GPR_DEBUG, "Decode: '%s: %s', elem_interned=%d [%d], k_interned=%d, v_interned=%d", diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index c75f813393..9e0796e820 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -22,6 +22,7 @@ #include <assert.h> #include <stdbool.h> +#include "src/core/ext/transport/chttp2/transport/flow_control.h" #include "src/core/ext/transport/chttp2/transport/frame.h" #include "src/core/ext/transport/chttp2/transport/frame_data.h" #include "src/core/ext/transport/chttp2/transport/frame_goaway.h" @@ -38,9 +39,7 @@ #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/support/manual_constructor.h" -#include "src/core/lib/transport/bdp_estimator.h" #include "src/core/lib/transport/connectivity_state.h" -#include "src/core/lib/transport/pid_controller.h" #include "src/core/lib/transport/transport_impl.h" #ifdef __cplusplus @@ -238,48 +237,6 @@ typedef enum { GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED, } grpc_chttp2_keepalive_state; -typedef struct { - /** initial window change. This is tracked as we parse settings frames from - * the remote peer. If there is a positive delta, then we will make all - * streams readable since they may have become unstalled */ - int64_t initial_window_update; - - /** Our bookkeeping for the remote peer's available window */ - int64_t remote_window; - - /** calculating what we should give for local window: - we track the total amount of flow control over initial window size - across all streams: this is data that we want to receive right now (it - has an outstanding read) - and the total amount of flow control under initial window size across all - streams: this is data we've read early - we want to adjust incoming_window such that: - incoming_window = total_over - max(bdp - total_under, 0) */ - int64_t announced_stream_total_over_incoming_window; - int64_t announced_stream_total_under_incoming_window; - - /** This is out window according to what we have sent to our remote peer. The - * difference between this and target window is what we use to decide when - * to send WINDOW_UPDATE frames. */ - int64_t announced_window; - - int32_t target_initial_window_size; - - /** should we probe bdp? */ - bool enable_bdp_probe; - - /* bdp estimation */ - grpc_core::ManualConstructor<grpc_core::BdpEstimator> bdp_estimator; - - /* pid controller */ - bool pid_controller_initialized; - grpc_core::ManualConstructor<grpc_core::PidController> pid_controller; - grpc_millis last_pid_update; - - // pointer back to transport for tracing - const grpc_chttp2_transport *t; -} grpc_chttp2_transport_flowctl; - struct grpc_chttp2_transport { grpc_transport base; /* must be first */ gpr_refcount refs; @@ -395,7 +352,12 @@ struct grpc_chttp2_transport { /** parser for goaway frames */ grpc_chttp2_goaway_parser goaway_parser; - grpc_chttp2_transport_flowctl flow_control; + grpc_core::ManualConstructor<grpc_core::chttp2::TransportFlowControl> + flow_control; + /** initial window change. This is tracked as we parse settings frames from + * the remote peer. If there is a positive delta, then we will make all + * streams readable since they may have become unstalled */ + int64_t initial_window_update = 0; /* deframing */ grpc_chttp2_deframe_transport_state deframe_state; @@ -477,25 +439,6 @@ typedef enum { GPRC_METADATA_PUBLISHED_AT_CLOSE } grpc_published_metadata_method; -typedef struct { - /** window available for us to send to peer, over or under the initial window - * size of the transport... ie: - * remote_window = remote_window_delta + transport.initial_window_size */ - int64_t remote_window_delta; - - /** window available for peer to send to us (as a delta on - * transport.initial_window_size) - * local_window = local_window_delta + transport.initial_window_size */ - int64_t local_window_delta; - - /** window available for peer to send to us over this stream that we have - * announced to the peer */ - int64_t announced_window_delta; - - // read only pointer back to stream for data - const grpc_chttp2_stream *s; -} grpc_chttp2_stream_flowctl; - struct grpc_chttp2_stream { grpc_chttp2_transport *t; grpc_stream_refcount *refcount; @@ -589,7 +532,8 @@ struct grpc_chttp2_stream { bool sent_initial_metadata; bool sent_trailing_metadata; - grpc_chttp2_stream_flowctl flow_control; + grpc_core::ManualConstructor<grpc_core::chttp2::StreamFlowControl> + flow_control; grpc_slice_buffer flow_controlled_buffer; @@ -700,73 +644,10 @@ bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport *t, /********* Flow Control ***************/ -// we have sent data on the wire -void grpc_chttp2_flowctl_sent_data(grpc_chttp2_transport_flowctl *tfc, - grpc_chttp2_stream_flowctl *sfc, - int64_t size); - -// we have received data from the wire -grpc_error *grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport_flowctl *tfc, - grpc_chttp2_stream_flowctl *sfc, - int64_t incoming_frame_size); - -// returns an announce if we should send a transport update to our peer, -// else returns zero -uint32_t grpc_chttp2_flowctl_maybe_send_transport_update( - grpc_chttp2_transport_flowctl *tfc, bool writing_anyway); - -// returns an announce if we should send a stream update to our peer, else -// returns zero -uint32_t grpc_chttp2_flowctl_maybe_send_stream_update( - grpc_chttp2_transport_flowctl *tfc, grpc_chttp2_stream_flowctl *sfc); - -// we have received a WINDOW_UPDATE frame for a transport -void grpc_chttp2_flowctl_recv_transport_update( - grpc_chttp2_transport_flowctl *tfc, uint32_t size); - -// we have received a WINDOW_UPDATE frame for a stream -void grpc_chttp2_flowctl_recv_stream_update(grpc_chttp2_transport_flowctl *tfc, - grpc_chttp2_stream_flowctl *sfc, - uint32_t size); - -// the application is asking for a certain amount of bytes -void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport_flowctl *tfc, - grpc_chttp2_stream_flowctl *sfc, - size_t max_size_hint, - size_t have_already); - -void grpc_chttp2_flowctl_destroy_stream(grpc_chttp2_transport_flowctl *tfc, - grpc_chttp2_stream_flowctl *sfc); - -typedef enum { - // Nothing to be done. - GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED = 0, - // Initiate a write to update the initial window immediately. - GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY, - // Push the flow control update into a send buffer, to be sent - // out the next time a write is initiated. - GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE, -} grpc_chttp2_flowctl_urgency; - -typedef struct { - grpc_chttp2_flowctl_urgency send_stream_update; - grpc_chttp2_flowctl_urgency send_transport_update; - grpc_chttp2_flowctl_urgency send_setting_update; - uint32_t initial_window_size; - uint32_t max_frame_size; -} grpc_chttp2_flowctl_action; - -// Reads the flow control data and returns and actionable struct that will tell -// chttp2 exactly what it needs to do -grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_flowctl *tfc, - grpc_chttp2_stream_flowctl *sfc); - // Takes in a flow control action and performs all the needed operations. -void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx, - grpc_chttp2_flowctl_action action, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s); +void grpc_chttp2_act_on_flowctl_action( + grpc_exec_ctx *exec_ctx, const grpc_core::chttp2::FlowControlAction &action, + grpc_chttp2_transport *t, grpc_chttp2_stream *s); /********* End of Flow Control ***************/ @@ -800,16 +681,6 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, extern grpc_tracer_flag grpc_http_trace; extern grpc_tracer_flag grpc_flowctl_trace; -#ifndef NDEBUG -#define GRPC_FLOW_CONTROL_IF_TRACING(stmt) \ - if (!(GRPC_TRACER_ON(grpc_flowctl_trace))) \ - ; \ - else \ - stmt -#else -#define GRPC_FLOW_CONTROL_IF_TRACING(stmt) -#endif - #define GRPC_CHTTP2_IF_TRACING(stmt) \ if (!(GRPC_TRACER_ON(grpc_http_trace))) \ ; \ diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc index 78886b497a..efa5791d3f 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.cc +++ b/src/core/ext/transport/chttp2/transport/parsing.cc @@ -355,14 +355,15 @@ static grpc_error *init_data_frame_parser(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s = grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id); grpc_error *err = GRPC_ERROR_NONE; - err = grpc_chttp2_flowctl_recv_data(&t->flow_control, - s == NULL ? NULL : &s->flow_control, - t->incoming_frame_size); - grpc_chttp2_act_on_flowctl_action( - exec_ctx, - grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, - s == NULL ? NULL : &s->flow_control), - t, s); + grpc_core::chttp2::FlowControlAction action; + if (s == nullptr) { + err = t->flow_control->RecvData(t->incoming_frame_size); + action = t->flow_control->MakeAction(); + } else { + err = s->flow_control->RecvData(t->incoming_frame_size); + action = s->flow_control->MakeAction(); + } + grpc_chttp2_act_on_flowctl_action(exec_ctx, action, t, s); if (err != GRPC_ERROR_NONE) { goto error_handler; } @@ -434,19 +435,21 @@ static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp, grpc_millis *cached_timeout = static_cast<grpc_millis *>(grpc_mdelem_get_user_data(md, free_timeout)); grpc_millis timeout; - if (cached_timeout == NULL) { - /* not already parsed: parse it now, and store the result away */ - cached_timeout = (grpc_millis *)gpr_malloc(sizeof(grpc_millis)); - if (!grpc_http2_decode_timeout(GRPC_MDVALUE(md), cached_timeout)) { + if (cached_timeout != NULL) { + timeout = *cached_timeout; + } else { + if (!grpc_http2_decode_timeout(GRPC_MDVALUE(md), &timeout)) { char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md)); gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'", val); gpr_free(val); - *cached_timeout = GRPC_MILLIS_INF_FUTURE; + timeout = GRPC_MILLIS_INF_FUTURE; + } + if (GRPC_MDELEM_IS_INTERNED(md)) { + /* store the result */ + cached_timeout = (grpc_millis *)gpr_malloc(sizeof(grpc_millis)); + *cached_timeout = timeout; + grpc_mdelem_set_user_data(md, free_timeout, cached_timeout); } - timeout = *cached_timeout; - grpc_mdelem_set_user_data(md, free_timeout, cached_timeout); - } else { - timeout = *cached_timeout; } if (timeout != GRPC_MILLIS_INF_FUTURE) { grpc_chttp2_incoming_metadata_buffer_set_deadline( diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index c6fecf2ee9..ff76a5fcdb 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -146,13 +146,13 @@ static void report_stall(grpc_chttp2_transport *t, grpc_chttp2_stream *s, s->flow_controlled_bytes_flowed, t->settings[GRPC_ACKED_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], - t->flow_control.remote_window, + t->flow_control->remote_window(), (uint32_t)GPR_MAX( 0, - s->flow_control.remote_window_delta + + s->flow_control->remote_window_delta() + (int64_t)t->settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]), - s->flow_control.remote_window_delta); + s->flow_control->remote_window_delta()); } static bool stream_ref_if_not_destroyed(gpr_refcount *r) { @@ -216,8 +216,7 @@ class WriteContext { void FlushWindowUpdates(grpc_exec_ctx *exec_ctx) { uint32_t transport_announce = - grpc_chttp2_flowctl_maybe_send_transport_update(&t_->flow_control, - t_->outbuf.count > 0); + t_->flow_control->MaybeSendUpdate(t_->outbuf.count > 0); if (transport_announce) { grpc_transport_one_way_stats throwaway_stats; grpc_slice_buffer_add( @@ -312,7 +311,7 @@ class DataSendContext { uint32_t stream_remote_window() const { return (uint32_t)GPR_MAX( - 0, s_->flow_control.remote_window_delta + + 0, s_->flow_control->remote_window_delta() + (int64_t)t_->settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); } @@ -320,7 +319,7 @@ class DataSendContext { uint32_t max_outgoing() const { return (uint32_t)GPR_MIN( t_->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], - GPR_MIN(stream_remote_window(), t_->flow_control.remote_window)); + GPR_MIN(stream_remote_window(), t_->flow_control->remote_window())); } bool AnyOutgoing() const { return max_outgoing() != 0; } @@ -352,8 +351,7 @@ class DataSendContext { grpc_metadata_batch_is_empty(s_->send_trailing_metadata); grpc_chttp2_encode_data(s_->id, &s_->compressed_data_buffer, send_bytes, is_last_frame_, &s_->stats.outgoing, &t_->outbuf); - grpc_chttp2_flowctl_sent_data(&t_->flow_control, &s_->flow_control, - send_bytes); + s_->flow_control->SentData(send_bytes); if (s_->compressed_data_buffer.length == 0) { s_->sending_bytes += s_->uncompressed_data_size; } @@ -400,8 +398,8 @@ class StreamWriteContext { gpr_log(GPR_DEBUG, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t_, t_->is_client ? "CLIENT" : "SERVER", s->id, s->sent_initial_metadata, s->send_initial_metadata != NULL, - (int)(s->flow_control.local_window_delta - - s->flow_control.announced_window_delta))); + (int)(s->flow_control->local_window_delta() - + s->flow_control->announced_window_delta()))); } void FlushInitialMetadata(grpc_exec_ctx *exec_ctx) { @@ -447,8 +445,7 @@ class StreamWriteContext { void FlushWindowUpdates(grpc_exec_ctx *exec_ctx) { /* send any window updates */ - uint32_t stream_announce = grpc_chttp2_flowctl_maybe_send_stream_update( - &t_->flow_control, &s_->flow_control); + const uint32_t stream_announce = s_->flow_control->MaybeSendUpdate(); if (stream_announce == 0) return; grpc_slice_buffer_add( @@ -469,10 +466,10 @@ class StreamWriteContext { DataSendContext data_send_context(write_context_, t_, s_); if (!data_send_context.AnyOutgoing()) { - if (t_->flow_control.remote_window == 0) { + if (t_->flow_control->remote_window() <= 0) { report_stall(t_, s_, "transport"); grpc_chttp2_list_add_stalled_by_transport(t_, s_); - } else if (data_send_context.stream_remote_window() == 0) { + } else if (data_send_context.stream_remote_window() <= 0) { report_stall(t_, s_, "stream"); grpc_chttp2_list_add_stalled_by_stream(t_, s_); } @@ -588,7 +585,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( ctx.FlushQueuedBuffers(exec_ctx); ctx.EnactHpackSettings(exec_ctx); - if (t->flow_control.remote_window > 0) { + if (t->flow_control->remote_window() > 0) { ctx.UpdateStreamsNoLongerStalled(); } diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc index ff1367fb28..97e4f7d72b 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.cc +++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc @@ -692,7 +692,7 @@ static void create_grpc_frame(grpc_exec_ctx *exec_ctx, uint8_t *p = (uint8_t *)write_buffer; /* Append 5 byte header */ /* Compressed flag */ - *p++ = (flags & GRPC_WRITE_INTERNAL_COMPRESS) ? 1 : 0; + *p++ = (uint8_t)((flags & GRPC_WRITE_INTERNAL_COMPRESS) ? 1 : 0); /* Message length */ *p++ = (uint8_t)(length >> 24); *p++ = (uint8_t)(length >> 16); diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc index 67a8358927..1551f5e988 100644 --- a/src/core/ext/transport/inproc/inproc_transport.cc +++ b/src/core/ext/transport/inproc/inproc_transport.cc @@ -623,7 +623,7 @@ static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg, fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); goto done; } else { - if (other && !other->closed) { + if (!other || !other->closed) { fill_in_metadata(exec_ctx, s, s->send_trailing_md_op->payload->send_trailing_metadata .send_trailing_metadata, @@ -925,7 +925,7 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, INPROC_LOG(GPR_DEBUG, "Extra initial metadata %p", s); error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra initial metadata"); } else { - if (!other->closed) { + if (!other || !other->closed) { fill_in_metadata( exec_ctx, s, op->payload->send_initial_metadata.send_initial_metadata, @@ -976,7 +976,7 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, (other->recv_trailing_md_op != NULL))) || (op->send_trailing_metadata && !op->send_message) || (op->recv_initial_metadata && s->to_read_initial_md_filled) || - (op->recv_message && (other && other->send_message_op != NULL)) || + (op->recv_message && other && (other->send_message_op != NULL)) || (s->to_read_trailing_md_filled || s->trailing_md_recvd)) { if (!s->op_closure_scheduled) { GRPC_CLOSURE_SCHED(exec_ctx, &s->op_closure, GRPC_ERROR_NONE); diff --git a/src/core/lib/iomgr/endpoint.cc b/src/core/lib/iomgr/endpoint.cc index 37cce335ca..5eab1d3158 100644 --- a/src/core/lib/iomgr/endpoint.cc +++ b/src/core/lib/iomgr/endpoint.cc @@ -39,6 +39,12 @@ void grpc_endpoint_add_to_pollset_set(grpc_exec_ctx* exec_ctx, ep->vtable->add_to_pollset_set(exec_ctx, ep, pollset_set); } +void grpc_endpoint_delete_from_pollset_set(grpc_exec_ctx* exec_ctx, + grpc_endpoint* ep, + grpc_pollset_set* pollset_set) { + ep->vtable->delete_from_pollset_set(exec_ctx, ep, pollset_set); +} + void grpc_endpoint_shutdown(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, grpc_error* why) { ep->vtable->shutdown(exec_ctx, ep, why); diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h index 21347d9023..92964e0f2d 100644 --- a/src/core/lib/iomgr/endpoint.h +++ b/src/core/lib/iomgr/endpoint.h @@ -45,6 +45,8 @@ struct grpc_endpoint_vtable { grpc_pollset *pollset); void (*add_to_pollset_set)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset_set *pollset); + void (*delete_from_pollset_set)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_pollset_set *pollset); void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_error *why); void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep); grpc_resource_user *(*get_resource_user)(grpc_endpoint *ep); @@ -85,14 +87,19 @@ void grpc_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_error *why); void grpc_endpoint_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep); -/* Add an endpoint to a pollset, so that when the pollset is polled, events from - this endpoint are considered */ +/* Add an endpoint to a pollset or pollset_set, so that when the pollset is + polled, events from this endpoint are considered */ void grpc_endpoint_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset *pollset); void grpc_endpoint_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset_set *pollset_set); +/* Delete an endpoint from a pollset_set */ +void grpc_endpoint_delete_from_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_endpoint *ep, + grpc_pollset_set *pollset_set); + grpc_resource_user *grpc_endpoint_get_resource_user(grpc_endpoint *endpoint); struct grpc_endpoint { diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 59dd8fd2fe..fa6d79cbfc 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -30,6 +30,7 @@ #include <pthread.h> #include <string.h> #include <sys/socket.h> +#include <sys/syscall.h> #include <unistd.h> #include <grpc/support/alloc.h> @@ -49,100 +50,97 @@ #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/spinlock.h" -/******************************************************************************* - * Polling object - */ -typedef enum { - PO_POLLING_GROUP, - PO_POLLSET_SET, - PO_POLLSET, - PO_FD, - /* ordering is important: we always want to lock pollsets before fds: - this guarantees that using an fd as a pollable is safe */ - PO_EMPTY_POLLABLE, - PO_COUNT -} polling_obj_type; - -typedef struct polling_obj polling_obj; -typedef struct polling_group polling_group; - -struct polling_obj { - gpr_mu mu; - polling_obj_type type; - polling_group *group; - struct polling_obj *next; - struct polling_obj *prev; -}; +// debug aid: create workers on the heap (allows asan to spot +// use-after-destruction) +//#define GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP 1 -struct polling_group { - polling_obj po; - gpr_refcount refs; -}; - -static void po_init(polling_obj *po, polling_obj_type type); -static void po_destroy(polling_obj *po); -static void po_join(grpc_exec_ctx *exec_ctx, polling_obj *a, polling_obj *b); -static int po_cmp(polling_obj *a, polling_obj *b); +#define MAX_EPOLL_EVENTS 100 +#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 5 -static void pg_create(grpc_exec_ctx *exec_ctx, polling_obj **initial_po, - size_t initial_po_count); -static polling_group *pg_ref(polling_group *pg); -static void pg_unref(polling_group *pg); -static void pg_merge(grpc_exec_ctx *exec_ctx, polling_group *a, - polling_group *b); -static void pg_join(grpc_exec_ctx *exec_ctx, polling_group *pg, - polling_obj *po); +#ifndef NDEBUG +grpc_tracer_flag grpc_trace_pollable_refcount = + GRPC_TRACER_INITIALIZER(false, "pollable_refcount"); +#endif /******************************************************************************* * pollable Declarations */ -typedef struct pollable { - polling_obj po; +typedef enum { PO_MULTI, PO_FD, PO_EMPTY } pollable_type; + +typedef struct pollable pollable; + +/// A pollable is something that can be polled: it has an epoll set to poll on, +/// and a wakeup fd for kicks +/// There are three broad types: +/// - PO_EMPTY - the empty pollable, used before file descriptors are added to +/// a pollset +/// - PO_FD - a pollable containing only one FD - used to optimize single-fd +/// pollsets (which are common with synchronous api usage) +/// - PO_MULTI - a pollable containing many fds +struct pollable { + pollable_type type; // immutable + gpr_refcount refs; + int epfd; grpc_wakeup_fd wakeup; + + // only for type fd... one ref to the owner fd + grpc_fd *owner_fd; + + grpc_pollset_set *pollset_set; + pollable *next; + pollable *prev; + + gpr_mu mu; grpc_pollset_worker *root_worker; -} pollable; -static const char *polling_obj_type_string(polling_obj_type t) { + int event_cursor; + int event_count; + struct epoll_event events[MAX_EPOLL_EVENTS]; +}; + +static const char *pollable_type_string(pollable_type t) { switch (t) { - case PO_POLLING_GROUP: - return "polling_group"; - case PO_POLLSET_SET: - return "pollset_set"; - case PO_POLLSET: + case PO_MULTI: return "pollset"; case PO_FD: return "fd"; - case PO_EMPTY_POLLABLE: - return "empty_pollable"; - case PO_COUNT: - return "<invalid:count>"; + case PO_EMPTY: + return "empty"; } return "<invalid>"; } static char *pollable_desc(pollable *p) { char *out; - gpr_asprintf(&out, "type=%s group=%p epfd=%d wakeup=%d", - polling_obj_type_string(p->po.type), p->po.group, p->epfd, - p->wakeup.read_fd); + gpr_asprintf(&out, "type=%s epfd=%d wakeup=%d", pollable_type_string(p->type), + p->epfd, p->wakeup.read_fd); return out; } -static pollable g_empty_pollable; +/// Shared empty pollable - used by pollset to poll on until the first fd is +/// added +static pollable *g_empty_pollable; -static void pollable_init(pollable *p, polling_obj_type type); -static void pollable_destroy(pollable *p); -/* ensure that p->epfd, p->wakeup are initialized; p->po.mu must be held */ -static grpc_error *pollable_materialize(pollable *p); +static grpc_error *pollable_create(pollable_type type, pollable **p); +#ifdef NDEBUG +static pollable *pollable_ref(pollable *p); +static void pollable_unref(pollable *p); +#define POLLABLE_REF(p, r) pollable_ref(p) +#define POLLABLE_UNREF(p, r) pollable_unref(p) +#else +static pollable *pollable_ref(pollable *p, int line, const char *reason); +static void pollable_unref(pollable *p, int line, const char *reason); +#define POLLABLE_REF(p, r) pollable_ref((p), __LINE__, (r)) +#define POLLABLE_UNREF(p, r) pollable_unref((p), __LINE__, (r)) +#endif /******************************************************************************* * Fd Declarations */ struct grpc_fd { - pollable pollable_obj; int fd; /* refst format: bit 0 : 1=Active / 0=Orphaned @@ -150,11 +148,10 @@ struct grpc_fd { Ref/Unref by two to avoid altering the orphaned bit */ gpr_atm refst; - /* The fd is either closed or we relinquished control of it. In either - cases, this indicates that the 'fd' on this structure is no longer - valid */ - gpr_mu orphaned_mu; - bool orphaned; + gpr_mu orphan_mu; + + gpr_mu pollable_mu; + pollable *pollable_obj; gpr_atm read_closure; gpr_atm write_closure; @@ -176,47 +173,52 @@ static void fd_global_shutdown(void); * Pollset Declarations */ -typedef struct pollset_worker_link { +typedef struct { grpc_pollset_worker *next; grpc_pollset_worker *prev; -} pollset_worker_link; +} pwlink; -typedef enum { - PWL_POLLSET, - PWL_POLLABLE, - POLLSET_WORKER_LINK_COUNT -} pollset_worker_links; +typedef enum { PWLINK_POLLABLE = 0, PWLINK_POLLSET, PWLINK_COUNT } pwlinks; struct grpc_pollset_worker { bool kicked; bool initialized_cv; - pollset_worker_link links[POLLSET_WORKER_LINK_COUNT]; +#ifndef NDEBUG + // debug aid: which thread started this worker + pid_t originator; +#endif gpr_cv cv; grpc_pollset *pollset; pollable *pollable_obj; -}; -#define MAX_EPOLL_EVENTS 100 -#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 5 + pwlink links[PWLINK_COUNT]; +}; struct grpc_pollset { - pollable pollable_obj; - pollable *current_pollable_obj; - int kick_alls_pending; + gpr_mu mu; + pollable *active_pollable; bool kicked_without_poller; grpc_closure *shutdown_closure; grpc_pollset_worker *root_worker; - - int event_cursor; - int event_count; - struct epoll_event events[MAX_EPOLL_EVENTS]; + int containing_pollset_set_count; }; /******************************************************************************* * Pollset-set Declarations */ + struct grpc_pollset_set { - polling_obj po; + gpr_refcount refs; + gpr_mu mu; + grpc_pollset_set *parent; + + size_t pollset_count; + size_t pollset_capacity; + grpc_pollset **pollsets; + + size_t fd_count; + size_t fd_capacity; + grpc_fd **fds; }; /******************************************************************************* @@ -250,11 +252,6 @@ static bool append_error(grpc_error **composite, grpc_error *error, * becomes a spurious read notification on a reused fd. */ -/* The alarm system needs to be able to wakeup 'some poller' sometimes - * (specifically when a new alarm needs to be triggered earlier than the next - * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a - * case occurs. */ - static grpc_fd *fd_freelist = NULL; static gpr_mu fd_freelist_mu; @@ -282,8 +279,9 @@ static void fd_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_fd *fd = (grpc_fd *)arg; /* Add the fd to the freelist */ grpc_iomgr_unregister_object(&fd->iomgr_object); - pollable_destroy(&fd->pollable_obj); - gpr_mu_destroy(&fd->orphaned_mu); + POLLABLE_UNREF(fd->pollable_obj, "fd_pollable"); + gpr_mu_destroy(&fd->pollable_mu); + gpr_mu_destroy(&fd->orphan_mu); gpr_mu_lock(&fd_freelist_mu); fd->freelist_next = fd_freelist; fd_freelist = fd; @@ -343,12 +341,11 @@ static grpc_fd *fd_create(int fd, const char *name) { new_fd = (grpc_fd *)gpr_malloc(sizeof(grpc_fd)); } - pollable_init(&new_fd->pollable_obj, PO_FD); - + gpr_mu_init(&new_fd->pollable_mu); + gpr_mu_init(&new_fd->orphan_mu); + new_fd->pollable_obj = NULL; gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); new_fd->fd = fd; - gpr_mu_init(&new_fd->orphaned_mu); - new_fd->orphaned = false; grpc_lfev_init(&new_fd->read_closure); grpc_lfev_init(&new_fd->write_closure); gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); @@ -369,24 +366,17 @@ static grpc_fd *fd_create(int fd, const char *name) { } static int fd_wrapped_fd(grpc_fd *fd) { - int ret_fd = -1; - gpr_mu_lock(&fd->orphaned_mu); - if (!fd->orphaned) { - ret_fd = fd->fd; - } - gpr_mu_unlock(&fd->orphaned_mu); - - return ret_fd; + int ret_fd = fd->fd; + return (gpr_atm_acq_load(&fd->refst) & 1) ? ret_fd : -1; } static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, int *release_fd, bool already_closed, const char *reason) { bool is_fd_closed = already_closed; - grpc_error *error = GRPC_ERROR_NONE; - gpr_mu_lock(&fd->pollable_obj.po.mu); - gpr_mu_lock(&fd->orphaned_mu); + gpr_mu_lock(&fd->orphan_mu); + fd->on_done_closure = on_done; /* If release_fd is not NULL, we should be relinquishing control of the file @@ -398,8 +388,6 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, is_fd_closed = true; } - fd->orphaned = true; - if (!is_fd_closed) { gpr_log(GPR_DEBUG, "TODO: handle fd removal?"); } @@ -408,13 +396,11 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, to be alive (and not added to freelist) until the end of this function */ REF_BY(fd, 1, reason); - GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE); + + gpr_mu_unlock(&fd->orphan_mu); - gpr_mu_unlock(&fd->orphaned_mu); - gpr_mu_unlock(&fd->pollable_obj.po.mu); UNREF_BY(exec_ctx, fd, 2, reason); /* Drop the reference */ - GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error)); - GRPC_ERROR_UNREF(error); } static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx, @@ -451,63 +437,87 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, * Pollable Definitions */ -static void pollable_init(pollable *p, polling_obj_type type) { - po_init(&p->po, type); - p->root_worker = NULL; - p->epfd = -1; +static grpc_error *pollable_create(pollable_type type, pollable **p) { + *p = NULL; + + int epfd = epoll_create1(EPOLL_CLOEXEC); + if (epfd == -1) { + return GRPC_OS_ERROR(errno, "epoll_create1"); + } + *p = (pollable *)gpr_malloc(sizeof(**p)); + grpc_error *err = grpc_wakeup_fd_init(&(*p)->wakeup); + if (err != GRPC_ERROR_NONE) { + close(epfd); + gpr_free(*p); + *p = NULL; + return err; + } + struct epoll_event ev; + ev.events = (uint32_t)(EPOLLIN | EPOLLET); + ev.data.ptr = (void *)(1 | (intptr_t) & (*p)->wakeup); + if (epoll_ctl(epfd, EPOLL_CTL_ADD, (*p)->wakeup.read_fd, &ev) != 0) { + err = GRPC_OS_ERROR(errno, "epoll_ctl"); + close(epfd); + grpc_wakeup_fd_destroy(&(*p)->wakeup); + gpr_free(*p); + *p = NULL; + return err; + } + + (*p)->type = type; + gpr_ref_init(&(*p)->refs, 1); + gpr_mu_init(&(*p)->mu); + (*p)->epfd = epfd; + (*p)->owner_fd = NULL; + (*p)->pollset_set = NULL; + (*p)->next = (*p)->prev = *p; + (*p)->root_worker = NULL; + (*p)->event_cursor = 0; + (*p)->event_count = 0; + return GRPC_ERROR_NONE; } -static void pollable_destroy(pollable *p) { - po_destroy(&p->po); - if (p->epfd != -1) { - close(p->epfd); - grpc_wakeup_fd_destroy(&p->wakeup); +#ifdef NDEBUG +static pollable *pollable_ref(pollable *p) { +#else +static pollable *pollable_ref(pollable *p, int line, const char *reason) { + if (GRPC_TRACER_ON(grpc_trace_pollable_refcount)) { + int r = (int)gpr_atm_no_barrier_load(&p->refs.count); + gpr_log(__FILE__, line, GPR_LOG_SEVERITY_DEBUG, + "POLLABLE:%p ref %d->%d %s", p, r, r + 1, reason); } +#endif + gpr_ref(&p->refs); + return p; } -/* ensure that p->epfd, p->wakeup are initialized; p->po.mu must be held */ -static grpc_error *pollable_materialize(pollable *p) { - if (p->epfd == -1) { - int new_epfd = epoll_create1(EPOLL_CLOEXEC); - if (new_epfd < 0) { - return GRPC_OS_ERROR(errno, "epoll_create1"); - } - grpc_error *err = grpc_wakeup_fd_init(&p->wakeup); - if (err != GRPC_ERROR_NONE) { - close(new_epfd); - return err; - } - struct epoll_event ev; - ev.events = (uint32_t)(EPOLLIN | EPOLLET); - ev.data.ptr = (void *)(1 | (intptr_t)&p->wakeup); - if (epoll_ctl(new_epfd, EPOLL_CTL_ADD, p->wakeup.read_fd, &ev) != 0) { - err = GRPC_OS_ERROR(errno, "epoll_ctl"); - close(new_epfd); - grpc_wakeup_fd_destroy(&p->wakeup); - return err; - } - - p->epfd = new_epfd; +#ifdef NDEBUG +static void pollable_unref(pollable *p) { +#else +static void pollable_unref(pollable *p, int line, const char *reason) { + if (p == NULL) return; + if (GRPC_TRACER_ON(grpc_trace_pollable_refcount)) { + int r = (int)gpr_atm_no_barrier_load(&p->refs.count); + gpr_log(__FILE__, line, GPR_LOG_SEVERITY_DEBUG, + "POLLABLE:%p unref %d->%d %s", p, r, r - 1, reason); + } +#endif + if (p != NULL && gpr_unref(&p->refs)) { + close(p->epfd); + grpc_wakeup_fd_destroy(&p->wakeup); + gpr_free(p); } - return GRPC_ERROR_NONE; } -/* pollable must be materialized */ static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) { grpc_error *error = GRPC_ERROR_NONE; static const char *err_desc = "pollable_add_fd"; const int epfd = p->epfd; - GPR_ASSERT(epfd != -1); if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "add fd %p (%d) to pollable %p", fd, fd->fd, p); } - gpr_mu_lock(&fd->orphaned_mu); - if (fd->orphaned) { - gpr_mu_unlock(&fd->orphaned_mu); - return GRPC_ERROR_NONE; - } struct epoll_event ev_fd; ev_fd.events = (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT | EPOLLEXCLUSIVE); ev_fd.data.ptr = fd; @@ -519,7 +529,6 @@ static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) { append_error(&error, GRPC_OS_ERROR(errno, "epoll_ctl"), err_desc); } } - gpr_mu_unlock(&fd->orphaned_mu); return error; } @@ -535,128 +544,67 @@ GPR_TLS_DECL(g_current_thread_worker); static grpc_error *pollset_global_init(void) { gpr_tls_init(&g_current_thread_pollset); gpr_tls_init(&g_current_thread_worker); - pollable_init(&g_empty_pollable, PO_EMPTY_POLLABLE); - return GRPC_ERROR_NONE; + return pollable_create(PO_EMPTY, &g_empty_pollable); } static void pollset_global_shutdown(void) { - pollable_destroy(&g_empty_pollable); + POLLABLE_UNREF(g_empty_pollable, "g_empty_pollable"); gpr_tls_destroy(&g_current_thread_pollset); gpr_tls_destroy(&g_current_thread_worker); } +/* pollset->mu must be held while calling this function */ static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, + "PS:%p (pollable:%p) maybe_finish_shutdown sc=%p (target:!NULL) " + "rw=%p (target:NULL) cpsc=%d (target:0)", + pollset, pollset->active_pollable, pollset->shutdown_closure, + pollset->root_worker, pollset->containing_pollset_set_count); + } if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL && - pollset->kick_alls_pending == 0) { + pollset->containing_pollset_set_count == 0) { GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE); pollset->shutdown_closure = NULL; } } -static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error_unused) { - grpc_error *error = GRPC_ERROR_NONE; - grpc_pollset *pollset = (grpc_pollset *)arg; - gpr_mu_lock(&pollset->pollable_obj.po.mu); - if (pollset->root_worker != NULL) { - grpc_pollset_worker *worker = pollset->root_worker; - do { - GRPC_STATS_INC_POLLSET_KICK(exec_ctx); - if (worker->pollable_obj != &pollset->pollable_obj) { - gpr_mu_lock(&worker->pollable_obj->po.mu); - } - if (worker->initialized_cv && worker != pollset->root_worker) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p kickall_via_cv %p (pollable %p vs %p)", - pollset, worker, &pollset->pollable_obj, - worker->pollable_obj); - } - worker->kicked = true; - gpr_cv_signal(&worker->cv); - } else { - if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p kickall_via_wakeup %p (pollable %p vs %p)", - pollset, worker, &pollset->pollable_obj, - worker->pollable_obj); - } - append_error(&error, - grpc_wakeup_fd_wakeup(&worker->pollable_obj->wakeup), - "pollset_shutdown"); - } - if (worker->pollable_obj != &pollset->pollable_obj) { - gpr_mu_unlock(&worker->pollable_obj->po.mu); - } - - worker = worker->links[PWL_POLLSET].next; - } while (worker != pollset->root_worker); - } - pollset->kick_alls_pending--; - pollset_maybe_finish_shutdown(exec_ctx, pollset); - gpr_mu_unlock(&pollset->pollable_obj.po.mu); - GRPC_LOG_IF_ERROR("kick_all", error); -} - -static void pollset_kick_all(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { - pollset->kick_alls_pending++; - GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_CREATE(do_kick_all, pollset, - grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE); -} - -static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p, - grpc_pollset_worker *specific_worker) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, - "PS:%p kick %p tls_pollset=%p tls_worker=%p " - "root_worker=(pollset:%p pollable:%p)", - p, specific_worker, (void *)gpr_tls_get(&g_current_thread_pollset), - (void *)gpr_tls_get(&g_current_thread_worker), pollset->root_worker, - p->root_worker); - } - if (specific_worker == NULL) { - if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) { - if (pollset->root_worker == NULL) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p kicked_any_without_poller", p); - } - pollset->kicked_without_poller = true; - return GRPC_ERROR_NONE; - } else { - if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p kicked_any_via_wakeup_fd", p); - } - grpc_error *err = pollable_materialize(p); - if (err != GRPC_ERROR_NONE) return err; - return grpc_wakeup_fd_wakeup(&p->wakeup); - } - } else { - if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p kicked_any_but_awake", p); - } - return GRPC_ERROR_NONE; - } - } else if (specific_worker->kicked) { +/* pollset->mu must be held before calling this function, + * pollset->active_pollable->mu & specific_worker->pollable_obj->mu must not be + * held */ +static grpc_error *kick_one_worker(grpc_exec_ctx *exec_ctx, + grpc_pollset_worker *specific_worker) { + pollable *p = specific_worker->pollable_obj; + grpc_core::mu_guard lock(&p->mu); + GRPC_STATS_INC_POLLSET_KICK(exec_ctx); + GPR_ASSERT(specific_worker != NULL); + if (specific_worker->kicked) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_already_kicked", p); } + GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx); return GRPC_ERROR_NONE; - } else if (gpr_tls_get(&g_current_thread_worker) == - (intptr_t)specific_worker) { + } + if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_awake", p); } + GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD(exec_ctx); specific_worker->kicked = true; return GRPC_ERROR_NONE; - } else if (specific_worker == p->root_worker) { + } + if (specific_worker == p->root_worker) { + GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD(exec_ctx); if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_wakeup_fd", p); } - grpc_error *err = pollable_materialize(p); - if (err != GRPC_ERROR_NONE) return err; specific_worker->kicked = true; - return grpc_wakeup_fd_wakeup(&p->wakeup); - } else { + grpc_error *error = grpc_wakeup_fd_wakeup(&p->wakeup); + return error; + } + if (specific_worker->initialized_cv) { + GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV(exec_ctx); if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_cv", p); } @@ -664,30 +612,79 @@ static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p, gpr_cv_signal(&specific_worker->cv); return GRPC_ERROR_NONE; } + // we can get here during end_worker after removing specific_worker from the + // pollable list but before removing it from the pollset list + return GRPC_ERROR_NONE; } -/* p->po.mu must be held before calling this function */ static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *specific_worker) { - pollable *p = pollset->current_pollable_obj; - GRPC_STATS_INC_POLLSET_KICK(exec_ctx); - if (p != &pollset->pollable_obj) { - gpr_mu_lock(&p->po.mu); + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, + "PS:%p kick %p tls_pollset=%p tls_worker=%p pollset.root_worker=%p", + pollset, specific_worker, + (void *)gpr_tls_get(&g_current_thread_pollset), + (void *)gpr_tls_get(&g_current_thread_worker), + pollset->root_worker); } - grpc_error *error = pollset_kick_inner(pollset, p, specific_worker); - if (p != &pollset->pollable_obj) { - gpr_mu_unlock(&p->po.mu); + if (specific_worker == NULL) { + if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) { + if (pollset->root_worker == NULL) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "PS:%p kicked_any_without_poller", pollset); + } + GRPC_STATS_INC_POLLSET_KICKED_WITHOUT_POLLER(exec_ctx); + pollset->kicked_without_poller = true; + return GRPC_ERROR_NONE; + } else { + // We've been asked to kick a poller, but we haven't been told which one + // ... any will do + // We look at the pollset worker list because: + // 1. the pollable list may include workers from other pollers, so we'd + // need to do an O(N) search + // 2. we'd additionally need to take the pollable lock, which we've so + // far avoided + // Now, we would prefer to wake a poller in cv_wait, and not in + // epoll_wait (since the latter would imply the need to do an additional + // wakeup) + // We know that if a worker is at the root of a pollable, it's (likely) + // also the root of a pollset, and we know that if a worker is NOT at + // the root of a pollset, it's (likely) not at the root of a pollable, + // so we take our chances and choose the SECOND worker enqueued against + // the pollset as a worker that's likely to be in cv_wait + return kick_one_worker( + exec_ctx, pollset->root_worker->links[PWLINK_POLLSET].next); + } + } else { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "PS:%p kicked_any_but_awake", pollset); + } + GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD(exec_ctx); + return GRPC_ERROR_NONE; + } + } else { + return kick_one_worker(exec_ctx, specific_worker); + } +} + +static grpc_error *pollset_kick_all(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset) { + grpc_error *error = GRPC_ERROR_NONE; + const char *err_desc = "pollset_kick_all"; + grpc_pollset_worker *w = pollset->root_worker; + if (w != NULL) { + do { + append_error(&error, kick_one_worker(exec_ctx, w), err_desc); + w = w->links[PWLINK_POLLSET].next; + } while (w != pollset->root_worker); } return error; } static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { - pollable_init(&pollset->pollable_obj, PO_POLLSET); - pollset->current_pollable_obj = &g_empty_pollable; - pollset->kicked_without_poller = false; - pollset->shutdown_closure = NULL; - pollset->root_worker = NULL; - *mu = &pollset->pollable_obj.po.mu; + gpr_mu_init(&pollset->mu); + pollset->active_pollable = POLLABLE_REF(g_empty_pollable, "pollset"); + *mu = &pollset->mu; } static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, @@ -719,12 +716,29 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write"); } -static grpc_error *fd_become_pollable_locked(grpc_fd *fd) { +static grpc_error *fd_get_or_become_pollable(grpc_fd *fd, pollable **p) { + gpr_mu_lock(&fd->pollable_mu); grpc_error *error = GRPC_ERROR_NONE; - static const char *err_desc = "fd_become_pollable"; - if (append_error(&error, pollable_materialize(&fd->pollable_obj), err_desc)) { - append_error(&error, pollable_add_fd(&fd->pollable_obj, fd), err_desc); + static const char *err_desc = "fd_get_or_become_pollable"; + if (fd->pollable_obj == NULL) { + if (append_error(&error, pollable_create(PO_FD, &fd->pollable_obj), + err_desc)) { + fd->pollable_obj->owner_fd = fd; + if (!append_error(&error, pollable_add_fd(fd->pollable_obj, fd), + err_desc)) { + POLLABLE_UNREF(fd->pollable_obj, "fd_pollable"); + fd->pollable_obj = NULL; + } + } } + if (error == GRPC_ERROR_NONE) { + GPR_ASSERT(fd->pollable_obj != NULL); + *p = POLLABLE_REF(fd->pollable_obj, "pollset"); + } else { + GPR_ASSERT(fd->pollable_obj == NULL); + *p = NULL; + } + gpr_mu_unlock(&fd->pollable_mu); return error; } @@ -733,23 +747,20 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure) { GPR_ASSERT(pollset->shutdown_closure == NULL); pollset->shutdown_closure = closure; - pollset_kick_all(exec_ctx, pollset); + GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(exec_ctx, pollset)); pollset_maybe_finish_shutdown(exec_ctx, pollset); } -static bool pollset_is_pollable_fd(grpc_pollset *pollset, pollable *p) { - return p != &g_empty_pollable && p != &pollset->pollable_obj; -} - -static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, bool drain) { +static grpc_error *pollable_process_events(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, + pollable *pollable_obj, bool drain) { static const char *err_desc = "pollset_process_events"; grpc_error *error = GRPC_ERROR_NONE; for (int i = 0; (drain || i < MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL) && - pollset->event_cursor != pollset->event_count; + pollable_obj->event_cursor != pollable_obj->event_count; i++) { - int n = pollset->event_cursor++; - struct epoll_event *ev = &pollset->events[n]; + int n = pollable_obj->event_cursor++; + struct epoll_event *ev = &pollable_obj->events[n]; void *data_ptr = ev->data.ptr; if (1 & (intptr_t)data_ptr) { if (GRPC_TRACER_ON(grpc_polling_trace)) { @@ -784,22 +795,17 @@ static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx, /* pollset_shutdown is guaranteed to be called before pollset_destroy. */ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { - pollable_destroy(&pollset->pollable_obj); - if (pollset_is_pollable_fd(pollset, pollset->current_pollable_obj)) { - UNREF_BY(exec_ctx, (grpc_fd *)pollset->current_pollable_obj, 2, - "pollset_pollable"); - } - GRPC_LOG_IF_ERROR("pollset_process_events", - pollset_process_events(exec_ctx, pollset, true)); + POLLABLE_UNREF(pollset->active_pollable, "pollset"); + pollset->active_pollable = NULL; } -static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - pollable *p, grpc_millis deadline) { +static grpc_error *pollable_epoll(grpc_exec_ctx *exec_ctx, pollable *p, + grpc_millis deadline) { int timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline); if (GRPC_TRACER_ON(grpc_polling_trace)) { char *desc = pollable_desc(p); - gpr_log(GPR_DEBUG, "PS:%p poll %p[%s] for %dms", pollset, p, desc, timeout); + gpr_log(GPR_DEBUG, "POLLABLE:%p[%s] poll for %dms", p, desc, timeout); gpr_free(desc); } @@ -809,7 +815,7 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, int r; do { GRPC_STATS_INC_SYSCALL_POLL(exec_ctx); - r = epoll_wait(p->epfd, pollset->events, MAX_EPOLL_EVENTS, timeout); + r = epoll_wait(p->epfd, p->events, MAX_EPOLL_EVENTS, timeout); } while (r < 0 && errno == EINTR); if (timeout != 0) { GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx); @@ -818,24 +824,24 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait"); if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p poll %p got %d events", pollset, p, r); + gpr_log(GPR_DEBUG, "POLLABLE:%p got %d events", p, r); } - pollset->event_cursor = 0; - pollset->event_count = r; + p->event_cursor = 0; + p->event_count = r; return GRPC_ERROR_NONE; } /* Return true if first in list */ -static bool worker_insert(grpc_pollset_worker **root, pollset_worker_links link, - grpc_pollset_worker *worker) { - if (*root == NULL) { - *root = worker; +static bool worker_insert(grpc_pollset_worker **root_worker, + grpc_pollset_worker *worker, pwlinks link) { + if (*root_worker == NULL) { + *root_worker = worker; worker->links[link].next = worker->links[link].prev = worker; return true; } else { - worker->links[link].next = *root; + worker->links[link].next = *root_worker; worker->links[link].prev = worker->links[link].next->links[link].prev; worker->links[link].next->links[link].prev = worker; worker->links[link].prev->links[link].next = worker; @@ -843,26 +849,26 @@ static bool worker_insert(grpc_pollset_worker **root, pollset_worker_links link, } } -/* Return true if last in list */ -typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result; +/* returns the new root IFF the root changed */ +typedef enum { WRR_NEW_ROOT, WRR_EMPTIED, WRR_REMOVED } worker_remove_result; -static worker_remove_result worker_remove(grpc_pollset_worker **root, - pollset_worker_links link, - grpc_pollset_worker *worker) { - if (worker == *root) { +static worker_remove_result worker_remove(grpc_pollset_worker **root_worker, + grpc_pollset_worker *worker, + pwlinks link) { + if (worker == *root_worker) { if (worker == worker->links[link].next) { - *root = NULL; - return EMPTIED; + *root_worker = NULL; + return WRR_EMPTIED; } else { - *root = worker->links[link].next; + *root_worker = worker->links[link].next; worker->links[link].prev->links[link].next = worker->links[link].next; worker->links[link].next->links[link].prev = worker->links[link].prev; - return NEW_ROOT; + return WRR_NEW_ROOT; } } else { worker->links[link].prev->links[link].next = worker->links[link].next; worker->links[link].next->links[link].prev = worker->links[link].prev; - return REMOVED; + return WRR_REMOVED; } } @@ -871,25 +877,20 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, grpc_pollset_worker **worker_hdl, grpc_millis deadline) { - bool do_poll = true; + bool do_poll = (pollset->shutdown_closure == nullptr); if (worker_hdl != NULL) *worker_hdl = worker; worker->initialized_cv = false; worker->kicked = false; worker->pollset = pollset; - worker->pollable_obj = pollset->current_pollable_obj; - - if (pollset_is_pollable_fd(pollset, worker->pollable_obj)) { - REF_BY((grpc_fd *)worker->pollable_obj, 2, "one_poll"); - } - - worker_insert(&pollset->root_worker, PWL_POLLSET, worker); - if (!worker_insert(&worker->pollable_obj->root_worker, PWL_POLLABLE, - worker)) { + worker->pollable_obj = + POLLABLE_REF(pollset->active_pollable, "pollset_worker"); + worker_insert(&pollset->root_worker, worker, PWLINK_POLLSET); + gpr_mu_lock(&worker->pollable_obj->mu); + if (!worker_insert(&worker->pollable_obj->root_worker, worker, + PWLINK_POLLABLE)) { worker->initialized_cv = true; gpr_cv_init(&worker->cv); - if (worker->pollable_obj != &pollset->pollable_obj) { - gpr_mu_unlock(&pollset->pollable_obj.po.mu); - } + gpr_mu_unlock(&pollset->mu); if (GRPC_TRACER_ON(grpc_polling_trace) && worker->pollable_obj->root_worker != worker) { gpr_log(GPR_DEBUG, "PS:%p wait %p w=%p for %dms", pollset, @@ -897,7 +898,7 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, poll_deadline_to_millis_timeout(exec_ctx, deadline)); } while (do_poll && worker->pollable_obj->root_worker != worker) { - if (gpr_cv_wait(&worker->cv, &worker->pollable_obj->po.mu, + if (gpr_cv_wait(&worker->cv, &worker->pollable_obj->mu, grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p timeout_wait %p w=%p", pollset, @@ -916,158 +917,232 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, worker->pollable_obj, worker); } } - if (worker->pollable_obj != &pollset->pollable_obj) { - gpr_mu_unlock(&worker->pollable_obj->po.mu); - gpr_mu_lock(&pollset->pollable_obj.po.mu); - gpr_mu_lock(&worker->pollable_obj->po.mu); - } grpc_exec_ctx_invalidate_now(exec_ctx); + } else { + gpr_mu_unlock(&pollset->mu); } + gpr_mu_unlock(&worker->pollable_obj->mu); - return do_poll && pollset->shutdown_closure == NULL && - pollset->current_pollable_obj == worker->pollable_obj; + return do_poll; } static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, grpc_pollset_worker **worker_hdl) { - if (NEW_ROOT == - worker_remove(&worker->pollable_obj->root_worker, PWL_POLLABLE, worker)) { - gpr_cv_signal(&worker->pollable_obj->root_worker->cv); + gpr_mu_lock(&pollset->mu); + gpr_mu_lock(&worker->pollable_obj->mu); + switch (worker_remove(&worker->pollable_obj->root_worker, worker, + PWLINK_POLLABLE)) { + case WRR_NEW_ROOT: { + // wakeup new poller + grpc_pollset_worker *new_root = worker->pollable_obj->root_worker; + GPR_ASSERT(new_root->initialized_cv); + gpr_cv_signal(&new_root->cv); + break; + } + case WRR_EMPTIED: + if (pollset->active_pollable != worker->pollable_obj) { + // pollable no longer being polled: flush events + pollable_process_events(exec_ctx, pollset, worker->pollable_obj, true); + } + break; + case WRR_REMOVED: + break; + } + gpr_mu_unlock(&worker->pollable_obj->mu); + POLLABLE_UNREF(worker->pollable_obj, "pollset_worker"); + if (worker_remove(&pollset->root_worker, worker, PWLINK_POLLSET) == + WRR_EMPTIED) { + pollset_maybe_finish_shutdown(exec_ctx, pollset); } if (worker->initialized_cv) { gpr_cv_destroy(&worker->cv); } - if (pollset_is_pollable_fd(pollset, worker->pollable_obj)) { - UNREF_BY(exec_ctx, (grpc_fd *)worker->pollable_obj, 2, "one_poll"); - } - if (EMPTIED == worker_remove(&pollset->root_worker, PWL_POLLSET, worker)) { - pollset_maybe_finish_shutdown(exec_ctx, pollset); - } } -/* pollset->po.mu lock must be held by the caller before calling this. +#ifndef NDEBUG +static long gettid(void) { return syscall(__NR_gettid); } +#endif + +/* pollset->mu lock must be held by the caller before calling this. The function pollset_work() may temporarily release the lock (pollset->po.mu) during the course of its execution but it will always re-acquire the lock and ensure that it is held by the time the function returns */ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, grpc_millis deadline) { +#ifdef GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP + grpc_pollset_worker *worker = + (grpc_pollset_worker *)gpr_malloc(sizeof(*worker)); +#define WORKER_PTR (worker) +#else grpc_pollset_worker worker; - if (0 && GRPC_TRACER_ON(grpc_polling_trace)) { +#define WORKER_PTR (&worker) +#endif +#ifndef NDEBUG + WORKER_PTR->originator = gettid(); +#endif + if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p work hdl=%p worker=%p now=%" PRIdPTR - " deadline=%" PRIdPTR " kwp=%d root_worker=%p", - pollset, worker_hdl, &worker, grpc_exec_ctx_now(exec_ctx), deadline, - pollset->kicked_without_poller, pollset->root_worker); + " deadline=%" PRIdPTR " kwp=%d pollable=%p", + pollset, worker_hdl, WORKER_PTR, grpc_exec_ctx_now(exec_ctx), + deadline, pollset->kicked_without_poller, pollset->active_pollable); } - grpc_error *error = GRPC_ERROR_NONE; static const char *err_desc = "pollset_work"; + grpc_error *error = GRPC_ERROR_NONE; if (pollset->kicked_without_poller) { pollset->kicked_without_poller = false; - return GRPC_ERROR_NONE; - } - if (pollset->current_pollable_obj != &pollset->pollable_obj) { - gpr_mu_lock(&pollset->current_pollable_obj->po.mu); - } - if (begin_worker(exec_ctx, pollset, &worker, worker_hdl, deadline)) { - gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); - gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); - GPR_ASSERT(!pollset->shutdown_closure); - append_error(&error, pollable_materialize(worker.pollable_obj), err_desc); - if (worker.pollable_obj != &pollset->pollable_obj) { - gpr_mu_unlock(&worker.pollable_obj->po.mu); - } - gpr_mu_unlock(&pollset->pollable_obj.po.mu); - if (pollset->event_cursor == pollset->event_count) { - append_error(&error, pollset_epoll(exec_ctx, pollset, worker.pollable_obj, - deadline), + } else { + if (begin_worker(exec_ctx, pollset, WORKER_PTR, worker_hdl, deadline)) { + gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); + gpr_tls_set(&g_current_thread_worker, (intptr_t)WORKER_PTR); + if (WORKER_PTR->pollable_obj->event_cursor == + WORKER_PTR->pollable_obj->event_count) { + append_error(&error, pollable_epoll(exec_ctx, WORKER_PTR->pollable_obj, + deadline), + err_desc); + } + append_error(&error, + pollable_process_events(exec_ctx, pollset, + WORKER_PTR->pollable_obj, false), err_desc); + grpc_exec_ctx_flush(exec_ctx); + gpr_tls_set(&g_current_thread_pollset, 0); + gpr_tls_set(&g_current_thread_worker, 0); } - append_error(&error, pollset_process_events(exec_ctx, pollset, false), - err_desc); - gpr_mu_lock(&pollset->pollable_obj.po.mu); - if (worker.pollable_obj != &pollset->pollable_obj) { - gpr_mu_lock(&worker.pollable_obj->po.mu); - } - gpr_tls_set(&g_current_thread_pollset, 0); - gpr_tls_set(&g_current_thread_worker, 0); - pollset_maybe_finish_shutdown(exec_ctx, pollset); + end_worker(exec_ctx, pollset, WORKER_PTR, worker_hdl); } - end_worker(exec_ctx, pollset, &worker, worker_hdl); - if (worker.pollable_obj != &pollset->pollable_obj) { - gpr_mu_unlock(&worker.pollable_obj->po.mu); - } - if (grpc_exec_ctx_has_work(exec_ctx)) { - gpr_mu_unlock(&pollset->pollable_obj.po.mu); - grpc_exec_ctx_flush(exec_ctx); - gpr_mu_lock(&pollset->pollable_obj.po.mu); +#ifdef GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP + gpr_free(worker); +#endif +#undef WORKER_PTR + return error; +} + +static grpc_error *pollset_transition_pollable_from_empty_to_fd_locked( + grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd) { + static const char *err_desc = "pollset_transition_pollable_from_empty_to_fd"; + grpc_error *error = GRPC_ERROR_NONE; + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, + "PS:%p add fd %p (%d); transition pollable from empty to fd", + pollset, fd, fd->fd); } + append_error(&error, pollset_kick_all(exec_ctx, pollset), err_desc); + POLLABLE_UNREF(pollset->active_pollable, "pollset"); + append_error(&error, fd_get_or_become_pollable(fd, &pollset->active_pollable), + err_desc); return error; } -static void unref_fd_no_longer_poller(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_fd *fd = (grpc_fd *)arg; - UNREF_BY(exec_ctx, fd, 2, "pollset_pollable"); +static grpc_error *pollset_transition_pollable_from_fd_to_multi_locked( + grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *and_add_fd) { + static const char *err_desc = "pollset_transition_pollable_from_fd_to_multi"; + grpc_error *error = GRPC_ERROR_NONE; + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log( + GPR_DEBUG, + "PS:%p add fd %p (%d); transition pollable from fd %p to multipoller", + pollset, and_add_fd, and_add_fd ? and_add_fd->fd : -1, + pollset->active_pollable->owner_fd); + } + append_error(&error, pollset_kick_all(exec_ctx, pollset), err_desc); + grpc_fd *initial_fd = pollset->active_pollable->owner_fd; + POLLABLE_UNREF(pollset->active_pollable, "pollset"); + pollset->active_pollable = NULL; + if (append_error(&error, pollable_create(PO_MULTI, &pollset->active_pollable), + err_desc)) { + append_error(&error, pollable_add_fd(pollset->active_pollable, initial_fd), + err_desc); + if (and_add_fd != NULL) { + append_error(&error, + pollable_add_fd(pollset->active_pollable, and_add_fd), + err_desc); + } + } + return error; } /* expects pollsets locked, flag whether fd is locked or not */ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, grpc_fd *fd, - bool fd_locked) { - static const char *err_desc = "pollset_add_fd"; + grpc_pollset *pollset, grpc_fd *fd) { grpc_error *error = GRPC_ERROR_NONE; - if (pollset->current_pollable_obj == &g_empty_pollable) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, - "PS:%p add fd %p; transition pollable from empty to fd", pollset, - fd); - } - /* empty pollable --> single fd pollable */ - pollset_kick_all(exec_ctx, pollset); - pollset->current_pollable_obj = &fd->pollable_obj; - if (!fd_locked) gpr_mu_lock(&fd->pollable_obj.po.mu); - append_error(&error, fd_become_pollable_locked(fd), err_desc); - if (!fd_locked) gpr_mu_unlock(&fd->pollable_obj.po.mu); - REF_BY(fd, 2, "pollset_pollable"); - } else if (pollset->current_pollable_obj == &pollset->pollable_obj) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p add fd %p; already multipolling", pollset, fd); - } - append_error(&error, pollable_add_fd(pollset->current_pollable_obj, fd), - err_desc); - } else if (pollset->current_pollable_obj != &fd->pollable_obj) { - grpc_fd *had_fd = (grpc_fd *)pollset->current_pollable_obj; - if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, - "PS:%p add fd %p; transition pollable from fd %p to multipoller", - pollset, fd, had_fd); - } - /* Introduce a spurious completion. - If we do not, then it may be that the fd-specific epoll set consumed - a completion without being polled, leading to a missed edge going up. */ - grpc_lfev_set_ready(exec_ctx, &had_fd->read_closure, "read"); - grpc_lfev_set_ready(exec_ctx, &had_fd->write_closure, "write"); - pollset_kick_all(exec_ctx, pollset); - pollset->current_pollable_obj = &pollset->pollable_obj; - if (append_error(&error, pollable_materialize(&pollset->pollable_obj), - err_desc)) { - pollable_add_fd(&pollset->pollable_obj, had_fd); - pollable_add_fd(&pollset->pollable_obj, fd); - } - GRPC_CLOSURE_SCHED(exec_ctx, - GRPC_CLOSURE_CREATE(unref_fd_no_longer_poller, had_fd, - grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE); + pollable *po_at_start = + POLLABLE_REF(pollset->active_pollable, "pollset_add_fd"); + switch (pollset->active_pollable->type) { + case PO_EMPTY: + /* empty pollable --> single fd pollable */ + error = pollset_transition_pollable_from_empty_to_fd_locked(exec_ctx, + pollset, fd); + break; + case PO_FD: + gpr_mu_lock(&po_at_start->owner_fd->orphan_mu); + if ((gpr_atm_no_barrier_load(&pollset->active_pollable->owner_fd->refst) & + 1) == 0) { + error = pollset_transition_pollable_from_empty_to_fd_locked( + exec_ctx, pollset, fd); + } else { + /* fd --> multipoller */ + error = pollset_transition_pollable_from_fd_to_multi_locked( + exec_ctx, pollset, fd); + } + gpr_mu_unlock(&po_at_start->owner_fd->orphan_mu); + break; + case PO_MULTI: + error = pollable_add_fd(pollset->active_pollable, fd); + break; + } + if (error != GRPC_ERROR_NONE) { + POLLABLE_UNREF(pollset->active_pollable, "pollset"); + pollset->active_pollable = po_at_start; + } else { + POLLABLE_UNREF(po_at_start, "pollset_add_fd"); + } + return error; +} + +static grpc_error *pollset_as_multipollable_locked(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, + pollable **pollable_obj) { + grpc_error *error = GRPC_ERROR_NONE; + pollable *po_at_start = + POLLABLE_REF(pollset->active_pollable, "pollset_as_multipollable"); + switch (pollset->active_pollable->type) { + case PO_EMPTY: + POLLABLE_UNREF(pollset->active_pollable, "pollset"); + error = pollable_create(PO_MULTI, &pollset->active_pollable); + break; + case PO_FD: + gpr_mu_lock(&po_at_start->owner_fd->orphan_mu); + if ((gpr_atm_no_barrier_load(&pollset->active_pollable->owner_fd->refst) & + 1) == 0) { + POLLABLE_UNREF(pollset->active_pollable, "pollset"); + error = pollable_create(PO_MULTI, &pollset->active_pollable); + } else { + error = pollset_transition_pollable_from_fd_to_multi_locked( + exec_ctx, pollset, NULL); + } + gpr_mu_unlock(&po_at_start->owner_fd->orphan_mu); + break; + case PO_MULTI: + break; + } + if (error != GRPC_ERROR_NONE) { + POLLABLE_UNREF(pollset->active_pollable, "pollset"); + pollset->active_pollable = po_at_start; + *pollable_obj = NULL; + } else { + *pollable_obj = POLLABLE_REF(pollset->active_pollable, "pollset_set"); + POLLABLE_UNREF(po_at_start, "pollset_as_multipollable"); } return error; } static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd) { - gpr_mu_lock(&pollset->pollable_obj.po.mu); - grpc_error *error = pollset_add_fd_locked(exec_ctx, pollset, fd, false); - gpr_mu_unlock(&pollset->pollable_obj.po.mu); + gpr_mu_lock(&pollset->mu); + grpc_error *error = pollset_add_fd_locked(exec_ctx, pollset, fd); + gpr_mu_unlock(&pollset->mu); GRPC_LOG_IF_ERROR("pollset_add_fd", error); } @@ -1075,301 +1150,255 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, * Pollset-set Definitions */ +static grpc_pollset_set *pss_lock_adam(grpc_pollset_set *pss) { + gpr_mu_lock(&pss->mu); + while (pss->parent != NULL) { + gpr_mu_unlock(&pss->mu); + pss = pss->parent; + gpr_mu_lock(&pss->mu); + } + return pss; +} + static grpc_pollset_set *pollset_set_create(void) { grpc_pollset_set *pss = (grpc_pollset_set *)gpr_zalloc(sizeof(*pss)); - po_init(&pss->po, PO_POLLSET_SET); + gpr_mu_init(&pss->mu); + gpr_ref_init(&pss->refs, 1); return pss; } -static void pollset_set_destroy(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *pss) { - po_destroy(&pss->po); +static void pollset_set_unref(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss) { + if (pss == NULL) return; + if (!gpr_unref(&pss->refs)) return; + pollset_set_unref(exec_ctx, pss->parent); + gpr_mu_destroy(&pss->mu); + for (size_t i = 0; i < pss->pollset_count; i++) { + gpr_mu_lock(&pss->pollsets[i]->mu); + if (0 == --pss->pollsets[i]->containing_pollset_set_count) { + pollset_maybe_finish_shutdown(exec_ctx, pss->pollsets[i]); + } + gpr_mu_unlock(&pss->pollsets[i]->mu); + } + for (size_t i = 0; i < pss->fd_count; i++) { + UNREF_BY(exec_ctx, pss->fds[i], 2, "pollset_set"); + } + gpr_free(pss->pollsets); + gpr_free(pss->fds); gpr_free(pss); } static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, grpc_fd *fd) { - po_join(exec_ctx, &pss->po, &fd->pollable_obj.po); + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "PSS:%p: add fd %p (%d)", pss, fd, fd->fd); + } + grpc_error *error = GRPC_ERROR_NONE; + static const char *err_desc = "pollset_set_add_fd"; + pss = pss_lock_adam(pss); + for (size_t i = 0; i < pss->pollset_count; i++) { + append_error(&error, pollable_add_fd(pss->pollsets[i]->active_pollable, fd), + err_desc); + } + if (pss->fd_count == pss->fd_capacity) { + pss->fd_capacity = GPR_MAX(pss->fd_capacity * 2, 8); + pss->fds = + (grpc_fd **)gpr_realloc(pss->fds, pss->fd_capacity * sizeof(*pss->fds)); + } + REF_BY(fd, 2, "pollset_set"); + pss->fds[pss->fd_count++] = fd; + gpr_mu_unlock(&pss->mu); + + GRPC_LOG_IF_ERROR(err_desc, error); } static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, - grpc_fd *fd) {} - -static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *pss, grpc_pollset *ps) { - po_join(exec_ctx, &pss->po, &ps->pollable_obj.po); + grpc_fd *fd) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "PSS:%p: del fd %p", pss, fd); + } + pss = pss_lock_adam(pss); + size_t i; + for (i = 0; i < pss->fd_count; i++) { + if (pss->fds[i] == fd) { + UNREF_BY(exec_ctx, fd, 2, "pollset_set"); + break; + } + } + GPR_ASSERT(i != pss->fd_count); + for (; i < pss->fd_count - 1; i++) { + pss->fds[i] = pss->fds[i + 1]; + } + pss->fd_count--; + gpr_mu_unlock(&pss->mu); } static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *pss, grpc_pollset *ps) {} - -static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *bag, - grpc_pollset_set *item) { - po_join(exec_ctx, &bag->po, &item->po); -} - -static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *bag, - grpc_pollset_set *item) {} - -static void po_init(polling_obj *po, polling_obj_type type) { - gpr_mu_init(&po->mu); - po->type = type; - po->group = NULL; - po->next = po; - po->prev = po; -} - -static polling_group *pg_lock_latest(polling_group *pg) { - /* assumes pg unlocked; consumes ref, returns ref */ - gpr_mu_lock(&pg->po.mu); - while (pg->po.group != NULL) { - polling_group *new_pg = pg_ref(pg->po.group); - gpr_mu_unlock(&pg->po.mu); - pg_unref(pg); - pg = new_pg; - gpr_mu_lock(&pg->po.mu); - } - return pg; -} - -static void po_destroy(polling_obj *po) { - if (po->group != NULL) { - polling_group *pg = pg_lock_latest(po->group); - po->prev->next = po->next; - po->next->prev = po->prev; - gpr_mu_unlock(&pg->po.mu); - pg_unref(pg); + grpc_pollset_set *pss, grpc_pollset *ps) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "PSS:%p: del pollset %p", pss, ps); } - gpr_mu_destroy(&po->mu); -} - -static polling_group *pg_ref(polling_group *pg) { - gpr_ref(&pg->refs); - return pg; -} - -static void pg_unref(polling_group *pg) { - if (gpr_unref(&pg->refs)) { - po_destroy(&pg->po); - gpr_free(pg); + pss = pss_lock_adam(pss); + size_t i; + for (i = 0; i < pss->pollset_count; i++) { + if (pss->pollsets[i] == ps) { + break; + } } -} - -static int po_cmp(polling_obj *a, polling_obj *b) { - if (a == b) return 0; - if (a->type < b->type) return -1; - if (a->type > b->type) return 1; - if (a < b) return -1; - assert(a > b); - return 1; -} - -static void po_join(grpc_exec_ctx *exec_ctx, polling_obj *a, polling_obj *b) { - switch (po_cmp(a, b)) { - case 0: - return; - case 1: - GPR_SWAP(polling_obj *, a, b); - /* fall through */ - case -1: - gpr_mu_lock(&a->mu); - gpr_mu_lock(&b->mu); - - if (a->group == NULL) { - if (b->group == NULL) { - polling_obj *initial_po[] = {a, b}; - pg_create(exec_ctx, initial_po, GPR_ARRAY_SIZE(initial_po)); - gpr_mu_unlock(&a->mu); - gpr_mu_unlock(&b->mu); - } else { - polling_group *b_group = pg_ref(b->group); - gpr_mu_unlock(&b->mu); - gpr_mu_unlock(&a->mu); - pg_join(exec_ctx, b_group, a); - } - } else if (b->group == NULL) { - polling_group *a_group = pg_ref(a->group); - gpr_mu_unlock(&a->mu); - gpr_mu_unlock(&b->mu); - pg_join(exec_ctx, a_group, b); - } else if (a->group == b->group) { - /* nothing to do */ - gpr_mu_unlock(&a->mu); - gpr_mu_unlock(&b->mu); - } else { - polling_group *a_group = pg_ref(a->group); - polling_group *b_group = pg_ref(b->group); - gpr_mu_unlock(&a->mu); - gpr_mu_unlock(&b->mu); - pg_merge(exec_ctx, a_group, b_group); - } + GPR_ASSERT(i != pss->pollset_count); + for (; i < pss->pollset_count - 1; i++) { + pss->pollsets[i] = pss->pollsets[i + 1]; } -} - -static void pg_notify(grpc_exec_ctx *exec_ctx, polling_obj *a, polling_obj *b) { - if (a->type == PO_FD && b->type == PO_POLLSET) { - pollset_add_fd_locked(exec_ctx, (grpc_pollset *)b, (grpc_fd *)a, true); - } else if (a->type == PO_POLLSET && b->type == PO_FD) { - pollset_add_fd_locked(exec_ctx, (grpc_pollset *)a, (grpc_fd *)b, true); + pss->pollset_count--; + gpr_mu_unlock(&pss->mu); + gpr_mu_lock(&ps->mu); + if (0 == --ps->containing_pollset_set_count) { + pollset_maybe_finish_shutdown(exec_ctx, ps); } + gpr_mu_unlock(&ps->mu); } -static void pg_broadcast(grpc_exec_ctx *exec_ctx, polling_group *from, - polling_group *to) { - for (polling_obj *a = from->po.next; a != &from->po; a = a->next) { - for (polling_obj *b = to->po.next; b != &to->po; b = b->next) { - if (po_cmp(a, b) < 0) { - gpr_mu_lock(&a->mu); - gpr_mu_lock(&b->mu); - } else { - GPR_ASSERT(po_cmp(a, b) != 0); - gpr_mu_lock(&b->mu); - gpr_mu_lock(&a->mu); +// add all fds to pollables, and output a new array of unorphaned out_fds +// assumes pollsets are multipollable +static grpc_error *add_fds_to_pollsets(grpc_exec_ctx *exec_ctx, grpc_fd **fds, + size_t fd_count, grpc_pollset **pollsets, + size_t pollset_count, + const char *err_desc, grpc_fd **out_fds, + size_t *out_fd_count) { + grpc_error *error = GRPC_ERROR_NONE; + for (size_t i = 0; i < fd_count; i++) { + gpr_mu_lock(&fds[i]->orphan_mu); + if ((gpr_atm_no_barrier_load(&fds[i]->refst) & 1) == 0) { + gpr_mu_unlock(&fds[i]->orphan_mu); + UNREF_BY(exec_ctx, fds[i], 2, "pollset_set"); + } else { + for (size_t j = 0; j < pollset_count; j++) { + append_error(&error, + pollable_add_fd(pollsets[j]->active_pollable, fds[i]), + err_desc); } - pg_notify(exec_ctx, a, b); - gpr_mu_unlock(&a->mu); - gpr_mu_unlock(&b->mu); + gpr_mu_unlock(&fds[i]->orphan_mu); + out_fds[(*out_fd_count)++] = fds[i]; } } + return error; } -static void pg_create(grpc_exec_ctx *exec_ctx, polling_obj **initial_po, - size_t initial_po_count) { - /* assumes all polling objects in initial_po are locked */ - polling_group *pg = (polling_group *)gpr_malloc(sizeof(*pg)); - po_init(&pg->po, PO_POLLING_GROUP); - gpr_ref_init(&pg->refs, (int)initial_po_count); - for (size_t i = 0; i < initial_po_count; i++) { - GPR_ASSERT(initial_po[i]->group == NULL); - initial_po[i]->group = pg; - } - for (size_t i = 1; i < initial_po_count; i++) { - initial_po[i]->prev = initial_po[i - 1]; - } - for (size_t i = 0; i < initial_po_count - 1; i++) { - initial_po[i]->next = initial_po[i + 1]; - } - initial_po[0]->prev = &pg->po; - initial_po[initial_po_count - 1]->next = &pg->po; - pg->po.next = initial_po[0]; - pg->po.prev = initial_po[initial_po_count - 1]; - for (size_t i = 1; i < initial_po_count; i++) { - for (size_t j = 0; j < i; j++) { - pg_notify(exec_ctx, initial_po[i], initial_po[j]); - } +static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pss, grpc_pollset *ps) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "PSS:%p: add pollset %p", pss, ps); } -} - -static void pg_join(grpc_exec_ctx *exec_ctx, polling_group *pg, - polling_obj *po) { - /* assumes neither pg nor po are locked; consumes one ref to pg */ - pg = pg_lock_latest(pg); - /* pg locked */ - for (polling_obj *existing = pg->po.next /* skip pg - it's just a stub */; - existing != &pg->po; existing = existing->next) { - if (po_cmp(po, existing) < 0) { - gpr_mu_lock(&po->mu); - gpr_mu_lock(&existing->mu); - } else { - GPR_ASSERT(po_cmp(po, existing) != 0); - gpr_mu_lock(&existing->mu); - gpr_mu_lock(&po->mu); - } - /* pg, po, existing locked */ - if (po->group != NULL) { - gpr_mu_unlock(&pg->po.mu); - polling_group *po_group = pg_ref(po->group); - gpr_mu_unlock(&po->mu); - gpr_mu_unlock(&existing->mu); - pg_merge(exec_ctx, pg, po_group); - /* early exit: polling obj picked up a group during joining: we needed - to do a full merge */ - return; - } - pg_notify(exec_ctx, po, existing); - gpr_mu_unlock(&po->mu); - gpr_mu_unlock(&existing->mu); - } - gpr_mu_lock(&po->mu); - if (po->group != NULL) { - gpr_mu_unlock(&pg->po.mu); - polling_group *po_group = pg_ref(po->group); - gpr_mu_unlock(&po->mu); - pg_merge(exec_ctx, pg, po_group); - /* early exit: polling obj picked up a group during joining: we needed - to do a full merge */ + grpc_error *error = GRPC_ERROR_NONE; + static const char *err_desc = "pollset_set_add_pollset"; + pollable *pollable_obj = NULL; + gpr_mu_lock(&ps->mu); + if (!GRPC_LOG_IF_ERROR(err_desc, pollset_as_multipollable_locked( + exec_ctx, ps, &pollable_obj))) { + GPR_ASSERT(pollable_obj == NULL); + gpr_mu_unlock(&ps->mu); return; } - po->group = pg; - po->next = &pg->po; - po->prev = pg->po.prev; - po->prev->next = po->next->prev = po; - gpr_mu_unlock(&pg->po.mu); - gpr_mu_unlock(&po->mu); + ps->containing_pollset_set_count++; + gpr_mu_unlock(&ps->mu); + pss = pss_lock_adam(pss); + size_t initial_fd_count = pss->fd_count; + pss->fd_count = 0; + append_error(&error, + add_fds_to_pollsets(exec_ctx, pss->fds, initial_fd_count, &ps, 1, + err_desc, pss->fds, &pss->fd_count), + err_desc); + if (pss->pollset_count == pss->pollset_capacity) { + pss->pollset_capacity = GPR_MAX(pss->pollset_capacity * 2, 8); + pss->pollsets = (grpc_pollset **)gpr_realloc( + pss->pollsets, pss->pollset_capacity * sizeof(*pss->pollsets)); + } + pss->pollsets[pss->pollset_count++] = ps; + gpr_mu_unlock(&pss->mu); + POLLABLE_UNREF(pollable_obj, "pollset_set"); + + GRPC_LOG_IF_ERROR(err_desc, error); } -static void pg_merge(grpc_exec_ctx *exec_ctx, polling_group *a, - polling_group *b) { +static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *a, + grpc_pollset_set *b) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "PSS: merge (%p, %p)", a, b); + } + grpc_error *error = GRPC_ERROR_NONE; + static const char *err_desc = "pollset_set_add_fd"; for (;;) { if (a == b) { - pg_unref(a); - pg_unref(b); + // pollset ancestors are the same: nothing to do return; } - if (a > b) GPR_SWAP(polling_group *, a, b); - gpr_mu_lock(&a->po.mu); - gpr_mu_lock(&b->po.mu); - if (a->po.group != NULL) { - polling_group *m2 = pg_ref(a->po.group); - gpr_mu_unlock(&a->po.mu); - gpr_mu_unlock(&b->po.mu); - pg_unref(a); - a = m2; - } else if (b->po.group != NULL) { - polling_group *m2 = pg_ref(b->po.group); - gpr_mu_unlock(&a->po.mu); - gpr_mu_unlock(&b->po.mu); - pg_unref(b); - b = m2; + if (a > b) { + GPR_SWAP(grpc_pollset_set *, a, b); + } + gpr_mu *a_mu = &a->mu; + gpr_mu *b_mu = &b->mu; + gpr_mu_lock(a_mu); + gpr_mu_lock(b_mu); + if (a->parent != NULL) { + a = a->parent; + } else if (b->parent != NULL) { + b = b->parent; } else { - break; + break; // exit loop, both pollsets locked } + gpr_mu_unlock(a_mu); + gpr_mu_unlock(b_mu); } - polling_group **unref = NULL; - size_t unref_count = 0; - size_t unref_cap = 0; - b->po.group = a; - pg_broadcast(exec_ctx, a, b); - pg_broadcast(exec_ctx, b, a); - while (b->po.next != &b->po) { - polling_obj *po = b->po.next; - gpr_mu_lock(&po->mu); - if (unref_count == unref_cap) { - unref_cap = GPR_MAX(8, 3 * unref_cap / 2); - unref = (polling_group **)gpr_realloc(unref, unref_cap * sizeof(*unref)); - } - unref[unref_count++] = po->group; - po->group = pg_ref(a); - // unlink from b - po->prev->next = po->next; - po->next->prev = po->prev; - // link to a - po->next = &a->po; - po->prev = a->po.prev; - po->next->prev = po->prev->next = po; - gpr_mu_unlock(&po->mu); - } - gpr_mu_unlock(&a->po.mu); - gpr_mu_unlock(&b->po.mu); - for (size_t i = 0; i < unref_count; i++) { - pg_unref(unref[i]); - } - gpr_free(unref); - pg_unref(b); + // try to do the least copying possible + // TODO(ctiller): there's probably a better heuristic here + const size_t a_size = a->fd_count + a->pollset_count; + const size_t b_size = b->fd_count + b->pollset_count; + if (b_size > a_size) { + GPR_SWAP(grpc_pollset_set *, a, b); + } + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "PSS: parent %p to %p", b, a); + } + gpr_ref(&a->refs); + b->parent = a; + if (a->fd_capacity < a->fd_count + b->fd_count) { + a->fd_capacity = GPR_MAX(2 * a->fd_capacity, a->fd_count + b->fd_count); + a->fds = (grpc_fd **)gpr_realloc(a->fds, a->fd_capacity * sizeof(*a->fds)); + } + size_t initial_a_fd_count = a->fd_count; + a->fd_count = 0; + append_error(&error, add_fds_to_pollsets(exec_ctx, a->fds, initial_a_fd_count, + b->pollsets, b->pollset_count, + "merge_a2b", a->fds, &a->fd_count), + err_desc); + append_error(&error, add_fds_to_pollsets(exec_ctx, b->fds, b->fd_count, + a->pollsets, a->pollset_count, + "merge_b2a", a->fds, &a->fd_count), + err_desc); + if (a->pollset_capacity < a->pollset_count + b->pollset_count) { + a->pollset_capacity = + GPR_MAX(2 * a->pollset_capacity, a->pollset_count + b->pollset_count); + a->pollsets = (grpc_pollset **)gpr_realloc( + a->pollsets, a->pollset_capacity * sizeof(*a->pollsets)); + } + if (b->pollset_count > 0) { + memcpy(a->pollsets + a->pollset_count, b->pollsets, + b->pollset_count * sizeof(*b->pollsets)); + } + a->pollset_count += b->pollset_count; + gpr_free(b->fds); + gpr_free(b->pollsets); + b->fds = NULL; + b->pollsets = NULL; + b->fd_count = b->fd_capacity = b->pollset_count = b->pollset_capacity = 0; + gpr_mu_unlock(&a->mu); + gpr_mu_unlock(&b->mu); } +static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item) {} + /******************************************************************************* * Event engine binding */ @@ -1399,7 +1428,7 @@ static const grpc_event_engine_vtable vtable = { pollset_add_fd, pollset_set_create, - pollset_set_destroy, + pollset_set_unref, // destroy ==> unref 1 public ref pollset_set_add_pollset, pollset_set_del_pollset, pollset_set_add_pollset_set, @@ -1412,6 +1441,10 @@ static const grpc_event_engine_vtable vtable = { const grpc_event_engine_vtable *grpc_init_epollex_linux( bool explicitly_requested) { + if (!explicitly_requested) { + return NULL; + } + if (!grpc_has_wakeup_fd()) { return NULL; } @@ -1420,6 +1453,10 @@ const grpc_event_engine_vtable *grpc_init_epollex_linux( return NULL; } +#ifndef NDEBUG + grpc_register_tracer(&grpc_trace_pollable_refcount); +#endif + fd_global_init(); if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) { diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc index 3a1dd8d30b..677ee675a6 100644 --- a/src/core/lib/iomgr/ev_posix.cc +++ b/src/core/lib/iomgr/ev_posix.cc @@ -92,12 +92,9 @@ const grpc_event_engine_vtable *init_non_polling(bool explicit_request) { } // namespace static const event_engine_factory g_factories[] = { - {"epoll1", grpc_init_epoll1_linux}, - {"epollsig", grpc_init_epollsig_linux}, - {"poll", grpc_init_poll_posix}, - {"poll-cv", grpc_init_poll_cv_posix}, - {"epollex", grpc_init_epollex_linux}, - {"none", init_non_polling}, + {"epollex", grpc_init_epollex_linux}, {"epoll1", grpc_init_epoll1_linux}, + {"epollsig", grpc_init_epollsig_linux}, {"poll", grpc_init_poll_posix}, + {"poll-cv", grpc_init_poll_cv_posix}, {"none", init_non_polling}, }; static void add(const char *beg, const char *end, char ***ss, size_t *ns) { diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 7fcaef7679..b7c1803ded 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -186,7 +186,7 @@ static void cover_self(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { } if (old_count == 0) { GRPC_STATS_INC_TCP_BACKUP_POLLERS_CREATED(exec_ctx); - p = (backup_poller *)gpr_malloc(sizeof(*p) + grpc_pollset_size()); + p = (backup_poller *)gpr_zalloc(sizeof(*p) + grpc_pollset_size()); if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p create", p); } @@ -704,6 +704,13 @@ static void tcp_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset_set_add_fd(exec_ctx, pollset_set, tcp->em_fd); } +static void tcp_delete_from_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_endpoint *ep, + grpc_pollset_set *pollset_set) { + grpc_tcp *tcp = (grpc_tcp *)ep; + grpc_pollset_set_del_fd(exec_ctx, pollset_set, tcp->em_fd); +} + static char *tcp_get_peer(grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; return gpr_strdup(tcp->peer_string); @@ -719,10 +726,16 @@ static grpc_resource_user *tcp_get_resource_user(grpc_endpoint *ep) { return tcp->resource_user; } -static const grpc_endpoint_vtable vtable = { - tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set, - tcp_shutdown, tcp_destroy, tcp_get_resource_user, tcp_get_peer, - tcp_get_fd}; +static const grpc_endpoint_vtable vtable = {tcp_read, + tcp_write, + tcp_add_to_pollset, + tcp_add_to_pollset_set, + tcp_delete_from_pollset_set, + tcp_shutdown, + tcp_destroy, + tcp_get_resource_user, + tcp_get_peer, + tcp_get_fd}; #define MAX_CHUNK_SIZE 32 * 1024 * 1024 diff --git a/src/core/lib/iomgr/tcp_uv.cc b/src/core/lib/iomgr/tcp_uv.cc index e311964dbc..99b9f1ea2d 100644 --- a/src/core/lib/iomgr/tcp_uv.cc +++ b/src/core/lib/iomgr/tcp_uv.cc @@ -304,6 +304,15 @@ static void uv_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, (void)pollset; } +static void uv_delete_from_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_endpoint *ep, + grpc_pollset_set *pollset) { + // No-op. We're ignoring pollsets currently + (void)exec_ctx; + (void)ep; + (void)pollset; +} + static void shutdown_callback(uv_shutdown_t *req, int status) {} static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, @@ -340,10 +349,16 @@ static grpc_resource_user *uv_get_resource_user(grpc_endpoint *ep) { static int uv_get_fd(grpc_endpoint *ep) { return -1; } -static grpc_endpoint_vtable vtable = { - uv_endpoint_read, uv_endpoint_write, uv_add_to_pollset, - uv_add_to_pollset_set, uv_endpoint_shutdown, uv_destroy, - uv_get_resource_user, uv_get_peer, uv_get_fd}; +static grpc_endpoint_vtable vtable = {uv_endpoint_read, + uv_endpoint_write, + uv_add_to_pollset, + uv_add_to_pollset_set, + uv_delete_from_pollset_set, + uv_endpoint_shutdown, + uv_destroy, + uv_get_resource_user, + uv_get_peer, + uv_get_fd}; grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, grpc_resource_quota *resource_quota, diff --git a/src/core/lib/iomgr/tcp_windows.cc b/src/core/lib/iomgr/tcp_windows.cc index dc84e564a9..5aba5078a8 100644 --- a/src/core/lib/iomgr/tcp_windows.cc +++ b/src/core/lib/iomgr/tcp_windows.cc @@ -371,6 +371,10 @@ static void win_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_iocp_add_socket(tcp->socket); } +static void win_delete_from_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_endpoint *ep, + grpc_pollset_set *pss) {} + /* Initiates a shutdown of the TCP endpoint. This will queue abort callbacks for the potential read and write operations. It is up to the caller to guarantee this isn't called in parallel to a read or write request, so @@ -412,10 +416,16 @@ static grpc_resource_user *win_get_resource_user(grpc_endpoint *ep) { static int win_get_fd(grpc_endpoint *ep) { return -1; } -static grpc_endpoint_vtable vtable = { - win_read, win_write, win_add_to_pollset, win_add_to_pollset_set, - win_shutdown, win_destroy, win_get_resource_user, win_get_peer, - win_get_fd}; +static grpc_endpoint_vtable vtable = {win_read, + win_write, + win_add_to_pollset, + win_add_to_pollset_set, + win_delete_from_pollset_set, + win_shutdown, + win_destroy, + win_get_resource_user, + win_get_peer, + win_get_fd}; grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket, grpc_channel_args *channel_args, diff --git a/src/core/lib/security/credentials/ssl/ssl_credentials.cc b/src/core/lib/security/credentials/ssl/ssl_credentials.cc index 290336adc0..8e47aebedb 100644 --- a/src/core/lib/security/credentials/ssl/ssl_credentials.cc +++ b/src/core/lib/security/credentials/ssl/ssl_credentials.cc @@ -31,18 +31,21 @@ // SSL Channel Credentials. // -static void ssl_config_pem_key_cert_pair_destroy( - tsi_ssl_pem_key_cert_pair *kp) { +void grpc_tsi_ssl_pem_key_cert_pairs_destroy(tsi_ssl_pem_key_cert_pair *kp, + size_t num_key_cert_pairs) { if (kp == NULL) return; - gpr_free((void *)kp->private_key); - gpr_free((void *)kp->cert_chain); + for (size_t i = 0; i < num_key_cert_pairs; i++) { + gpr_free((void *)kp[i].private_key); + gpr_free((void *)kp[i].cert_chain); + } + gpr_free(kp); } static void ssl_destruct(grpc_exec_ctx *exec_ctx, grpc_channel_credentials *creds) { grpc_ssl_credentials *c = (grpc_ssl_credentials *)creds; gpr_free(c->config.pem_root_certs); - ssl_config_pem_key_cert_pair_destroy(&c->config.pem_key_cert_pair); + grpc_tsi_ssl_pem_key_cert_pairs_destroy(c->config.pem_key_cert_pair, 1); } static grpc_security_status ssl_create_security_connector( @@ -85,9 +88,11 @@ static void ssl_build_config(const char *pem_root_certs, if (pem_key_cert_pair != NULL) { GPR_ASSERT(pem_key_cert_pair->private_key != NULL); GPR_ASSERT(pem_key_cert_pair->cert_chain != NULL); - config->pem_key_cert_pair.cert_chain = + config->pem_key_cert_pair = (tsi_ssl_pem_key_cert_pair *)gpr_zalloc( + sizeof(tsi_ssl_pem_key_cert_pair)); + config->pem_key_cert_pair->cert_chain = gpr_strdup(pem_key_cert_pair->cert_chain); - config->pem_key_cert_pair.private_key = + config->pem_key_cert_pair->private_key = gpr_strdup(pem_key_cert_pair->private_key); } } @@ -117,11 +122,8 @@ grpc_channel_credentials *grpc_ssl_credentials_create( static void ssl_server_destruct(grpc_exec_ctx *exec_ctx, grpc_server_credentials *creds) { grpc_ssl_server_credentials *c = (grpc_ssl_server_credentials *)creds; - size_t i; - for (i = 0; i < c->config.num_key_cert_pairs; i++) { - ssl_config_pem_key_cert_pair_destroy(&c->config.pem_key_cert_pairs[i]); - } - gpr_free(c->config.pem_key_cert_pairs); + grpc_tsi_ssl_pem_key_cert_pairs_destroy(c->config.pem_key_cert_pairs, + c->config.num_key_cert_pairs); gpr_free(c->config.pem_root_certs); } @@ -136,30 +138,36 @@ static grpc_security_status ssl_server_create_security_connector( static grpc_server_credentials_vtable ssl_server_vtable = { ssl_server_destruct, ssl_server_create_security_connector}; +tsi_ssl_pem_key_cert_pair *grpc_convert_grpc_to_tsi_cert_pairs( + const grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs, + size_t num_key_cert_pairs) { + tsi_ssl_pem_key_cert_pair *tsi_pairs = NULL; + if (num_key_cert_pairs > 0) { + GPR_ASSERT(pem_key_cert_pairs != NULL); + tsi_pairs = (tsi_ssl_pem_key_cert_pair *)gpr_zalloc( + num_key_cert_pairs * sizeof(tsi_ssl_pem_key_cert_pair)); + } + for (size_t i = 0; i < num_key_cert_pairs; i++) { + GPR_ASSERT(pem_key_cert_pairs[i].private_key != NULL); + GPR_ASSERT(pem_key_cert_pairs[i].cert_chain != NULL); + tsi_pairs[i].cert_chain = gpr_strdup(pem_key_cert_pairs[i].cert_chain); + tsi_pairs[i].private_key = gpr_strdup(pem_key_cert_pairs[i].private_key); + } + return tsi_pairs; +} + static void ssl_build_server_config( const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs, size_t num_key_cert_pairs, grpc_ssl_client_certificate_request_type client_certificate_request, grpc_ssl_server_config *config) { - size_t i; config->client_certificate_request = client_certificate_request; if (pem_root_certs != NULL) { config->pem_root_certs = gpr_strdup(pem_root_certs); } - if (num_key_cert_pairs > 0) { - GPR_ASSERT(pem_key_cert_pairs != NULL); - config->pem_key_cert_pairs = (tsi_ssl_pem_key_cert_pair *)gpr_zalloc( - num_key_cert_pairs * sizeof(tsi_ssl_pem_key_cert_pair)); - } + config->pem_key_cert_pairs = grpc_convert_grpc_to_tsi_cert_pairs( + pem_key_cert_pairs, num_key_cert_pairs); config->num_key_cert_pairs = num_key_cert_pairs; - for (i = 0; i < num_key_cert_pairs; i++) { - GPR_ASSERT(pem_key_cert_pairs[i].private_key != NULL); - GPR_ASSERT(pem_key_cert_pairs[i].cert_chain != NULL); - config->pem_key_cert_pairs[i].cert_chain = - gpr_strdup(pem_key_cert_pairs[i].cert_chain); - config->pem_key_cert_pairs[i].private_key = - gpr_strdup(pem_key_cert_pairs[i].private_key); - } } grpc_server_credentials *grpc_ssl_server_credentials_create( diff --git a/src/core/lib/security/credentials/ssl/ssl_credentials.h b/src/core/lib/security/credentials/ssl/ssl_credentials.h index b43c656cd7..42e425d9f1 100644 --- a/src/core/lib/security/credentials/ssl/ssl_credentials.h +++ b/src/core/lib/security/credentials/ssl/ssl_credentials.h @@ -20,6 +20,10 @@ #include "src/core/lib/security/credentials/credentials.h" +#ifdef __cplusplus +extern "C" { +#endif + typedef struct { grpc_channel_credentials base; grpc_ssl_config config; @@ -30,4 +34,15 @@ typedef struct { grpc_ssl_server_config config; } grpc_ssl_server_credentials; +tsi_ssl_pem_key_cert_pair *grpc_convert_grpc_to_tsi_cert_pairs( + const grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs, + size_t num_key_cert_pairs); + +void grpc_tsi_ssl_pem_key_cert_pairs_destroy(tsi_ssl_pem_key_cert_pair *kp, + size_t num_key_cert_pairs); + +#ifdef __cplusplus +} +#endif + #endif /* GRPC_CORE_LIB_SECURITY_CREDENTIALS_SSL_SSL_CREDENTIALS_H */ diff --git a/src/core/lib/security/transport/secure_endpoint.cc b/src/core/lib/security/transport/secure_endpoint.cc index ae5633b82c..859d04ae5a 100644 --- a/src/core/lib/security/transport/secure_endpoint.cc +++ b/src/core/lib/security/transport/secure_endpoint.cc @@ -379,6 +379,13 @@ static void endpoint_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint_add_to_pollset_set(exec_ctx, ep->wrapped_ep, pollset_set); } +static void endpoint_delete_from_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_endpoint *secure_ep, + grpc_pollset_set *pollset_set) { + secure_endpoint *ep = (secure_endpoint *)secure_ep; + grpc_endpoint_delete_from_pollset_set(exec_ctx, ep->wrapped_ep, pollset_set); +} + static char *endpoint_get_peer(grpc_endpoint *secure_ep) { secure_endpoint *ep = (secure_endpoint *)secure_ep; return grpc_endpoint_get_peer(ep->wrapped_ep); @@ -399,6 +406,7 @@ static const grpc_endpoint_vtable vtable = {endpoint_read, endpoint_write, endpoint_add_to_pollset, endpoint_add_to_pollset_set, + endpoint_delete_from_pollset_set, endpoint_shutdown, endpoint_destroy, endpoint_get_resource_user, diff --git a/src/core/lib/security/transport/security_connector.cc b/src/core/lib/security/transport/security_connector.cc index 80d9a7b77f..b050be2129 100644 --- a/src/core/lib/security/transport/security_connector.cc +++ b/src/core/lib/security/transport/security_connector.cc @@ -942,10 +942,11 @@ grpc_security_status grpc_ssl_channel_security_connector_create( c->overridden_target_name = gpr_strdup(overridden_target_name); } - has_key_cert_pair = config->pem_key_cert_pair.private_key != NULL && - config->pem_key_cert_pair.cert_chain != NULL; + has_key_cert_pair = config->pem_key_cert_pair != NULL && + config->pem_key_cert_pair->private_key != NULL && + config->pem_key_cert_pair->cert_chain != NULL; result = tsi_create_ssl_client_handshaker_factory( - has_key_cert_pair ? &config->pem_key_cert_pair : NULL, pem_root_certs, + has_key_cert_pair ? config->pem_key_cert_pair : NULL, pem_root_certs, ssl_cipher_suites(), alpn_protocol_strings, (uint16_t)num_alpn_protocols, &c->client_handshaker_factory); if (result != TSI_OK) { diff --git a/src/core/lib/security/transport/security_connector.h b/src/core/lib/security/transport/security_connector.h index 216bb35e81..8287151f44 100644 --- a/src/core/lib/security/transport/security_connector.h +++ b/src/core/lib/security/transport/security_connector.h @@ -204,7 +204,7 @@ grpc_server_security_connector *grpc_fake_server_security_connector_create( /* Config for ssl clients. */ typedef struct { - tsi_ssl_pem_key_cert_pair pem_key_cert_pair; + tsi_ssl_pem_key_cert_pair *pem_key_cert_pair; char *pem_root_certs; } grpc_ssl_config; diff --git a/src/core/lib/support/cpu_linux.cc b/src/core/lib/support/cpu_linux.cc index 53619caa5f..2280668442 100644 --- a/src/core/lib/support/cpu_linux.cc +++ b/src/core/lib/support/cpu_linux.cc @@ -38,9 +38,8 @@ static int ncpus = 0; static void init_num_cpus() { /* This must be signed. sysconf returns -1 when the number cannot be determined */ - int cpu = sched_getcpu(); ncpus = (int)sysconf(_SC_NPROCESSORS_ONLN); - if (ncpus < 1 || cpu < 0) { + if (ncpus < 1) { gpr_log(GPR_ERROR, "Cannot determine number of CPUs: assuming 1"); ncpus = 1; } @@ -57,9 +56,6 @@ unsigned gpr_cpu_current_cpu(void) { // sched_getcpu() is undefined on musl return 0; #else - if (gpr_cpu_num_cores() == 1) { - return 0; - } int cpu = sched_getcpu(); if (cpu < 0) { gpr_log(GPR_ERROR, "Error determining current CPU: %s\n", strerror(errno)); diff --git a/src/core/lib/transport/bdp_estimator.h b/src/core/lib/transport/bdp_estimator.h index 470c127f7f..750da39599 100644 --- a/src/core/lib/transport/bdp_estimator.h +++ b/src/core/lib/transport/bdp_estimator.h @@ -40,15 +40,8 @@ class BdpEstimator { explicit BdpEstimator(const char *name); ~BdpEstimator() {} - // Returns true if a reasonable estimate could be obtained - bool EstimateBdp(int64_t *estimate_out) const { - *estimate_out = estimate_; - return true; - } - bool EstimateBandwidth(double *bw_out) const { - *bw_out = bw_est_; - return true; - } + int64_t EstimateBdp() const { return estimate_; } + double EstimateBandwidth() const { return bw_est_; } void AddIncomingBytes(int64_t num_bytes) { accumulator_ += num_bytes; } |