diff options
Diffstat (limited to 'src/core')
30 files changed, 1993 insertions, 1585 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 0ef06ae6e0..02fc53122d 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -54,7 +54,6 @@ #include "src/core/lib/transport/transport.h" #include "src/core/lib/transport/transport_impl.h" -#define DEFAULT_WINDOW 65535 #define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024) #define MAX_WINDOW 0x7fffffffu #define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024) @@ -152,10 +151,14 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error); +static void schedule_bdp_ping_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t); static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error); static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error); +static void next_bdp_ping_timer_expired_locked(grpc_exec_ctx *exec_ctx, + void *tp, grpc_error *error); static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error); @@ -218,8 +221,9 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, t->write_cb_pool = next; } - t->flow_control.bdp_estimator.Destroy(); + t->flow_control.Destroy(); + GRPC_ERROR_UNREF(t->closed_with_error); gpr_free(t->ping_acks); gpr_free(t->peer_string); gpr_free(t); @@ -277,10 +281,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->endpoint_reading = 1; t->next_stream_id = is_client ? 1 : 2; t->is_client = is_client; - t->flow_control.remote_window = DEFAULT_WINDOW; - t->flow_control.announced_window = DEFAULT_WINDOW; - t->flow_control.target_initial_window_size = DEFAULT_WINDOW; - t->flow_control.t = t; t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; t->is_first_frame = true; grpc_connectivity_state_init( @@ -305,6 +305,9 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_combiner_scheduler(t->combiner)); GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t, grpc_combiner_scheduler(t->combiner)); + GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked, + next_bdp_ping_timer_expired_locked, t, + grpc_combiner_scheduler(t->combiner)); GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping_locked, t, grpc_combiner_scheduler(t->combiner)); GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked, @@ -317,8 +320,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, keepalive_watchdog_fired_locked, t, grpc_combiner_scheduler(t->combiner)); - t->flow_control.bdp_estimator.Init(t->peer_string); - grpc_chttp2_goaway_parser_init(&t->goaway_parser); grpc_chttp2_hpack_parser_init(exec_ctx, &t->hpack_parser); @@ -342,8 +343,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, window -- this should by rights be 0 */ t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; t->sent_local_settings = 0; - t->write_buffer_size = DEFAULT_WINDOW; - t->flow_control.enable_bdp_probe = true; + t->write_buffer_size = grpc_core::chttp2::kDefaultWindow; if (is_client) { grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string( @@ -388,6 +388,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; + bool enable_bdp = true; + if (channel_args) { for (i = 0; i < channel_args->num_args; i++) { if (0 == strcmp(channel_args->args[i].key, @@ -448,8 +450,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, &channel_args->args[i], {0, 0, MAX_WRITE_BUFFER_SIZE}); } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) { - t->flow_control.enable_bdp_probe = - grpc_channel_arg_get_integer(&channel_args->args[i], {1, 0, 1}); + enable_bdp = grpc_channel_arg_get_bool(&channel_args->args[i], true); } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) { const int value = grpc_channel_arg_get_integer( @@ -544,6 +545,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } } + t->flow_control.Init(exec_ctx, t, enable_bdp); + /* No pings allowed before receiving a header or data frame. */ t->ping_state.pings_before_data_required = 0; t->ping_state.is_delayed_ping_timer_set = false; @@ -564,10 +567,13 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED; } - grpc_chttp2_act_on_flowctl_action( - exec_ctx, - grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, NULL), t, - NULL); + if (enable_bdp) { + GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping"); + schedule_bdp_ping_locked(exec_ctx, t); + + grpc_chttp2_act_on_flowctl_action( + exec_ctx, t->flow_control->PeriodicUpdate(exec_ctx), t, NULL); + } grpc_chttp2_initiate_write(exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE); @@ -597,7 +603,9 @@ static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { static void close_transport_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error) { - if (!t->closed) { + end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error)); + cancel_pings(exec_ctx, t, GRPC_ERROR_REF(error)); + if (t->closed_with_error == GRPC_ERROR_NONE) { if (!grpc_error_has_clear_grpc_status(error)) { error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); @@ -612,13 +620,16 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, grpc_error_add_child(t->close_transport_on_writes_finished, error); return; } - t->closed = 1; + GPR_ASSERT(error != GRPC_ERROR_NONE); + t->closed_with_error = GRPC_ERROR_REF(error); connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "close_transport"); - grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error)); if (t->ping_state.is_delayed_ping_timer_set) { grpc_timer_cancel(exec_ctx, &t->ping_state.delayed_ping_timer); } + if (t->have_next_bdp_ping_timer) { + grpc_timer_cancel(exec_ctx, &t->next_bdp_ping_timer); + } switch (t->keepalive_state) { case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING: grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer); @@ -638,8 +649,8 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, while (grpc_chttp2_list_pop_writable_stream(t, &s)) { GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:close"); } - end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error)); - cancel_pings(exec_ctx, t, GRPC_ERROR_REF(error)); + GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE); + grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error)); } GRPC_ERROR_UNREF(error); } @@ -700,7 +711,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, post_destructive_reclaimer(exec_ctx, t); } - s->flow_control.s = s; + s->flow_control.Init(t->flow_control.get(), s); GPR_TIMER_END("init_stream", 0); return 0; @@ -751,7 +762,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, GRPC_ERROR_UNREF(s->write_closed_error); GRPC_ERROR_UNREF(s->byte_stream_error); - grpc_chttp2_flowctl_destroy_stream(&t->flow_control, &s->flow_control); + s->flow_control.Destroy(); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "stream"); @@ -942,7 +953,8 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, void grpc_chttp2_mark_stream_writable(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s) { - if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s)) { + if (t->closed_with_error == GRPC_ERROR_NONE && + grpc_chttp2_list_add_writable_stream(t, s)) { GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become"); } } @@ -995,7 +1007,7 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt, grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE); grpc_chttp2_begin_write_result r; - if (t->closed) { + if (t->closed_with_error != GRPC_ERROR_NONE) { r.writing = false; } else { r = grpc_chttp2_begin_write(exec_ctx, t); @@ -1458,7 +1470,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } if (!s->write_closed) { if (t->is_client) { - if (!t->closed) { + if (t->closed_with_error == GRPC_ERROR_NONE) { GPR_ASSERT(s->id == 0); grpc_chttp2_list_add_waiting_for_concurrency(t, s); maybe_start_some_streams(exec_ctx, t); @@ -1466,7 +1478,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, grpc_chttp2_cancel_stream( exec_ctx, t, s, grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport closed"), + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Transport closed", &t->closed_with_error, 1), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); } } else { @@ -1618,13 +1631,10 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, if (s->id != 0) { if (!s->read_closed) { already_received = s->frame_storage.length; - grpc_chttp2_flowctl_incoming_bs_update( - &t->flow_control, &s->flow_control, GRPC_HEADER_SIZE_IN_BYTES, - already_received); - grpc_chttp2_act_on_flowctl_action( - exec_ctx, grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, - &s->flow_control), - t, s); + s->flow_control->IncomingByteStreamUpdate(GRPC_HEADER_SIZE_IN_BYTES, + already_received); + grpc_chttp2_act_on_flowctl_action(exec_ctx, + s->flow_control->MakeAction(), t, s); } } grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); @@ -1688,6 +1698,7 @@ static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, /* callback remaining pings: they're not allowed to call into the transpot, and maybe they hold resources that need to be freed */ grpc_chttp2_ping_queue *pq = &t->ping_queue; + GPR_ASSERT(error != GRPC_ERROR_NONE); for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) { grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error)); GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pq->lists[j]); @@ -1697,6 +1708,12 @@ static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_closure *on_initiate, grpc_closure *on_ack) { + if (t->closed_with_error != GRPC_ERROR_NONE) { + GRPC_CLOSURE_SCHED(exec_ctx, on_initiate, + GRPC_ERROR_REF(t->closed_with_error)); + GRPC_CLOSURE_SCHED(exec_ctx, on_ack, GRPC_ERROR_REF(t->closed_with_error)); + return; + } grpc_chttp2_ping_queue *pq = &t->ping_queue; grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate, GRPC_ERROR_NONE); @@ -1755,7 +1772,9 @@ void grpc_chttp2_add_ping_strike(grpc_exec_ctx *exec_ctx, GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM)); /*The transport will be closed after the write is done */ close_transport_locked( - exec_ctx, t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings")); + exec_ctx, t, grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); } } @@ -2391,57 +2410,46 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, * INPUT PROCESSING - PARSING */ -void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx, - grpc_chttp2_flowctl_action action, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s) { - switch (action.send_stream_update) { - case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED: +template <class F> +static void WithUrgency(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_core::chttp2::FlowControlAction::Urgency urgency, + grpc_chttp2_initiate_write_reason reason, F action) { + switch (urgency) { + case grpc_core::chttp2::FlowControlAction::Urgency::NO_ACTION_NEEDED: break; - case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY: - grpc_chttp2_mark_stream_writable(exec_ctx, t, s); - grpc_chttp2_initiate_write( - exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL); + case grpc_core::chttp2::FlowControlAction::Urgency::UPDATE_IMMEDIATELY: + grpc_chttp2_initiate_write(exec_ctx, t, reason); + // fallthrough + case grpc_core::chttp2::FlowControlAction::Urgency::QUEUE_UPDATE: + action(); break; - case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE: - grpc_chttp2_mark_stream_writable(exec_ctx, t, s); - break; - } - switch (action.send_transport_update) { - case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED: - break; - case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY: - grpc_chttp2_initiate_write( - exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL); - break; - // this is the same as no action b/c every time the transport enters the - // writing path it will maybe do an update - case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE: - break; - } - if (action.send_setting_update != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) { - if (action.initial_window_size > 0) { - queue_setting_update(exec_ctx, t, - GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, - (uint32_t)action.initial_window_size); - } - if (action.max_frame_size > 0) { - queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, - (uint32_t)action.max_frame_size); - } - if (action.send_setting_update == GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY) { - grpc_chttp2_initiate_write(exec_ctx, t, - GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS); - } - } - if (action.need_ping) { - GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping"); - t->flow_control.bdp_estimator->SchedulePing(); - send_ping_locked(exec_ctx, t, &t->start_bdp_ping_locked, - &t->finish_bdp_ping_locked); } } +void grpc_chttp2_act_on_flowctl_action( + grpc_exec_ctx *exec_ctx, const grpc_core::chttp2::FlowControlAction &action, + grpc_chttp2_transport *t, grpc_chttp2_stream *s) { + WithUrgency( + exec_ctx, t, action.send_stream_update(), + GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL, + [exec_ctx, t, s]() { grpc_chttp2_mark_stream_writable(exec_ctx, t, s); }); + WithUrgency(exec_ctx, t, action.send_transport_update(), + GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, []() {}); + WithUrgency(exec_ctx, t, action.send_initial_window_update(), + GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, + [exec_ctx, t, &action]() { + queue_setting_update(exec_ctx, t, + GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, + action.initial_window_size()); + }); + WithUrgency( + exec_ctx, t, action.send_max_frame_size_update(), + GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [exec_ctx, t, &action]() { + queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, + action.max_frame_size()); + }); +} + static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { grpc_http_parser parser; @@ -2489,13 +2497,13 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, } GPR_SWAP(grpc_error *, err, error); GRPC_ERROR_UNREF(err); - if (!t->closed) { + if (t->closed_with_error == GRPC_ERROR_NONE) { GPR_TIMER_BEGIN("reading_action.parse", 0); size_t i = 0; grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE, GRPC_ERROR_NONE}; for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) { - t->flow_control.bdp_estimator->AddIncomingBytes( + t->flow_control->bdp_estimator()->AddIncomingBytes( (int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i])); errors[1] = grpc_chttp2_perform_read(exec_ctx, t, t->read_buffer.slices[i]); @@ -2512,8 +2520,8 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, GPR_TIMER_END("reading_action.parse", 0); GPR_TIMER_BEGIN("post_parse_locked", 0); - if (t->flow_control.initial_window_update != 0) { - if (t->flow_control.initial_window_update > 0) { + if (t->initial_window_update != 0) { + if (t->initial_window_update > 0) { grpc_chttp2_stream *s; while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) { grpc_chttp2_mark_stream_writable(exec_ctx, t, s); @@ -2522,20 +2530,21 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING); } } - t->flow_control.initial_window_update = 0; + t->initial_window_update = 0; } GPR_TIMER_END("post_parse_locked", 0); } GPR_TIMER_BEGIN("post_reading_action_locked", 0); bool keep_reading = false; - if (error == GRPC_ERROR_NONE && t->closed) { - error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport closed"); + if (error == GRPC_ERROR_NONE && t->closed_with_error != GRPC_ERROR_NONE) { + error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Transport closed", &t->closed_with_error, 1); } if (error != GRPC_ERROR_NONE) { close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error)); t->endpoint_reading = 0; - } else if (!t->closed) { + } else if (t->closed_with_error == GRPC_ERROR_NONE) { keep_reading = true; GRPC_CHTTP2_REF_TRANSPORT(t, "keep_reading"); } @@ -2544,10 +2553,8 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, if (keep_reading) { grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->read_action_locked); - grpc_chttp2_act_on_flowctl_action( - exec_ctx, - grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, NULL), t, - NULL); + grpc_chttp2_act_on_flowctl_action(exec_ctx, t->flow_control->MakeAction(), + t, NULL); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading"); } else { GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "reading_action"); @@ -2560,28 +2567,60 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, GPR_TIMER_END("reading_action_locked", 0); } +// t is reffed prior to calling the first time, and once the callback chain +// that kicks off finishes, it's unreffed +static void schedule_bdp_ping_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t) { + t->flow_control->bdp_estimator()->SchedulePing(); + send_ping_locked(exec_ctx, t, &t->start_bdp_ping_locked, + &t->finish_bdp_ping_locked); +} + static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; if (GRPC_TRACER_ON(grpc_http_trace)) { - gpr_log(GPR_DEBUG, "%s: Start BDP ping", t->peer_string); + gpr_log(GPR_DEBUG, "%s: Start BDP ping err=%s", t->peer_string, + grpc_error_string(error)); } /* Reset the keepalive ping timer */ if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) { grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer); } - t->flow_control.bdp_estimator->StartPing(); + t->flow_control->bdp_estimator()->StartPing(); } static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; if (GRPC_TRACER_ON(grpc_http_trace)) { - gpr_log(GPR_DEBUG, "%s: Complete BDP ping", t->peer_string); + gpr_log(GPR_DEBUG, "%s: Complete BDP ping err=%s", t->peer_string, + grpc_error_string(error)); } - t->flow_control.bdp_estimator->CompletePing(exec_ctx); + if (error != GRPC_ERROR_NONE) { + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); + return; + } + grpc_millis next_ping = + t->flow_control->bdp_estimator()->CompletePing(exec_ctx); + grpc_chttp2_act_on_flowctl_action( + exec_ctx, t->flow_control->PeriodicUpdate(exec_ctx), t, nullptr); + GPR_ASSERT(!t->have_next_bdp_ping_timer); + t->have_next_bdp_ping_timer = true; + grpc_timer_init(exec_ctx, &t->next_bdp_ping_timer, next_ping, + &t->next_bdp_ping_timer_expired_locked); +} - GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); +static void next_bdp_ping_timer_expired_locked(grpc_exec_ctx *exec_ctx, + void *tp, grpc_error *error) { + grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; + GPR_ASSERT(t->have_next_bdp_ping_timer); + t->have_next_bdp_ping_timer = false; + if (error != GRPC_ERROR_NONE) { + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); + return; + } + schedule_bdp_ping_locked(exec_ctx, t); } void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args, @@ -2646,7 +2685,7 @@ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)arg; GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING); - if (t->destroying || t->closed) { + if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; } else if (error == GRPC_ERROR_NONE) { if (t->keepalive_permit_without_calls || @@ -2704,8 +2743,11 @@ static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg, if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { if (error == GRPC_ERROR_NONE) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; - close_transport_locked(exec_ctx, t, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "keepalive watchdog timeout")); + close_transport_locked( + exec_ctx, t, + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "keepalive watchdog timeout"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL)); } } else { /* The watchdog timer should have been cancelled by @@ -2788,13 +2830,10 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, size_t cur_length = s->frame_storage.length; if (!s->read_closed) { - grpc_chttp2_flowctl_incoming_bs_update(&t->flow_control, &s->flow_control, - bs->next_action.max_size_hint, - cur_length); - grpc_chttp2_act_on_flowctl_action( - exec_ctx, grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, - &s->flow_control), - t, s); + s->flow_control->IncomingByteStreamUpdate(bs->next_action.max_size_hint, + cur_length); + grpc_chttp2_act_on_flowctl_action(exec_ctx, s->flow_control->MakeAction(), + t, s); } GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0); if (s->frame_storage.length > 0) { diff --git a/src/core/ext/transport/chttp2/transport/flow_control.cc b/src/core/ext/transport/chttp2/transport/flow_control.cc index 60c43d840a..dd80036530 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.cc +++ b/src/core/ext/transport/chttp2/transport/flow_control.cc @@ -16,7 +16,7 @@ * */ -#include "src/core/ext/transport/chttp2/transport/internal.h" +#include "src/core/ext/transport/chttp2/transport/flow_control.h" #include <inttypes.h> #include <limits.h> @@ -28,38 +28,15 @@ #include <grpc/support/string_util.h> #include <grpc/support/useful.h> +#include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/lib/support/string.h" -static uint32_t grpc_chttp2_target_announced_window( - const grpc_chttp2_transport_flowctl* tfc); - -#ifndef NDEBUG - -typedef struct { - int64_t remote_window; - int64_t target_window; - int64_t announced_window; - int64_t remote_window_delta; - int64_t local_window_delta; - int64_t announced_window_delta; - uint32_t local_init_window; - uint32_t local_max_frame; -} shadow_flow_control; - -static void pretrace(shadow_flow_control* shadow_fc, - grpc_chttp2_transport_flowctl* tfc, - grpc_chttp2_stream_flowctl* sfc) { - shadow_fc->remote_window = tfc->remote_window; - shadow_fc->target_window = grpc_chttp2_target_announced_window(tfc); - shadow_fc->announced_window = tfc->announced_window; - if (sfc != NULL) { - shadow_fc->remote_window_delta = sfc->remote_window_delta; - shadow_fc->local_window_delta = sfc->local_window_delta; - shadow_fc->announced_window_delta = sfc->announced_window_delta; - } -} +namespace grpc_core { +namespace chttp2 { + +namespace { -#define TRACE_PADDING 30 +static constexpr const int kTracePadding = 30; static char* fmt_int64_diff_str(int64_t old_val, int64_t new_val) { char* str; @@ -68,7 +45,7 @@ static char* fmt_int64_diff_str(int64_t old_val, int64_t new_val) { } else { gpr_asprintf(&str, "%" PRId64 "", old_val); } - char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING); + char* str_lp = gpr_leftpad(str, ' ', kTracePadding); gpr_free(str); return str_lp; } @@ -80,47 +57,58 @@ static char* fmt_uint32_diff_str(uint32_t old_val, uint32_t new_val) { } else { gpr_asprintf(&str, "%" PRIu32 "", old_val); } - char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING); + char* str_lp = gpr_leftpad(str, ' ', kTracePadding); gpr_free(str); return str_lp; } +} // namespace + +void FlowControlTrace::Init(const char* reason, TransportFlowControl* tfc, + StreamFlowControl* sfc) { + tfc_ = tfc; + sfc_ = sfc; + reason_ = reason; + remote_window_ = tfc->remote_window(); + target_window_ = tfc->target_window(); + announced_window_ = tfc->announced_window(); + if (sfc != nullptr) { + remote_window_delta_ = sfc->remote_window_delta(); + local_window_delta_ = sfc->local_window_delta(); + announced_window_delta_ = sfc->announced_window_delta(); + } +} -static void posttrace(shadow_flow_control* shadow_fc, - grpc_chttp2_transport_flowctl* tfc, - grpc_chttp2_stream_flowctl* sfc, const char* reason) { +void FlowControlTrace::Finish() { uint32_t acked_local_window = - tfc->t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + tfc_->transport()->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; uint32_t remote_window = - tfc->t->settings[GRPC_PEER_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - char* trw_str = - fmt_int64_diff_str(shadow_fc->remote_window, tfc->remote_window); - char* tlw_str = fmt_int64_diff_str(shadow_fc->target_window, - grpc_chttp2_target_announced_window(tfc)); + tfc_->transport()->settings[GRPC_PEER_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + char* trw_str = fmt_int64_diff_str(remote_window_, tfc_->remote_window()); + char* tlw_str = fmt_int64_diff_str(target_window_, tfc_->target_window()); char* taw_str = - fmt_int64_diff_str(shadow_fc->announced_window, tfc->announced_window); + fmt_int64_diff_str(announced_window_, tfc_->announced_window()); char* srw_str; char* slw_str; char* saw_str; - if (sfc != NULL) { - srw_str = fmt_int64_diff_str(shadow_fc->remote_window_delta + remote_window, - sfc->remote_window_delta + remote_window); - slw_str = - fmt_int64_diff_str(shadow_fc->local_window_delta + acked_local_window, - sfc->local_window_delta + acked_local_window); - saw_str = fmt_int64_diff_str( - shadow_fc->announced_window_delta + acked_local_window, - sfc->announced_window_delta + acked_local_window); + if (sfc_ != nullptr) { + srw_str = fmt_int64_diff_str(remote_window_delta_ + remote_window, + sfc_->remote_window_delta() + remote_window); + slw_str = fmt_int64_diff_str(local_window_delta_ + acked_local_window, + local_window_delta_ + acked_local_window); + saw_str = fmt_int64_diff_str(announced_window_delta_ + acked_local_window, + announced_window_delta_ + acked_local_window); } else { - srw_str = gpr_leftpad("", ' ', TRACE_PADDING); - slw_str = gpr_leftpad("", ' ', TRACE_PADDING); - saw_str = gpr_leftpad("", ' ', TRACE_PADDING); + srw_str = gpr_leftpad("", ' ', kTracePadding); + slw_str = gpr_leftpad("", ' ', kTracePadding); + saw_str = gpr_leftpad("", ' ', kTracePadding); } gpr_log(GPR_DEBUG, "%p[%u][%s] | %s | trw:%s, ttw:%s, taw:%s, srw:%s, slw:%s, saw:%s", - tfc, sfc != NULL ? sfc->s->id : 0, tfc->t->is_client ? "cli" : "svr", - reason, trw_str, tlw_str, taw_str, srw_str, slw_str, saw_str); + tfc_, sfc_ != nullptr ? sfc_->stream()->id : 0, + tfc_->transport()->is_client ? "cli" : "svr", reason_, trw_str, + tlw_str, taw_str, srw_str, slw_str, saw_str); gpr_free(trw_str); gpr_free(tlw_str); gpr_free(taw_str); @@ -129,13 +117,13 @@ static void posttrace(shadow_flow_control* shadow_fc, gpr_free(saw_str); } -static const char* urgency_to_string(grpc_chttp2_flowctl_urgency urgency) { - switch (urgency) { - case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED: +const char* FlowControlAction::UrgencyString(Urgency u) { + switch (u) { + case Urgency::NO_ACTION_NEEDED: return "no action"; - case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY: + case Urgency::UPDATE_IMMEDIATELY: return "update immediately"; - case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE: + case Urgency::QUEUE_UPDATE: return "queue update"; default: GPR_UNREACHABLE_CODE(return "unknown"); @@ -143,209 +131,132 @@ static const char* urgency_to_string(grpc_chttp2_flowctl_urgency urgency) { GPR_UNREACHABLE_CODE(return "unknown"); } -static void trace_action(grpc_chttp2_transport_flowctl* tfc, - grpc_chttp2_flowctl_action action) { +void FlowControlAction::Trace(grpc_chttp2_transport* t) const { char* iw_str = fmt_uint32_diff_str( - tfc->t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], - action.initial_window_size); + t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + initial_window_size_); char* mf_str = fmt_uint32_diff_str( - tfc->t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], - action.max_frame_size); - gpr_log(GPR_DEBUG, "t[%s], s[%s], settings[%s] iw:%s mf:%s", - urgency_to_string(action.send_transport_update), - urgency_to_string(action.send_stream_update), - urgency_to_string(action.send_setting_update), iw_str, mf_str); + t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], + max_frame_size_); + gpr_log(GPR_DEBUG, "t[%s], s[%s], iw:%s:%s mf:%s:%s", + UrgencyString(send_transport_update_), + UrgencyString(send_stream_update_), + UrgencyString(send_initial_window_update_), iw_str, + UrgencyString(send_max_frame_size_update_), mf_str); gpr_free(iw_str); gpr_free(mf_str); } -#define PRETRACE(tfc, sfc) \ - shadow_flow_control shadow_fc; \ - GRPC_FLOW_CONTROL_IF_TRACING(pretrace(&shadow_fc, tfc, sfc)) -#define POSTTRACE(tfc, sfc, reason) \ - GRPC_FLOW_CONTROL_IF_TRACING(posttrace(&shadow_fc, tfc, sfc, reason)) -#define TRACEACTION(tfc, action) \ - GRPC_FLOW_CONTROL_IF_TRACING(trace_action(tfc, action)) -#else -#define PRETRACE(tfc, sfc) -#define POSTTRACE(tfc, sfc, reason) -#define TRACEACTION(tfc, action) -#endif - -/* How many bytes of incoming flow control would we like to advertise */ -static uint32_t grpc_chttp2_target_announced_window( - const grpc_chttp2_transport_flowctl* tfc) { - return (uint32_t)GPR_MIN((int64_t)((1u << 31) - 1), - tfc->announced_stream_total_over_incoming_window + - tfc->target_initial_window_size); -} - -// we have sent data on the wire, we must track this in our bookkeeping for the -// remote peer's flow control. -void grpc_chttp2_flowctl_sent_data(grpc_chttp2_transport_flowctl* tfc, - grpc_chttp2_stream_flowctl* sfc, - int64_t size) { - PRETRACE(tfc, sfc); - tfc->remote_window -= size; - sfc->remote_window_delta -= size; - POSTTRACE(tfc, sfc, " data sent"); -} - -static void announced_window_delta_preupdate(grpc_chttp2_transport_flowctl* tfc, - grpc_chttp2_stream_flowctl* sfc) { - if (sfc->announced_window_delta > 0) { - tfc->announced_stream_total_over_incoming_window -= - sfc->announced_window_delta; - } else { - tfc->announced_stream_total_under_incoming_window += - -sfc->announced_window_delta; - } -} - -static void announced_window_delta_postupdate( - grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc) { - if (sfc->announced_window_delta > 0) { - tfc->announced_stream_total_over_incoming_window += - sfc->announced_window_delta; - } else { - tfc->announced_stream_total_under_incoming_window -= - -sfc->announced_window_delta; +TransportFlowControl::TransportFlowControl(grpc_exec_ctx* exec_ctx, + const grpc_chttp2_transport* t, + bool enable_bdp_probe) + : t_(t), + enable_bdp_probe_(enable_bdp_probe), + bdp_estimator_(t->peer_string), + pid_controller_(grpc_core::PidController::Args() + .set_gain_p(4) + .set_gain_i(8) + .set_gain_d(0) + .set_initial_control_value(TargetLogBdp()) + .set_min_control_value(-1) + .set_max_control_value(25) + .set_integral_range(10)), + last_pid_update_(grpc_exec_ctx_now(exec_ctx)) {} + +uint32_t TransportFlowControl::MaybeSendUpdate(bool writing_anyway) { + FlowControlTrace trace("t updt sent", this, nullptr); + const uint32_t target_announced_window = target_window(); + if ((writing_anyway || announced_window_ <= target_announced_window / 2) && + announced_window_ != target_announced_window) { + const uint32_t announce = (uint32_t)GPR_CLAMP( + target_announced_window - announced_window_, 0, UINT32_MAX); + announced_window_ += announce; + return announce; } + return 0; } -// We have received data from the wire. We must track this in our own flow -// control bookkeeping. -// Returns an error if the incoming frame violates our flow control. -grpc_error* grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport_flowctl* tfc, - grpc_chttp2_stream_flowctl* sfc, - int64_t incoming_frame_size) { - uint32_t sent_init_window = - tfc->t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - uint32_t acked_init_window = - tfc->t->settings[GRPC_ACKED_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - PRETRACE(tfc, sfc); - if (incoming_frame_size > tfc->announced_window) { +grpc_error* TransportFlowControl::ValidateRecvData( + int64_t incoming_frame_size) { + if (incoming_frame_size > announced_window_) { char* msg; gpr_asprintf(&msg, "frame of size %" PRId64 " overflows local window of %" PRId64, - incoming_frame_size, tfc->announced_window); + incoming_frame_size, announced_window_); grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); gpr_free(msg); return err; } + return GRPC_ERROR_NONE; +} - if (sfc != NULL) { - int64_t acked_stream_window = - sfc->announced_window_delta + acked_init_window; - int64_t sent_stream_window = sfc->announced_window_delta + sent_init_window; - if (incoming_frame_size > acked_stream_window) { - if (incoming_frame_size <= sent_stream_window) { - gpr_log( - GPR_ERROR, - "Incoming frame of size %" PRId64 - " exceeds local window size of %" PRId64 - ".\n" - "The (un-acked, future) window size would be %" PRId64 - " which is not exceeded.\n" - "This would usually cause a disconnection, but allowing it due to" - "broken HTTP2 implementations in the wild.\n" - "See (for example) https://github.com/netty/netty/issues/6520.", - incoming_frame_size, acked_stream_window, sent_stream_window); - } else { - char* msg; - gpr_asprintf(&msg, "frame of size %" PRId64 - " overflows local window of %" PRId64, - incoming_frame_size, acked_stream_window); - grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); - gpr_free(msg); - return err; - } - } - - announced_window_delta_preupdate(tfc, sfc); - sfc->announced_window_delta -= incoming_frame_size; - announced_window_delta_postupdate(tfc, sfc); - sfc->local_window_delta -= incoming_frame_size; - } +StreamFlowControl::StreamFlowControl(TransportFlowControl* tfc, + const grpc_chttp2_stream* s) + : tfc_(tfc), s_(s) {} - tfc->announced_window -= incoming_frame_size; +grpc_error* StreamFlowControl::RecvData(int64_t incoming_frame_size) { + FlowControlTrace trace(" data recv", tfc_, this); - POSTTRACE(tfc, sfc, " data recv"); - return GRPC_ERROR_NONE; -} + grpc_error* error = GRPC_ERROR_NONE; + error = tfc_->ValidateRecvData(incoming_frame_size); + if (error != GRPC_ERROR_NONE) return error; -// Returns a non zero announce integer if we should send a transport window -// update -uint32_t grpc_chttp2_flowctl_maybe_send_transport_update( - grpc_chttp2_transport_flowctl* tfc, bool writing_anyway) { - PRETRACE(tfc, NULL); - uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc); - uint32_t threshold_to_send_transport_window_update = - tfc->t->outbuf.count > 0 ? 3 * target_announced_window / 4 - : target_announced_window / 2; - if ((writing_anyway || - tfc->announced_window <= threshold_to_send_transport_window_update) && - tfc->announced_window != target_announced_window) { - uint32_t announce = (uint32_t)GPR_CLAMP( - target_announced_window - tfc->announced_window, 0, UINT32_MAX); - tfc->announced_window += announce; - POSTTRACE(tfc, NULL, "t updt sent"); - return announce; + uint32_t sent_init_window = + tfc_->transport()->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + uint32_t acked_init_window = + tfc_->transport()->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + + int64_t acked_stream_window = announced_window_delta_ + acked_init_window; + int64_t sent_stream_window = announced_window_delta_ + sent_init_window; + if (incoming_frame_size > acked_stream_window) { + if (incoming_frame_size <= sent_stream_window) { + gpr_log(GPR_ERROR, + "Incoming frame of size %" PRId64 + " exceeds local window size of %" PRId64 + ".\n" + "The (un-acked, future) window size would be %" PRId64 + " which is not exceeded.\n" + "This would usually cause a disconnection, but allowing it due to" + "broken HTTP2 implementations in the wild.\n" + "See (for example) https://github.com/netty/netty/issues/6520.", + incoming_frame_size, acked_stream_window, sent_stream_window); + } else { + char* msg; + gpr_asprintf(&msg, "frame of size %" PRId64 + " overflows local window of %" PRId64, + incoming_frame_size, acked_stream_window); + grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); + gpr_free(msg); + return err; + } } - GRPC_FLOW_CONTROL_IF_TRACING( - gpr_log(GPR_DEBUG, "%p[0][%s] will not send transport update", tfc, - tfc->t->is_client ? "cli" : "svr")); - return 0; + + UpdateAnnouncedWindowDelta(tfc_, -incoming_frame_size); + local_window_delta_ -= incoming_frame_size; + tfc_->CommitRecvData(incoming_frame_size); + return GRPC_ERROR_NONE; } -// Returns a non zero announce integer if we should send a stream window update -uint32_t grpc_chttp2_flowctl_maybe_send_stream_update( - grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc) { - PRETRACE(tfc, sfc); - if (sfc->local_window_delta > sfc->announced_window_delta) { +uint32_t StreamFlowControl::MaybeSendUpdate() { + FlowControlTrace trace("s updt sent", tfc_, this); + if (local_window_delta_ > announced_window_delta_) { uint32_t announce = (uint32_t)GPR_CLAMP( - sfc->local_window_delta - sfc->announced_window_delta, 0, UINT32_MAX); - announced_window_delta_preupdate(tfc, sfc); - sfc->announced_window_delta += announce; - announced_window_delta_postupdate(tfc, sfc); - POSTTRACE(tfc, sfc, "s updt sent"); + local_window_delta_ - announced_window_delta_, 0, UINT32_MAX); + UpdateAnnouncedWindowDelta(tfc_, announce); return announce; } - GRPC_FLOW_CONTROL_IF_TRACING( - gpr_log(GPR_DEBUG, "%p[%u][%s] will not send stream update", tfc, - sfc->s->id, tfc->t->is_client ? "cli" : "svr")); return 0; } -// we have received a WINDOW_UPDATE frame for a transport -void grpc_chttp2_flowctl_recv_transport_update( - grpc_chttp2_transport_flowctl* tfc, uint32_t size) { - PRETRACE(tfc, NULL); - tfc->remote_window += size; - POSTTRACE(tfc, NULL, "t updt recv"); -} - -// we have received a WINDOW_UPDATE frame for a stream -void grpc_chttp2_flowctl_recv_stream_update(grpc_chttp2_transport_flowctl* tfc, - grpc_chttp2_stream_flowctl* sfc, - uint32_t size) { - PRETRACE(tfc, sfc); - sfc->remote_window_delta += size; - POSTTRACE(tfc, sfc, "s updt recv"); -} - -void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport_flowctl* tfc, - grpc_chttp2_stream_flowctl* sfc, - size_t max_size_hint, - size_t have_already) { - PRETRACE(tfc, sfc); +void StreamFlowControl::IncomingByteStreamUpdate(size_t max_size_hint, + size_t have_already) { + FlowControlTrace trace("app st recv", tfc_, this); uint32_t max_recv_bytes; uint32_t sent_init_window = - tfc->t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + tfc_->transport()->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; /* clamp max recv hint to an allowable size */ if (max_size_hint >= UINT32_MAX - sent_init_window) { @@ -363,68 +274,18 @@ void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport_flowctl* tfc, /* add some small lookahead to keep pipelines flowing */ GPR_ASSERT(max_recv_bytes <= UINT32_MAX - sent_init_window); - if (sfc->local_window_delta < max_recv_bytes) { + if (local_window_delta_ < max_recv_bytes) { uint32_t add_max_recv_bytes = - (uint32_t)(max_recv_bytes - sfc->local_window_delta); - sfc->local_window_delta += add_max_recv_bytes; - } - POSTTRACE(tfc, sfc, "app st recv"); -} - -void grpc_chttp2_flowctl_destroy_stream(grpc_chttp2_transport_flowctl* tfc, - grpc_chttp2_stream_flowctl* sfc) { - announced_window_delta_preupdate(tfc, sfc); -} - -// Returns an urgency with which to make an update -static grpc_chttp2_flowctl_urgency delta_is_significant( - const grpc_chttp2_transport_flowctl* tfc, int32_t value, - grpc_chttp2_setting_id setting_id) { - int64_t delta = (int64_t)value - - (int64_t)tfc->t->settings[GRPC_LOCAL_SETTINGS][setting_id]; - // TODO(ncteisen): tune this - if (delta != 0 && (delta <= -value / 5 || delta >= value / 5)) { - return GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE; - } else { - return GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED; + (uint32_t)(max_recv_bytes - local_window_delta_); + local_window_delta_ += add_max_recv_bytes; } } -// Takes in a target and uses the pid controller to return a stabilized -// guess at the new bdp. -static double get_pid_controller_guess(grpc_exec_ctx* exec_ctx, - grpc_chttp2_transport_flowctl* tfc, - double target) { - grpc_millis now = grpc_exec_ctx_now(exec_ctx); - if (!tfc->pid_controller_initialized) { - tfc->last_pid_update = now; - tfc->pid_controller_initialized = true; - grpc_pid_controller_args args; - memset(&args, 0, sizeof(args)); - args.gain_p = 4; - args.gain_i = 8; - args.gain_d = 0; - args.initial_control_value = target; - args.min_control_value = -1; - args.max_control_value = 25; - args.integral_range = 10; - grpc_pid_controller_init(&tfc->pid_controller, args); - return pow(2, target); - } - double bdp_error = target - grpc_pid_controller_last(&tfc->pid_controller); - double dt = (double)(now - tfc->last_pid_update) * 1e-3; - double log2_bdp_guess = - grpc_pid_controller_update(&tfc->pid_controller, bdp_error, dt); - tfc->last_pid_update = now; - return pow(2, log2_bdp_guess); -} - // Take in a target and modifies it based on the memory pressure of the system -static double get_target_under_memory_pressure( - grpc_chttp2_transport_flowctl* tfc, double target) { +static double AdjustForMemoryPressure(grpc_resource_quota* quota, + double target) { // do not increase window under heavy memory pressure. - double memory_pressure = grpc_resource_quota_get_memory_pressure( - grpc_resource_user_quota(grpc_endpoint_get_resource_user(tfc->t->ep))); + double memory_pressure = grpc_resource_quota_get_memory_pressure(quota); static const double kLowMemPressure = 0.1; static const double kZeroTarget = 22; static const double kHighMemPressure = 0.8; @@ -439,77 +300,82 @@ static double get_target_under_memory_pressure( return target; } -grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action( - grpc_exec_ctx* exec_ctx, grpc_chttp2_transport_flowctl* tfc, - grpc_chttp2_stream_flowctl* sfc) { - grpc_chttp2_flowctl_action action; - memset(&action, 0, sizeof(action)); +double TransportFlowControl::TargetLogBdp() { + return AdjustForMemoryPressure( + grpc_resource_user_quota(grpc_endpoint_get_resource_user(t_->ep)), + 1 + log2(bdp_estimator_.EstimateBdp())); +} + +double TransportFlowControl::SmoothLogBdp(grpc_exec_ctx* exec_ctx, + double value) { + grpc_millis now = grpc_exec_ctx_now(exec_ctx); + double bdp_error = value - pid_controller_.last_control_value(); + const double dt = (double)(now - last_pid_update_) * 1e-3; + last_pid_update_ = now; + return pid_controller_.Update(bdp_error, dt); +} + +FlowControlAction::Urgency TransportFlowControl::DeltaUrgency( + int32_t value, grpc_chttp2_setting_id setting_id) { + int64_t delta = + (int64_t)value - (int64_t)t_->settings[GRPC_LOCAL_SETTINGS][setting_id]; // TODO(ncteisen): tune this - if (sfc != NULL && !sfc->s->read_closed) { - uint32_t sent_init_window = - tfc->t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - if ((int64_t)sfc->local_window_delta > - (int64_t)sfc->announced_window_delta && - (int64_t)sfc->announced_window_delta + sent_init_window <= - sent_init_window / 2) { - action.send_stream_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY; - } else if (sfc->local_window_delta > sfc->announced_window_delta) { - action.send_stream_update = GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE; - } + if (delta != 0 && (delta <= -value / 5 || delta >= value / 5)) { + return FlowControlAction::Urgency::QUEUE_UPDATE; + } else { + return FlowControlAction::Urgency::NO_ACTION_NEEDED; } - if (tfc->enable_bdp_probe) { - action.need_ping = tfc->bdp_estimator->NeedPing(exec_ctx); +} +FlowControlAction TransportFlowControl::PeriodicUpdate( + grpc_exec_ctx* exec_ctx) { + FlowControlAction action; + if (enable_bdp_probe_) { // get bdp estimate and update initial_window accordingly. - int64_t estimate = -1; - if (tfc->bdp_estimator->EstimateBdp(&estimate)) { - double target = 1 + log2((double)estimate); - - // target might change based on how much memory pressure we are under - // TODO(ncteisen): experiment with setting target to be huge under low - // memory pressure. - target = get_target_under_memory_pressure(tfc, target); - - // run our target through the pid controller to stabilize change. - // TODO(ncteisen): experiment with other controllers here. - double bdp_guess = get_pid_controller_guess(exec_ctx, tfc, target); - - // Though initial window 'could' drop to 0, we keep the floor at 128 - tfc->target_initial_window_size = - (int32_t)GPR_CLAMP(bdp_guess, 128, INT32_MAX); - - grpc_chttp2_flowctl_urgency init_window_update_urgency = - delta_is_significant(tfc, tfc->target_initial_window_size, - GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE); - if (init_window_update_urgency != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) { - action.send_setting_update = init_window_update_urgency; - action.initial_window_size = (uint32_t)tfc->target_initial_window_size; - } - } + // target might change based on how much memory pressure we are under + // TODO(ncteisen): experiment with setting target to be huge under low + // memory pressure. + const double target = pow(2, SmoothLogBdp(exec_ctx, TargetLogBdp())); + + // Though initial window 'could' drop to 0, we keep the floor at 128 + target_initial_window_size_ = (int32_t)GPR_CLAMP(target, 128, INT32_MAX); + + action.set_send_initial_window_update( + DeltaUrgency(target_initial_window_size_, + GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE), + target_initial_window_size_); // get bandwidth estimate and update max_frame accordingly. - double bw_dbl = -1; - if (tfc->bdp_estimator->EstimateBandwidth(&bw_dbl)) { - // we target the max of BDP or bandwidth in microseconds. - int32_t frame_size = (int32_t)GPR_CLAMP( - GPR_MAX((int32_t)GPR_CLAMP(bw_dbl, 0, INT_MAX) / 1000, - tfc->target_initial_window_size), - 16384, 16777215); - grpc_chttp2_flowctl_urgency frame_size_urgency = delta_is_significant( - tfc, frame_size, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE); - if (frame_size_urgency != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) { - if (frame_size_urgency > action.send_setting_update) { - action.send_setting_update = frame_size_urgency; - } - action.max_frame_size = (uint32_t)frame_size; - } - } + double bw_dbl = bdp_estimator_.EstimateBandwidth(); + // we target the max of BDP or bandwidth in microseconds. + int32_t frame_size = (int32_t)GPR_CLAMP( + GPR_MAX((int32_t)GPR_CLAMP(bw_dbl, 0, INT_MAX) / 1000, + target_initial_window_size_), + 16384, 16777215); + action.set_send_max_frame_size_update( + DeltaUrgency(frame_size, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE), + frame_size); } - uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc); - if (tfc->announced_window < target_announced_window / 2) { - action.send_transport_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY; + return UpdateAction(action); +} + +FlowControlAction StreamFlowControl::UpdateAction(FlowControlAction action) { + // TODO(ncteisen): tune this + if (!s_->read_closed) { + uint32_t sent_init_window = + tfc_->transport()->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + if (local_window_delta_ > announced_window_delta_ && + announced_window_delta_ + sent_init_window <= sent_init_window / 2) { + action.set_send_stream_update( + FlowControlAction::Urgency::UPDATE_IMMEDIATELY); + } else if (local_window_delta_ > announced_window_delta_) { + action.set_send_stream_update(FlowControlAction::Urgency::QUEUE_UPDATE); + } } - TRACEACTION(tfc, action); + return action; } + +} // namespace chttp2 +} // namespace grpc_core diff --git a/src/core/ext/transport/chttp2/transport/flow_control.h b/src/core/ext/transport/chttp2/transport/flow_control.h new file mode 100644 index 0000000000..d5107d467b --- /dev/null +++ b/src/core/ext/transport/chttp2/transport/flow_control.h @@ -0,0 +1,328 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H +#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H + +#include <stdint.h> + +#include <grpc/support/useful.h> +#include "src/core/ext/transport/chttp2/transport/http2_settings.h" +#include "src/core/lib/support/manual_constructor.h" +#include "src/core/lib/transport/bdp_estimator.h" +#include "src/core/lib/transport/pid_controller.h" + +struct grpc_chttp2_transport; +struct grpc_chttp2_stream; + +extern "C" grpc_tracer_flag grpc_flowctl_trace; + +namespace grpc_core { +namespace chttp2 { + +static constexpr uint32_t kDefaultWindow = 65535; + +class TransportFlowControl; +class StreamFlowControl; + +class FlowControlAction { + public: + enum class Urgency : uint8_t { + // Nothing to be done. + NO_ACTION_NEEDED = 0, + // Initiate a write to update the initial window immediately. + UPDATE_IMMEDIATELY, + // Push the flow control update into a send buffer, to be sent + // out the next time a write is initiated. + QUEUE_UPDATE, + }; + + Urgency send_stream_update() const { return send_stream_update_; } + Urgency send_transport_update() const { return send_transport_update_; } + Urgency send_initial_window_update() const { + return send_initial_window_update_; + } + Urgency send_max_frame_size_update() const { + return send_max_frame_size_update_; + } + uint32_t initial_window_size() const { return initial_window_size_; } + uint32_t max_frame_size() const { return max_frame_size_; } + + FlowControlAction& set_send_stream_update(Urgency u) { + send_stream_update_ = u; + return *this; + } + FlowControlAction& set_send_transport_update(Urgency u) { + send_transport_update_ = u; + return *this; + } + FlowControlAction& set_send_initial_window_update(Urgency u, + uint32_t update) { + send_initial_window_update_ = u; + initial_window_size_ = update; + return *this; + } + FlowControlAction& set_send_max_frame_size_update(Urgency u, + uint32_t update) { + send_max_frame_size_update_ = u; + max_frame_size_ = update; + return *this; + } + + static const char* UrgencyString(Urgency u); + void Trace(grpc_chttp2_transport* t) const; + + private: + Urgency send_stream_update_ = Urgency::NO_ACTION_NEEDED; + Urgency send_transport_update_ = Urgency::NO_ACTION_NEEDED; + Urgency send_initial_window_update_ = Urgency::NO_ACTION_NEEDED; + Urgency send_max_frame_size_update_ = Urgency::NO_ACTION_NEEDED; + uint32_t initial_window_size_ = 0; + uint32_t max_frame_size_ = 0; +}; + +class FlowControlTrace { + public: + FlowControlTrace(const char* reason, TransportFlowControl* tfc, + StreamFlowControl* sfc) { + if (enabled_) Init(reason, tfc, sfc); + } + + ~FlowControlTrace() { + if (enabled_) Finish(); + } + + private: + void Init(const char* reason, TransportFlowControl* tfc, + StreamFlowControl* sfc); + void Finish(); + + const bool enabled_ = GRPC_TRACER_ON(grpc_flowctl_trace); + + TransportFlowControl* tfc_; + StreamFlowControl* sfc_; + const char* reason_; + int64_t remote_window_; + int64_t target_window_; + int64_t announced_window_; + int64_t remote_window_delta_; + int64_t local_window_delta_; + int64_t announced_window_delta_; +}; + +class TransportFlowControl { + public: + TransportFlowControl(grpc_exec_ctx* exec_ctx, const grpc_chttp2_transport* t, + bool enable_bdp_probe); + ~TransportFlowControl() {} + + bool bdp_probe() const { return enable_bdp_probe_; } + + // returns an announce if we should send a transport update to our peer, + // else returns zero; writing_anyway indicates if a write would happen + // regardless of the send - if it is false and this function returns non-zero, + // this announce will cause a write to occur + uint32_t MaybeSendUpdate(bool writing_anyway); + + // Reads the flow control data and returns and actionable struct that will + // tell chttp2 exactly what it needs to do + FlowControlAction MakeAction() { return UpdateAction(FlowControlAction()); } + + // Call periodically (at a low-ish rate, 100ms - 10s makes sense) + // to perform more complex flow control calculations and return an action + // to let chttp2 change its parameters + FlowControlAction PeriodicUpdate(grpc_exec_ctx* exec_ctx); + + void StreamSentData(int64_t size) { remote_window_ -= size; } + + grpc_error* ValidateRecvData(int64_t incoming_frame_size); + void CommitRecvData(int64_t incoming_frame_size) { + announced_window_ -= incoming_frame_size; + } + + grpc_error* RecvData(int64_t incoming_frame_size) { + FlowControlTrace trace(" data recv", this, nullptr); + grpc_error* error = ValidateRecvData(incoming_frame_size); + if (error != GRPC_ERROR_NONE) return error; + CommitRecvData(incoming_frame_size); + return GRPC_ERROR_NONE; + } + + // we have received a WINDOW_UPDATE frame for a transport + void RecvUpdate(uint32_t size) { + FlowControlTrace trace("t updt recv", this, nullptr); + remote_window_ += size; + } + + int64_t remote_window() const { return remote_window_; } + int64_t target_window() const { + return (uint32_t)GPR_MIN((int64_t)((1u << 31) - 1), + announced_stream_total_over_incoming_window_ + + target_initial_window_size_); + } + int64_t announced_window() const { return announced_window_; } + + const grpc_chttp2_transport* transport() const { return t_; } + + void PreUpdateAnnouncedWindowOverIncomingWindow(int64_t delta) { + if (delta > 0) { + announced_stream_total_over_incoming_window_ -= delta; + } else { + announced_stream_total_under_incoming_window_ += -delta; + } + } + + void PostUpdateAnnouncedWindowOverIncomingWindow(int64_t delta) { + if (delta > 0) { + announced_stream_total_over_incoming_window_ += delta; + } else { + announced_stream_total_under_incoming_window_ -= -delta; + } + } + + BdpEstimator* bdp_estimator() { return &bdp_estimator_; } + + void TestOnlyForceHugeWindow() { + announced_window_ = 1024 * 1024 * 1024; + remote_window_ = 1024 * 1024 * 1024; + } + + private: + double TargetLogBdp(); + double SmoothLogBdp(grpc_exec_ctx* exec_ctx, double value); + FlowControlAction::Urgency DeltaUrgency(int32_t value, + grpc_chttp2_setting_id setting_id); + + FlowControlAction UpdateAction(FlowControlAction action) { + if (announced_window_ < target_window() / 2) { + action.set_send_transport_update( + FlowControlAction::Urgency::UPDATE_IMMEDIATELY); + } + return action; + } + + const grpc_chttp2_transport* const t_; + + /** Our bookkeeping for the remote peer's available window */ + int64_t remote_window_ = kDefaultWindow; + + /** calculating what we should give for local window: + we track the total amount of flow control over initial window size + across all streams: this is data that we want to receive right now (it + has an outstanding read) + and the total amount of flow control under initial window size across all + streams: this is data we've read early + we want to adjust incoming_window such that: + incoming_window = total_over - max(bdp - total_under, 0) */ + int64_t announced_stream_total_over_incoming_window_ = 0; + int64_t announced_stream_total_under_incoming_window_ = 0; + + /** This is out window according to what we have sent to our remote peer. The + * difference between this and target window is what we use to decide when + * to send WINDOW_UPDATE frames. */ + int64_t announced_window_ = kDefaultWindow; + + int32_t target_initial_window_size_ = kDefaultWindow; + + /** should we probe bdp? */ + const bool enable_bdp_probe_; + + /* bdp estimation */ + grpc_core::BdpEstimator bdp_estimator_; + + /* pid controller */ + grpc_core::PidController pid_controller_; + grpc_millis last_pid_update_ = 0; +}; + +class StreamFlowControl { + public: + StreamFlowControl(TransportFlowControl* tfc, const grpc_chttp2_stream* s); + ~StreamFlowControl() { + tfc_->PreUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_); + } + + FlowControlAction UpdateAction(FlowControlAction action); + FlowControlAction MakeAction() { return UpdateAction(tfc_->MakeAction()); } + + // we have sent data on the wire, we must track this in our bookkeeping for + // the remote peer's flow control. + void SentData(int64_t outgoing_frame_size) { + FlowControlTrace tracer(" data sent", tfc_, this); + tfc_->StreamSentData(outgoing_frame_size); + remote_window_delta_ -= outgoing_frame_size; + } + + // we have received data from the wire + grpc_error* RecvData(int64_t incoming_frame_size); + + // returns an announce if we should send a stream update to our peer, else + // returns zero + uint32_t MaybeSendUpdate(); + + // we have received a WINDOW_UPDATE frame for a stream + void RecvUpdate(uint32_t size) { + FlowControlTrace trace("s updt recv", tfc_, this); + remote_window_delta_ += size; + } + + // the application is asking for a certain amount of bytes + void IncomingByteStreamUpdate(size_t max_size_hint, size_t have_already); + + int64_t remote_window_delta() const { return remote_window_delta_; } + int64_t local_window_delta() const { return local_window_delta_; } + int64_t announced_window_delta() const { return announced_window_delta_; } + + const grpc_chttp2_stream* stream() const { return s_; } + + void TestOnlyForceHugeWindow() { + announced_window_delta_ = 1024 * 1024 * 1024; + local_window_delta_ = 1024 * 1024 * 1024; + remote_window_delta_ = 1024 * 1024 * 1024; + } + + private: + TransportFlowControl* const tfc_; + const grpc_chttp2_stream* const s_; + + void UpdateAnnouncedWindowDelta(TransportFlowControl* tfc, int64_t change) { + tfc->PreUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_); + announced_window_delta_ += change; + tfc->PostUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_); + } + + /** window available for us to send to peer, over or under the initial + * window + * size of the transport... ie: + * remote_window = remote_window_delta + transport.initial_window_size */ + int64_t remote_window_delta_ = 0; + + /** window available for peer to send to us (as a delta on + * transport.initial_window_size) + * local_window = local_window_delta + transport.initial_window_size */ + int64_t local_window_delta_ = 0; + + /** window available for peer to send to us over this stream that we have + * announced to the peer */ + int64_t announced_window_delta_ = 0; +}; + +} // namespace chttp2 +} // namespace grpc_core + +#endif diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.cc b/src/core/ext/transport/chttp2/transport/frame_settings.cc index 2995bf7310..db0245bb57 100644 --- a/src/core/ext/transport/chttp2/transport/frame_settings.cc +++ b/src/core/ext/transport/chttp2/transport/frame_settings.cc @@ -202,13 +202,13 @@ grpc_error *grpc_chttp2_settings_parser_parse(grpc_exec_ctx *exec_ctx, void *p, } if (id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE && parser->incoming_settings[id] != parser->value) { - t->flow_control.initial_window_update += + t->initial_window_update += (int64_t)parser->value - parser->incoming_settings[id]; if (GRPC_TRACER_ON(grpc_http_trace) || GRPC_TRACER_ON(grpc_flowctl_trace)) { gpr_log(GPR_DEBUG, "%p[%s] adding %d for initial_window change", t, t->is_client ? "cli" : "svr", - (int)t->flow_control.initial_window_update); + (int)t->initial_window_update); } } parser->incoming_settings[id] = parser->value; diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.cc b/src/core/ext/transport/chttp2/transport/frame_window_update.cc index c9ab8d1b50..15eaf59285 100644 --- a/src/core/ext/transport/chttp2/transport/frame_window_update.cc +++ b/src/core/ext/transport/chttp2/transport/frame_window_update.cc @@ -96,8 +96,7 @@ grpc_error *grpc_chttp2_window_update_parser_parse( if (t->incoming_stream_id != 0) { if (s != NULL) { - grpc_chttp2_flowctl_recv_stream_update( - &t->flow_control, &s->flow_control, received_update); + s->flow_control->RecvUpdate(received_update); if (grpc_chttp2_list_remove_stalled_by_stream(t, s)) { grpc_chttp2_mark_stream_writable(exec_ctx, t, s); grpc_chttp2_initiate_write( @@ -106,10 +105,9 @@ grpc_error *grpc_chttp2_window_update_parser_parse( } } } else { - bool was_zero = t->flow_control.remote_window <= 0; - grpc_chttp2_flowctl_recv_transport_update(&t->flow_control, - received_update); - bool is_zero = t->flow_control.remote_window <= 0; + bool was_zero = t->flow_control->remote_window() <= 0; + t->flow_control->RecvUpdate(received_update); + bool is_zero = t->flow_control->remote_window() <= 0; if (was_zero && !is_zero) { grpc_chttp2_initiate_write( exec_ctx, t, diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.cc b/src/core/ext/transport/chttp2/transport/hpack_encoder.cc index 17b8c4ab85..0ea50e394b 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_encoder.cc +++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.cc @@ -178,24 +178,19 @@ static void evict_entry(grpc_chttp2_hpack_compressor *c) { c->table_elems--; } -/* add an element to the decoder table */ -static void add_elem(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c, - grpc_mdelem elem) { - GPR_ASSERT(GRPC_MDELEM_IS_INTERNED(elem)); - - uint32_t key_hash = grpc_slice_hash(GRPC_MDKEY(elem)); - uint32_t value_hash = grpc_slice_hash(GRPC_MDVALUE(elem)); - uint32_t elem_hash = GRPC_MDSTR_KV_HASH(key_hash, value_hash); +// Reserve space in table for the new element, evict entries if needed. +// Return the new index of the element. Return 0 to indicate not adding to +// table. +static uint32_t prepare_space_for_new_elem(grpc_chttp2_hpack_compressor *c, + size_t elem_size) { uint32_t new_index = c->tail_remote_index + c->table_elems + 1; - size_t elem_size = grpc_mdelem_get_size_in_hpack_table(elem); - GPR_ASSERT(elem_size < 65536); if (elem_size > c->max_table_size) { while (c->table_size > 0) { evict_entry(c); } - return; + return 0; } /* Reserve space for this element in the remote table: if this overflows @@ -209,37 +204,26 @@ static void add_elem(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c, c->table_size = (uint16_t)(c->table_size + elem_size); c->table_elems++; - /* Store this element into {entries,indices}_elem */ - if (grpc_mdelem_eq(c->entries_elems[HASH_FRAGMENT_2(elem_hash)], elem)) { - /* already there: update with new index */ - c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index; - } else if (grpc_mdelem_eq(c->entries_elems[HASH_FRAGMENT_3(elem_hash)], - elem)) { - /* already there (cuckoo): update with new index */ - c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index; - } else if (GRPC_MDISNULL(c->entries_elems[HASH_FRAGMENT_2(elem_hash)])) { - /* not there, but a free element: add */ - c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = GRPC_MDELEM_REF(elem); - c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index; - } else if (GRPC_MDISNULL(c->entries_elems[HASH_FRAGMENT_3(elem_hash)])) { - /* not there (cuckoo), but a free element: add */ - c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = GRPC_MDELEM_REF(elem); - c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index; - } else if (c->indices_elems[HASH_FRAGMENT_2(elem_hash)] < - c->indices_elems[HASH_FRAGMENT_3(elem_hash)]) { - /* not there: replace oldest */ - GRPC_MDELEM_UNREF(exec_ctx, c->entries_elems[HASH_FRAGMENT_2(elem_hash)]); - c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = GRPC_MDELEM_REF(elem); - c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index; - } else { - /* not there: replace oldest */ - GRPC_MDELEM_UNREF(exec_ctx, c->entries_elems[HASH_FRAGMENT_3(elem_hash)]); - c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = GRPC_MDELEM_REF(elem); - c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index; + return new_index; +} + +/* dummy function */ +static void add_nothing(grpc_exec_ctx *exec_ctx, + grpc_chttp2_hpack_compressor *c, grpc_mdelem elem, + size_t elem_size) {} + +// Add a key to the dynamic table. Both key and value will be added to table at +// the decoder. +static void add_key_with_index(grpc_exec_ctx *exec_ctx, + grpc_chttp2_hpack_compressor *c, + grpc_mdelem elem, uint32_t new_index) { + if (new_index == 0) { + return; } - /* do exactly the same for the key (so we can find by that again too) */ + uint32_t key_hash = grpc_slice_hash(GRPC_MDKEY(elem)); + /* Store the key into {entries,indices}_keys */ if (grpc_slice_eq(c->entries_keys[HASH_FRAGMENT_2(key_hash)], GRPC_MDKEY(elem))) { c->indices_keys[HASH_FRAGMENT_2(key_hash)] = new_index; @@ -272,6 +256,63 @@ static void add_elem(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c, } } +/* add an element to the decoder table */ +static void add_elem_with_index(grpc_exec_ctx *exec_ctx, + grpc_chttp2_hpack_compressor *c, + grpc_mdelem elem, uint32_t new_index) { + if (new_index == 0) { + return; + } + GPR_ASSERT(GRPC_MDELEM_IS_INTERNED(elem)); + + uint32_t key_hash = grpc_slice_hash(GRPC_MDKEY(elem)); + uint32_t value_hash = grpc_slice_hash(GRPC_MDVALUE(elem)); + uint32_t elem_hash = GRPC_MDSTR_KV_HASH(key_hash, value_hash); + + /* Store this element into {entries,indices}_elem */ + if (grpc_mdelem_eq(c->entries_elems[HASH_FRAGMENT_2(elem_hash)], elem)) { + /* already there: update with new index */ + c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index; + } else if (grpc_mdelem_eq(c->entries_elems[HASH_FRAGMENT_3(elem_hash)], + elem)) { + /* already there (cuckoo): update with new index */ + c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index; + } else if (GRPC_MDISNULL(c->entries_elems[HASH_FRAGMENT_2(elem_hash)])) { + /* not there, but a free element: add */ + c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = GRPC_MDELEM_REF(elem); + c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index; + } else if (GRPC_MDISNULL(c->entries_elems[HASH_FRAGMENT_3(elem_hash)])) { + /* not there (cuckoo), but a free element: add */ + c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = GRPC_MDELEM_REF(elem); + c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index; + } else if (c->indices_elems[HASH_FRAGMENT_2(elem_hash)] < + c->indices_elems[HASH_FRAGMENT_3(elem_hash)]) { + /* not there: replace oldest */ + GRPC_MDELEM_UNREF(exec_ctx, c->entries_elems[HASH_FRAGMENT_2(elem_hash)]); + c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = GRPC_MDELEM_REF(elem); + c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index; + } else { + /* not there: replace oldest */ + GRPC_MDELEM_UNREF(exec_ctx, c->entries_elems[HASH_FRAGMENT_3(elem_hash)]); + c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = GRPC_MDELEM_REF(elem); + c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index; + } + + add_key_with_index(exec_ctx, c, elem, new_index); +} + +static void add_elem(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c, + grpc_mdelem elem, size_t elem_size) { + uint32_t new_index = prepare_space_for_new_elem(c, elem_size); + add_elem_with_index(exec_ctx, c, elem, new_index); +} + +static void add_key(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c, + grpc_mdelem elem, size_t elem_size) { + uint32_t new_index = prepare_space_for_new_elem(c, elem_size); + add_key_with_index(exec_ctx, c, elem, new_index); +} + static void emit_indexed(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c, uint32_t elem_index, framer_state *st) { @@ -363,7 +404,9 @@ static void emit_lithdr_noidx(grpc_exec_ctx *exec_ctx, static void emit_lithdr_incidx_v(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c, - grpc_mdelem elem, framer_state *st) { + uint32_t unused_index, grpc_mdelem elem, + framer_state *st) { + GPR_ASSERT(unused_index == 0); GRPC_STATS_INC_HPACK_SEND_LITHDR_INCIDX_V(exec_ctx); GRPC_STATS_INC_HPACK_SEND_UNCOMPRESSED(exec_ctx); uint32_t len_key = (uint32_t)GRPC_SLICE_LENGTH(GRPC_MDKEY(elem)); @@ -385,7 +428,9 @@ static void emit_lithdr_incidx_v(grpc_exec_ctx *exec_ctx, static void emit_lithdr_noidx_v(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c, - grpc_mdelem elem, framer_state *st) { + uint32_t unused_index, grpc_mdelem elem, + framer_state *st) { + GPR_ASSERT(unused_index == 0); GRPC_STATS_INC_HPACK_SEND_LITHDR_NOTIDX_V(exec_ctx); GRPC_STATS_INC_HPACK_SEND_UNCOMPRESSED(exec_ctx); uint32_t len_key = (uint32_t)GRPC_SLICE_LENGTH(GRPC_MDKEY(elem)); @@ -430,9 +475,14 @@ static void hpack_enc(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c, "Reserved header (colon-prefixed) happening after regular ones."); } - if (GRPC_TRACER_ON(grpc_http_trace) && !GRPC_MDELEM_IS_INTERNED(elem)) { + if (GRPC_TRACER_ON(grpc_http_trace)) { char *k = grpc_slice_to_c_string(GRPC_MDKEY(elem)); - char *v = grpc_slice_to_c_string(GRPC_MDVALUE(elem)); + char *v = NULL; + if (grpc_is_binary_header(GRPC_MDKEY(elem))) { + v = grpc_dump_slice(GRPC_MDVALUE(elem), GPR_DUMP_HEX); + } else { + v = grpc_slice_to_c_string(GRPC_MDVALUE(elem)); + } gpr_log( GPR_DEBUG, "Encode: '%s: %s', elem_interned=%d [%d], k_interned=%d, v_interned=%d", @@ -442,64 +492,70 @@ static void hpack_enc(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c, gpr_free(k); gpr_free(v); } - if (!GRPC_MDELEM_IS_INTERNED(elem)) { - emit_lithdr_noidx_v(exec_ctx, c, elem, st); + + bool elem_interned = GRPC_MDELEM_IS_INTERNED(elem); + bool key_interned = elem_interned || grpc_slice_is_interned(GRPC_MDKEY(elem)); + + // Key is not interned, emit literals. + if (!key_interned) { + emit_lithdr_noidx_v(exec_ctx, c, 0, elem, st); return; } - uint32_t key_hash; - uint32_t value_hash; - uint32_t elem_hash; - size_t decoder_space_usage; - uint32_t indices_key; - int should_add_elem; + uint32_t key_hash = grpc_slice_hash(GRPC_MDKEY(elem)); + uint32_t elem_hash = 0; - key_hash = grpc_slice_hash(GRPC_MDKEY(elem)); - value_hash = grpc_slice_hash(GRPC_MDVALUE(elem)); - elem_hash = GRPC_MDSTR_KV_HASH(key_hash, value_hash); + if (elem_interned) { + uint32_t value_hash = grpc_slice_hash(GRPC_MDVALUE(elem)); + elem_hash = GRPC_MDSTR_KV_HASH(key_hash, value_hash); - inc_filter(HASH_FRAGMENT_1(elem_hash), &c->filter_elems_sum, c->filter_elems); + inc_filter(HASH_FRAGMENT_1(elem_hash), &c->filter_elems_sum, + c->filter_elems); - /* is this elem currently in the decoders table? */ + /* is this elem currently in the decoders table? */ - if (grpc_mdelem_eq(c->entries_elems[HASH_FRAGMENT_2(elem_hash)], elem) && - c->indices_elems[HASH_FRAGMENT_2(elem_hash)] > c->tail_remote_index) { - /* HIT: complete element (first cuckoo hash) */ - emit_indexed(exec_ctx, c, - dynidx(c, c->indices_elems[HASH_FRAGMENT_2(elem_hash)]), st); - return; - } + if (grpc_mdelem_eq(c->entries_elems[HASH_FRAGMENT_2(elem_hash)], elem) && + c->indices_elems[HASH_FRAGMENT_2(elem_hash)] > c->tail_remote_index) { + /* HIT: complete element (first cuckoo hash) */ + emit_indexed(exec_ctx, c, + dynidx(c, c->indices_elems[HASH_FRAGMENT_2(elem_hash)]), st); + return; + } - if (grpc_mdelem_eq(c->entries_elems[HASH_FRAGMENT_3(elem_hash)], elem) && - c->indices_elems[HASH_FRAGMENT_3(elem_hash)] > c->tail_remote_index) { - /* HIT: complete element (second cuckoo hash) */ - emit_indexed(exec_ctx, c, - dynidx(c, c->indices_elems[HASH_FRAGMENT_3(elem_hash)]), st); - return; + if (grpc_mdelem_eq(c->entries_elems[HASH_FRAGMENT_3(elem_hash)], elem) && + c->indices_elems[HASH_FRAGMENT_3(elem_hash)] > c->tail_remote_index) { + /* HIT: complete element (second cuckoo hash) */ + emit_indexed(exec_ctx, c, + dynidx(c, c->indices_elems[HASH_FRAGMENT_3(elem_hash)]), st); + return; + } } + uint32_t indices_key; + /* should this elem be in the table? */ - decoder_space_usage = grpc_mdelem_get_size_in_hpack_table(elem); - should_add_elem = decoder_space_usage < MAX_DECODER_SPACE_USAGE && - c->filter_elems[HASH_FRAGMENT_1(elem_hash)] >= - c->filter_elems_sum / ONE_ON_ADD_PROBABILITY; + size_t decoder_space_usage = + grpc_mdelem_get_size_in_hpack_table(elem, st->use_true_binary_metadata); + bool should_add_elem = elem_interned && + decoder_space_usage < MAX_DECODER_SPACE_USAGE && + c->filter_elems[HASH_FRAGMENT_1(elem_hash)] >= + c->filter_elems_sum / ONE_ON_ADD_PROBABILITY; + void (*maybe_add)(grpc_exec_ctx *, grpc_chttp2_hpack_compressor *, + grpc_mdelem, size_t) = + should_add_elem ? add_elem : add_nothing; + void (*emit)(grpc_exec_ctx *, grpc_chttp2_hpack_compressor *, uint32_t, + grpc_mdelem, framer_state *) = + should_add_elem ? emit_lithdr_incidx : emit_lithdr_noidx; /* no hits for the elem... maybe there's a key? */ - indices_key = c->indices_keys[HASH_FRAGMENT_2(key_hash)]; if (grpc_slice_eq(c->entries_keys[HASH_FRAGMENT_2(key_hash)], GRPC_MDKEY(elem)) && indices_key > c->tail_remote_index) { /* HIT: key (first cuckoo hash) */ - if (should_add_elem) { - emit_lithdr_incidx(exec_ctx, c, dynidx(c, indices_key), elem, st); - add_elem(exec_ctx, c, elem); - return; - } else { - emit_lithdr_noidx(exec_ctx, c, dynidx(c, indices_key), elem, st); - return; - } - GPR_UNREACHABLE_CODE(return ); + emit(exec_ctx, c, dynidx(c, indices_key), elem, st); + maybe_add(exec_ctx, c, elem, decoder_space_usage); + return; } indices_key = c->indices_keys[HASH_FRAGMENT_3(key_hash)]; @@ -507,28 +563,20 @@ static void hpack_enc(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c, GRPC_MDKEY(elem)) && indices_key > c->tail_remote_index) { /* HIT: key (first cuckoo hash) */ - if (should_add_elem) { - emit_lithdr_incidx(exec_ctx, c, dynidx(c, indices_key), elem, st); - add_elem(exec_ctx, c, elem); - return; - } else { - emit_lithdr_noidx(exec_ctx, c, dynidx(c, indices_key), elem, st); - return; - } - GPR_UNREACHABLE_CODE(return ); + emit(exec_ctx, c, dynidx(c, indices_key), elem, st); + maybe_add(exec_ctx, c, elem, decoder_space_usage); + return; } /* no elem, key in the table... fall back to literal emission */ - - if (should_add_elem) { - emit_lithdr_incidx_v(exec_ctx, c, elem, st); - add_elem(exec_ctx, c, elem); - return; - } else { - emit_lithdr_noidx_v(exec_ctx, c, elem, st); - return; - } - GPR_UNREACHABLE_CODE(return ); + bool should_add_key = + !elem_interned && decoder_space_usage < MAX_DECODER_SPACE_USAGE; + emit = (should_add_elem || should_add_key) ? emit_lithdr_incidx_v + : emit_lithdr_noidx_v; + maybe_add = + should_add_elem ? add_elem : (should_add_key ? add_key : add_nothing); + emit(exec_ctx, c, 0, elem, st); + maybe_add(exec_ctx, c, elem, decoder_space_usage); } #define STRLEN_LIT(x) (sizeof(x) - 1) diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.cc b/src/core/ext/transport/chttp2/transport/hpack_parser.cc index 3d1df19bc3..7c17229122 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.cc +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.cc @@ -33,6 +33,7 @@ #include "src/core/lib/debug/stats.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/http2_errors.h" @@ -650,9 +651,14 @@ static const uint8_t inverse_base64[256] = { /* emission helpers */ static grpc_error *on_hdr(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_parser *p, grpc_mdelem md, int add_to_table) { - if (GRPC_TRACER_ON(grpc_http_trace) && !GRPC_MDELEM_IS_INTERNED(md)) { + if (GRPC_TRACER_ON(grpc_http_trace)) { char *k = grpc_slice_to_c_string(GRPC_MDKEY(md)); - char *v = grpc_slice_to_c_string(GRPC_MDVALUE(md)); + char *v = NULL; + if (grpc_is_binary_header(GRPC_MDKEY(md))) { + v = grpc_dump_slice(GRPC_MDVALUE(md), GPR_DUMP_HEX); + } else { + v = grpc_slice_to_c_string(GRPC_MDVALUE(md)); + } gpr_log( GPR_DEBUG, "Decode: '%s: %s', elem_interned=%d [%d], k_interned=%d, v_interned=%d", diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index f7a57a6543..9e0796e820 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -22,6 +22,7 @@ #include <assert.h> #include <stdbool.h> +#include "src/core/ext/transport/chttp2/transport/flow_control.h" #include "src/core/ext/transport/chttp2/transport/frame.h" #include "src/core/ext/transport/chttp2/transport/frame_data.h" #include "src/core/ext/transport/chttp2/transport/frame_goaway.h" @@ -38,9 +39,7 @@ #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/support/manual_constructor.h" -#include "src/core/lib/transport/bdp_estimator.h" #include "src/core/lib/transport/connectivity_state.h" -#include "src/core/lib/transport/pid_controller.h" #include "src/core/lib/transport/transport_impl.h" #ifdef __cplusplus @@ -238,48 +237,6 @@ typedef enum { GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED, } grpc_chttp2_keepalive_state; -typedef struct { - /** initial window change. This is tracked as we parse settings frames from - * the remote peer. If there is a positive delta, then we will make all - * streams readable since they may have become unstalled */ - int64_t initial_window_update; - - /** Our bookkeeping for the remote peer's available window */ - int64_t remote_window; - - /** calculating what we should give for local window: - we track the total amount of flow control over initial window size - across all streams: this is data that we want to receive right now (it - has an outstanding read) - and the total amount of flow control under initial window size across all - streams: this is data we've read early - we want to adjust incoming_window such that: - incoming_window = total_over - max(bdp - total_under, 0) */ - int64_t announced_stream_total_over_incoming_window; - int64_t announced_stream_total_under_incoming_window; - - /** This is out window according to what we have sent to our remote peer. The - * difference between this and target window is what we use to decide when - * to send WINDOW_UPDATE frames. */ - int64_t announced_window; - - int32_t target_initial_window_size; - - /** should we probe bdp? */ - bool enable_bdp_probe; - - /* bdp estimation */ - grpc_core::ManualConstructor<grpc_core::BdpEstimator> bdp_estimator; - - /* pid controller */ - bool pid_controller_initialized; - grpc_pid_controller pid_controller; - grpc_millis last_pid_update; - - // pointer back to transport for tracing - const grpc_chttp2_transport *t; -} grpc_chttp2_transport_flowctl; - struct grpc_chttp2_transport { grpc_transport base; /* must be first */ gpr_refcount refs; @@ -298,7 +255,7 @@ struct grpc_chttp2_transport { /** is the transport destroying itself? */ uint8_t destroying; /** has the upper layer closed the transport? */ - uint8_t closed; + grpc_error *closed_with_error; /** is there a read request to the endpoint outstanding? */ uint8_t endpoint_reading; @@ -340,7 +297,7 @@ struct grpc_chttp2_transport { /** hpack encoding */ grpc_chttp2_hpack_compressor hpack_compressor; /** is this a client? */ - uint8_t is_client; + bool is_client; /** data to write next write */ grpc_slice_buffer qbuf; @@ -350,14 +307,14 @@ struct grpc_chttp2_transport { uint32_t write_buffer_size; /** have we seen a goaway */ - uint8_t seen_goaway; + bool seen_goaway; /** have we sent a goaway */ grpc_chttp2_sent_goaway_state sent_goaway_state; /** are the local settings dirty and need to be sent? */ - uint8_t dirtied_local_settings; + bool dirtied_local_settings; /** have local settings been sent? */ - uint8_t sent_local_settings; + bool sent_local_settings; /** bitmask of setting indexes to send out */ uint32_t force_send_settings; /** settings values */ @@ -395,7 +352,12 @@ struct grpc_chttp2_transport { /** parser for goaway frames */ grpc_chttp2_goaway_parser goaway_parser; - grpc_chttp2_transport_flowctl flow_control; + grpc_core::ManualConstructor<grpc_core::chttp2::TransportFlowControl> + flow_control; + /** initial window change. This is tracked as we parse settings frames from + * the remote peer. If there is a positive delta, then we will make all + * streams readable since they may have become unstalled */ + int64_t initial_window_update = 0; /* deframing */ grpc_chttp2_deframe_transport_state deframe_state; @@ -422,6 +384,7 @@ struct grpc_chttp2_transport { grpc_chttp2_write_cb *write_cb_pool; /* bdp estimator */ + grpc_closure next_bdp_ping_timer_expired_locked; grpc_closure start_bdp_ping_locked; grpc_closure finish_bdp_ping_locked; @@ -442,6 +405,10 @@ struct grpc_chttp2_transport { /** destructive cleanup closure */ grpc_closure destructive_reclaimer_locked; + /* next bdp ping timer */ + bool have_next_bdp_ping_timer; + grpc_timer next_bdp_ping_timer; + /* keep-alive ping support */ /** Closure to initialize a keepalive ping */ grpc_closure init_keepalive_ping_locked; @@ -472,25 +439,6 @@ typedef enum { GPRC_METADATA_PUBLISHED_AT_CLOSE } grpc_published_metadata_method; -typedef struct { - /** window available for us to send to peer, over or under the initial window - * size of the transport... ie: - * remote_window = remote_window_delta + transport.initial_window_size */ - int64_t remote_window_delta; - - /** window available for peer to send to us (as a delta on - * transport.initial_window_size) - * local_window = local_window_delta + transport.initial_window_size */ - int64_t local_window_delta; - - /** window available for peer to send to us over this stream that we have - * announced to the peer */ - int64_t announced_window_delta; - - // read only pointer back to stream for data - const grpc_chttp2_stream *s; -} grpc_chttp2_stream_flowctl; - struct grpc_chttp2_stream { grpc_chttp2_transport *t; grpc_stream_refcount *refcount; @@ -584,7 +532,8 @@ struct grpc_chttp2_stream { bool sent_initial_metadata; bool sent_trailing_metadata; - grpc_chttp2_stream_flowctl flow_control; + grpc_core::ManualConstructor<grpc_core::chttp2::StreamFlowControl> + flow_control; grpc_slice_buffer flow_controlled_buffer; @@ -695,74 +644,10 @@ bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport *t, /********* Flow Control ***************/ -// we have sent data on the wire -void grpc_chttp2_flowctl_sent_data(grpc_chttp2_transport_flowctl *tfc, - grpc_chttp2_stream_flowctl *sfc, - int64_t size); - -// we have received data from the wire -grpc_error *grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport_flowctl *tfc, - grpc_chttp2_stream_flowctl *sfc, - int64_t incoming_frame_size); - -// returns an announce if we should send a transport update to our peer, -// else returns zero -uint32_t grpc_chttp2_flowctl_maybe_send_transport_update( - grpc_chttp2_transport_flowctl *tfc, bool writing_anyway); - -// returns an announce if we should send a stream update to our peer, else -// returns zero -uint32_t grpc_chttp2_flowctl_maybe_send_stream_update( - grpc_chttp2_transport_flowctl *tfc, grpc_chttp2_stream_flowctl *sfc); - -// we have received a WINDOW_UPDATE frame for a transport -void grpc_chttp2_flowctl_recv_transport_update( - grpc_chttp2_transport_flowctl *tfc, uint32_t size); - -// we have received a WINDOW_UPDATE frame for a stream -void grpc_chttp2_flowctl_recv_stream_update(grpc_chttp2_transport_flowctl *tfc, - grpc_chttp2_stream_flowctl *sfc, - uint32_t size); - -// the application is asking for a certain amount of bytes -void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport_flowctl *tfc, - grpc_chttp2_stream_flowctl *sfc, - size_t max_size_hint, - size_t have_already); - -void grpc_chttp2_flowctl_destroy_stream(grpc_chttp2_transport_flowctl *tfc, - grpc_chttp2_stream_flowctl *sfc); - -typedef enum { - // Nothing to be done. - GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED = 0, - // Initiate a write to update the initial window immediately. - GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY, - // Push the flow control update into a send buffer, to be sent - // out the next time a write is initiated. - GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE, -} grpc_chttp2_flowctl_urgency; - -typedef struct { - grpc_chttp2_flowctl_urgency send_stream_update; - grpc_chttp2_flowctl_urgency send_transport_update; - grpc_chttp2_flowctl_urgency send_setting_update; - uint32_t initial_window_size; - uint32_t max_frame_size; - bool need_ping; -} grpc_chttp2_flowctl_action; - -// Reads the flow control data and returns and actionable struct that will tell -// chttp2 exactly what it needs to do -grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_flowctl *tfc, - grpc_chttp2_stream_flowctl *sfc); - // Takes in a flow control action and performs all the needed operations. -void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx, - grpc_chttp2_flowctl_action action, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s); +void grpc_chttp2_act_on_flowctl_action( + grpc_exec_ctx *exec_ctx, const grpc_core::chttp2::FlowControlAction &action, + grpc_chttp2_transport *t, grpc_chttp2_stream *s); /********* End of Flow Control ***************/ @@ -796,16 +681,6 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, extern grpc_tracer_flag grpc_http_trace; extern grpc_tracer_flag grpc_flowctl_trace; -#ifndef NDEBUG -#define GRPC_FLOW_CONTROL_IF_TRACING(stmt) \ - if (!(GRPC_TRACER_ON(grpc_flowctl_trace))) \ - ; \ - else \ - stmt -#else -#define GRPC_FLOW_CONTROL_IF_TRACING(stmt) -#endif - #define GRPC_CHTTP2_IF_TRACING(stmt) \ if (!(GRPC_TRACER_ON(grpc_http_trace))) \ ; \ diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc index 78886b497a..efa5791d3f 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.cc +++ b/src/core/ext/transport/chttp2/transport/parsing.cc @@ -355,14 +355,15 @@ static grpc_error *init_data_frame_parser(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s = grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id); grpc_error *err = GRPC_ERROR_NONE; - err = grpc_chttp2_flowctl_recv_data(&t->flow_control, - s == NULL ? NULL : &s->flow_control, - t->incoming_frame_size); - grpc_chttp2_act_on_flowctl_action( - exec_ctx, - grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, - s == NULL ? NULL : &s->flow_control), - t, s); + grpc_core::chttp2::FlowControlAction action; + if (s == nullptr) { + err = t->flow_control->RecvData(t->incoming_frame_size); + action = t->flow_control->MakeAction(); + } else { + err = s->flow_control->RecvData(t->incoming_frame_size); + action = s->flow_control->MakeAction(); + } + grpc_chttp2_act_on_flowctl_action(exec_ctx, action, t, s); if (err != GRPC_ERROR_NONE) { goto error_handler; } @@ -434,19 +435,21 @@ static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp, grpc_millis *cached_timeout = static_cast<grpc_millis *>(grpc_mdelem_get_user_data(md, free_timeout)); grpc_millis timeout; - if (cached_timeout == NULL) { - /* not already parsed: parse it now, and store the result away */ - cached_timeout = (grpc_millis *)gpr_malloc(sizeof(grpc_millis)); - if (!grpc_http2_decode_timeout(GRPC_MDVALUE(md), cached_timeout)) { + if (cached_timeout != NULL) { + timeout = *cached_timeout; + } else { + if (!grpc_http2_decode_timeout(GRPC_MDVALUE(md), &timeout)) { char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md)); gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'", val); gpr_free(val); - *cached_timeout = GRPC_MILLIS_INF_FUTURE; + timeout = GRPC_MILLIS_INF_FUTURE; + } + if (GRPC_MDELEM_IS_INTERNED(md)) { + /* store the result */ + cached_timeout = (grpc_millis *)gpr_malloc(sizeof(grpc_millis)); + *cached_timeout = timeout; + grpc_mdelem_set_user_data(md, free_timeout, cached_timeout); } - timeout = *cached_timeout; - grpc_mdelem_set_user_data(md, free_timeout, cached_timeout); - } else { - timeout = *cached_timeout; } if (timeout != GRPC_MILLIS_INF_FUTURE) { grpc_chttp2_incoming_metadata_buffer_set_deadline( diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index 25c1a5ef05..ff76a5fcdb 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -146,13 +146,13 @@ static void report_stall(grpc_chttp2_transport *t, grpc_chttp2_stream *s, s->flow_controlled_bytes_flowed, t->settings[GRPC_ACKED_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], - t->flow_control.remote_window, + t->flow_control->remote_window(), (uint32_t)GPR_MAX( 0, - s->flow_control.remote_window_delta + + s->flow_control->remote_window_delta() + (int64_t)t->settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]), - s->flow_control.remote_window_delta); + s->flow_control->remote_window_delta()); } static bool stream_ref_if_not_destroyed(gpr_refcount *r) { @@ -216,8 +216,7 @@ class WriteContext { void FlushWindowUpdates(grpc_exec_ctx *exec_ctx) { uint32_t transport_announce = - grpc_chttp2_flowctl_maybe_send_transport_update(&t_->flow_control, - t_->outbuf.count > 0); + t_->flow_control->MaybeSendUpdate(t_->outbuf.count > 0); if (transport_announce) { grpc_transport_one_way_stats throwaway_stats; grpc_slice_buffer_add( @@ -245,7 +244,8 @@ class WriteContext { void UpdateStreamsNoLongerStalled() { grpc_chttp2_stream *s; while (grpc_chttp2_list_pop_stalled_by_transport(t_, &s)) { - if (!t_->closed && grpc_chttp2_list_add_writable_stream(t_, s)) { + if (t_->closed_with_error == GRPC_ERROR_NONE && + grpc_chttp2_list_add_writable_stream(t_, s)) { if (!stream_ref_if_not_destroyed(&s->refcount->refs)) { grpc_chttp2_list_remove_writable_stream(t_, s); } @@ -311,7 +311,7 @@ class DataSendContext { uint32_t stream_remote_window() const { return (uint32_t)GPR_MAX( - 0, s_->flow_control.remote_window_delta + + 0, s_->flow_control->remote_window_delta() + (int64_t)t_->settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); } @@ -319,7 +319,7 @@ class DataSendContext { uint32_t max_outgoing() const { return (uint32_t)GPR_MIN( t_->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], - GPR_MIN(stream_remote_window(), t_->flow_control.remote_window)); + GPR_MIN(stream_remote_window(), t_->flow_control->remote_window())); } bool AnyOutgoing() const { return max_outgoing() != 0; } @@ -351,8 +351,7 @@ class DataSendContext { grpc_metadata_batch_is_empty(s_->send_trailing_metadata); grpc_chttp2_encode_data(s_->id, &s_->compressed_data_buffer, send_bytes, is_last_frame_, &s_->stats.outgoing, &t_->outbuf); - grpc_chttp2_flowctl_sent_data(&t_->flow_control, &s_->flow_control, - send_bytes); + s_->flow_control->SentData(send_bytes); if (s_->compressed_data_buffer.length == 0) { s_->sending_bytes += s_->uncompressed_data_size; } @@ -399,8 +398,8 @@ class StreamWriteContext { gpr_log(GPR_DEBUG, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t_, t_->is_client ? "CLIENT" : "SERVER", s->id, s->sent_initial_metadata, s->send_initial_metadata != NULL, - (int)(s->flow_control.local_window_delta - - s->flow_control.announced_window_delta))); + (int)(s->flow_control->local_window_delta() - + s->flow_control->announced_window_delta()))); } void FlushInitialMetadata(grpc_exec_ctx *exec_ctx) { @@ -446,8 +445,7 @@ class StreamWriteContext { void FlushWindowUpdates(grpc_exec_ctx *exec_ctx) { /* send any window updates */ - uint32_t stream_announce = grpc_chttp2_flowctl_maybe_send_stream_update( - &t_->flow_control, &s_->flow_control); + const uint32_t stream_announce = s_->flow_control->MaybeSendUpdate(); if (stream_announce == 0) return; grpc_slice_buffer_add( @@ -468,10 +466,10 @@ class StreamWriteContext { DataSendContext data_send_context(write_context_, t_, s_); if (!data_send_context.AnyOutgoing()) { - if (t_->flow_control.remote_window == 0) { + if (t_->flow_control->remote_window() <= 0) { report_stall(t_, s_, "transport"); grpc_chttp2_list_add_stalled_by_transport(t_, s_); - } else if (data_send_context.stream_remote_window() == 0) { + } else if (data_send_context.stream_remote_window() <= 0) { report_stall(t_, s_, "stream"); grpc_chttp2_list_add_stalled_by_stream(t_, s_); } @@ -587,7 +585,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( ctx.FlushQueuedBuffers(exec_ctx); ctx.EnactHpackSettings(exec_ctx); - if (t->flow_control.remote_window > 0) { + if (t->flow_control->remote_window() > 0) { ctx.UpdateStreamsNoLongerStalled(); } diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc index ff1367fb28..97e4f7d72b 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.cc +++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc @@ -692,7 +692,7 @@ static void create_grpc_frame(grpc_exec_ctx *exec_ctx, uint8_t *p = (uint8_t *)write_buffer; /* Append 5 byte header */ /* Compressed flag */ - *p++ = (flags & GRPC_WRITE_INTERNAL_COMPRESS) ? 1 : 0; + *p++ = (uint8_t)((flags & GRPC_WRITE_INTERNAL_COMPRESS) ? 1 : 0); /* Message length */ *p++ = (uint8_t)(length >> 24); *p++ = (uint8_t)(length >> 16); diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc index 67a8358927..1551f5e988 100644 --- a/src/core/ext/transport/inproc/inproc_transport.cc +++ b/src/core/ext/transport/inproc/inproc_transport.cc @@ -623,7 +623,7 @@ static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg, fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); goto done; } else { - if (other && !other->closed) { + if (!other || !other->closed) { fill_in_metadata(exec_ctx, s, s->send_trailing_md_op->payload->send_trailing_metadata .send_trailing_metadata, @@ -925,7 +925,7 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, INPROC_LOG(GPR_DEBUG, "Extra initial metadata %p", s); error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra initial metadata"); } else { - if (!other->closed) { + if (!other || !other->closed) { fill_in_metadata( exec_ctx, s, op->payload->send_initial_metadata.send_initial_metadata, @@ -976,7 +976,7 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, (other->recv_trailing_md_op != NULL))) || (op->send_trailing_metadata && !op->send_message) || (op->recv_initial_metadata && s->to_read_initial_md_filled) || - (op->recv_message && (other && other->send_message_op != NULL)) || + (op->recv_message && other && (other->send_message_op != NULL)) || (s->to_read_trailing_md_filled || s->trailing_md_recvd)) { if (!s->op_closure_scheduled) { GRPC_CLOSURE_SCHED(exec_ctx, &s->op_closure, GRPC_ERROR_NONE); diff --git a/src/core/lib/http/httpcli_security_connector.cc b/src/core/lib/http/httpcli_security_connector.cc index ef6c4a509b..d832dacb69 100644 --- a/src/core/lib/http/httpcli_security_connector.cc +++ b/src/core/lib/http/httpcli_security_connector.cc @@ -91,8 +91,17 @@ static void httpcli_ssl_check_peer(grpc_exec_ctx *exec_ctx, tsi_peer_destruct(&peer); } +static int httpcli_ssl_cmp(grpc_security_connector *sc1, + grpc_security_connector *sc2) { + grpc_httpcli_ssl_channel_security_connector *c1 = + (grpc_httpcli_ssl_channel_security_connector *)sc1; + grpc_httpcli_ssl_channel_security_connector *c2 = + (grpc_httpcli_ssl_channel_security_connector *)sc2; + return strcmp(c1->secure_peer_name, c2->secure_peer_name); +} + static grpc_security_connector_vtable httpcli_ssl_vtable = { - httpcli_ssl_destroy, httpcli_ssl_check_peer}; + httpcli_ssl_destroy, httpcli_ssl_check_peer, httpcli_ssl_cmp}; static grpc_security_status httpcli_ssl_channel_security_connector_create( grpc_exec_ctx *exec_ctx, const char *pem_root_certs, @@ -123,6 +132,10 @@ static grpc_security_status httpcli_ssl_channel_security_connector_create( *sc = NULL; return GRPC_SECURITY_ERROR; } + // We don't actually need a channel credentials object in this case, + // but we set it to a non-NULL address so that we don't trigger + // assertions in grpc_channel_security_connector_cmp(). + c->base.channel_creds = (grpc_channel_credentials *)1; c->base.add_handshakers = httpcli_ssl_add_handshakers; *sc = &c->base; return GRPC_SECURITY_OK; diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 59dd8fd2fe..fa6d79cbfc 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -30,6 +30,7 @@ #include <pthread.h> #include <string.h> #include <sys/socket.h> +#include <sys/syscall.h> #include <unistd.h> #include <grpc/support/alloc.h> @@ -49,100 +50,97 @@ #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/spinlock.h" -/******************************************************************************* - * Polling object - */ -typedef enum { - PO_POLLING_GROUP, - PO_POLLSET_SET, - PO_POLLSET, - PO_FD, - /* ordering is important: we always want to lock pollsets before fds: - this guarantees that using an fd as a pollable is safe */ - PO_EMPTY_POLLABLE, - PO_COUNT -} polling_obj_type; - -typedef struct polling_obj polling_obj; -typedef struct polling_group polling_group; - -struct polling_obj { - gpr_mu mu; - polling_obj_type type; - polling_group *group; - struct polling_obj *next; - struct polling_obj *prev; -}; +// debug aid: create workers on the heap (allows asan to spot +// use-after-destruction) +//#define GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP 1 -struct polling_group { - polling_obj po; - gpr_refcount refs; -}; - -static void po_init(polling_obj *po, polling_obj_type type); -static void po_destroy(polling_obj *po); -static void po_join(grpc_exec_ctx *exec_ctx, polling_obj *a, polling_obj *b); -static int po_cmp(polling_obj *a, polling_obj *b); +#define MAX_EPOLL_EVENTS 100 +#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 5 -static void pg_create(grpc_exec_ctx *exec_ctx, polling_obj **initial_po, - size_t initial_po_count); -static polling_group *pg_ref(polling_group *pg); -static void pg_unref(polling_group *pg); -static void pg_merge(grpc_exec_ctx *exec_ctx, polling_group *a, - polling_group *b); -static void pg_join(grpc_exec_ctx *exec_ctx, polling_group *pg, - polling_obj *po); +#ifndef NDEBUG +grpc_tracer_flag grpc_trace_pollable_refcount = + GRPC_TRACER_INITIALIZER(false, "pollable_refcount"); +#endif /******************************************************************************* * pollable Declarations */ -typedef struct pollable { - polling_obj po; +typedef enum { PO_MULTI, PO_FD, PO_EMPTY } pollable_type; + +typedef struct pollable pollable; + +/// A pollable is something that can be polled: it has an epoll set to poll on, +/// and a wakeup fd for kicks +/// There are three broad types: +/// - PO_EMPTY - the empty pollable, used before file descriptors are added to +/// a pollset +/// - PO_FD - a pollable containing only one FD - used to optimize single-fd +/// pollsets (which are common with synchronous api usage) +/// - PO_MULTI - a pollable containing many fds +struct pollable { + pollable_type type; // immutable + gpr_refcount refs; + int epfd; grpc_wakeup_fd wakeup; + + // only for type fd... one ref to the owner fd + grpc_fd *owner_fd; + + grpc_pollset_set *pollset_set; + pollable *next; + pollable *prev; + + gpr_mu mu; grpc_pollset_worker *root_worker; -} pollable; -static const char *polling_obj_type_string(polling_obj_type t) { + int event_cursor; + int event_count; + struct epoll_event events[MAX_EPOLL_EVENTS]; +}; + +static const char *pollable_type_string(pollable_type t) { switch (t) { - case PO_POLLING_GROUP: - return "polling_group"; - case PO_POLLSET_SET: - return "pollset_set"; - case PO_POLLSET: + case PO_MULTI: return "pollset"; case PO_FD: return "fd"; - case PO_EMPTY_POLLABLE: - return "empty_pollable"; - case PO_COUNT: - return "<invalid:count>"; + case PO_EMPTY: + return "empty"; } return "<invalid>"; } static char *pollable_desc(pollable *p) { char *out; - gpr_asprintf(&out, "type=%s group=%p epfd=%d wakeup=%d", - polling_obj_type_string(p->po.type), p->po.group, p->epfd, - p->wakeup.read_fd); + gpr_asprintf(&out, "type=%s epfd=%d wakeup=%d", pollable_type_string(p->type), + p->epfd, p->wakeup.read_fd); return out; } -static pollable g_empty_pollable; +/// Shared empty pollable - used by pollset to poll on until the first fd is +/// added +static pollable *g_empty_pollable; -static void pollable_init(pollable *p, polling_obj_type type); -static void pollable_destroy(pollable *p); -/* ensure that p->epfd, p->wakeup are initialized; p->po.mu must be held */ -static grpc_error *pollable_materialize(pollable *p); +static grpc_error *pollable_create(pollable_type type, pollable **p); +#ifdef NDEBUG +static pollable *pollable_ref(pollable *p); +static void pollable_unref(pollable *p); +#define POLLABLE_REF(p, r) pollable_ref(p) +#define POLLABLE_UNREF(p, r) pollable_unref(p) +#else +static pollable *pollable_ref(pollable *p, int line, const char *reason); +static void pollable_unref(pollable *p, int line, const char *reason); +#define POLLABLE_REF(p, r) pollable_ref((p), __LINE__, (r)) +#define POLLABLE_UNREF(p, r) pollable_unref((p), __LINE__, (r)) +#endif /******************************************************************************* * Fd Declarations */ struct grpc_fd { - pollable pollable_obj; int fd; /* refst format: bit 0 : 1=Active / 0=Orphaned @@ -150,11 +148,10 @@ struct grpc_fd { Ref/Unref by two to avoid altering the orphaned bit */ gpr_atm refst; - /* The fd is either closed or we relinquished control of it. In either - cases, this indicates that the 'fd' on this structure is no longer - valid */ - gpr_mu orphaned_mu; - bool orphaned; + gpr_mu orphan_mu; + + gpr_mu pollable_mu; + pollable *pollable_obj; gpr_atm read_closure; gpr_atm write_closure; @@ -176,47 +173,52 @@ static void fd_global_shutdown(void); * Pollset Declarations */ -typedef struct pollset_worker_link { +typedef struct { grpc_pollset_worker *next; grpc_pollset_worker *prev; -} pollset_worker_link; +} pwlink; -typedef enum { - PWL_POLLSET, - PWL_POLLABLE, - POLLSET_WORKER_LINK_COUNT -} pollset_worker_links; +typedef enum { PWLINK_POLLABLE = 0, PWLINK_POLLSET, PWLINK_COUNT } pwlinks; struct grpc_pollset_worker { bool kicked; bool initialized_cv; - pollset_worker_link links[POLLSET_WORKER_LINK_COUNT]; +#ifndef NDEBUG + // debug aid: which thread started this worker + pid_t originator; +#endif gpr_cv cv; grpc_pollset *pollset; pollable *pollable_obj; -}; -#define MAX_EPOLL_EVENTS 100 -#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 5 + pwlink links[PWLINK_COUNT]; +}; struct grpc_pollset { - pollable pollable_obj; - pollable *current_pollable_obj; - int kick_alls_pending; + gpr_mu mu; + pollable *active_pollable; bool kicked_without_poller; grpc_closure *shutdown_closure; grpc_pollset_worker *root_worker; - - int event_cursor; - int event_count; - struct epoll_event events[MAX_EPOLL_EVENTS]; + int containing_pollset_set_count; }; /******************************************************************************* * Pollset-set Declarations */ + struct grpc_pollset_set { - polling_obj po; + gpr_refcount refs; + gpr_mu mu; + grpc_pollset_set *parent; + + size_t pollset_count; + size_t pollset_capacity; + grpc_pollset **pollsets; + + size_t fd_count; + size_t fd_capacity; + grpc_fd **fds; }; /******************************************************************************* @@ -250,11 +252,6 @@ static bool append_error(grpc_error **composite, grpc_error *error, * becomes a spurious read notification on a reused fd. */ -/* The alarm system needs to be able to wakeup 'some poller' sometimes - * (specifically when a new alarm needs to be triggered earlier than the next - * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a - * case occurs. */ - static grpc_fd *fd_freelist = NULL; static gpr_mu fd_freelist_mu; @@ -282,8 +279,9 @@ static void fd_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_fd *fd = (grpc_fd *)arg; /* Add the fd to the freelist */ grpc_iomgr_unregister_object(&fd->iomgr_object); - pollable_destroy(&fd->pollable_obj); - gpr_mu_destroy(&fd->orphaned_mu); + POLLABLE_UNREF(fd->pollable_obj, "fd_pollable"); + gpr_mu_destroy(&fd->pollable_mu); + gpr_mu_destroy(&fd->orphan_mu); gpr_mu_lock(&fd_freelist_mu); fd->freelist_next = fd_freelist; fd_freelist = fd; @@ -343,12 +341,11 @@ static grpc_fd *fd_create(int fd, const char *name) { new_fd = (grpc_fd *)gpr_malloc(sizeof(grpc_fd)); } - pollable_init(&new_fd->pollable_obj, PO_FD); - + gpr_mu_init(&new_fd->pollable_mu); + gpr_mu_init(&new_fd->orphan_mu); + new_fd->pollable_obj = NULL; gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); new_fd->fd = fd; - gpr_mu_init(&new_fd->orphaned_mu); - new_fd->orphaned = false; grpc_lfev_init(&new_fd->read_closure); grpc_lfev_init(&new_fd->write_closure); gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); @@ -369,24 +366,17 @@ static grpc_fd *fd_create(int fd, const char *name) { } static int fd_wrapped_fd(grpc_fd *fd) { - int ret_fd = -1; - gpr_mu_lock(&fd->orphaned_mu); - if (!fd->orphaned) { - ret_fd = fd->fd; - } - gpr_mu_unlock(&fd->orphaned_mu); - - return ret_fd; + int ret_fd = fd->fd; + return (gpr_atm_acq_load(&fd->refst) & 1) ? ret_fd : -1; } static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, int *release_fd, bool already_closed, const char *reason) { bool is_fd_closed = already_closed; - grpc_error *error = GRPC_ERROR_NONE; - gpr_mu_lock(&fd->pollable_obj.po.mu); - gpr_mu_lock(&fd->orphaned_mu); + gpr_mu_lock(&fd->orphan_mu); + fd->on_done_closure = on_done; /* If release_fd is not NULL, we should be relinquishing control of the file @@ -398,8 +388,6 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, is_fd_closed = true; } - fd->orphaned = true; - if (!is_fd_closed) { gpr_log(GPR_DEBUG, "TODO: handle fd removal?"); } @@ -408,13 +396,11 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, to be alive (and not added to freelist) until the end of this function */ REF_BY(fd, 1, reason); - GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE); + + gpr_mu_unlock(&fd->orphan_mu); - gpr_mu_unlock(&fd->orphaned_mu); - gpr_mu_unlock(&fd->pollable_obj.po.mu); UNREF_BY(exec_ctx, fd, 2, reason); /* Drop the reference */ - GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error)); - GRPC_ERROR_UNREF(error); } static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx, @@ -451,63 +437,87 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, * Pollable Definitions */ -static void pollable_init(pollable *p, polling_obj_type type) { - po_init(&p->po, type); - p->root_worker = NULL; - p->epfd = -1; +static grpc_error *pollable_create(pollable_type type, pollable **p) { + *p = NULL; + + int epfd = epoll_create1(EPOLL_CLOEXEC); + if (epfd == -1) { + return GRPC_OS_ERROR(errno, "epoll_create1"); + } + *p = (pollable *)gpr_malloc(sizeof(**p)); + grpc_error *err = grpc_wakeup_fd_init(&(*p)->wakeup); + if (err != GRPC_ERROR_NONE) { + close(epfd); + gpr_free(*p); + *p = NULL; + return err; + } + struct epoll_event ev; + ev.events = (uint32_t)(EPOLLIN | EPOLLET); + ev.data.ptr = (void *)(1 | (intptr_t) & (*p)->wakeup); + if (epoll_ctl(epfd, EPOLL_CTL_ADD, (*p)->wakeup.read_fd, &ev) != 0) { + err = GRPC_OS_ERROR(errno, "epoll_ctl"); + close(epfd); + grpc_wakeup_fd_destroy(&(*p)->wakeup); + gpr_free(*p); + *p = NULL; + return err; + } + + (*p)->type = type; + gpr_ref_init(&(*p)->refs, 1); + gpr_mu_init(&(*p)->mu); + (*p)->epfd = epfd; + (*p)->owner_fd = NULL; + (*p)->pollset_set = NULL; + (*p)->next = (*p)->prev = *p; + (*p)->root_worker = NULL; + (*p)->event_cursor = 0; + (*p)->event_count = 0; + return GRPC_ERROR_NONE; } -static void pollable_destroy(pollable *p) { - po_destroy(&p->po); - if (p->epfd != -1) { - close(p->epfd); - grpc_wakeup_fd_destroy(&p->wakeup); +#ifdef NDEBUG +static pollable *pollable_ref(pollable *p) { +#else +static pollable *pollable_ref(pollable *p, int line, const char *reason) { + if (GRPC_TRACER_ON(grpc_trace_pollable_refcount)) { + int r = (int)gpr_atm_no_barrier_load(&p->refs.count); + gpr_log(__FILE__, line, GPR_LOG_SEVERITY_DEBUG, + "POLLABLE:%p ref %d->%d %s", p, r, r + 1, reason); } +#endif + gpr_ref(&p->refs); + return p; } -/* ensure that p->epfd, p->wakeup are initialized; p->po.mu must be held */ -static grpc_error *pollable_materialize(pollable *p) { - if (p->epfd == -1) { - int new_epfd = epoll_create1(EPOLL_CLOEXEC); - if (new_epfd < 0) { - return GRPC_OS_ERROR(errno, "epoll_create1"); - } - grpc_error *err = grpc_wakeup_fd_init(&p->wakeup); - if (err != GRPC_ERROR_NONE) { - close(new_epfd); - return err; - } - struct epoll_event ev; - ev.events = (uint32_t)(EPOLLIN | EPOLLET); - ev.data.ptr = (void *)(1 | (intptr_t)&p->wakeup); - if (epoll_ctl(new_epfd, EPOLL_CTL_ADD, p->wakeup.read_fd, &ev) != 0) { - err = GRPC_OS_ERROR(errno, "epoll_ctl"); - close(new_epfd); - grpc_wakeup_fd_destroy(&p->wakeup); - return err; - } - - p->epfd = new_epfd; +#ifdef NDEBUG +static void pollable_unref(pollable *p) { +#else +static void pollable_unref(pollable *p, int line, const char *reason) { + if (p == NULL) return; + if (GRPC_TRACER_ON(grpc_trace_pollable_refcount)) { + int r = (int)gpr_atm_no_barrier_load(&p->refs.count); + gpr_log(__FILE__, line, GPR_LOG_SEVERITY_DEBUG, + "POLLABLE:%p unref %d->%d %s", p, r, r - 1, reason); + } +#endif + if (p != NULL && gpr_unref(&p->refs)) { + close(p->epfd); + grpc_wakeup_fd_destroy(&p->wakeup); + gpr_free(p); } - return GRPC_ERROR_NONE; } -/* pollable must be materialized */ static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) { grpc_error *error = GRPC_ERROR_NONE; static const char *err_desc = "pollable_add_fd"; const int epfd = p->epfd; - GPR_ASSERT(epfd != -1); if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "add fd %p (%d) to pollable %p", fd, fd->fd, p); } - gpr_mu_lock(&fd->orphaned_mu); - if (fd->orphaned) { - gpr_mu_unlock(&fd->orphaned_mu); - return GRPC_ERROR_NONE; - } struct epoll_event ev_fd; ev_fd.events = (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT | EPOLLEXCLUSIVE); ev_fd.data.ptr = fd; @@ -519,7 +529,6 @@ static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) { append_error(&error, GRPC_OS_ERROR(errno, "epoll_ctl"), err_desc); } } - gpr_mu_unlock(&fd->orphaned_mu); return error; } @@ -535,128 +544,67 @@ GPR_TLS_DECL(g_current_thread_worker); static grpc_error *pollset_global_init(void) { gpr_tls_init(&g_current_thread_pollset); gpr_tls_init(&g_current_thread_worker); - pollable_init(&g_empty_pollable, PO_EMPTY_POLLABLE); - return GRPC_ERROR_NONE; + return pollable_create(PO_EMPTY, &g_empty_pollable); } static void pollset_global_shutdown(void) { - pollable_destroy(&g_empty_pollable); + POLLABLE_UNREF(g_empty_pollable, "g_empty_pollable"); gpr_tls_destroy(&g_current_thread_pollset); gpr_tls_destroy(&g_current_thread_worker); } +/* pollset->mu must be held while calling this function */ static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, + "PS:%p (pollable:%p) maybe_finish_shutdown sc=%p (target:!NULL) " + "rw=%p (target:NULL) cpsc=%d (target:0)", + pollset, pollset->active_pollable, pollset->shutdown_closure, + pollset->root_worker, pollset->containing_pollset_set_count); + } if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL && - pollset->kick_alls_pending == 0) { + pollset->containing_pollset_set_count == 0) { GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE); pollset->shutdown_closure = NULL; } } -static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error_unused) { - grpc_error *error = GRPC_ERROR_NONE; - grpc_pollset *pollset = (grpc_pollset *)arg; - gpr_mu_lock(&pollset->pollable_obj.po.mu); - if (pollset->root_worker != NULL) { - grpc_pollset_worker *worker = pollset->root_worker; - do { - GRPC_STATS_INC_POLLSET_KICK(exec_ctx); - if (worker->pollable_obj != &pollset->pollable_obj) { - gpr_mu_lock(&worker->pollable_obj->po.mu); - } - if (worker->initialized_cv && worker != pollset->root_worker) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p kickall_via_cv %p (pollable %p vs %p)", - pollset, worker, &pollset->pollable_obj, - worker->pollable_obj); - } - worker->kicked = true; - gpr_cv_signal(&worker->cv); - } else { - if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p kickall_via_wakeup %p (pollable %p vs %p)", - pollset, worker, &pollset->pollable_obj, - worker->pollable_obj); - } - append_error(&error, - grpc_wakeup_fd_wakeup(&worker->pollable_obj->wakeup), - "pollset_shutdown"); - } - if (worker->pollable_obj != &pollset->pollable_obj) { - gpr_mu_unlock(&worker->pollable_obj->po.mu); - } - - worker = worker->links[PWL_POLLSET].next; - } while (worker != pollset->root_worker); - } - pollset->kick_alls_pending--; - pollset_maybe_finish_shutdown(exec_ctx, pollset); - gpr_mu_unlock(&pollset->pollable_obj.po.mu); - GRPC_LOG_IF_ERROR("kick_all", error); -} - -static void pollset_kick_all(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { - pollset->kick_alls_pending++; - GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_CREATE(do_kick_all, pollset, - grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE); -} - -static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p, - grpc_pollset_worker *specific_worker) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, - "PS:%p kick %p tls_pollset=%p tls_worker=%p " - "root_worker=(pollset:%p pollable:%p)", - p, specific_worker, (void *)gpr_tls_get(&g_current_thread_pollset), - (void *)gpr_tls_get(&g_current_thread_worker), pollset->root_worker, - p->root_worker); - } - if (specific_worker == NULL) { - if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) { - if (pollset->root_worker == NULL) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p kicked_any_without_poller", p); - } - pollset->kicked_without_poller = true; - return GRPC_ERROR_NONE; - } else { - if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p kicked_any_via_wakeup_fd", p); - } - grpc_error *err = pollable_materialize(p); - if (err != GRPC_ERROR_NONE) return err; - return grpc_wakeup_fd_wakeup(&p->wakeup); - } - } else { - if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p kicked_any_but_awake", p); - } - return GRPC_ERROR_NONE; - } - } else if (specific_worker->kicked) { +/* pollset->mu must be held before calling this function, + * pollset->active_pollable->mu & specific_worker->pollable_obj->mu must not be + * held */ +static grpc_error *kick_one_worker(grpc_exec_ctx *exec_ctx, + grpc_pollset_worker *specific_worker) { + pollable *p = specific_worker->pollable_obj; + grpc_core::mu_guard lock(&p->mu); + GRPC_STATS_INC_POLLSET_KICK(exec_ctx); + GPR_ASSERT(specific_worker != NULL); + if (specific_worker->kicked) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_already_kicked", p); } + GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx); return GRPC_ERROR_NONE; - } else if (gpr_tls_get(&g_current_thread_worker) == - (intptr_t)specific_worker) { + } + if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_awake", p); } + GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD(exec_ctx); specific_worker->kicked = true; return GRPC_ERROR_NONE; - } else if (specific_worker == p->root_worker) { + } + if (specific_worker == p->root_worker) { + GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD(exec_ctx); if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_wakeup_fd", p); } - grpc_error *err = pollable_materialize(p); - if (err != GRPC_ERROR_NONE) return err; specific_worker->kicked = true; - return grpc_wakeup_fd_wakeup(&p->wakeup); - } else { + grpc_error *error = grpc_wakeup_fd_wakeup(&p->wakeup); + return error; + } + if (specific_worker->initialized_cv) { + GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV(exec_ctx); if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_cv", p); } @@ -664,30 +612,79 @@ static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p, gpr_cv_signal(&specific_worker->cv); return GRPC_ERROR_NONE; } + // we can get here during end_worker after removing specific_worker from the + // pollable list but before removing it from the pollset list + return GRPC_ERROR_NONE; } -/* p->po.mu must be held before calling this function */ static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *specific_worker) { - pollable *p = pollset->current_pollable_obj; - GRPC_STATS_INC_POLLSET_KICK(exec_ctx); - if (p != &pollset->pollable_obj) { - gpr_mu_lock(&p->po.mu); + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, + "PS:%p kick %p tls_pollset=%p tls_worker=%p pollset.root_worker=%p", + pollset, specific_worker, + (void *)gpr_tls_get(&g_current_thread_pollset), + (void *)gpr_tls_get(&g_current_thread_worker), + pollset->root_worker); } - grpc_error *error = pollset_kick_inner(pollset, p, specific_worker); - if (p != &pollset->pollable_obj) { - gpr_mu_unlock(&p->po.mu); + if (specific_worker == NULL) { + if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) { + if (pollset->root_worker == NULL) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "PS:%p kicked_any_without_poller", pollset); + } + GRPC_STATS_INC_POLLSET_KICKED_WITHOUT_POLLER(exec_ctx); + pollset->kicked_without_poller = true; + return GRPC_ERROR_NONE; + } else { + // We've been asked to kick a poller, but we haven't been told which one + // ... any will do + // We look at the pollset worker list because: + // 1. the pollable list may include workers from other pollers, so we'd + // need to do an O(N) search + // 2. we'd additionally need to take the pollable lock, which we've so + // far avoided + // Now, we would prefer to wake a poller in cv_wait, and not in + // epoll_wait (since the latter would imply the need to do an additional + // wakeup) + // We know that if a worker is at the root of a pollable, it's (likely) + // also the root of a pollset, and we know that if a worker is NOT at + // the root of a pollset, it's (likely) not at the root of a pollable, + // so we take our chances and choose the SECOND worker enqueued against + // the pollset as a worker that's likely to be in cv_wait + return kick_one_worker( + exec_ctx, pollset->root_worker->links[PWLINK_POLLSET].next); + } + } else { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "PS:%p kicked_any_but_awake", pollset); + } + GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD(exec_ctx); + return GRPC_ERROR_NONE; + } + } else { + return kick_one_worker(exec_ctx, specific_worker); + } +} + +static grpc_error *pollset_kick_all(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset) { + grpc_error *error = GRPC_ERROR_NONE; + const char *err_desc = "pollset_kick_all"; + grpc_pollset_worker *w = pollset->root_worker; + if (w != NULL) { + do { + append_error(&error, kick_one_worker(exec_ctx, w), err_desc); + w = w->links[PWLINK_POLLSET].next; + } while (w != pollset->root_worker); } return error; } static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { - pollable_init(&pollset->pollable_obj, PO_POLLSET); - pollset->current_pollable_obj = &g_empty_pollable; - pollset->kicked_without_poller = false; - pollset->shutdown_closure = NULL; - pollset->root_worker = NULL; - *mu = &pollset->pollable_obj.po.mu; + gpr_mu_init(&pollset->mu); + pollset->active_pollable = POLLABLE_REF(g_empty_pollable, "pollset"); + *mu = &pollset->mu; } static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, @@ -719,12 +716,29 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write"); } -static grpc_error *fd_become_pollable_locked(grpc_fd *fd) { +static grpc_error *fd_get_or_become_pollable(grpc_fd *fd, pollable **p) { + gpr_mu_lock(&fd->pollable_mu); grpc_error *error = GRPC_ERROR_NONE; - static const char *err_desc = "fd_become_pollable"; - if (append_error(&error, pollable_materialize(&fd->pollable_obj), err_desc)) { - append_error(&error, pollable_add_fd(&fd->pollable_obj, fd), err_desc); + static const char *err_desc = "fd_get_or_become_pollable"; + if (fd->pollable_obj == NULL) { + if (append_error(&error, pollable_create(PO_FD, &fd->pollable_obj), + err_desc)) { + fd->pollable_obj->owner_fd = fd; + if (!append_error(&error, pollable_add_fd(fd->pollable_obj, fd), + err_desc)) { + POLLABLE_UNREF(fd->pollable_obj, "fd_pollable"); + fd->pollable_obj = NULL; + } + } } + if (error == GRPC_ERROR_NONE) { + GPR_ASSERT(fd->pollable_obj != NULL); + *p = POLLABLE_REF(fd->pollable_obj, "pollset"); + } else { + GPR_ASSERT(fd->pollable_obj == NULL); + *p = NULL; + } + gpr_mu_unlock(&fd->pollable_mu); return error; } @@ -733,23 +747,20 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure) { GPR_ASSERT(pollset->shutdown_closure == NULL); pollset->shutdown_closure = closure; - pollset_kick_all(exec_ctx, pollset); + GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(exec_ctx, pollset)); pollset_maybe_finish_shutdown(exec_ctx, pollset); } -static bool pollset_is_pollable_fd(grpc_pollset *pollset, pollable *p) { - return p != &g_empty_pollable && p != &pollset->pollable_obj; -} - -static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, bool drain) { +static grpc_error *pollable_process_events(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, + pollable *pollable_obj, bool drain) { static const char *err_desc = "pollset_process_events"; grpc_error *error = GRPC_ERROR_NONE; for (int i = 0; (drain || i < MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL) && - pollset->event_cursor != pollset->event_count; + pollable_obj->event_cursor != pollable_obj->event_count; i++) { - int n = pollset->event_cursor++; - struct epoll_event *ev = &pollset->events[n]; + int n = pollable_obj->event_cursor++; + struct epoll_event *ev = &pollable_obj->events[n]; void *data_ptr = ev->data.ptr; if (1 & (intptr_t)data_ptr) { if (GRPC_TRACER_ON(grpc_polling_trace)) { @@ -784,22 +795,17 @@ static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx, /* pollset_shutdown is guaranteed to be called before pollset_destroy. */ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { - pollable_destroy(&pollset->pollable_obj); - if (pollset_is_pollable_fd(pollset, pollset->current_pollable_obj)) { - UNREF_BY(exec_ctx, (grpc_fd *)pollset->current_pollable_obj, 2, - "pollset_pollable"); - } - GRPC_LOG_IF_ERROR("pollset_process_events", - pollset_process_events(exec_ctx, pollset, true)); + POLLABLE_UNREF(pollset->active_pollable, "pollset"); + pollset->active_pollable = NULL; } -static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - pollable *p, grpc_millis deadline) { +static grpc_error *pollable_epoll(grpc_exec_ctx *exec_ctx, pollable *p, + grpc_millis deadline) { int timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline); if (GRPC_TRACER_ON(grpc_polling_trace)) { char *desc = pollable_desc(p); - gpr_log(GPR_DEBUG, "PS:%p poll %p[%s] for %dms", pollset, p, desc, timeout); + gpr_log(GPR_DEBUG, "POLLABLE:%p[%s] poll for %dms", p, desc, timeout); gpr_free(desc); } @@ -809,7 +815,7 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, int r; do { GRPC_STATS_INC_SYSCALL_POLL(exec_ctx); - r = epoll_wait(p->epfd, pollset->events, MAX_EPOLL_EVENTS, timeout); + r = epoll_wait(p->epfd, p->events, MAX_EPOLL_EVENTS, timeout); } while (r < 0 && errno == EINTR); if (timeout != 0) { GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx); @@ -818,24 +824,24 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait"); if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p poll %p got %d events", pollset, p, r); + gpr_log(GPR_DEBUG, "POLLABLE:%p got %d events", p, r); } - pollset->event_cursor = 0; - pollset->event_count = r; + p->event_cursor = 0; + p->event_count = r; return GRPC_ERROR_NONE; } /* Return true if first in list */ -static bool worker_insert(grpc_pollset_worker **root, pollset_worker_links link, - grpc_pollset_worker *worker) { - if (*root == NULL) { - *root = worker; +static bool worker_insert(grpc_pollset_worker **root_worker, + grpc_pollset_worker *worker, pwlinks link) { + if (*root_worker == NULL) { + *root_worker = worker; worker->links[link].next = worker->links[link].prev = worker; return true; } else { - worker->links[link].next = *root; + worker->links[link].next = *root_worker; worker->links[link].prev = worker->links[link].next->links[link].prev; worker->links[link].next->links[link].prev = worker; worker->links[link].prev->links[link].next = worker; @@ -843,26 +849,26 @@ static bool worker_insert(grpc_pollset_worker **root, pollset_worker_links link, } } -/* Return true if last in list */ -typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result; +/* returns the new root IFF the root changed */ +typedef enum { WRR_NEW_ROOT, WRR_EMPTIED, WRR_REMOVED } worker_remove_result; -static worker_remove_result worker_remove(grpc_pollset_worker **root, - pollset_worker_links link, - grpc_pollset_worker *worker) { - if (worker == *root) { +static worker_remove_result worker_remove(grpc_pollset_worker **root_worker, + grpc_pollset_worker *worker, + pwlinks link) { + if (worker == *root_worker) { if (worker == worker->links[link].next) { - *root = NULL; - return EMPTIED; + *root_worker = NULL; + return WRR_EMPTIED; } else { - *root = worker->links[link].next; + *root_worker = worker->links[link].next; worker->links[link].prev->links[link].next = worker->links[link].next; worker->links[link].next->links[link].prev = worker->links[link].prev; - return NEW_ROOT; + return WRR_NEW_ROOT; } } else { worker->links[link].prev->links[link].next = worker->links[link].next; worker->links[link].next->links[link].prev = worker->links[link].prev; - return REMOVED; + return WRR_REMOVED; } } @@ -871,25 +877,20 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, grpc_pollset_worker **worker_hdl, grpc_millis deadline) { - bool do_poll = true; + bool do_poll = (pollset->shutdown_closure == nullptr); if (worker_hdl != NULL) *worker_hdl = worker; worker->initialized_cv = false; worker->kicked = false; worker->pollset = pollset; - worker->pollable_obj = pollset->current_pollable_obj; - - if (pollset_is_pollable_fd(pollset, worker->pollable_obj)) { - REF_BY((grpc_fd *)worker->pollable_obj, 2, "one_poll"); - } - - worker_insert(&pollset->root_worker, PWL_POLLSET, worker); - if (!worker_insert(&worker->pollable_obj->root_worker, PWL_POLLABLE, - worker)) { + worker->pollable_obj = + POLLABLE_REF(pollset->active_pollable, "pollset_worker"); + worker_insert(&pollset->root_worker, worker, PWLINK_POLLSET); + gpr_mu_lock(&worker->pollable_obj->mu); + if (!worker_insert(&worker->pollable_obj->root_worker, worker, + PWLINK_POLLABLE)) { worker->initialized_cv = true; gpr_cv_init(&worker->cv); - if (worker->pollable_obj != &pollset->pollable_obj) { - gpr_mu_unlock(&pollset->pollable_obj.po.mu); - } + gpr_mu_unlock(&pollset->mu); if (GRPC_TRACER_ON(grpc_polling_trace) && worker->pollable_obj->root_worker != worker) { gpr_log(GPR_DEBUG, "PS:%p wait %p w=%p for %dms", pollset, @@ -897,7 +898,7 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, poll_deadline_to_millis_timeout(exec_ctx, deadline)); } while (do_poll && worker->pollable_obj->root_worker != worker) { - if (gpr_cv_wait(&worker->cv, &worker->pollable_obj->po.mu, + if (gpr_cv_wait(&worker->cv, &worker->pollable_obj->mu, grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p timeout_wait %p w=%p", pollset, @@ -916,158 +917,232 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, worker->pollable_obj, worker); } } - if (worker->pollable_obj != &pollset->pollable_obj) { - gpr_mu_unlock(&worker->pollable_obj->po.mu); - gpr_mu_lock(&pollset->pollable_obj.po.mu); - gpr_mu_lock(&worker->pollable_obj->po.mu); - } grpc_exec_ctx_invalidate_now(exec_ctx); + } else { + gpr_mu_unlock(&pollset->mu); } + gpr_mu_unlock(&worker->pollable_obj->mu); - return do_poll && pollset->shutdown_closure == NULL && - pollset->current_pollable_obj == worker->pollable_obj; + return do_poll; } static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, grpc_pollset_worker **worker_hdl) { - if (NEW_ROOT == - worker_remove(&worker->pollable_obj->root_worker, PWL_POLLABLE, worker)) { - gpr_cv_signal(&worker->pollable_obj->root_worker->cv); + gpr_mu_lock(&pollset->mu); + gpr_mu_lock(&worker->pollable_obj->mu); + switch (worker_remove(&worker->pollable_obj->root_worker, worker, + PWLINK_POLLABLE)) { + case WRR_NEW_ROOT: { + // wakeup new poller + grpc_pollset_worker *new_root = worker->pollable_obj->root_worker; + GPR_ASSERT(new_root->initialized_cv); + gpr_cv_signal(&new_root->cv); + break; + } + case WRR_EMPTIED: + if (pollset->active_pollable != worker->pollable_obj) { + // pollable no longer being polled: flush events + pollable_process_events(exec_ctx, pollset, worker->pollable_obj, true); + } + break; + case WRR_REMOVED: + break; + } + gpr_mu_unlock(&worker->pollable_obj->mu); + POLLABLE_UNREF(worker->pollable_obj, "pollset_worker"); + if (worker_remove(&pollset->root_worker, worker, PWLINK_POLLSET) == + WRR_EMPTIED) { + pollset_maybe_finish_shutdown(exec_ctx, pollset); } if (worker->initialized_cv) { gpr_cv_destroy(&worker->cv); } - if (pollset_is_pollable_fd(pollset, worker->pollable_obj)) { - UNREF_BY(exec_ctx, (grpc_fd *)worker->pollable_obj, 2, "one_poll"); - } - if (EMPTIED == worker_remove(&pollset->root_worker, PWL_POLLSET, worker)) { - pollset_maybe_finish_shutdown(exec_ctx, pollset); - } } -/* pollset->po.mu lock must be held by the caller before calling this. +#ifndef NDEBUG +static long gettid(void) { return syscall(__NR_gettid); } +#endif + +/* pollset->mu lock must be held by the caller before calling this. The function pollset_work() may temporarily release the lock (pollset->po.mu) during the course of its execution but it will always re-acquire the lock and ensure that it is held by the time the function returns */ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, grpc_millis deadline) { +#ifdef GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP + grpc_pollset_worker *worker = + (grpc_pollset_worker *)gpr_malloc(sizeof(*worker)); +#define WORKER_PTR (worker) +#else grpc_pollset_worker worker; - if (0 && GRPC_TRACER_ON(grpc_polling_trace)) { +#define WORKER_PTR (&worker) +#endif +#ifndef NDEBUG + WORKER_PTR->originator = gettid(); +#endif + if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p work hdl=%p worker=%p now=%" PRIdPTR - " deadline=%" PRIdPTR " kwp=%d root_worker=%p", - pollset, worker_hdl, &worker, grpc_exec_ctx_now(exec_ctx), deadline, - pollset->kicked_without_poller, pollset->root_worker); + " deadline=%" PRIdPTR " kwp=%d pollable=%p", + pollset, worker_hdl, WORKER_PTR, grpc_exec_ctx_now(exec_ctx), + deadline, pollset->kicked_without_poller, pollset->active_pollable); } - grpc_error *error = GRPC_ERROR_NONE; static const char *err_desc = "pollset_work"; + grpc_error *error = GRPC_ERROR_NONE; if (pollset->kicked_without_poller) { pollset->kicked_without_poller = false; - return GRPC_ERROR_NONE; - } - if (pollset->current_pollable_obj != &pollset->pollable_obj) { - gpr_mu_lock(&pollset->current_pollable_obj->po.mu); - } - if (begin_worker(exec_ctx, pollset, &worker, worker_hdl, deadline)) { - gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); - gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); - GPR_ASSERT(!pollset->shutdown_closure); - append_error(&error, pollable_materialize(worker.pollable_obj), err_desc); - if (worker.pollable_obj != &pollset->pollable_obj) { - gpr_mu_unlock(&worker.pollable_obj->po.mu); - } - gpr_mu_unlock(&pollset->pollable_obj.po.mu); - if (pollset->event_cursor == pollset->event_count) { - append_error(&error, pollset_epoll(exec_ctx, pollset, worker.pollable_obj, - deadline), + } else { + if (begin_worker(exec_ctx, pollset, WORKER_PTR, worker_hdl, deadline)) { + gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); + gpr_tls_set(&g_current_thread_worker, (intptr_t)WORKER_PTR); + if (WORKER_PTR->pollable_obj->event_cursor == + WORKER_PTR->pollable_obj->event_count) { + append_error(&error, pollable_epoll(exec_ctx, WORKER_PTR->pollable_obj, + deadline), + err_desc); + } + append_error(&error, + pollable_process_events(exec_ctx, pollset, + WORKER_PTR->pollable_obj, false), err_desc); + grpc_exec_ctx_flush(exec_ctx); + gpr_tls_set(&g_current_thread_pollset, 0); + gpr_tls_set(&g_current_thread_worker, 0); } - append_error(&error, pollset_process_events(exec_ctx, pollset, false), - err_desc); - gpr_mu_lock(&pollset->pollable_obj.po.mu); - if (worker.pollable_obj != &pollset->pollable_obj) { - gpr_mu_lock(&worker.pollable_obj->po.mu); - } - gpr_tls_set(&g_current_thread_pollset, 0); - gpr_tls_set(&g_current_thread_worker, 0); - pollset_maybe_finish_shutdown(exec_ctx, pollset); + end_worker(exec_ctx, pollset, WORKER_PTR, worker_hdl); } - end_worker(exec_ctx, pollset, &worker, worker_hdl); - if (worker.pollable_obj != &pollset->pollable_obj) { - gpr_mu_unlock(&worker.pollable_obj->po.mu); - } - if (grpc_exec_ctx_has_work(exec_ctx)) { - gpr_mu_unlock(&pollset->pollable_obj.po.mu); - grpc_exec_ctx_flush(exec_ctx); - gpr_mu_lock(&pollset->pollable_obj.po.mu); +#ifdef GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP + gpr_free(worker); +#endif +#undef WORKER_PTR + return error; +} + +static grpc_error *pollset_transition_pollable_from_empty_to_fd_locked( + grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd) { + static const char *err_desc = "pollset_transition_pollable_from_empty_to_fd"; + grpc_error *error = GRPC_ERROR_NONE; + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, + "PS:%p add fd %p (%d); transition pollable from empty to fd", + pollset, fd, fd->fd); } + append_error(&error, pollset_kick_all(exec_ctx, pollset), err_desc); + POLLABLE_UNREF(pollset->active_pollable, "pollset"); + append_error(&error, fd_get_or_become_pollable(fd, &pollset->active_pollable), + err_desc); return error; } -static void unref_fd_no_longer_poller(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_fd *fd = (grpc_fd *)arg; - UNREF_BY(exec_ctx, fd, 2, "pollset_pollable"); +static grpc_error *pollset_transition_pollable_from_fd_to_multi_locked( + grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *and_add_fd) { + static const char *err_desc = "pollset_transition_pollable_from_fd_to_multi"; + grpc_error *error = GRPC_ERROR_NONE; + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log( + GPR_DEBUG, + "PS:%p add fd %p (%d); transition pollable from fd %p to multipoller", + pollset, and_add_fd, and_add_fd ? and_add_fd->fd : -1, + pollset->active_pollable->owner_fd); + } + append_error(&error, pollset_kick_all(exec_ctx, pollset), err_desc); + grpc_fd *initial_fd = pollset->active_pollable->owner_fd; + POLLABLE_UNREF(pollset->active_pollable, "pollset"); + pollset->active_pollable = NULL; + if (append_error(&error, pollable_create(PO_MULTI, &pollset->active_pollable), + err_desc)) { + append_error(&error, pollable_add_fd(pollset->active_pollable, initial_fd), + err_desc); + if (and_add_fd != NULL) { + append_error(&error, + pollable_add_fd(pollset->active_pollable, and_add_fd), + err_desc); + } + } + return error; } /* expects pollsets locked, flag whether fd is locked or not */ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, grpc_fd *fd, - bool fd_locked) { - static const char *err_desc = "pollset_add_fd"; + grpc_pollset *pollset, grpc_fd *fd) { grpc_error *error = GRPC_ERROR_NONE; - if (pollset->current_pollable_obj == &g_empty_pollable) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, - "PS:%p add fd %p; transition pollable from empty to fd", pollset, - fd); - } - /* empty pollable --> single fd pollable */ - pollset_kick_all(exec_ctx, pollset); - pollset->current_pollable_obj = &fd->pollable_obj; - if (!fd_locked) gpr_mu_lock(&fd->pollable_obj.po.mu); - append_error(&error, fd_become_pollable_locked(fd), err_desc); - if (!fd_locked) gpr_mu_unlock(&fd->pollable_obj.po.mu); - REF_BY(fd, 2, "pollset_pollable"); - } else if (pollset->current_pollable_obj == &pollset->pollable_obj) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p add fd %p; already multipolling", pollset, fd); - } - append_error(&error, pollable_add_fd(pollset->current_pollable_obj, fd), - err_desc); - } else if (pollset->current_pollable_obj != &fd->pollable_obj) { - grpc_fd *had_fd = (grpc_fd *)pollset->current_pollable_obj; - if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, - "PS:%p add fd %p; transition pollable from fd %p to multipoller", - pollset, fd, had_fd); - } - /* Introduce a spurious completion. - If we do not, then it may be that the fd-specific epoll set consumed - a completion without being polled, leading to a missed edge going up. */ - grpc_lfev_set_ready(exec_ctx, &had_fd->read_closure, "read"); - grpc_lfev_set_ready(exec_ctx, &had_fd->write_closure, "write"); - pollset_kick_all(exec_ctx, pollset); - pollset->current_pollable_obj = &pollset->pollable_obj; - if (append_error(&error, pollable_materialize(&pollset->pollable_obj), - err_desc)) { - pollable_add_fd(&pollset->pollable_obj, had_fd); - pollable_add_fd(&pollset->pollable_obj, fd); - } - GRPC_CLOSURE_SCHED(exec_ctx, - GRPC_CLOSURE_CREATE(unref_fd_no_longer_poller, had_fd, - grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE); + pollable *po_at_start = + POLLABLE_REF(pollset->active_pollable, "pollset_add_fd"); + switch (pollset->active_pollable->type) { + case PO_EMPTY: + /* empty pollable --> single fd pollable */ + error = pollset_transition_pollable_from_empty_to_fd_locked(exec_ctx, + pollset, fd); + break; + case PO_FD: + gpr_mu_lock(&po_at_start->owner_fd->orphan_mu); + if ((gpr_atm_no_barrier_load(&pollset->active_pollable->owner_fd->refst) & + 1) == 0) { + error = pollset_transition_pollable_from_empty_to_fd_locked( + exec_ctx, pollset, fd); + } else { + /* fd --> multipoller */ + error = pollset_transition_pollable_from_fd_to_multi_locked( + exec_ctx, pollset, fd); + } + gpr_mu_unlock(&po_at_start->owner_fd->orphan_mu); + break; + case PO_MULTI: + error = pollable_add_fd(pollset->active_pollable, fd); + break; + } + if (error != GRPC_ERROR_NONE) { + POLLABLE_UNREF(pollset->active_pollable, "pollset"); + pollset->active_pollable = po_at_start; + } else { + POLLABLE_UNREF(po_at_start, "pollset_add_fd"); + } + return error; +} + +static grpc_error *pollset_as_multipollable_locked(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, + pollable **pollable_obj) { + grpc_error *error = GRPC_ERROR_NONE; + pollable *po_at_start = + POLLABLE_REF(pollset->active_pollable, "pollset_as_multipollable"); + switch (pollset->active_pollable->type) { + case PO_EMPTY: + POLLABLE_UNREF(pollset->active_pollable, "pollset"); + error = pollable_create(PO_MULTI, &pollset->active_pollable); + break; + case PO_FD: + gpr_mu_lock(&po_at_start->owner_fd->orphan_mu); + if ((gpr_atm_no_barrier_load(&pollset->active_pollable->owner_fd->refst) & + 1) == 0) { + POLLABLE_UNREF(pollset->active_pollable, "pollset"); + error = pollable_create(PO_MULTI, &pollset->active_pollable); + } else { + error = pollset_transition_pollable_from_fd_to_multi_locked( + exec_ctx, pollset, NULL); + } + gpr_mu_unlock(&po_at_start->owner_fd->orphan_mu); + break; + case PO_MULTI: + break; + } + if (error != GRPC_ERROR_NONE) { + POLLABLE_UNREF(pollset->active_pollable, "pollset"); + pollset->active_pollable = po_at_start; + *pollable_obj = NULL; + } else { + *pollable_obj = POLLABLE_REF(pollset->active_pollable, "pollset_set"); + POLLABLE_UNREF(po_at_start, "pollset_as_multipollable"); } return error; } static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd) { - gpr_mu_lock(&pollset->pollable_obj.po.mu); - grpc_error *error = pollset_add_fd_locked(exec_ctx, pollset, fd, false); - gpr_mu_unlock(&pollset->pollable_obj.po.mu); + gpr_mu_lock(&pollset->mu); + grpc_error *error = pollset_add_fd_locked(exec_ctx, pollset, fd); + gpr_mu_unlock(&pollset->mu); GRPC_LOG_IF_ERROR("pollset_add_fd", error); } @@ -1075,301 +1150,255 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, * Pollset-set Definitions */ +static grpc_pollset_set *pss_lock_adam(grpc_pollset_set *pss) { + gpr_mu_lock(&pss->mu); + while (pss->parent != NULL) { + gpr_mu_unlock(&pss->mu); + pss = pss->parent; + gpr_mu_lock(&pss->mu); + } + return pss; +} + static grpc_pollset_set *pollset_set_create(void) { grpc_pollset_set *pss = (grpc_pollset_set *)gpr_zalloc(sizeof(*pss)); - po_init(&pss->po, PO_POLLSET_SET); + gpr_mu_init(&pss->mu); + gpr_ref_init(&pss->refs, 1); return pss; } -static void pollset_set_destroy(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *pss) { - po_destroy(&pss->po); +static void pollset_set_unref(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss) { + if (pss == NULL) return; + if (!gpr_unref(&pss->refs)) return; + pollset_set_unref(exec_ctx, pss->parent); + gpr_mu_destroy(&pss->mu); + for (size_t i = 0; i < pss->pollset_count; i++) { + gpr_mu_lock(&pss->pollsets[i]->mu); + if (0 == --pss->pollsets[i]->containing_pollset_set_count) { + pollset_maybe_finish_shutdown(exec_ctx, pss->pollsets[i]); + } + gpr_mu_unlock(&pss->pollsets[i]->mu); + } + for (size_t i = 0; i < pss->fd_count; i++) { + UNREF_BY(exec_ctx, pss->fds[i], 2, "pollset_set"); + } + gpr_free(pss->pollsets); + gpr_free(pss->fds); gpr_free(pss); } static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, grpc_fd *fd) { - po_join(exec_ctx, &pss->po, &fd->pollable_obj.po); + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "PSS:%p: add fd %p (%d)", pss, fd, fd->fd); + } + grpc_error *error = GRPC_ERROR_NONE; + static const char *err_desc = "pollset_set_add_fd"; + pss = pss_lock_adam(pss); + for (size_t i = 0; i < pss->pollset_count; i++) { + append_error(&error, pollable_add_fd(pss->pollsets[i]->active_pollable, fd), + err_desc); + } + if (pss->fd_count == pss->fd_capacity) { + pss->fd_capacity = GPR_MAX(pss->fd_capacity * 2, 8); + pss->fds = + (grpc_fd **)gpr_realloc(pss->fds, pss->fd_capacity * sizeof(*pss->fds)); + } + REF_BY(fd, 2, "pollset_set"); + pss->fds[pss->fd_count++] = fd; + gpr_mu_unlock(&pss->mu); + + GRPC_LOG_IF_ERROR(err_desc, error); } static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, - grpc_fd *fd) {} - -static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *pss, grpc_pollset *ps) { - po_join(exec_ctx, &pss->po, &ps->pollable_obj.po); + grpc_fd *fd) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "PSS:%p: del fd %p", pss, fd); + } + pss = pss_lock_adam(pss); + size_t i; + for (i = 0; i < pss->fd_count; i++) { + if (pss->fds[i] == fd) { + UNREF_BY(exec_ctx, fd, 2, "pollset_set"); + break; + } + } + GPR_ASSERT(i != pss->fd_count); + for (; i < pss->fd_count - 1; i++) { + pss->fds[i] = pss->fds[i + 1]; + } + pss->fd_count--; + gpr_mu_unlock(&pss->mu); } static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *pss, grpc_pollset *ps) {} - -static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *bag, - grpc_pollset_set *item) { - po_join(exec_ctx, &bag->po, &item->po); -} - -static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *bag, - grpc_pollset_set *item) {} - -static void po_init(polling_obj *po, polling_obj_type type) { - gpr_mu_init(&po->mu); - po->type = type; - po->group = NULL; - po->next = po; - po->prev = po; -} - -static polling_group *pg_lock_latest(polling_group *pg) { - /* assumes pg unlocked; consumes ref, returns ref */ - gpr_mu_lock(&pg->po.mu); - while (pg->po.group != NULL) { - polling_group *new_pg = pg_ref(pg->po.group); - gpr_mu_unlock(&pg->po.mu); - pg_unref(pg); - pg = new_pg; - gpr_mu_lock(&pg->po.mu); - } - return pg; -} - -static void po_destroy(polling_obj *po) { - if (po->group != NULL) { - polling_group *pg = pg_lock_latest(po->group); - po->prev->next = po->next; - po->next->prev = po->prev; - gpr_mu_unlock(&pg->po.mu); - pg_unref(pg); + grpc_pollset_set *pss, grpc_pollset *ps) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "PSS:%p: del pollset %p", pss, ps); } - gpr_mu_destroy(&po->mu); -} - -static polling_group *pg_ref(polling_group *pg) { - gpr_ref(&pg->refs); - return pg; -} - -static void pg_unref(polling_group *pg) { - if (gpr_unref(&pg->refs)) { - po_destroy(&pg->po); - gpr_free(pg); + pss = pss_lock_adam(pss); + size_t i; + for (i = 0; i < pss->pollset_count; i++) { + if (pss->pollsets[i] == ps) { + break; + } } -} - -static int po_cmp(polling_obj *a, polling_obj *b) { - if (a == b) return 0; - if (a->type < b->type) return -1; - if (a->type > b->type) return 1; - if (a < b) return -1; - assert(a > b); - return 1; -} - -static void po_join(grpc_exec_ctx *exec_ctx, polling_obj *a, polling_obj *b) { - switch (po_cmp(a, b)) { - case 0: - return; - case 1: - GPR_SWAP(polling_obj *, a, b); - /* fall through */ - case -1: - gpr_mu_lock(&a->mu); - gpr_mu_lock(&b->mu); - - if (a->group == NULL) { - if (b->group == NULL) { - polling_obj *initial_po[] = {a, b}; - pg_create(exec_ctx, initial_po, GPR_ARRAY_SIZE(initial_po)); - gpr_mu_unlock(&a->mu); - gpr_mu_unlock(&b->mu); - } else { - polling_group *b_group = pg_ref(b->group); - gpr_mu_unlock(&b->mu); - gpr_mu_unlock(&a->mu); - pg_join(exec_ctx, b_group, a); - } - } else if (b->group == NULL) { - polling_group *a_group = pg_ref(a->group); - gpr_mu_unlock(&a->mu); - gpr_mu_unlock(&b->mu); - pg_join(exec_ctx, a_group, b); - } else if (a->group == b->group) { - /* nothing to do */ - gpr_mu_unlock(&a->mu); - gpr_mu_unlock(&b->mu); - } else { - polling_group *a_group = pg_ref(a->group); - polling_group *b_group = pg_ref(b->group); - gpr_mu_unlock(&a->mu); - gpr_mu_unlock(&b->mu); - pg_merge(exec_ctx, a_group, b_group); - } + GPR_ASSERT(i != pss->pollset_count); + for (; i < pss->pollset_count - 1; i++) { + pss->pollsets[i] = pss->pollsets[i + 1]; } -} - -static void pg_notify(grpc_exec_ctx *exec_ctx, polling_obj *a, polling_obj *b) { - if (a->type == PO_FD && b->type == PO_POLLSET) { - pollset_add_fd_locked(exec_ctx, (grpc_pollset *)b, (grpc_fd *)a, true); - } else if (a->type == PO_POLLSET && b->type == PO_FD) { - pollset_add_fd_locked(exec_ctx, (grpc_pollset *)a, (grpc_fd *)b, true); + pss->pollset_count--; + gpr_mu_unlock(&pss->mu); + gpr_mu_lock(&ps->mu); + if (0 == --ps->containing_pollset_set_count) { + pollset_maybe_finish_shutdown(exec_ctx, ps); } + gpr_mu_unlock(&ps->mu); } -static void pg_broadcast(grpc_exec_ctx *exec_ctx, polling_group *from, - polling_group *to) { - for (polling_obj *a = from->po.next; a != &from->po; a = a->next) { - for (polling_obj *b = to->po.next; b != &to->po; b = b->next) { - if (po_cmp(a, b) < 0) { - gpr_mu_lock(&a->mu); - gpr_mu_lock(&b->mu); - } else { - GPR_ASSERT(po_cmp(a, b) != 0); - gpr_mu_lock(&b->mu); - gpr_mu_lock(&a->mu); +// add all fds to pollables, and output a new array of unorphaned out_fds +// assumes pollsets are multipollable +static grpc_error *add_fds_to_pollsets(grpc_exec_ctx *exec_ctx, grpc_fd **fds, + size_t fd_count, grpc_pollset **pollsets, + size_t pollset_count, + const char *err_desc, grpc_fd **out_fds, + size_t *out_fd_count) { + grpc_error *error = GRPC_ERROR_NONE; + for (size_t i = 0; i < fd_count; i++) { + gpr_mu_lock(&fds[i]->orphan_mu); + if ((gpr_atm_no_barrier_load(&fds[i]->refst) & 1) == 0) { + gpr_mu_unlock(&fds[i]->orphan_mu); + UNREF_BY(exec_ctx, fds[i], 2, "pollset_set"); + } else { + for (size_t j = 0; j < pollset_count; j++) { + append_error(&error, + pollable_add_fd(pollsets[j]->active_pollable, fds[i]), + err_desc); } - pg_notify(exec_ctx, a, b); - gpr_mu_unlock(&a->mu); - gpr_mu_unlock(&b->mu); + gpr_mu_unlock(&fds[i]->orphan_mu); + out_fds[(*out_fd_count)++] = fds[i]; } } + return error; } -static void pg_create(grpc_exec_ctx *exec_ctx, polling_obj **initial_po, - size_t initial_po_count) { - /* assumes all polling objects in initial_po are locked */ - polling_group *pg = (polling_group *)gpr_malloc(sizeof(*pg)); - po_init(&pg->po, PO_POLLING_GROUP); - gpr_ref_init(&pg->refs, (int)initial_po_count); - for (size_t i = 0; i < initial_po_count; i++) { - GPR_ASSERT(initial_po[i]->group == NULL); - initial_po[i]->group = pg; - } - for (size_t i = 1; i < initial_po_count; i++) { - initial_po[i]->prev = initial_po[i - 1]; - } - for (size_t i = 0; i < initial_po_count - 1; i++) { - initial_po[i]->next = initial_po[i + 1]; - } - initial_po[0]->prev = &pg->po; - initial_po[initial_po_count - 1]->next = &pg->po; - pg->po.next = initial_po[0]; - pg->po.prev = initial_po[initial_po_count - 1]; - for (size_t i = 1; i < initial_po_count; i++) { - for (size_t j = 0; j < i; j++) { - pg_notify(exec_ctx, initial_po[i], initial_po[j]); - } +static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pss, grpc_pollset *ps) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "PSS:%p: add pollset %p", pss, ps); } -} - -static void pg_join(grpc_exec_ctx *exec_ctx, polling_group *pg, - polling_obj *po) { - /* assumes neither pg nor po are locked; consumes one ref to pg */ - pg = pg_lock_latest(pg); - /* pg locked */ - for (polling_obj *existing = pg->po.next /* skip pg - it's just a stub */; - existing != &pg->po; existing = existing->next) { - if (po_cmp(po, existing) < 0) { - gpr_mu_lock(&po->mu); - gpr_mu_lock(&existing->mu); - } else { - GPR_ASSERT(po_cmp(po, existing) != 0); - gpr_mu_lock(&existing->mu); - gpr_mu_lock(&po->mu); - } - /* pg, po, existing locked */ - if (po->group != NULL) { - gpr_mu_unlock(&pg->po.mu); - polling_group *po_group = pg_ref(po->group); - gpr_mu_unlock(&po->mu); - gpr_mu_unlock(&existing->mu); - pg_merge(exec_ctx, pg, po_group); - /* early exit: polling obj picked up a group during joining: we needed - to do a full merge */ - return; - } - pg_notify(exec_ctx, po, existing); - gpr_mu_unlock(&po->mu); - gpr_mu_unlock(&existing->mu); - } - gpr_mu_lock(&po->mu); - if (po->group != NULL) { - gpr_mu_unlock(&pg->po.mu); - polling_group *po_group = pg_ref(po->group); - gpr_mu_unlock(&po->mu); - pg_merge(exec_ctx, pg, po_group); - /* early exit: polling obj picked up a group during joining: we needed - to do a full merge */ + grpc_error *error = GRPC_ERROR_NONE; + static const char *err_desc = "pollset_set_add_pollset"; + pollable *pollable_obj = NULL; + gpr_mu_lock(&ps->mu); + if (!GRPC_LOG_IF_ERROR(err_desc, pollset_as_multipollable_locked( + exec_ctx, ps, &pollable_obj))) { + GPR_ASSERT(pollable_obj == NULL); + gpr_mu_unlock(&ps->mu); return; } - po->group = pg; - po->next = &pg->po; - po->prev = pg->po.prev; - po->prev->next = po->next->prev = po; - gpr_mu_unlock(&pg->po.mu); - gpr_mu_unlock(&po->mu); + ps->containing_pollset_set_count++; + gpr_mu_unlock(&ps->mu); + pss = pss_lock_adam(pss); + size_t initial_fd_count = pss->fd_count; + pss->fd_count = 0; + append_error(&error, + add_fds_to_pollsets(exec_ctx, pss->fds, initial_fd_count, &ps, 1, + err_desc, pss->fds, &pss->fd_count), + err_desc); + if (pss->pollset_count == pss->pollset_capacity) { + pss->pollset_capacity = GPR_MAX(pss->pollset_capacity * 2, 8); + pss->pollsets = (grpc_pollset **)gpr_realloc( + pss->pollsets, pss->pollset_capacity * sizeof(*pss->pollsets)); + } + pss->pollsets[pss->pollset_count++] = ps; + gpr_mu_unlock(&pss->mu); + POLLABLE_UNREF(pollable_obj, "pollset_set"); + + GRPC_LOG_IF_ERROR(err_desc, error); } -static void pg_merge(grpc_exec_ctx *exec_ctx, polling_group *a, - polling_group *b) { +static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *a, + grpc_pollset_set *b) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "PSS: merge (%p, %p)", a, b); + } + grpc_error *error = GRPC_ERROR_NONE; + static const char *err_desc = "pollset_set_add_fd"; for (;;) { if (a == b) { - pg_unref(a); - pg_unref(b); + // pollset ancestors are the same: nothing to do return; } - if (a > b) GPR_SWAP(polling_group *, a, b); - gpr_mu_lock(&a->po.mu); - gpr_mu_lock(&b->po.mu); - if (a->po.group != NULL) { - polling_group *m2 = pg_ref(a->po.group); - gpr_mu_unlock(&a->po.mu); - gpr_mu_unlock(&b->po.mu); - pg_unref(a); - a = m2; - } else if (b->po.group != NULL) { - polling_group *m2 = pg_ref(b->po.group); - gpr_mu_unlock(&a->po.mu); - gpr_mu_unlock(&b->po.mu); - pg_unref(b); - b = m2; + if (a > b) { + GPR_SWAP(grpc_pollset_set *, a, b); + } + gpr_mu *a_mu = &a->mu; + gpr_mu *b_mu = &b->mu; + gpr_mu_lock(a_mu); + gpr_mu_lock(b_mu); + if (a->parent != NULL) { + a = a->parent; + } else if (b->parent != NULL) { + b = b->parent; } else { - break; + break; // exit loop, both pollsets locked } + gpr_mu_unlock(a_mu); + gpr_mu_unlock(b_mu); } - polling_group **unref = NULL; - size_t unref_count = 0; - size_t unref_cap = 0; - b->po.group = a; - pg_broadcast(exec_ctx, a, b); - pg_broadcast(exec_ctx, b, a); - while (b->po.next != &b->po) { - polling_obj *po = b->po.next; - gpr_mu_lock(&po->mu); - if (unref_count == unref_cap) { - unref_cap = GPR_MAX(8, 3 * unref_cap / 2); - unref = (polling_group **)gpr_realloc(unref, unref_cap * sizeof(*unref)); - } - unref[unref_count++] = po->group; - po->group = pg_ref(a); - // unlink from b - po->prev->next = po->next; - po->next->prev = po->prev; - // link to a - po->next = &a->po; - po->prev = a->po.prev; - po->next->prev = po->prev->next = po; - gpr_mu_unlock(&po->mu); - } - gpr_mu_unlock(&a->po.mu); - gpr_mu_unlock(&b->po.mu); - for (size_t i = 0; i < unref_count; i++) { - pg_unref(unref[i]); - } - gpr_free(unref); - pg_unref(b); + // try to do the least copying possible + // TODO(ctiller): there's probably a better heuristic here + const size_t a_size = a->fd_count + a->pollset_count; + const size_t b_size = b->fd_count + b->pollset_count; + if (b_size > a_size) { + GPR_SWAP(grpc_pollset_set *, a, b); + } + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "PSS: parent %p to %p", b, a); + } + gpr_ref(&a->refs); + b->parent = a; + if (a->fd_capacity < a->fd_count + b->fd_count) { + a->fd_capacity = GPR_MAX(2 * a->fd_capacity, a->fd_count + b->fd_count); + a->fds = (grpc_fd **)gpr_realloc(a->fds, a->fd_capacity * sizeof(*a->fds)); + } + size_t initial_a_fd_count = a->fd_count; + a->fd_count = 0; + append_error(&error, add_fds_to_pollsets(exec_ctx, a->fds, initial_a_fd_count, + b->pollsets, b->pollset_count, + "merge_a2b", a->fds, &a->fd_count), + err_desc); + append_error(&error, add_fds_to_pollsets(exec_ctx, b->fds, b->fd_count, + a->pollsets, a->pollset_count, + "merge_b2a", a->fds, &a->fd_count), + err_desc); + if (a->pollset_capacity < a->pollset_count + b->pollset_count) { + a->pollset_capacity = + GPR_MAX(2 * a->pollset_capacity, a->pollset_count + b->pollset_count); + a->pollsets = (grpc_pollset **)gpr_realloc( + a->pollsets, a->pollset_capacity * sizeof(*a->pollsets)); + } + if (b->pollset_count > 0) { + memcpy(a->pollsets + a->pollset_count, b->pollsets, + b->pollset_count * sizeof(*b->pollsets)); + } + a->pollset_count += b->pollset_count; + gpr_free(b->fds); + gpr_free(b->pollsets); + b->fds = NULL; + b->pollsets = NULL; + b->fd_count = b->fd_capacity = b->pollset_count = b->pollset_capacity = 0; + gpr_mu_unlock(&a->mu); + gpr_mu_unlock(&b->mu); } +static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item) {} + /******************************************************************************* * Event engine binding */ @@ -1399,7 +1428,7 @@ static const grpc_event_engine_vtable vtable = { pollset_add_fd, pollset_set_create, - pollset_set_destroy, + pollset_set_unref, // destroy ==> unref 1 public ref pollset_set_add_pollset, pollset_set_del_pollset, pollset_set_add_pollset_set, @@ -1412,6 +1441,10 @@ static const grpc_event_engine_vtable vtable = { const grpc_event_engine_vtable *grpc_init_epollex_linux( bool explicitly_requested) { + if (!explicitly_requested) { + return NULL; + } + if (!grpc_has_wakeup_fd()) { return NULL; } @@ -1420,6 +1453,10 @@ const grpc_event_engine_vtable *grpc_init_epollex_linux( return NULL; } +#ifndef NDEBUG + grpc_register_tracer(&grpc_trace_pollable_refcount); +#endif + fd_global_init(); if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) { diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc index 3a1dd8d30b..677ee675a6 100644 --- a/src/core/lib/iomgr/ev_posix.cc +++ b/src/core/lib/iomgr/ev_posix.cc @@ -92,12 +92,9 @@ const grpc_event_engine_vtable *init_non_polling(bool explicit_request) { } // namespace static const event_engine_factory g_factories[] = { - {"epoll1", grpc_init_epoll1_linux}, - {"epollsig", grpc_init_epollsig_linux}, - {"poll", grpc_init_poll_posix}, - {"poll-cv", grpc_init_poll_cv_posix}, - {"epollex", grpc_init_epollex_linux}, - {"none", init_non_polling}, + {"epollex", grpc_init_epollex_linux}, {"epoll1", grpc_init_epoll1_linux}, + {"epollsig", grpc_init_epollsig_linux}, {"poll", grpc_init_poll_posix}, + {"poll-cv", grpc_init_poll_cv_posix}, {"none", init_non_polling}, }; static void add(const char *beg, const char *end, char ***ss, size_t *ns) { diff --git a/src/core/lib/iomgr/exec_ctx.cc b/src/core/lib/iomgr/exec_ctx.cc index 3d17afcb8f..0394a00f3e 100644 --- a/src/core/lib/iomgr/exec_ctx.cc +++ b/src/core/lib/iomgr/exec_ctx.cc @@ -152,6 +152,15 @@ void grpc_exec_ctx_invalidate_now(grpc_exec_ctx *exec_ctx) { gpr_timespec grpc_millis_to_timespec(grpc_millis millis, gpr_clock_type clock_type) { + // special-case infinities as grpc_millis can be 32bit on some platforms + // while gpr_time_from_millis always takes an int64_t. + if (millis == GRPC_MILLIS_INF_FUTURE) { + return gpr_inf_future(clock_type); + } + if (millis == GRPC_MILLIS_INF_PAST) { + return gpr_inf_past(clock_type); + } + if (clock_type == GPR_TIMESPAN) { return gpr_time_from_millis(millis, GPR_TIMESPAN); } diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 7fcaef7679..dbcc976ae9 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -186,7 +186,7 @@ static void cover_self(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { } if (old_count == 0) { GRPC_STATS_INC_TCP_BACKUP_POLLERS_CREATED(exec_ctx); - p = (backup_poller *)gpr_malloc(sizeof(*p) + grpc_pollset_size()); + p = (backup_poller *)gpr_zalloc(sizeof(*p) + grpc_pollset_size()); if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p create", p); } diff --git a/src/core/lib/security/credentials/fake/fake_credentials.cc b/src/core/lib/security/credentials/fake/fake_credentials.cc index 795ca0660a..cf10bf24c8 100644 --- a/src/core/lib/security/credentials/fake/fake_credentials.cc +++ b/src/core/lib/security/credentials/fake/fake_credentials.cc @@ -38,7 +38,8 @@ static grpc_security_status fake_transport_security_create_security_connector( grpc_call_credentials *call_creds, const char *target, const grpc_channel_args *args, grpc_channel_security_connector **sc, grpc_channel_args **new_args) { - *sc = grpc_fake_channel_security_connector_create(call_creds, target, args); + *sc = + grpc_fake_channel_security_connector_create(c, call_creds, target, args); return GRPC_SECURITY_OK; } @@ -46,7 +47,7 @@ static grpc_security_status fake_transport_security_server_create_security_connector( grpc_exec_ctx *exec_ctx, grpc_server_credentials *c, grpc_server_security_connector **sc) { - *sc = grpc_fake_server_security_connector_create(); + *sc = grpc_fake_server_security_connector_create(c); return GRPC_SECURITY_OK; } diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc b/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc index f52a424e36..7867105f56 100644 --- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc +++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc @@ -262,7 +262,7 @@ static bool oauth2_token_fetcher_get_request_metadata( grpc_mdelem cached_access_token_md = GRPC_MDNULL; gpr_mu_lock(&c->mu); if (!GRPC_MDISNULL(c->access_token_md) && - (c->token_expiration + grpc_exec_ctx_now(exec_ctx) > refresh_threshold)) { + (c->token_expiration - grpc_exec_ctx_now(exec_ctx) > refresh_threshold)) { cached_access_token_md = GRPC_MDELEM_REF(c->access_token_md); } if (!GRPC_MDISNULL(cached_access_token_md)) { diff --git a/src/core/lib/security/credentials/ssl/ssl_credentials.cc b/src/core/lib/security/credentials/ssl/ssl_credentials.cc index 9df69a2a3d..290336adc0 100644 --- a/src/core/lib/security/credentials/ssl/ssl_credentials.cc +++ b/src/core/lib/security/credentials/ssl/ssl_credentials.cc @@ -62,7 +62,8 @@ static grpc_security_status ssl_create_security_connector( } } status = grpc_ssl_channel_security_connector_create( - exec_ctx, call_creds, &c->config, target, overridden_target_name, sc); + exec_ctx, creds, call_creds, &c->config, target, overridden_target_name, + sc); if (status != GRPC_SECURITY_OK) { return status; } @@ -128,7 +129,8 @@ static grpc_security_status ssl_server_create_security_connector( grpc_exec_ctx *exec_ctx, grpc_server_credentials *creds, grpc_server_security_connector **sc) { grpc_ssl_server_credentials *c = (grpc_ssl_server_credentials *)creds; - return grpc_ssl_server_security_connector_create(exec_ctx, &c->config, sc); + return grpc_ssl_server_security_connector_create(exec_ctx, creds, &c->config, + sc); } static grpc_server_credentials_vtable ssl_server_vtable = { diff --git a/src/core/lib/security/transport/security_connector.cc b/src/core/lib/security/transport/security_connector.cc index 51844fb91f..80d9a7b77f 100644 --- a/src/core/lib/security/transport/security_connector.cc +++ b/src/core/lib/security/transport/security_connector.cc @@ -136,6 +136,39 @@ void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx, } } +int grpc_security_connector_cmp(grpc_security_connector *sc, + grpc_security_connector *other) { + if (sc == NULL || other == NULL) return GPR_ICMP(sc, other); + int c = GPR_ICMP(sc->vtable, other->vtable); + if (c != 0) return c; + return sc->vtable->cmp(sc, other); +} + +int grpc_channel_security_connector_cmp(grpc_channel_security_connector *sc1, + grpc_channel_security_connector *sc2) { + GPR_ASSERT(sc1->channel_creds != NULL); + GPR_ASSERT(sc2->channel_creds != NULL); + int c = GPR_ICMP(sc1->channel_creds, sc2->channel_creds); + if (c != 0) return c; + c = GPR_ICMP(sc1->request_metadata_creds, sc2->request_metadata_creds); + if (c != 0) return c; + c = GPR_ICMP((void *)sc1->check_call_host, (void *)sc2->check_call_host); + if (c != 0) return c; + c = GPR_ICMP((void *)sc1->cancel_check_call_host, + (void *)sc2->cancel_check_call_host); + if (c != 0) return c; + return GPR_ICMP((void *)sc1->add_handshakers, (void *)sc2->add_handshakers); +} + +int grpc_server_security_connector_cmp(grpc_server_security_connector *sc1, + grpc_server_security_connector *sc2) { + GPR_ASSERT(sc1->server_creds != NULL); + GPR_ASSERT(sc2->server_creds != NULL); + int c = GPR_ICMP(sc1->server_creds, sc2->server_creds); + if (c != 0) return c; + return GPR_ICMP((void *)sc1->add_handshakers, (void *)sc2->add_handshakers); +} + bool grpc_channel_security_connector_check_call_host( grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc, const char *host, grpc_auth_context *auth_context, @@ -199,25 +232,27 @@ void grpc_security_connector_unref(grpc_exec_ctx *exec_ctx, if (gpr_unref(&sc->refcount)) sc->vtable->destroy(exec_ctx, sc); } -static void connector_pointer_arg_destroy(grpc_exec_ctx *exec_ctx, void *p) { +static void connector_arg_destroy(grpc_exec_ctx *exec_ctx, void *p) { GRPC_SECURITY_CONNECTOR_UNREF(exec_ctx, (grpc_security_connector *)p, - "connector_pointer_arg_destroy"); + "connector_arg_destroy"); } -static void *connector_pointer_arg_copy(void *p) { +static void *connector_arg_copy(void *p) { return GRPC_SECURITY_CONNECTOR_REF((grpc_security_connector *)p, - "connector_pointer_arg_copy"); + "connector_arg_copy"); } -static int connector_pointer_cmp(void *a, void *b) { return GPR_ICMP(a, b); } +static int connector_cmp(void *a, void *b) { + return grpc_security_connector_cmp((grpc_security_connector *)a, + (grpc_security_connector *)b); +} -static const grpc_arg_pointer_vtable connector_pointer_vtable = { - connector_pointer_arg_copy, connector_pointer_arg_destroy, - connector_pointer_cmp}; +static const grpc_arg_pointer_vtable connector_arg_vtable = { + connector_arg_copy, connector_arg_destroy, connector_cmp}; grpc_arg grpc_security_connector_to_arg(grpc_security_connector *sc) { return grpc_channel_arg_pointer_create((char *)GRPC_ARG_SECURITY_CONNECTOR, - sc, &connector_pointer_vtable); + sc, &connector_arg_vtable); } grpc_security_connector *grpc_security_connector_from_arg(const grpc_arg *arg) { @@ -382,6 +417,32 @@ static void fake_server_check_peer(grpc_exec_ctx *exec_ctx, fake_check_peer(exec_ctx, sc, peer, auth_context, on_peer_checked); } +static int fake_channel_cmp(grpc_security_connector *sc1, + grpc_security_connector *sc2) { + grpc_fake_channel_security_connector *c1 = + (grpc_fake_channel_security_connector *)sc1; + grpc_fake_channel_security_connector *c2 = + (grpc_fake_channel_security_connector *)sc2; + int c = grpc_channel_security_connector_cmp(&c1->base, &c2->base); + if (c != 0) return c; + c = strcmp(c1->target, c2->target); + if (c != 0) return c; + if (c1->expected_targets == NULL || c2->expected_targets == NULL) { + c = GPR_ICMP(c1->expected_targets, c2->expected_targets); + } else { + c = strcmp(c1->expected_targets, c2->expected_targets); + } + if (c != 0) return c; + return GPR_ICMP(c1->is_lb_channel, c2->is_lb_channel); +} + +static int fake_server_cmp(grpc_security_connector *sc1, + grpc_security_connector *sc2) { + return grpc_server_security_connector_cmp( + (grpc_server_security_connector *)sc1, + (grpc_server_security_connector *)sc2); +} + static bool fake_channel_check_call_host(grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc, const char *host, @@ -418,12 +479,13 @@ static void fake_server_add_handshakers(grpc_exec_ctx *exec_ctx, } static grpc_security_connector_vtable fake_channel_vtable = { - fake_channel_destroy, fake_channel_check_peer}; + fake_channel_destroy, fake_channel_check_peer, fake_channel_cmp}; static grpc_security_connector_vtable fake_server_vtable = { - fake_server_destroy, fake_server_check_peer}; + fake_server_destroy, fake_server_check_peer, fake_server_cmp}; grpc_channel_security_connector *grpc_fake_channel_security_connector_create( + grpc_channel_credentials *channel_creds, grpc_call_credentials *request_metadata_creds, const char *target, const grpc_channel_args *args) { grpc_fake_channel_security_connector *c = @@ -431,6 +493,7 @@ grpc_channel_security_connector *grpc_fake_channel_security_connector_create( gpr_ref_init(&c->base.base.refcount, 1); c->base.base.url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME; c->base.base.vtable = &fake_channel_vtable; + c->base.channel_creds = channel_creds; c->base.request_metadata_creds = grpc_call_credentials_ref(request_metadata_creds); c->base.check_call_host = fake_channel_check_call_host; @@ -444,13 +507,14 @@ grpc_channel_security_connector *grpc_fake_channel_security_connector_create( } grpc_server_security_connector *grpc_fake_server_security_connector_create( - void) { + grpc_server_credentials *server_creds) { grpc_server_security_connector *c = (grpc_server_security_connector *)gpr_zalloc( sizeof(grpc_server_security_connector)); gpr_ref_init(&c->base.refcount, 1); c->base.vtable = &fake_server_vtable; c->base.url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME; + c->server_creds = server_creds; c->add_handshakers = fake_server_add_handshakers; return c; } @@ -473,6 +537,7 @@ static void ssl_channel_destroy(grpc_exec_ctx *exec_ctx, grpc_security_connector *sc) { grpc_ssl_channel_security_connector *c = (grpc_ssl_channel_security_connector *)sc; + grpc_channel_credentials_unref(exec_ctx, c->base.channel_creds); grpc_call_credentials_unref(exec_ctx, c->base.request_metadata_creds); tsi_ssl_client_handshaker_factory_unref(c->client_handshaker_factory); c->client_handshaker_factory = NULL; @@ -485,6 +550,7 @@ static void ssl_server_destroy(grpc_exec_ctx *exec_ctx, grpc_security_connector *sc) { grpc_ssl_server_security_connector *c = (grpc_ssl_server_security_connector *)sc; + grpc_server_credentials_unref(exec_ctx, c->base.server_creds); tsi_ssl_server_handshaker_factory_unref(c->server_handshaker_factory); c->server_handshaker_factory = NULL; gpr_free(sc); @@ -641,6 +707,29 @@ static void ssl_server_check_peer(grpc_exec_ctx *exec_ctx, GRPC_CLOSURE_SCHED(exec_ctx, on_peer_checked, error); } +static int ssl_channel_cmp(grpc_security_connector *sc1, + grpc_security_connector *sc2) { + grpc_ssl_channel_security_connector *c1 = + (grpc_ssl_channel_security_connector *)sc1; + grpc_ssl_channel_security_connector *c2 = + (grpc_ssl_channel_security_connector *)sc2; + int c = grpc_channel_security_connector_cmp(&c1->base, &c2->base); + if (c != 0) return c; + c = strcmp(c1->target_name, c2->target_name); + if (c != 0) return c; + return (c1->overridden_target_name == NULL || + c2->overridden_target_name == NULL) + ? GPR_ICMP(c1->overridden_target_name, c2->overridden_target_name) + : strcmp(c1->overridden_target_name, c2->overridden_target_name); +} + +static int ssl_server_cmp(grpc_security_connector *sc1, + grpc_security_connector *sc2) { + return grpc_server_security_connector_cmp( + (grpc_server_security_connector *)sc1, + (grpc_server_security_connector *)sc2); +} + static void add_shallow_auth_property_to_peer(tsi_peer *peer, const grpc_auth_property *prop, const char *tsi_prop_name) { @@ -717,10 +806,10 @@ static void ssl_channel_cancel_check_call_host( } static grpc_security_connector_vtable ssl_channel_vtable = { - ssl_channel_destroy, ssl_channel_check_peer}; + ssl_channel_destroy, ssl_channel_check_peer, ssl_channel_cmp}; static grpc_security_connector_vtable ssl_server_vtable = { - ssl_server_destroy, ssl_server_check_peer}; + ssl_server_destroy, ssl_server_check_peer, ssl_server_cmp}; /* returns a NULL terminated slice. */ static grpc_slice compute_default_pem_root_certs_once(void) { @@ -804,7 +893,8 @@ const char *grpc_get_default_ssl_roots(void) { } grpc_security_status grpc_ssl_channel_security_connector_create( - grpc_exec_ctx *exec_ctx, grpc_call_credentials *request_metadata_creds, + grpc_exec_ctx *exec_ctx, grpc_channel_credentials *channel_creds, + grpc_call_credentials *request_metadata_creds, const grpc_ssl_config *config, const char *target_name, const char *overridden_target_name, grpc_channel_security_connector **sc) { size_t num_alpn_protocols = grpc_chttp2_num_alpn_versions(); @@ -840,6 +930,7 @@ grpc_security_status grpc_ssl_channel_security_connector_create( gpr_ref_init(&c->base.base.refcount, 1); c->base.base.vtable = &ssl_channel_vtable; c->base.base.url_scheme = GRPC_SSL_URL_SCHEME; + c->base.channel_creds = grpc_channel_credentials_ref(channel_creds); c->base.request_metadata_creds = grpc_call_credentials_ref(request_metadata_creds); c->base.check_call_host = ssl_channel_check_call_host; @@ -874,8 +965,8 @@ error: } grpc_security_status grpc_ssl_server_security_connector_create( - grpc_exec_ctx *exec_ctx, const grpc_ssl_server_config *config, - grpc_server_security_connector **sc) { + grpc_exec_ctx *exec_ctx, grpc_server_credentials *server_creds, + const grpc_ssl_server_config *config, grpc_server_security_connector **sc) { size_t num_alpn_protocols = grpc_chttp2_num_alpn_versions(); const char **alpn_protocol_strings = (const char **)gpr_malloc(sizeof(const char *) * num_alpn_protocols); @@ -897,6 +988,7 @@ grpc_security_status grpc_ssl_server_security_connector_create( gpr_ref_init(&c->base.base.refcount, 1); c->base.base.url_scheme = GRPC_SSL_URL_SCHEME; c->base.base.vtable = &ssl_server_vtable; + c->base.server_creds = grpc_server_credentials_ref(server_creds); result = tsi_create_ssl_server_handshaker_factory_ex( config->pem_key_cert_pairs, config->num_key_cert_pairs, config->pem_root_certs, get_tsi_client_certificate_request_type( diff --git a/src/core/lib/security/transport/security_connector.h b/src/core/lib/security/transport/security_connector.h index 4d87cd0c80..216bb35e81 100644 --- a/src/core/lib/security/transport/security_connector.h +++ b/src/core/lib/security/transport/security_connector.h @@ -60,13 +60,9 @@ typedef struct { void (*check_peer)(grpc_exec_ctx *exec_ctx, grpc_security_connector *sc, tsi_peer peer, grpc_auth_context **auth_context, grpc_closure *on_peer_checked); + int (*cmp)(grpc_security_connector *sc, grpc_security_connector *other); } grpc_security_connector_vtable; -typedef struct grpc_security_connector_handshake_list { - void *handshake; - struct grpc_security_connector_handshake_list *next; -} grpc_security_connector_handshake_list; - struct grpc_security_connector { const grpc_security_connector_vtable *vtable; gpr_refcount refcount; @@ -104,6 +100,10 @@ void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx, grpc_auth_context **auth_context, grpc_closure *on_peer_checked); +/* Compares two security connectors. */ +int grpc_security_connector_cmp(grpc_security_connector *sc, + grpc_security_connector *other); + /* Util to encapsulate the connector in a channel arg. */ grpc_arg grpc_security_connector_to_arg(grpc_security_connector *sc); @@ -116,13 +116,14 @@ grpc_security_connector *grpc_security_connector_find_in_args( /* --- channel_security_connector object. --- - A channel security connector object represents away to configure the + A channel security connector object represents a way to configure the underlying transport security mechanism on the client side. */ typedef struct grpc_channel_security_connector grpc_channel_security_connector; struct grpc_channel_security_connector { grpc_security_connector base; + grpc_channel_credentials *channel_creds; grpc_call_credentials *request_metadata_creds; bool (*check_call_host)(grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc, const char *host, @@ -138,6 +139,10 @@ struct grpc_channel_security_connector { grpc_handshake_manager *handshake_mgr); }; +/// A helper function for use in grpc_security_connector_cmp() implementations. +int grpc_channel_security_connector_cmp(grpc_channel_security_connector *sc1, + grpc_channel_security_connector *sc2); + /// Checks that the host that will be set for a call is acceptable. /// Returns true if completed synchronously, in which case \a error will /// be set to indicate the result. Otherwise, \a on_call_host_checked @@ -161,18 +166,23 @@ void grpc_channel_security_connector_add_handshakers( /* --- server_security_connector object. --- - A server security connector object represents away to configure the + A server security connector object represents a way to configure the underlying transport security mechanism on the server side. */ typedef struct grpc_server_security_connector grpc_server_security_connector; struct grpc_server_security_connector { grpc_security_connector base; + grpc_server_credentials *server_creds; void (*add_handshakers)(grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc, grpc_handshake_manager *handshake_mgr); }; +/// A helper function for use in grpc_security_connector_cmp() implementations. +int grpc_server_security_connector_cmp(grpc_server_security_connector *sc1, + grpc_server_security_connector *sc2); + void grpc_server_security_connector_add_handshakers( grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc, grpc_handshake_manager *handshake_mgr); @@ -182,13 +192,14 @@ void grpc_server_security_connector_add_handshakers( /* For TESTING ONLY! Creates a fake connector that emulates real channel security. */ grpc_channel_security_connector *grpc_fake_channel_security_connector_create( + grpc_channel_credentials *channel_creds, grpc_call_credentials *request_metadata_creds, const char *target, const grpc_channel_args *args); /* For TESTING ONLY! Creates a fake connector that emulates real server security. */ grpc_server_security_connector *grpc_fake_server_security_connector_create( - void); + grpc_server_credentials *server_creds); /* Config for ssl clients. */ @@ -211,7 +222,8 @@ typedef struct { specific error code otherwise. */ grpc_security_status grpc_ssl_channel_security_connector_create( - grpc_exec_ctx *exec_ctx, grpc_call_credentials *request_metadata_creds, + grpc_exec_ctx *exec_ctx, grpc_channel_credentials *channel_creds, + grpc_call_credentials *request_metadata_creds, const grpc_ssl_config *config, const char *target_name, const char *overridden_target_name, grpc_channel_security_connector **sc); @@ -236,8 +248,8 @@ typedef struct { specific error code otherwise. */ grpc_security_status grpc_ssl_server_security_connector_create( - grpc_exec_ctx *exec_ctx, const grpc_ssl_server_config *config, - grpc_server_security_connector **sc); + grpc_exec_ctx *exec_ctx, grpc_server_credentials *server_creds, + const grpc_ssl_server_config *config, grpc_server_security_connector **sc); /* Util. */ const tsi_peer_property *tsi_peer_get_property_by_name(const tsi_peer *peer, diff --git a/src/core/lib/support/memory.h b/src/core/lib/support/memory.h index dc3d32e1c2..6b336681db 100644 --- a/src/core/lib/support/memory.h +++ b/src/core/lib/support/memory.h @@ -21,6 +21,7 @@ #include <grpc/support/alloc.h> +#include <limits> #include <memory> #include <utility> @@ -54,6 +55,46 @@ inline UniquePtr<T> MakeUnique(Args&&... args) { return UniquePtr<T>(New<T>(std::forward<Args>(args)...)); } +// an allocator that uses gpr_malloc/gpr_free +template <class T> +class Allocator { + public: + typedef T value_type; + typedef T* pointer; + typedef const T* const_pointer; + typedef T& reference; + typedef const T& const_reference; + typedef std::size_t size_type; + typedef std::ptrdiff_t difference_type; + typedef std::false_type propagate_on_container_move_assignment; + template <class U> + struct rebind { + typedef Allocator<U> other; + }; + typedef std::true_type is_always_equal; + + pointer address(reference x) const { return &x; } + const_pointer address(const_reference x) const { return &x; } + pointer allocate(std::size_t n, + std::allocator<void>::const_pointer hint = 0) { + return static_cast<pointer>(gpr_malloc(n * sizeof(T))); + } + void deallocate(T* p, std::size_t n) { gpr_free(p); } + size_t max_size() const { + return std::numeric_limits<size_type>::max() / sizeof(value_type); + } + void construct(pointer p, const_reference val) { new ((void*)p) T(val); } + template <class U, class... Args> + void construct(U* p, Args&&... args) { + ::new ((void*)p) U(std::forward<Args>(args)...); + } + void destroy(pointer p) { p->~T(); } + template <class U> + void destroy(U* p) { + p->~U(); + } +}; + } // namespace grpc_core #endif /* GRPC_CORE_LIB_SUPPORT_MEMORY_H */ diff --git a/src/core/lib/support/vector.h b/src/core/lib/support/vector.h new file mode 100644 index 0000000000..4a7db80676 --- /dev/null +++ b/src/core/lib/support/vector.h @@ -0,0 +1,32 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_SUPPORT_VECTOR_H +#define GRPC_CORE_LIB_SUPPORT_VECTOR_H + +#include "absl/container/inlined_vector.h" +#include "src/core/lib/support/memory.h" + +namespace grpc_core { + +template <typename T, size_t N> +using InlinedVector = absl::InlinedVector<T, N, Allocator<T>>; + +} // namespace grpc_core + +#endif diff --git a/src/core/lib/transport/bdp_estimator.cc b/src/core/lib/transport/bdp_estimator.cc index 2a1c97c84e..f1597014b1 100644 --- a/src/core/lib/transport/bdp_estimator.cc +++ b/src/core/lib/transport/bdp_estimator.cc @@ -33,13 +33,12 @@ BdpEstimator::BdpEstimator(const char *name) accumulator_(0), estimate_(65536), ping_start_time_(gpr_time_0(GPR_CLOCK_MONOTONIC)), - next_ping_scheduled_(0), inter_ping_delay_(100.0), // start at 100ms stable_estimate_count_(0), bw_est_(0), name_(name) {} -void BdpEstimator::CompletePing(grpc_exec_ctx *exec_ctx) { +grpc_millis BdpEstimator::CompletePing(grpc_exec_ctx *exec_ctx) { gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_timespec dt_ts = gpr_time_sub(now, ping_start_time_); double dt = (double)dt_ts.tv_sec + 1e-9 * (double)dt_ts.tv_nsec; @@ -79,7 +78,7 @@ void BdpEstimator::CompletePing(grpc_exec_ctx *exec_ctx) { } ping_state_ = PingState::UNSCHEDULED; accumulator_ = 0; - next_ping_scheduled_ = grpc_exec_ctx_now(exec_ctx) + inter_ping_delay_; + return grpc_exec_ctx_now(exec_ctx) + inter_ping_delay_; } } // namespace grpc_core diff --git a/src/core/lib/transport/bdp_estimator.h b/src/core/lib/transport/bdp_estimator.h index 6447f2d62c..750da39599 100644 --- a/src/core/lib/transport/bdp_estimator.h +++ b/src/core/lib/transport/bdp_estimator.h @@ -40,30 +40,11 @@ class BdpEstimator { explicit BdpEstimator(const char *name); ~BdpEstimator() {} - // Returns true if a reasonable estimate could be obtained - bool EstimateBdp(int64_t *estimate_out) const { - *estimate_out = estimate_; - return true; - } - bool EstimateBandwidth(double *bw_out) const { - *bw_out = bw_est_; - return true; - } + int64_t EstimateBdp() const { return estimate_; } + double EstimateBandwidth() const { return bw_est_; } void AddIncomingBytes(int64_t num_bytes) { accumulator_ += num_bytes; } - // Returns true if the user should schedule a ping - bool NeedPing(grpc_exec_ctx *exec_ctx) const { - switch (ping_state_) { - case PingState::UNSCHEDULED: - return grpc_exec_ctx_now(exec_ctx) >= next_ping_scheduled_; - case PingState::SCHEDULED: - case PingState::STARTED: - return false; - } - GPR_UNREACHABLE_CODE(return false); - } - // Schedule a ping: call in response to receiving a true from // grpc_bdp_estimator_add_incoming_bytes once a ping has been scheduled by a // transport (but not necessarily started) @@ -91,8 +72,8 @@ class BdpEstimator { ping_start_time_ = gpr_now(GPR_CLOCK_MONOTONIC); } - // Completes a previously started ping - void CompletePing(grpc_exec_ctx *exec_ctx); + // Completes a previously started ping, returns when to schedule the next one + grpc_millis CompletePing(grpc_exec_ctx *exec_ctx); private: enum class PingState { UNSCHEDULED, SCHEDULED, STARTED }; @@ -102,8 +83,6 @@ class BdpEstimator { int64_t estimate_; // when was the current ping started? gpr_timespec ping_start_time_; - // when should the next ping start? - grpc_millis next_ping_scheduled_; int inter_ping_delay_; int stable_estimate_count_; double bw_est_; diff --git a/src/core/lib/transport/metadata.cc b/src/core/lib/transport/metadata.cc index 5455b2481b..2392f26c0b 100644 --- a/src/core/lib/transport/metadata.cc +++ b/src/core/lib/transport/metadata.cc @@ -352,11 +352,14 @@ static size_t get_base64_encoded_size(size_t raw_length) { return raw_length / 3 * 4 + tail_xtra[raw_length % 3]; } -size_t grpc_mdelem_get_size_in_hpack_table(grpc_mdelem elem) { +size_t grpc_mdelem_get_size_in_hpack_table(grpc_mdelem elem, + bool use_true_binary_metadata) { size_t overhead_and_key = 32 + GRPC_SLICE_LENGTH(GRPC_MDKEY(elem)); size_t value_len = GRPC_SLICE_LENGTH(GRPC_MDVALUE(elem)); if (grpc_is_binary_header(GRPC_MDKEY(elem))) { - return overhead_and_key + get_base64_encoded_size(value_len); + return overhead_and_key + (use_true_binary_metadata + ? value_len + 1 + : get_base64_encoded_size(value_len)); } else { return overhead_and_key + value_len; } diff --git a/src/core/lib/transport/metadata.h b/src/core/lib/transport/metadata.h index 9f82225dc3..3f1032ab8a 100644 --- a/src/core/lib/transport/metadata.h +++ b/src/core/lib/transport/metadata.h @@ -132,7 +132,8 @@ grpc_mdelem grpc_mdelem_create( bool grpc_mdelem_eq(grpc_mdelem a, grpc_mdelem b); -size_t grpc_mdelem_get_size_in_hpack_table(grpc_mdelem elem); +size_t grpc_mdelem_get_size_in_hpack_table(grpc_mdelem elem, + bool use_true_binary_metadata); /* Mutator and accessor for grpc_mdelem user data. The destructor function is used as a type tag and is checked during user_data fetch. */ diff --git a/src/core/lib/transport/pid_controller.cc b/src/core/lib/transport/pid_controller.cc index 4b304f17b2..9f7750d693 100644 --- a/src/core/lib/transport/pid_controller.cc +++ b/src/core/lib/transport/pid_controller.cc @@ -19,45 +19,30 @@ #include "src/core/lib/transport/pid_controller.h" #include <grpc/support/useful.h> -void grpc_pid_controller_init(grpc_pid_controller *pid_controller, - grpc_pid_controller_args args) { - pid_controller->args = args; - pid_controller->last_control_value = args.initial_control_value; - grpc_pid_controller_reset(pid_controller); -} +namespace grpc_core { -void grpc_pid_controller_reset(grpc_pid_controller *pid_controller) { - pid_controller->last_error = 0.0; - pid_controller->last_dc_dt = 0.0; - pid_controller->error_integral = 0.0; -} +PidController::PidController(const Args &args) + : last_control_value_(args.initial_control_value()), args_(args) {} -double grpc_pid_controller_update(grpc_pid_controller *pid_controller, - double error, double dt) { - if (dt == 0) return pid_controller->last_control_value; +double PidController::Update(double error, double dt) { + if (dt <= 0) return last_control_value_; /* integrate error using the trapezoid rule */ - pid_controller->error_integral += - dt * (pid_controller->last_error + error) * 0.5; - pid_controller->error_integral = GPR_CLAMP( - pid_controller->error_integral, -pid_controller->args.integral_range, - pid_controller->args.integral_range); - double diff_error = (error - pid_controller->last_error) / dt; + error_integral_ += dt * (last_error_ + error) * 0.5; + error_integral_ = GPR_CLAMP(error_integral_, -args_.integral_range(), + args_.integral_range()); + double diff_error = (error - last_error_) / dt; /* calculate derivative of control value vs time */ - double dc_dt = pid_controller->args.gain_p * error + - pid_controller->args.gain_i * pid_controller->error_integral + - pid_controller->args.gain_d * diff_error; + double dc_dt = args_.gain_p() * error + args_.gain_i() * error_integral_ + + args_.gain_d() * diff_error; /* and perform trapezoidal integration */ - double new_control_value = pid_controller->last_control_value + - dt * (pid_controller->last_dc_dt + dc_dt) * 0.5; - new_control_value = - GPR_CLAMP(new_control_value, pid_controller->args.min_control_value, - pid_controller->args.max_control_value); - pid_controller->last_error = error; - pid_controller->last_dc_dt = dc_dt; - pid_controller->last_control_value = new_control_value; + double new_control_value = + last_control_value_ + dt * (last_dc_dt_ + dc_dt) * 0.5; + new_control_value = GPR_CLAMP(new_control_value, args_.min_control_value(), + args_.max_control_value()); + last_error_ = error; + last_dc_dt_ = dc_dt; + last_control_value_ = new_control_value; return new_control_value; } -double grpc_pid_controller_last(grpc_pid_controller *pid_controller) { - return pid_controller->last_control_value; -} +} // namespace grpc_core diff --git a/src/core/lib/transport/pid_controller.h b/src/core/lib/transport/pid_controller.h index 80899e9a20..87e59a1a90 100644 --- a/src/core/lib/transport/pid_controller.h +++ b/src/core/lib/transport/pid_controller.h @@ -19,9 +19,7 @@ #ifndef GRPC_CORE_LIB_TRANSPORT_PID_CONTROLLER_H #define GRPC_CORE_LIB_TRANSPORT_PID_CONTROLLER_H -#ifdef __cplusplus -extern "C" { -#endif +#include <limits> /* \file Simple PID controller. Implements a proportional-integral-derivative controller. @@ -30,41 +28,87 @@ extern "C" { Gains can be set to adjust sensitivity to current error (p), the integral of error (i), and the derivative of error (d). */ -typedef struct { - double gain_p; - double gain_i; - double gain_d; - double initial_control_value; - double min_control_value; - double max_control_value; - double integral_range; -} grpc_pid_controller_args; +namespace grpc_core { -typedef struct { - double last_error; - double error_integral; - double last_control_value; - double last_dc_dt; - grpc_pid_controller_args args; -} grpc_pid_controller; +class PidController { + public: + class Args { + public: + double gain_p() const { return gain_p_; } + double gain_i() const { return gain_i_; } + double gain_d() const { return gain_d_; } + double initial_control_value() const { return initial_control_value_; } + double min_control_value() const { return min_control_value_; } + double max_control_value() const { return max_control_value_; } + double integral_range() const { return integral_range_; } -/** Initialize the controller */ -void grpc_pid_controller_init(grpc_pid_controller *pid_controller, - grpc_pid_controller_args args); + Args& set_gain_p(double gain_p) { + gain_p_ = gain_p; + return *this; + } + Args& set_gain_i(double gain_i) { + gain_i_ = gain_i; + return *this; + } + Args& set_gain_d(double gain_d) { + gain_d_ = gain_d; + return *this; + } + Args& set_initial_control_value(double initial_control_value) { + initial_control_value_ = initial_control_value; + return *this; + } + Args& set_min_control_value(double min_control_value) { + min_control_value_ = min_control_value; + return *this; + } + Args& set_max_control_value(double max_control_value) { + max_control_value_ = max_control_value; + return *this; + } + Args& set_integral_range(double integral_range) { + integral_range_ = integral_range; + return *this; + } -/** Reset the controller: useful when things have changed significantly */ -void grpc_pid_controller_reset(grpc_pid_controller *pid_controller); + private: + double gain_p_ = 0.0; + double gain_i_ = 0.0; + double gain_d_ = 0.0; + double initial_control_value_ = 0.0; + double min_control_value_ = std::numeric_limits<double>::min(); + double max_control_value_ = std::numeric_limits<double>::max(); + double integral_range_ = std::numeric_limits<double>::max(); + }; -/** Update the controller: given a current error estimate, and the time since - the last update, returns a new control value */ -double grpc_pid_controller_update(grpc_pid_controller *pid_controller, - double error, double dt); + explicit PidController(const Args& args); -/** Returns the last control value calculated */ -double grpc_pid_controller_last(grpc_pid_controller *pid_controller); + /// Reset the controller internal state: useful when the environment has + /// changed significantly + void Reset() { + last_error_ = 0.0; + last_dc_dt_ = 0.0; + error_integral_ = 0.0; + } -#ifdef __cplusplus -} -#endif + /// Update the controller: given a current error estimate, and the time since + /// the last update, returns a new control value + double Update(double error, double dt); + + /// Returns the last control value calculated + double last_control_value() const { return last_control_value_; } + + /// Returns the current error integral (mostly for testing) + double error_integral() const { return error_integral_; } + + private: + double last_error_ = 0.0; + double error_integral_ = 0.0; + double last_control_value_; + double last_dc_dt_ = 0.0; + const Args args_; +}; + +} // namespace grpc_core #endif /* GRPC_CORE_LIB_TRANSPORT_PID_CONTROLLER_H */ |