aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/client_channel/client_channel.c6
-rw-r--r--src/core/ext/client_channel/client_channel.h4
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c390
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_ping.c22
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_ping.h4
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_settings.c7
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_window_update.c11
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h122
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c30
-rw-r--r--src/core/ext/transport/chttp2/transport/stream_lists.c43
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c120
-rw-r--r--src/core/lib/channel/connected_channel.c6
-rw-r--r--src/core/lib/channel/connected_channel.h5
-rw-r--r--src/core/lib/iomgr/closure.c8
-rw-r--r--src/core/lib/iomgr/closure.h5
-rw-r--r--src/core/lib/iomgr/resource_quota.c32
-rw-r--r--src/core/lib/iomgr/resource_quota.h6
-rw-r--r--src/core/lib/iomgr/udp_server.c38
-rw-r--r--src/core/lib/iomgr/udp_server.h5
-rw-r--r--src/core/lib/support/log_posix.c12
-rw-r--r--src/core/lib/surface/init.c2
-rw-r--r--src/core/lib/transport/bdp_estimator.c104
-rw-r--r--src/core/lib/transport/bdp_estimator.h76
-rw-r--r--src/core/lib/transport/pid_controller.c36
-rw-r--r--src/core/lib/transport/pid_controller.h17
-rw-r--r--src/objective-c/!ProtoCompiler-gRPCPlugin.podspec15
-rw-r--r--src/objective-c/BoringSSL.podspec382
-rw-r--r--src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.m2
-rw-r--r--src/objective-c/GRPCClient/private/GRPCHost.m5
-rw-r--r--src/objective-c/GRPCClient/private/version.h41
-rw-r--r--src/php/README.md49
-rw-r--r--src/php/composer.json2
-rw-r--r--src/php/tests/generated_code/math_client.php15
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py1
34 files changed, 1135 insertions, 488 deletions
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c
index 07eb68a3eb..208c95b67a 100644
--- a/src/core/ext/client_channel/client_channel.c
+++ b/src/core/ext/client_channel/client_channel.c
@@ -644,6 +644,12 @@ typedef struct client_channel_call_data {
grpc_linked_mdelem lb_token_mdelem;
} call_data;
+grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
+ grpc_call_element *call_elem) {
+ grpc_subchannel_call *scc = GET_CALL((call_data *)call_elem->call_data);
+ return scc == CANCELLED_CALL ? NULL : scc;
+}
+
static void add_waiting_locked(call_data *calld, grpc_transport_stream_op *op) {
GPR_TIMER_BEGIN("add_waiting_locked", 0);
if (calld->waiting_ops_count == calld->waiting_ops_capacity) {
diff --git a/src/core/ext/client_channel/client_channel.h b/src/core/ext/client_channel/client_channel.h
index f02587d0c1..5e6e64e58b 100644
--- a/src/core/ext/client_channel/client_channel.h
+++ b/src/core/ext/client_channel/client_channel.h
@@ -57,4 +57,8 @@ void grpc_client_channel_watch_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
grpc_connectivity_state *state, grpc_closure *on_complete);
+/* Debug helper: pull the subchannel call from a call stack element */
+grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
+ grpc_call_element *elem);
+
#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_CLIENT_CHANNEL_H */
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 15f486d676..fa18f5a725 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -124,6 +124,21 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_error *error);
+static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
+ grpc_error *error);
+static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
+ grpc_error *error);
+
+static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_error *error);
+static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_chttp2_ping_type ping_type,
+ grpc_closure *on_initiate,
+ grpc_closure *on_complete);
+
+#define DEFAULT_MIN_TIME_BETWEEN_PINGS_MS 0
+#define DEFAULT_MAX_PINGS_BETWEEN_DATA 3
+
/*******************************************************************************
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/
@@ -155,16 +170,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
grpc_combiner_destroy(exec_ctx, t->combiner);
- /* callback remaining pings: they're not allowed to call into the transpot,
- and maybe they hold resources that need to be freed */
- while (t->pings.next != &t->pings) {
- grpc_chttp2_outstanding_ping *ping = t->pings.next;
- grpc_closure_sched(exec_ctx, ping->on_recv,
- GRPC_ERROR_CREATE("Transport closed"));
- ping->next->prev = ping->prev;
- ping->prev->next = ping->next;
- gpr_free(ping);
- }
+ cancel_pings(exec_ctx, t, GRPC_ERROR_CREATE("Transport destroyed"));
while (t->write_cb_pool) {
grpc_chttp2_write_cb *next = t->write_cb_pool->next;
@@ -172,6 +178,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
t->write_cb_pool = next;
}
+ gpr_free(t->ping_acks);
gpr_free(t->peer_string);
gpr_free(t);
}
@@ -224,10 +231,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->is_client = is_client;
t->outgoing_window = DEFAULT_WINDOW;
t->incoming_window = DEFAULT_WINDOW;
- t->stream_lookahead = DEFAULT_WINDOW;
- t->connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET;
- t->ping_counter = 1;
- t->pings.next = t->pings.prev = &t->pings;
t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
t->is_first_frame = true;
grpc_connectivity_state_init(
@@ -248,6 +251,22 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_closure_init(&t->destructive_reclaimer_locked,
destructive_reclaimer_locked, t,
grpc_combiner_scheduler(t->combiner, false));
+ grpc_closure_init(&t->start_bdp_ping_locked, start_bdp_ping_locked, t,
+ grpc_combiner_scheduler(t->combiner, false));
+ grpc_closure_init(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t,
+ grpc_combiner_scheduler(t->combiner, false));
+
+ grpc_bdp_estimator_init(&t->bdp_estimator, t->peer_string);
+ t->last_pid_update = gpr_now(GPR_CLOCK_MONOTONIC);
+ grpc_pid_controller_init(
+ &t->pid_controller,
+ (grpc_pid_controller_args){.gain_p = 4,
+ .gain_i = 8,
+ .gain_d = 0,
+ .initial_control_value = log2(DEFAULT_WINDOW),
+ .min_control_value = -1,
+ .max_control_value = 22,
+ .integral_range = 10});
grpc_chttp2_goaway_parser_init(&t->goaway_parser);
grpc_chttp2_hpack_parser_init(exec_ctx, &t->hpack_parser);
@@ -290,6 +309,12 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
DEFAULT_MAX_HEADER_LIST_SIZE);
+ t->ping_policy = (grpc_chttp2_repeated_ping_policy){
+ .max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA,
+ .min_time_between_pings =
+ gpr_time_from_millis(DEFAULT_MIN_TIME_BETWEEN_PINGS_MS, GPR_TIMESPAN),
+ };
+
if (channel_args) {
for (i = 0; i < channel_args->num_args; i++) {
if (0 == strcmp(channel_args->args[i].key,
@@ -307,14 +332,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
}
} else if (0 == strcmp(channel_args->args[i].key,
- GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES)) {
- const grpc_integer_options options = {-1, 5, INT_MAX};
- const int value =
- grpc_channel_arg_get_integer(&channel_args->args[i], options);
- if (value >= 0) {
- t->stream_lookahead = (uint32_t)value;
- }
- } else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER)) {
const grpc_integer_options options = {-1, 0, INT_MAX};
const int value =
@@ -324,6 +341,19 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
(uint32_t)value);
}
} else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) {
+ t->ping_policy.max_pings_without_data = grpc_channel_arg_get_integer(
+ &channel_args->args[i],
+ (grpc_integer_options){DEFAULT_MAX_PINGS_BETWEEN_DATA, 0, INT_MAX});
+ } else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_HTTP2_MIN_TIME_BETWEEN_PINGS_MS)) {
+ t->ping_policy.min_time_between_pings = gpr_time_from_millis(
+ grpc_channel_arg_get_integer(
+ &channel_args->args[i],
+ (grpc_integer_options){DEFAULT_MIN_TIME_BETWEEN_PINGS_MS, 0,
+ INT_MAX}),
+ GPR_TIMESPAN);
+ } else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) {
t->write_buffer_size = (uint32_t)grpc_channel_arg_get_integer(
&channel_args->args[i],
@@ -334,24 +364,26 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_setting_id setting_id;
grpc_integer_options integer_options;
bool availability[2] /* server, client */;
- } settings_map[] = {
- {GRPC_ARG_MAX_CONCURRENT_STREAMS,
- GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
- {-1, 0, INT_MAX},
- {true, false}},
- {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER,
- GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
- {-1, 0, INT_MAX},
- {true, true}},
- {GRPC_ARG_MAX_METADATA_SIZE,
- GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
- {-1, 0, INT_MAX},
- {true, true}},
- {GRPC_ARG_HTTP2_MAX_FRAME_SIZE,
- GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
- {-1, 16384, 16777215},
- {true, true}},
- };
+ } settings_map[] = {{GRPC_ARG_MAX_CONCURRENT_STREAMS,
+ GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
+ {-1, 0, INT32_MAX},
+ {true, false}},
+ {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER,
+ GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
+ {-1, 0, INT32_MAX},
+ {true, true}},
+ {GRPC_ARG_MAX_METADATA_SIZE,
+ GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
+ {-1, 0, INT32_MAX},
+ {true, true}},
+ {GRPC_ARG_HTTP2_MAX_FRAME_SIZE,
+ GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
+ {-1, 16384, 16777215},
+ {true, true}},
+ {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES,
+ GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
+ {-1, 5, INT32_MAX},
+ {true, true}}};
for (j = 0; j < (int)GPR_ARRAY_SIZE(settings_map); j++) {
if (0 == strcmp(channel_args->args[i].key,
settings_map[j].channel_arg_name)) {
@@ -374,6 +406,9 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
}
+ t->ping_state.pings_before_data_required =
+ t->ping_policy.max_pings_without_data;
+
grpc_chttp2_initiate_write(exec_ctx, t, false, "init");
post_benign_reclaimer(exec_ctx, t);
}
@@ -425,6 +460,7 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:close");
}
end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error));
+ cancel_pings(exec_ctx, t, GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error);
}
@@ -475,11 +511,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
if (server_data) {
s->id = (uint32_t)(uintptr_t)server_data;
- s->outgoing_window = t->settings[GRPC_PEER_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- s->incoming_window = s->max_recv_bytes =
- t->settings[GRPC_SENT_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
*t->accepting_stream = s;
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
post_destructive_reclaimer(exec_ctx, t);
@@ -508,6 +539,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
}
grpc_chttp2_list_remove_stalled_by_transport(t, s);
+ grpc_chttp2_list_remove_stalled_by_stream(t, s);
for (int i = 0; i < STREAM_LIST_COUNT; i++) {
if (s->included[i]) {
@@ -647,13 +679,21 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
GPR_TIMER_END("grpc_chttp2_initiate_write", 0);
}
-void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s, bool covered_by_poller,
- const char *reason) {
+void grpc_chttp2_become_writable(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s,
+ grpc_chttp2_stream_write_type stream_write_type, const char *reason) {
if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s)) {
GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become");
- grpc_chttp2_initiate_write(exec_ctx, t, covered_by_poller, reason);
+ }
+ switch (stream_write_type) {
+ case GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK:
+ break;
+ case GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED:
+ grpc_chttp2_initiate_write(exec_ctx, t, true, reason);
+ break;
+ case GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED:
+ grpc_chttp2_initiate_write(exec_ctx, t, false, reason);
+ break;
}
}
@@ -781,7 +821,6 @@ void grpc_chttp2_add_incoming_goaway(grpc_exec_ctx *exec_ctx,
static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
grpc_chttp2_stream *s;
- uint32_t stream_incoming_window;
/* start streams where we have free grpc_chttp2_stream ids and free
* concurrency */
while (t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
@@ -804,15 +843,11 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx,
"no_more_stream_ids");
}
- s->outgoing_window = t->settings[GRPC_PEER_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- s->incoming_window = stream_incoming_window =
- t->settings[GRPC_SENT_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- s->max_recv_bytes = GPR_MAX(stream_incoming_window, s->max_recv_bytes);
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
post_destructive_reclaimer(exec_ctx, t);
- grpc_chttp2_become_writable(exec_ctx, t, s, true, "new_stream");
+ grpc_chttp2_become_writable(exec_ctx, t, s,
+ GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED,
+ "new_stream");
}
/* cancel out streams that will never be started */
while (t->next_stream_id >= MAX_CLIENT_STREAM_ID &&
@@ -907,7 +942,9 @@ static void maybe_become_writable_due_to_send_msg(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream *s) {
if (s->id != 0 && (!s->write_buffering ||
s->flow_controlled_buffer.length > t->write_buffer_size)) {
- grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message");
+ grpc_chttp2_become_writable(exec_ctx, t, s,
+ GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED,
+ "op.send_message");
}
}
@@ -1069,7 +1106,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
} else {
GPR_ASSERT(s->id != 0);
- grpc_chttp2_become_writable(exec_ctx, t, s, true,
+ grpc_chttp2_become_writable(exec_ctx, t, s,
+ GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED,
"op.send_initial_metadata");
}
} else {
@@ -1160,7 +1198,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
} else if (s->id != 0) {
/* TODO(ctiller): check if there's flow control for any outstanding
bytes before going writable */
- grpc_chttp2_become_writable(exec_ctx, t, s, true,
+ grpc_chttp2_become_writable(exec_ctx, t, s,
+ GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED,
"op.send_trailing_metadata");
}
}
@@ -1179,8 +1218,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
s->recv_message = op->recv_message;
if (s->id != 0 &&
(s->incoming_frames.head == NULL || s->incoming_frames.head->is_tail)) {
- incoming_byte_stream_update_flow_control(exec_ctx, t, s,
- t->stream_lookahead, 0);
+ incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5, 0);
}
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
}
@@ -1224,43 +1262,46 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
GPR_TIMER_END("perform_stream_op", 0);
}
+static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_error *error) {
+ /* callback remaining pings: they're not allowed to call into the transpot,
+ and maybe they hold resources that need to be freed */
+ for (size_t i = 0; i < GRPC_CHTTP2_PING_TYPE_COUNT; i++) {
+ grpc_chttp2_ping_queue *pq = &t->ping_queues[i];
+ for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) {
+ grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error));
+ grpc_closure_list_sched(exec_ctx, &pq->lists[j]);
+ }
+ }
+ GRPC_ERROR_UNREF(error);
+}
+
static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- grpc_closure *on_recv) {
- grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p));
- p->next = &t->pings;
- p->prev = p->next->prev;
- p->prev->next = p->next->prev = p;
- p->id[0] = (uint8_t)((t->ping_counter >> 56) & 0xff);
- p->id[1] = (uint8_t)((t->ping_counter >> 48) & 0xff);
- p->id[2] = (uint8_t)((t->ping_counter >> 40) & 0xff);
- p->id[3] = (uint8_t)((t->ping_counter >> 32) & 0xff);
- p->id[4] = (uint8_t)((t->ping_counter >> 24) & 0xff);
- p->id[5] = (uint8_t)((t->ping_counter >> 16) & 0xff);
- p->id[6] = (uint8_t)((t->ping_counter >> 8) & 0xff);
- p->id[7] = (uint8_t)(t->ping_counter & 0xff);
- t->ping_counter++;
- p->on_recv = on_recv;
- grpc_slice_buffer_add(&t->qbuf, grpc_chttp2_ping_create(0, p->id));
- grpc_chttp2_initiate_write(exec_ctx, t, true, "send_ping");
+ grpc_chttp2_ping_type ping_type,
+ grpc_closure *on_initiate, grpc_closure *on_ack) {
+ grpc_chttp2_ping_queue *pq = &t->ping_queues[ping_type];
+ grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate,
+ GRPC_ERROR_NONE);
+ if (grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack,
+ GRPC_ERROR_NONE)) {
+ grpc_chttp2_initiate_write(exec_ctx, t, false, "send_ping");
+ }
}
void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- const uint8_t *opaque_8bytes) {
- grpc_chttp2_outstanding_ping *ping;
- for (ping = t->pings.next; ping != &t->pings; ping = ping->next) {
- if (0 == memcmp(opaque_8bytes, ping->id, 8)) {
- grpc_closure_sched(exec_ctx, ping->on_recv, GRPC_ERROR_NONE);
- ping->next->prev = ping->prev;
- ping->prev->next = ping->next;
- gpr_free(ping);
- return;
- }
+ uint64_t id) {
+ grpc_chttp2_ping_queue *pq =
+ &t->ping_queues[id % GRPC_CHTTP2_PING_TYPE_COUNT];
+ if (pq->inflight_id != id) {
+ char *from = grpc_endpoint_get_peer(t->ep);
+ gpr_log(GPR_DEBUG, "Unknown ping response from %s: %" PRIx64, from, id);
+ gpr_free(from);
+ return;
+ }
+ grpc_closure_list_sched(exec_ctx, &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
+ if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
+ grpc_chttp2_initiate_write(exec_ctx, t, false, "continue_pings");
}
- char *msg = gpr_dump((const char *)opaque_8bytes, 8, GPR_DUMP_HEX);
- char *from = grpc_endpoint_get_peer(t->ep);
- gpr_log(GPR_DEBUG, "Unknown ping response from %s: %s", from, msg);
- gpr_free(from);
- gpr_free(msg);
}
static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@@ -1308,7 +1349,8 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
}
if (op->send_ping) {
- send_ping_locked(exec_ctx, t, op->send_ping);
+ send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE, NULL,
+ op->send_ping);
}
if (close_transport != GRPC_ERROR_NONE) {
@@ -1733,34 +1775,28 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GRPC_ERROR_UNREF(error);
}
-/** update window from a settings change */
-typedef struct {
- grpc_chttp2_transport *t;
- grpc_exec_ctx *exec_ctx;
-} update_global_window_args;
+/*******************************************************************************
+ * INPUT PROCESSING - PARSING
+ */
-static void update_global_window(void *args, uint32_t id, void *stream) {
- update_global_window_args *a = args;
- grpc_chttp2_transport *t = a->t;
- grpc_chttp2_stream *s = stream;
- int was_zero;
- int is_zero;
- int64_t initial_window_update = t->initial_window_update;
-
- if (initial_window_update > 0) {
- was_zero = s->outgoing_window <= 0;
- GRPC_CHTTP2_FLOW_CREDIT_STREAM("settings", t, s, outgoing_window,
- initial_window_update);
- is_zero = s->outgoing_window <= 0;
-
- if (was_zero && !is_zero) {
- grpc_chttp2_become_writable(a->exec_ctx, t, s, true,
- "update_global_window");
- }
+static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ double bdp_dbl) {
+ uint32_t bdp;
+ if (bdp_dbl <= 0) {
+ bdp = 0;
+ } else if (bdp_dbl > UINT32_MAX) {
+ bdp = UINT32_MAX;
} else {
- GRPC_CHTTP2_FLOW_DEBIT_STREAM("settings", t, s, outgoing_window,
- -initial_window_update);
+ bdp = (uint32_t)(bdp_dbl);
+ }
+ int64_t delta =
+ (int64_t)bdp -
+ (int64_t)t->settings[GRPC_LOCAL_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ if (delta == 0 || (bdp != 0 && delta > -1024 && delta < 1024)) {
+ return;
}
+ push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, bdp);
}
/*******************************************************************************
@@ -1802,6 +1838,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_BEGIN("reading_action_locked", 0);
grpc_chttp2_transport *t = tp;
+ bool need_bdp_ping = false;
GRPC_ERROR_REF(error);
@@ -1819,9 +1856,14 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
GRPC_ERROR_NONE};
for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) {
+ if (grpc_bdp_estimator_add_incoming_bytes(
+ &t->bdp_estimator,
+ (int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i]))) {
+ need_bdp_ping = true;
+ }
errors[1] =
grpc_chttp2_perform_read(exec_ctx, t, t->read_buffer.slices[i]);
- };
+ }
if (errors[1] != GRPC_ERROR_NONE) {
errors[2] = try_http_parsing(exec_ctx, t);
GRPC_ERROR_UNREF(error);
@@ -1835,21 +1877,16 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_BEGIN("post_parse_locked", 0);
if (t->initial_window_update != 0) {
- update_global_window_args args = {t, exec_ctx};
- grpc_chttp2_stream_map_for_each(&t->stream_map, update_global_window,
- &args);
+ if (t->initial_window_update > 0) {
+ grpc_chttp2_stream *s;
+ while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) {
+ grpc_chttp2_become_writable(
+ exec_ctx, t, s, GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED,
+ "unstalled");
+ }
+ }
t->initial_window_update = 0;
}
- /* handle higher level things */
- if (t->incoming_window < t->connection_window_target * 3 / 4) {
- int64_t announce_bytes = t->connection_window_target - t->incoming_window;
- GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parsed", t, announce_incoming_window,
- announce_bytes);
- GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parsed", t, incoming_window,
- announce_bytes);
- grpc_chttp2_initiate_write(exec_ctx, t, false, "global incoming window");
- }
-
GPR_TIMER_END("post_parse_locked", 0);
}
@@ -1870,6 +1907,35 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
if (keep_reading) {
grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer,
&t->read_action_locked);
+
+ if (need_bdp_ping) {
+ GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping");
+ grpc_bdp_estimator_schedule_ping(&t->bdp_estimator);
+ send_ping_locked(exec_ctx, t,
+ GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE,
+ &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked);
+ }
+
+ int64_t estimate = -1;
+ if (grpc_bdp_estimator_get_estimate(&t->bdp_estimator, &estimate)) {
+ double target = 1 + log2((double)estimate);
+ double memory_pressure = grpc_resource_quota_get_memory_pressure(
+ grpc_resource_user_quota(grpc_endpoint_get_resource_user(t->ep)));
+ if (memory_pressure > 0.8) {
+ target *= 1 - GPR_MIN(1, (memory_pressure - 0.8) / 0.1);
+ }
+ double bdp_error = target - grpc_pid_controller_last(&t->pid_controller);
+ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ gpr_timespec dt_timespec = gpr_time_sub(now, t->last_pid_update);
+ double dt = (double)dt_timespec.tv_sec + dt_timespec.tv_nsec * 1e-9;
+ if (dt > 0.1) {
+ dt = 0.1;
+ }
+ double log2_bdp_guess =
+ grpc_pid_controller_update(&t->pid_controller, bdp_error, dt);
+ update_bdp(exec_ctx, t, pow(2, log2_bdp_guess));
+ t->last_pid_update = now;
+ }
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading");
} else {
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "reading_action");
@@ -1882,6 +1948,26 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_END("reading_action_locked", 0);
}
+static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = tp;
+ if (grpc_http_trace) {
+ gpr_log(GPR_DEBUG, "%s: Start BDP ping", t->peer_string);
+ }
+ grpc_bdp_estimator_start_ping(&t->bdp_estimator);
+}
+
+static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = tp;
+ if (grpc_http_trace) {
+ gpr_log(GPR_DEBUG, "%s: Complete BDP ping", t->peer_string);
+ }
+ grpc_bdp_estimator_complete_ping(&t->bdp_estimator);
+
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping");
+}
+
/*******************************************************************************
* CALLBACK LOOP
*/
@@ -1932,10 +2018,12 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx,
size_t max_size_hint,
size_t have_already) {
uint32_t max_recv_bytes;
+ uint32_t initial_window_size =
+ t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
/* clamp max recv hint to an allowable size */
- if (max_size_hint >= UINT32_MAX - t->stream_lookahead) {
- max_recv_bytes = UINT32_MAX - t->stream_lookahead;
+ if (max_size_hint >= UINT32_MAX - initial_window_size) {
+ max_recv_bytes = UINT32_MAX - initial_window_size;
} else {
max_recv_bytes = (uint32_t)max_size_hint;
}
@@ -1948,20 +2036,26 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx,
}
/* add some small lookahead to keep pipelines flowing */
- GPR_ASSERT(max_recv_bytes <= UINT32_MAX - t->stream_lookahead);
- max_recv_bytes += t->stream_lookahead;
- if (s->max_recv_bytes < max_recv_bytes) {
- uint32_t add_max_recv_bytes = max_recv_bytes - s->max_recv_bytes;
- bool new_window_write_is_covered_by_poller =
- s->max_recv_bytes < have_already;
- GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, max_recv_bytes,
- add_max_recv_bytes);
- GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, incoming_window,
+ GPR_ASSERT(max_recv_bytes <= UINT32_MAX - initial_window_size);
+ if (s->incoming_window_delta < max_recv_bytes && !s->read_closed) {
+ uint32_t add_max_recv_bytes =
+ (uint32_t)(max_recv_bytes - s->incoming_window_delta);
+ grpc_chttp2_stream_write_type write_type =
+ GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED;
+ if (s->incoming_window_delta + initial_window_size <
+ (int64_t)have_already) {
+ write_type = GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED;
+ }
+ GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, incoming_window_delta,
add_max_recv_bytes);
GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, announce_window,
add_max_recv_bytes);
- grpc_chttp2_become_writable(exec_ctx, t, s,
- new_window_write_is_covered_by_poller,
+ if ((int64_t)s->incoming_window_delta + (int64_t)initial_window_size -
+ (int64_t)s->announce_window >
+ (int64_t)initial_window_size / 2) {
+ write_type = GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK;
+ }
+ grpc_chttp2_become_writable(exec_ctx, t, s, write_type,
"read_incoming_stream");
}
}
diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.c b/src/core/ext/transport/chttp2/transport/frame_ping.c
index 7de5f6362d..f487533c41 100644
--- a/src/core/ext/transport/chttp2/transport/frame_ping.c
+++ b/src/core/ext/transport/chttp2/transport/frame_ping.c
@@ -40,7 +40,7 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
-grpc_slice grpc_chttp2_ping_create(uint8_t ack, uint8_t *opaque_8bytes) {
+grpc_slice grpc_chttp2_ping_create(uint8_t ack, uint64_t opaque_8bytes) {
grpc_slice slice = grpc_slice_malloc(9 + 8);
uint8_t *p = GRPC_SLICE_START_PTR(slice);
@@ -53,7 +53,14 @@ grpc_slice grpc_chttp2_ping_create(uint8_t ack, uint8_t *opaque_8bytes) {
*p++ = 0;
*p++ = 0;
*p++ = 0;
- memcpy(p, opaque_8bytes, 8);
+ *p++ = (uint8_t)(opaque_8bytes >> 56);
+ *p++ = (uint8_t)(opaque_8bytes >> 48);
+ *p++ = (uint8_t)(opaque_8bytes >> 40);
+ *p++ = (uint8_t)(opaque_8bytes >> 32);
+ *p++ = (uint8_t)(opaque_8bytes >> 24);
+ *p++ = (uint8_t)(opaque_8bytes >> 16);
+ *p++ = (uint8_t)(opaque_8bytes >> 8);
+ *p++ = (uint8_t)(opaque_8bytes);
return slice;
}
@@ -70,6 +77,7 @@ grpc_error *grpc_chttp2_ping_parser_begin_frame(grpc_chttp2_ping_parser *parser,
}
parser->byte = 0;
parser->is_ack = flags;
+ parser->opaque_8bytes = 0;
return GRPC_ERROR_NONE;
}
@@ -83,7 +91,7 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_ping_parser *p = parser;
while (p->byte != 8 && cur != end) {
- p->opaque_8bytes[p->byte] = *cur;
+ p->opaque_8bytes |= (((uint64_t)*cur) << (8 * p->byte));
cur++;
p->byte++;
}
@@ -93,8 +101,12 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
if (p->is_ack) {
grpc_chttp2_ack_ping(exec_ctx, t, p->opaque_8bytes);
} else {
- grpc_slice_buffer_add(&t->qbuf,
- grpc_chttp2_ping_create(1, p->opaque_8bytes));
+ if (t->ping_ack_count == t->ping_ack_capacity) {
+ t->ping_ack_capacity = GPR_MAX(t->ping_ack_capacity * 3 / 2, 3);
+ t->ping_acks = gpr_realloc(
+ t->ping_acks, t->ping_ack_capacity * sizeof(*t->ping_acks));
+ }
+ t->ping_acks[t->ping_ack_count++] = p->opaque_8bytes;
grpc_chttp2_initiate_write(exec_ctx, t, false, "ping response");
}
}
diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.h b/src/core/ext/transport/chttp2/transport/frame_ping.h
index b9889e2d11..ef642465d7 100644
--- a/src/core/ext/transport/chttp2/transport/frame_ping.h
+++ b/src/core/ext/transport/chttp2/transport/frame_ping.h
@@ -41,10 +41,10 @@
typedef struct {
uint8_t byte;
uint8_t is_ack;
- uint8_t opaque_8bytes[8];
+ uint64_t opaque_8bytes;
} grpc_chttp2_ping_parser;
-grpc_slice grpc_chttp2_ping_create(uint8_t ack, uint8_t *opaque_8bytes);
+grpc_slice grpc_chttp2_ping_create(uint8_t ack, uint64_t opaque_8bytes);
grpc_error *grpc_chttp2_ping_parser_begin_frame(grpc_chttp2_ping_parser *parser,
uint32_t length, uint8_t flags);
diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.c b/src/core/ext/transport/chttp2/transport/frame_settings.c
index be9b663ac1..82290e34cd 100644
--- a/src/core/ext/transport/chttp2/transport/frame_settings.c
+++ b/src/core/ext/transport/chttp2/transport/frame_settings.c
@@ -236,7 +236,7 @@ grpc_error *grpc_chttp2_settings_parser_parse(grpc_exec_ctx *exec_ctx, void *p,
}
if (parser->id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE &&
parser->incoming_settings[parser->id] != parser->value) {
- t->initial_window_update =
+ t->initial_window_update +=
(int64_t)parser->value - parser->incoming_settings[parser->id];
if (grpc_http_trace) {
gpr_log(GPR_DEBUG, "adding %d for initial_window change",
@@ -245,8 +245,9 @@ grpc_error *grpc_chttp2_settings_parser_parse(grpc_exec_ctx *exec_ctx, void *p,
}
parser->incoming_settings[parser->id] = parser->value;
if (grpc_http_trace) {
- gpr_log(GPR_DEBUG, "CHTTP2:%s: got setting %d = %d",
- t->is_client ? "CLI" : "SVR", parser->id, parser->value);
+ gpr_log(GPR_DEBUG, "CHTTP2:%s:%s: got setting %d = %d",
+ t->is_client ? "CLI" : "SVR", t->peer_string, parser->id,
+ parser->value);
}
} else if (grpc_http_trace) {
gpr_log(GPR_ERROR, "CHTTP2: Ignoring unknown setting %d (value %d)",
diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.c b/src/core/ext/transport/chttp2/transport/frame_window_update.c
index 31a31c2871..8fa0bb471a 100644
--- a/src/core/ext/transport/chttp2/transport/frame_window_update.c
+++ b/src/core/ext/transport/chttp2/transport/frame_window_update.c
@@ -110,13 +110,12 @@ grpc_error *grpc_chttp2_window_update_parser_parse(
if (t->incoming_stream_id != 0) {
if (s != NULL) {
- bool was_zero = s->outgoing_window <= 0;
- GRPC_CHTTP2_FLOW_CREDIT_STREAM("parse", t, s, outgoing_window,
+ GRPC_CHTTP2_FLOW_CREDIT_STREAM("parse", t, s, outgoing_window_delta,
received_update);
- bool is_zero = s->outgoing_window <= 0;
- if (was_zero && !is_zero) {
- grpc_chttp2_become_writable(exec_ctx, t, s, false,
- "stream.read_flow_control");
+ if (grpc_chttp2_list_remove_stalled_by_stream(t, s)) {
+ grpc_chttp2_become_writable(
+ exec_ctx, t, s, GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED,
+ "stream.read_flow_control");
}
}
} else {
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index ee5edc92df..1dabf9edba 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -50,7 +50,9 @@
#include "src/core/ext/transport/chttp2/transport/stream_map.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/endpoint.h"
+#include "src/core/lib/transport/bdp_estimator.h"
#include "src/core/lib/transport/connectivity_state.h"
+#include "src/core/lib/transport/pid_controller.h"
#include "src/core/lib/transport/transport_impl.h"
/* streams are kept in various linked lists depending on what things need to
@@ -59,6 +61,7 @@ typedef enum {
GRPC_CHTTP2_LIST_WRITABLE,
GRPC_CHTTP2_LIST_WRITING,
GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT,
+ GRPC_CHTTP2_LIST_STALLED_BY_STREAM,
/** streams that are waiting to start because there are too many concurrent
streams on the connection */
GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY,
@@ -72,6 +75,34 @@ typedef enum {
GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER,
} grpc_chttp2_write_state;
+typedef enum {
+ GRPC_CHTTP2_PING_ON_NEXT_WRITE = 0,
+ GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE,
+ GRPC_CHTTP2_PING_TYPE_COUNT /* must be last */
+} grpc_chttp2_ping_type;
+
+typedef enum {
+ GRPC_CHTTP2_PCL_INITIATE = 0,
+ GRPC_CHTTP2_PCL_NEXT,
+ GRPC_CHTTP2_PCL_INFLIGHT,
+ GRPC_CHTTP2_PCL_COUNT /* must be last */
+} grpc_chttp2_ping_closure_list;
+
+typedef struct {
+ grpc_closure_list lists[GRPC_CHTTP2_PCL_COUNT];
+ uint64_t inflight_id;
+} grpc_chttp2_ping_queue;
+
+typedef struct {
+ gpr_timespec min_time_between_pings;
+ int max_pings_without_data;
+} grpc_chttp2_repeated_ping_policy;
+
+typedef struct {
+ gpr_timespec last_ping_sent_time;
+ int pings_before_data_required;
+} grpc_chttp2_repeated_ping_state;
+
/* deframer state for the overall http2 stream of bytes */
typedef enum {
/* prefix: one entry per http2 connection prefix byte */
@@ -144,14 +175,6 @@ typedef enum {
GRPC_CHTTP2_GOAWAY_SENT,
} grpc_chttp2_sent_goaway_state;
-/* Outstanding ping request data */
-typedef struct grpc_chttp2_outstanding_ping {
- uint8_t id[8];
- grpc_closure *on_recv;
- struct grpc_chttp2_outstanding_ping *next;
- struct grpc_chttp2_outstanding_ping *prev;
-} grpc_chttp2_outstanding_ping;
-
typedef struct grpc_chttp2_write_cb {
int64_t call_at_byte;
grpc_closure *closure;
@@ -271,16 +294,19 @@ struct grpc_chttp2_transport {
copied to next_stream_id in parsing when parsing commences */
uint32_t next_stream_id;
- /** how far to lookahead in a stream? */
- uint32_t stream_lookahead;
-
/** last new stream id */
uint32_t last_new_stream_id;
- /** pings awaiting responses */
- grpc_chttp2_outstanding_ping pings;
- /** next payload for an outgoing ping */
- uint64_t ping_counter;
+ /** ping queues for various ping insertion points */
+ grpc_chttp2_ping_queue ping_queues[GRPC_CHTTP2_PING_TYPE_COUNT];
+ grpc_chttp2_repeated_ping_policy ping_policy;
+ grpc_chttp2_repeated_ping_state ping_state;
+ uint64_t ping_ctr; /* unique id for pings */
+
+ /** ping acks */
+ size_t ping_ack_count;
+ size_t ping_ack_capacity;
+ uint64_t *ping_acks;
/** parser for headers */
grpc_chttp2_hpack_parser hpack_parser;
@@ -324,6 +350,13 @@ struct grpc_chttp2_transport {
grpc_chttp2_write_cb *write_cb_pool;
+ /* bdp estimator */
+ grpc_bdp_estimator bdp_estimator;
+ grpc_pid_controller pid_controller;
+ grpc_closure start_bdp_ping_locked;
+ grpc_closure finish_bdp_ping_locked;
+ gpr_timespec last_pid_update;
+
/* if non-NULL, close the transport with this error when writes are finished
*/
grpc_error *close_transport_on_writes_finished;
@@ -362,12 +395,10 @@ struct grpc_chttp2_stream {
/** HTTP2 stream id for this stream, or zero if one has not been assigned */
uint32_t id;
- /** window available for us to send to peer */
- int64_t outgoing_window;
- /** The number of bytes the upper layers have offered to receive.
- As the upper layer offers more bytes, this value increases.
- As bytes are read, this value decreases. */
- uint32_t max_recv_bytes;
+ /** window available for us to send to peer, over or under the initial window
+ * size of the transport... ie:
+ * outgoing_window = outgoing_window_delta + transport.initial_window_size */
+ int64_t outgoing_window_delta;
/** things the upper layers would like to send */
grpc_metadata_batch *send_initial_metadata;
grpc_closure *send_initial_metadata_finished;
@@ -428,8 +459,10 @@ struct grpc_chttp2_stream {
grpc_error *forced_close_error;
/** how many header frames have we received? */
uint8_t header_frames_received;
- /** window available for peer to send to us */
- int64_t incoming_window;
+ /** window available for peer to send to us (as a delta on
+ * transport.initial_window_size)
+ * incoming_window = incoming_window_delta + transport.initial_window_size */
+ int64_t incoming_window_delta;
/** parsing state for data frames */
grpc_chttp2_data_parser data_parser;
/** number of bytes received - reset at end of parse thread execution */
@@ -478,36 +511,43 @@ bool grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
/** Get a writable stream
returns non-zero if there was a stream available */
-int grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport *t,
- grpc_chttp2_stream **s);
+bool grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s);
bool grpc_chttp2_list_remove_writable_stream(
grpc_chttp2_transport *t, grpc_chttp2_stream *s) GRPC_MUST_USE_RESULT;
bool grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
-int grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport *t);
-int grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport *t,
- grpc_chttp2_stream **s);
+bool grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport *t);
+bool grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s);
void grpc_chttp2_list_add_written_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
-int grpc_chttp2_list_pop_written_stream(grpc_chttp2_transport *t,
- grpc_chttp2_stream **s);
+bool grpc_chttp2_list_pop_written_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s);
void grpc_chttp2_list_add_waiting_for_concurrency(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
-int grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport *t,
- grpc_chttp2_stream **s);
+bool grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s);
void grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
-int grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport *t,
- grpc_chttp2_stream **s);
+bool grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s);
void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
+void grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s);
+bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s);
+bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s);
+
grpc_chttp2_stream *grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport *t,
uint32_t id);
grpc_chttp2_stream *grpc_chttp2_parsing_accept_stream(grpc_exec_ctx *exec_ctx,
@@ -672,13 +712,23 @@ void grpc_chttp2_incoming_byte_stream_finished(
grpc_error *error);
void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- const uint8_t *opaque_8bytes);
+ uint64_t id);
+
+typedef enum {
+ /* don't initiate a transport write, but piggyback on the next one */
+ GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK,
+ /* initiate a covered write */
+ GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED,
+ /* initiate an uncovered write */
+ GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED
+} grpc_chttp2_stream_write_type;
/** add a ref to the stream and add it to the writable list;
ref will be dropped in writing.c */
void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
- grpc_chttp2_stream *s, bool covered_by_poller,
+ grpc_chttp2_stream *s,
+ grpc_chttp2_stream_write_type type,
const char *reason);
void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index f58cd696f9..24bd93067b 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -376,25 +376,45 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx,
return err;
}
+ uint32_t target_incoming_window = GPR_MAX(
+ t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
+ 1024);
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("parse", t, incoming_window,
incoming_frame_size);
+ if (t->incoming_window <= target_incoming_window / 2) {
+ grpc_chttp2_initiate_write(exec_ctx, t, false, "flow_control");
+ }
if (s != NULL) {
- if (incoming_frame_size > s->incoming_window) {
+ if (incoming_frame_size >
+ s->incoming_window_delta +
+ t->settings[GRPC_ACKED_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]) {
char *msg;
gpr_asprintf(&msg,
"frame of size %d overflows incoming window of %" PRId64,
- t->incoming_frame_size, s->incoming_window);
+ t->incoming_frame_size,
+ s->incoming_window_delta +
+ t->settings[GRPC_ACKED_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
grpc_error *err = GRPC_ERROR_CREATE(msg);
gpr_free(msg);
return err;
}
- GRPC_CHTTP2_FLOW_DEBIT_STREAM("parse", t, s, incoming_window,
+ GRPC_CHTTP2_FLOW_DEBIT_STREAM("parse", t, s, incoming_window_delta,
incoming_frame_size);
+ if ((int64_t)t->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] +
+ (int64_t)s->incoming_window_delta - (int64_t)s->announce_window <=
+ (int64_t)t->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] /
+ 2) {
+ grpc_chttp2_become_writable(exec_ctx, t, s,
+ GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED,
+ "window-update-required");
+ }
s->received_bytes += incoming_frame_size;
- s->max_recv_bytes -=
- (uint32_t)GPR_MIN(s->max_recv_bytes, incoming_frame_size);
}
return GRPC_ERROR_NONE;
diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c
index a60264cc51..078818fb18 100644
--- a/src/core/ext/transport/chttp2/transport/stream_lists.c
+++ b/src/core/ext/transport/chttp2/transport/stream_lists.c
@@ -37,14 +37,14 @@
/* core list management */
-static int stream_list_empty(grpc_chttp2_transport *t,
- grpc_chttp2_stream_list_id id) {
+static bool stream_list_empty(grpc_chttp2_transport *t,
+ grpc_chttp2_stream_list_id id) {
return t->lists[id].head == NULL;
}
-static int stream_list_pop(grpc_chttp2_transport *t,
- grpc_chttp2_stream **stream,
- grpc_chttp2_stream_list_id id) {
+static bool stream_list_pop(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **stream,
+ grpc_chttp2_stream_list_id id) {
grpc_chttp2_stream *s = t->lists[id].head;
if (s) {
grpc_chttp2_stream *new_head = s->links[id].next;
@@ -124,8 +124,8 @@ bool grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport *t,
return stream_list_add(t, s, GRPC_CHTTP2_LIST_WRITABLE);
}
-int grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport *t,
- grpc_chttp2_stream **s) {
+bool grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s) {
return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WRITABLE);
}
@@ -139,12 +139,12 @@ bool grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport *t,
return stream_list_add(t, s, GRPC_CHTTP2_LIST_WRITING);
}
-int grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport *t) {
+bool grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport *t) {
return !stream_list_empty(t, GRPC_CHTTP2_LIST_WRITING);
}
-int grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport *t,
- grpc_chttp2_stream **s) {
+bool grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s) {
return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WRITING);
}
@@ -153,8 +153,8 @@ void grpc_chttp2_list_add_waiting_for_concurrency(grpc_chttp2_transport *t,
stream_list_add(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
}
-int grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport *t,
- grpc_chttp2_stream **s) {
+bool grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s) {
return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
}
@@ -168,8 +168,8 @@ void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport *t,
stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
}
-int grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport *t,
- grpc_chttp2_stream **s) {
+bool grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s) {
return stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
}
@@ -177,3 +177,18 @@ void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
}
+
+void grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s) {
+ stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
+}
+
+bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s) {
+ return stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
+}
+
+bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s) {
+ return stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
+}
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index 148e3844b5..05e6f59947 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -56,6 +56,75 @@ static void finish_write_cb(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->write_cb_pool = cb;
}
+static void collapse_pings_from_into(grpc_chttp2_transport *t,
+ grpc_chttp2_ping_type ping_type,
+ grpc_chttp2_ping_queue *pq) {
+ for (size_t i = 0; i < GRPC_CHTTP2_PCL_COUNT; i++) {
+ grpc_closure_list_move(&t->ping_queues[ping_type].lists[i], &pq->lists[i]);
+ }
+}
+
+static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_ping_type ping_type) {
+ grpc_chttp2_ping_queue *pq = &t->ping_queues[ping_type];
+ if (grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
+ /* no ping needed: wait */
+ return;
+ }
+ if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
+ /* ping already in-flight: wait */
+ if (grpc_http_trace || grpc_bdp_estimator_trace) {
+ gpr_log(GPR_DEBUG, "Ping delayed [%p]: already pinging", t->peer_string);
+ }
+ return;
+ }
+ if (t->ping_state.pings_before_data_required == 0 &&
+ t->ping_policy.max_pings_without_data != 0) {
+ /* need to send something of substance before sending a ping again */
+ if (grpc_http_trace || grpc_bdp_estimator_trace) {
+ gpr_log(GPR_DEBUG, "Ping delayed [%p]: too many recent pings: %d/%d",
+ t->peer_string, t->ping_state.pings_before_data_required,
+ t->ping_policy.max_pings_without_data);
+ }
+ return;
+ }
+ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ gpr_timespec elapsed = gpr_time_sub(now, t->ping_state.last_ping_sent_time);
+ /*gpr_log(GPR_DEBUG, "elapsed:%d.%09d min:%d.%09d", (int)elapsed.tv_sec,
+ elapsed.tv_nsec, (int)t->ping_policy.min_time_between_pings.tv_sec,
+ (int)t->ping_policy.min_time_between_pings.tv_nsec);*/
+ if (gpr_time_cmp(elapsed, t->ping_policy.min_time_between_pings) < 0) {
+ /* not enough elapsed time between successive pings */
+ if (grpc_http_trace || grpc_bdp_estimator_trace) {
+ gpr_log(GPR_DEBUG,
+ "Ping delayed [%p]: not enough time elapsed since last ping",
+ t->peer_string);
+ }
+ return;
+ }
+ /* coalesce equivalent pings into this one */
+ switch (ping_type) {
+ case GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE:
+ collapse_pings_from_into(t, GRPC_CHTTP2_PING_ON_NEXT_WRITE, pq);
+ break;
+ case GRPC_CHTTP2_PING_ON_NEXT_WRITE:
+ break;
+ case GRPC_CHTTP2_PING_TYPE_COUNT:
+ GPR_UNREACHABLE_CODE(break);
+ }
+ pq->inflight_id = t->ping_ctr * GRPC_CHTTP2_PING_TYPE_COUNT + ping_type;
+ t->ping_ctr++;
+ grpc_closure_list_sched(exec_ctx, &pq->lists[GRPC_CHTTP2_PCL_INITIATE]);
+ grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT],
+ &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
+ grpc_slice_buffer_add(&t->outbuf,
+ grpc_chttp2_ping_create(false, pq->inflight_id));
+ t->ping_state.last_ping_sent_time = now;
+ t->ping_state.pings_before_data_required -=
+ (t->ping_state.pings_before_data_required != 0);
+}
+
static void update_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_stream *s, int64_t send_bytes,
grpc_chttp2_write_cb **list, grpc_error *error) {
@@ -139,6 +208,8 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
s->sent_initial_metadata = true;
sent_initial_metadata = true;
now_writing = true;
+ t->ping_state.pings_before_data_required =
+ t->ping_policy.max_pings_without_data;
}
/* send any window updates */
if (s->announce_window > 0) {
@@ -146,15 +217,22 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_add(&t->outbuf,
grpc_chttp2_window_update_create(
s->id, s->announce_window, &s->stats.outgoing));
+ t->ping_state.pings_before_data_required =
+ t->ping_policy.max_pings_without_data;
GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, announce_window, announce);
}
if (sent_initial_metadata) {
/* send any body bytes, if allowed by flow control */
if (s->flow_controlled_buffer.length > 0) {
- uint32_t max_outgoing =
- (uint32_t)GPR_MIN(t->settings[GRPC_ACKED_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
- GPR_MIN(s->outgoing_window, t->outgoing_window));
+ uint32_t stream_outgoing_window = (uint32_t)GPR_MAX(
+ 0,
+ s->outgoing_window_delta +
+ (int64_t)t->settings[GRPC_PEER_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
+ uint32_t max_outgoing = (uint32_t)GPR_MIN(
+ t->settings[GRPC_ACKED_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
+ GPR_MIN(stream_outgoing_window, t->outgoing_window));
if (max_outgoing > 0) {
uint32_t send_bytes =
(uint32_t)GPR_MIN(max_outgoing, s->flow_controlled_buffer.length);
@@ -167,10 +245,12 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, send_bytes,
is_last_frame, &s->stats.outgoing,
&t->outbuf);
- GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window,
+ GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window_delta,
send_bytes);
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window,
send_bytes);
+ t->ping_state.pings_before_data_required =
+ t->ping_policy.max_pings_without_data;
if (is_last_frame) {
s->send_trailing_metadata = NULL;
s->sent_trailing_metadata = true;
@@ -189,6 +269,9 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
} else if (t->outgoing_window == 0) {
grpc_chttp2_list_add_stalled_by_transport(t, s);
now_writing = true;
+ } else if (stream_outgoing_window == 0) {
+ grpc_chttp2_list_add_stalled_by_stream(t, s);
+ now_writing = true;
}
}
if (s->send_trailing_metadata != NULL &&
@@ -227,15 +310,32 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
/* if the grpc_chttp2_transport is ready to send a window update, do so here
also; 3/4 is a magic number that will likely get tuned soon */
- if (t->announce_incoming_window > 0) {
- uint32_t announced =
- (uint32_t)GPR_MIN(t->announce_incoming_window, UINT32_MAX);
- GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, announce_incoming_window,
- announced);
+ uint32_t target_incoming_window = GPR_MAX(
+ t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
+ 1024);
+ uint32_t threshold_to_send_transport_window_update =
+ t->outbuf.count > 0 ? 3 * target_incoming_window / 4
+ : target_incoming_window / 2;
+ if (t->incoming_window <= threshold_to_send_transport_window_update) {
+ maybe_initiate_ping(exec_ctx, t,
+ GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE);
+ uint32_t announced = (uint32_t)GPR_CLAMP(
+ target_incoming_window - t->incoming_window, 0, UINT32_MAX);
+ GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("write", t, incoming_window, announced);
grpc_transport_one_way_stats throwaway_stats;
grpc_slice_buffer_add(&t->outbuf, grpc_chttp2_window_update_create(
0, announced, &throwaway_stats));
+ t->ping_state.pings_before_data_required =
+ t->ping_policy.max_pings_without_data;
+ }
+
+ for (size_t i = 0; i < t->ping_ack_count; i++) {
+ grpc_slice_buffer_add(&t->outbuf,
+ grpc_chttp2_ping_create(1, t->ping_acks[i]));
}
+ t->ping_ack_count = 0;
+
+ maybe_initiate_ping(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE);
GPR_TIMER_END("grpc_chttp2_begin_write", 0);
diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c
index ccc0619e1c..068c61c92a 100644
--- a/src/core/lib/channel/connected_channel.c
+++ b/src/core/lib/channel/connected_channel.c
@@ -140,7 +140,7 @@ static void con_get_channel_info(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
const grpc_channel_info *channel_info) {}
-static const grpc_channel_filter connected_channel_filter = {
+const grpc_channel_filter grpc_connected_filter = {
con_start_transport_stream_op,
con_start_transport_op,
sizeof(call_data),
@@ -158,7 +158,7 @@ static const grpc_channel_filter connected_channel_filter = {
static void bind_transport(grpc_channel_stack *channel_stack,
grpc_channel_element *elem, void *t) {
channel_data *cd = (channel_data *)elem->channel_data;
- GPR_ASSERT(elem->filter == &connected_channel_filter);
+ GPR_ASSERT(elem->filter == &grpc_connected_filter);
GPR_ASSERT(cd->transport == NULL);
cd->transport = t;
@@ -178,7 +178,7 @@ bool grpc_add_connected_filter(grpc_exec_ctx *exec_ctx,
grpc_transport *t = grpc_channel_stack_builder_get_transport(builder);
GPR_ASSERT(t != NULL);
return grpc_channel_stack_builder_append_filter(
- builder, &connected_channel_filter, bind_transport, t);
+ builder, &grpc_connected_filter, bind_transport, t);
}
grpc_stream *grpc_connected_channel_get_stream(grpc_call_element *elem) {
diff --git a/src/core/lib/channel/connected_channel.h b/src/core/lib/channel/connected_channel.h
index 3585c0ecbc..5c7ea9ed26 100644
--- a/src/core/lib/channel/connected_channel.h
+++ b/src/core/lib/channel/connected_channel.h
@@ -36,8 +36,13 @@
#include "src/core/lib/channel/channel_stack_builder.h"
+extern const grpc_channel_filter grpc_connected_filter;
+
bool grpc_add_connected_filter(grpc_exec_ctx *exec_ctx,
grpc_channel_stack_builder *builder,
void *arg_must_be_null);
+/* Debug helper to dig the transport stream out of a call element */
+grpc_stream *grpc_connected_channel_get_stream(grpc_call_element *elem);
+
#endif /* GRPC_CORE_LIB_CHANNEL_CONNECTED_CHANNEL_H */
diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c
index 8e4efd3370..509c1ff95d 100644
--- a/src/core/lib/iomgr/closure.c
+++ b/src/core/lib/iomgr/closure.c
@@ -51,20 +51,22 @@ void grpc_closure_list_init(grpc_closure_list *closure_list) {
closure_list->head = closure_list->tail = NULL;
}
-void grpc_closure_list_append(grpc_closure_list *closure_list,
+bool grpc_closure_list_append(grpc_closure_list *closure_list,
grpc_closure *closure, grpc_error *error) {
if (closure == NULL) {
GRPC_ERROR_UNREF(error);
- return;
+ return false;
}
closure->error_data.error = error;
closure->next_data.next = NULL;
- if (closure_list->head == NULL) {
+ bool was_empty = (closure_list->head == NULL);
+ if (was_empty) {
closure_list->head = closure;
} else {
closure_list->tail->next_data.next = closure;
}
closure_list->tail = closure;
+ return was_empty;
}
void grpc_closure_list_fail_all(grpc_closure_list *list,
diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h
index 6748b21e59..2510d50b42 100644
--- a/src/core/lib/iomgr/closure.h
+++ b/src/core/lib/iomgr/closure.h
@@ -116,8 +116,9 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg,
void grpc_closure_list_init(grpc_closure_list *list);
/** add \a closure to the end of \a list
- and set \a closure's result to \a error */
-void grpc_closure_list_append(grpc_closure_list *list, grpc_closure *closure,
+ and set \a closure's result to \a error
+ Returns true if \a list becomes non-empty */
+bool grpc_closure_list_append(grpc_closure_list *list, grpc_closure *closure,
grpc_error *error);
/** force all success bits in \a list to false */
diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c
index d5995a5ac6..2cc979467f 100644
--- a/src/core/lib/iomgr/resource_quota.c
+++ b/src/core/lib/iomgr/resource_quota.c
@@ -33,6 +33,8 @@
#include "src/core/lib/iomgr/resource_quota.h"
+#include <limits.h>
+#include <stdint.h>
#include <string.h>
#include <grpc/support/alloc.h>
@@ -44,6 +46,8 @@
int grpc_resource_quota_trace = 0;
+#define MEMORY_USAGE_ESTIMATION_MAX 65536
+
/* Internal linked list pointers for a resource user */
typedef struct {
grpc_resource_user *next;
@@ -126,9 +130,12 @@ struct grpc_resource_quota {
/* refcount */
gpr_refcount refs;
+ /* estimate of current memory usage
+ scaled to the range [0..RESOURCE_USAGE_ESTIMATION_MAX] */
+ gpr_atm memory_usage_estimation;
+
/* Master combiner lock: all activity on a quota executes under this combiner
- * (so no mutex is needed for this data structure)
- */
+ * (so no mutex is needed for this data structure) */
grpc_combiner *combiner;
/* Size of the resource quota */
int64_t size;
@@ -269,6 +276,16 @@ static void rq_step_sched(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_NONE);
}
+/* update the atomically available resource estimate - use no barriers since
+ timeliness of delivery really doesn't matter much */
+static void rq_update_estimate(grpc_resource_quota *resource_quota) {
+ gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation,
+ (gpr_atm)((1.0 -
+ ((double)resource_quota->free_pool) /
+ ((double)resource_quota->size)) *
+ MEMORY_USAGE_ESTIMATION_MAX));
+}
+
/* returns true if all allocations are completed */
static bool rq_alloc(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota) {
@@ -281,6 +298,7 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx,
int64_t amt = -resource_user->free_pool;
resource_user->free_pool = 0;
resource_quota->free_pool -= amt;
+ rq_update_estimate(resource_quota);
if (grpc_resource_quota_trace) {
gpr_log(GPR_DEBUG, "RQ %s %s: grant alloc %" PRId64
" bytes; rq_free_pool -> %" PRId64,
@@ -315,6 +333,7 @@ static bool rq_reclaim_from_per_user_free_pool(
int64_t amt = resource_user->free_pool;
resource_user->free_pool = 0;
resource_quota->free_pool += amt;
+ rq_update_estimate(resource_quota);
if (grpc_resource_quota_trace) {
gpr_log(GPR_DEBUG, "RQ %s %s: reclaim_from_per_user_free_pool %" PRId64
" bytes; rq_free_pool -> %" PRId64,
@@ -531,6 +550,7 @@ static void rq_resize(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) {
int64_t delta = a->size - a->resource_quota->size;
a->resource_quota->size += delta;
a->resource_quota->free_pool += delta;
+ rq_update_estimate(a->resource_quota);
rq_step_sched(exec_ctx, a->resource_quota);
grpc_resource_quota_unref_internal(exec_ctx, a->resource_quota);
gpr_free(a);
@@ -557,6 +577,7 @@ grpc_resource_quota *grpc_resource_quota_create(const char *name) {
resource_quota->size = INT64_MAX;
resource_quota->step_scheduled = false;
resource_quota->reclaiming = false;
+ gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation, 0);
if (name != NULL) {
resource_quota->name = gpr_strdup(name);
} else {
@@ -602,6 +623,13 @@ void grpc_resource_quota_ref(grpc_resource_quota *resource_quota) {
grpc_resource_quota_ref_internal(resource_quota);
}
+double grpc_resource_quota_get_memory_pressure(
+ grpc_resource_quota *resource_quota) {
+ return ((double)(gpr_atm_no_barrier_load(
+ &resource_quota->memory_usage_estimation))) /
+ ((double)MEMORY_USAGE_ESTIMATION_MAX);
+}
+
/* Public API */
void grpc_resource_quota_resize(grpc_resource_quota *resource_quota,
size_t size) {
diff --git a/src/core/lib/iomgr/resource_quota.h b/src/core/lib/iomgr/resource_quota.h
index d1127ce9ea..b9f62cbf83 100644
--- a/src/core/lib/iomgr/resource_quota.h
+++ b/src/core/lib/iomgr/resource_quota.h
@@ -84,6 +84,12 @@ void grpc_resource_quota_unref_internal(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *grpc_resource_quota_from_channel_args(
const grpc_channel_args *channel_args);
+/* Return a number indicating current memory pressure:
+ 0.0 ==> no memory usage
+ 1.0 ==> maximum memory usage */
+double grpc_resource_quota_get_memory_pressure(
+ grpc_resource_quota *resource_quota);
+
typedef struct grpc_resource_user grpc_resource_user;
grpc_resource_user *grpc_resource_user_create(
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index 3b23b47d4f..02a194c982 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -76,8 +76,10 @@ struct grpc_udp_listener {
grpc_udp_server *server;
grpc_resolved_address addr;
grpc_closure read_closure;
+ grpc_closure write_closure;
grpc_closure destroyed_closure;
grpc_udp_server_read_cb read_cb;
+ grpc_udp_server_write_cb write_cb;
grpc_udp_server_orphan_cb orphan_cb;
struct grpc_udp_listener *next;
@@ -304,9 +306,33 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
gpr_mu_unlock(&sp->server->mu);
}
+static void on_write(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ grpc_udp_listener *sp = arg;
+
+ gpr_mu_lock(&(sp->server->mu));
+ if (error != GRPC_ERROR_NONE) {
+ if (0 == --sp->server->active_ports) {
+ gpr_mu_unlock(&sp->server->mu);
+ deactivated_all_ports(exec_ctx, sp->server);
+ } else {
+ gpr_mu_unlock(&sp->server->mu);
+ }
+ return;
+ }
+
+ /* Tell the registered callback that the socket is writeable. */
+ GPR_ASSERT(sp->write_cb);
+ sp->write_cb(exec_ctx, sp->emfd);
+
+ /* Re-arm the notification event so we get another chance to write. */
+ grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
+ gpr_mu_unlock(&sp->server->mu);
+}
+
static int add_socket_to_server(grpc_udp_server *s, int fd,
const grpc_resolved_address *addr,
grpc_udp_server_read_cb read_cb,
+ grpc_udp_server_write_cb write_cb,
grpc_udp_server_orphan_cb orphan_cb) {
grpc_udp_listener *sp;
int port;
@@ -333,6 +359,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd,
sp->emfd = grpc_fd_create(fd, name);
memcpy(&sp->addr, addr, sizeof(grpc_resolved_address));
sp->read_cb = read_cb;
+ sp->write_cb = write_cb;
sp->orphan_cb = orphan_cb;
GPR_ASSERT(sp->emfd);
gpr_mu_unlock(&s->mu);
@@ -345,6 +372,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd,
int grpc_udp_server_add_port(grpc_udp_server *s,
const grpc_resolved_address *addr,
grpc_udp_server_read_cb read_cb,
+ grpc_udp_server_write_cb write_cb,
grpc_udp_server_orphan_cb orphan_cb) {
grpc_udp_listener *sp;
int allocated_port1 = -1;
@@ -391,7 +419,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s,
// TODO(rjshade): Test and propagate the returned grpc_error*:
GRPC_ERROR_UNREF(grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP,
&dsmode, &fd));
- allocated_port1 = add_socket_to_server(s, fd, addr, read_cb, orphan_cb);
+ allocated_port1 =
+ add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb);
if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
goto done;
}
@@ -413,7 +442,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s,
grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) {
addr = &addr4_copy;
}
- allocated_port2 = add_socket_to_server(s, fd, addr, read_cb, orphan_cb);
+ allocated_port2 =
+ add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb);
done:
gpr_free(allocated_addr);
@@ -451,6 +481,10 @@ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
+ grpc_closure_init(&sp->write_closure, on_write, sp,
+ grpc_schedule_on_exec_ctx);
+ grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
+
s->active_ports++;
sp = sp->next;
}
diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h
index f3c466a031..ce068cbf04 100644
--- a/src/core/lib/iomgr/udp_server.h
+++ b/src/core/lib/iomgr/udp_server.h
@@ -49,6 +49,10 @@ typedef struct grpc_udp_server grpc_udp_server;
typedef void (*grpc_udp_server_read_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd,
struct grpc_server *server);
+/* Called when the socket is writeable. */
+typedef void (*grpc_udp_server_write_cb)(grpc_exec_ctx *exec_ctx,
+ grpc_fd *emfd);
+
/* Called when the grpc_fd is about to be orphaned (and the FD closed). */
typedef void (*grpc_udp_server_orphan_cb)(grpc_fd *emfd);
@@ -75,6 +79,7 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index);
int grpc_udp_server_add_port(grpc_udp_server *s,
const grpc_resolved_address *addr,
grpc_udp_server_read_cb read_cb,
+ grpc_udp_server_write_cb write_cb,
grpc_udp_server_orphan_cb orphan_cb);
void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *server,
diff --git a/src/core/lib/support/log_posix.c b/src/core/lib/support/log_posix.c
index f972da0887..79458dd7a3 100644
--- a/src/core/lib/support/log_posix.c
+++ b/src/core/lib/support/log_posix.c
@@ -37,6 +37,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include <pthread.h>
#include <stdarg.h>
@@ -93,10 +94,13 @@ void gpr_default_log(gpr_log_func_args *args) {
strcpy(time_buffer, "error:strftime");
}
- fprintf(stderr, "%s%s.%09d %7tu %s:%d] %s\n",
- gpr_log_severity_string(args->severity), time_buffer,
- (int)(now.tv_nsec), gettid(), display_file, args->line,
- args->message);
+ char *prefix;
+ gpr_asprintf(&prefix, "%s%s.%09d %7tu %s:%d]",
+ gpr_log_severity_string(args->severity), time_buffer,
+ (int)(now.tv_nsec), gettid(), display_file, args->line);
+
+ fprintf(stderr, "%-70s %s\n", prefix, args->message);
+ gpr_free(prefix);
}
#endif /* defined(GPR_POSIX_LOG) */
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index 787e4d0dd2..b338ac4c48 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -63,6 +63,7 @@
#include "src/core/lib/surface/init.h"
#include "src/core/lib/surface/lame_client.h"
#include "src/core/lib/surface/server.h"
+#include "src/core/lib/transport/bdp_estimator.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/transport_impl.h"
@@ -192,6 +193,7 @@ void grpc_init(void) {
grpc_register_tracer("queue_pluck", &grpc_cq_pluck_trace);
grpc_register_tracer("combiner", &grpc_combiner_trace);
grpc_register_tracer("server_channel", &grpc_server_channel_trace);
+ grpc_register_tracer("bdp_estimator", &grpc_bdp_estimator_trace);
// Default pluck trace to 1
grpc_cq_pluck_trace = 1;
grpc_register_tracer("queue_timeout", &grpc_cq_event_timeout_trace);
diff --git a/src/core/lib/transport/bdp_estimator.c b/src/core/lib/transport/bdp_estimator.c
new file mode 100644
index 0000000000..e1483677fd
--- /dev/null
+++ b/src/core/lib/transport/bdp_estimator.c
@@ -0,0 +1,104 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/transport/bdp_estimator.h"
+
+#include <stdlib.h>
+
+#include <grpc/support/log.h>
+#include <grpc/support/useful.h>
+
+int grpc_bdp_estimator_trace = 0;
+
+void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator, const char *name) {
+ estimator->estimate = 65536;
+ estimator->ping_state = GRPC_BDP_PING_UNSCHEDULED;
+ estimator->name = name;
+}
+
+bool grpc_bdp_estimator_get_estimate(grpc_bdp_estimator *estimator,
+ int64_t *estimate) {
+ *estimate = estimator->estimate;
+ return true;
+}
+
+bool grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator,
+ int64_t num_bytes) {
+ estimator->accumulator += num_bytes;
+ switch (estimator->ping_state) {
+ case GRPC_BDP_PING_UNSCHEDULED:
+ return true;
+ case GRPC_BDP_PING_SCHEDULED:
+ return false;
+ case GRPC_BDP_PING_STARTED:
+ return false;
+ }
+ GPR_UNREACHABLE_CODE(return false);
+}
+
+void grpc_bdp_estimator_schedule_ping(grpc_bdp_estimator *estimator) {
+ if (grpc_bdp_estimator_trace) {
+ gpr_log(GPR_DEBUG, "bdp[%s]:sched acc=%" PRId64 " est=%" PRId64,
+ estimator->name, estimator->accumulator, estimator->estimate);
+ }
+ GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_UNSCHEDULED);
+ estimator->ping_state = GRPC_BDP_PING_SCHEDULED;
+ estimator->accumulator = 0;
+}
+
+void grpc_bdp_estimator_start_ping(grpc_bdp_estimator *estimator) {
+ if (grpc_bdp_estimator_trace) {
+ gpr_log(GPR_DEBUG, "bdp[%s]:start acc=%" PRId64 " est=%" PRId64,
+ estimator->name, estimator->accumulator, estimator->estimate);
+ }
+ GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_SCHEDULED);
+ estimator->ping_state = GRPC_BDP_PING_STARTED;
+ estimator->accumulator = 0;
+}
+
+void grpc_bdp_estimator_complete_ping(grpc_bdp_estimator *estimator) {
+ if (grpc_bdp_estimator_trace) {
+ gpr_log(GPR_DEBUG, "bdp[%s]:complete acc=%" PRId64 " est=%" PRId64,
+ estimator->name, estimator->accumulator, estimator->estimate);
+ }
+ GPR_ASSERT(estimator->ping_state == GRPC_BDP_PING_STARTED);
+ if (estimator->accumulator > 2 * estimator->estimate / 3) {
+ estimator->estimate *= 2;
+ if (grpc_bdp_estimator_trace) {
+ gpr_log(GPR_DEBUG, "bdp[%s]: estimate increased to %" PRId64,
+ estimator->name, estimator->estimate);
+ }
+ }
+ estimator->ping_state = GRPC_BDP_PING_UNSCHEDULED;
+ estimator->accumulator = 0;
+}
diff --git a/src/core/lib/transport/bdp_estimator.h b/src/core/lib/transport/bdp_estimator.h
new file mode 100644
index 0000000000..bcaf899910
--- /dev/null
+++ b/src/core/lib/transport/bdp_estimator.h
@@ -0,0 +1,76 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_TRANSPORT_BDP_ESTIMATOR_H
+#define GRPC_CORE_LIB_TRANSPORT_BDP_ESTIMATOR_H
+
+#include <stdbool.h>
+#include <stdint.h>
+
+#define GRPC_BDP_SAMPLES 16
+#define GRPC_BDP_MIN_SAMPLES_FOR_ESTIMATE 3
+
+extern int grpc_bdp_estimator_trace;
+
+typedef enum {
+ GRPC_BDP_PING_UNSCHEDULED,
+ GRPC_BDP_PING_SCHEDULED,
+ GRPC_BDP_PING_STARTED
+} grpc_bdp_estimator_ping_state;
+
+typedef struct grpc_bdp_estimator {
+ grpc_bdp_estimator_ping_state ping_state;
+ int64_t accumulator;
+ int64_t estimate;
+ const char *name;
+} grpc_bdp_estimator;
+
+void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator, const char *name);
+
+// Returns true if a reasonable estimate could be obtained
+bool grpc_bdp_estimator_get_estimate(grpc_bdp_estimator *estimator,
+ int64_t *estimate);
+// Returns true if the user should schedule a ping
+bool grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator,
+ int64_t num_bytes);
+// Schedule a ping: call in response to receiving a true from
+// grpc_bdp_estimator_add_incoming_bytes once a ping has been scheduled by a
+// transport (but not necessarily started)
+void grpc_bdp_estimator_schedule_ping(grpc_bdp_estimator *estimator);
+// Start a ping: call after calling grpc_bdp_estimator_schedule_ping and once
+// the ping is on the wire
+void grpc_bdp_estimator_start_ping(grpc_bdp_estimator *estimator);
+// Completes a previously started ping
+void grpc_bdp_estimator_complete_ping(grpc_bdp_estimator *estimator);
+
+#endif
diff --git a/src/core/lib/transport/pid_controller.c b/src/core/lib/transport/pid_controller.c
index 3cef225d4b..19cb1c0b36 100644
--- a/src/core/lib/transport/pid_controller.c
+++ b/src/core/lib/transport/pid_controller.c
@@ -32,26 +32,46 @@
*/
#include "src/core/lib/transport/pid_controller.h"
+#include <grpc/support/useful.h>
void grpc_pid_controller_init(grpc_pid_controller *pid_controller,
- double gain_p, double gain_i, double gain_d) {
- pid_controller->gain_p = gain_p;
- pid_controller->gain_i = gain_i;
- pid_controller->gain_d = gain_d;
+ grpc_pid_controller_args args) {
+ pid_controller->args = args;
+ pid_controller->last_control_value = args.initial_control_value;
grpc_pid_controller_reset(pid_controller);
}
void grpc_pid_controller_reset(grpc_pid_controller *pid_controller) {
pid_controller->last_error = 0.0;
+ pid_controller->last_dc_dt = 0.0;
pid_controller->error_integral = 0.0;
}
double grpc_pid_controller_update(grpc_pid_controller *pid_controller,
double error, double dt) {
- pid_controller->error_integral += error * dt;
+ /* integrate error using the trapezoid rule */
+ pid_controller->error_integral +=
+ dt * (pid_controller->last_error + error) * 0.5;
+ pid_controller->error_integral = GPR_CLAMP(
+ pid_controller->error_integral, -pid_controller->args.integral_range,
+ pid_controller->args.integral_range);
double diff_error = (error - pid_controller->last_error) / dt;
+ /* calculate derivative of control value vs time */
+ double dc_dt = pid_controller->args.gain_p * error +
+ pid_controller->args.gain_i * pid_controller->error_integral +
+ pid_controller->args.gain_d * diff_error;
+ /* and perform trapezoidal integration */
+ double new_control_value = pid_controller->last_control_value +
+ dt * (pid_controller->last_dc_dt + dc_dt) * 0.5;
+ new_control_value =
+ GPR_CLAMP(new_control_value, pid_controller->args.min_control_value,
+ pid_controller->args.max_control_value);
pid_controller->last_error = error;
- return dt * (pid_controller->gain_p * error +
- pid_controller->gain_i * pid_controller->error_integral +
- pid_controller->gain_d * diff_error);
+ pid_controller->last_dc_dt = dc_dt;
+ pid_controller->last_control_value = new_control_value;
+ return new_control_value;
+}
+
+double grpc_pid_controller_last(grpc_pid_controller *pid_controller) {
+ return pid_controller->last_control_value;
}
diff --git a/src/core/lib/transport/pid_controller.h b/src/core/lib/transport/pid_controller.h
index 83c82d6471..0a86521e90 100644
--- a/src/core/lib/transport/pid_controller.h
+++ b/src/core/lib/transport/pid_controller.h
@@ -45,20 +45,33 @@ typedef struct {
double gain_p;
double gain_i;
double gain_d;
+ double initial_control_value;
+ double min_control_value;
+ double max_control_value;
+ double integral_range;
+} grpc_pid_controller_args;
+
+typedef struct {
double last_error;
double error_integral;
+ double last_control_value;
+ double last_dc_dt;
+ grpc_pid_controller_args args;
} grpc_pid_controller;
/** Initialize the controller */
void grpc_pid_controller_init(grpc_pid_controller *pid_controller,
- double gain_p, double gain_i, double gain_d);
+ grpc_pid_controller_args args);
/** Reset the controller: useful when things have changed significantly */
void grpc_pid_controller_reset(grpc_pid_controller *pid_controller);
/** Update the controller: given a current error estimate, and the time since
- the last update, returns a delta to the control value */
+ the last update, returns a new control value */
double grpc_pid_controller_update(grpc_pid_controller *pid_controller,
double error, double dt);
+/** Returns the last control value calculated */
+double grpc_pid_controller_last(grpc_pid_controller *pid_controller);
+
#endif /* GRPC_CORE_LIB_TRANSPORT_PID_CONTROLLER_H */
diff --git a/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec b/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec
index bcc2bb6126..3429e2b29b 100644
--- a/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec
+++ b/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec
@@ -1,5 +1,11 @@
-# CocoaPods podspec for the gRPC Proto Compiler Plugin
+# This file has been automatically generated from a template file.
+# Please make modifications to
+# `templates/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec.template`
+# instead. This file can be regenerated from the template by running
+# `tools/buildgen/generate_projects.sh`.
+# CocoaPods podspec for the gRPC Proto Compiler Plugin
+#
# Copyright 2016, Google Inc.
# All rights reserved.
#
@@ -36,7 +42,7 @@ Pod::Spec.new do |s|
# exclamation mark ensures that other "regular" pods will be able to find it as it'll be installed
# before them.
s.name = '!ProtoCompiler-gRPCPlugin'
- v = '1.0.2'
+ v = '1.2.0-dev'
s.version = v
s.summary = 'The gRPC ProtoC plugin generates Objective-C files from .proto services.'
s.description = <<-DESC
@@ -84,10 +90,7 @@ Pod::Spec.new do |s|
repo = 'grpc/grpc'
file = "grpc_objective_c_plugin-#{v}-macos-x86_64.zip"
s.source = {
- # TODO(mxyan): Change back to "https://github.com/#{repo}/releases/download/v#{v}/#{file}" for
- # next release
- # :http => "https://github.com/#{repo}/releases/download/v#{v}/#{file}",
- :http => "https://github.com/#{repo}/releases/download/objective-c-v#{v}/#{file}",
+ :http => "https://github.com/#{repo}/releases/download/v#{v}/#{file}",
# TODO(jcanizales): Add sha1 or sha256
# :sha1 => '??',
}
diff --git a/src/objective-c/BoringSSL.podspec b/src/objective-c/BoringSSL.podspec
index 47b5b1a2e7..908bb0b5e5 100644
--- a/src/objective-c/BoringSSL.podspec
+++ b/src/objective-c/BoringSSL.podspec
@@ -31,7 +31,7 @@
Pod::Spec.new do |s|
s.name = 'BoringSSL'
- version = '7.0'
+ version = '8.0'
s.version = version
s.summary = 'BoringSSL is a fork of OpenSSL that is designed to meet Google’s needs.'
# Adapted from the homepage:
@@ -388,42 +388,42 @@ Pod::Spec.new do |s|
0x28340c19,
0x283480ac,
0x283500ea,
- 0x2c322910,
- 0x2c32a91e,
- 0x2c332930,
- 0x2c33a942,
- 0x2c342956,
- 0x2c34a968,
- 0x2c352983,
- 0x2c35a995,
- 0x2c3629a8,
+ 0x2c3228ca,
+ 0x2c32a8d8,
+ 0x2c3328ea,
+ 0x2c33a8fc,
+ 0x2c342910,
+ 0x2c34a922,
+ 0x2c35293d,
+ 0x2c35a94f,
+ 0x2c362962,
0x2c36832d,
- 0x2c3729b5,
- 0x2c37a9c7,
- 0x2c3829da,
- 0x2c38a9f1,
- 0x2c3929ff,
- 0x2c39aa0f,
- 0x2c3a2a21,
- 0x2c3aaa35,
- 0x2c3b2a46,
- 0x2c3baa65,
- 0x2c3c2a79,
- 0x2c3caa8f,
- 0x2c3d2aa8,
- 0x2c3daac5,
- 0x2c3e2ad6,
- 0x2c3eaae4,
- 0x2c3f2afc,
- 0x2c3fab14,
- 0x2c402b21,
+ 0x2c37296f,
+ 0x2c37a981,
+ 0x2c382994,
+ 0x2c38a9ab,
+ 0x2c3929b9,
+ 0x2c39a9c9,
+ 0x2c3a29db,
+ 0x2c3aa9ef,
+ 0x2c3b2a00,
+ 0x2c3baa1f,
+ 0x2c3c2a33,
+ 0x2c3caa49,
+ 0x2c3d2a62,
+ 0x2c3daa7f,
+ 0x2c3e2a90,
+ 0x2c3eaa9e,
+ 0x2c3f2ab6,
+ 0x2c3faace,
+ 0x2c402adb,
0x2c4090e7,
- 0x2c412b32,
- 0x2c41ab45,
+ 0x2c412aec,
+ 0x2c41aaff,
0x2c4210c0,
- 0x2c42ab56,
+ 0x2c42ab10,
0x2c430720,
- 0x2c43aa57,
+ 0x2c43aa11,
0x30320000,
0x30328015,
0x3033001f,
@@ -639,74 +639,74 @@ Pod::Spec.new do |s|
0x405b1e9e,
0x405b9eaf,
0x405c1ec2,
- 0x405c9ee3,
- 0x405d1ef0,
- 0x405d9f07,
- 0x405e1f27,
+ 0x405c9ed3,
+ 0x405d1ee0,
+ 0x405d9ef7,
+ 0x405e1f17,
0x405e8a95,
- 0x405f1f48,
- 0x405f9f55,
- 0x40601f63,
- 0x40609f85,
- 0x40611fad,
- 0x40619fc2,
- 0x40621fd9,
- 0x40629fea,
- 0x40631ffb,
- 0x4063a010,
- 0x40642027,
- 0x4064a053,
- 0x4065206e,
- 0x4065a085,
- 0x4066209d,
- 0x4066a0c7,
- 0x406720f2,
- 0x4067a113,
- 0x40682126,
- 0x4068a147,
- 0x40692179,
- 0x4069a1a7,
- 0x406a21c8,
- 0x406aa1e8,
- 0x406b2370,
- 0x406ba393,
- 0x406c23a9,
- 0x406ca60b,
- 0x406d263a,
- 0x406da662,
- 0x406e2690,
- 0x406ea6a8,
- 0x406f26c7,
- 0x406fa6dc,
- 0x407026ef,
- 0x4070a70c,
+ 0x405f1f38,
+ 0x405f9f45,
+ 0x40601f53,
+ 0x40609f75,
+ 0x40611f9d,
+ 0x40619fb2,
+ 0x40621fc9,
+ 0x40629fda,
+ 0x40631feb,
+ 0x4063a000,
+ 0x40642017,
+ 0x4064a043,
+ 0x4065205e,
+ 0x4065a075,
+ 0x4066208d,
+ 0x4066a0b7,
+ 0x406720e2,
+ 0x4067a103,
+ 0x40682116,
+ 0x4068a137,
+ 0x40692169,
+ 0x4069a197,
+ 0x406a21b8,
+ 0x406aa1d8,
+ 0x406b2360,
+ 0x406ba383,
+ 0x406c2399,
+ 0x406ca5c5,
+ 0x406d25f4,
+ 0x406da61c,
+ 0x406e264a,
+ 0x406ea662,
+ 0x406f2681,
+ 0x406fa696,
+ 0x407026a9,
+ 0x4070a6c6,
0x40710800,
- 0x4071a71e,
- 0x40722731,
- 0x4072a74a,
- 0x40732762,
+ 0x4071a6d8,
+ 0x407226eb,
+ 0x4072a704,
+ 0x4073271c,
0x4073936d,
- 0x40742776,
- 0x4074a790,
- 0x407527a1,
- 0x4075a7b5,
- 0x407627c3,
+ 0x40742730,
+ 0x4074a74a,
+ 0x4075275b,
+ 0x4075a76f,
+ 0x4076277d,
0x407691aa,
- 0x407727e8,
- 0x4077a80a,
- 0x40782825,
- 0x4078a85e,
- 0x40792875,
- 0x4079a88b,
- 0x407a2897,
- 0x407aa8aa,
- 0x407b28bf,
- 0x407ba8d1,
- 0x407c28e6,
- 0x407ca8ef,
- 0x407d2162,
+ 0x407727a2,
+ 0x4077a7c4,
+ 0x407827df,
+ 0x4078a818,
+ 0x4079282f,
+ 0x4079a845,
+ 0x407a2851,
+ 0x407aa864,
+ 0x407b2879,
+ 0x407ba88b,
+ 0x407c28a0,
+ 0x407ca8a9,
+ 0x407d2152,
0x407d9c57,
- 0x407e283a,
+ 0x407e27f4,
0x407e9e16,
0x407f1a67,
0x407f9887,
@@ -714,45 +714,42 @@ Pod::Spec.new do |s|
0x40809a8f,
0x40811cd9,
0x40819c08,
- 0x4082267b,
+ 0x40822635,
0x4082986d,
0x40831df1,
- 0x4083a038,
+ 0x4083a028,
0x40841aa3,
0x40849e4e,
- 0x40851ed3,
- 0x41f4229b,
- 0x41f9232d,
- 0x41fe2220,
- 0x41fea3fc,
- 0x41ff24ed,
- 0x420322b4,
- 0x420822d6,
- 0x4208a312,
- 0x42092204,
- 0x4209a34c,
- 0x420a225b,
- 0x420aa23b,
- 0x420b227b,
- 0x420ba2f4,
- 0x420c2509,
- 0x420ca3c9,
- 0x420d23e3,
- 0x420da41a,
- 0x42122434,
- 0x421724d0,
- 0x4217a476,
- 0x421c2498,
- 0x421f2453,
- 0x42212520,
- 0x422624b3,
- 0x422b25ef,
- 0x422ba59d,
- 0x422c25d7,
- 0x422ca55c,
- 0x422d253b,
- 0x422da5bc,
- 0x422e2582,
+ 0x41f4228b,
+ 0x41f9231d,
+ 0x41fe2210,
+ 0x41fea3ec,
+ 0x41ff24dd,
+ 0x420322a4,
+ 0x420822c6,
+ 0x4208a302,
+ 0x420921f4,
+ 0x4209a33c,
+ 0x420a224b,
+ 0x420aa22b,
+ 0x420b226b,
+ 0x420ba2e4,
+ 0x420c24f9,
+ 0x420ca3b9,
+ 0x420d23d3,
+ 0x420da40a,
+ 0x42122424,
+ 0x421724c0,
+ 0x4217a466,
+ 0x421c2488,
+ 0x421f2443,
+ 0x42212510,
+ 0x422624a3,
+ 0x422b25a9,
+ 0x422ba572,
+ 0x422c2591,
+ 0x422ca54c,
+ 0x422d252b,
0x4432072b,
0x4432873a,
0x44330746,
@@ -795,69 +792,69 @@ Pod::Spec.new do |s|
0x4c3d136d,
0x4c3d937c,
0x4c3e1389,
- 0x50322b68,
- 0x5032ab77,
- 0x50332b82,
- 0x5033ab92,
- 0x50342bab,
- 0x5034abc5,
- 0x50352bd3,
- 0x5035abe9,
- 0x50362bfb,
- 0x5036ac11,
- 0x50372c2a,
- 0x5037ac3d,
- 0x50382c55,
- 0x5038ac66,
- 0x50392c7b,
- 0x5039ac8f,
- 0x503a2caf,
- 0x503aacc5,
- 0x503b2cdd,
- 0x503bacef,
- 0x503c2d0b,
- 0x503cad22,
- 0x503d2d3b,
- 0x503dad51,
- 0x503e2d5e,
- 0x503ead74,
- 0x503f2d86,
+ 0x50322b22,
+ 0x5032ab31,
+ 0x50332b3c,
+ 0x5033ab4c,
+ 0x50342b65,
+ 0x5034ab7f,
+ 0x50352b8d,
+ 0x5035aba3,
+ 0x50362bb5,
+ 0x5036abcb,
+ 0x50372be4,
+ 0x5037abf7,
+ 0x50382c0f,
+ 0x5038ac20,
+ 0x50392c35,
+ 0x5039ac49,
+ 0x503a2c69,
+ 0x503aac7f,
+ 0x503b2c97,
+ 0x503baca9,
+ 0x503c2cc5,
+ 0x503cacdc,
+ 0x503d2cf5,
+ 0x503dad0b,
+ 0x503e2d18,
+ 0x503ead2e,
+ 0x503f2d40,
0x503f8382,
- 0x50402d99,
- 0x5040ada9,
- 0x50412dc3,
- 0x5041add2,
- 0x50422dec,
- 0x5042ae09,
- 0x50432e19,
- 0x5043ae29,
- 0x50442e38,
+ 0x50402d53,
+ 0x5040ad63,
+ 0x50412d7d,
+ 0x5041ad8c,
+ 0x50422da6,
+ 0x5042adc3,
+ 0x50432dd3,
+ 0x5043ade3,
+ 0x50442df2,
0x5044843f,
- 0x50452e4c,
- 0x5045ae6a,
- 0x50462e7d,
- 0x5046ae93,
- 0x50472ea5,
- 0x5047aeba,
- 0x50482ee0,
- 0x5048aeee,
- 0x50492f01,
- 0x5049af16,
- 0x504a2f2c,
- 0x504aaf3c,
- 0x504b2f5c,
- 0x504baf6f,
- 0x504c2f92,
- 0x504cafc0,
- 0x504d2fd2,
- 0x504dafef,
- 0x504e300a,
- 0x504eb026,
- 0x504f3038,
- 0x504fb04f,
- 0x5050305e,
+ 0x50452e06,
+ 0x5045ae24,
+ 0x50462e37,
+ 0x5046ae4d,
+ 0x50472e5f,
+ 0x5047ae74,
+ 0x50482e9a,
+ 0x5048aea8,
+ 0x50492ebb,
+ 0x5049aed0,
+ 0x504a2ee6,
+ 0x504aaef6,
+ 0x504b2f16,
+ 0x504baf29,
+ 0x504c2f4c,
+ 0x504caf7a,
+ 0x504d2f8c,
+ 0x504dafa9,
+ 0x504e2fc4,
+ 0x504eafe0,
+ 0x504f2ff2,
+ 0x504fb009,
+ 0x50503018,
0x505086ef,
- 0x50513071,
+ 0x5051302b,
0x58320ec9,
0x68320e8b,
0x68328c25,
@@ -1292,7 +1289,6 @@ Pod::Spec.new do |s|
"NO_RENEGOTIATION\\0"
"NO_REQUIRED_DIGEST\\0"
"NO_SHARED_CIPHER\\0"
- "NO_SHARED_GROUP\\0"
"NULL_SSL_CTX\\0"
"NULL_SSL_METHOD_PASSED\\0"
"OLD_SESSION_CIPHER_NOT_RETURNED\\0"
@@ -1353,9 +1349,7 @@ Pod::Spec.new do |s|
"TLSV1_ALERT_USER_CANCELLED\\0"
"TLSV1_BAD_CERTIFICATE_HASH_VALUE\\0"
"TLSV1_BAD_CERTIFICATE_STATUS_RESPONSE\\0"
- "TLSV1_CERTIFICATE_REQUIRED\\0"
"TLSV1_CERTIFICATE_UNOBTAINABLE\\0"
- "TLSV1_UNKNOWN_PSK_IDENTITY\\0"
"TLSV1_UNRECOGNIZED_NAME\\0"
"TLSV1_UNSUPPORTED_EXTENSION\\0"
"TLS_PEER_DID_NOT_RESPOND_WITH_CERTIFICATE_LIST\\0"
diff --git a/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.m b/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.m
index 40afca2d3d..0e2fa13f2c 100644
--- a/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.m
+++ b/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.m
@@ -159,11 +159,13 @@ static void PassFlagsToContextInfoBlock(SCNetworkReachabilityRef target,
if (strongSelf) {
if (lossHandler && !flags.reachable) {
lossHandler();
+#if TARGET_OS_IPHONE
} else if (wifiStatusChangeHandler &&
strongSelf->_previousReachabilityFlags &&
(flags.isWWAN ^
strongSelf->_previousReachabilityFlags.isWWAN)) {
wifiStatusChangeHandler();
+#endif
}
strongSelf->_previousReachabilityFlags = flags;
}
diff --git a/src/objective-c/GRPCClient/private/GRPCHost.m b/src/objective-c/GRPCClient/private/GRPCHost.m
index 450bec36e0..246af560cd 100644
--- a/src/objective-c/GRPCClient/private/GRPCHost.m
+++ b/src/objective-c/GRPCClient/private/GRPCHost.m
@@ -45,13 +45,10 @@
#import "GRPCCompletionQueue.h"
#import "GRPCConnectivityMonitor.h"
#import "NSDictionary+GRPC.h"
+#import "version.h"
NS_ASSUME_NONNULL_BEGIN
-// TODO(jcanizales): Generate the version in a standalone header, from templates. Like
-// templates/src/core/surface/version.c.template .
-#define GRPC_OBJC_VERSION_STRING @"1.0.2"
-
static NSMutableDictionary *kHostCache;
// This connectivity monitor flushes the host cache when connectivity status
diff --git a/src/objective-c/GRPCClient/private/version.h b/src/objective-c/GRPCClient/private/version.h
new file mode 100644
index 0000000000..e569faa25b
--- /dev/null
+++ b/src/objective-c/GRPCClient/private/version.h
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+// This file is autogenerated from a template file. Please make
+// modifications to
+// `templates/src/objective-c/GRPCClient/private/version.h.template`
+// instead. This file can be regenerated from the template by running
+// `tools/buildgen/generate_projects.sh`.
+
+
+#define GRPC_OBJC_VERSION_STRING @"1.2.0-dev"
diff --git a/src/php/README.md b/src/php/README.md
index 320220d3e4..ed91d2fbe5 100644
--- a/src/php/README.md
+++ b/src/php/README.md
@@ -12,24 +12,28 @@ shared C library.
* `composer`
* `phpunit` (optional)
-**Ubuntu/Debian:**
+**Install PHP and PECL on Ubuntu/Debian:**
```sh
-$ sudo apt-get install php5 php5-dev
+$ sudo apt-get install php5 php5-dev php-pear
+
+OR
+
+$ sudo apt-get install php7.0 php7.0-dev php-pear
```
-**PEAR/PECL:**
+**Install PECL on Mac:**
```sh
$ curl -O http://pear.php.net/go-pear.phar
$ sudo php -d detect_unicode=0 go-pear.phar
```
-**Composer:**
+**Install Composer (Linux or Mac):**
```sh
$ curl -sS https://getcomposer.org/installer | php
$ sudo mv composer.phar /usr/local/bin/composer
```
-**PHPUnit:**
+**Install PHPUnit (Linux or Mac):**
```sh
$ wget https://phar.phpunit.de/phpunit-old.phar
$ chmod +x phpunit-old.phar
@@ -48,6 +52,14 @@ This will compile and install the gRPC PHP extension into the standard PHP
extension directory. You should be able to run the [unit tests](#unit-tests),
with the PHP extension installed.
+**Update php.ini**
+
+Add this line to your `php.ini` file, e.g. `/etc/php5/cli/php.ini`
+
+```sh
+extension=grpc.so
+```
+
**Add the gRPC PHP library as a Composer dependency**
@@ -55,7 +67,7 @@ You need to add this to your project's `composer.json` file.
```
"require": {
- "grpc/grpc": "v1.0.0"
+ "grpc/grpc": "v1.1.0"
}
```
@@ -96,14 +108,6 @@ $ make
$ sudo make install
```
-### Update php.ini
-
-Add this line to your `php.ini` file, e.g. `/etc/php5/cli/php.ini`
-
-```sh
-extension=grpc.so
-```
-
## Unit Tests
You will need the source code to run tests
@@ -138,7 +142,7 @@ $ composer install
### Protobuf compiler
Again if you don't have it already, you need to install the protobuf compiler
-`protoc`, version 3.1.0+.
+`protoc`, version 3.2.0+.
If `protoc` hasn't been installed, you can download the `protoc` binaries from
[the protocol buffers Github repository](https://github.com/google/protobuf/releases).
@@ -209,6 +213,7 @@ $ sudo apt-get install apache2
```
Add this line to your `php.ini` file, e.g. `/etc/php5/apache2/php.ini`
+or `/etc/php/7.0/apache2/php.ini`
```sh
extension=grpc.so
@@ -235,7 +240,7 @@ $ cd grpc/src/php
$ composer install
```
-Make sure you have generated the client stub `math.php`
+Make sure you have generated the client stubs
```sh
$ ./bin/generate_proto_php.sh
@@ -247,11 +252,10 @@ Copy the `math_client.php` file into your Apache document root, e.g.
$ cp tests/generated_code/math_client.php /var/www/html
```
-You may have to fix the first two lines to point the includes to your installation:
+You may have to fix the first line to point the includes to your installation:
```php
include 'vendor/autoload.php';
-include 'tests/generated_code/math.php';
```
Connect to `localhost/math_client.php` in your browser, or run this from command line:
@@ -266,6 +270,10 @@ Install `nginx` and `php5-fpm`, in addition to `php5` above
```sh
$ sudo apt-get install nginx php5-fpm
+
+OR
+
+$ sudo apt-get install nginx php7.0-fpm
```
Add this line to your `php.ini` file, e.g. `/etc/php5/fpm/php.ini`
@@ -305,7 +313,7 @@ $ cd grpc/src/php
$ composer install
```
-Make sure you have generated the client stub `math.php`
+Make sure you have generated the client stubs
```sh
$ ./bin/generate_proto_php.sh
@@ -317,11 +325,10 @@ Copy the `math_client.php` file into your Nginx document root, e.g.
$ cp tests/generated_code/math_client.php /var/www/html
```
-You may have to fix the first two lines to point the includes to your installation:
+You may have to fix the first line to point the includes to your installation:
```php
include 'vendor/autoload.php';
-include 'tests/generated_code/math.php';
```
Connect to `localhost/math_client.php` in your browser, or run this from command line:
diff --git a/src/php/composer.json b/src/php/composer.json
index 746474d4e2..8528304c81 100644
--- a/src/php/composer.json
+++ b/src/php/composer.json
@@ -5,7 +5,7 @@
"version": "1.2.0",
"require": {
"php": ">=5.5.0",
- "google/protobuf": "v3.1.0-alpha-1"
+ "google/protobuf": "v3.1.0"
},
"require-dev": {
"google/auth": "v0.9"
diff --git a/src/php/tests/generated_code/math_client.php b/src/php/tests/generated_code/math_client.php
index 6ee92bc465..9ddb1c8f3a 100644
--- a/src/php/tests/generated_code/math_client.php
+++ b/src/php/tests/generated_code/math_client.php
@@ -32,9 +32,10 @@
*
*/
-# Fix the following two lines to point to your installation
+# Fix the following line to point to your installation
+# This assumes that you are using protoc 3.2.0+ and the generated stubs
+# were being autoloaded via composer.
include 'vendor/autoload.php';
-include 'tests/generated_code/math.php';
function p($line)
{
@@ -43,7 +44,7 @@ function p($line)
$host = 'localhost:50051';
p("Connecting to host: $host");
-$client = new math\MathClient($host, [
+$client = new Math\MathClient($host, [
'credentials' => Grpc\ChannelCredentials::createInsecure(),
]);
p('Client class: '.get_class($client));
@@ -52,7 +53,7 @@ p('');
p('Running unary call test:');
$dividend = 7;
$divisor = 4;
-$div_arg = new math\DivArgs();
+$div_arg = new Math\DivArgs();
$div_arg->setDividend($dividend);
$div_arg->setDivisor($divisor);
$call = $client->Div($div_arg);
@@ -65,7 +66,7 @@ p('');
p('Running server streaming test:');
$limit = 7;
-$fib_arg = new math\FibArgs();
+$fib_arg = new Math\FibArgs();
$fib_arg->setLimit($limit);
$call = $client->Fib($fib_arg);
$result_array = iterator_to_array($call->responses());
@@ -79,7 +80,7 @@ p('');
p('Running client streaming test:');
$call = $client->Sum();
for ($i = 0; $i <= $limit; ++$i) {
- $num = new math\Num();
+ $num = new Math\Num();
$num->setNum($i);
$call->write($num);
}
@@ -91,7 +92,7 @@ p('');
p('Running bidi-streaming test:');
$call = $client->DivMany();
for ($i = 0; $i < 7; ++$i) {
- $div_arg = new math\DivArgs();
+ $div_arg = new Math\DivArgs();
$dividend = 2 * $i + 1;
$divisor = 3;
$div_arg->setDividend($dividend);
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 68e40da4cf..6bca3ed1a5 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -180,6 +180,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/surface/server.c',
'src/core/lib/surface/validate_metadata.c',
'src/core/lib/surface/version.c',
+ 'src/core/lib/transport/bdp_estimator.c',
'src/core/lib/transport/byte_stream.c',
'src/core/lib/transport/connectivity_state.c',
'src/core/lib/transport/error_utils.c',