author    Craig Tiller <ctiller@google.com>  2017-10-18 14:47:18 -0700
committer Craig Tiller <ctiller@google.com>  2017-10-18 14:47:18 -0700
commit    d3d709a10fe2d3d9c2f2115bd4adad33e5a1ad0c (patch)
tree      4a72ee6263f95fc39db944aab3699bc9f15922b8 /src
parent    cdaf6d8e0660f83b708e4e73e90a4deaeab7ea2b (diff)
parent    dcd7e80fdaf29b3fd081e7ac9df708d4c562eb79 (diff)
Merge github.com:grpc/grpc into tracer
Diffstat (limited to 'src')
-rw-r--r--  src/core/ext/transport/chttp2/transport/chttp2_transport.cc    |  255
-rw-r--r--  src/core/ext/transport/chttp2/transport/flow_control.cc        |  576
-rw-r--r--  src/core/ext/transport/chttp2/transport/flow_control.h         |  328
-rw-r--r--  src/core/ext/transport/chttp2/transport/frame_settings.cc      |    4
-rw-r--r--  src/core/ext/transport/chttp2/transport/frame_window_update.cc |   10
-rw-r--r--  src/core/ext/transport/chttp2/transport/hpack_encoder.cc       |  252
-rw-r--r--  src/core/ext/transport/chttp2/transport/internal.h             |  177
-rw-r--r--  src/core/ext/transport/chttp2/transport/parsing.cc             |   17
-rw-r--r--  src/core/ext/transport/chttp2/transport/writing.cc             |   32
-rw-r--r--  src/core/lib/http/httpcli_security_connector.cc                |   15
-rw-r--r--  src/core/lib/iomgr/ev_epollex_linux.cc                         | 1404
-rw-r--r--  src/core/lib/iomgr/ev_posix.cc                                 |    9
-rw-r--r--  src/core/lib/iomgr/exec_ctx.cc                                 |    9
-rw-r--r--  src/core/lib/iomgr/tcp_posix.cc                                |    2
-rw-r--r--  src/core/lib/security/credentials/fake/fake_credentials.cc     |    5
-rw-r--r--  src/core/lib/security/credentials/oauth2/oauth2_credentials.cc |    2
-rw-r--r--  src/core/lib/security/credentials/ssl/ssl_credentials.cc       |    6
-rw-r--r--  src/core/lib/security/transport/security_connector.cc          |  126
-rw-r--r--  src/core/lib/security/transport/security_connector.h           |   34
-rw-r--r--  src/core/lib/support/cpu_linux.cc                              |    6
-rw-r--r--  src/core/lib/support/memory.h                                  |   41
-rw-r--r--  src/core/lib/support/vector.h                                  |   32
-rw-r--r--  src/core/lib/transport/bdp_estimator.cc                        |    5
-rw-r--r--  src/core/lib/transport/bdp_estimator.h                         |   29
-rw-r--r--  src/core/lib/transport/metadata.cc                             |    7
-rw-r--r--  src/core/lib/transport/metadata.h                              |    3
-rw-r--r--  src/core/lib/transport/pid_controller.cc                       |   53
-rw-r--r--  src/core/lib/transport/pid_controller.h                        |  110
-rw-r--r--  src/cpp/client/create_channel.cc                               |    3
-rw-r--r--  src/php/ext/grpc/server.c                                      |    4
30 files changed, 1977 insertions, 1579 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index a103038893..ae352d5d7a 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -54,7 +54,6 @@
#include "src/core/lib/transport/transport.h"
#include "src/core/lib/transport/transport_impl.h"
-#define DEFAULT_WINDOW 65535
#define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
#define MAX_WINDOW 0x7fffffffu
#define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
@@ -151,10 +150,14 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_error *error);
+static void schedule_bdp_ping_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t);
static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error);
static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error);
+static void next_bdp_ping_timer_expired_locked(grpc_exec_ctx *exec_ctx,
+ void *tp, grpc_error *error);
static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_error *error);
@@ -217,8 +220,9 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
t->write_cb_pool = next;
}
- t->flow_control.bdp_estimator.Destroy();
+ t->flow_control.Destroy();
+ GRPC_ERROR_UNREF(t->closed_with_error);
gpr_free(t->ping_acks);
gpr_free(t->peer_string);
gpr_free(t);
@@ -276,10 +280,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->endpoint_reading = 1;
t->next_stream_id = is_client ? 1 : 2;
t->is_client = is_client;
- t->flow_control.remote_window = DEFAULT_WINDOW;
- t->flow_control.announced_window = DEFAULT_WINDOW;
- t->flow_control.target_initial_window_size = DEFAULT_WINDOW;
- t->flow_control.t = t;
t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
t->is_first_frame = true;
grpc_connectivity_state_init(
@@ -304,6 +304,9 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_combiner_scheduler(t->combiner));
GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t,
grpc_combiner_scheduler(t->combiner));
+ GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked,
+ next_bdp_ping_timer_expired_locked, t,
+ grpc_combiner_scheduler(t->combiner));
GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping_locked,
t, grpc_combiner_scheduler(t->combiner));
GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
@@ -316,8 +319,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
keepalive_watchdog_fired_locked, t,
grpc_combiner_scheduler(t->combiner));
- t->flow_control.bdp_estimator.Init(t->peer_string);
-
grpc_chttp2_goaway_parser_init(&t->goaway_parser);
grpc_chttp2_hpack_parser_init(exec_ctx, &t->hpack_parser);
@@ -341,8 +342,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
window -- this should by rights be 0 */
t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
t->sent_local_settings = 0;
- t->write_buffer_size = DEFAULT_WINDOW;
- t->flow_control.enable_bdp_probe = true;
+ t->write_buffer_size = grpc_core::chttp2::kDefaultWindow;
if (is_client) {
grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string(
@@ -387,6 +387,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
+ bool enable_bdp = true;
+
if (channel_args) {
for (i = 0; i < channel_args->num_args; i++) {
if (0 == strcmp(channel_args->args[i].key,
@@ -447,8 +449,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
&channel_args->args[i], {0, 0, MAX_WRITE_BUFFER_SIZE});
} else if (0 ==
strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) {
- t->flow_control.enable_bdp_probe =
- grpc_channel_arg_get_integer(&channel_args->args[i], {1, 0, 1});
+ enable_bdp = grpc_channel_arg_get_bool(&channel_args->args[i], true);
} else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_KEEPALIVE_TIME_MS)) {
const int value = grpc_channel_arg_get_integer(
@@ -543,6 +544,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
}
+ t->flow_control.Init(exec_ctx, t, enable_bdp);
+
/* No pings allowed before receiving a header or data frame. */
t->ping_state.pings_before_data_required = 0;
t->ping_state.is_delayed_ping_timer_set = false;
@@ -563,10 +566,13 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
}
- grpc_chttp2_act_on_flowctl_action(
- exec_ctx,
- grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, NULL), t,
- NULL);
+ if (enable_bdp) {
+ GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping");
+ schedule_bdp_ping_locked(exec_ctx, t);
+
+ grpc_chttp2_act_on_flowctl_action(
+ exec_ctx, t->flow_control->PeriodicUpdate(exec_ctx), t, NULL);
+ }
grpc_chttp2_initiate_write(exec_ctx, t,
GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE);
@@ -596,7 +602,9 @@ static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
static void close_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_error *error) {
- if (!t->closed) {
+ end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error));
+ cancel_pings(exec_ctx, t, GRPC_ERROR_REF(error));
+ if (t->closed_with_error == GRPC_ERROR_NONE) {
if (!grpc_error_has_clear_grpc_status(error)) {
error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAVAILABLE);
@@ -611,13 +619,16 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_error_add_child(t->close_transport_on_writes_finished, error);
return;
}
- t->closed = 1;
+ GPR_ASSERT(error != GRPC_ERROR_NONE);
+ t->closed_with_error = GRPC_ERROR_REF(error);
connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "close_transport");
- grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error));
if (t->ping_state.is_delayed_ping_timer_set) {
grpc_timer_cancel(exec_ctx, &t->ping_state.delayed_ping_timer);
}
+ if (t->have_next_bdp_ping_timer) {
+ grpc_timer_cancel(exec_ctx, &t->next_bdp_ping_timer);
+ }
switch (t->keepalive_state) {
case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING:
grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer);
@@ -637,8 +648,8 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:close");
}
- end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error));
- cancel_pings(exec_ctx, t, GRPC_ERROR_REF(error));
+ GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE);
+ grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error);
}
@@ -699,7 +710,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
post_destructive_reclaimer(exec_ctx, t);
}
- s->flow_control.s = s;
+ s->flow_control.Init(t->flow_control.get(), s);
GPR_TIMER_END("init_stream", 0);
return 0;
@@ -750,7 +761,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
GRPC_ERROR_UNREF(s->write_closed_error);
GRPC_ERROR_UNREF(s->byte_stream_error);
- grpc_chttp2_flowctl_destroy_stream(&t->flow_control, &s->flow_control);
+ s->flow_control.Destroy();
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "stream");
@@ -941,7 +952,8 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
void grpc_chttp2_mark_stream_writable(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
- if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s)) {
+ if (t->closed_with_error == GRPC_ERROR_NONE &&
+ grpc_chttp2_list_add_writable_stream(t, s)) {
GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become");
}
}
@@ -994,7 +1006,7 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt,
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
grpc_chttp2_begin_write_result r;
- if (t->closed) {
+ if (t->closed_with_error != GRPC_ERROR_NONE) {
r.writing = false;
} else {
r = grpc_chttp2_begin_write(exec_ctx, t);
@@ -1457,7 +1469,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
if (!s->write_closed) {
if (t->is_client) {
- if (!t->closed) {
+ if (t->closed_with_error == GRPC_ERROR_NONE) {
GPR_ASSERT(s->id == 0);
grpc_chttp2_list_add_waiting_for_concurrency(t, s);
maybe_start_some_streams(exec_ctx, t);
@@ -1465,7 +1477,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
grpc_chttp2_cancel_stream(
exec_ctx, t, s,
grpc_error_set_int(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport closed"),
+ GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Transport closed", &t->closed_with_error, 1),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
}
} else {
@@ -1617,13 +1630,10 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
if (s->id != 0) {
if (!s->read_closed) {
already_received = s->frame_storage.length;
- grpc_chttp2_flowctl_incoming_bs_update(
- &t->flow_control, &s->flow_control, GRPC_HEADER_SIZE_IN_BYTES,
- already_received);
- grpc_chttp2_act_on_flowctl_action(
- exec_ctx, grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control,
- &s->flow_control),
- t, s);
+ s->flow_control->IncomingByteStreamUpdate(GRPC_HEADER_SIZE_IN_BYTES,
+ already_received);
+ grpc_chttp2_act_on_flowctl_action(exec_ctx,
+ s->flow_control->MakeAction(), t, s);
}
}
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
@@ -1687,6 +1697,7 @@ static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
/* callback remaining pings: they're not allowed to call into the transport,
and maybe they hold resources that need to be freed */
grpc_chttp2_ping_queue *pq = &t->ping_queue;
+ GPR_ASSERT(error != GRPC_ERROR_NONE);
for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) {
grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error));
GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pq->lists[j]);
@@ -1696,6 +1707,12 @@ static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_closure *on_initiate, grpc_closure *on_ack) {
+ if (t->closed_with_error != GRPC_ERROR_NONE) {
+ GRPC_CLOSURE_SCHED(exec_ctx, on_initiate,
+ GRPC_ERROR_REF(t->closed_with_error));
+ GRPC_CLOSURE_SCHED(exec_ctx, on_ack, GRPC_ERROR_REF(t->closed_with_error));
+ return;
+ }
grpc_chttp2_ping_queue *pq = &t->ping_queue;
grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate,
GRPC_ERROR_NONE);
@@ -1754,7 +1771,9 @@ void grpc_chttp2_add_ping_strike(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
/*The transport will be closed after the write is done */
close_transport_locked(
- exec_ctx, t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"));
+ exec_ctx, t, grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
}
}
@@ -2390,57 +2409,46 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
* INPUT PROCESSING - PARSING
*/
-void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_flowctl_action action,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s) {
- switch (action.send_stream_update) {
- case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED:
+template <class F>
+static void WithUrgency(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_core::chttp2::FlowControlAction::Urgency urgency,
+ grpc_chttp2_initiate_write_reason reason, F action) {
+ switch (urgency) {
+ case grpc_core::chttp2::FlowControlAction::Urgency::NO_ACTION_NEEDED:
break;
- case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY:
- grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
- grpc_chttp2_initiate_write(
- exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL);
+ case grpc_core::chttp2::FlowControlAction::Urgency::UPDATE_IMMEDIATELY:
+ grpc_chttp2_initiate_write(exec_ctx, t, reason);
+ // fallthrough
+ case grpc_core::chttp2::FlowControlAction::Urgency::QUEUE_UPDATE:
+ action();
break;
- case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE:
- grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
- break;
- }
- switch (action.send_transport_update) {
- case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED:
- break;
- case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY:
- grpc_chttp2_initiate_write(
- exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL);
- break;
- // this is the same as no action b/c every time the transport enters the
- // writing path it will maybe do an update
- case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE:
- break;
- }
- if (action.send_setting_update != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) {
- if (action.initial_window_size > 0) {
- queue_setting_update(exec_ctx, t,
- GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
- (uint32_t)action.initial_window_size);
- }
- if (action.max_frame_size > 0) {
- queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
- (uint32_t)action.max_frame_size);
- }
- if (action.send_setting_update == GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY) {
- grpc_chttp2_initiate_write(exec_ctx, t,
- GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS);
- }
- }
- if (action.need_ping) {
- GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping");
- t->flow_control.bdp_estimator->SchedulePing();
- send_ping_locked(exec_ctx, t, &t->start_bdp_ping_locked,
- &t->finish_bdp_ping_locked);
}
}
+void grpc_chttp2_act_on_flowctl_action(
+ grpc_exec_ctx *exec_ctx, const grpc_core::chttp2::FlowControlAction &action,
+ grpc_chttp2_transport *t, grpc_chttp2_stream *s) {
+ WithUrgency(
+ exec_ctx, t, action.send_stream_update(),
+ GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL,
+ [exec_ctx, t, s]() { grpc_chttp2_mark_stream_writable(exec_ctx, t, s); });
+ WithUrgency(exec_ctx, t, action.send_transport_update(),
+ GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, []() {});
+ WithUrgency(exec_ctx, t, action.send_initial_window_update(),
+ GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS,
+ [exec_ctx, t, &action]() {
+ queue_setting_update(exec_ctx, t,
+ GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
+ action.initial_window_size());
+ });
+ WithUrgency(
+ exec_ctx, t, action.send_max_frame_size_update(),
+ GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [exec_ctx, t, &action]() {
+ queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
+ action.max_frame_size());
+ });
+}
+
static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
grpc_http_parser parser;
@@ -2488,13 +2496,13 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
}
GPR_SWAP(grpc_error *, err, error);
GRPC_ERROR_UNREF(err);
- if (!t->closed) {
+ if (t->closed_with_error == GRPC_ERROR_NONE) {
GPR_TIMER_BEGIN("reading_action.parse", 0);
size_t i = 0;
grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
GRPC_ERROR_NONE};
for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) {
- t->flow_control.bdp_estimator->AddIncomingBytes(
+ t->flow_control->bdp_estimator()->AddIncomingBytes(
(int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i]));
errors[1] =
grpc_chttp2_perform_read(exec_ctx, t, t->read_buffer.slices[i]);
@@ -2511,8 +2519,8 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_END("reading_action.parse", 0);
GPR_TIMER_BEGIN("post_parse_locked", 0);
- if (t->flow_control.initial_window_update != 0) {
- if (t->flow_control.initial_window_update > 0) {
+ if (t->initial_window_update != 0) {
+ if (t->initial_window_update > 0) {
grpc_chttp2_stream *s;
while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) {
grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
@@ -2521,20 +2529,21 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING);
}
}
- t->flow_control.initial_window_update = 0;
+ t->initial_window_update = 0;
}
GPR_TIMER_END("post_parse_locked", 0);
}
GPR_TIMER_BEGIN("post_reading_action_locked", 0);
bool keep_reading = false;
- if (error == GRPC_ERROR_NONE && t->closed) {
- error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport closed");
+ if (error == GRPC_ERROR_NONE && t->closed_with_error != GRPC_ERROR_NONE) {
+ error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Transport closed", &t->closed_with_error, 1);
}
if (error != GRPC_ERROR_NONE) {
close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error));
t->endpoint_reading = 0;
- } else if (!t->closed) {
+ } else if (t->closed_with_error == GRPC_ERROR_NONE) {
keep_reading = true;
GRPC_CHTTP2_REF_TRANSPORT(t, "keep_reading");
}
@@ -2543,10 +2552,8 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
if (keep_reading) {
grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer,
&t->read_action_locked);
- grpc_chttp2_act_on_flowctl_action(
- exec_ctx,
- grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, NULL), t,
- NULL);
+ grpc_chttp2_act_on_flowctl_action(exec_ctx, t->flow_control->MakeAction(),
+ t, NULL);
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading");
} else {
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "reading_action");
@@ -2559,28 +2566,60 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_END("reading_action_locked", 0);
}
+// t is reffed prior to calling the first time, and once the callback chain
+// that kicks off finishes, it's unreffed
+static void schedule_bdp_ping_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t) {
+ t->flow_control->bdp_estimator()->SchedulePing();
+ send_ping_locked(exec_ctx, t, &t->start_bdp_ping_locked,
+ &t->finish_bdp_ping_locked);
+}
+
static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp;
if (grpc_http_trace.enabled()) {
- gpr_log(GPR_DEBUG, "%s: Start BDP ping", t->peer_string);
+ gpr_log(GPR_DEBUG, "%s: Start BDP ping err=%s", t->peer_string,
+ grpc_error_string(error));
}
/* Reset the keepalive ping timer */
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer);
}
- t->flow_control.bdp_estimator->StartPing();
+ t->flow_control->bdp_estimator()->StartPing();
}
static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp;
if (grpc_http_trace.enabled()) {
- gpr_log(GPR_DEBUG, "%s: Complete BDP ping", t->peer_string);
+ gpr_log(GPR_DEBUG, "%s: Complete BDP ping err=%s", t->peer_string,
+ grpc_error_string(error));
}
- t->flow_control.bdp_estimator->CompletePing(exec_ctx);
+ if (error != GRPC_ERROR_NONE) {
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping");
+ return;
+ }
+ grpc_millis next_ping =
+ t->flow_control->bdp_estimator()->CompletePing(exec_ctx);
+ grpc_chttp2_act_on_flowctl_action(
+ exec_ctx, t->flow_control->PeriodicUpdate(exec_ctx), t, nullptr);
+ GPR_ASSERT(!t->have_next_bdp_ping_timer);
+ t->have_next_bdp_ping_timer = true;
+ grpc_timer_init(exec_ctx, &t->next_bdp_ping_timer, next_ping,
+ &t->next_bdp_ping_timer_expired_locked);
+}
- GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping");
+static void next_bdp_ping_timer_expired_locked(grpc_exec_ctx *exec_ctx,
+ void *tp, grpc_error *error) {
+ grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp;
+ GPR_ASSERT(t->have_next_bdp_ping_timer);
+ t->have_next_bdp_ping_timer = false;
+ if (error != GRPC_ERROR_NONE) {
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping");
+ return;
+ }
+ schedule_bdp_ping_locked(exec_ctx, t);
}
void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args,
@@ -2645,7 +2684,7 @@ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)arg;
GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
- if (t->destroying || t->closed) {
+ if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
} else if (error == GRPC_ERROR_NONE) {
if (t->keepalive_permit_without_calls ||
@@ -2703,8 +2742,11 @@ static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg,
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
if (error == GRPC_ERROR_NONE) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
- close_transport_locked(exec_ctx, t, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "keepalive watchdog timeout"));
+ close_transport_locked(
+ exec_ctx, t,
+ grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "keepalive watchdog timeout"),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL));
}
} else {
/* The watchdog timer should have been cancelled by
@@ -2787,13 +2829,10 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
size_t cur_length = s->frame_storage.length;
if (!s->read_closed) {
- grpc_chttp2_flowctl_incoming_bs_update(&t->flow_control, &s->flow_control,
- bs->next_action.max_size_hint,
- cur_length);
- grpc_chttp2_act_on_flowctl_action(
- exec_ctx, grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control,
- &s->flow_control),
- t, s);
+ s->flow_control->IncomingByteStreamUpdate(bs->next_action.max_size_hint,
+ cur_length);
+ grpc_chttp2_act_on_flowctl_action(exec_ctx, s->flow_control->MakeAction(),
+ t, s);
}
GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
if (s->frame_storage.length > 0) {
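The largest structural change in chttp2_transport.cc above is how flow-control actions are applied: the old per-field switch in grpc_chttp2_act_on_flowctl_action is replaced by the WithUrgency template, which initiates a write for UPDATE_IMMEDIATELY and then falls through so the update callback runs for both UPDATE_IMMEDIATELY and QUEUE_UPDATE. The following is a minimal standalone sketch of that dispatch pattern, with illustrative names only (it is not code from the patch):

#include <cstdio>

enum class Urgency : unsigned char {
  NO_ACTION_NEEDED,    // nothing to do for this dimension of the action
  UPDATE_IMMEDIATELY,  // queue the update and kick off a write right away
  QUEUE_UPDATE,        // queue the update for the next write that happens
};

template <class F>
static void WithUrgency(Urgency urgency, const char* write_reason,
                        F queue_update) {
  switch (urgency) {
    case Urgency::NO_ACTION_NEEDED:
      break;
    case Urgency::UPDATE_IMMEDIATELY:
      std::printf("initiate write (%s)\n", write_reason);
    // fallthrough: an immediate update is also queued like any other
    case Urgency::QUEUE_UPDATE:
      queue_update();
      break;
  }
}

int main() {
  WithUrgency(Urgency::UPDATE_IMMEDIATELY, "stream flow control",
              [] { std::printf("mark stream writable\n"); });
  WithUrgency(Urgency::QUEUE_UPDATE, "send settings",
              [] { std::printf("queue INITIAL_WINDOW_SIZE update\n"); });
  WithUrgency(Urgency::NO_ACTION_NEEDED, "transport flow control", [] {});
  return 0;
}

The fallthrough is what keeps the two urgencies consistent: an immediate update is simply a queued update plus a kick of the write path.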
diff --git a/src/core/ext/transport/chttp2/transport/flow_control.cc b/src/core/ext/transport/chttp2/transport/flow_control.cc
index 60c43d840a..dd80036530 100644
--- a/src/core/ext/transport/chttp2/transport/flow_control.cc
+++ b/src/core/ext/transport/chttp2/transport/flow_control.cc
@@ -16,7 +16,7 @@
*
*/
-#include "src/core/ext/transport/chttp2/transport/internal.h"
+#include "src/core/ext/transport/chttp2/transport/flow_control.h"
#include <inttypes.h>
#include <limits.h>
@@ -28,38 +28,15 @@
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
+#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/lib/support/string.h"
-static uint32_t grpc_chttp2_target_announced_window(
- const grpc_chttp2_transport_flowctl* tfc);
-
-#ifndef NDEBUG
-
-typedef struct {
- int64_t remote_window;
- int64_t target_window;
- int64_t announced_window;
- int64_t remote_window_delta;
- int64_t local_window_delta;
- int64_t announced_window_delta;
- uint32_t local_init_window;
- uint32_t local_max_frame;
-} shadow_flow_control;
-
-static void pretrace(shadow_flow_control* shadow_fc,
- grpc_chttp2_transport_flowctl* tfc,
- grpc_chttp2_stream_flowctl* sfc) {
- shadow_fc->remote_window = tfc->remote_window;
- shadow_fc->target_window = grpc_chttp2_target_announced_window(tfc);
- shadow_fc->announced_window = tfc->announced_window;
- if (sfc != NULL) {
- shadow_fc->remote_window_delta = sfc->remote_window_delta;
- shadow_fc->local_window_delta = sfc->local_window_delta;
- shadow_fc->announced_window_delta = sfc->announced_window_delta;
- }
-}
+namespace grpc_core {
+namespace chttp2 {
+
+namespace {
-#define TRACE_PADDING 30
+static constexpr const int kTracePadding = 30;
static char* fmt_int64_diff_str(int64_t old_val, int64_t new_val) {
char* str;
@@ -68,7 +45,7 @@ static char* fmt_int64_diff_str(int64_t old_val, int64_t new_val) {
} else {
gpr_asprintf(&str, "%" PRId64 "", old_val);
}
- char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING);
+ char* str_lp = gpr_leftpad(str, ' ', kTracePadding);
gpr_free(str);
return str_lp;
}
@@ -80,47 +57,58 @@ static char* fmt_uint32_diff_str(uint32_t old_val, uint32_t new_val) {
} else {
gpr_asprintf(&str, "%" PRIu32 "", old_val);
}
- char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING);
+ char* str_lp = gpr_leftpad(str, ' ', kTracePadding);
gpr_free(str);
return str_lp;
}
+} // namespace
+
+void FlowControlTrace::Init(const char* reason, TransportFlowControl* tfc,
+ StreamFlowControl* sfc) {
+ tfc_ = tfc;
+ sfc_ = sfc;
+ reason_ = reason;
+ remote_window_ = tfc->remote_window();
+ target_window_ = tfc->target_window();
+ announced_window_ = tfc->announced_window();
+ if (sfc != nullptr) {
+ remote_window_delta_ = sfc->remote_window_delta();
+ local_window_delta_ = sfc->local_window_delta();
+ announced_window_delta_ = sfc->announced_window_delta();
+ }
+}
-static void posttrace(shadow_flow_control* shadow_fc,
- grpc_chttp2_transport_flowctl* tfc,
- grpc_chttp2_stream_flowctl* sfc, const char* reason) {
+void FlowControlTrace::Finish() {
uint32_t acked_local_window =
- tfc->t->settings[GRPC_SENT_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ tfc_->transport()->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
uint32_t remote_window =
- tfc->t->settings[GRPC_PEER_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- char* trw_str =
- fmt_int64_diff_str(shadow_fc->remote_window, tfc->remote_window);
- char* tlw_str = fmt_int64_diff_str(shadow_fc->target_window,
- grpc_chttp2_target_announced_window(tfc));
+ tfc_->transport()->settings[GRPC_PEER_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ char* trw_str = fmt_int64_diff_str(remote_window_, tfc_->remote_window());
+ char* tlw_str = fmt_int64_diff_str(target_window_, tfc_->target_window());
char* taw_str =
- fmt_int64_diff_str(shadow_fc->announced_window, tfc->announced_window);
+ fmt_int64_diff_str(announced_window_, tfc_->announced_window());
char* srw_str;
char* slw_str;
char* saw_str;
- if (sfc != NULL) {
- srw_str = fmt_int64_diff_str(shadow_fc->remote_window_delta + remote_window,
- sfc->remote_window_delta + remote_window);
- slw_str =
- fmt_int64_diff_str(shadow_fc->local_window_delta + acked_local_window,
- sfc->local_window_delta + acked_local_window);
- saw_str = fmt_int64_diff_str(
- shadow_fc->announced_window_delta + acked_local_window,
- sfc->announced_window_delta + acked_local_window);
+ if (sfc_ != nullptr) {
+ srw_str = fmt_int64_diff_str(remote_window_delta_ + remote_window,
+ sfc_->remote_window_delta() + remote_window);
+ slw_str = fmt_int64_diff_str(local_window_delta_ + acked_local_window,
+ local_window_delta_ + acked_local_window);
+ saw_str = fmt_int64_diff_str(announced_window_delta_ + acked_local_window,
+ announced_window_delta_ + acked_local_window);
} else {
- srw_str = gpr_leftpad("", ' ', TRACE_PADDING);
- slw_str = gpr_leftpad("", ' ', TRACE_PADDING);
- saw_str = gpr_leftpad("", ' ', TRACE_PADDING);
+ srw_str = gpr_leftpad("", ' ', kTracePadding);
+ slw_str = gpr_leftpad("", ' ', kTracePadding);
+ saw_str = gpr_leftpad("", ' ', kTracePadding);
}
gpr_log(GPR_DEBUG,
"%p[%u][%s] | %s | trw:%s, ttw:%s, taw:%s, srw:%s, slw:%s, saw:%s",
- tfc, sfc != NULL ? sfc->s->id : 0, tfc->t->is_client ? "cli" : "svr",
- reason, trw_str, tlw_str, taw_str, srw_str, slw_str, saw_str);
+ tfc_, sfc_ != nullptr ? sfc_->stream()->id : 0,
+ tfc_->transport()->is_client ? "cli" : "svr", reason_, trw_str,
+ tlw_str, taw_str, srw_str, slw_str, saw_str);
gpr_free(trw_str);
gpr_free(tlw_str);
gpr_free(taw_str);
@@ -129,13 +117,13 @@ static void posttrace(shadow_flow_control* shadow_fc,
gpr_free(saw_str);
}
-static const char* urgency_to_string(grpc_chttp2_flowctl_urgency urgency) {
- switch (urgency) {
- case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED:
+const char* FlowControlAction::UrgencyString(Urgency u) {
+ switch (u) {
+ case Urgency::NO_ACTION_NEEDED:
return "no action";
- case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY:
+ case Urgency::UPDATE_IMMEDIATELY:
return "update immediately";
- case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE:
+ case Urgency::QUEUE_UPDATE:
return "queue update";
default:
GPR_UNREACHABLE_CODE(return "unknown");
@@ -143,209 +131,132 @@ static const char* urgency_to_string(grpc_chttp2_flowctl_urgency urgency) {
GPR_UNREACHABLE_CODE(return "unknown");
}
-static void trace_action(grpc_chttp2_transport_flowctl* tfc,
- grpc_chttp2_flowctl_action action) {
+void FlowControlAction::Trace(grpc_chttp2_transport* t) const {
char* iw_str = fmt_uint32_diff_str(
- tfc->t->settings[GRPC_SENT_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
- action.initial_window_size);
+ t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
+ initial_window_size_);
char* mf_str = fmt_uint32_diff_str(
- tfc->t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
- action.max_frame_size);
- gpr_log(GPR_DEBUG, "t[%s], s[%s], settings[%s] iw:%s mf:%s",
- urgency_to_string(action.send_transport_update),
- urgency_to_string(action.send_stream_update),
- urgency_to_string(action.send_setting_update), iw_str, mf_str);
+ t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
+ max_frame_size_);
+ gpr_log(GPR_DEBUG, "t[%s], s[%s], iw:%s:%s mf:%s:%s",
+ UrgencyString(send_transport_update_),
+ UrgencyString(send_stream_update_),
+ UrgencyString(send_initial_window_update_), iw_str,
+ UrgencyString(send_max_frame_size_update_), mf_str);
gpr_free(iw_str);
gpr_free(mf_str);
}
-#define PRETRACE(tfc, sfc) \
- shadow_flow_control shadow_fc; \
- GRPC_FLOW_CONTROL_IF_TRACING(pretrace(&shadow_fc, tfc, sfc))
-#define POSTTRACE(tfc, sfc, reason) \
- GRPC_FLOW_CONTROL_IF_TRACING(posttrace(&shadow_fc, tfc, sfc, reason))
-#define TRACEACTION(tfc, action) \
- GRPC_FLOW_CONTROL_IF_TRACING(trace_action(tfc, action))
-#else
-#define PRETRACE(tfc, sfc)
-#define POSTTRACE(tfc, sfc, reason)
-#define TRACEACTION(tfc, action)
-#endif
-
-/* How many bytes of incoming flow control would we like to advertise */
-static uint32_t grpc_chttp2_target_announced_window(
- const grpc_chttp2_transport_flowctl* tfc) {
- return (uint32_t)GPR_MIN((int64_t)((1u << 31) - 1),
- tfc->announced_stream_total_over_incoming_window +
- tfc->target_initial_window_size);
-}
-
-// we have sent data on the wire, we must track this in our bookkeeping for the
-// remote peer's flow control.
-void grpc_chttp2_flowctl_sent_data(grpc_chttp2_transport_flowctl* tfc,
- grpc_chttp2_stream_flowctl* sfc,
- int64_t size) {
- PRETRACE(tfc, sfc);
- tfc->remote_window -= size;
- sfc->remote_window_delta -= size;
- POSTTRACE(tfc, sfc, " data sent");
-}
-
-static void announced_window_delta_preupdate(grpc_chttp2_transport_flowctl* tfc,
- grpc_chttp2_stream_flowctl* sfc) {
- if (sfc->announced_window_delta > 0) {
- tfc->announced_stream_total_over_incoming_window -=
- sfc->announced_window_delta;
- } else {
- tfc->announced_stream_total_under_incoming_window +=
- -sfc->announced_window_delta;
- }
-}
-
-static void announced_window_delta_postupdate(
- grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc) {
- if (sfc->announced_window_delta > 0) {
- tfc->announced_stream_total_over_incoming_window +=
- sfc->announced_window_delta;
- } else {
- tfc->announced_stream_total_under_incoming_window -=
- -sfc->announced_window_delta;
+TransportFlowControl::TransportFlowControl(grpc_exec_ctx* exec_ctx,
+ const grpc_chttp2_transport* t,
+ bool enable_bdp_probe)
+ : t_(t),
+ enable_bdp_probe_(enable_bdp_probe),
+ bdp_estimator_(t->peer_string),
+ pid_controller_(grpc_core::PidController::Args()
+ .set_gain_p(4)
+ .set_gain_i(8)
+ .set_gain_d(0)
+ .set_initial_control_value(TargetLogBdp())
+ .set_min_control_value(-1)
+ .set_max_control_value(25)
+ .set_integral_range(10)),
+ last_pid_update_(grpc_exec_ctx_now(exec_ctx)) {}
+
+uint32_t TransportFlowControl::MaybeSendUpdate(bool writing_anyway) {
+ FlowControlTrace trace("t updt sent", this, nullptr);
+ const uint32_t target_announced_window = target_window();
+ if ((writing_anyway || announced_window_ <= target_announced_window / 2) &&
+ announced_window_ != target_announced_window) {
+ const uint32_t announce = (uint32_t)GPR_CLAMP(
+ target_announced_window - announced_window_, 0, UINT32_MAX);
+ announced_window_ += announce;
+ return announce;
}
+ return 0;
}
-// We have received data from the wire. We must track this in our own flow
-// control bookkeeping.
-// Returns an error if the incoming frame violates our flow control.
-grpc_error* grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport_flowctl* tfc,
- grpc_chttp2_stream_flowctl* sfc,
- int64_t incoming_frame_size) {
- uint32_t sent_init_window =
- tfc->t->settings[GRPC_SENT_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- uint32_t acked_init_window =
- tfc->t->settings[GRPC_ACKED_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- PRETRACE(tfc, sfc);
- if (incoming_frame_size > tfc->announced_window) {
+grpc_error* TransportFlowControl::ValidateRecvData(
+ int64_t incoming_frame_size) {
+ if (incoming_frame_size > announced_window_) {
char* msg;
gpr_asprintf(&msg,
"frame of size %" PRId64 " overflows local window of %" PRId64,
- incoming_frame_size, tfc->announced_window);
+ incoming_frame_size, announced_window_);
grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
gpr_free(msg);
return err;
}
+ return GRPC_ERROR_NONE;
+}
- if (sfc != NULL) {
- int64_t acked_stream_window =
- sfc->announced_window_delta + acked_init_window;
- int64_t sent_stream_window = sfc->announced_window_delta + sent_init_window;
- if (incoming_frame_size > acked_stream_window) {
- if (incoming_frame_size <= sent_stream_window) {
- gpr_log(
- GPR_ERROR,
- "Incoming frame of size %" PRId64
- " exceeds local window size of %" PRId64
- ".\n"
- "The (un-acked, future) window size would be %" PRId64
- " which is not exceeded.\n"
- "This would usually cause a disconnection, but allowing it due to"
- "broken HTTP2 implementations in the wild.\n"
- "See (for example) https://github.com/netty/netty/issues/6520.",
- incoming_frame_size, acked_stream_window, sent_stream_window);
- } else {
- char* msg;
- gpr_asprintf(&msg, "frame of size %" PRId64
- " overflows local window of %" PRId64,
- incoming_frame_size, acked_stream_window);
- grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
- gpr_free(msg);
- return err;
- }
- }
-
- announced_window_delta_preupdate(tfc, sfc);
- sfc->announced_window_delta -= incoming_frame_size;
- announced_window_delta_postupdate(tfc, sfc);
- sfc->local_window_delta -= incoming_frame_size;
- }
+StreamFlowControl::StreamFlowControl(TransportFlowControl* tfc,
+ const grpc_chttp2_stream* s)
+ : tfc_(tfc), s_(s) {}
- tfc->announced_window -= incoming_frame_size;
+grpc_error* StreamFlowControl::RecvData(int64_t incoming_frame_size) {
+ FlowControlTrace trace(" data recv", tfc_, this);
- POSTTRACE(tfc, sfc, " data recv");
- return GRPC_ERROR_NONE;
-}
+ grpc_error* error = GRPC_ERROR_NONE;
+ error = tfc_->ValidateRecvData(incoming_frame_size);
+ if (error != GRPC_ERROR_NONE) return error;
-// Returns a non zero announce integer if we should send a transport window
-// update
-uint32_t grpc_chttp2_flowctl_maybe_send_transport_update(
- grpc_chttp2_transport_flowctl* tfc, bool writing_anyway) {
- PRETRACE(tfc, NULL);
- uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc);
- uint32_t threshold_to_send_transport_window_update =
- tfc->t->outbuf.count > 0 ? 3 * target_announced_window / 4
- : target_announced_window / 2;
- if ((writing_anyway ||
- tfc->announced_window <= threshold_to_send_transport_window_update) &&
- tfc->announced_window != target_announced_window) {
- uint32_t announce = (uint32_t)GPR_CLAMP(
- target_announced_window - tfc->announced_window, 0, UINT32_MAX);
- tfc->announced_window += announce;
- POSTTRACE(tfc, NULL, "t updt sent");
- return announce;
+ uint32_t sent_init_window =
+ tfc_->transport()->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ uint32_t acked_init_window =
+ tfc_->transport()->settings[GRPC_ACKED_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+
+ int64_t acked_stream_window = announced_window_delta_ + acked_init_window;
+ int64_t sent_stream_window = announced_window_delta_ + sent_init_window;
+ if (incoming_frame_size > acked_stream_window) {
+ if (incoming_frame_size <= sent_stream_window) {
+ gpr_log(GPR_ERROR,
+ "Incoming frame of size %" PRId64
+ " exceeds local window size of %" PRId64
+ ".\n"
+ "The (un-acked, future) window size would be %" PRId64
+ " which is not exceeded.\n"
+ "This would usually cause a disconnection, but allowing it due to"
+ "broken HTTP2 implementations in the wild.\n"
+ "See (for example) https://github.com/netty/netty/issues/6520.",
+ incoming_frame_size, acked_stream_window, sent_stream_window);
+ } else {
+ char* msg;
+ gpr_asprintf(&msg, "frame of size %" PRId64
+ " overflows local window of %" PRId64,
+ incoming_frame_size, acked_stream_window);
+ grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
+ gpr_free(msg);
+ return err;
+ }
}
- GRPC_FLOW_CONTROL_IF_TRACING(
- gpr_log(GPR_DEBUG, "%p[0][%s] will not send transport update", tfc,
- tfc->t->is_client ? "cli" : "svr"));
- return 0;
+
+ UpdateAnnouncedWindowDelta(tfc_, -incoming_frame_size);
+ local_window_delta_ -= incoming_frame_size;
+ tfc_->CommitRecvData(incoming_frame_size);
+ return GRPC_ERROR_NONE;
}
-// Returns a non zero announce integer if we should send a stream window update
-uint32_t grpc_chttp2_flowctl_maybe_send_stream_update(
- grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc) {
- PRETRACE(tfc, sfc);
- if (sfc->local_window_delta > sfc->announced_window_delta) {
+uint32_t StreamFlowControl::MaybeSendUpdate() {
+ FlowControlTrace trace("s updt sent", tfc_, this);
+ if (local_window_delta_ > announced_window_delta_) {
uint32_t announce = (uint32_t)GPR_CLAMP(
- sfc->local_window_delta - sfc->announced_window_delta, 0, UINT32_MAX);
- announced_window_delta_preupdate(tfc, sfc);
- sfc->announced_window_delta += announce;
- announced_window_delta_postupdate(tfc, sfc);
- POSTTRACE(tfc, sfc, "s updt sent");
+ local_window_delta_ - announced_window_delta_, 0, UINT32_MAX);
+ UpdateAnnouncedWindowDelta(tfc_, announce);
return announce;
}
- GRPC_FLOW_CONTROL_IF_TRACING(
- gpr_log(GPR_DEBUG, "%p[%u][%s] will not send stream update", tfc,
- sfc->s->id, tfc->t->is_client ? "cli" : "svr"));
return 0;
}
-// we have received a WINDOW_UPDATE frame for a transport
-void grpc_chttp2_flowctl_recv_transport_update(
- grpc_chttp2_transport_flowctl* tfc, uint32_t size) {
- PRETRACE(tfc, NULL);
- tfc->remote_window += size;
- POSTTRACE(tfc, NULL, "t updt recv");
-}
-
-// we have received a WINDOW_UPDATE frame for a stream
-void grpc_chttp2_flowctl_recv_stream_update(grpc_chttp2_transport_flowctl* tfc,
- grpc_chttp2_stream_flowctl* sfc,
- uint32_t size) {
- PRETRACE(tfc, sfc);
- sfc->remote_window_delta += size;
- POSTTRACE(tfc, sfc, "s updt recv");
-}
-
-void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport_flowctl* tfc,
- grpc_chttp2_stream_flowctl* sfc,
- size_t max_size_hint,
- size_t have_already) {
- PRETRACE(tfc, sfc);
+void StreamFlowControl::IncomingByteStreamUpdate(size_t max_size_hint,
+ size_t have_already) {
+ FlowControlTrace trace("app st recv", tfc_, this);
uint32_t max_recv_bytes;
uint32_t sent_init_window =
- tfc->t->settings[GRPC_SENT_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ tfc_->transport()->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
/* clamp max recv hint to an allowable size */
if (max_size_hint >= UINT32_MAX - sent_init_window) {
@@ -363,68 +274,18 @@ void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport_flowctl* tfc,
/* add some small lookahead to keep pipelines flowing */
GPR_ASSERT(max_recv_bytes <= UINT32_MAX - sent_init_window);
- if (sfc->local_window_delta < max_recv_bytes) {
+ if (local_window_delta_ < max_recv_bytes) {
uint32_t add_max_recv_bytes =
- (uint32_t)(max_recv_bytes - sfc->local_window_delta);
- sfc->local_window_delta += add_max_recv_bytes;
- }
- POSTTRACE(tfc, sfc, "app st recv");
-}
-
-void grpc_chttp2_flowctl_destroy_stream(grpc_chttp2_transport_flowctl* tfc,
- grpc_chttp2_stream_flowctl* sfc) {
- announced_window_delta_preupdate(tfc, sfc);
-}
-
-// Returns an urgency with which to make an update
-static grpc_chttp2_flowctl_urgency delta_is_significant(
- const grpc_chttp2_transport_flowctl* tfc, int32_t value,
- grpc_chttp2_setting_id setting_id) {
- int64_t delta = (int64_t)value -
- (int64_t)tfc->t->settings[GRPC_LOCAL_SETTINGS][setting_id];
- // TODO(ncteisen): tune this
- if (delta != 0 && (delta <= -value / 5 || delta >= value / 5)) {
- return GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE;
- } else {
- return GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED;
+ (uint32_t)(max_recv_bytes - local_window_delta_);
+ local_window_delta_ += add_max_recv_bytes;
}
}
-// Takes in a target and uses the pid controller to return a stabilized
-// guess at the new bdp.
-static double get_pid_controller_guess(grpc_exec_ctx* exec_ctx,
- grpc_chttp2_transport_flowctl* tfc,
- double target) {
- grpc_millis now = grpc_exec_ctx_now(exec_ctx);
- if (!tfc->pid_controller_initialized) {
- tfc->last_pid_update = now;
- tfc->pid_controller_initialized = true;
- grpc_pid_controller_args args;
- memset(&args, 0, sizeof(args));
- args.gain_p = 4;
- args.gain_i = 8;
- args.gain_d = 0;
- args.initial_control_value = target;
- args.min_control_value = -1;
- args.max_control_value = 25;
- args.integral_range = 10;
- grpc_pid_controller_init(&tfc->pid_controller, args);
- return pow(2, target);
- }
- double bdp_error = target - grpc_pid_controller_last(&tfc->pid_controller);
- double dt = (double)(now - tfc->last_pid_update) * 1e-3;
- double log2_bdp_guess =
- grpc_pid_controller_update(&tfc->pid_controller, bdp_error, dt);
- tfc->last_pid_update = now;
- return pow(2, log2_bdp_guess);
-}
-
// Take in a target and modifies it based on the memory pressure of the system
-static double get_target_under_memory_pressure(
- grpc_chttp2_transport_flowctl* tfc, double target) {
+static double AdjustForMemoryPressure(grpc_resource_quota* quota,
+ double target) {
// do not increase window under heavy memory pressure.
- double memory_pressure = grpc_resource_quota_get_memory_pressure(
- grpc_resource_user_quota(grpc_endpoint_get_resource_user(tfc->t->ep)));
+ double memory_pressure = grpc_resource_quota_get_memory_pressure(quota);
static const double kLowMemPressure = 0.1;
static const double kZeroTarget = 22;
static const double kHighMemPressure = 0.8;
@@ -439,77 +300,82 @@ static double get_target_under_memory_pressure(
return target;
}
-grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action(
- grpc_exec_ctx* exec_ctx, grpc_chttp2_transport_flowctl* tfc,
- grpc_chttp2_stream_flowctl* sfc) {
- grpc_chttp2_flowctl_action action;
- memset(&action, 0, sizeof(action));
+double TransportFlowControl::TargetLogBdp() {
+ return AdjustForMemoryPressure(
+ grpc_resource_user_quota(grpc_endpoint_get_resource_user(t_->ep)),
+ 1 + log2(bdp_estimator_.EstimateBdp()));
+}
+
+double TransportFlowControl::SmoothLogBdp(grpc_exec_ctx* exec_ctx,
+ double value) {
+ grpc_millis now = grpc_exec_ctx_now(exec_ctx);
+ double bdp_error = value - pid_controller_.last_control_value();
+ const double dt = (double)(now - last_pid_update_) * 1e-3;
+ last_pid_update_ = now;
+ return pid_controller_.Update(bdp_error, dt);
+}
+
+FlowControlAction::Urgency TransportFlowControl::DeltaUrgency(
+ int32_t value, grpc_chttp2_setting_id setting_id) {
+ int64_t delta =
+ (int64_t)value - (int64_t)t_->settings[GRPC_LOCAL_SETTINGS][setting_id];
// TODO(ncteisen): tune this
- if (sfc != NULL && !sfc->s->read_closed) {
- uint32_t sent_init_window =
- tfc->t->settings[GRPC_SENT_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- if ((int64_t)sfc->local_window_delta >
- (int64_t)sfc->announced_window_delta &&
- (int64_t)sfc->announced_window_delta + sent_init_window <=
- sent_init_window / 2) {
- action.send_stream_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY;
- } else if (sfc->local_window_delta > sfc->announced_window_delta) {
- action.send_stream_update = GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE;
- }
+ if (delta != 0 && (delta <= -value / 5 || delta >= value / 5)) {
+ return FlowControlAction::Urgency::QUEUE_UPDATE;
+ } else {
+ return FlowControlAction::Urgency::NO_ACTION_NEEDED;
}
- if (tfc->enable_bdp_probe) {
- action.need_ping = tfc->bdp_estimator->NeedPing(exec_ctx);
+}
+FlowControlAction TransportFlowControl::PeriodicUpdate(
+ grpc_exec_ctx* exec_ctx) {
+ FlowControlAction action;
+ if (enable_bdp_probe_) {
// get bdp estimate and update initial_window accordingly.
- int64_t estimate = -1;
- if (tfc->bdp_estimator->EstimateBdp(&estimate)) {
- double target = 1 + log2((double)estimate);
-
- // target might change based on how much memory pressure we are under
- // TODO(ncteisen): experiment with setting target to be huge under low
- // memory pressure.
- target = get_target_under_memory_pressure(tfc, target);
-
- // run our target through the pid controller to stabilize change.
- // TODO(ncteisen): experiment with other controllers here.
- double bdp_guess = get_pid_controller_guess(exec_ctx, tfc, target);
-
- // Though initial window 'could' drop to 0, we keep the floor at 128
- tfc->target_initial_window_size =
- (int32_t)GPR_CLAMP(bdp_guess, 128, INT32_MAX);
-
- grpc_chttp2_flowctl_urgency init_window_update_urgency =
- delta_is_significant(tfc, tfc->target_initial_window_size,
- GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE);
- if (init_window_update_urgency != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) {
- action.send_setting_update = init_window_update_urgency;
- action.initial_window_size = (uint32_t)tfc->target_initial_window_size;
- }
- }
+ // target might change based on how much memory pressure we are under
+ // TODO(ncteisen): experiment with setting target to be huge under low
+ // memory pressure.
+ const double target = pow(2, SmoothLogBdp(exec_ctx, TargetLogBdp()));
+
+ // Though initial window 'could' drop to 0, we keep the floor at 128
+ target_initial_window_size_ = (int32_t)GPR_CLAMP(target, 128, INT32_MAX);
+
+ action.set_send_initial_window_update(
+ DeltaUrgency(target_initial_window_size_,
+ GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE),
+ target_initial_window_size_);
// get bandwidth estimate and update max_frame accordingly.
- double bw_dbl = -1;
- if (tfc->bdp_estimator->EstimateBandwidth(&bw_dbl)) {
- // we target the max of BDP or bandwidth in microseconds.
- int32_t frame_size = (int32_t)GPR_CLAMP(
- GPR_MAX((int32_t)GPR_CLAMP(bw_dbl, 0, INT_MAX) / 1000,
- tfc->target_initial_window_size),
- 16384, 16777215);
- grpc_chttp2_flowctl_urgency frame_size_urgency = delta_is_significant(
- tfc, frame_size, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE);
- if (frame_size_urgency != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) {
- if (frame_size_urgency > action.send_setting_update) {
- action.send_setting_update = frame_size_urgency;
- }
- action.max_frame_size = (uint32_t)frame_size;
- }
- }
+ double bw_dbl = bdp_estimator_.EstimateBandwidth();
+ // we target the max of BDP or bandwidth in microseconds.
+ int32_t frame_size = (int32_t)GPR_CLAMP(
+ GPR_MAX((int32_t)GPR_CLAMP(bw_dbl, 0, INT_MAX) / 1000,
+ target_initial_window_size_),
+ 16384, 16777215);
+ action.set_send_max_frame_size_update(
+ DeltaUrgency(frame_size, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE),
+ frame_size);
}
- uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc);
- if (tfc->announced_window < target_announced_window / 2) {
- action.send_transport_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY;
+ return UpdateAction(action);
+}
+
+FlowControlAction StreamFlowControl::UpdateAction(FlowControlAction action) {
+ // TODO(ncteisen): tune this
+ if (!s_->read_closed) {
+ uint32_t sent_init_window =
+ tfc_->transport()->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ if (local_window_delta_ > announced_window_delta_ &&
+ announced_window_delta_ + sent_init_window <= sent_init_window / 2) {
+ action.set_send_stream_update(
+ FlowControlAction::Urgency::UPDATE_IMMEDIATELY);
+ } else if (local_window_delta_ > announced_window_delta_) {
+ action.set_send_stream_update(FlowControlAction::Urgency::QUEUE_UPDATE);
+ }
}
- TRACEACTION(tfc, action);
+
return action;
}
+
+} // namespace chttp2
+} // namespace grpc_core
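flow_control.cc now folds the BDP-driven window sizing into TransportFlowControl::PeriodicUpdate: TargetLogBdp() takes 1 + log2 of the BDP estimate and damps it under memory pressure, SmoothLogBdp() stabilizes it with the embedded PidController, and the result is raised back to bytes and clamped to [128, INT32_MAX] before DeltaUrgency decides whether it differs from the local setting by enough (about 20%) to queue a SETTINGS update. A rough standalone illustration of that arithmetic follows; it is not gRPC code, and the PID smoothing step is deliberately replaced by a pass-through:

#include <algorithm>
#include <cmath>
#include <cstdint>
#include <cstdio>

// Turn a BDP estimate (bytes) into an initial-window-size target, following
// the shape of TransportFlowControl::PeriodicUpdate above. The PID smoothing
// performed by SmoothLogBdp() is omitted here.
static int32_t TargetInitialWindow(double estimated_bdp_bytes,
                                   double memory_pressure) {
  // TargetLogBdp(): work in log2 space, with one extra doubling of headroom.
  double log2_target = 1 + std::log2(estimated_bdp_bytes);
  // Simplified stand-in for AdjustForMemoryPressure(): refuse to grow the
  // window when the resource quota reports heavy memory pressure.
  if (memory_pressure > 0.8) log2_target = 0;
  // Back to bytes, clamped so the window never drops below 128 bytes.
  double window = std::pow(2.0, log2_target);
  return (int32_t)std::min<double>(std::max<double>(window, 128.0),
                                   (double)INT32_MAX);
}

int main() {
  // A 1 MiB BDP estimate under low memory pressure -> a 2 MiB window target.
  std::printf("%d\n", TargetInitialWindow(1024.0 * 1024.0, 0.1));
  // Heavy memory pressure collapses the target to the 128-byte floor.
  std::printf("%d\n", TargetInitialWindow(1024.0 * 1024.0, 0.9));
  return 0;
}

With a 1 MiB estimate and low pressure this yields a 2 MiB initial-window target; the real code additionally derives a max-frame-size target from the bandwidth estimate and routes both through DeltaUrgency.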
diff --git a/src/core/ext/transport/chttp2/transport/flow_control.h b/src/core/ext/transport/chttp2/transport/flow_control.h
new file mode 100644
index 0000000000..d5107d467b
--- /dev/null
+++ b/src/core/ext/transport/chttp2/transport/flow_control.h
@@ -0,0 +1,328 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H
+#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H
+
+#include <stdint.h>
+
+#include <grpc/support/useful.h>
+#include "src/core/ext/transport/chttp2/transport/http2_settings.h"
+#include "src/core/lib/support/manual_constructor.h"
+#include "src/core/lib/transport/bdp_estimator.h"
+#include "src/core/lib/transport/pid_controller.h"
+
+struct grpc_chttp2_transport;
+struct grpc_chttp2_stream;
+
+extern "C" grpc_tracer_flag grpc_flowctl_trace;
+
+namespace grpc_core {
+namespace chttp2 {
+
+static constexpr uint32_t kDefaultWindow = 65535;
+
+class TransportFlowControl;
+class StreamFlowControl;
+
+class FlowControlAction {
+ public:
+ enum class Urgency : uint8_t {
+ // Nothing to be done.
+ NO_ACTION_NEEDED = 0,
+ // Initiate a write to update the initial window immediately.
+ UPDATE_IMMEDIATELY,
+ // Push the flow control update into a send buffer, to be sent
+ // out the next time a write is initiated.
+ QUEUE_UPDATE,
+ };
+
+ Urgency send_stream_update() const { return send_stream_update_; }
+ Urgency send_transport_update() const { return send_transport_update_; }
+ Urgency send_initial_window_update() const {
+ return send_initial_window_update_;
+ }
+ Urgency send_max_frame_size_update() const {
+ return send_max_frame_size_update_;
+ }
+ uint32_t initial_window_size() const { return initial_window_size_; }
+ uint32_t max_frame_size() const { return max_frame_size_; }
+
+ FlowControlAction& set_send_stream_update(Urgency u) {
+ send_stream_update_ = u;
+ return *this;
+ }
+ FlowControlAction& set_send_transport_update(Urgency u) {
+ send_transport_update_ = u;
+ return *this;
+ }
+ FlowControlAction& set_send_initial_window_update(Urgency u,
+ uint32_t update) {
+ send_initial_window_update_ = u;
+ initial_window_size_ = update;
+ return *this;
+ }
+ FlowControlAction& set_send_max_frame_size_update(Urgency u,
+ uint32_t update) {
+ send_max_frame_size_update_ = u;
+ max_frame_size_ = update;
+ return *this;
+ }
+
+ static const char* UrgencyString(Urgency u);
+ void Trace(grpc_chttp2_transport* t) const;
+
+ private:
+ Urgency send_stream_update_ = Urgency::NO_ACTION_NEEDED;
+ Urgency send_transport_update_ = Urgency::NO_ACTION_NEEDED;
+ Urgency send_initial_window_update_ = Urgency::NO_ACTION_NEEDED;
+ Urgency send_max_frame_size_update_ = Urgency::NO_ACTION_NEEDED;
+ uint32_t initial_window_size_ = 0;
+ uint32_t max_frame_size_ = 0;
+};
+
+class FlowControlTrace {
+ public:
+ FlowControlTrace(const char* reason, TransportFlowControl* tfc,
+ StreamFlowControl* sfc) {
+ if (enabled_) Init(reason, tfc, sfc);
+ }
+
+ ~FlowControlTrace() {
+ if (enabled_) Finish();
+ }
+
+ private:
+ void Init(const char* reason, TransportFlowControl* tfc,
+ StreamFlowControl* sfc);
+ void Finish();
+
+ const bool enabled_ = GRPC_TRACER_ON(grpc_flowctl_trace);
+
+ TransportFlowControl* tfc_;
+ StreamFlowControl* sfc_;
+ const char* reason_;
+ int64_t remote_window_;
+ int64_t target_window_;
+ int64_t announced_window_;
+ int64_t remote_window_delta_;
+ int64_t local_window_delta_;
+ int64_t announced_window_delta_;
+};
+
+class TransportFlowControl {
+ public:
+ TransportFlowControl(grpc_exec_ctx* exec_ctx, const grpc_chttp2_transport* t,
+ bool enable_bdp_probe);
+ ~TransportFlowControl() {}
+
+ bool bdp_probe() const { return enable_bdp_probe_; }
+
+ // returns an announce if we should send a transport update to our peer,
+ // else returns zero; writing_anyway indicates if a write would happen
+ // regardless of the send - if it is false and this function returns non-zero,
+ // this announce will cause a write to occur
+ uint32_t MaybeSendUpdate(bool writing_anyway);
+
+ // Reads the flow control data and returns an actionable struct that will
+ // tell chttp2 exactly what it needs to do
+ FlowControlAction MakeAction() { return UpdateAction(FlowControlAction()); }
+
+ // Call periodically (at a low-ish rate, 100ms - 10s makes sense)
+ // to perform more complex flow control calculations and return an action
+ // to let chttp2 change its parameters
+ FlowControlAction PeriodicUpdate(grpc_exec_ctx* exec_ctx);
+
+ void StreamSentData(int64_t size) { remote_window_ -= size; }
+
+ grpc_error* ValidateRecvData(int64_t incoming_frame_size);
+ void CommitRecvData(int64_t incoming_frame_size) {
+ announced_window_ -= incoming_frame_size;
+ }
+
+ grpc_error* RecvData(int64_t incoming_frame_size) {
+ FlowControlTrace trace(" data recv", this, nullptr);
+ grpc_error* error = ValidateRecvData(incoming_frame_size);
+ if (error != GRPC_ERROR_NONE) return error;
+ CommitRecvData(incoming_frame_size);
+ return GRPC_ERROR_NONE;
+ }
+
+ // we have received a WINDOW_UPDATE frame for a transport
+ void RecvUpdate(uint32_t size) {
+ FlowControlTrace trace("t updt recv", this, nullptr);
+ remote_window_ += size;
+ }
+
+ int64_t remote_window() const { return remote_window_; }
+ int64_t target_window() const {
+ return (uint32_t)GPR_MIN((int64_t)((1u << 31) - 1),
+ announced_stream_total_over_incoming_window_ +
+ target_initial_window_size_);
+ }
+ int64_t announced_window() const { return announced_window_; }
+
+ const grpc_chttp2_transport* transport() const { return t_; }
+
+ void PreUpdateAnnouncedWindowOverIncomingWindow(int64_t delta) {
+ if (delta > 0) {
+ announced_stream_total_over_incoming_window_ -= delta;
+ } else {
+ announced_stream_total_under_incoming_window_ += -delta;
+ }
+ }
+
+ void PostUpdateAnnouncedWindowOverIncomingWindow(int64_t delta) {
+ if (delta > 0) {
+ announced_stream_total_over_incoming_window_ += delta;
+ } else {
+ announced_stream_total_under_incoming_window_ -= -delta;
+ }
+ }
+
+ BdpEstimator* bdp_estimator() { return &bdp_estimator_; }
+
+ void TestOnlyForceHugeWindow() {
+ announced_window_ = 1024 * 1024 * 1024;
+ remote_window_ = 1024 * 1024 * 1024;
+ }
+
+ private:
+ double TargetLogBdp();
+ double SmoothLogBdp(grpc_exec_ctx* exec_ctx, double value);
+ FlowControlAction::Urgency DeltaUrgency(int32_t value,
+ grpc_chttp2_setting_id setting_id);
+
+ FlowControlAction UpdateAction(FlowControlAction action) {
+ if (announced_window_ < target_window() / 2) {
+ action.set_send_transport_update(
+ FlowControlAction::Urgency::UPDATE_IMMEDIATELY);
+ }
+ return action;
+ }
+
+ const grpc_chttp2_transport* const t_;
+
+ /** Our bookkeeping for the remote peer's available window */
+ int64_t remote_window_ = kDefaultWindow;
+
+ /** calculating what we should give for local window:
+ we track the total amount of flow control over initial window size
+ across all streams: this is data that we want to receive right now (it
+ has an outstanding read)
+ and the total amount of flow control under initial window size across all
+ streams: this is data we've read early
+ we want to adjust incoming_window such that:
+ incoming_window = total_over - max(bdp - total_under, 0) */
+ int64_t announced_stream_total_over_incoming_window_ = 0;
+ int64_t announced_stream_total_under_incoming_window_ = 0;
+
+ /** This is our window according to what we have sent to our remote peer. The
+ * difference between this and the target window is what we use to decide when
+ * to send WINDOW_UPDATE frames. */
+ int64_t announced_window_ = kDefaultWindow;
+
+ int32_t target_initial_window_size_ = kDefaultWindow;
+
+ /** should we probe bdp? */
+ const bool enable_bdp_probe_;
+
+ /* bdp estimation */
+ grpc_core::BdpEstimator bdp_estimator_;
+
+ /* pid controller */
+ grpc_core::PidController pid_controller_;
+ grpc_millis last_pid_update_ = 0;
+};
+
+class StreamFlowControl {
+ public:
+ StreamFlowControl(TransportFlowControl* tfc, const grpc_chttp2_stream* s);
+ ~StreamFlowControl() {
+ tfc_->PreUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_);
+ }
+
+ FlowControlAction UpdateAction(FlowControlAction action);
+ FlowControlAction MakeAction() { return UpdateAction(tfc_->MakeAction()); }
+
+ // we have sent data on the wire; we must track this in our bookkeeping for
+ // the remote peer's flow control.
+ void SentData(int64_t outgoing_frame_size) {
+ FlowControlTrace tracer(" data sent", tfc_, this);
+ tfc_->StreamSentData(outgoing_frame_size);
+ remote_window_delta_ -= outgoing_frame_size;
+ }
+
+ // we have received data from the wire
+ grpc_error* RecvData(int64_t incoming_frame_size);
+
+ // returns the number of bytes to announce if we should send a stream update
+ // to our peer, else returns zero
+ uint32_t MaybeSendUpdate();
+
+ // we have received a WINDOW_UPDATE frame for a stream
+ void RecvUpdate(uint32_t size) {
+ FlowControlTrace trace("s updt recv", tfc_, this);
+ remote_window_delta_ += size;
+ }
+
+ // the application is asking for a certain amount of bytes
+ void IncomingByteStreamUpdate(size_t max_size_hint, size_t have_already);
+
+ int64_t remote_window_delta() const { return remote_window_delta_; }
+ int64_t local_window_delta() const { return local_window_delta_; }
+ int64_t announced_window_delta() const { return announced_window_delta_; }
+
+ const grpc_chttp2_stream* stream() const { return s_; }
+
+ void TestOnlyForceHugeWindow() {
+ announced_window_delta_ = 1024 * 1024 * 1024;
+ local_window_delta_ = 1024 * 1024 * 1024;
+ remote_window_delta_ = 1024 * 1024 * 1024;
+ }
+
+ private:
+ TransportFlowControl* const tfc_;
+ const grpc_chttp2_stream* const s_;
+
+ void UpdateAnnouncedWindowDelta(TransportFlowControl* tfc, int64_t change) {
+ tfc->PreUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_);
+ announced_window_delta_ += change;
+ tfc->PostUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_);
+ }
+
+ /** window available for us to send to peer, over or under the initial
+ * window size of the transport... ie:
+ * remote_window = remote_window_delta + transport.initial_window_size */
+ int64_t remote_window_delta_ = 0;
+
+ /** window available for peer to send to us (as a delta on
+ * transport.initial_window_size)
+ * local_window = local_window_delta + transport.initial_window_size */
+ int64_t local_window_delta_ = 0;
+
+ /** window available for peer to send to us over this stream that we have
+ * announced to the peer */
+ int64_t announced_window_delta_ = 0;
+};
+
+} // namespace chttp2
+} // namespace grpc_core
+
+#endif
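
The classes above are heavily commented but dense; the following is a minimal, stand-alone sketch of the announce/act cycle they describe (simplified types and made-up numbers, not the gRPC implementation): data received from the peer shrinks the announced window, MakeAction flags an update once the announced window falls below half of the target, and MaybeSendUpdate computes the WINDOW_UPDATE size that restores it.

#include <algorithm>
#include <cstdint>
#include <cstdio>

// Simplified stand-ins for the types above; names mirror the header but this
// is illustration only, not the gRPC implementation.
enum class Urgency { NO_ACTION_NEEDED, UPDATE_IMMEDIATELY, QUEUE_UPDATE };

class MiniTransportFlowControl {
 public:
  // Peer sent us `size` bytes: shrink what we have announced to the peer.
  void RecvData(int64_t size) { announced_window_ -= size; }

  // Decide whether a transport-level WINDOW_UPDATE is warranted.
  Urgency MakeAction() const {
    return announced_window_ < target_window_ / 2
               ? Urgency::UPDATE_IMMEDIATELY
               : Urgency::NO_ACTION_NEEDED;
  }

  // Size of the update to announce: bring the announced window back to target.
  uint32_t MaybeSendUpdate() {
    uint32_t announce = static_cast<uint32_t>(
        std::max<int64_t>(0, target_window_ - announced_window_));
    announced_window_ += announce;
    return announce;
  }

  int64_t announced_window() const { return announced_window_; }

 private:
  int64_t target_window_ = 65535;
  int64_t announced_window_ = 65535;
};

int main() {
  MiniTransportFlowControl fc;
  fc.RecvData(40000);  // more than half of the 65535-byte window consumed
  if (fc.MakeAction() == Urgency::UPDATE_IMMEDIATELY) {
    std::printf("send WINDOW_UPDATE of %u bytes\n", fc.MaybeSendUpdate());
  }
  std::printf("announced window restored to %lld\n",
              static_cast<long long>(fc.announced_window()));
  return 0;
}
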
diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.cc b/src/core/ext/transport/chttp2/transport/frame_settings.cc
index 8ee12aeceb..82906d3db4 100644
--- a/src/core/ext/transport/chttp2/transport/frame_settings.cc
+++ b/src/core/ext/transport/chttp2/transport/frame_settings.cc
@@ -202,12 +202,12 @@ grpc_error *grpc_chttp2_settings_parser_parse(grpc_exec_ctx *exec_ctx, void *p,
}
if (id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE &&
parser->incoming_settings[id] != parser->value) {
- t->flow_control.initial_window_update +=
+ t->initial_window_update +=
(int64_t)parser->value - parser->incoming_settings[id];
if (grpc_http_trace.enabled() || grpc_flowctl_trace.enabled()) {
gpr_log(GPR_DEBUG, "%p[%s] adding %d for initial_window change",
t, t->is_client ? "cli" : "svr",
- (int)t->flow_control.initial_window_update);
+ (int)t->initial_window_update);
}
}
parser->incoming_settings[id] = parser->value;
diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.cc b/src/core/ext/transport/chttp2/transport/frame_window_update.cc
index c9ab8d1b50..15eaf59285 100644
--- a/src/core/ext/transport/chttp2/transport/frame_window_update.cc
+++ b/src/core/ext/transport/chttp2/transport/frame_window_update.cc
@@ -96,8 +96,7 @@ grpc_error *grpc_chttp2_window_update_parser_parse(
if (t->incoming_stream_id != 0) {
if (s != NULL) {
- grpc_chttp2_flowctl_recv_stream_update(
- &t->flow_control, &s->flow_control, received_update);
+ s->flow_control->RecvUpdate(received_update);
if (grpc_chttp2_list_remove_stalled_by_stream(t, s)) {
grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
grpc_chttp2_initiate_write(
@@ -106,10 +105,9 @@ grpc_error *grpc_chttp2_window_update_parser_parse(
}
}
} else {
- bool was_zero = t->flow_control.remote_window <= 0;
- grpc_chttp2_flowctl_recv_transport_update(&t->flow_control,
- received_update);
- bool is_zero = t->flow_control.remote_window <= 0;
+ bool was_zero = t->flow_control->remote_window() <= 0;
+ t->flow_control->RecvUpdate(received_update);
+ bool is_zero = t->flow_control->remote_window() <= 0;
if (was_zero && !is_zero) {
grpc_chttp2_initiate_write(
exec_ctx, t,
diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.cc b/src/core/ext/transport/chttp2/transport/hpack_encoder.cc
index 098d61cc6f..ddb75fc9e7 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_encoder.cc
+++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.cc
@@ -178,24 +178,19 @@ static void evict_entry(grpc_chttp2_hpack_compressor *c) {
c->table_elems--;
}
-/* add an element to the decoder table */
-static void add_elem(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c,
- grpc_mdelem elem) {
- GPR_ASSERT(GRPC_MDELEM_IS_INTERNED(elem));
-
- uint32_t key_hash = grpc_slice_hash(GRPC_MDKEY(elem));
- uint32_t value_hash = grpc_slice_hash(GRPC_MDVALUE(elem));
- uint32_t elem_hash = GRPC_MDSTR_KV_HASH(key_hash, value_hash);
+// Reserve space in the table for the new element, evicting entries if needed.
+// Returns the new index of the element, or 0 to indicate that the element is
+// not being added to the table.
+static uint32_t prepare_space_for_new_elem(grpc_chttp2_hpack_compressor *c,
+ size_t elem_size) {
uint32_t new_index = c->tail_remote_index + c->table_elems + 1;
- size_t elem_size = grpc_mdelem_get_size_in_hpack_table(elem);
-
GPR_ASSERT(elem_size < 65536);
if (elem_size > c->max_table_size) {
while (c->table_size > 0) {
evict_entry(c);
}
- return;
+ return 0;
}
/* Reserve space for this element in the remote table: if this overflows
@@ -209,37 +204,26 @@ static void add_elem(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c,
c->table_size = (uint16_t)(c->table_size + elem_size);
c->table_elems++;
- /* Store this element into {entries,indices}_elem */
- if (grpc_mdelem_eq(c->entries_elems[HASH_FRAGMENT_2(elem_hash)], elem)) {
- /* already there: update with new index */
- c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index;
- } else if (grpc_mdelem_eq(c->entries_elems[HASH_FRAGMENT_3(elem_hash)],
- elem)) {
- /* already there (cuckoo): update with new index */
- c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index;
- } else if (GRPC_MDISNULL(c->entries_elems[HASH_FRAGMENT_2(elem_hash)])) {
- /* not there, but a free element: add */
- c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = GRPC_MDELEM_REF(elem);
- c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index;
- } else if (GRPC_MDISNULL(c->entries_elems[HASH_FRAGMENT_3(elem_hash)])) {
- /* not there (cuckoo), but a free element: add */
- c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = GRPC_MDELEM_REF(elem);
- c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index;
- } else if (c->indices_elems[HASH_FRAGMENT_2(elem_hash)] <
- c->indices_elems[HASH_FRAGMENT_3(elem_hash)]) {
- /* not there: replace oldest */
- GRPC_MDELEM_UNREF(exec_ctx, c->entries_elems[HASH_FRAGMENT_2(elem_hash)]);
- c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = GRPC_MDELEM_REF(elem);
- c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index;
- } else {
- /* not there: replace oldest */
- GRPC_MDELEM_UNREF(exec_ctx, c->entries_elems[HASH_FRAGMENT_3(elem_hash)]);
- c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = GRPC_MDELEM_REF(elem);
- c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index;
+ return new_index;
+}
+
+/* dummy function */
+static void add_nothing(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hpack_compressor *c, grpc_mdelem elem,
+ size_t elem_size) {}
+
+// Add a key to the dynamic table. Both the key and the value will be added to
+// the table at the decoder.
+static void add_key_with_index(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hpack_compressor *c,
+ grpc_mdelem elem, uint32_t new_index) {
+ if (new_index == 0) {
+ return;
}
- /* do exactly the same for the key (so we can find by that again too) */
+ uint32_t key_hash = grpc_slice_hash(GRPC_MDKEY(elem));
+ /* Store the key into {entries,indices}_keys */
if (grpc_slice_eq(c->entries_keys[HASH_FRAGMENT_2(key_hash)],
GRPC_MDKEY(elem))) {
c->indices_keys[HASH_FRAGMENT_2(key_hash)] = new_index;
@@ -272,6 +256,63 @@ static void add_elem(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c,
}
}
+/* add an element to the decoder table */
+static void add_elem_with_index(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hpack_compressor *c,
+ grpc_mdelem elem, uint32_t new_index) {
+ if (new_index == 0) {
+ return;
+ }
+ GPR_ASSERT(GRPC_MDELEM_IS_INTERNED(elem));
+
+ uint32_t key_hash = grpc_slice_hash(GRPC_MDKEY(elem));
+ uint32_t value_hash = grpc_slice_hash(GRPC_MDVALUE(elem));
+ uint32_t elem_hash = GRPC_MDSTR_KV_HASH(key_hash, value_hash);
+
+ /* Store this element into {entries,indices}_elem */
+ if (grpc_mdelem_eq(c->entries_elems[HASH_FRAGMENT_2(elem_hash)], elem)) {
+ /* already there: update with new index */
+ c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index;
+ } else if (grpc_mdelem_eq(c->entries_elems[HASH_FRAGMENT_3(elem_hash)],
+ elem)) {
+ /* already there (cuckoo): update with new index */
+ c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index;
+ } else if (GRPC_MDISNULL(c->entries_elems[HASH_FRAGMENT_2(elem_hash)])) {
+ /* not there, but a free element: add */
+ c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = GRPC_MDELEM_REF(elem);
+ c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index;
+ } else if (GRPC_MDISNULL(c->entries_elems[HASH_FRAGMENT_3(elem_hash)])) {
+ /* not there (cuckoo), but a free element: add */
+ c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = GRPC_MDELEM_REF(elem);
+ c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index;
+ } else if (c->indices_elems[HASH_FRAGMENT_2(elem_hash)] <
+ c->indices_elems[HASH_FRAGMENT_3(elem_hash)]) {
+ /* not there: replace oldest */
+ GRPC_MDELEM_UNREF(exec_ctx, c->entries_elems[HASH_FRAGMENT_2(elem_hash)]);
+ c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = GRPC_MDELEM_REF(elem);
+ c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index;
+ } else {
+ /* not there: replace oldest */
+ GRPC_MDELEM_UNREF(exec_ctx, c->entries_elems[HASH_FRAGMENT_3(elem_hash)]);
+ c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = GRPC_MDELEM_REF(elem);
+ c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index;
+ }
+
+ add_key_with_index(exec_ctx, c, elem, new_index);
+}
+
+static void add_elem(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c,
+ grpc_mdelem elem, size_t elem_size) {
+ uint32_t new_index = prepare_space_for_new_elem(c, elem_size);
+ add_elem_with_index(exec_ctx, c, elem, new_index);
+}
+
+static void add_key(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c,
+ grpc_mdelem elem, size_t elem_size) {
+ uint32_t new_index = prepare_space_for_new_elem(c, elem_size);
+ add_key_with_index(exec_ctx, c, elem, new_index);
+}
+
static void emit_indexed(grpc_exec_ctx *exec_ctx,
grpc_chttp2_hpack_compressor *c, uint32_t elem_index,
framer_state *st) {
@@ -363,7 +404,9 @@ static void emit_lithdr_noidx(grpc_exec_ctx *exec_ctx,
static void emit_lithdr_incidx_v(grpc_exec_ctx *exec_ctx,
grpc_chttp2_hpack_compressor *c,
- grpc_mdelem elem, framer_state *st) {
+ uint32_t unused_index, grpc_mdelem elem,
+ framer_state *st) {
+ GPR_ASSERT(unused_index == 0);
GRPC_STATS_INC_HPACK_SEND_LITHDR_INCIDX_V(exec_ctx);
GRPC_STATS_INC_HPACK_SEND_UNCOMPRESSED(exec_ctx);
uint32_t len_key = (uint32_t)GRPC_SLICE_LENGTH(GRPC_MDKEY(elem));
@@ -385,7 +428,9 @@ static void emit_lithdr_incidx_v(grpc_exec_ctx *exec_ctx,
static void emit_lithdr_noidx_v(grpc_exec_ctx *exec_ctx,
grpc_chttp2_hpack_compressor *c,
- grpc_mdelem elem, framer_state *st) {
+ uint32_t unused_index, grpc_mdelem elem,
+ framer_state *st) {
+ GPR_ASSERT(unused_index == 0);
GRPC_STATS_INC_HPACK_SEND_LITHDR_NOTIDX_V(exec_ctx);
GRPC_STATS_INC_HPACK_SEND_UNCOMPRESSED(exec_ctx);
uint32_t len_key = (uint32_t)GRPC_SLICE_LENGTH(GRPC_MDKEY(elem));
@@ -430,9 +475,14 @@ static void hpack_enc(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c,
"Reserved header (colon-prefixed) happening after regular ones.");
}
- if (grpc_http_trace.enabled() && !GRPC_MDELEM_IS_INTERNED(elem)) {
+ if (grpc_http_trace.enabled()) {
char *k = grpc_slice_to_c_string(GRPC_MDKEY(elem));
- char *v = grpc_slice_to_c_string(GRPC_MDVALUE(elem));
+ char *v = NULL;
+ if (grpc_is_binary_header(GRPC_MDKEY(elem))) {
+ v = grpc_dump_slice(GRPC_MDVALUE(elem), GPR_DUMP_HEX);
+ } else {
+ v = grpc_slice_to_c_string(GRPC_MDVALUE(elem));
+ }
gpr_log(
GPR_DEBUG,
"Encode: '%s: %s', elem_interned=%d [%d], k_interned=%d, v_interned=%d",
@@ -442,64 +492,70 @@ static void hpack_enc(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c,
gpr_free(k);
gpr_free(v);
}
- if (!GRPC_MDELEM_IS_INTERNED(elem)) {
- emit_lithdr_noidx_v(exec_ctx, c, elem, st);
+
+ bool elem_interned = GRPC_MDELEM_IS_INTERNED(elem);
+ bool key_interned = elem_interned || grpc_slice_is_interned(GRPC_MDKEY(elem));
+
+ // Key is not interned; emit literals.
+ if (!key_interned) {
+ emit_lithdr_noidx_v(exec_ctx, c, 0, elem, st);
return;
}
- uint32_t key_hash;
- uint32_t value_hash;
- uint32_t elem_hash;
- size_t decoder_space_usage;
- uint32_t indices_key;
- int should_add_elem;
+ uint32_t key_hash = grpc_slice_hash(GRPC_MDKEY(elem));
+ uint32_t elem_hash = 0;
- key_hash = grpc_slice_hash(GRPC_MDKEY(elem));
- value_hash = grpc_slice_hash(GRPC_MDVALUE(elem));
- elem_hash = GRPC_MDSTR_KV_HASH(key_hash, value_hash);
+ if (elem_interned) {
+ uint32_t value_hash = grpc_slice_hash(GRPC_MDVALUE(elem));
+ elem_hash = GRPC_MDSTR_KV_HASH(key_hash, value_hash);
- inc_filter(HASH_FRAGMENT_1(elem_hash), &c->filter_elems_sum, c->filter_elems);
+ inc_filter(HASH_FRAGMENT_1(elem_hash), &c->filter_elems_sum,
+ c->filter_elems);
- /* is this elem currently in the decoders table? */
+ /* is this elem currently in the decoders table? */
- if (grpc_mdelem_eq(c->entries_elems[HASH_FRAGMENT_2(elem_hash)], elem) &&
- c->indices_elems[HASH_FRAGMENT_2(elem_hash)] > c->tail_remote_index) {
- /* HIT: complete element (first cuckoo hash) */
- emit_indexed(exec_ctx, c,
- dynidx(c, c->indices_elems[HASH_FRAGMENT_2(elem_hash)]), st);
- return;
- }
+ if (grpc_mdelem_eq(c->entries_elems[HASH_FRAGMENT_2(elem_hash)], elem) &&
+ c->indices_elems[HASH_FRAGMENT_2(elem_hash)] > c->tail_remote_index) {
+ /* HIT: complete element (first cuckoo hash) */
+ emit_indexed(exec_ctx, c,
+ dynidx(c, c->indices_elems[HASH_FRAGMENT_2(elem_hash)]), st);
+ return;
+ }
- if (grpc_mdelem_eq(c->entries_elems[HASH_FRAGMENT_3(elem_hash)], elem) &&
- c->indices_elems[HASH_FRAGMENT_3(elem_hash)] > c->tail_remote_index) {
- /* HIT: complete element (second cuckoo hash) */
- emit_indexed(exec_ctx, c,
- dynidx(c, c->indices_elems[HASH_FRAGMENT_3(elem_hash)]), st);
- return;
+ if (grpc_mdelem_eq(c->entries_elems[HASH_FRAGMENT_3(elem_hash)], elem) &&
+ c->indices_elems[HASH_FRAGMENT_3(elem_hash)] > c->tail_remote_index) {
+ /* HIT: complete element (second cuckoo hash) */
+ emit_indexed(exec_ctx, c,
+ dynidx(c, c->indices_elems[HASH_FRAGMENT_3(elem_hash)]), st);
+ return;
+ }
}
+ uint32_t indices_key;
+
/* should this elem be in the table? */
- decoder_space_usage = grpc_mdelem_get_size_in_hpack_table(elem);
- should_add_elem = decoder_space_usage < MAX_DECODER_SPACE_USAGE &&
- c->filter_elems[HASH_FRAGMENT_1(elem_hash)] >=
- c->filter_elems_sum / ONE_ON_ADD_PROBABILITY;
+ size_t decoder_space_usage =
+ grpc_mdelem_get_size_in_hpack_table(elem, st->use_true_binary_metadata);
+ bool should_add_elem = elem_interned &&
+ decoder_space_usage < MAX_DECODER_SPACE_USAGE &&
+ c->filter_elems[HASH_FRAGMENT_1(elem_hash)] >=
+ c->filter_elems_sum / ONE_ON_ADD_PROBABILITY;
+ void (*maybe_add)(grpc_exec_ctx *, grpc_chttp2_hpack_compressor *,
+ grpc_mdelem, size_t) =
+ should_add_elem ? add_elem : add_nothing;
+ void (*emit)(grpc_exec_ctx *, grpc_chttp2_hpack_compressor *, uint32_t,
+ grpc_mdelem, framer_state *) =
+ should_add_elem ? emit_lithdr_incidx : emit_lithdr_noidx;
/* no hits for the elem... maybe there's a key? */
-
indices_key = c->indices_keys[HASH_FRAGMENT_2(key_hash)];
if (grpc_slice_eq(c->entries_keys[HASH_FRAGMENT_2(key_hash)],
GRPC_MDKEY(elem)) &&
indices_key > c->tail_remote_index) {
/* HIT: key (first cuckoo hash) */
- if (should_add_elem) {
- emit_lithdr_incidx(exec_ctx, c, dynidx(c, indices_key), elem, st);
- add_elem(exec_ctx, c, elem);
- return;
- } else {
- emit_lithdr_noidx(exec_ctx, c, dynidx(c, indices_key), elem, st);
- return;
- }
- GPR_UNREACHABLE_CODE(return );
+ emit(exec_ctx, c, dynidx(c, indices_key), elem, st);
+ maybe_add(exec_ctx, c, elem, decoder_space_usage);
+ return;
}
indices_key = c->indices_keys[HASH_FRAGMENT_3(key_hash)];
@@ -507,28 +563,20 @@ static void hpack_enc(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c,
GRPC_MDKEY(elem)) &&
indices_key > c->tail_remote_index) {
/* HIT: key (first cuckoo hash) */
- if (should_add_elem) {
- emit_lithdr_incidx(exec_ctx, c, dynidx(c, indices_key), elem, st);
- add_elem(exec_ctx, c, elem);
- return;
- } else {
- emit_lithdr_noidx(exec_ctx, c, dynidx(c, indices_key), elem, st);
- return;
- }
- GPR_UNREACHABLE_CODE(return );
+ emit(exec_ctx, c, dynidx(c, indices_key), elem, st);
+ maybe_add(exec_ctx, c, elem, decoder_space_usage);
+ return;
}
/* no elem, key in the table... fall back to literal emission */
-
- if (should_add_elem) {
- emit_lithdr_incidx_v(exec_ctx, c, elem, st);
- add_elem(exec_ctx, c, elem);
- return;
- } else {
- emit_lithdr_noidx_v(exec_ctx, c, elem, st);
- return;
- }
- GPR_UNREACHABLE_CODE(return );
+ bool should_add_key =
+ !elem_interned && decoder_space_usage < MAX_DECODER_SPACE_USAGE;
+ emit = (should_add_elem || should_add_key) ? emit_lithdr_incidx_v
+ : emit_lithdr_noidx_v;
+ maybe_add =
+ should_add_elem ? add_elem : (should_add_key ? add_key : add_nothing);
+ emit(exec_ctx, c, 0, elem, st);
+ maybe_add(exec_ctx, c, elem, decoder_space_usage);
}
#define STRLEN_LIT(x) (sizeof(x) - 1)
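
The rewrite above replaces the duplicated should_add_elem branches with two function pointers (emit and maybe_add) selected once and then invoked at every exit path. Below is a stand-alone sketch of that select-once/call-everywhere pattern, using hypothetical add_entry/skip_entry helpers rather than the hpack functions themselves.

#include <cstdio>

struct Table { int entries = 0; };

// Hypothetical helpers standing in for add_elem / add_nothing above.
static void add_entry(Table* t, int value) {
  t->entries++;
  std::printf("added %d\n", value);
}
static void skip_entry(Table* t, int value) { (void)t; (void)value; }

// Choose the behaviour once, then call it unconditionally at every exit path.
static void encode(Table* t, int value, bool should_add) {
  void (*maybe_add)(Table*, int) = should_add ? add_entry : skip_entry;
  // ... emit the header in whatever form was selected ...
  maybe_add(t, value);
}

int main() {
  Table t;
  encode(&t, 7, true);   // adds to the table
  encode(&t, 9, false);  // no-op
  std::printf("table entries: %d\n", t.entries);
  return 0;
}
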
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 55cb55de56..f1d1334e84 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -22,6 +22,7 @@
#include <assert.h>
#include <stdbool.h>
+#include "src/core/ext/transport/chttp2/transport/flow_control.h"
#include "src/core/ext/transport/chttp2/transport/frame.h"
#include "src/core/ext/transport/chttp2/transport/frame_data.h"
#include "src/core/ext/transport/chttp2/transport/frame_goaway.h"
@@ -38,9 +39,7 @@
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/support/manual_constructor.h"
-#include "src/core/lib/transport/bdp_estimator.h"
#include "src/core/lib/transport/connectivity_state.h"
-#include "src/core/lib/transport/pid_controller.h"
#include "src/core/lib/transport/transport_impl.h"
#ifdef __cplusplus
@@ -238,48 +237,6 @@ typedef enum {
GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED,
} grpc_chttp2_keepalive_state;
-typedef struct {
- /** initial window change. This is tracked as we parse settings frames from
- * the remote peer. If there is a positive delta, then we will make all
- * streams readable since they may have become unstalled */
- int64_t initial_window_update;
-
- /** Our bookkeeping for the remote peer's available window */
- int64_t remote_window;
-
- /** calculating what we should give for local window:
- we track the total amount of flow control over initial window size
- across all streams: this is data that we want to receive right now (it
- has an outstanding read)
- and the total amount of flow control under initial window size across all
- streams: this is data we've read early
- we want to adjust incoming_window such that:
- incoming_window = total_over - max(bdp - total_under, 0) */
- int64_t announced_stream_total_over_incoming_window;
- int64_t announced_stream_total_under_incoming_window;
-
- /** This is out window according to what we have sent to our remote peer. The
- * difference between this and target window is what we use to decide when
- * to send WINDOW_UPDATE frames. */
- int64_t announced_window;
-
- int32_t target_initial_window_size;
-
- /** should we probe bdp? */
- bool enable_bdp_probe;
-
- /* bdp estimation */
- grpc_core::ManualConstructor<grpc_core::BdpEstimator> bdp_estimator;
-
- /* pid controller */
- bool pid_controller_initialized;
- grpc_pid_controller pid_controller;
- grpc_millis last_pid_update;
-
- // pointer back to transport for tracing
- const grpc_chttp2_transport *t;
-} grpc_chttp2_transport_flowctl;
-
struct grpc_chttp2_transport {
grpc_transport base; /* must be first */
gpr_refcount refs;
@@ -298,7 +255,7 @@ struct grpc_chttp2_transport {
/** is the transport destroying itself? */
uint8_t destroying;
/** has the upper layer closed the transport? */
- uint8_t closed;
+ grpc_error *closed_with_error;
/** is there a read request to the endpoint outstanding? */
uint8_t endpoint_reading;
@@ -340,7 +297,7 @@ struct grpc_chttp2_transport {
/** hpack encoding */
grpc_chttp2_hpack_compressor hpack_compressor;
/** is this a client? */
- uint8_t is_client;
+ bool is_client;
/** data to write next write */
grpc_slice_buffer qbuf;
@@ -350,14 +307,14 @@ struct grpc_chttp2_transport {
uint32_t write_buffer_size;
/** have we seen a goaway */
- uint8_t seen_goaway;
+ bool seen_goaway;
/** have we sent a goaway */
grpc_chttp2_sent_goaway_state sent_goaway_state;
/** are the local settings dirty and need to be sent? */
- uint8_t dirtied_local_settings;
+ bool dirtied_local_settings;
/** have local settings been sent? */
- uint8_t sent_local_settings;
+ bool sent_local_settings;
/** bitmask of setting indexes to send out */
uint32_t force_send_settings;
/** settings values */
@@ -395,7 +352,12 @@ struct grpc_chttp2_transport {
/** parser for goaway frames */
grpc_chttp2_goaway_parser goaway_parser;
- grpc_chttp2_transport_flowctl flow_control;
+ grpc_core::ManualConstructor<grpc_core::chttp2::TransportFlowControl>
+ flow_control;
+ /** initial window change. This is tracked as we parse settings frames from
+ * the remote peer. If there is a positive delta, then we will make all
+ * streams readable since they may have become unstalled */
+ int64_t initial_window_update = 0;
/* deframing */
grpc_chttp2_deframe_transport_state deframe_state;
@@ -422,6 +384,7 @@ struct grpc_chttp2_transport {
grpc_chttp2_write_cb *write_cb_pool;
/* bdp estimator */
+ grpc_closure next_bdp_ping_timer_expired_locked;
grpc_closure start_bdp_ping_locked;
grpc_closure finish_bdp_ping_locked;
@@ -442,6 +405,10 @@ struct grpc_chttp2_transport {
/** destructive cleanup closure */
grpc_closure destructive_reclaimer_locked;
+ /* next bdp ping timer */
+ bool have_next_bdp_ping_timer;
+ grpc_timer next_bdp_ping_timer;
+
/* keep-alive ping support */
/** Closure to initialize a keepalive ping */
grpc_closure init_keepalive_ping_locked;
@@ -472,25 +439,6 @@ typedef enum {
GPRC_METADATA_PUBLISHED_AT_CLOSE
} grpc_published_metadata_method;
-typedef struct {
- /** window available for us to send to peer, over or under the initial window
- * size of the transport... ie:
- * remote_window = remote_window_delta + transport.initial_window_size */
- int64_t remote_window_delta;
-
- /** window available for peer to send to us (as a delta on
- * transport.initial_window_size)
- * local_window = local_window_delta + transport.initial_window_size */
- int64_t local_window_delta;
-
- /** window available for peer to send to us over this stream that we have
- * announced to the peer */
- int64_t announced_window_delta;
-
- // read only pointer back to stream for data
- const grpc_chttp2_stream *s;
-} grpc_chttp2_stream_flowctl;
-
struct grpc_chttp2_stream {
grpc_chttp2_transport *t;
grpc_stream_refcount *refcount;
@@ -584,7 +532,8 @@ struct grpc_chttp2_stream {
bool sent_initial_metadata;
bool sent_trailing_metadata;
- grpc_chttp2_stream_flowctl flow_control;
+ grpc_core::ManualConstructor<grpc_core::chttp2::StreamFlowControl>
+ flow_control;
grpc_slice_buffer flow_controlled_buffer;
@@ -695,74 +644,10 @@ bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport *t,
/********* Flow Control ***************/
-// we have sent data on the wire
-void grpc_chttp2_flowctl_sent_data(grpc_chttp2_transport_flowctl *tfc,
- grpc_chttp2_stream_flowctl *sfc,
- int64_t size);
-
-// we have received data from the wire
-grpc_error *grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport_flowctl *tfc,
- grpc_chttp2_stream_flowctl *sfc,
- int64_t incoming_frame_size);
-
-// returns an announce if we should send a transport update to our peer,
-// else returns zero
-uint32_t grpc_chttp2_flowctl_maybe_send_transport_update(
- grpc_chttp2_transport_flowctl *tfc, bool writing_anyway);
-
-// returns an announce if we should send a stream update to our peer, else
-// returns zero
-uint32_t grpc_chttp2_flowctl_maybe_send_stream_update(
- grpc_chttp2_transport_flowctl *tfc, grpc_chttp2_stream_flowctl *sfc);
-
-// we have received a WINDOW_UPDATE frame for a transport
-void grpc_chttp2_flowctl_recv_transport_update(
- grpc_chttp2_transport_flowctl *tfc, uint32_t size);
-
-// we have received a WINDOW_UPDATE frame for a stream
-void grpc_chttp2_flowctl_recv_stream_update(grpc_chttp2_transport_flowctl *tfc,
- grpc_chttp2_stream_flowctl *sfc,
- uint32_t size);
-
-// the application is asking for a certain amount of bytes
-void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport_flowctl *tfc,
- grpc_chttp2_stream_flowctl *sfc,
- size_t max_size_hint,
- size_t have_already);
-
-void grpc_chttp2_flowctl_destroy_stream(grpc_chttp2_transport_flowctl *tfc,
- grpc_chttp2_stream_flowctl *sfc);
-
-typedef enum {
- // Nothing to be done.
- GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED = 0,
- // Initiate a write to update the initial window immediately.
- GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY,
- // Push the flow control update into a send buffer, to be sent
- // out the next time a write is initiated.
- GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE,
-} grpc_chttp2_flowctl_urgency;
-
-typedef struct {
- grpc_chttp2_flowctl_urgency send_stream_update;
- grpc_chttp2_flowctl_urgency send_transport_update;
- grpc_chttp2_flowctl_urgency send_setting_update;
- uint32_t initial_window_size;
- uint32_t max_frame_size;
- bool need_ping;
-} grpc_chttp2_flowctl_action;
-
-// Reads the flow control data and returns and actionable struct that will tell
-// chttp2 exactly what it needs to do
-grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action(
- grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_flowctl *tfc,
- grpc_chttp2_stream_flowctl *sfc);
-
// Takes in a flow control action and performs all the needed operations.
-void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_flowctl_action action,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s);
+void grpc_chttp2_act_on_flowctl_action(
+ grpc_exec_ctx *exec_ctx, const grpc_core::chttp2::FlowControlAction &action,
+ grpc_chttp2_transport *t, grpc_chttp2_stream *s);
/********* End of Flow Control ***************/
@@ -796,20 +681,10 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
extern grpc_core::Tracer grpc_http_trace;
extern grpc_core::Tracer grpc_flowctl_trace;
-#ifndef NDEBUG
-#define GRPC_FLOW_CONTROL_IF_TRACING(stmt) \
- if (!(grpc_flowctl_trace.enabled())) \
- ; \
- else \
- stmt
-#else
-#define GRPC_FLOW_CONTROL_IF_TRACING(stmt)
-#endif
-
-#define GRPC_CHTTP2_IF_TRACING(stmt) \
- if (!(grpc_http_trace.enabled())) \
- ; \
- else \
+#define GRPC_CHTTP2_IF_TRACING(stmt) \
+ if (!(GRPC_TRACER_ON(grpc_http_trace))) \
+ ; \
+ else \
stmt
void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
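
grpc_chttp2_act_on_flowctl_action now receives a FlowControlAction by const reference and is expected to branch on each field's urgency. Here is a simplified, self-contained sketch of that consumer side, with stand-in types and hypothetical queue_update/initiate_write helpers; the real implementation lives elsewhere in the transport.

#include <cstdio>

enum class Urgency { NO_ACTION_NEEDED, UPDATE_IMMEDIATELY, QUEUE_UPDATE };

struct Action {
  Urgency send_stream_update = Urgency::NO_ACTION_NEEDED;
  Urgency send_transport_update = Urgency::NO_ACTION_NEEDED;
};

// Hypothetical stand-ins for queueing vs. immediately initiating a write.
static void queue_update(const char* what) {
  std::printf("queued %s update\n", what);
}
static void initiate_write(const char* what) {
  std::printf("writing %s update now\n", what);
}

static void act_on(const Action& action) {
  switch (action.send_transport_update) {
    case Urgency::UPDATE_IMMEDIATELY: initiate_write("transport"); break;
    case Urgency::QUEUE_UPDATE: queue_update("transport"); break;
    case Urgency::NO_ACTION_NEEDED: break;
  }
  switch (action.send_stream_update) {
    case Urgency::UPDATE_IMMEDIATELY: initiate_write("stream"); break;
    case Urgency::QUEUE_UPDATE: queue_update("stream"); break;
    case Urgency::NO_ACTION_NEEDED: break;
  }
}

int main() {
  Action a;
  a.send_transport_update = Urgency::UPDATE_IMMEDIATELY;
  a.send_stream_update = Urgency::QUEUE_UPDATE;
  act_on(a);
  return 0;
}
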
diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc
index a766ac4c3f..43bf8fd254 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.cc
+++ b/src/core/ext/transport/chttp2/transport/parsing.cc
@@ -355,14 +355,15 @@ static grpc_error *init_data_frame_parser(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream *s =
grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id);
grpc_error *err = GRPC_ERROR_NONE;
- err = grpc_chttp2_flowctl_recv_data(&t->flow_control,
- s == NULL ? NULL : &s->flow_control,
- t->incoming_frame_size);
- grpc_chttp2_act_on_flowctl_action(
- exec_ctx,
- grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control,
- s == NULL ? NULL : &s->flow_control),
- t, s);
+ grpc_core::chttp2::FlowControlAction action;
+ if (s == nullptr) {
+ err = t->flow_control->RecvData(t->incoming_frame_size);
+ action = t->flow_control->MakeAction();
+ } else {
+ err = s->flow_control->RecvData(t->incoming_frame_size);
+ action = s->flow_control->MakeAction();
+ }
+ grpc_chttp2_act_on_flowctl_action(exec_ctx, action, t, s);
if (err != GRPC_ERROR_NONE) {
goto error_handler;
}
diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc
index 34c9bc334a..25f7c858f8 100644
--- a/src/core/ext/transport/chttp2/transport/writing.cc
+++ b/src/core/ext/transport/chttp2/transport/writing.cc
@@ -142,13 +142,13 @@ static void report_stall(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
s->flow_controlled_bytes_flowed,
t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
- t->flow_control.remote_window,
+ t->flow_control->remote_window(),
(uint32_t)GPR_MAX(
0,
- s->flow_control.remote_window_delta +
+ s->flow_control->remote_window_delta() +
(int64_t)t->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]),
- s->flow_control.remote_window_delta);
+ s->flow_control->remote_window_delta());
}
static bool stream_ref_if_not_destroyed(gpr_refcount *r) {
@@ -212,8 +212,7 @@ class WriteContext {
void FlushWindowUpdates(grpc_exec_ctx *exec_ctx) {
uint32_t transport_announce =
- grpc_chttp2_flowctl_maybe_send_transport_update(&t_->flow_control,
- t_->outbuf.count > 0);
+ t_->flow_control->MaybeSendUpdate(t_->outbuf.count > 0);
if (transport_announce) {
grpc_transport_one_way_stats throwaway_stats;
grpc_slice_buffer_add(
@@ -241,7 +240,8 @@ class WriteContext {
void UpdateStreamsNoLongerStalled() {
grpc_chttp2_stream *s;
while (grpc_chttp2_list_pop_stalled_by_transport(t_, &s)) {
- if (!t_->closed && grpc_chttp2_list_add_writable_stream(t_, s)) {
+ if (t_->closed_with_error == GRPC_ERROR_NONE &&
+ grpc_chttp2_list_add_writable_stream(t_, s)) {
if (!stream_ref_if_not_destroyed(&s->refcount->refs)) {
grpc_chttp2_list_remove_writable_stream(t_, s);
}
@@ -307,7 +307,7 @@ class DataSendContext {
uint32_t stream_remote_window() const {
return (uint32_t)GPR_MAX(
- 0, s_->flow_control.remote_window_delta +
+ 0, s_->flow_control->remote_window_delta() +
(int64_t)t_->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
}
@@ -315,7 +315,7 @@ class DataSendContext {
uint32_t max_outgoing() const {
return (uint32_t)GPR_MIN(
t_->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
- GPR_MIN(stream_remote_window(), t_->flow_control.remote_window));
+ GPR_MIN(stream_remote_window(), t_->flow_control->remote_window()));
}
bool AnyOutgoing() const { return max_outgoing() != 0; }
@@ -347,8 +347,7 @@ class DataSendContext {
grpc_metadata_batch_is_empty(s_->send_trailing_metadata);
grpc_chttp2_encode_data(s_->id, &s_->compressed_data_buffer, send_bytes,
is_last_frame_, &s_->stats.outgoing, &t_->outbuf);
- grpc_chttp2_flowctl_sent_data(&t_->flow_control, &s_->flow_control,
- send_bytes);
+ s_->flow_control->SentData(send_bytes);
if (s_->compressed_data_buffer.length == 0) {
s_->sending_bytes += s_->uncompressed_data_size;
}
@@ -395,8 +394,8 @@ class StreamWriteContext {
gpr_log(GPR_DEBUG, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t_,
t_->is_client ? "CLIENT" : "SERVER", s->id,
s->sent_initial_metadata, s->send_initial_metadata != NULL,
- (int)(s->flow_control.local_window_delta -
- s->flow_control.announced_window_delta)));
+ (int)(s->flow_control->local_window_delta() -
+ s->flow_control->announced_window_delta())));
}
void FlushInitialMetadata(grpc_exec_ctx *exec_ctx) {
@@ -442,8 +441,7 @@ class StreamWriteContext {
void FlushWindowUpdates(grpc_exec_ctx *exec_ctx) {
/* send any window updates */
- uint32_t stream_announce = grpc_chttp2_flowctl_maybe_send_stream_update(
- &t_->flow_control, &s_->flow_control);
+ const uint32_t stream_announce = s_->flow_control->MaybeSendUpdate();
if (stream_announce == 0) return;
grpc_slice_buffer_add(
@@ -464,10 +462,10 @@ class StreamWriteContext {
DataSendContext data_send_context(write_context_, t_, s_);
if (!data_send_context.AnyOutgoing()) {
- if (t_->flow_control.remote_window == 0) {
+ if (t_->flow_control->remote_window() <= 0) {
report_stall(t_, s_, "transport");
grpc_chttp2_list_add_stalled_by_transport(t_, s_);
- } else if (data_send_context.stream_remote_window() == 0) {
+ } else if (data_send_context.stream_remote_window() <= 0) {
report_stall(t_, s_, "stream");
grpc_chttp2_list_add_stalled_by_stream(t_, s_);
}
@@ -583,7 +581,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
ctx.FlushQueuedBuffers(exec_ctx);
ctx.EnactHpackSettings(exec_ctx);
- if (t->flow_control.remote_window > 0) {
+ if (t->flow_control->remote_window() > 0) {
ctx.UpdateStreamsNoLongerStalled();
}
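
The stream_remote_window()/max_outgoing() calculations above boil down to a max against zero and two mins; a tiny stand-alone example with made-up numbers shows the arithmetic.

#include <algorithm>
#include <cstdint>
#include <cstdio>

int main() {
  // Made-up values for illustration only.
  int64_t remote_window_delta = -20000;   // this stream has consumed 20000 bytes of its delta
  int64_t peer_initial_window = 65535;    // peer's SETTINGS_INITIAL_WINDOW_SIZE
  int64_t transport_remote_window = 30000;
  int64_t max_frame_size = 16384;

  // stream window = delta + peer's initial window, never below zero
  int64_t stream_remote_window =
      std::max<int64_t>(0, remote_window_delta + peer_initial_window);

  // we may write at most one max-size frame, bounded by both windows
  int64_t max_outgoing = std::min(
      max_frame_size, std::min(stream_remote_window, transport_remote_window));

  std::printf("stream window=%lld, max outgoing=%lld\n",
              (long long)stream_remote_window,
              (long long)max_outgoing);  // prints 45535 and 16384
  return 0;
}
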
diff --git a/src/core/lib/http/httpcli_security_connector.cc b/src/core/lib/http/httpcli_security_connector.cc
index ef6c4a509b..d832dacb69 100644
--- a/src/core/lib/http/httpcli_security_connector.cc
+++ b/src/core/lib/http/httpcli_security_connector.cc
@@ -91,8 +91,17 @@ static void httpcli_ssl_check_peer(grpc_exec_ctx *exec_ctx,
tsi_peer_destruct(&peer);
}
+static int httpcli_ssl_cmp(grpc_security_connector *sc1,
+ grpc_security_connector *sc2) {
+ grpc_httpcli_ssl_channel_security_connector *c1 =
+ (grpc_httpcli_ssl_channel_security_connector *)sc1;
+ grpc_httpcli_ssl_channel_security_connector *c2 =
+ (grpc_httpcli_ssl_channel_security_connector *)sc2;
+ return strcmp(c1->secure_peer_name, c2->secure_peer_name);
+}
+
static grpc_security_connector_vtable httpcli_ssl_vtable = {
- httpcli_ssl_destroy, httpcli_ssl_check_peer};
+ httpcli_ssl_destroy, httpcli_ssl_check_peer, httpcli_ssl_cmp};
static grpc_security_status httpcli_ssl_channel_security_connector_create(
grpc_exec_ctx *exec_ctx, const char *pem_root_certs,
@@ -123,6 +132,10 @@ static grpc_security_status httpcli_ssl_channel_security_connector_create(
*sc = NULL;
return GRPC_SECURITY_ERROR;
}
+ // We don't actually need a channel credentials object in this case,
+ // but we set it to a non-NULL address so that we don't trigger
+ // assertions in grpc_channel_security_connector_cmp().
+ c->base.channel_creds = (grpc_channel_credentials *)1;
c->base.add_handshakers = httpcli_ssl_add_handshakers;
*sc = &c->base;
return GRPC_SECURITY_OK;
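
The security connector vtable now carries a cmp hook next to destroy and check_peer, and the httpcli connector compares by secure_peer_name. Below is a miniature, hypothetical version of that C-style vtable-with-comparator pattern (not the gRPC types).

#include <cstdio>
#include <cstring>

// Hypothetical miniature of a C-style vtable that gains a cmp entry.
struct connector;
struct connector_vtable {
  void (*destroy)(connector* c);
  int (*cmp)(connector* a, connector* b);
};

struct connector {
  const connector_vtable* vtable;
  const char* secure_peer_name;
};

static void noop_destroy(connector*) {}
static int peer_name_cmp(connector* a, connector* b) {
  return std::strcmp(a->secure_peer_name, b->secure_peer_name);
}

static const connector_vtable kVtable = {noop_destroy, peer_name_cmp};

int main() {
  connector a = {&kVtable, "foo.example.com"};
  connector b = {&kVtable, "bar.example.com"};
  std::printf("cmp=%d\n", a.vtable->cmp(&a, &b));  // nonzero: different peers
  return 0;
}
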
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index 58e5392c47..d30ca3fd5e 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -30,6 +30,7 @@
#include <pthread.h>
#include <string.h>
#include <sys/socket.h>
+#include <sys/syscall.h>
#include <unistd.h>
#include <grpc/support/alloc.h>
@@ -49,100 +50,96 @@
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/spinlock.h"
-/*******************************************************************************
- * Polling object
- */
-typedef enum {
- PO_POLLING_GROUP,
- PO_POLLSET_SET,
- PO_POLLSET,
- PO_FD,
- /* ordering is important: we always want to lock pollsets before fds:
- this guarantees that using an fd as a pollable is safe */
- PO_EMPTY_POLLABLE,
- PO_COUNT
-} polling_obj_type;
-
-typedef struct polling_obj polling_obj;
-typedef struct polling_group polling_group;
-
-struct polling_obj {
- gpr_mu mu;
- polling_obj_type type;
- polling_group *group;
- struct polling_obj *next;
- struct polling_obj *prev;
-};
-
-struct polling_group {
- polling_obj po;
- gpr_refcount refs;
-};
+// debug aid: create workers on the heap (allows asan to spot
+// use-after-destruction)
+//#define GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP 1
-static void po_init(polling_obj *po, polling_obj_type type);
-static void po_destroy(polling_obj *po);
-static void po_join(grpc_exec_ctx *exec_ctx, polling_obj *a, polling_obj *b);
-static int po_cmp(polling_obj *a, polling_obj *b);
+#define MAX_EPOLL_EVENTS 100
+#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 5
-static void pg_create(grpc_exec_ctx *exec_ctx, polling_obj **initial_po,
- size_t initial_po_count);
-static polling_group *pg_ref(polling_group *pg);
-static void pg_unref(polling_group *pg);
-static void pg_merge(grpc_exec_ctx *exec_ctx, polling_group *a,
- polling_group *b);
-static void pg_join(grpc_exec_ctx *exec_ctx, polling_group *pg,
- polling_obj *po);
+#ifndef NDEBUG
+grpc_core::Tracer grpc_trace_pollable_refcount(false, "pollable_refcount");
+#endif
/*******************************************************************************
* pollable Declarations
*/
-typedef struct pollable {
- polling_obj po;
+typedef enum { PO_MULTI, PO_FD, PO_EMPTY } pollable_type;
+
+typedef struct pollable pollable;
+
+/// A pollable is something that can be polled: it has an epoll set to poll on,
+/// and a wakeup fd for kicks
+/// There are three broad types:
+/// - PO_EMPTY - the empty pollable, used before file descriptors are added to
+/// a pollset
+/// - PO_FD - a pollable containing only one FD - used to optimize single-fd
+/// pollsets (which are common with synchronous API usage)
+/// - PO_MULTI - a pollable containing many fds
+struct pollable {
+ pollable_type type; // immutable
+ gpr_refcount refs;
+
int epfd;
grpc_wakeup_fd wakeup;
+
+ // only for type fd... one ref to the owner fd
+ grpc_fd *owner_fd;
+
+ grpc_pollset_set *pollset_set;
+ pollable *next;
+ pollable *prev;
+
+ gpr_mu mu;
grpc_pollset_worker *root_worker;
-} pollable;
-static const char *polling_obj_type_string(polling_obj_type t) {
+ int event_cursor;
+ int event_count;
+ struct epoll_event events[MAX_EPOLL_EVENTS];
+};
+
+static const char *pollable_type_string(pollable_type t) {
switch (t) {
- case PO_POLLING_GROUP:
- return "polling_group";
- case PO_POLLSET_SET:
- return "pollset_set";
- case PO_POLLSET:
+ case PO_MULTI:
return "pollset";
case PO_FD:
return "fd";
- case PO_EMPTY_POLLABLE:
- return "empty_pollable";
- case PO_COUNT:
- return "<invalid:count>";
+ case PO_EMPTY:
+ return "empty";
}
return "<invalid>";
}
static char *pollable_desc(pollable *p) {
char *out;
- gpr_asprintf(&out, "type=%s group=%p epfd=%d wakeup=%d",
- polling_obj_type_string(p->po.type), p->po.group, p->epfd,
- p->wakeup.read_fd);
+ gpr_asprintf(&out, "type=%s epfd=%d wakeup=%d", pollable_type_string(p->type),
+ p->epfd, p->wakeup.read_fd);
return out;
}
-static pollable g_empty_pollable;
+/// Shared empty pollable - used by pollset to poll on until the first fd is
+/// added
+static pollable *g_empty_pollable;
-static void pollable_init(pollable *p, polling_obj_type type);
-static void pollable_destroy(pollable *p);
-/* ensure that p->epfd, p->wakeup are initialized; p->po.mu must be held */
-static grpc_error *pollable_materialize(pollable *p);
+static grpc_error *pollable_create(pollable_type type, pollable **p);
+#ifdef NDEBUG
+static pollable *pollable_ref(pollable *p);
+static void pollable_unref(pollable *p);
+#define POLLABLE_REF(p, r) pollable_ref(p)
+#define POLLABLE_UNREF(p, r) pollable_unref(p)
+#else
+static pollable *pollable_ref(pollable *p, int line, const char *reason);
+static void pollable_unref(pollable *p, int line, const char *reason);
+#define POLLABLE_REF(p, r) pollable_ref((p), __LINE__, (r))
+#define POLLABLE_UNREF(p, r) pollable_unref((p), __LINE__, (r))
+#endif
/*******************************************************************************
* Fd Declarations
*/
struct grpc_fd {
- pollable pollable_obj;
int fd;
/* refst format:
bit 0 : 1=Active / 0=Orphaned
@@ -150,11 +147,10 @@ struct grpc_fd {
Ref/Unref by two to avoid altering the orphaned bit */
gpr_atm refst;
- /* The fd is either closed or we relinquished control of it. In either
- cases, this indicates that the 'fd' on this structure is no longer
- valid */
- gpr_mu orphaned_mu;
- bool orphaned;
+ gpr_mu orphan_mu;
+
+ gpr_mu pollable_mu;
+ pollable *pollable_obj;
gpr_atm read_closure;
gpr_atm write_closure;
@@ -176,47 +172,52 @@ static void fd_global_shutdown(void);
* Pollset Declarations
*/
-typedef struct pollset_worker_link {
+typedef struct {
grpc_pollset_worker *next;
grpc_pollset_worker *prev;
-} pollset_worker_link;
+} pwlink;
-typedef enum {
- PWL_POLLSET,
- PWL_POLLABLE,
- POLLSET_WORKER_LINK_COUNT
-} pollset_worker_links;
+typedef enum { PWLINK_POLLABLE = 0, PWLINK_POLLSET, PWLINK_COUNT } pwlinks;
struct grpc_pollset_worker {
bool kicked;
bool initialized_cv;
- pollset_worker_link links[POLLSET_WORKER_LINK_COUNT];
+#ifndef NDEBUG
+ // debug aid: which thread started this worker
+ pid_t originator;
+#endif
gpr_cv cv;
grpc_pollset *pollset;
pollable *pollable_obj;
-};
-#define MAX_EPOLL_EVENTS 100
-#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 5
+ pwlink links[PWLINK_COUNT];
+};
struct grpc_pollset {
- pollable pollable_obj;
- pollable *current_pollable_obj;
- int kick_alls_pending;
+ gpr_mu mu;
+ pollable *active_pollable;
bool kicked_without_poller;
grpc_closure *shutdown_closure;
grpc_pollset_worker *root_worker;
-
- int event_cursor;
- int event_count;
- struct epoll_event events[MAX_EPOLL_EVENTS];
+ int containing_pollset_set_count;
};
/*******************************************************************************
* Pollset-set Declarations
*/
+
struct grpc_pollset_set {
- polling_obj po;
+ gpr_refcount refs;
+ gpr_mu mu;
+ grpc_pollset_set *parent;
+
+ size_t pollset_count;
+ size_t pollset_capacity;
+ grpc_pollset **pollsets;
+
+ size_t fd_count;
+ size_t fd_capacity;
+ grpc_fd **fds;
};
/*******************************************************************************
@@ -250,11 +251,6 @@ static bool append_error(grpc_error **composite, grpc_error *error,
* becomes a spurious read notification on a reused fd.
*/
-/* The alarm system needs to be able to wakeup 'some poller' sometimes
- * (specifically when a new alarm needs to be triggered earlier than the next
- * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
- * case occurs. */
-
static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;
@@ -282,8 +278,9 @@ static void fd_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_fd *fd = (grpc_fd *)arg;
/* Add the fd to the freelist */
grpc_iomgr_unregister_object(&fd->iomgr_object);
- pollable_destroy(&fd->pollable_obj);
- gpr_mu_destroy(&fd->orphaned_mu);
+ POLLABLE_UNREF(fd->pollable_obj, "fd_pollable");
+ gpr_mu_destroy(&fd->pollable_mu);
+ gpr_mu_destroy(&fd->orphan_mu);
gpr_mu_lock(&fd_freelist_mu);
fd->freelist_next = fd_freelist;
fd_freelist = fd;
@@ -343,12 +340,11 @@ static grpc_fd *fd_create(int fd, const char *name) {
new_fd = (grpc_fd *)gpr_malloc(sizeof(grpc_fd));
}
- pollable_init(&new_fd->pollable_obj, PO_FD);
-
+ gpr_mu_init(&new_fd->pollable_mu);
+ gpr_mu_init(&new_fd->orphan_mu);
+ new_fd->pollable_obj = NULL;
gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
new_fd->fd = fd;
- gpr_mu_init(&new_fd->orphaned_mu);
- new_fd->orphaned = false;
grpc_lfev_init(&new_fd->read_closure);
grpc_lfev_init(&new_fd->write_closure);
gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
@@ -369,24 +365,17 @@ static grpc_fd *fd_create(int fd, const char *name) {
}
static int fd_wrapped_fd(grpc_fd *fd) {
- int ret_fd = -1;
- gpr_mu_lock(&fd->orphaned_mu);
- if (!fd->orphaned) {
- ret_fd = fd->fd;
- }
- gpr_mu_unlock(&fd->orphaned_mu);
-
- return ret_fd;
+ int ret_fd = fd->fd;
+ return (gpr_atm_acq_load(&fd->refst) & 1) ? ret_fd : -1;
}
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *on_done, int *release_fd,
bool already_closed, const char *reason) {
bool is_fd_closed = already_closed;
- grpc_error *error = GRPC_ERROR_NONE;
- gpr_mu_lock(&fd->pollable_obj.po.mu);
- gpr_mu_lock(&fd->orphaned_mu);
+ gpr_mu_lock(&fd->orphan_mu);
+
fd->on_done_closure = on_done;
/* If release_fd is not NULL, we should be relinquishing control of the file
@@ -398,8 +387,6 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
is_fd_closed = true;
}
- fd->orphaned = true;
-
if (!is_fd_closed) {
gpr_log(GPR_DEBUG, "TODO: handle fd removal?");
}
@@ -408,13 +395,11 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
to be alive (and not added to freelist) until the end of this function */
REF_BY(fd, 1, reason);
- GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE);
+
+ gpr_mu_unlock(&fd->orphan_mu);
- gpr_mu_unlock(&fd->orphaned_mu);
- gpr_mu_unlock(&fd->pollable_obj.po.mu);
UNREF_BY(exec_ctx, fd, 2, reason); /* Drop the reference */
- GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
- GRPC_ERROR_UNREF(error);
}
static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
@@ -451,63 +436,87 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
* Pollable Definitions
*/
-static void pollable_init(pollable *p, polling_obj_type type) {
- po_init(&p->po, type);
- p->root_worker = NULL;
- p->epfd = -1;
+static grpc_error *pollable_create(pollable_type type, pollable **p) {
+ *p = NULL;
+
+ int epfd = epoll_create1(EPOLL_CLOEXEC);
+ if (epfd == -1) {
+ return GRPC_OS_ERROR(errno, "epoll_create1");
+ }
+ *p = (pollable *)gpr_malloc(sizeof(**p));
+ grpc_error *err = grpc_wakeup_fd_init(&(*p)->wakeup);
+ if (err != GRPC_ERROR_NONE) {
+ close(epfd);
+ gpr_free(*p);
+ *p = NULL;
+ return err;
+ }
+ struct epoll_event ev;
+ ev.events = (uint32_t)(EPOLLIN | EPOLLET);
+ ev.data.ptr = (void *)(1 | (intptr_t) & (*p)->wakeup);
+ if (epoll_ctl(epfd, EPOLL_CTL_ADD, (*p)->wakeup.read_fd, &ev) != 0) {
+ err = GRPC_OS_ERROR(errno, "epoll_ctl");
+ close(epfd);
+ grpc_wakeup_fd_destroy(&(*p)->wakeup);
+ gpr_free(*p);
+ *p = NULL;
+ return err;
+ }
+
+ (*p)->type = type;
+ gpr_ref_init(&(*p)->refs, 1);
+ gpr_mu_init(&(*p)->mu);
+ (*p)->epfd = epfd;
+ (*p)->owner_fd = NULL;
+ (*p)->pollset_set = NULL;
+ (*p)->next = (*p)->prev = *p;
+ (*p)->root_worker = NULL;
+ (*p)->event_cursor = 0;
+ (*p)->event_count = 0;
+ return GRPC_ERROR_NONE;
}
-static void pollable_destroy(pollable *p) {
- po_destroy(&p->po);
- if (p->epfd != -1) {
- close(p->epfd);
- grpc_wakeup_fd_destroy(&p->wakeup);
+#ifdef NDEBUG
+static pollable *pollable_ref(pollable *p) {
+#else
+static pollable *pollable_ref(pollable *p, int line, const char *reason) {
+ if (grpc_trace_pollable_refcount.enabled()) {
+ int r = (int)gpr_atm_no_barrier_load(&p->refs.count);
+ gpr_log(__FILE__, line, GPR_LOG_SEVERITY_DEBUG,
+ "POLLABLE:%p ref %d->%d %s", p, r, r + 1, reason);
}
+#endif
+ gpr_ref(&p->refs);
+ return p;
}
-/* ensure that p->epfd, p->wakeup are initialized; p->po.mu must be held */
-static grpc_error *pollable_materialize(pollable *p) {
- if (p->epfd == -1) {
- int new_epfd = epoll_create1(EPOLL_CLOEXEC);
- if (new_epfd < 0) {
- return GRPC_OS_ERROR(errno, "epoll_create1");
- }
- grpc_error *err = grpc_wakeup_fd_init(&p->wakeup);
- if (err != GRPC_ERROR_NONE) {
- close(new_epfd);
- return err;
- }
- struct epoll_event ev;
- ev.events = (uint32_t)(EPOLLIN | EPOLLET);
- ev.data.ptr = (void *)(1 | (intptr_t)&p->wakeup);
- if (epoll_ctl(new_epfd, EPOLL_CTL_ADD, p->wakeup.read_fd, &ev) != 0) {
- err = GRPC_OS_ERROR(errno, "epoll_ctl");
- close(new_epfd);
- grpc_wakeup_fd_destroy(&p->wakeup);
- return err;
- }
-
- p->epfd = new_epfd;
+#ifdef NDEBUG
+static void pollable_unref(pollable *p) {
+#else
+static void pollable_unref(pollable *p, int line, const char *reason) {
+ if (p == NULL) return;
+ if (grpc_trace_pollable_refcount.enabled()) {
+ int r = (int)gpr_atm_no_barrier_load(&p->refs.count);
+ gpr_log(__FILE__, line, GPR_LOG_SEVERITY_DEBUG,
+ "POLLABLE:%p unref %d->%d %s", p, r, r - 1, reason);
+ }
+#endif
+ if (p != NULL && gpr_unref(&p->refs)) {
+ close(p->epfd);
+ grpc_wakeup_fd_destroy(&p->wakeup);
+ gpr_free(p);
}
- return GRPC_ERROR_NONE;
}
-/* pollable must be materialized */
static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) {
grpc_error *error = GRPC_ERROR_NONE;
static const char *err_desc = "pollable_add_fd";
const int epfd = p->epfd;
- GPR_ASSERT(epfd != -1);
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "add fd %p (%d) to pollable %p", fd, fd->fd, p);
}
- gpr_mu_lock(&fd->orphaned_mu);
- if (fd->orphaned) {
- gpr_mu_unlock(&fd->orphaned_mu);
- return GRPC_ERROR_NONE;
- }
struct epoll_event ev_fd;
ev_fd.events = (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT | EPOLLEXCLUSIVE);
ev_fd.data.ptr = fd;
@@ -519,7 +528,6 @@ static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) {
append_error(&error, GRPC_OS_ERROR(errno, "epoll_ctl"), err_desc);
}
}
- gpr_mu_unlock(&fd->orphaned_mu);
return error;
}
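
POLLABLE_REF/POLLABLE_UNREF above expand to plain ref/unref calls in NDEBUG builds and otherwise thread the call-site line and a reason string through to a trace log. Here is a stand-alone sketch of that debug-refcount macro pattern, using a hypothetical obj type rather than pollable.

#include <cstdio>

struct obj { int refs = 1; };

#ifdef NDEBUG
static obj* obj_ref(obj* o) { o->refs++; return o; }
static void obj_unref(obj* o) { if (--o->refs == 0) delete o; }
#define OBJ_REF(o, r) obj_ref(o)
#define OBJ_UNREF(o, r) obj_unref(o)
#else
static obj* obj_ref(obj* o, int line, const char* reason) {
  std::printf("OBJ:%p ref %d->%d %s (line %d)\n", (void*)o, o->refs,
              o->refs + 1, reason, line);
  o->refs++;
  return o;
}
static void obj_unref(obj* o, int line, const char* reason) {
  std::printf("OBJ:%p unref %d->%d %s (line %d)\n", (void*)o, o->refs,
              o->refs - 1, reason, line);
  if (--o->refs == 0) delete o;
}
#define OBJ_REF(o, r) obj_ref((o), __LINE__, (r))
#define OBJ_UNREF(o, r) obj_unref((o), __LINE__, (r))
#endif

int main() {
  obj* o = new obj;      // starts with one ref
  OBJ_REF(o, "worker");  // second ref, traced in debug builds
  OBJ_UNREF(o, "worker");
  OBJ_UNREF(o, "owner");  // last unref frees the object
  return 0;
}
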
@@ -535,128 +543,67 @@ GPR_TLS_DECL(g_current_thread_worker);
static grpc_error *pollset_global_init(void) {
gpr_tls_init(&g_current_thread_pollset);
gpr_tls_init(&g_current_thread_worker);
- pollable_init(&g_empty_pollable, PO_EMPTY_POLLABLE);
- return GRPC_ERROR_NONE;
+ return pollable_create(PO_EMPTY, &g_empty_pollable);
}
static void pollset_global_shutdown(void) {
- pollable_destroy(&g_empty_pollable);
+ POLLABLE_UNREF(g_empty_pollable, "g_empty_pollable");
gpr_tls_destroy(&g_current_thread_pollset);
gpr_tls_destroy(&g_current_thread_worker);
}
+/* pollset->mu must be held while calling this function */
static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset) {
+ if (grpc_polling_trace.enabled()) {
+ gpr_log(GPR_DEBUG,
+ "PS:%p (pollable:%p) maybe_finish_shutdown sc=%p (target:!NULL) "
+ "rw=%p (target:NULL) cpsc=%d (target:0)",
+ pollset, pollset->active_pollable, pollset->shutdown_closure,
+ pollset->root_worker, pollset->containing_pollset_set_count);
+ }
if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL &&
- pollset->kick_alls_pending == 0) {
+ pollset->containing_pollset_set_count == 0) {
GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
pollset->shutdown_closure = NULL;
}
}
-static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error_unused) {
- grpc_error *error = GRPC_ERROR_NONE;
- grpc_pollset *pollset = (grpc_pollset *)arg;
- gpr_mu_lock(&pollset->pollable_obj.po.mu);
- if (pollset->root_worker != NULL) {
- grpc_pollset_worker *worker = pollset->root_worker;
- do {
- GRPC_STATS_INC_POLLSET_KICK(exec_ctx);
- if (worker->pollable_obj != &pollset->pollable_obj) {
- gpr_mu_lock(&worker->pollable_obj->po.mu);
- }
- if (worker->initialized_cv && worker != pollset->root_worker) {
- if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG, "PS:%p kickall_via_cv %p (pollable %p vs %p)",
- pollset, worker, &pollset->pollable_obj,
- worker->pollable_obj);
- }
- worker->kicked = true;
- gpr_cv_signal(&worker->cv);
- } else {
- if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG, "PS:%p kickall_via_wakeup %p (pollable %p vs %p)",
- pollset, worker, &pollset->pollable_obj,
- worker->pollable_obj);
- }
- append_error(&error,
- grpc_wakeup_fd_wakeup(&worker->pollable_obj->wakeup),
- "pollset_shutdown");
- }
- if (worker->pollable_obj != &pollset->pollable_obj) {
- gpr_mu_unlock(&worker->pollable_obj->po.mu);
- }
-
- worker = worker->links[PWL_POLLSET].next;
- } while (worker != pollset->root_worker);
- }
- pollset->kick_alls_pending--;
- pollset_maybe_finish_shutdown(exec_ctx, pollset);
- gpr_mu_unlock(&pollset->pollable_obj.po.mu);
- GRPC_LOG_IF_ERROR("kick_all", error);
-}
-
-static void pollset_kick_all(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
- pollset->kick_alls_pending++;
- GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_CREATE(do_kick_all, pollset,
- grpc_schedule_on_exec_ctx),
- GRPC_ERROR_NONE);
-}
-
-static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p,
- grpc_pollset_worker *specific_worker) {
- if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG,
- "PS:%p kick %p tls_pollset=%p tls_worker=%p "
- "root_worker=(pollset:%p pollable:%p)",
- p, specific_worker, (void *)gpr_tls_get(&g_current_thread_pollset),
- (void *)gpr_tls_get(&g_current_thread_worker), pollset->root_worker,
- p->root_worker);
- }
- if (specific_worker == NULL) {
- if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
- if (pollset->root_worker == NULL) {
- if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG, "PS:%p kicked_any_without_poller", p);
- }
- pollset->kicked_without_poller = true;
- return GRPC_ERROR_NONE;
- } else {
- if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG, "PS:%p kicked_any_via_wakeup_fd", p);
- }
- grpc_error *err = pollable_materialize(p);
- if (err != GRPC_ERROR_NONE) return err;
- return grpc_wakeup_fd_wakeup(&p->wakeup);
- }
- } else {
- if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG, "PS:%p kicked_any_but_awake", p);
- }
- return GRPC_ERROR_NONE;
- }
- } else if (specific_worker->kicked) {
+/* pollset->mu must be held before calling this function,
+ * pollset->active_pollable->mu & specific_worker->pollable_obj->mu must not be
+ * held */
+static grpc_error *kick_one_worker(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_worker *specific_worker) {
+ pollable *p = specific_worker->pollable_obj;
+ grpc_core::mu_guard lock(&p->mu);
+ GRPC_STATS_INC_POLLSET_KICK(exec_ctx);
+ GPR_ASSERT(specific_worker != NULL);
+ if (specific_worker->kicked) {
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_already_kicked", p);
}
+ GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx);
return GRPC_ERROR_NONE;
- } else if (gpr_tls_get(&g_current_thread_worker) ==
- (intptr_t)specific_worker) {
+ }
+ if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) {
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_awake", p);
}
+ GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD(exec_ctx);
specific_worker->kicked = true;
return GRPC_ERROR_NONE;
- } else if (specific_worker == p->root_worker) {
+ }
+ if (specific_worker == p->root_worker) {
+ GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD(exec_ctx);
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_wakeup_fd", p);
}
- grpc_error *err = pollable_materialize(p);
- if (err != GRPC_ERROR_NONE) return err;
specific_worker->kicked = true;
- return grpc_wakeup_fd_wakeup(&p->wakeup);
- } else {
+ grpc_error *error = grpc_wakeup_fd_wakeup(&p->wakeup);
+ return error;
+ }
+ if (specific_worker->initialized_cv) {
+ GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV(exec_ctx);
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_cv", p);
}
@@ -664,30 +611,79 @@ static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p,
gpr_cv_signal(&specific_worker->cv);
return GRPC_ERROR_NONE;
}
+ // we can get here during end_worker after removing specific_worker from the
+ // pollable list but before removing it from the pollset list
+ return GRPC_ERROR_NONE;
}
-/* p->po.mu must be held before calling this function */
static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *specific_worker) {
- pollable *p = pollset->current_pollable_obj;
- GRPC_STATS_INC_POLLSET_KICK(exec_ctx);
- if (p != &pollset->pollable_obj) {
- gpr_mu_lock(&p->po.mu);
+ if (grpc_polling_trace.enabled()) {
+ gpr_log(GPR_DEBUG,
+ "PS:%p kick %p tls_pollset=%p tls_worker=%p pollset.root_worker=%p",
+ pollset, specific_worker,
+ (void *)gpr_tls_get(&g_current_thread_pollset),
+ (void *)gpr_tls_get(&g_current_thread_worker),
+ pollset->root_worker);
+ }
+ if (specific_worker == NULL) {
+ if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
+ if (pollset->root_worker == NULL) {
+ if (grpc_polling_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "PS:%p kicked_any_without_poller", pollset);
+ }
+ GRPC_STATS_INC_POLLSET_KICKED_WITHOUT_POLLER(exec_ctx);
+ pollset->kicked_without_poller = true;
+ return GRPC_ERROR_NONE;
+ } else {
+ // We've been asked to kick a poller, but we haven't been told which one
+ // ... any will do
+ // We look at the pollset worker list because:
+ // 1. the pollable list may include workers from other pollers, so we'd
+ // need to do an O(N) search
+ // 2. we'd additionally need to take the pollable lock, which we've so
+ // far avoided
+ // Now, we would prefer to wake a poller in cv_wait, and not in
+ // epoll_wait (since the latter would imply the need to do an additional
+ // wakeup)
+ // We know that if a worker is at the root of a pollable, it's (likely)
+ // also the root of a pollset, and we know that if a worker is NOT at
+ // the root of a pollset, it's (likely) not at the root of a pollable,
+ // so we take our chances and choose the SECOND worker enqueued against
+ // the pollset as a worker that's likely to be in cv_wait
+ return kick_one_worker(
+ exec_ctx, pollset->root_worker->links[PWLINK_POLLSET].next);
+ }
+ } else {
+ if (grpc_polling_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "PS:%p kicked_any_but_awake", pollset);
+ }
+ GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD(exec_ctx);
+ return GRPC_ERROR_NONE;
+ }
+ } else {
+ return kick_one_worker(exec_ctx, specific_worker);
}
- grpc_error *error = pollset_kick_inner(pollset, p, specific_worker);
- if (p != &pollset->pollable_obj) {
- gpr_mu_unlock(&p->po.mu);
+}
+
+static grpc_error *pollset_kick_all(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset) {
+ grpc_error *error = GRPC_ERROR_NONE;
+ const char *err_desc = "pollset_kick_all";
+ grpc_pollset_worker *w = pollset->root_worker;
+ if (w != NULL) {
+ do {
+ append_error(&error, kick_one_worker(exec_ctx, w), err_desc);
+ w = w->links[PWLINK_POLLSET].next;
+ } while (w != pollset->root_worker);
}
return error;
}
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
- pollable_init(&pollset->pollable_obj, PO_POLLSET);
- pollset->current_pollable_obj = &g_empty_pollable;
- pollset->kicked_without_poller = false;
- pollset->shutdown_closure = NULL;
- pollset->root_worker = NULL;
- *mu = &pollset->pollable_obj.po.mu;
+ gpr_mu_init(&pollset->mu);
+ pollset->active_pollable = POLLABLE_REF(g_empty_pollable, "pollset");
+ *mu = &pollset->mu;
}
static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx,
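
The comment block in pollset_kick above argues that when no specific worker is named, the best candidate is the second worker in the pollset's ring (root_worker->links[PWLINK_POLLSET].next), since the root is probably the thread parked in epoll_wait. The following is a minimal standalone sketch of that ring and of the pick, separate from the patch itself; Worker, ring_insert and pick_worker_to_kick are hypothetical names rather than the gRPC types, and locking, kicked flags and wakeup fds are all omitted.

    #include <cstdio>

    // Hypothetical stand-in for grpc_pollset_worker; only the pollset link ring.
    struct Worker {
      const char *name;
      bool likely_in_cv_wait;  // true if parked on a condition variable
      Worker *next;            // circular, doubly linked
      Worker *prev;
    };

    // Insert at the tail of the ring rooted at *root (mirrors worker_insert).
    static void ring_insert(Worker **root, Worker *w) {
      if (*root == nullptr) {
        *root = w;
        w->next = w->prev = w;
      } else {
        w->next = *root;
        w->prev = (*root)->prev;
        w->next->prev = w;
        w->prev->next = w;
      }
    }

    // The heuristic described in the patch comment: the root is probably the
    // thread sitting in epoll_wait, so kick the SECOND worker in the ring,
    // which is more likely to be blocked in cv_wait and cheap to wake.
    static Worker *pick_worker_to_kick(Worker *root) {
      return root == nullptr ? nullptr : root->next;
    }

    int main() {
      Worker *root = nullptr;
      Worker a = {"poller-in-epoll", false, nullptr, nullptr};
      Worker b = {"waiter-in-cv", true, nullptr, nullptr};
      Worker c = {"another-waiter", true, nullptr, nullptr};
      ring_insert(&root, &a);
      ring_insert(&root, &b);
      ring_insert(&root, &c);
      printf("would kick: %s\n", pick_worker_to_kick(root)->name);  // waiter-in-cv
      return 0;
    }
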
@@ -719,12 +715,29 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
}
-static grpc_error *fd_become_pollable_locked(grpc_fd *fd) {
+static grpc_error *fd_get_or_become_pollable(grpc_fd *fd, pollable **p) {
+ gpr_mu_lock(&fd->pollable_mu);
grpc_error *error = GRPC_ERROR_NONE;
- static const char *err_desc = "fd_become_pollable";
- if (append_error(&error, pollable_materialize(&fd->pollable_obj), err_desc)) {
- append_error(&error, pollable_add_fd(&fd->pollable_obj, fd), err_desc);
+ static const char *err_desc = "fd_get_or_become_pollable";
+ if (fd->pollable_obj == NULL) {
+ if (append_error(&error, pollable_create(PO_FD, &fd->pollable_obj),
+ err_desc)) {
+ fd->pollable_obj->owner_fd = fd;
+ if (!append_error(&error, pollable_add_fd(fd->pollable_obj, fd),
+ err_desc)) {
+ POLLABLE_UNREF(fd->pollable_obj, "fd_pollable");
+ fd->pollable_obj = NULL;
+ }
+ }
}
+ if (error == GRPC_ERROR_NONE) {
+ GPR_ASSERT(fd->pollable_obj != NULL);
+ *p = POLLABLE_REF(fd->pollable_obj, "pollset");
+ } else {
+ GPR_ASSERT(fd->pollable_obj == NULL);
+ *p = NULL;
+ }
+ gpr_mu_unlock(&fd->pollable_mu);
return error;
}
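
fd_get_or_become_pollable above creates the fd's single-fd pollable lazily under fd->pollable_mu and hands each caller its own reference. Below is a small sketch of that get-or-create-under-a-lock pattern, separate from the patch, assuming a plain integer refcount in place of the POLLABLE_REF/UNREF macros; Fd, Pollable and get_or_create_pollable are illustrative names only, and unref/cleanup is left out.

    #include <cassert>
    #include <mutex>

    // Hypothetical, simplified stand-ins for pollable / grpc_fd.
    struct Pollable {
      int refs = 1;  // the owning fd holds the initial ref
    };

    struct Fd {
      std::mutex pollable_mu;
      Pollable *pollable_obj = nullptr;
    };

    static Pollable *pollable_ref(Pollable *p) { ++p->refs; return p; }

    // Mirrors fd_get_or_become_pollable: create on first use, then hand the
    // caller its own reference; later callers share the same object.
    static Pollable *get_or_create_pollable(Fd *fd) {
      std::lock_guard<std::mutex> lock(fd->pollable_mu);
      if (fd->pollable_obj == nullptr) {
        fd->pollable_obj = new Pollable();  // ref #1 owned by the fd
      }
      return pollable_ref(fd->pollable_obj);  // caller's ref
    }

    int main() {
      Fd fd;
      Pollable *p1 = get_or_create_pollable(&fd);
      Pollable *p2 = get_or_create_pollable(&fd);
      assert(p1 == p2 && p1->refs == 3);  // fd + two callers (unref omitted)
      return 0;
    }
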
@@ -733,23 +746,20 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure) {
GPR_ASSERT(pollset->shutdown_closure == NULL);
pollset->shutdown_closure = closure;
- pollset_kick_all(exec_ctx, pollset);
+ GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(exec_ctx, pollset));
pollset_maybe_finish_shutdown(exec_ctx, pollset);
}
-static bool pollset_is_pollable_fd(grpc_pollset *pollset, pollable *p) {
- return p != &g_empty_pollable && p != &pollset->pollable_obj;
-}
-
-static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx,
- grpc_pollset *pollset, bool drain) {
+static grpc_error *pollable_process_events(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset,
+ pollable *pollable_obj, bool drain) {
static const char *err_desc = "pollset_process_events";
grpc_error *error = GRPC_ERROR_NONE;
for (int i = 0; (drain || i < MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL) &&
- pollset->event_cursor != pollset->event_count;
+ pollable_obj->event_cursor != pollable_obj->event_count;
i++) {
- int n = pollset->event_cursor++;
- struct epoll_event *ev = &pollset->events[n];
+ int n = pollable_obj->event_cursor++;
+ struct epoll_event *ev = &pollable_obj->events[n];
void *data_ptr = ev->data.ptr;
if (1 & (intptr_t)data_ptr) {
if (grpc_polling_trace.enabled()) {
@@ -784,22 +794,17 @@ static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx,
/* pollset_shutdown is guaranteed to be called before pollset_destroy. */
static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
- pollable_destroy(&pollset->pollable_obj);
- if (pollset_is_pollable_fd(pollset, pollset->current_pollable_obj)) {
- UNREF_BY(exec_ctx, (grpc_fd *)pollset->current_pollable_obj, 2,
- "pollset_pollable");
- }
- GRPC_LOG_IF_ERROR("pollset_process_events",
- pollset_process_events(exec_ctx, pollset, true));
+ POLLABLE_UNREF(pollset->active_pollable, "pollset");
+ pollset->active_pollable = NULL;
}
-static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- pollable *p, grpc_millis deadline) {
+static grpc_error *pollable_epoll(grpc_exec_ctx *exec_ctx, pollable *p,
+ grpc_millis deadline) {
int timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline);
if (grpc_polling_trace.enabled()) {
char *desc = pollable_desc(p);
- gpr_log(GPR_DEBUG, "PS:%p poll %p[%s] for %dms", pollset, p, desc, timeout);
+ gpr_log(GPR_DEBUG, "POLLABLE:%p[%s] poll for %dms", p, desc, timeout);
gpr_free(desc);
}
@@ -809,7 +814,7 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
int r;
do {
GRPC_STATS_INC_SYSCALL_POLL(exec_ctx);
- r = epoll_wait(p->epfd, pollset->events, MAX_EPOLL_EVENTS, timeout);
+ r = epoll_wait(p->epfd, p->events, MAX_EPOLL_EVENTS, timeout);
} while (r < 0 && errno == EINTR);
if (timeout != 0) {
GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx);
@@ -818,24 +823,24 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG, "PS:%p poll %p got %d events", pollset, p, r);
+ gpr_log(GPR_DEBUG, "POLLABLE:%p got %d events", p, r);
}
- pollset->event_cursor = 0;
- pollset->event_count = r;
+ p->event_cursor = 0;
+ p->event_count = r;
return GRPC_ERROR_NONE;
}
/* Return true if first in list */
-static bool worker_insert(grpc_pollset_worker **root, pollset_worker_links link,
- grpc_pollset_worker *worker) {
- if (*root == NULL) {
- *root = worker;
+static bool worker_insert(grpc_pollset_worker **root_worker,
+ grpc_pollset_worker *worker, pwlinks link) {
+ if (*root_worker == NULL) {
+ *root_worker = worker;
worker->links[link].next = worker->links[link].prev = worker;
return true;
} else {
- worker->links[link].next = *root;
+ worker->links[link].next = *root_worker;
worker->links[link].prev = worker->links[link].next->links[link].prev;
worker->links[link].next->links[link].prev = worker;
worker->links[link].prev->links[link].next = worker;
@@ -843,26 +848,26 @@ static bool worker_insert(grpc_pollset_worker **root, pollset_worker_links link,
}
}
-/* Return true if last in list */
-typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result;
+/* signals WRR_NEW_ROOT iff the root changed; the new root is *root_worker */
+typedef enum { WRR_NEW_ROOT, WRR_EMPTIED, WRR_REMOVED } worker_remove_result;
-static worker_remove_result worker_remove(grpc_pollset_worker **root,
- pollset_worker_links link,
- grpc_pollset_worker *worker) {
- if (worker == *root) {
+static worker_remove_result worker_remove(grpc_pollset_worker **root_worker,
+ grpc_pollset_worker *worker,
+ pwlinks link) {
+ if (worker == *root_worker) {
if (worker == worker->links[link].next) {
- *root = NULL;
- return EMPTIED;
+ *root_worker = NULL;
+ return WRR_EMPTIED;
} else {
- *root = worker->links[link].next;
+ *root_worker = worker->links[link].next;
worker->links[link].prev->links[link].next = worker->links[link].next;
worker->links[link].next->links[link].prev = worker->links[link].prev;
- return NEW_ROOT;
+ return WRR_NEW_ROOT;
}
} else {
worker->links[link].prev->links[link].next = worker->links[link].next;
worker->links[link].next->links[link].prev = worker->links[link].prev;
- return REMOVED;
+ return WRR_REMOVED;
}
}
@@ -871,25 +876,20 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *worker,
grpc_pollset_worker **worker_hdl,
grpc_millis deadline) {
- bool do_poll = true;
+ bool do_poll = (pollset->shutdown_closure == nullptr);
if (worker_hdl != NULL) *worker_hdl = worker;
worker->initialized_cv = false;
worker->kicked = false;
worker->pollset = pollset;
- worker->pollable_obj = pollset->current_pollable_obj;
-
- if (pollset_is_pollable_fd(pollset, worker->pollable_obj)) {
- REF_BY((grpc_fd *)worker->pollable_obj, 2, "one_poll");
- }
-
- worker_insert(&pollset->root_worker, PWL_POLLSET, worker);
- if (!worker_insert(&worker->pollable_obj->root_worker, PWL_POLLABLE,
- worker)) {
+ worker->pollable_obj =
+ POLLABLE_REF(pollset->active_pollable, "pollset_worker");
+ worker_insert(&pollset->root_worker, worker, PWLINK_POLLSET);
+ gpr_mu_lock(&worker->pollable_obj->mu);
+ if (!worker_insert(&worker->pollable_obj->root_worker, worker,
+ PWLINK_POLLABLE)) {
worker->initialized_cv = true;
gpr_cv_init(&worker->cv);
- if (worker->pollable_obj != &pollset->pollable_obj) {
- gpr_mu_unlock(&pollset->pollable_obj.po.mu);
- }
+ gpr_mu_unlock(&pollset->mu);
if (grpc_polling_trace.enabled() &&
worker->pollable_obj->root_worker != worker) {
gpr_log(GPR_DEBUG, "PS:%p wait %p w=%p for %dms", pollset,
@@ -897,7 +897,7 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
poll_deadline_to_millis_timeout(exec_ctx, deadline));
}
while (do_poll && worker->pollable_obj->root_worker != worker) {
- if (gpr_cv_wait(&worker->cv, &worker->pollable_obj->po.mu,
+ if (gpr_cv_wait(&worker->cv, &worker->pollable_obj->mu,
grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) {
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PS:%p timeout_wait %p w=%p", pollset,
@@ -916,158 +916,232 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
worker->pollable_obj, worker);
}
}
- if (worker->pollable_obj != &pollset->pollable_obj) {
- gpr_mu_unlock(&worker->pollable_obj->po.mu);
- gpr_mu_lock(&pollset->pollable_obj.po.mu);
- gpr_mu_lock(&worker->pollable_obj->po.mu);
- }
grpc_exec_ctx_invalidate_now(exec_ctx);
+ } else {
+ gpr_mu_unlock(&pollset->mu);
}
+ gpr_mu_unlock(&worker->pollable_obj->mu);
- return do_poll && pollset->shutdown_closure == NULL &&
- pollset->current_pollable_obj == worker->pollable_obj;
+ return do_poll;
}
static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *worker,
grpc_pollset_worker **worker_hdl) {
- if (NEW_ROOT ==
- worker_remove(&worker->pollable_obj->root_worker, PWL_POLLABLE, worker)) {
- gpr_cv_signal(&worker->pollable_obj->root_worker->cv);
+ gpr_mu_lock(&pollset->mu);
+ gpr_mu_lock(&worker->pollable_obj->mu);
+ switch (worker_remove(&worker->pollable_obj->root_worker, worker,
+ PWLINK_POLLABLE)) {
+ case WRR_NEW_ROOT: {
+ // wakeup new poller
+ grpc_pollset_worker *new_root = worker->pollable_obj->root_worker;
+ GPR_ASSERT(new_root->initialized_cv);
+ gpr_cv_signal(&new_root->cv);
+ break;
+ }
+ case WRR_EMPTIED:
+ if (pollset->active_pollable != worker->pollable_obj) {
+ // pollable no longer being polled: flush events
+ pollable_process_events(exec_ctx, pollset, worker->pollable_obj, true);
+ }
+ break;
+ case WRR_REMOVED:
+ break;
+ }
+ gpr_mu_unlock(&worker->pollable_obj->mu);
+ POLLABLE_UNREF(worker->pollable_obj, "pollset_worker");
+ if (worker_remove(&pollset->root_worker, worker, PWLINK_POLLSET) ==
+ WRR_EMPTIED) {
+ pollset_maybe_finish_shutdown(exec_ctx, pollset);
}
if (worker->initialized_cv) {
gpr_cv_destroy(&worker->cv);
}
- if (pollset_is_pollable_fd(pollset, worker->pollable_obj)) {
- UNREF_BY(exec_ctx, (grpc_fd *)worker->pollable_obj, 2, "one_poll");
- }
- if (EMPTIED == worker_remove(&pollset->root_worker, PWL_POLLSET, worker)) {
- pollset_maybe_finish_shutdown(exec_ctx, pollset);
- }
}
-/* pollset->po.mu lock must be held by the caller before calling this.
+#ifndef NDEBUG
+static long gettid(void) { return syscall(__NR_gettid); }
+#endif
+
+/* pollset->mu lock must be held by the caller before calling this.
The function pollset_work() may temporarily release the lock (pollset->po.mu)
during the course of its execution but it will always re-acquire the lock and
ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker_hdl,
grpc_millis deadline) {
+#ifdef GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP
+ grpc_pollset_worker *worker =
+ (grpc_pollset_worker *)gpr_malloc(sizeof(*worker));
+#define WORKER_PTR (worker)
+#else
grpc_pollset_worker worker;
- if (0 && grpc_polling_trace.enabled()) {
+#define WORKER_PTR (&worker)
+#endif
+#ifndef NDEBUG
+ WORKER_PTR->originator = gettid();
+#endif
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PS:%p work hdl=%p worker=%p now=%" PRIdPTR
- " deadline=%" PRIdPTR " kwp=%d root_worker=%p",
- pollset, worker_hdl, &worker, grpc_exec_ctx_now(exec_ctx), deadline,
- pollset->kicked_without_poller, pollset->root_worker);
+ " deadline=%" PRIdPTR " kwp=%d pollable=%p",
+ pollset, worker_hdl, WORKER_PTR, grpc_exec_ctx_now(exec_ctx),
+ deadline, pollset->kicked_without_poller, pollset->active_pollable);
}
- grpc_error *error = GRPC_ERROR_NONE;
static const char *err_desc = "pollset_work";
+ grpc_error *error = GRPC_ERROR_NONE;
if (pollset->kicked_without_poller) {
pollset->kicked_without_poller = false;
- return GRPC_ERROR_NONE;
- }
- if (pollset->current_pollable_obj != &pollset->pollable_obj) {
- gpr_mu_lock(&pollset->current_pollable_obj->po.mu);
- }
- if (begin_worker(exec_ctx, pollset, &worker, worker_hdl, deadline)) {
- gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
- gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
- GPR_ASSERT(!pollset->shutdown_closure);
- append_error(&error, pollable_materialize(worker.pollable_obj), err_desc);
- if (worker.pollable_obj != &pollset->pollable_obj) {
- gpr_mu_unlock(&worker.pollable_obj->po.mu);
- }
- gpr_mu_unlock(&pollset->pollable_obj.po.mu);
- if (pollset->event_cursor == pollset->event_count) {
- append_error(&error, pollset_epoll(exec_ctx, pollset, worker.pollable_obj,
- deadline),
+ } else {
+ if (begin_worker(exec_ctx, pollset, WORKER_PTR, worker_hdl, deadline)) {
+ gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
+ gpr_tls_set(&g_current_thread_worker, (intptr_t)WORKER_PTR);
+ if (WORKER_PTR->pollable_obj->event_cursor ==
+ WORKER_PTR->pollable_obj->event_count) {
+ append_error(&error, pollable_epoll(exec_ctx, WORKER_PTR->pollable_obj,
+ deadline),
+ err_desc);
+ }
+ append_error(&error,
+ pollable_process_events(exec_ctx, pollset,
+ WORKER_PTR->pollable_obj, false),
err_desc);
+ grpc_exec_ctx_flush(exec_ctx);
+ gpr_tls_set(&g_current_thread_pollset, 0);
+ gpr_tls_set(&g_current_thread_worker, 0);
}
- append_error(&error, pollset_process_events(exec_ctx, pollset, false),
- err_desc);
- gpr_mu_lock(&pollset->pollable_obj.po.mu);
- if (worker.pollable_obj != &pollset->pollable_obj) {
- gpr_mu_lock(&worker.pollable_obj->po.mu);
- }
- gpr_tls_set(&g_current_thread_pollset, 0);
- gpr_tls_set(&g_current_thread_worker, 0);
- pollset_maybe_finish_shutdown(exec_ctx, pollset);
+ end_worker(exec_ctx, pollset, WORKER_PTR, worker_hdl);
}
- end_worker(exec_ctx, pollset, &worker, worker_hdl);
- if (worker.pollable_obj != &pollset->pollable_obj) {
- gpr_mu_unlock(&worker.pollable_obj->po.mu);
- }
- if (grpc_exec_ctx_has_work(exec_ctx)) {
- gpr_mu_unlock(&pollset->pollable_obj.po.mu);
- grpc_exec_ctx_flush(exec_ctx);
- gpr_mu_lock(&pollset->pollable_obj.po.mu);
+#ifdef GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP
+ gpr_free(worker);
+#endif
+#undef WORKER_PTR
+ return error;
+}
+
+static grpc_error *pollset_transition_pollable_from_empty_to_fd_locked(
+ grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd) {
+ static const char *err_desc = "pollset_transition_pollable_from_empty_to_fd";
+ grpc_error *error = GRPC_ERROR_NONE;
+ if (grpc_polling_trace.enabled()) {
+ gpr_log(GPR_DEBUG,
+ "PS:%p add fd %p (%d); transition pollable from empty to fd",
+ pollset, fd, fd->fd);
}
+ append_error(&error, pollset_kick_all(exec_ctx, pollset), err_desc);
+ POLLABLE_UNREF(pollset->active_pollable, "pollset");
+ append_error(&error, fd_get_or_become_pollable(fd, &pollset->active_pollable),
+ err_desc);
return error;
}
-static void unref_fd_no_longer_poller(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- grpc_fd *fd = (grpc_fd *)arg;
- UNREF_BY(exec_ctx, fd, 2, "pollset_pollable");
+static grpc_error *pollset_transition_pollable_from_fd_to_multi_locked(
+ grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *and_add_fd) {
+ static const char *err_desc = "pollset_transition_pollable_from_fd_to_multi";
+ grpc_error *error = GRPC_ERROR_NONE;
+ if (grpc_polling_trace.enabled()) {
+ gpr_log(
+ GPR_DEBUG,
+ "PS:%p add fd %p (%d); transition pollable from fd %p to multipoller",
+ pollset, and_add_fd, and_add_fd ? and_add_fd->fd : -1,
+ pollset->active_pollable->owner_fd);
+ }
+ append_error(&error, pollset_kick_all(exec_ctx, pollset), err_desc);
+ grpc_fd *initial_fd = pollset->active_pollable->owner_fd;
+ POLLABLE_UNREF(pollset->active_pollable, "pollset");
+ pollset->active_pollable = NULL;
+ if (append_error(&error, pollable_create(PO_MULTI, &pollset->active_pollable),
+ err_desc)) {
+ append_error(&error, pollable_add_fd(pollset->active_pollable, initial_fd),
+ err_desc);
+ if (and_add_fd != NULL) {
+ append_error(&error,
+ pollable_add_fd(pollset->active_pollable, and_add_fd),
+ err_desc);
+ }
+ }
+ return error;
}
/* expects pollsets locked, flag whether fd is locked or not */
static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx,
- grpc_pollset *pollset, grpc_fd *fd,
- bool fd_locked) {
- static const char *err_desc = "pollset_add_fd";
+ grpc_pollset *pollset, grpc_fd *fd) {
grpc_error *error = GRPC_ERROR_NONE;
- if (pollset->current_pollable_obj == &g_empty_pollable) {
- if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG,
- "PS:%p add fd %p; transition pollable from empty to fd", pollset,
- fd);
- }
- /* empty pollable --> single fd pollable */
- pollset_kick_all(exec_ctx, pollset);
- pollset->current_pollable_obj = &fd->pollable_obj;
- if (!fd_locked) gpr_mu_lock(&fd->pollable_obj.po.mu);
- append_error(&error, fd_become_pollable_locked(fd), err_desc);
- if (!fd_locked) gpr_mu_unlock(&fd->pollable_obj.po.mu);
- REF_BY(fd, 2, "pollset_pollable");
- } else if (pollset->current_pollable_obj == &pollset->pollable_obj) {
- if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG, "PS:%p add fd %p; already multipolling", pollset, fd);
- }
- append_error(&error, pollable_add_fd(pollset->current_pollable_obj, fd),
- err_desc);
- } else if (pollset->current_pollable_obj != &fd->pollable_obj) {
- grpc_fd *had_fd = (grpc_fd *)pollset->current_pollable_obj;
- if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG,
- "PS:%p add fd %p; transition pollable from fd %p to multipoller",
- pollset, fd, had_fd);
- }
- /* Introduce a spurious completion.
- If we do not, then it may be that the fd-specific epoll set consumed
- a completion without being polled, leading to a missed edge going up. */
- grpc_lfev_set_ready(exec_ctx, &had_fd->read_closure, "read");
- grpc_lfev_set_ready(exec_ctx, &had_fd->write_closure, "write");
- pollset_kick_all(exec_ctx, pollset);
- pollset->current_pollable_obj = &pollset->pollable_obj;
- if (append_error(&error, pollable_materialize(&pollset->pollable_obj),
- err_desc)) {
- pollable_add_fd(&pollset->pollable_obj, had_fd);
- pollable_add_fd(&pollset->pollable_obj, fd);
- }
- GRPC_CLOSURE_SCHED(exec_ctx,
- GRPC_CLOSURE_CREATE(unref_fd_no_longer_poller, had_fd,
- grpc_schedule_on_exec_ctx),
- GRPC_ERROR_NONE);
+ pollable *po_at_start =
+ POLLABLE_REF(pollset->active_pollable, "pollset_add_fd");
+ switch (pollset->active_pollable->type) {
+ case PO_EMPTY:
+ /* empty pollable --> single fd pollable */
+ error = pollset_transition_pollable_from_empty_to_fd_locked(exec_ctx,
+ pollset, fd);
+ break;
+ case PO_FD:
+ gpr_mu_lock(&po_at_start->owner_fd->orphan_mu);
+ if ((gpr_atm_no_barrier_load(&pollset->active_pollable->owner_fd->refst) &
+ 1) == 0) {
+ error = pollset_transition_pollable_from_empty_to_fd_locked(
+ exec_ctx, pollset, fd);
+ } else {
+ /* fd --> multipoller */
+ error = pollset_transition_pollable_from_fd_to_multi_locked(
+ exec_ctx, pollset, fd);
+ }
+ gpr_mu_unlock(&po_at_start->owner_fd->orphan_mu);
+ break;
+ case PO_MULTI:
+ error = pollable_add_fd(pollset->active_pollable, fd);
+ break;
+ }
+ if (error != GRPC_ERROR_NONE) {
+ POLLABLE_UNREF(pollset->active_pollable, "pollset");
+ pollset->active_pollable = po_at_start;
+ } else {
+ POLLABLE_UNREF(po_at_start, "pollset_add_fd");
+ }
+ return error;
+}
+
+static grpc_error *pollset_as_multipollable_locked(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset,
+ pollable **pollable_obj) {
+ grpc_error *error = GRPC_ERROR_NONE;
+ pollable *po_at_start =
+ POLLABLE_REF(pollset->active_pollable, "pollset_as_multipollable");
+ switch (pollset->active_pollable->type) {
+ case PO_EMPTY:
+ POLLABLE_UNREF(pollset->active_pollable, "pollset");
+ error = pollable_create(PO_MULTI, &pollset->active_pollable);
+ break;
+ case PO_FD:
+ gpr_mu_lock(&po_at_start->owner_fd->orphan_mu);
+ if ((gpr_atm_no_barrier_load(&pollset->active_pollable->owner_fd->refst) &
+ 1) == 0) {
+ POLLABLE_UNREF(pollset->active_pollable, "pollset");
+ error = pollable_create(PO_MULTI, &pollset->active_pollable);
+ } else {
+ error = pollset_transition_pollable_from_fd_to_multi_locked(
+ exec_ctx, pollset, NULL);
+ }
+ gpr_mu_unlock(&po_at_start->owner_fd->orphan_mu);
+ break;
+ case PO_MULTI:
+ break;
+ }
+ if (error != GRPC_ERROR_NONE) {
+ POLLABLE_UNREF(pollset->active_pollable, "pollset");
+ pollset->active_pollable = po_at_start;
+ *pollable_obj = NULL;
+ } else {
+ *pollable_obj = POLLABLE_REF(pollset->active_pollable, "pollset_set");
+ POLLABLE_UNREF(po_at_start, "pollset_as_multipollable");
}
return error;
}
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_fd *fd) {
- gpr_mu_lock(&pollset->pollable_obj.po.mu);
- grpc_error *error = pollset_add_fd_locked(exec_ctx, pollset, fd, false);
- gpr_mu_unlock(&pollset->pollable_obj.po.mu);
+ gpr_mu_lock(&pollset->mu);
+ grpc_error *error = pollset_add_fd_locked(exec_ctx, pollset, fd);
+ gpr_mu_unlock(&pollset->mu);
GRPC_LOG_IF_ERROR("pollset_add_fd", error);
}
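
The switch in pollset_add_fd_locked above walks the pollset through three shapes: an empty pollset adopts the fd's own pollable, a single-fd pollset is promoted to a multipoller, and a multipoller simply adds the fd to its shared epoll set. A compact model of just those state transitions, separate from the patch, follows; PollableKind, PollsetModel and on_add_fd are hypothetical names, and the kicking, locking and refcounting in the real code are stripped out.

    #include <cstdio>

    // Hypothetical model of the pollset's active-pollable state machine.
    enum class PollableKind { kEmpty, kSingleFd, kMulti };

    struct PollsetModel {
      PollableKind kind = PollableKind::kEmpty;
      int fd_count = 0;
    };

    // Mirrors the switch in pollset_add_fd_locked (state changes only).
    static void on_add_fd(PollsetModel *ps) {
      switch (ps->kind) {
        case PollableKind::kEmpty:
          ps->kind = PollableKind::kSingleFd;  // adopt the fd's own pollable
          ps->fd_count = 1;
          break;
        case PollableKind::kSingleFd:
          ps->kind = PollableKind::kMulti;  // promote to a fresh multipoller
          ps->fd_count += 1;                // the original fd is carried along
          break;
        case PollableKind::kMulti:
          ps->fd_count += 1;  // just add to the shared epoll set
          break;
      }
    }

    int main() {
      PollsetModel ps;
      for (int i = 0; i < 3; i++) on_add_fd(&ps);
      printf("kind=%d fds=%d\n", static_cast<int>(ps.kind), ps.fd_count);  // 2, 3
      return 0;
    }
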
@@ -1075,301 +1149,255 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
* Pollset-set Definitions
*/
+static grpc_pollset_set *pss_lock_adam(grpc_pollset_set *pss) {
+ gpr_mu_lock(&pss->mu);
+ while (pss->parent != NULL) {
+ gpr_mu_unlock(&pss->mu);
+ pss = pss->parent;
+ gpr_mu_lock(&pss->mu);
+ }
+ return pss;
+}
+
static grpc_pollset_set *pollset_set_create(void) {
grpc_pollset_set *pss = (grpc_pollset_set *)gpr_zalloc(sizeof(*pss));
- po_init(&pss->po, PO_POLLSET_SET);
+ gpr_mu_init(&pss->mu);
+ gpr_ref_init(&pss->refs, 1);
return pss;
}
-static void pollset_set_destroy(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *pss) {
- po_destroy(&pss->po);
+static void pollset_set_unref(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss) {
+ if (pss == NULL) return;
+ if (!gpr_unref(&pss->refs)) return;
+ pollset_set_unref(exec_ctx, pss->parent);
+ gpr_mu_destroy(&pss->mu);
+ for (size_t i = 0; i < pss->pollset_count; i++) {
+ gpr_mu_lock(&pss->pollsets[i]->mu);
+ if (0 == --pss->pollsets[i]->containing_pollset_set_count) {
+ pollset_maybe_finish_shutdown(exec_ctx, pss->pollsets[i]);
+ }
+ gpr_mu_unlock(&pss->pollsets[i]->mu);
+ }
+ for (size_t i = 0; i < pss->fd_count; i++) {
+ UNREF_BY(exec_ctx, pss->fds[i], 2, "pollset_set");
+ }
+ gpr_free(pss->pollsets);
+ gpr_free(pss->fds);
gpr_free(pss);
}
static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
grpc_fd *fd) {
- po_join(exec_ctx, &pss->po, &fd->pollable_obj.po);
+ if (grpc_polling_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "PSS:%p: add fd %p (%d)", pss, fd, fd->fd);
+ }
+ grpc_error *error = GRPC_ERROR_NONE;
+ static const char *err_desc = "pollset_set_add_fd";
+ pss = pss_lock_adam(pss);
+ for (size_t i = 0; i < pss->pollset_count; i++) {
+ append_error(&error, pollable_add_fd(pss->pollsets[i]->active_pollable, fd),
+ err_desc);
+ }
+ if (pss->fd_count == pss->fd_capacity) {
+ pss->fd_capacity = GPR_MAX(pss->fd_capacity * 2, 8);
+ pss->fds =
+ (grpc_fd **)gpr_realloc(pss->fds, pss->fd_capacity * sizeof(*pss->fds));
+ }
+ REF_BY(fd, 2, "pollset_set");
+ pss->fds[pss->fd_count++] = fd;
+ gpr_mu_unlock(&pss->mu);
+
+ GRPC_LOG_IF_ERROR(err_desc, error);
}
static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
- grpc_fd *fd) {}
-
-static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *pss, grpc_pollset *ps) {
- po_join(exec_ctx, &pss->po, &ps->pollable_obj.po);
+ grpc_fd *fd) {
+ if (grpc_polling_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "PSS:%p: del fd %p", pss, fd);
+ }
+ pss = pss_lock_adam(pss);
+ size_t i;
+ for (i = 0; i < pss->fd_count; i++) {
+ if (pss->fds[i] == fd) {
+ UNREF_BY(exec_ctx, fd, 2, "pollset_set");
+ break;
+ }
+ }
+ GPR_ASSERT(i != pss->fd_count);
+ for (; i < pss->fd_count - 1; i++) {
+ pss->fds[i] = pss->fds[i + 1];
+ }
+ pss->fd_count--;
+ gpr_mu_unlock(&pss->mu);
}
static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *pss, grpc_pollset *ps) {}
-
-static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *bag,
- grpc_pollset_set *item) {
- po_join(exec_ctx, &bag->po, &item->po);
-}
-
-static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *bag,
- grpc_pollset_set *item) {}
-
-static void po_init(polling_obj *po, polling_obj_type type) {
- gpr_mu_init(&po->mu);
- po->type = type;
- po->group = NULL;
- po->next = po;
- po->prev = po;
-}
-
-static polling_group *pg_lock_latest(polling_group *pg) {
- /* assumes pg unlocked; consumes ref, returns ref */
- gpr_mu_lock(&pg->po.mu);
- while (pg->po.group != NULL) {
- polling_group *new_pg = pg_ref(pg->po.group);
- gpr_mu_unlock(&pg->po.mu);
- pg_unref(pg);
- pg = new_pg;
- gpr_mu_lock(&pg->po.mu);
- }
- return pg;
-}
-
-static void po_destroy(polling_obj *po) {
- if (po->group != NULL) {
- polling_group *pg = pg_lock_latest(po->group);
- po->prev->next = po->next;
- po->next->prev = po->prev;
- gpr_mu_unlock(&pg->po.mu);
- pg_unref(pg);
+ grpc_pollset_set *pss, grpc_pollset *ps) {
+ if (grpc_polling_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "PSS:%p: del pollset %p", pss, ps);
}
- gpr_mu_destroy(&po->mu);
-}
-
-static polling_group *pg_ref(polling_group *pg) {
- gpr_ref(&pg->refs);
- return pg;
-}
-
-static void pg_unref(polling_group *pg) {
- if (gpr_unref(&pg->refs)) {
- po_destroy(&pg->po);
- gpr_free(pg);
+ pss = pss_lock_adam(pss);
+ size_t i;
+ for (i = 0; i < pss->pollset_count; i++) {
+ if (pss->pollsets[i] == ps) {
+ break;
+ }
}
-}
-
-static int po_cmp(polling_obj *a, polling_obj *b) {
- if (a == b) return 0;
- if (a->type < b->type) return -1;
- if (a->type > b->type) return 1;
- if (a < b) return -1;
- assert(a > b);
- return 1;
-}
-
-static void po_join(grpc_exec_ctx *exec_ctx, polling_obj *a, polling_obj *b) {
- switch (po_cmp(a, b)) {
- case 0:
- return;
- case 1:
- GPR_SWAP(polling_obj *, a, b);
- /* fall through */
- case -1:
- gpr_mu_lock(&a->mu);
- gpr_mu_lock(&b->mu);
-
- if (a->group == NULL) {
- if (b->group == NULL) {
- polling_obj *initial_po[] = {a, b};
- pg_create(exec_ctx, initial_po, GPR_ARRAY_SIZE(initial_po));
- gpr_mu_unlock(&a->mu);
- gpr_mu_unlock(&b->mu);
- } else {
- polling_group *b_group = pg_ref(b->group);
- gpr_mu_unlock(&b->mu);
- gpr_mu_unlock(&a->mu);
- pg_join(exec_ctx, b_group, a);
- }
- } else if (b->group == NULL) {
- polling_group *a_group = pg_ref(a->group);
- gpr_mu_unlock(&a->mu);
- gpr_mu_unlock(&b->mu);
- pg_join(exec_ctx, a_group, b);
- } else if (a->group == b->group) {
- /* nothing to do */
- gpr_mu_unlock(&a->mu);
- gpr_mu_unlock(&b->mu);
- } else {
- polling_group *a_group = pg_ref(a->group);
- polling_group *b_group = pg_ref(b->group);
- gpr_mu_unlock(&a->mu);
- gpr_mu_unlock(&b->mu);
- pg_merge(exec_ctx, a_group, b_group);
- }
+ GPR_ASSERT(i != pss->pollset_count);
+ for (; i < pss->pollset_count - 1; i++) {
+ pss->pollsets[i] = pss->pollsets[i + 1];
}
-}
-
-static void pg_notify(grpc_exec_ctx *exec_ctx, polling_obj *a, polling_obj *b) {
- if (a->type == PO_FD && b->type == PO_POLLSET) {
- pollset_add_fd_locked(exec_ctx, (grpc_pollset *)b, (grpc_fd *)a, true);
- } else if (a->type == PO_POLLSET && b->type == PO_FD) {
- pollset_add_fd_locked(exec_ctx, (grpc_pollset *)a, (grpc_fd *)b, true);
+ pss->pollset_count--;
+ gpr_mu_unlock(&pss->mu);
+ gpr_mu_lock(&ps->mu);
+ if (0 == --ps->containing_pollset_set_count) {
+ pollset_maybe_finish_shutdown(exec_ctx, ps);
}
+ gpr_mu_unlock(&ps->mu);
}
-static void pg_broadcast(grpc_exec_ctx *exec_ctx, polling_group *from,
- polling_group *to) {
- for (polling_obj *a = from->po.next; a != &from->po; a = a->next) {
- for (polling_obj *b = to->po.next; b != &to->po; b = b->next) {
- if (po_cmp(a, b) < 0) {
- gpr_mu_lock(&a->mu);
- gpr_mu_lock(&b->mu);
- } else {
- GPR_ASSERT(po_cmp(a, b) != 0);
- gpr_mu_lock(&b->mu);
- gpr_mu_lock(&a->mu);
+// add all fds to pollables, and output a new array of unorphaned out_fds
+// assumes pollsets are multipollable
+static grpc_error *add_fds_to_pollsets(grpc_exec_ctx *exec_ctx, grpc_fd **fds,
+ size_t fd_count, grpc_pollset **pollsets,
+ size_t pollset_count,
+ const char *err_desc, grpc_fd **out_fds,
+ size_t *out_fd_count) {
+ grpc_error *error = GRPC_ERROR_NONE;
+ for (size_t i = 0; i < fd_count; i++) {
+ gpr_mu_lock(&fds[i]->orphan_mu);
+ if ((gpr_atm_no_barrier_load(&fds[i]->refst) & 1) == 0) {
+ gpr_mu_unlock(&fds[i]->orphan_mu);
+ UNREF_BY(exec_ctx, fds[i], 2, "pollset_set");
+ } else {
+ for (size_t j = 0; j < pollset_count; j++) {
+ append_error(&error,
+ pollable_add_fd(pollsets[j]->active_pollable, fds[i]),
+ err_desc);
}
- pg_notify(exec_ctx, a, b);
- gpr_mu_unlock(&a->mu);
- gpr_mu_unlock(&b->mu);
+ gpr_mu_unlock(&fds[i]->orphan_mu);
+ out_fds[(*out_fd_count)++] = fds[i];
}
}
+ return error;
}
-static void pg_create(grpc_exec_ctx *exec_ctx, polling_obj **initial_po,
- size_t initial_po_count) {
- /* assumes all polling objects in initial_po are locked */
- polling_group *pg = (polling_group *)gpr_malloc(sizeof(*pg));
- po_init(&pg->po, PO_POLLING_GROUP);
- gpr_ref_init(&pg->refs, (int)initial_po_count);
- for (size_t i = 0; i < initial_po_count; i++) {
- GPR_ASSERT(initial_po[i]->group == NULL);
- initial_po[i]->group = pg;
- }
- for (size_t i = 1; i < initial_po_count; i++) {
- initial_po[i]->prev = initial_po[i - 1];
- }
- for (size_t i = 0; i < initial_po_count - 1; i++) {
- initial_po[i]->next = initial_po[i + 1];
- }
- initial_po[0]->prev = &pg->po;
- initial_po[initial_po_count - 1]->next = &pg->po;
- pg->po.next = initial_po[0];
- pg->po.prev = initial_po[initial_po_count - 1];
- for (size_t i = 1; i < initial_po_count; i++) {
- for (size_t j = 0; j < i; j++) {
- pg_notify(exec_ctx, initial_po[i], initial_po[j]);
- }
+static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *pss, grpc_pollset *ps) {
+ if (grpc_polling_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "PSS:%p: add pollset %p", pss, ps);
}
-}
-
-static void pg_join(grpc_exec_ctx *exec_ctx, polling_group *pg,
- polling_obj *po) {
- /* assumes neither pg nor po are locked; consumes one ref to pg */
- pg = pg_lock_latest(pg);
- /* pg locked */
- for (polling_obj *existing = pg->po.next /* skip pg - it's just a stub */;
- existing != &pg->po; existing = existing->next) {
- if (po_cmp(po, existing) < 0) {
- gpr_mu_lock(&po->mu);
- gpr_mu_lock(&existing->mu);
- } else {
- GPR_ASSERT(po_cmp(po, existing) != 0);
- gpr_mu_lock(&existing->mu);
- gpr_mu_lock(&po->mu);
- }
- /* pg, po, existing locked */
- if (po->group != NULL) {
- gpr_mu_unlock(&pg->po.mu);
- polling_group *po_group = pg_ref(po->group);
- gpr_mu_unlock(&po->mu);
- gpr_mu_unlock(&existing->mu);
- pg_merge(exec_ctx, pg, po_group);
- /* early exit: polling obj picked up a group during joining: we needed
- to do a full merge */
- return;
- }
- pg_notify(exec_ctx, po, existing);
- gpr_mu_unlock(&po->mu);
- gpr_mu_unlock(&existing->mu);
- }
- gpr_mu_lock(&po->mu);
- if (po->group != NULL) {
- gpr_mu_unlock(&pg->po.mu);
- polling_group *po_group = pg_ref(po->group);
- gpr_mu_unlock(&po->mu);
- pg_merge(exec_ctx, pg, po_group);
- /* early exit: polling obj picked up a group during joining: we needed
- to do a full merge */
+ grpc_error *error = GRPC_ERROR_NONE;
+ static const char *err_desc = "pollset_set_add_pollset";
+ pollable *pollable_obj = NULL;
+ gpr_mu_lock(&ps->mu);
+ if (!GRPC_LOG_IF_ERROR(err_desc, pollset_as_multipollable_locked(
+ exec_ctx, ps, &pollable_obj))) {
+ GPR_ASSERT(pollable_obj == NULL);
+ gpr_mu_unlock(&ps->mu);
return;
}
- po->group = pg;
- po->next = &pg->po;
- po->prev = pg->po.prev;
- po->prev->next = po->next->prev = po;
- gpr_mu_unlock(&pg->po.mu);
- gpr_mu_unlock(&po->mu);
+ ps->containing_pollset_set_count++;
+ gpr_mu_unlock(&ps->mu);
+ pss = pss_lock_adam(pss);
+ size_t initial_fd_count = pss->fd_count;
+ pss->fd_count = 0;
+ append_error(&error,
+ add_fds_to_pollsets(exec_ctx, pss->fds, initial_fd_count, &ps, 1,
+ err_desc, pss->fds, &pss->fd_count),
+ err_desc);
+ if (pss->pollset_count == pss->pollset_capacity) {
+ pss->pollset_capacity = GPR_MAX(pss->pollset_capacity * 2, 8);
+ pss->pollsets = (grpc_pollset **)gpr_realloc(
+ pss->pollsets, pss->pollset_capacity * sizeof(*pss->pollsets));
+ }
+ pss->pollsets[pss->pollset_count++] = ps;
+ gpr_mu_unlock(&pss->mu);
+ POLLABLE_UNREF(pollable_obj, "pollset_set");
+
+ GRPC_LOG_IF_ERROR(err_desc, error);
}
-static void pg_merge(grpc_exec_ctx *exec_ctx, polling_group *a,
- polling_group *b) {
+static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *a,
+ grpc_pollset_set *b) {
+ if (grpc_polling_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "PSS: merge (%p, %p)", a, b);
+ }
+ grpc_error *error = GRPC_ERROR_NONE;
+  static const char *err_desc = "pollset_set_add_pollset_set";
for (;;) {
if (a == b) {
- pg_unref(a);
- pg_unref(b);
+ // pollset ancestors are the same: nothing to do
return;
}
- if (a > b) GPR_SWAP(polling_group *, a, b);
- gpr_mu_lock(&a->po.mu);
- gpr_mu_lock(&b->po.mu);
- if (a->po.group != NULL) {
- polling_group *m2 = pg_ref(a->po.group);
- gpr_mu_unlock(&a->po.mu);
- gpr_mu_unlock(&b->po.mu);
- pg_unref(a);
- a = m2;
- } else if (b->po.group != NULL) {
- polling_group *m2 = pg_ref(b->po.group);
- gpr_mu_unlock(&a->po.mu);
- gpr_mu_unlock(&b->po.mu);
- pg_unref(b);
- b = m2;
+ if (a > b) {
+ GPR_SWAP(grpc_pollset_set *, a, b);
+ }
+ gpr_mu *a_mu = &a->mu;
+ gpr_mu *b_mu = &b->mu;
+ gpr_mu_lock(a_mu);
+ gpr_mu_lock(b_mu);
+ if (a->parent != NULL) {
+ a = a->parent;
+ } else if (b->parent != NULL) {
+ b = b->parent;
} else {
- break;
+ break; // exit loop, both pollsets locked
}
+ gpr_mu_unlock(a_mu);
+ gpr_mu_unlock(b_mu);
}
- polling_group **unref = NULL;
- size_t unref_count = 0;
- size_t unref_cap = 0;
- b->po.group = a;
- pg_broadcast(exec_ctx, a, b);
- pg_broadcast(exec_ctx, b, a);
- while (b->po.next != &b->po) {
- polling_obj *po = b->po.next;
- gpr_mu_lock(&po->mu);
- if (unref_count == unref_cap) {
- unref_cap = GPR_MAX(8, 3 * unref_cap / 2);
- unref = (polling_group **)gpr_realloc(unref, unref_cap * sizeof(*unref));
- }
- unref[unref_count++] = po->group;
- po->group = pg_ref(a);
- // unlink from b
- po->prev->next = po->next;
- po->next->prev = po->prev;
- // link to a
- po->next = &a->po;
- po->prev = a->po.prev;
- po->next->prev = po->prev->next = po;
- gpr_mu_unlock(&po->mu);
- }
- gpr_mu_unlock(&a->po.mu);
- gpr_mu_unlock(&b->po.mu);
- for (size_t i = 0; i < unref_count; i++) {
- pg_unref(unref[i]);
- }
- gpr_free(unref);
- pg_unref(b);
+ // try to do the least copying possible
+ // TODO(ctiller): there's probably a better heuristic here
+ const size_t a_size = a->fd_count + a->pollset_count;
+ const size_t b_size = b->fd_count + b->pollset_count;
+ if (b_size > a_size) {
+ GPR_SWAP(grpc_pollset_set *, a, b);
+ }
+ if (grpc_polling_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "PSS: parent %p to %p", b, a);
+ }
+ gpr_ref(&a->refs);
+ b->parent = a;
+ if (a->fd_capacity < a->fd_count + b->fd_count) {
+ a->fd_capacity = GPR_MAX(2 * a->fd_capacity, a->fd_count + b->fd_count);
+ a->fds = (grpc_fd **)gpr_realloc(a->fds, a->fd_capacity * sizeof(*a->fds));
+ }
+ size_t initial_a_fd_count = a->fd_count;
+ a->fd_count = 0;
+ append_error(&error, add_fds_to_pollsets(exec_ctx, a->fds, initial_a_fd_count,
+ b->pollsets, b->pollset_count,
+ "merge_a2b", a->fds, &a->fd_count),
+ err_desc);
+ append_error(&error, add_fds_to_pollsets(exec_ctx, b->fds, b->fd_count,
+ a->pollsets, a->pollset_count,
+ "merge_b2a", a->fds, &a->fd_count),
+ err_desc);
+ if (a->pollset_capacity < a->pollset_count + b->pollset_count) {
+ a->pollset_capacity =
+ GPR_MAX(2 * a->pollset_capacity, a->pollset_count + b->pollset_count);
+ a->pollsets = (grpc_pollset **)gpr_realloc(
+ a->pollsets, a->pollset_capacity * sizeof(*a->pollsets));
+ }
+ if (b->pollset_count > 0) {
+ memcpy(a->pollsets + a->pollset_count, b->pollsets,
+ b->pollset_count * sizeof(*b->pollsets));
+ }
+ a->pollset_count += b->pollset_count;
+ gpr_free(b->fds);
+ gpr_free(b->pollsets);
+ b->fds = NULL;
+ b->pollsets = NULL;
+ b->fd_count = b->fd_capacity = b->pollset_count = b->pollset_capacity = 0;
+ gpr_mu_unlock(&a->mu);
+ gpr_mu_unlock(&b->mu);
}
+static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *bag,
+ grpc_pollset_set *item) {}
+
/*******************************************************************************
* Event engine binding
*/
@@ -1399,7 +1427,7 @@ static const grpc_event_engine_vtable vtable = {
pollset_add_fd,
pollset_set_create,
- pollset_set_destroy,
+ pollset_set_unref, // destroy ==> unref 1 public ref
pollset_set_add_pollset,
pollset_set_del_pollset,
pollset_set_add_pollset_set,
@@ -1412,6 +1440,10 @@ static const grpc_event_engine_vtable vtable = {
const grpc_event_engine_vtable *grpc_init_epollex_linux(
bool explicitly_requested) {
+ if (!explicitly_requested) {
+ return NULL;
+ }
+
if (!grpc_has_wakeup_fd()) {
return NULL;
}
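
pss_lock_adam and pollset_set_add_pollset_set above treat pollset_sets as a parent-linked forest: lookups walk up to the eldest ancestor, and a merge makes the smaller set a child of the larger one. The sketch below, separate from the patch, models only that union-by-size shape; SetModel, find_adam and merge are hypothetical names, and the locking and the fd/pollset copying done by the real merge are ignored.

    #include <cstdio>

    // Hypothetical model of the pollset_set parent chain.
    struct SetModel {
      SetModel *parent = nullptr;
      int size = 0;  // fds + pollsets, used as the merge heuristic
    };

    static SetModel *find_adam(SetModel *s) {
      while (s->parent != nullptr) s = s->parent;  // mirrors pss_lock_adam
      return s;
    }

    static void merge(SetModel *a, SetModel *b) {
      a = find_adam(a);
      b = find_adam(b);
      if (a == b) return;       // same ancestor: nothing to do
      if (b->size > a->size) {  // keep the bigger set as the new root
        SetModel *tmp = a;
        a = b;
        b = tmp;
      }
      b->parent = a;  // b's contents are folded into a
      a->size += b->size;
      b->size = 0;
    }

    int main() {
      SetModel x, y, z;
      x.size = 3;
      y.size = 1;
      z.size = 2;
      merge(&x, &y);
      merge(&y, &z);
      printf("adam(z)==&x: %d, size=%d\n", find_adam(&z) == &x,
             find_adam(&z)->size);  // 1, 6
      return 0;
    }
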
diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc
index 45403c3037..28e4efa011 100644
--- a/src/core/lib/iomgr/ev_posix.cc
+++ b/src/core/lib/iomgr/ev_posix.cc
@@ -91,12 +91,9 @@ const grpc_event_engine_vtable *init_non_polling(bool explicit_request) {
} // namespace
static const event_engine_factory g_factories[] = {
- {"epoll1", grpc_init_epoll1_linux},
- {"epollsig", grpc_init_epollsig_linux},
- {"poll", grpc_init_poll_posix},
- {"poll-cv", grpc_init_poll_cv_posix},
- {"epollex", grpc_init_epollex_linux},
- {"none", init_non_polling},
+ {"epollex", grpc_init_epollex_linux}, {"epoll1", grpc_init_epoll1_linux},
+ {"epollsig", grpc_init_epollsig_linux}, {"poll", grpc_init_poll_posix},
+ {"poll-cv", grpc_init_poll_cv_posix}, {"none", init_non_polling},
};
static void add(const char *beg, const char *end, char ***ss, size_t *ns) {
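
Moving epollex to the front of g_factories only changes the default because, as the hunk above in ev_epollex_linux.cc shows, its init function refuses to run unless explicitly requested, so unprompted startup still falls through to epoll1. The sketch below, separate from the patch, shows that "first factory to hand back an engine wins" selection; EngineFactory and pick_engine are illustrative names, and the real selection additionally consults the GRPC_POLL_STRATEGY setting, which is outside this hunk.

    #include <cstdio>
    #include <cstring>

    // Hypothetical stand-ins for the event-engine factory table.
    typedef const char *Engine;  // the real code returns a vtable pointer

    static Engine init_epollex(bool explicitly_requested) {
      return explicitly_requested ? "epollex" : nullptr;  // opt-in only
    }
    static Engine init_epoll1(bool) { return "epoll1"; }

    struct EngineFactory {
      const char *name;
      Engine (*init)(bool explicitly_requested);
    };

    static const EngineFactory g_factories[] = {
        {"epollex", init_epollex},
        {"epoll1", init_epoll1},
    };

    // First factory that returns a non-null engine wins; a factory counts as
    // explicitly requested when its name matches the requested strategy.
    static Engine pick_engine(const char *requested) {
      for (const EngineFactory &f : g_factories) {
        bool explicit_request =
            requested != nullptr && strcmp(requested, f.name) == 0;
        Engine e = f.init(explicit_request);
        if (e != nullptr) return e;
      }
      return nullptr;
    }

    int main() {
      printf("default: %s\n", pick_engine(nullptr));    // epoll1
      printf("opt-in:  %s\n", pick_engine("epollex"));  // epollex
      return 0;
    }
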
diff --git a/src/core/lib/iomgr/exec_ctx.cc b/src/core/lib/iomgr/exec_ctx.cc
index 6707f40f10..e4d956dbef 100644
--- a/src/core/lib/iomgr/exec_ctx.cc
+++ b/src/core/lib/iomgr/exec_ctx.cc
@@ -152,6 +152,15 @@ void grpc_exec_ctx_invalidate_now(grpc_exec_ctx *exec_ctx) {
gpr_timespec grpc_millis_to_timespec(grpc_millis millis,
gpr_clock_type clock_type) {
+ // special-case infinities as grpc_millis can be 32bit on some platforms
+ // while gpr_time_from_millis always takes an int64_t.
+ if (millis == GRPC_MILLIS_INF_FUTURE) {
+ return gpr_inf_future(clock_type);
+ }
+ if (millis == GRPC_MILLIS_INF_PAST) {
+ return gpr_inf_past(clock_type);
+ }
+
if (clock_type == GPR_TIMESPAN) {
return gpr_time_from_millis(millis, GPR_TIMESPAN);
}
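
The two early returns added above matter because GRPC_MILLIS_INF_FUTURE / GRPC_MILLIS_INF_PAST are sentinels: on a platform where grpc_millis is 32-bit (as the comment notes it can be), pushing the sentinel through a plain milliseconds-to-timespec conversion yields a finite deadline of roughly 25 days instead of "never". Below is a self-contained illustration of the guard, separate from the patch, using int32_t and hypothetical helpers (ts_from_millis, ts_inf_future) rather than the gpr types.

    #include <time.h>
    #include <cstdint>
    #include <cstdio>
    #include <limits>

    // Hypothetical 32-bit millisecond clock with a sentinel "infinite future",
    // mirroring the situation the comment in the hunk describes.
    typedef int32_t millis32;
    static const millis32 kInfFuture = std::numeric_limits<millis32>::max();

    static timespec ts_from_millis(int64_t ms) {
      timespec ts;
      ts.tv_sec = static_cast<time_t>(ms / 1000);
      ts.tv_nsec = static_cast<long>((ms % 1000) * 1000000);
      return ts;
    }

    static timespec ts_inf_future() {  // analogous to a saturated gpr_inf_future
      timespec ts;
      ts.tv_sec = std::numeric_limits<time_t>::max();
      ts.tv_nsec = 0;
      return ts;
    }

    // With the guard, the sentinel maps to a saturated "never" timespec; without
    // it, INT32_MAX milliseconds is a perfectly finite deadline of ~24.9 days.
    static timespec to_timespec(millis32 ms) {
      if (ms == kInfFuture) return ts_inf_future();
      return ts_from_millis(ms);
    }

    int main() {
      timespec naive = ts_from_millis(kInfFuture);
      printf("naive 'infinite' deadline: %.1f days from now\n",
             naive.tv_sec / 86400.0);  // ~24.9
      printf("guarded deadline saturated: %d\n",
             to_timespec(kInfFuture).tv_sec == std::numeric_limits<time_t>::max());
      return 0;
    }
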
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc
index ae8d7b4724..cfdacaf988 100644
--- a/src/core/lib/iomgr/tcp_posix.cc
+++ b/src/core/lib/iomgr/tcp_posix.cc
@@ -186,7 +186,7 @@ static void cover_self(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
}
if (old_count == 0) {
GRPC_STATS_INC_TCP_BACKUP_POLLERS_CREATED(exec_ctx);
- p = (backup_poller *)gpr_malloc(sizeof(*p) + grpc_pollset_size());
+ p = (backup_poller *)gpr_zalloc(sizeof(*p) + grpc_pollset_size());
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p create", p);
}
diff --git a/src/core/lib/security/credentials/fake/fake_credentials.cc b/src/core/lib/security/credentials/fake/fake_credentials.cc
index 795ca0660a..cf10bf24c8 100644
--- a/src/core/lib/security/credentials/fake/fake_credentials.cc
+++ b/src/core/lib/security/credentials/fake/fake_credentials.cc
@@ -38,7 +38,8 @@ static grpc_security_status fake_transport_security_create_security_connector(
grpc_call_credentials *call_creds, const char *target,
const grpc_channel_args *args, grpc_channel_security_connector **sc,
grpc_channel_args **new_args) {
- *sc = grpc_fake_channel_security_connector_create(call_creds, target, args);
+ *sc =
+ grpc_fake_channel_security_connector_create(c, call_creds, target, args);
return GRPC_SECURITY_OK;
}
@@ -46,7 +47,7 @@ static grpc_security_status
fake_transport_security_server_create_security_connector(
grpc_exec_ctx *exec_ctx, grpc_server_credentials *c,
grpc_server_security_connector **sc) {
- *sc = grpc_fake_server_security_connector_create();
+ *sc = grpc_fake_server_security_connector_create(c);
return GRPC_SECURITY_OK;
}
diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc b/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc
index 8258bd7cd1..e58f4eed1c 100644
--- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc
+++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc
@@ -262,7 +262,7 @@ static bool oauth2_token_fetcher_get_request_metadata(
grpc_mdelem cached_access_token_md = GRPC_MDNULL;
gpr_mu_lock(&c->mu);
if (!GRPC_MDISNULL(c->access_token_md) &&
- (c->token_expiration + grpc_exec_ctx_now(exec_ctx) > refresh_threshold)) {
+ (c->token_expiration - grpc_exec_ctx_now(exec_ctx) > refresh_threshold)) {
cached_access_token_md = GRPC_MDELEM_REF(c->access_token_md);
}
if (!GRPC_MDISNULL(cached_access_token_md)) {
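
The one-character fix above turns the cached-token test from an expression that is effectively always true (expiration plus current time compared to a small threshold) into "remaining lifetime exceeds the refresh threshold". A tiny numeric sketch of both versions, separate from the patch, follows; the values are made up, and the real refresh_threshold constant is defined outside this hunk.

    #include <cstdint>
    #include <cstdio>

    typedef int64_t millis;  // stand-in for grpc_millis

    int main() {
      // Hypothetical numbers: token lives one hour, refresh threshold of 60s.
      const millis now = 1000000;  // arbitrary "current time"
      const millis token_expiration = now + 3600 * 1000;
      const millis refresh_threshold = 60 * 1000;

      // Old check: expiration + now > threshold, true for any sane clock, so a
      // stale cached token would keep being served even close to expiry.
      bool old_check = (token_expiration + now) > refresh_threshold;

      // New check: remaining lifetime must exceed the threshold before the
      // cached token is reused; otherwise a fresh token is fetched.
      bool new_check = (token_expiration - now) > refresh_threshold;

      printf("fresh token      -> old: %d, new: %d\n", old_check, new_check);

      // Same comparison five seconds before expiry: old still says "cached OK",
      // new correctly forces a refresh.
      const millis later = token_expiration - 5 * 1000;
      printf("about to expire  -> old: %d, new: %d\n",
             (token_expiration + later) > refresh_threshold,
             (token_expiration - later) > refresh_threshold);
      return 0;
    }
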
diff --git a/src/core/lib/security/credentials/ssl/ssl_credentials.cc b/src/core/lib/security/credentials/ssl/ssl_credentials.cc
index 9df69a2a3d..290336adc0 100644
--- a/src/core/lib/security/credentials/ssl/ssl_credentials.cc
+++ b/src/core/lib/security/credentials/ssl/ssl_credentials.cc
@@ -62,7 +62,8 @@ static grpc_security_status ssl_create_security_connector(
}
}
status = grpc_ssl_channel_security_connector_create(
- exec_ctx, call_creds, &c->config, target, overridden_target_name, sc);
+ exec_ctx, creds, call_creds, &c->config, target, overridden_target_name,
+ sc);
if (status != GRPC_SECURITY_OK) {
return status;
}
@@ -128,7 +129,8 @@ static grpc_security_status ssl_server_create_security_connector(
grpc_exec_ctx *exec_ctx, grpc_server_credentials *creds,
grpc_server_security_connector **sc) {
grpc_ssl_server_credentials *c = (grpc_ssl_server_credentials *)creds;
- return grpc_ssl_server_security_connector_create(exec_ctx, &c->config, sc);
+ return grpc_ssl_server_security_connector_create(exec_ctx, creds, &c->config,
+ sc);
}
static grpc_server_credentials_vtable ssl_server_vtable = {
diff --git a/src/core/lib/security/transport/security_connector.cc b/src/core/lib/security/transport/security_connector.cc
index 3c19d5e60a..31e2fd0107 100644
--- a/src/core/lib/security/transport/security_connector.cc
+++ b/src/core/lib/security/transport/security_connector.cc
@@ -136,6 +136,39 @@ void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx,
}
}
+int grpc_security_connector_cmp(grpc_security_connector *sc,
+ grpc_security_connector *other) {
+ if (sc == NULL || other == NULL) return GPR_ICMP(sc, other);
+ int c = GPR_ICMP(sc->vtable, other->vtable);
+ if (c != 0) return c;
+ return sc->vtable->cmp(sc, other);
+}
+
+int grpc_channel_security_connector_cmp(grpc_channel_security_connector *sc1,
+ grpc_channel_security_connector *sc2) {
+ GPR_ASSERT(sc1->channel_creds != NULL);
+ GPR_ASSERT(sc2->channel_creds != NULL);
+ int c = GPR_ICMP(sc1->channel_creds, sc2->channel_creds);
+ if (c != 0) return c;
+ c = GPR_ICMP(sc1->request_metadata_creds, sc2->request_metadata_creds);
+ if (c != 0) return c;
+ c = GPR_ICMP((void *)sc1->check_call_host, (void *)sc2->check_call_host);
+ if (c != 0) return c;
+ c = GPR_ICMP((void *)sc1->cancel_check_call_host,
+ (void *)sc2->cancel_check_call_host);
+ if (c != 0) return c;
+ return GPR_ICMP((void *)sc1->add_handshakers, (void *)sc2->add_handshakers);
+}
+
+int grpc_server_security_connector_cmp(grpc_server_security_connector *sc1,
+ grpc_server_security_connector *sc2) {
+ GPR_ASSERT(sc1->server_creds != NULL);
+ GPR_ASSERT(sc2->server_creds != NULL);
+ int c = GPR_ICMP(sc1->server_creds, sc2->server_creds);
+ if (c != 0) return c;
+ return GPR_ICMP((void *)sc1->add_handshakers, (void *)sc2->add_handshakers);
+}
+
bool grpc_channel_security_connector_check_call_host(
grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc,
const char *host, grpc_auth_context *auth_context,
@@ -199,25 +232,27 @@ void grpc_security_connector_unref(grpc_exec_ctx *exec_ctx,
if (gpr_unref(&sc->refcount)) sc->vtable->destroy(exec_ctx, sc);
}
-static void connector_pointer_arg_destroy(grpc_exec_ctx *exec_ctx, void *p) {
+static void connector_arg_destroy(grpc_exec_ctx *exec_ctx, void *p) {
GRPC_SECURITY_CONNECTOR_UNREF(exec_ctx, (grpc_security_connector *)p,
- "connector_pointer_arg_destroy");
+ "connector_arg_destroy");
}
-static void *connector_pointer_arg_copy(void *p) {
+static void *connector_arg_copy(void *p) {
return GRPC_SECURITY_CONNECTOR_REF((grpc_security_connector *)p,
- "connector_pointer_arg_copy");
+ "connector_arg_copy");
}
-static int connector_pointer_cmp(void *a, void *b) { return GPR_ICMP(a, b); }
+static int connector_cmp(void *a, void *b) {
+ return grpc_security_connector_cmp((grpc_security_connector *)a,
+ (grpc_security_connector *)b);
+}
-static const grpc_arg_pointer_vtable connector_pointer_vtable = {
- connector_pointer_arg_copy, connector_pointer_arg_destroy,
- connector_pointer_cmp};
+static const grpc_arg_pointer_vtable connector_arg_vtable = {
+ connector_arg_copy, connector_arg_destroy, connector_cmp};
grpc_arg grpc_security_connector_to_arg(grpc_security_connector *sc) {
return grpc_channel_arg_pointer_create((char *)GRPC_ARG_SECURITY_CONNECTOR,
- sc, &connector_pointer_vtable);
+ sc, &connector_arg_vtable);
}
grpc_security_connector *grpc_security_connector_from_arg(const grpc_arg *arg) {
@@ -382,6 +417,32 @@ static void fake_server_check_peer(grpc_exec_ctx *exec_ctx,
fake_check_peer(exec_ctx, sc, peer, auth_context, on_peer_checked);
}
+static int fake_channel_cmp(grpc_security_connector *sc1,
+ grpc_security_connector *sc2) {
+ grpc_fake_channel_security_connector *c1 =
+ (grpc_fake_channel_security_connector *)sc1;
+ grpc_fake_channel_security_connector *c2 =
+ (grpc_fake_channel_security_connector *)sc2;
+ int c = grpc_channel_security_connector_cmp(&c1->base, &c2->base);
+ if (c != 0) return c;
+ c = strcmp(c1->target, c2->target);
+ if (c != 0) return c;
+ if (c1->expected_targets == NULL || c2->expected_targets == NULL) {
+ c = GPR_ICMP(c1->expected_targets, c2->expected_targets);
+ } else {
+ c = strcmp(c1->expected_targets, c2->expected_targets);
+ }
+ if (c != 0) return c;
+ return GPR_ICMP(c1->is_lb_channel, c2->is_lb_channel);
+}
+
+static int fake_server_cmp(grpc_security_connector *sc1,
+ grpc_security_connector *sc2) {
+ return grpc_server_security_connector_cmp(
+ (grpc_server_security_connector *)sc1,
+ (grpc_server_security_connector *)sc2);
+}
+
static bool fake_channel_check_call_host(grpc_exec_ctx *exec_ctx,
grpc_channel_security_connector *sc,
const char *host,
@@ -418,12 +479,13 @@ static void fake_server_add_handshakers(grpc_exec_ctx *exec_ctx,
}
static grpc_security_connector_vtable fake_channel_vtable = {
- fake_channel_destroy, fake_channel_check_peer};
+ fake_channel_destroy, fake_channel_check_peer, fake_channel_cmp};
static grpc_security_connector_vtable fake_server_vtable = {
- fake_server_destroy, fake_server_check_peer};
+ fake_server_destroy, fake_server_check_peer, fake_server_cmp};
grpc_channel_security_connector *grpc_fake_channel_security_connector_create(
+ grpc_channel_credentials *channel_creds,
grpc_call_credentials *request_metadata_creds, const char *target,
const grpc_channel_args *args) {
grpc_fake_channel_security_connector *c =
@@ -431,6 +493,7 @@ grpc_channel_security_connector *grpc_fake_channel_security_connector_create(
gpr_ref_init(&c->base.base.refcount, 1);
c->base.base.url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME;
c->base.base.vtable = &fake_channel_vtable;
+ c->base.channel_creds = channel_creds;
c->base.request_metadata_creds =
grpc_call_credentials_ref(request_metadata_creds);
c->base.check_call_host = fake_channel_check_call_host;
@@ -444,13 +507,14 @@ grpc_channel_security_connector *grpc_fake_channel_security_connector_create(
}
grpc_server_security_connector *grpc_fake_server_security_connector_create(
- void) {
+ grpc_server_credentials *server_creds) {
grpc_server_security_connector *c =
(grpc_server_security_connector *)gpr_zalloc(
sizeof(grpc_server_security_connector));
gpr_ref_init(&c->base.refcount, 1);
c->base.vtable = &fake_server_vtable;
c->base.url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME;
+ c->server_creds = server_creds;
c->add_handshakers = fake_server_add_handshakers;
return c;
}
@@ -473,6 +537,7 @@ static void ssl_channel_destroy(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc) {
grpc_ssl_channel_security_connector *c =
(grpc_ssl_channel_security_connector *)sc;
+ grpc_channel_credentials_unref(exec_ctx, c->base.channel_creds);
grpc_call_credentials_unref(exec_ctx, c->base.request_metadata_creds);
tsi_ssl_client_handshaker_factory_unref(c->client_handshaker_factory);
c->client_handshaker_factory = NULL;
@@ -485,6 +550,7 @@ static void ssl_server_destroy(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc) {
grpc_ssl_server_security_connector *c =
(grpc_ssl_server_security_connector *)sc;
+ grpc_server_credentials_unref(exec_ctx, c->base.server_creds);
tsi_ssl_server_handshaker_factory_unref(c->server_handshaker_factory);
c->server_handshaker_factory = NULL;
gpr_free(sc);
@@ -641,6 +707,29 @@ static void ssl_server_check_peer(grpc_exec_ctx *exec_ctx,
GRPC_CLOSURE_SCHED(exec_ctx, on_peer_checked, error);
}
+static int ssl_channel_cmp(grpc_security_connector *sc1,
+ grpc_security_connector *sc2) {
+ grpc_ssl_channel_security_connector *c1 =
+ (grpc_ssl_channel_security_connector *)sc1;
+ grpc_ssl_channel_security_connector *c2 =
+ (grpc_ssl_channel_security_connector *)sc2;
+ int c = grpc_channel_security_connector_cmp(&c1->base, &c2->base);
+ if (c != 0) return c;
+ c = strcmp(c1->target_name, c2->target_name);
+ if (c != 0) return c;
+ return (c1->overridden_target_name == NULL ||
+ c2->overridden_target_name == NULL)
+ ? GPR_ICMP(c1->overridden_target_name, c2->overridden_target_name)
+ : strcmp(c1->overridden_target_name, c2->overridden_target_name);
+}
+
+static int ssl_server_cmp(grpc_security_connector *sc1,
+ grpc_security_connector *sc2) {
+ return grpc_server_security_connector_cmp(
+ (grpc_server_security_connector *)sc1,
+ (grpc_server_security_connector *)sc2);
+}
+
static void add_shallow_auth_property_to_peer(tsi_peer *peer,
const grpc_auth_property *prop,
const char *tsi_prop_name) {
@@ -717,10 +806,10 @@ static void ssl_channel_cancel_check_call_host(
}
static grpc_security_connector_vtable ssl_channel_vtable = {
- ssl_channel_destroy, ssl_channel_check_peer};
+ ssl_channel_destroy, ssl_channel_check_peer, ssl_channel_cmp};
static grpc_security_connector_vtable ssl_server_vtable = {
- ssl_server_destroy, ssl_server_check_peer};
+ ssl_server_destroy, ssl_server_check_peer, ssl_server_cmp};
/* returns a NULL terminated slice. */
static grpc_slice compute_default_pem_root_certs_once(void) {
@@ -804,7 +893,8 @@ const char *grpc_get_default_ssl_roots(void) {
}
grpc_security_status grpc_ssl_channel_security_connector_create(
- grpc_exec_ctx *exec_ctx, grpc_call_credentials *request_metadata_creds,
+ grpc_exec_ctx *exec_ctx, grpc_channel_credentials *channel_creds,
+ grpc_call_credentials *request_metadata_creds,
const grpc_ssl_config *config, const char *target_name,
const char *overridden_target_name, grpc_channel_security_connector **sc) {
size_t num_alpn_protocols = grpc_chttp2_num_alpn_versions();
@@ -840,6 +930,7 @@ grpc_security_status grpc_ssl_channel_security_connector_create(
gpr_ref_init(&c->base.base.refcount, 1);
c->base.base.vtable = &ssl_channel_vtable;
c->base.base.url_scheme = GRPC_SSL_URL_SCHEME;
+ c->base.channel_creds = grpc_channel_credentials_ref(channel_creds);
c->base.request_metadata_creds =
grpc_call_credentials_ref(request_metadata_creds);
c->base.check_call_host = ssl_channel_check_call_host;
@@ -874,8 +965,8 @@ error:
}
grpc_security_status grpc_ssl_server_security_connector_create(
- grpc_exec_ctx *exec_ctx, const grpc_ssl_server_config *config,
- grpc_server_security_connector **sc) {
+ grpc_exec_ctx *exec_ctx, grpc_server_credentials *server_creds,
+ const grpc_ssl_server_config *config, grpc_server_security_connector **sc) {
size_t num_alpn_protocols = grpc_chttp2_num_alpn_versions();
const char **alpn_protocol_strings =
(const char **)gpr_malloc(sizeof(const char *) * num_alpn_protocols);
@@ -897,6 +988,7 @@ grpc_security_status grpc_ssl_server_security_connector_create(
gpr_ref_init(&c->base.base.refcount, 1);
c->base.base.url_scheme = GRPC_SSL_URL_SCHEME;
c->base.base.vtable = &ssl_server_vtable;
+ c->base.server_creds = grpc_server_credentials_ref(server_creds);
result = tsi_create_ssl_server_handshaker_factory_ex(
config->pem_key_cert_pairs, config->num_key_cert_pairs,
config->pem_root_certs, get_tsi_client_certificate_request_type(
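
The new cmp vtable entry and helpers above give security connectors a real ordering, so the channel arg that carries a connector can compare by content instead of by pointer identity (the arg vtable's cmp now delegates to grpc_security_connector_cmp rather than comparing raw pointers). Below is a minimal sketch of that idea, separate from the patch; Connector and this local connector_cmp are hypothetical stand-ins, not the static functions in the hunk, and only the creds-identity-then-target comparison is modelled.

    #include <cstdio>
    #include <cstring>

    // Hypothetical stand-in: a "connector" identified by its credentials
    // object and its target string.
    struct Connector {
      const void *creds;  // identity of the credentials object
      const char *target;
    };

    static int ptr_cmp(const void *a, const void *b) {
      return a < b ? -1 : (a > b ? 1 : 0);  // GPR_ICMP-style identity compare
    }

    // Deep comparison, in the spirit of grpc_channel_security_connector_cmp:
    // compare the credentials identity first, then the target string.
    static int connector_cmp(void *a, void *b) {
      const Connector *ca = static_cast<const Connector *>(a);
      const Connector *cb = static_cast<const Connector *>(b);
      int c = ptr_cmp(ca->creds, cb->creds);
      if (c != 0) return c;
      return strcmp(ca->target, cb->target);
    }

    int main() {
      int creds;  // shared credentials object; only its identity matters here
      Connector c1 = {&creds, "example.com:443"};
      Connector c2 = {&creds, "example.com:443"};
      // Old behaviour: distinct connector instances always compare unequal.
      printf("pointer cmp:   %d\n", ptr_cmp(&c1, &c2));  // -1 or 1
      // New behaviour: equivalent connectors compare equal, so channel args
      // built from them can be recognised as identical.
      printf("connector cmp: %d\n", connector_cmp(&c1, &c2));  // 0
      return 0;
    }
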
diff --git a/src/core/lib/security/transport/security_connector.h b/src/core/lib/security/transport/security_connector.h
index e4ee2110eb..38b53a2b8e 100644
--- a/src/core/lib/security/transport/security_connector.h
+++ b/src/core/lib/security/transport/security_connector.h
@@ -60,13 +60,9 @@ typedef struct {
void (*check_peer)(grpc_exec_ctx *exec_ctx, grpc_security_connector *sc,
tsi_peer peer, grpc_auth_context **auth_context,
grpc_closure *on_peer_checked);
+ int (*cmp)(grpc_security_connector *sc, grpc_security_connector *other);
} grpc_security_connector_vtable;
-typedef struct grpc_security_connector_handshake_list {
- void *handshake;
- struct grpc_security_connector_handshake_list *next;
-} grpc_security_connector_handshake_list;
-
struct grpc_security_connector {
const grpc_security_connector_vtable *vtable;
gpr_refcount refcount;
@@ -104,6 +100,10 @@ void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx,
grpc_auth_context **auth_context,
grpc_closure *on_peer_checked);
+/* Compares two security connectors. */
+int grpc_security_connector_cmp(grpc_security_connector *sc,
+ grpc_security_connector *other);
+
/* Util to encapsulate the connector in a channel arg. */
grpc_arg grpc_security_connector_to_arg(grpc_security_connector *sc);
@@ -116,13 +116,14 @@ grpc_security_connector *grpc_security_connector_find_in_args(
/* --- channel_security_connector object. ---
- A channel security connector object represents away to configure the
+ A channel security connector object represents a way to configure the
underlying transport security mechanism on the client side. */
typedef struct grpc_channel_security_connector grpc_channel_security_connector;
struct grpc_channel_security_connector {
grpc_security_connector base;
+ grpc_channel_credentials *channel_creds;
grpc_call_credentials *request_metadata_creds;
bool (*check_call_host)(grpc_exec_ctx *exec_ctx,
grpc_channel_security_connector *sc, const char *host,
@@ -138,6 +139,10 @@ struct grpc_channel_security_connector {
grpc_handshake_manager *handshake_mgr);
};
+/// A helper function for use in grpc_security_connector_cmp() implementations.
+int grpc_channel_security_connector_cmp(grpc_channel_security_connector *sc1,
+ grpc_channel_security_connector *sc2);
+
/// Checks that the host that will be set for a call is acceptable.
/// Returns true if completed synchronously, in which case \a error will
/// be set to indicate the result. Otherwise, \a on_call_host_checked
@@ -161,18 +166,23 @@ void grpc_channel_security_connector_add_handshakers(
/* --- server_security_connector object. ---
- A server security connector object represents away to configure the
+ A server security connector object represents a way to configure the
underlying transport security mechanism on the server side. */
typedef struct grpc_server_security_connector grpc_server_security_connector;
struct grpc_server_security_connector {
grpc_security_connector base;
+ grpc_server_credentials *server_creds;
void (*add_handshakers)(grpc_exec_ctx *exec_ctx,
grpc_server_security_connector *sc,
grpc_handshake_manager *handshake_mgr);
};
+/// A helper function for use in grpc_security_connector_cmp() implementations.
+int grpc_server_security_connector_cmp(grpc_server_security_connector *sc1,
+ grpc_server_security_connector *sc2);
+
void grpc_server_security_connector_add_handshakers(
grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc,
grpc_handshake_manager *handshake_mgr);
@@ -182,13 +192,14 @@ void grpc_server_security_connector_add_handshakers(
/* For TESTING ONLY!
Creates a fake connector that emulates real channel security. */
grpc_channel_security_connector *grpc_fake_channel_security_connector_create(
+ grpc_channel_credentials *channel_creds,
grpc_call_credentials *request_metadata_creds, const char *target,
const grpc_channel_args *args);
/* For TESTING ONLY!
Creates a fake connector that emulates real server security. */
grpc_server_security_connector *grpc_fake_server_security_connector_create(
- void);
+ grpc_server_credentials *server_creds);
/* Config for ssl clients. */
@@ -211,7 +222,8 @@ typedef struct {
specific error code otherwise.
*/
grpc_security_status grpc_ssl_channel_security_connector_create(
- grpc_exec_ctx *exec_ctx, grpc_call_credentials *request_metadata_creds,
+ grpc_exec_ctx *exec_ctx, grpc_channel_credentials *channel_creds,
+ grpc_call_credentials *request_metadata_creds,
const grpc_ssl_config *config, const char *target_name,
const char *overridden_target_name, grpc_channel_security_connector **sc);
@@ -236,8 +248,8 @@ typedef struct {
specific error code otherwise.
*/
grpc_security_status grpc_ssl_server_security_connector_create(
- grpc_exec_ctx *exec_ctx, const grpc_ssl_server_config *config,
- grpc_server_security_connector **sc);
+ grpc_exec_ctx *exec_ctx, grpc_server_credentials *server_creds,
+ const grpc_ssl_server_config *config, grpc_server_security_connector **sc);
/* Util. */
const tsi_peer_property *tsi_peer_get_property_by_name(const tsi_peer *peer,
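
A rough test-style sketch of the new cmp hook, using the fake connector declared above. grpc_fake_transport_security_credentials_create() is assumed from fake_credentials.h, and it is an assumption about the fake implementation that two connectors built from the same credentials and target compare equal:

  grpc_channel_credentials *creds =
      grpc_fake_transport_security_credentials_create();
  grpc_channel_security_connector *a =
      grpc_fake_channel_security_connector_create(creds, NULL, "target", NULL);
  grpc_channel_security_connector *b =
      grpc_fake_channel_security_connector_create(creds, NULL, "target", NULL);
  GPR_ASSERT(grpc_security_connector_cmp(&a->base, &b->base) == 0);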
diff --git a/src/core/lib/support/cpu_linux.cc b/src/core/lib/support/cpu_linux.cc
index 2280668442..53619caa5f 100644
--- a/src/core/lib/support/cpu_linux.cc
+++ b/src/core/lib/support/cpu_linux.cc
@@ -38,8 +38,9 @@ static int ncpus = 0;
static void init_num_cpus() {
/* This must be signed. sysconf returns -1 when the number cannot be
determined */
+ int cpu = sched_getcpu();
ncpus = (int)sysconf(_SC_NPROCESSORS_ONLN);
- if (ncpus < 1) {
+ if (ncpus < 1 || cpu < 0) {
gpr_log(GPR_ERROR, "Cannot determine number of CPUs: assuming 1");
ncpus = 1;
}
@@ -56,6 +57,9 @@ unsigned gpr_cpu_current_cpu(void) {
// sched_getcpu() is undefined on musl
return 0;
#else
+ if (gpr_cpu_num_cores() == 1) {
+ return 0;
+ }
int cpu = sched_getcpu();
if (cpu < 0) {
gpr_log(GPR_ERROR, "Error determining current CPU: %s\n", strerror(errno));
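
Net effect in cpu_linux.cc: a failing sched_getcpu() at init forces ncpus to 1, and a single-core box now skips the syscall entirely, so gpr_cpu_current_cpu() always lands in [0, gpr_cpu_num_cores()). A hedged sketch of the per-CPU sharding pattern this keeps safe (shard_t and pick_shard are illustrative names only):

  typedef struct { gpr_mu mu; /* per-CPU state */ } shard_t;
  static shard_t *pick_shard(shard_t *shards) {
    return &shards[gpr_cpu_current_cpu() % gpr_cpu_num_cores()];
  }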
diff --git a/src/core/lib/support/memory.h b/src/core/lib/support/memory.h
index dc3d32e1c2..6b336681db 100644
--- a/src/core/lib/support/memory.h
+++ b/src/core/lib/support/memory.h
@@ -21,6 +21,7 @@
#include <grpc/support/alloc.h>
+#include <limits>
#include <memory>
#include <utility>
@@ -54,6 +55,46 @@ inline UniquePtr<T> MakeUnique(Args&&... args) {
return UniquePtr<T>(New<T>(std::forward<Args>(args)...));
}
+// an allocator that uses gpr_malloc/gpr_free
+template <class T>
+class Allocator {
+ public:
+ typedef T value_type;
+ typedef T* pointer;
+ typedef const T* const_pointer;
+ typedef T& reference;
+ typedef const T& const_reference;
+ typedef std::size_t size_type;
+ typedef std::ptrdiff_t difference_type;
+ typedef std::false_type propagate_on_container_move_assignment;
+ template <class U>
+ struct rebind {
+ typedef Allocator<U> other;
+ };
+ typedef std::true_type is_always_equal;
+
+ pointer address(reference x) const { return &x; }
+ const_pointer address(const_reference x) const { return &x; }
+ pointer allocate(std::size_t n,
+ std::allocator<void>::const_pointer hint = 0) {
+ return static_cast<pointer>(gpr_malloc(n * sizeof(T)));
+ }
+ void deallocate(T* p, std::size_t n) { gpr_free(p); }
+ size_t max_size() const {
+ return std::numeric_limits<size_type>::max() / sizeof(value_type);
+ }
+ void construct(pointer p, const_reference val) { new ((void*)p) T(val); }
+ template <class U, class... Args>
+ void construct(U* p, Args&&... args) {
+ ::new ((void*)p) U(std::forward<Args>(args)...);
+ }
+ void destroy(pointer p) { p->~T(); }
+ template <class U>
+ void destroy(U* p) {
+ p->~U();
+ }
+};
+
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_SUPPORT_MEMORY_H */
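
Usage sketch for the new allocator: any standard container can be pointed at gpr_malloc/gpr_free, e.g.

  #include <vector>
  #include "src/core/lib/support/memory.h"

  std::vector<int, grpc_core::Allocator<int>> v;
  v.push_back(42);  // backing storage comes from gpr_malloc, freed via gpr_free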
diff --git a/src/core/lib/support/vector.h b/src/core/lib/support/vector.h
new file mode 100644
index 0000000000..4a7db80676
--- /dev/null
+++ b/src/core/lib/support/vector.h
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_SUPPORT_VECTOR_H
+#define GRPC_CORE_LIB_SUPPORT_VECTOR_H
+
+#include "absl/container/inlined_vector.h"
+#include "src/core/lib/support/memory.h"
+
+namespace grpc_core {
+
+template <typename T, size_t N>
+using InlinedVector = absl::InlinedVector<T, N, Allocator<T>>;
+
+} // namespace grpc_core
+
+#endif
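
Usage sketch: InlinedVector keeps up to N elements in-place and only touches the heap (through Allocator<T>, hence gpr_malloc) once it grows past N. GPR_ASSERT is from <grpc/support/log.h>:

  grpc_core::InlinedVector<int, 4> v;
  for (int i = 0; i < 6; i++) v.push_back(i);  // first 4 inline, then spills to heap
  GPR_ASSERT(v.size() == 6);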
diff --git a/src/core/lib/transport/bdp_estimator.cc b/src/core/lib/transport/bdp_estimator.cc
index 6eb2f54864..36826f45ca 100644
--- a/src/core/lib/transport/bdp_estimator.cc
+++ b/src/core/lib/transport/bdp_estimator.cc
@@ -32,13 +32,12 @@ BdpEstimator::BdpEstimator(const char *name)
accumulator_(0),
estimate_(65536),
ping_start_time_(gpr_time_0(GPR_CLOCK_MONOTONIC)),
- next_ping_scheduled_(0),
inter_ping_delay_(100.0), // start at 100ms
stable_estimate_count_(0),
bw_est_(0),
name_(name) {}
-void BdpEstimator::CompletePing(grpc_exec_ctx *exec_ctx) {
+grpc_millis BdpEstimator::CompletePing(grpc_exec_ctx *exec_ctx) {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec dt_ts = gpr_time_sub(now, ping_start_time_);
double dt = (double)dt_ts.tv_sec + 1e-9 * (double)dt_ts.tv_nsec;
@@ -78,7 +77,7 @@ void BdpEstimator::CompletePing(grpc_exec_ctx *exec_ctx) {
}
ping_state_ = PingState::UNSCHEDULED;
accumulator_ = 0;
- next_ping_scheduled_ = grpc_exec_ctx_now(exec_ctx) + inter_ping_delay_;
+ return grpc_exec_ctx_now(exec_ctx) + inter_ping_delay_;
}
} // namespace grpc_core
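
With CompletePing() now returning a deadline instead of stashing next_ping_scheduled_ internally, the caller decides how to act on it. A hedged fragment of the caller side (estimator is illustrative; whether the transport polls grpc_exec_ctx_now() or arms a timer for the returned deadline is the transport's choice):

  grpc_millis next_ping = estimator->CompletePing(exec_ctx);
  if (grpc_exec_ctx_now(exec_ctx) >= next_ping) {
    /* time for another BDP ping; otherwise arm a timer for next_ping */
  }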
diff --git a/src/core/lib/transport/bdp_estimator.h b/src/core/lib/transport/bdp_estimator.h
index 681595c34b..3558eb49e7 100644
--- a/src/core/lib/transport/bdp_estimator.h
+++ b/src/core/lib/transport/bdp_estimator.h
@@ -40,30 +40,11 @@ class BdpEstimator {
explicit BdpEstimator(const char *name);
~BdpEstimator() {}
- // Returns true if a reasonable estimate could be obtained
- bool EstimateBdp(int64_t *estimate_out) const {
- *estimate_out = estimate_;
- return true;
- }
- bool EstimateBandwidth(double *bw_out) const {
- *bw_out = bw_est_;
- return true;
- }
+ int64_t EstimateBdp() const { return estimate_; }
+ double EstimateBandwidth() const { return bw_est_; }
void AddIncomingBytes(int64_t num_bytes) { accumulator_ += num_bytes; }
- // Returns true if the user should schedule a ping
- bool NeedPing(grpc_exec_ctx *exec_ctx) const {
- switch (ping_state_) {
- case PingState::UNSCHEDULED:
- return grpc_exec_ctx_now(exec_ctx) >= next_ping_scheduled_;
- case PingState::SCHEDULED:
- case PingState::STARTED:
- return false;
- }
- GPR_UNREACHABLE_CODE(return false);
- }
-
// Schedule a ping: call in response to receiving a true from
// grpc_bdp_estimator_add_incoming_bytes once a ping has been scheduled by a
// transport (but not necessarily started)
@@ -91,8 +72,8 @@ class BdpEstimator {
ping_start_time_ = gpr_now(GPR_CLOCK_MONOTONIC);
}
- // Completes a previously started ping
- void CompletePing(grpc_exec_ctx *exec_ctx);
+  // Completes a previously started ping and returns when to schedule the next one
+ grpc_millis CompletePing(grpc_exec_ctx *exec_ctx);
private:
enum class PingState { UNSCHEDULED, SCHEDULED, STARTED };
@@ -102,8 +83,6 @@ class BdpEstimator {
int64_t estimate_;
// when was the current ping started?
gpr_timespec ping_start_time_;
- // when should the next ping start?
- grpc_millis next_ping_scheduled_;
int inter_ping_delay_;
int stable_estimate_count_;
double bw_est_;
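
A hedged lifecycle sketch of the slimmed-down interface: bytes accumulate between pings, and CompletePing() both updates the estimate and hands back the next deadline. The SchedulePing()/StartPing() names are assumed from the surrounding comments, and exec_ctx is whatever execution context the transport is running under:

  grpc_core::BdpEstimator est("client");
  est.AddIncomingBytes(16384);
  est.SchedulePing();                              // transport queued a BDP ping
  est.StartPing();                                 // ping went out on the wire
  grpc_millis next = est.CompletePing(exec_ctx);   // ack received; estimate updated
  int64_t bdp = est.EstimateBdp();
  double bandwidth = est.EstimateBandwidth();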
diff --git a/src/core/lib/transport/metadata.cc b/src/core/lib/transport/metadata.cc
index 99eccba34e..1eddb6f0d6 100644
--- a/src/core/lib/transport/metadata.cc
+++ b/src/core/lib/transport/metadata.cc
@@ -351,11 +351,14 @@ static size_t get_base64_encoded_size(size_t raw_length) {
return raw_length / 3 * 4 + tail_xtra[raw_length % 3];
}
-size_t grpc_mdelem_get_size_in_hpack_table(grpc_mdelem elem) {
+size_t grpc_mdelem_get_size_in_hpack_table(grpc_mdelem elem,
+ bool use_true_binary_metadata) {
size_t overhead_and_key = 32 + GRPC_SLICE_LENGTH(GRPC_MDKEY(elem));
size_t value_len = GRPC_SLICE_LENGTH(GRPC_MDVALUE(elem));
if (grpc_is_binary_header(GRPC_MDKEY(elem))) {
- return overhead_and_key + get_base64_encoded_size(value_len);
+ return overhead_and_key + (use_true_binary_metadata
+ ? value_len + 1
+ : get_base64_encoded_size(value_len));
} else {
return overhead_and_key + value_len;
}
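
Worked example with illustrative sizes: for a binary header with a 14-byte key and a 30-byte value, overhead_and_key = 32 + 14 = 46; the base64 path charges 30 / 3 * 4 = 40 bytes for the value (86 total), while the true-binary path charges 30 + 1 = 31 (77 total). Non-binary headers are unchanged: overhead_and_key + value_len.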
diff --git a/src/core/lib/transport/metadata.h b/src/core/lib/transport/metadata.h
index a70ebc4921..66780d925b 100644
--- a/src/core/lib/transport/metadata.h
+++ b/src/core/lib/transport/metadata.h
@@ -132,7 +132,8 @@ grpc_mdelem grpc_mdelem_create(
bool grpc_mdelem_eq(grpc_mdelem a, grpc_mdelem b);
-size_t grpc_mdelem_get_size_in_hpack_table(grpc_mdelem elem);
+size_t grpc_mdelem_get_size_in_hpack_table(grpc_mdelem elem,
+ bool use_true_binary_metadata);
/* Mutator and accessor for grpc_mdelem user data. The destructor function
is used as a type tag and is checked during user_data fetch. */
diff --git a/src/core/lib/transport/pid_controller.cc b/src/core/lib/transport/pid_controller.cc
index 4b304f17b2..9f7750d693 100644
--- a/src/core/lib/transport/pid_controller.cc
+++ b/src/core/lib/transport/pid_controller.cc
@@ -19,45 +19,30 @@
#include "src/core/lib/transport/pid_controller.h"
#include <grpc/support/useful.h>
-void grpc_pid_controller_init(grpc_pid_controller *pid_controller,
- grpc_pid_controller_args args) {
- pid_controller->args = args;
- pid_controller->last_control_value = args.initial_control_value;
- grpc_pid_controller_reset(pid_controller);
-}
+namespace grpc_core {
-void grpc_pid_controller_reset(grpc_pid_controller *pid_controller) {
- pid_controller->last_error = 0.0;
- pid_controller->last_dc_dt = 0.0;
- pid_controller->error_integral = 0.0;
-}
+PidController::PidController(const Args &args)
+ : last_control_value_(args.initial_control_value()), args_(args) {}
-double grpc_pid_controller_update(grpc_pid_controller *pid_controller,
- double error, double dt) {
- if (dt == 0) return pid_controller->last_control_value;
+double PidController::Update(double error, double dt) {
+ if (dt <= 0) return last_control_value_;
/* integrate error using the trapezoid rule */
- pid_controller->error_integral +=
- dt * (pid_controller->last_error + error) * 0.5;
- pid_controller->error_integral = GPR_CLAMP(
- pid_controller->error_integral, -pid_controller->args.integral_range,
- pid_controller->args.integral_range);
- double diff_error = (error - pid_controller->last_error) / dt;
+ error_integral_ += dt * (last_error_ + error) * 0.5;
+ error_integral_ = GPR_CLAMP(error_integral_, -args_.integral_range(),
+ args_.integral_range());
+ double diff_error = (error - last_error_) / dt;
/* calculate derivative of control value vs time */
- double dc_dt = pid_controller->args.gain_p * error +
- pid_controller->args.gain_i * pid_controller->error_integral +
- pid_controller->args.gain_d * diff_error;
+ double dc_dt = args_.gain_p() * error + args_.gain_i() * error_integral_ +
+ args_.gain_d() * diff_error;
/* and perform trapezoidal integration */
- double new_control_value = pid_controller->last_control_value +
- dt * (pid_controller->last_dc_dt + dc_dt) * 0.5;
- new_control_value =
- GPR_CLAMP(new_control_value, pid_controller->args.min_control_value,
- pid_controller->args.max_control_value);
- pid_controller->last_error = error;
- pid_controller->last_dc_dt = dc_dt;
- pid_controller->last_control_value = new_control_value;
+ double new_control_value =
+ last_control_value_ + dt * (last_dc_dt_ + dc_dt) * 0.5;
+ new_control_value = GPR_CLAMP(new_control_value, args_.min_control_value(),
+ args_.max_control_value());
+ last_error_ = error;
+ last_dc_dt_ = dc_dt;
+ last_control_value_ = new_control_value;
return new_control_value;
}
-double grpc_pid_controller_last(grpc_pid_controller *pid_controller) {
- return pid_controller->last_control_value;
-}
+} // namespace grpc_core
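
A one-step numeric trace of Update() with illustrative values (gain_p = 1, gain_i = gain_d = 0, initial control value 0, wide clamps): Update(error = 1, dt = 1) gives error_integral_ = 1 * (0 + 1) * 0.5 = 0.5, diff_error = (1 - 0) / 1 = 1, dc_dt = 1 * 1 = 1, and the returned control value is 0 + 1 * (0 + 1) * 0.5 = 0.5, which last_control_value() then reports.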
diff --git a/src/core/lib/transport/pid_controller.h b/src/core/lib/transport/pid_controller.h
index 80899e9a20..87e59a1a90 100644
--- a/src/core/lib/transport/pid_controller.h
+++ b/src/core/lib/transport/pid_controller.h
@@ -19,9 +19,7 @@
#ifndef GRPC_CORE_LIB_TRANSPORT_PID_CONTROLLER_H
#define GRPC_CORE_LIB_TRANSPORT_PID_CONTROLLER_H
-#ifdef __cplusplus
-extern "C" {
-#endif
+#include <limits>
/* \file Simple PID controller.
Implements a proportional-integral-derivative controller.
@@ -30,41 +28,87 @@ extern "C" {
Gains can be set to adjust sensitivity to current error (p), the integral
of error (i), and the derivative of error (d). */
-typedef struct {
- double gain_p;
- double gain_i;
- double gain_d;
- double initial_control_value;
- double min_control_value;
- double max_control_value;
- double integral_range;
-} grpc_pid_controller_args;
+namespace grpc_core {
-typedef struct {
- double last_error;
- double error_integral;
- double last_control_value;
- double last_dc_dt;
- grpc_pid_controller_args args;
-} grpc_pid_controller;
+class PidController {
+ public:
+ class Args {
+ public:
+ double gain_p() const { return gain_p_; }
+ double gain_i() const { return gain_i_; }
+ double gain_d() const { return gain_d_; }
+ double initial_control_value() const { return initial_control_value_; }
+ double min_control_value() const { return min_control_value_; }
+ double max_control_value() const { return max_control_value_; }
+ double integral_range() const { return integral_range_; }
-/** Initialize the controller */
-void grpc_pid_controller_init(grpc_pid_controller *pid_controller,
- grpc_pid_controller_args args);
+ Args& set_gain_p(double gain_p) {
+ gain_p_ = gain_p;
+ return *this;
+ }
+ Args& set_gain_i(double gain_i) {
+ gain_i_ = gain_i;
+ return *this;
+ }
+ Args& set_gain_d(double gain_d) {
+ gain_d_ = gain_d;
+ return *this;
+ }
+ Args& set_initial_control_value(double initial_control_value) {
+ initial_control_value_ = initial_control_value;
+ return *this;
+ }
+ Args& set_min_control_value(double min_control_value) {
+ min_control_value_ = min_control_value;
+ return *this;
+ }
+ Args& set_max_control_value(double max_control_value) {
+ max_control_value_ = max_control_value;
+ return *this;
+ }
+ Args& set_integral_range(double integral_range) {
+ integral_range_ = integral_range;
+ return *this;
+ }
-/** Reset the controller: useful when things have changed significantly */
-void grpc_pid_controller_reset(grpc_pid_controller *pid_controller);
+ private:
+ double gain_p_ = 0.0;
+ double gain_i_ = 0.0;
+ double gain_d_ = 0.0;
+ double initial_control_value_ = 0.0;
+ double min_control_value_ = std::numeric_limits<double>::min();
+ double max_control_value_ = std::numeric_limits<double>::max();
+ double integral_range_ = std::numeric_limits<double>::max();
+ };
-/** Update the controller: given a current error estimate, and the time since
- the last update, returns a new control value */
-double grpc_pid_controller_update(grpc_pid_controller *pid_controller,
- double error, double dt);
+ explicit PidController(const Args& args);
-/** Returns the last control value calculated */
-double grpc_pid_controller_last(grpc_pid_controller *pid_controller);
+ /// Reset the controller internal state: useful when the environment has
+ /// changed significantly
+ void Reset() {
+ last_error_ = 0.0;
+ last_dc_dt_ = 0.0;
+ error_integral_ = 0.0;
+ }
-#ifdef __cplusplus
-}
-#endif
+ /// Update the controller: given a current error estimate, and the time since
+ /// the last update, returns a new control value
+ double Update(double error, double dt);
+
+ /// Returns the last control value calculated
+ double last_control_value() const { return last_control_value_; }
+
+ /// Returns the current error integral (mostly for testing)
+ double error_integral() const { return error_integral_; }
+
+ private:
+ double last_error_ = 0.0;
+ double error_integral_ = 0.0;
+ double last_control_value_;
+ double last_dc_dt_ = 0.0;
+ const Args args_;
+};
+
+} // namespace grpc_core
#endif /* GRPC_CORE_LIB_TRANSPORT_PID_CONTROLLER_H */
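
Usage sketch of the C++ replacement for grpc_pid_controller_init/update (the gain and clamp values below are illustrative only):

  grpc_core::PidController controller(
      grpc_core::PidController::Args()
          .set_gain_p(4.0)
          .set_gain_i(8.0)
          .set_gain_d(0.0)
          .set_initial_control_value(16.0)
          .set_min_control_value(-1.0)
          .set_max_control_value(25.0)
          .set_integral_range(10.0));
  double next = controller.Update(/*error=*/0.5, /*dt=*/0.1);
  GPR_ASSERT(controller.last_control_value() == next);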
diff --git a/src/cpp/client/create_channel.cc b/src/cpp/client/create_channel.cc
index e2893c8f3c..de67281dd4 100644
--- a/src/cpp/client/create_channel.cc
+++ b/src/cpp/client/create_channel.cc
@@ -38,8 +38,7 @@ std::shared_ptr<Channel> CreateCustomChannel(
const grpc::string& target,
const std::shared_ptr<ChannelCredentials>& creds,
const ChannelArguments& args) {
- internal::GrpcLibrary
- init_lib; // We need to call init in case of a bad creds.
+  GrpcLibraryCodegen init_lib;  // We need to call init in case of bad creds.
return creds
? creds->CreateChannel(target, args)
: CreateChannelInternal("", grpc_lame_client_channel_create(
diff --git a/src/php/ext/grpc/server.c b/src/php/ext/grpc/server.c
index e46037743d..a65d233017 100644
--- a/src/php/ext/grpc/server.c
+++ b/src/php/ext/grpc/server.c
@@ -169,7 +169,7 @@ PHP_METHOD(Server, requestCall) {
/**
* Add a http2 over tcp listener.
* @param string $addr The address to add
- * @return bool True on success, false on failure
+ * @return int Port on success, 0 on failure
*/
PHP_METHOD(Server, addHttp2Port) {
const char *addr;
@@ -190,7 +190,7 @@ PHP_METHOD(Server, addHttp2Port) {
* Add a secure http2 over tcp listener.
* @param string $addr The address to add
* @param ServerCredentials The ServerCredentials object
- * @return bool True on success, false on failure
+ * @return int Port on success, 0 on failure
*/
PHP_METHOD(Server, addSecureHttp2Port) {
const char *addr;