aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/transport
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-07-17 22:59:53 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-07-17 22:59:53 -0700
commitb5980be9a08678212e5dbd6549b923f545d83539 (patch)
treee0318622c5a5576377537783608b60516b46c4cf /src/core/transport
parenta14215a67841ea7920260c655c01e4570595a3db (diff)
parentf87a0984ab727e95b068237f3bb0689d9685c8ea (diff)
Merge github.com:grpc/grpc into sometimes-its-good-just-to-check-in-with-each-other
Diffstat (limited to 'src/core/transport')
-rw-r--r--src/core/transport/chttp2/frame_window_update.c4
-rw-r--r--src/core/transport/chttp2/incoming_metadata.c4
-rw-r--r--src/core/transport/chttp2/internal.h20
-rw-r--r--src/core/transport/chttp2/parsing.c9
-rw-r--r--src/core/transport/chttp2/stream_encoder.c3
-rw-r--r--src/core/transport/chttp2/stream_lists.c7
-rw-r--r--src/core/transport/chttp2/timeout_encoding.c14
-rw-r--r--src/core/transport/chttp2/writing.c49
-rw-r--r--src/core/transport/chttp2_transport.c30
-rw-r--r--src/core/transport/stream_op.c2
-rw-r--r--src/core/transport/transport.h4
-rw-r--r--src/core/transport/transport_op_string.c5
12 files changed, 106 insertions, 45 deletions
diff --git a/src/core/transport/chttp2/frame_window_update.c b/src/core/transport/chttp2/frame_window_update.c
index b817df7745..d624298ad2 100644
--- a/src/core/transport/chttp2/frame_window_update.c
+++ b/src/core/transport/chttp2/frame_window_update.c
@@ -94,8 +94,8 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse(
}
GPR_ASSERT(is_last);
- if (transport_parsing->incoming_stream_id) {
- if (stream_parsing) {
+ if (transport_parsing->incoming_stream_id != 0) {
+ if (stream_parsing != NULL) {
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("update", transport_parsing,
stream_parsing, outgoing_window_update,
p->amount);
diff --git a/src/core/transport/chttp2/incoming_metadata.c b/src/core/transport/chttp2/incoming_metadata.c
index 77162a6864..974b864ffb 100644
--- a/src/core/transport/chttp2/incoming_metadata.c
+++ b/src/core/transport/chttp2/incoming_metadata.c
@@ -42,7 +42,7 @@
void grpc_chttp2_incoming_metadata_buffer_init(
grpc_chttp2_incoming_metadata_buffer *buffer) {
- buffer->deadline = gpr_inf_future;
+ buffer->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
}
void grpc_chttp2_incoming_metadata_buffer_destroy(
@@ -87,7 +87,7 @@ void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(
b.list.tail = (void *)(gpr_intptr)buffer->count;
b.garbage.head = b.garbage.tail = NULL;
b.deadline = buffer->deadline;
- buffer->deadline = gpr_inf_future;
+ buffer->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
grpc_sopb_add_metadata(sopb, b);
}
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index bdd4b432eb..e5e6f445b7 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -353,7 +353,19 @@ typedef struct {
/** window available for us to send to peer */
gpr_int64 outgoing_window;
- /** window available for peer to send to us - updated after parse */
+ /** The number of bytes the upper layers have offered to receive.
+ As the upper layer offers more bytes, this value increases.
+ As bytes are read, this value decreases. */
+ gpr_uint32 max_recv_bytes;
+ /** The number of bytes the upper layer has offered to read but we have
+ not yet announced to HTTP2 flow control.
+ As the upper layers offer to read more bytes, this value increases.
+ As we advertise incoming flow control window, this value decreases. */
+ gpr_uint32 unannounced_incoming_window;
+ /** The number of bytes of HTTP2 flow control we have advertised.
+ As we advertise incoming flow control window, this value increases.
+ As bytes are read, this value decreases.
+ Updated after parse. */
gpr_uint32 incoming_window;
/** stream ops the transport user would like to send */
grpc_stream_op_buffer *outgoing_sopb;
@@ -391,6 +403,8 @@ typedef struct {
grpc_stream_op_buffer sopb;
/** how strongly should we indicate closure with the next write */
grpc_chttp2_send_closed send_closed;
+ /** how much window should we announce? */
+ gpr_uint32 announce_window;
} grpc_chttp2_stream_writing;
struct grpc_chttp2_stream_parsing {
@@ -501,7 +515,9 @@ void grpc_chttp2_list_add_writable_window_update_stream(
grpc_chttp2_stream_global *stream_global);
int grpc_chttp2_list_pop_writable_window_update_stream(
grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global **stream_global);
+ grpc_chttp2_transport_writing *transport_writing,
+ grpc_chttp2_stream_global **stream_global,
+ grpc_chttp2_stream_writing **stream_writing);
void grpc_chttp2_list_remove_writable_window_update_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c
index 9597395aab..aa32f2e44a 100644
--- a/src/core/transport/chttp2/parsing.c
+++ b/src/core/transport/chttp2/parsing.c
@@ -173,7 +173,14 @@ void grpc_chttp2_publish_reads(
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
"parsed", transport_parsing, stream_parsing, incoming_window_delta,
-(gpr_int64)stream_parsing->incoming_window_delta);
+ GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
+ "parsed", transport_parsing, stream_global, max_recv_bytes,
+ -(gpr_int64)stream_parsing->incoming_window_delta);
stream_global->incoming_window -= stream_parsing->incoming_window_delta;
+ GPR_ASSERT(stream_global->max_recv_bytes >=
+ stream_parsing->incoming_window_delta);
+ stream_global->max_recv_bytes -=
+ stream_parsing->incoming_window_delta;
stream_parsing->incoming_window_delta = 0;
grpc_chttp2_list_add_writable_window_update_stream(transport_global,
stream_global);
@@ -594,7 +601,7 @@ static void on_header(void *tp, grpc_mdelem *md) {
cached_timeout)) {
gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'",
grpc_mdstr_as_c_string(md->value));
- *cached_timeout = gpr_inf_future;
+ *cached_timeout = gpr_inf_future(GPR_CLOCK_REALTIME);
}
grpc_mdelem_set_user_data(md, free_timeout, cached_timeout);
}
diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c
index d553d80085..d7fc2da5d3 100644
--- a/src/core/transport/chttp2/stream_encoder.c
+++ b/src/core/transport/chttp2/stream_encoder.c
@@ -585,7 +585,8 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof,
l->md = hpack_enc(compressor, l->md, &st);
need_unref |= l->md != NULL;
}
- if (gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future) != 0) {
+ if (gpr_time_cmp(op->data.metadata.deadline,
+ gpr_inf_future(GPR_CLOCK_REALTIME)) != 0) {
deadline_enc(compressor, op->data.metadata.deadline, &st);
}
curop++;
diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c
index 4fea058c19..590f6abfbc 100644
--- a/src/core/transport/chttp2/stream_lists.c
+++ b/src/core/transport/chttp2/stream_lists.c
@@ -139,6 +139,7 @@ static void stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
void grpc_chttp2_list_add_writable_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
+ GPR_ASSERT(stream_global->id != 0);
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE);
}
@@ -204,6 +205,7 @@ int grpc_chttp2_list_pop_written_stream(
void grpc_chttp2_list_add_writable_window_update_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
+ GPR_ASSERT(stream_global->id != 0);
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE);
@@ -211,11 +213,14 @@ void grpc_chttp2_list_add_writable_window_update_stream(
int grpc_chttp2_list_pop_writable_window_update_stream(
grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global **stream_global) {
+ grpc_chttp2_transport_writing *transport_writing,
+ grpc_chttp2_stream_global **stream_global,
+ grpc_chttp2_stream_writing **stream_writing) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE);
*stream_global = &stream->global;
+ *stream_writing = &stream->writing;
return r;
}
diff --git a/src/core/transport/chttp2/timeout_encoding.c b/src/core/transport/chttp2/timeout_encoding.c
index 33915c4039..1dd768ada4 100644
--- a/src/core/transport/chttp2/timeout_encoding.c
+++ b/src/core/transport/chttp2/timeout_encoding.c
@@ -147,7 +147,7 @@ int grpc_chttp2_decode_timeout(const char *buffer, gpr_timespec *timeout) {
gpr_uint32 xp = x * 10 + *p - '0';
have_digit = 1;
if (xp < x) {
- *timeout = gpr_inf_future;
+ *timeout = gpr_inf_future(GPR_CLOCK_REALTIME);
return 1;
}
x = xp;
@@ -159,22 +159,22 @@ int grpc_chttp2_decode_timeout(const char *buffer, gpr_timespec *timeout) {
/* decode unit specifier */
switch (*p) {
case 'n':
- *timeout = gpr_time_from_nanos(x);
+ *timeout = gpr_time_from_nanos(x, GPR_TIMESPAN);
break;
case 'u':
- *timeout = gpr_time_from_micros(x);
+ *timeout = gpr_time_from_micros(x, GPR_TIMESPAN);
break;
case 'm':
- *timeout = gpr_time_from_millis(x);
+ *timeout = gpr_time_from_millis(x, GPR_TIMESPAN);
break;
case 'S':
- *timeout = gpr_time_from_seconds(x);
+ *timeout = gpr_time_from_seconds(x, GPR_TIMESPAN);
break;
case 'M':
- *timeout = gpr_time_from_minutes(x);
+ *timeout = gpr_time_from_minutes(x, GPR_TIMESPAN);
break;
case 'H':
- *timeout = gpr_time_from_hours(x);
+ *timeout = gpr_time_from_hours(x, GPR_TIMESPAN);
break;
default:
return 0;
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index a78654334e..d8ec117aa5 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -66,11 +66,9 @@ int grpc_chttp2_unlocking_check_writes(
/* for each grpc_chttp2_stream that's become writable, frame it's data
(according to
available window sizes) and add to the output buffer */
- while (transport_global->outgoing_window &&
- grpc_chttp2_list_pop_writable_stream(transport_global,
+ while (grpc_chttp2_list_pop_writable_stream(transport_global,
transport_writing, &stream_global,
- &stream_writing) &&
- stream_global->outgoing_window > 0) {
+ &stream_writing)) {
stream_writing->id = stream_global->id;
window_delta = grpc_chttp2_preencode(
stream_global->outgoing_sopb->ops, &stream_global->outgoing_sopb->nops,
@@ -106,20 +104,21 @@ int grpc_chttp2_unlocking_check_writes(
/* for each grpc_chttp2_stream that wants to update its window, add that
* window here */
while (grpc_chttp2_list_pop_writable_window_update_stream(transport_global,
- &stream_global)) {
- window_delta =
- transport_global->settings[GRPC_LOCAL_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] -
- stream_global->incoming_window;
- if (!stream_global->read_closed && window_delta > 0) {
- gpr_slice_buffer_add(
- &transport_writing->outbuf,
- grpc_chttp2_window_update_create(stream_global->id, window_delta));
+ transport_writing,
+ &stream_global,
+ &stream_writing)) {
+ stream_writing->id = stream_global->id;
+ if (!stream_global->read_closed && stream_global->unannounced_incoming_window > 0) {
+ stream_writing->announce_window = stream_global->unannounced_incoming_window;
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global,
- incoming_window, window_delta);
- stream_global->incoming_window += window_delta;
+ incoming_window, stream_global->unannounced_incoming_window);
+ GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global,
+ unannounced_incoming_window, -(gpr_int64)stream_global->unannounced_incoming_window);
+ stream_global->incoming_window += stream_global->unannounced_incoming_window;
+ stream_global->unannounced_incoming_window = 0;
grpc_chttp2_list_add_incoming_window_updated(transport_global,
stream_global);
+ grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
}
}
@@ -169,10 +168,19 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) {
while (
grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) {
- grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops,
- stream_writing->send_closed != GRPC_DONT_SEND_CLOSED,
- stream_writing->id, &transport_writing->hpack_compressor,
- &transport_writing->outbuf);
+ if (stream_writing->sopb.nops > 0 || stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) {
+ grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops,
+ stream_writing->send_closed != GRPC_DONT_SEND_CLOSED,
+ stream_writing->id, &transport_writing->hpack_compressor,
+ &transport_writing->outbuf);
+ }
+ if (stream_writing->announce_window > 0) {
+ gpr_slice_buffer_add(
+ &transport_writing->outbuf,
+ grpc_chttp2_window_update_create(
+ stream_writing->id, stream_writing->announce_window));
+ stream_writing->announce_window = 0;
+ }
stream_writing->sopb.nops = 0;
if (stream_writing->send_closed == GRPC_SEND_CLOSED_WITH_RST_STREAM) {
gpr_slice_buffer_add(&transport_writing->outbuf,
@@ -197,7 +205,8 @@ void grpc_chttp2_cleanup_writing(
while (grpc_chttp2_list_pop_written_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) {
- if (stream_global->outgoing_sopb->nops == 0) {
+ if (stream_global->outgoing_sopb != NULL &&
+ stream_global->outgoing_sopb->nops == 0) {
stream_global->outgoing_sopb = NULL;
grpc_chttp2_schedule_closure(transport_global,
stream_global->send_done_closure, 1);
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 1ad5066f27..ac8a4665db 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -360,7 +360,9 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
s->global.outgoing_window =
t->global.settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- s->parsing.incoming_window = s->global.incoming_window =
+ s->global.max_recv_bytes =
+ s->parsing.incoming_window =
+ s->global.incoming_window =
t->global.settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
*t->accepting_stream = s;
@@ -564,6 +566,8 @@ static void maybe_start_some_streams(
stream_global->incoming_window =
transport_global->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ stream_global->max_recv_bytes =
+ GPR_MAX(stream_global->incoming_window, stream_global->max_recv_bytes);
grpc_chttp2_stream_map_add(
&TRANSPORT_FROM_GLOBAL(transport_global)->new_stream_map,
stream_global->id, STREAM_FROM_GLOBAL(stream_global));
@@ -572,6 +576,9 @@ static void maybe_start_some_streams(
grpc_chttp2_list_add_incoming_window_updated(transport_global,
stream_global);
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
+ grpc_chttp2_list_add_writable_window_update_stream(transport_global,
+ stream_global);
+
}
/* cancel out streams that will never be started */
while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID &&
@@ -622,12 +629,23 @@ static void perform_stream_op_locked(
stream_global->publish_sopb = op->recv_ops;
stream_global->publish_sopb->nops = 0;
stream_global->publish_state = op->recv_state;
+ if (stream_global->max_recv_bytes < op->max_recv_bytes) {
+ GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("op", transport_global, stream_global,
+ max_recv_bytes, op->max_recv_bytes - stream_global->max_recv_bytes);
+ GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
+ "op", transport_global, stream_global, unannounced_incoming_window,
+ op->max_recv_bytes - stream_global->max_recv_bytes);
+ stream_global->unannounced_incoming_window += op->max_recv_bytes - stream_global->max_recv_bytes;
+ stream_global->max_recv_bytes = op->max_recv_bytes;
+ }
grpc_chttp2_incoming_metadata_live_op_buffer_end(
&stream_global->outstanding_metadata);
- grpc_chttp2_list_add_read_write_state_changed(transport_global,
- stream_global);
- grpc_chttp2_list_add_writable_window_update_stream(transport_global,
- stream_global);
+ if (stream_global->id != 0) {
+ grpc_chttp2_list_add_read_write_state_changed(transport_global,
+ stream_global);
+ grpc_chttp2_list_add_writable_window_update_stream(transport_global,
+ stream_global);
+ }
}
if (op->bind_pollset) {
@@ -1056,7 +1074,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason,
identifier = gpr_strdup(context_scope);
}
gpr_log(GPR_INFO,
- "FLOWCTL: %s %-10s %8s %-23s %8lld %c %8lld = %8lld %-10s [%s:%d]",
+ "FLOWCTL: %s %-10s %8s %-27s %8lld %c %8lld = %8lld %-10s [%s:%d]",
is_client ? "client" : "server", identifier, context_thread, var,
current_value, delta < 0 ? '-' : '+', delta < 0 ? -delta : delta,
current_value + delta, reason, file, line);
diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c
index fdb50c6b71..71061fe0c7 100644
--- a/src/core/transport/stream_op.c
+++ b/src/core/transport/stream_op.c
@@ -205,7 +205,7 @@ void grpc_metadata_batch_assert_ok(grpc_metadata_batch *batch) {
void grpc_metadata_batch_init(grpc_metadata_batch *batch) {
batch->list.head = batch->list.tail = batch->garbage.head = batch->garbage.tail =
NULL;
- batch->deadline = gpr_inf_future;
+ batch->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
}
void grpc_metadata_batch_destroy(grpc_metadata_batch *batch) {
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index 5af81232ac..aac42303a9 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -72,6 +72,10 @@ typedef struct grpc_transport_stream_op {
grpc_stream_op_buffer *recv_ops;
grpc_stream_state *recv_state;
+ /** The number of bytes this peer is currently prepared to receive.
+ These bytes will be eventually used to replenish per-stream flow control
+ windows. */
+ gpr_uint32 max_recv_bytes;
grpc_iomgr_closure *on_done_recv;
grpc_pollset *bind_pollset;
diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c
index 0da396a320..9d127c5472 100644
--- a/src/core/transport/transport_op_string.c
+++ b/src/core/transport/transport_op_string.c
@@ -61,7 +61,7 @@ static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) {
if (m != md.list.head) gpr_strvec_add(b, gpr_strdup(", "));
put_metadata(b, m->md);
}
- if (gpr_time_cmp(md.deadline, gpr_inf_future) != 0) {
+ if (gpr_time_cmp(md.deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) != 0) {
char *tmp;
gpr_asprintf(&tmp, " deadline=%d.%09d", md.deadline.tv_sec,
md.deadline.tv_nsec);
@@ -128,7 +128,8 @@ char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) {
if (op->recv_ops) {
if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
first = 0;
- gpr_strvec_add(&b, gpr_strdup("RECV"));
+ gpr_asprintf(&tmp, "RECV:max_recv_bytes=%d", op->max_recv_bytes);
+ gpr_strvec_add(&b, tmp);
}
if (op->bind_pollset) {