diff options
author | Vijay Pai <vpai@google.com> | 2015-07-24 10:51:27 -0700 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2015-07-24 10:51:27 -0700 |
commit | f0c66c6365212ec259ad3da6f1127bbc8790136b (patch) | |
tree | 9078e79ebe764d04d71149ecc829cc0e9e5f8d48 /src/core/transport | |
parent | 24b3b7e3d4d0c65cbd729bd29c462765d5a74e34 (diff) | |
parent | 6aff7aa335c8d33c091ffbe4b99d3f339897de03 (diff) |
Merge branch 'master' into deepend
Diffstat (limited to 'src/core/transport')
-rw-r--r-- | src/core/transport/chttp2/frame_data.c | 2 | ||||
-rw-r--r-- | src/core/transport/chttp2/internal.h | 22 | ||||
-rw-r--r-- | src/core/transport/chttp2/parsing.c | 5 | ||||
-rw-r--r-- | src/core/transport/chttp2/stream_lists.c | 68 | ||||
-rw-r--r-- | src/core/transport/chttp2/writing.c | 94 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 33 | ||||
-rw-r--r-- | src/core/transport/transport.c | 4 | ||||
-rw-r--r-- | src/core/transport/transport.h | 3 | ||||
-rw-r--r-- | src/core/transport/transport_impl.h | 3 | ||||
-rw-r--r-- | src/core/transport/transport_op_string.c | 10 |
10 files changed, 141 insertions, 103 deletions
diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c index 7a4c355f23..40bf2ebd79 100644 --- a/src/core/transport/chttp2/frame_data.c +++ b/src/core/transport/chttp2/frame_data.c @@ -92,7 +92,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( p->frame_type = *cur; switch (p->frame_type) { case 0: - /* noop */ + p->is_frame_compressed = 0; /* GPR_FALSE */ break; case 1: p->is_frame_compressed = 1; /* GPR_TRUE */ diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index e5e6f445b7..f0eeb6de50 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -60,7 +60,6 @@ typedef enum { GRPC_CHTTP2_LIST_WRITABLE, GRPC_CHTTP2_LIST_WRITING, GRPC_CHTTP2_LIST_WRITTEN, - GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE, GRPC_CHTTP2_LIST_PARSING_SEEN, GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING, GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_WRITING, @@ -286,6 +285,7 @@ struct grpc_chttp2_transport { grpc_endpoint *ep; grpc_mdctx *metadata_context; gpr_refcount refs; + char *peer_string; gpr_mu mu; @@ -382,6 +382,8 @@ typedef struct { gpr_uint8 published_cancelled; /** is this stream in the stream map? (boolean) */ gpr_uint8 in_stream_map; + /** is this stream actively being written? */ + gpr_uint8 writing_now; /** stream state already published to the upper layer */ grpc_stream_state published_state; @@ -474,11 +476,17 @@ void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *global, void grpc_chttp2_list_add_writable_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); +void grpc_chttp2_list_add_first_writable_stream( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global); int grpc_chttp2_list_pop_writable_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_writing **stream_writing); +void grpc_chttp2_list_remove_writable_stream( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global); void grpc_chttp2_list_add_incoming_window_updated( grpc_chttp2_transport_global *transport_global, @@ -510,18 +518,6 @@ int grpc_chttp2_list_pop_written_stream( grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_writing **stream_writing); -void grpc_chttp2_list_add_writable_window_update_stream( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global); -int grpc_chttp2_list_pop_writable_window_update_stream( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_transport_writing *transport_writing, - grpc_chttp2_stream_global **stream_global, - grpc_chttp2_stream_writing **stream_writing); -void grpc_chttp2_list_remove_writable_window_update_stream( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global); - void grpc_chttp2_list_add_parsing_seen_stream( grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing); diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index 904b9afce7..d84960009b 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -182,8 +182,7 @@ void grpc_chttp2_publish_reads( stream_global->max_recv_bytes -= stream_parsing->incoming_window_delta; stream_parsing->incoming_window_delta = 0; - grpc_chttp2_list_add_writable_window_update_stream(transport_global, - stream_global); + grpc_chttp2_list_add_writable_stream(transport_global, stream_global); } /* update outgoing flow control window */ @@ -607,7 +606,7 @@ static void on_header(void *tp, grpc_mdelem *md) { } grpc_chttp2_incoming_metadata_buffer_set_deadline( &stream_parsing->incoming_metadata, - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), *cached_timeout)); + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), *cached_timeout)); GRPC_MDELEM_UNREF(md); } else { grpc_chttp2_incoming_metadata_buffer_add(&stream_parsing->incoming_metadata, diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index 590f6abfbc..9e68c1e146 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -108,6 +108,23 @@ static void stream_list_maybe_remove(grpc_chttp2_transport *t, } } +static void stream_list_add_head(grpc_chttp2_transport *t, + grpc_chttp2_stream *s, + grpc_chttp2_stream_list_id id) { + grpc_chttp2_stream *old_head; + GPR_ASSERT(!s->included[id]); + old_head = t->lists[id].head; + s->links[id].next = old_head; + s->links[id].prev = NULL; + if (old_head) { + old_head->links[id].prev = s; + } else { + t->lists[id].tail = s; + } + t->lists[id].head = s; + s->included[id] = 1; +} + static void stream_list_add_tail(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_chttp2_stream_list_id id) { @@ -119,7 +136,6 @@ static void stream_list_add_tail(grpc_chttp2_transport *t, if (old_tail) { old_tail->links[id].next = s; } else { - s->links[id].prev = NULL; t->lists[id].head = s; } t->lists[id].tail = s; @@ -144,6 +160,18 @@ void grpc_chttp2_list_add_writable_stream( STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE); } +void grpc_chttp2_list_add_first_writable_stream( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global) { + GPR_ASSERT(stream_global->id != 0); + gpr_log(GPR_DEBUG, "add:%d:%d:%d:%d", stream_global->id, + stream_global->write_state, stream_global->in_stream_map, + stream_global->read_closed); + stream_list_add_head(TRANSPORT_FROM_GLOBAL(transport_global), + STREAM_FROM_GLOBAL(stream_global), + GRPC_CHTTP2_LIST_WRITABLE); +} + int grpc_chttp2_list_pop_writable_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing, @@ -157,6 +185,14 @@ int grpc_chttp2_list_pop_writable_stream( return r; } +void grpc_chttp2_list_remove_writable_stream( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global) { + stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global), + STREAM_FROM_GLOBAL(stream_global), + GRPC_CHTTP2_LIST_WRITABLE); +} + void grpc_chttp2_list_add_writing_stream( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing) { @@ -202,36 +238,6 @@ int grpc_chttp2_list_pop_written_stream( return r; } -void grpc_chttp2_list_add_writable_window_update_stream( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global) { - GPR_ASSERT(stream_global->id != 0); - stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), - STREAM_FROM_GLOBAL(stream_global), - GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE); -} - -int grpc_chttp2_list_pop_writable_window_update_stream( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_transport_writing *transport_writing, - grpc_chttp2_stream_global **stream_global, - grpc_chttp2_stream_writing **stream_writing) { - grpc_chttp2_stream *stream; - int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, - GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE); - *stream_global = &stream->global; - *stream_writing = &stream->writing; - return r; -} - -void grpc_chttp2_list_remove_writable_window_update_stream( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global) { - stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global), - STREAM_FROM_GLOBAL(stream_global), - GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE); -} - void grpc_chttp2_list_add_parsing_seen_stream( grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing) { diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index d8ec117aa5..d39b0c42f7 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -44,6 +44,7 @@ int grpc_chttp2_unlocking_check_writes( grpc_chttp2_transport_writing *transport_writing) { grpc_chttp2_stream_global *stream_global; grpc_chttp2_stream_writing *stream_writing; + grpc_chttp2_stream_global *first_reinserted_stream = NULL; gpr_uint32 window_delta; /* simple writes are queued to qbuf, and flushed here */ @@ -64,50 +65,54 @@ int grpc_chttp2_unlocking_check_writes( } /* for each grpc_chttp2_stream that's become writable, frame it's data - (according to - available window sizes) and add to the output buffer */ - while (grpc_chttp2_list_pop_writable_stream(transport_global, - transport_writing, &stream_global, - &stream_writing)) { + (according to available window sizes) and add to the output buffer */ + while (grpc_chttp2_list_pop_writable_stream( + transport_global, transport_writing, &stream_global, &stream_writing)) { + if (stream_global == first_reinserted_stream) { + /* prevent infinite loop */ + grpc_chttp2_list_add_first_writable_stream(transport_global, + stream_global); + break; + } + stream_writing->id = stream_global->id; - window_delta = grpc_chttp2_preencode( - stream_global->outgoing_sopb->ops, &stream_global->outgoing_sopb->nops, - GPR_MIN(transport_global->outgoing_window, - stream_global->outgoing_window), - &stream_writing->sopb); - GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT( - "write", transport_global, outgoing_window, -(gpr_int64)window_delta); - GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global, - outgoing_window, -(gpr_int64)window_delta); - transport_global->outgoing_window -= window_delta; - stream_global->outgoing_window -= window_delta; - - if (stream_global->write_state == GRPC_WRITE_STATE_QUEUED_CLOSE && - stream_global->outgoing_sopb->nops == 0) { - if (!transport_global->is_client && !stream_global->read_closed) { - stream_writing->send_closed = GRPC_SEND_CLOSED_WITH_RST_STREAM; - } else { - stream_writing->send_closed = GRPC_SEND_CLOSED; + stream_writing->send_closed = GRPC_DONT_SEND_CLOSED; + GPR_ASSERT(!stream_global->writing_now); + + if (stream_global->outgoing_sopb) { + window_delta = + grpc_chttp2_preencode(stream_global->outgoing_sopb->ops, + &stream_global->outgoing_sopb->nops, + GPR_MIN(transport_global->outgoing_window, + stream_global->outgoing_window), + &stream_writing->sopb); + GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT( + "write", transport_global, outgoing_window, -(gpr_int64)window_delta); + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global, + outgoing_window, + -(gpr_int64)window_delta); + transport_global->outgoing_window -= window_delta; + stream_global->outgoing_window -= window_delta; + + if (stream_global->write_state == GRPC_WRITE_STATE_QUEUED_CLOSE && + stream_global->outgoing_sopb->nops == 0) { + if (!transport_global->is_client && !stream_global->read_closed) { + stream_writing->send_closed = GRPC_SEND_CLOSED_WITH_RST_STREAM; + } else { + stream_writing->send_closed = GRPC_SEND_CLOSED; + } } - } - if (stream_writing->sopb.nops > 0 || - stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) { - grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); - } - if (stream_global->outgoing_window > 0 && - stream_global->outgoing_sopb->nops != 0) { - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + if (stream_global->outgoing_window > 0 && + stream_global->outgoing_sopb->nops != 0) { + grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + if (first_reinserted_stream == NULL && + transport_global->outgoing_window == 0) { + first_reinserted_stream = stream_global; + } + } } - } - /* for each grpc_chttp2_stream that wants to update its window, add that - * window here */ - while (grpc_chttp2_list_pop_writable_window_update_stream(transport_global, - transport_writing, - &stream_global, - &stream_writing)) { - stream_writing->id = stream_global->id; if (!stream_global->read_closed && stream_global->unannounced_incoming_window > 0) { stream_writing->announce_window = stream_global->unannounced_incoming_window; GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global, @@ -118,6 +123,11 @@ int grpc_chttp2_unlocking_check_writes( stream_global->unannounced_incoming_window = 0; grpc_chttp2_list_add_incoming_window_updated(transport_global, stream_global); + stream_global->writing_now = 1; + grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); + } else if (stream_writing->sopb.nops > 0 || + stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) { + stream_global->writing_now = 1; grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); } } @@ -205,6 +215,8 @@ void grpc_chttp2_cleanup_writing( while (grpc_chttp2_list_pop_written_stream( transport_global, transport_writing, &stream_global, &stream_writing)) { + GPR_ASSERT(stream_global->writing_now); + stream_global->writing_now = 0; if (stream_global->outgoing_sopb != NULL && stream_global->outgoing_sopb->nops == 0) { stream_global->outgoing_sopb = NULL; @@ -216,9 +228,9 @@ void grpc_chttp2_cleanup_writing( if (!transport_global->is_client) { stream_global->read_closed = 1; } - grpc_chttp2_list_add_read_write_state_changed(transport_global, - stream_global); } + grpc_chttp2_list_add_read_write_state_changed(transport_global, + stream_global); } transport_writing->outbuf.count = 0; transport_writing->outbuf.length = 0; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index c923d5e42f..5f49b2ddd6 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -168,6 +168,7 @@ static void destruct_transport(grpc_chttp2_transport *t) { grpc_mdctx_unref(t->metadata_context); + gpr_free(t->peer_string); gpr_free(t); } @@ -217,6 +218,7 @@ static void init_transport(grpc_chttp2_transport *t, gpr_ref_init(&t->refs, 2); gpr_mu_init(&t->mu); grpc_mdctx_ref(mdctx); + t->peer_string = grpc_endpoint_get_peer(ep); t->metadata_context = mdctx; t->endpoint_reading = 1; t->global.next_stream_id = is_client ? 1 : 2; @@ -393,12 +395,16 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { } grpc_chttp2_list_remove_incoming_window_updated(&t->global, &s->global); - grpc_chttp2_list_remove_writable_window_update_stream(&t->global, &s->global); + grpc_chttp2_list_remove_writable_stream(&t->global, &s->global); gpr_mu_unlock(&t->mu); for (i = 0; i < STREAM_LIST_COUNT; i++) { - GPR_ASSERT(!s->included[i]); + if (s->included[i]) { + gpr_log(GPR_ERROR, "%s stream %d still included in list %d", + t->global.is_client ? "client" : "server", s->global.id, i); + abort(); + } } GPR_ASSERT(s->global.outgoing_sopb == NULL); @@ -574,8 +580,6 @@ static void maybe_start_some_streams( grpc_chttp2_list_add_incoming_window_updated(transport_global, stream_global); grpc_chttp2_list_add_writable_stream(transport_global, stream_global); - grpc_chttp2_list_add_writable_window_update_stream(transport_global, - stream_global); } /* cancel out streams that will never be started */ @@ -641,8 +645,7 @@ static void perform_stream_op_locked( if (stream_global->id != 0) { grpc_chttp2_list_add_read_write_state_changed(transport_global, stream_global); - grpc_chttp2_list_add_writable_window_update_stream(transport_global, - stream_global); + grpc_chttp2_list_add_writable_stream(transport_global, stream_global); } } @@ -750,6 +753,7 @@ static void remove_stream(grpc_chttp2_transport *t, gpr_uint32 id) { if (!s) { s = grpc_chttp2_stream_map_delete(&t->new_stream_map, id); } + grpc_chttp2_list_remove_writable_stream(&t->global, &s->global); GPR_ASSERT(s); s->global.in_stream_map = 0; if (t->parsing.incoming_stream == &s->parsing) { @@ -831,6 +835,9 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) { if (!stream_global->publish_sopb) { continue; } + if (stream_global->writing_now) { + continue; + } /* FIXME(ctiller): we include in_stream_map in our computation of whether the stream is write-closed. This is completely bogus, but has the effect of delaying stream-closed until the stream @@ -1069,9 +1076,17 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason, * INTEGRATION GLUE */ -static const grpc_transport_vtable vtable = { - sizeof(grpc_chttp2_stream), init_stream, perform_stream_op, - perform_transport_op, destroy_stream, destroy_transport}; +static char *chttp2_get_peer(grpc_transport *t) { + return gpr_strdup(((grpc_chttp2_transport *)t)->peer_string); +} + +static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), + init_stream, + perform_stream_op, + perform_transport_op, + destroy_stream, + destroy_transport, + chttp2_get_peer}; grpc_transport *grpc_create_chttp2_transport( const grpc_channel_args *channel_args, grpc_endpoint *ep, grpc_mdctx *mdctx, diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index 2689e3028a..69c00b6a4f 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -65,6 +65,10 @@ void grpc_transport_destroy_stream(grpc_transport *transport, transport->vtable->destroy_stream(transport, stream); } +char *grpc_transport_get_peer(grpc_transport *transport) { + return transport->vtable->get_peer(transport); +} + void grpc_transport_stream_op_finish_with_failure( grpc_transport_stream_op *op) { if (op->send_ops) { diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 64503604ee..553779602a 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -182,4 +182,7 @@ void grpc_transport_close(grpc_transport *transport); /* Destroy the transport */ void grpc_transport_destroy(grpc_transport *transport); +/* Get the transports peer */ +char *grpc_transport_get_peer(grpc_transport *transport); + #endif /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_H */ diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h index 515721dfb6..d3bbdf6c27 100644 --- a/src/core/transport/transport_impl.h +++ b/src/core/transport/transport_impl.h @@ -58,6 +58,9 @@ typedef struct grpc_transport_vtable { /* implementation of grpc_transport_destroy */ void (*destroy)(grpc_transport *self); + + /* implementation of grpc_transport_get_peer */ + char *(*get_peer)(grpc_transport *self); } grpc_transport_vtable; /* an instance of a grpc transport */ diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c index 10d796fc15..f62c340e97 100644 --- a/src/core/transport/transport_op_string.c +++ b/src/core/transport/transport_op_string.c @@ -116,10 +116,9 @@ char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) { if (op->send_ops) { if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); first = 0; - gpr_strvec_add(&b, gpr_strdup("SEND")); - if (op->is_last_send) { - gpr_strvec_add(&b, gpr_strdup("_LAST")); - } + gpr_asprintf(&tmp, "SEND%s:%p", op->is_last_send ? "_LAST" : "", + op->on_done_send); + gpr_strvec_add(&b, tmp); gpr_strvec_add(&b, gpr_strdup("[")); gpr_strvec_add(&b, grpc_sopb_string(op->send_ops)); gpr_strvec_add(&b, gpr_strdup("]")); @@ -128,7 +127,8 @@ char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) { if (op->recv_ops) { if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); first = 0; - gpr_asprintf(&tmp, "RECV:max_recv_bytes=%d", op->max_recv_bytes); + gpr_asprintf(&tmp, "RECV:%p:max_recv_bytes=%d", op->on_done_recv, + op->max_recv_bytes); gpr_strvec_add(&b, tmp); } |