diff options
author | Craig Tiller <ctiller@google.com> | 2015-06-25 11:20:01 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-06-25 11:20:01 -0700 |
commit | ab6307356a3558ff93f990d333100e9bb218dcdf (patch) | |
tree | c3e17fea47275e4d441fc918518c98b42bb450b9 /src/core | |
parent | 48f0a13f3872876787f4d7588b396db914319b1b (diff) |
Addressing comments
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/transport/chttp2/internal.h | 149 | ||||
-rw-r--r-- | src/core/transport/chttp2/parsing.c | 108 | ||||
-rw-r--r-- | src/core/transport/chttp2/stream_lists.c | 7 | ||||
-rw-r--r-- | src/core/transport/chttp2/writing.c | 28 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 71 |
5 files changed, 170 insertions, 193 deletions
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 3d1cd56e61..02c94744ee 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -72,56 +72,56 @@ typedef enum { /* deframer state for the overall http2 stream of bytes */ typedef enum { /* prefix: one entry per http2 connection prefix byte */ - DTS_CLIENT_PREFIX_0 = 0, - DTS_CLIENT_PREFIX_1, - DTS_CLIENT_PREFIX_2, - DTS_CLIENT_PREFIX_3, - DTS_CLIENT_PREFIX_4, - DTS_CLIENT_PREFIX_5, - DTS_CLIENT_PREFIX_6, - DTS_CLIENT_PREFIX_7, - DTS_CLIENT_PREFIX_8, - DTS_CLIENT_PREFIX_9, - DTS_CLIENT_PREFIX_10, - DTS_CLIENT_PREFIX_11, - DTS_CLIENT_PREFIX_12, - DTS_CLIENT_PREFIX_13, - DTS_CLIENT_PREFIX_14, - DTS_CLIENT_PREFIX_15, - DTS_CLIENT_PREFIX_16, - DTS_CLIENT_PREFIX_17, - DTS_CLIENT_PREFIX_18, - DTS_CLIENT_PREFIX_19, - DTS_CLIENT_PREFIX_20, - DTS_CLIENT_PREFIX_21, - DTS_CLIENT_PREFIX_22, - DTS_CLIENT_PREFIX_23, + GRPC_DTS_CLIENT_PREFIX_0 = 0, + GRPC_DTS_CLIENT_PREFIX_1, + GRPC_DTS_CLIENT_PREFIX_2, + GRPC_DTS_CLIENT_PREFIX_3, + GRPC_DTS_CLIENT_PREFIX_4, + GRPC_DTS_CLIENT_PREFIX_5, + GRPC_DTS_CLIENT_PREFIX_6, + GRPC_DTS_CLIENT_PREFIX_7, + GRPC_DTS_CLIENT_PREFIX_8, + GRPC_DTS_CLIENT_PREFIX_9, + GRPC_DTS_CLIENT_PREFIX_10, + GRPC_DTS_CLIENT_PREFIX_11, + GRPC_DTS_CLIENT_PREFIX_12, + GRPC_DTS_CLIENT_PREFIX_13, + GRPC_DTS_CLIENT_PREFIX_14, + GRPC_DTS_CLIENT_PREFIX_15, + GRPC_DTS_CLIENT_PREFIX_16, + GRPC_DTS_CLIENT_PREFIX_17, + GRPC_DTS_CLIENT_PREFIX_18, + GRPC_DTS_CLIENT_PREFIX_19, + GRPC_DTS_CLIENT_PREFIX_20, + GRPC_DTS_CLIENT_PREFIX_21, + GRPC_DTS_CLIENT_PREFIX_22, + GRPC_DTS_CLIENT_PREFIX_23, /* frame header byte 0... */ /* must follow from the prefix states */ - DTS_FH_0, - DTS_FH_1, - DTS_FH_2, - DTS_FH_3, - DTS_FH_4, - DTS_FH_5, - DTS_FH_6, - DTS_FH_7, + GRPC_DTS_FH_0, + GRPC_DTS_FH_1, + GRPC_DTS_FH_2, + GRPC_DTS_FH_3, + GRPC_DTS_FH_4, + GRPC_DTS_FH_5, + GRPC_DTS_FH_6, + GRPC_DTS_FH_7, /* ... frame header byte 8 */ - DTS_FH_8, + GRPC_DTS_FH_8, /* inside a http2 frame */ - DTS_FRAME + GRPC_DTS_FRAME } grpc_chttp2_deframe_transport_state; typedef enum { - WRITE_STATE_OPEN, - WRITE_STATE_QUEUED_CLOSE, - WRITE_STATE_SENT_CLOSE + GRPC_WRITE_STATE_OPEN, + GRPC_WRITE_STATE_QUEUED_CLOSE, + GRPC_WRITE_STATE_SENT_CLOSE } grpc_chttp2_write_state; typedef enum { - DONT_SEND_CLOSED = 0, - SEND_CLOSED, - SEND_CLOSED_WITH_RST_STREAM + GRPC_DONT_SEND_CLOSED = 0, + GRPC_SEND_CLOSED, + GRPC_SEND_CLOSED_WITH_RST_STREAM } grpc_chttp2_send_closed; typedef struct { @@ -143,14 +143,14 @@ typedef enum { /* We keep several sets of connection wide parameters */ typedef enum { /* The settings our peer has asked for (and we have acked) */ - PEER_SETTINGS = 0, + GRPC_PEER_SETTINGS = 0, /* The settings we'd like to have */ - LOCAL_SETTINGS, + GRPC_LOCAL_SETTINGS, /* The settings we've published to our peer */ - SENT_SETTINGS, + GRPC_SENT_SETTINGS, /* The settings the peer has acked */ - ACKED_SETTINGS, - NUM_SETTING_SETS + GRPC_ACKED_SETTINGS, + GRPC_NUM_SETTING_SETS } grpc_chttp2_setting_set; /* Outstanding ping request data */ @@ -183,7 +183,7 @@ typedef struct { /** bitmask of setting indexes to send out */ gpr_uint32 force_send_settings; /** settings values */ - gpr_uint32 settings[NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS]; + gpr_uint32 settings[GRPC_NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS]; /** has there been a connection level error, and have we notified anyone about it? */ @@ -352,34 +352,6 @@ struct grpc_chttp2_transport { /** closure for notifying transport closure */ grpc_iomgr_closure notify_closed; } channel_callback; - -#if 0 - /* basic state management - what are we doing at the moment? */ - gpr_uint8 reading; - /** are we calling back any grpc_transport_op completion events */ - gpr_uint8 calling_back_ops; - gpr_uint8 destroying; - gpr_uint8 closed; - - /* stream indexing */ - gpr_uint32 next_stream_id; - - /* window management */ - gpr_uint32 outgoing_window_update; - - /* state for a stream that's not yet been created */ - grpc_stream_op_buffer new_stream_sopb; - - /* stream ops that need to be destroyed, but outside of the lock */ - grpc_stream_op_buffer nuke_later_sopb; - - /* pings */ - gpr_int64 ping_counter; - - - grpc_chttp2_stream **accepting_stream; - -#endif }; typedef struct { @@ -451,14 +423,6 @@ struct grpc_chttp2_stream_parsing { /** incoming metadata */ grpc_chttp2_incoming_metadata_buffer incoming_metadata; - - /* - grpc_linked_mdelem *incoming_metadata; - size_t incoming_metadata_count; - size_t incoming_metadata_capacity; - grpc_linked_mdelem *old_incoming_metadata; - gpr_timespec incoming_deadline; - */ }; struct grpc_chttp2_stream { @@ -468,14 +432,6 @@ struct grpc_chttp2_stream { grpc_chttp2_stream_link links[STREAM_LIST_COUNT]; gpr_uint8 included[STREAM_LIST_COUNT]; - -#if 0 - gpr_uint32 outgoing_window_update; - gpr_uint8 cancelled; - - grpc_stream_state callback_state; - grpc_stream_op_buffer callback_sopb; -#endif }; /** Transport writing call flow: @@ -502,14 +458,15 @@ void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_global *global, void grpc_chttp2_prepare_to_read(grpc_chttp2_transport_global *global, grpc_chttp2_transport_parsing *parsing); -/** Process one slice of incoming data */ +/** Process one slice of incoming data; return 1 if the connection is still + viable after reading, or 0 if the connection should be torn down */ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice); void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *global, grpc_chttp2_transport_parsing *parsing); /** Get a writable stream - \return non-zero if there was a stream available */ + returns non-zero if there was a stream available */ void grpc_chttp2_list_add_writable_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); @@ -622,10 +579,10 @@ void grpc_chttp2_parsing_become_skip_parser( extern int grpc_http_trace; extern int grpc_flowctl_trace; -#define IF_TRACING(stmt) \ - if (!(grpc_http_trace)) \ - ; \ - else \ +#define GRPC_CHTTP2_IF_TRACING(stmt) \ + if (!(grpc_http_trace)) \ + ; \ + else \ stmt #define GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(reason, transport, context, var, \ diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index 42f47af65c..8f682e9017 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -117,15 +117,15 @@ void grpc_chttp2_publish_reads( /* update global settings */ if (transport_parsing->settings_updated) { - memcpy(transport_global->settings[PEER_SETTINGS], + memcpy(transport_global->settings[GRPC_PEER_SETTINGS], transport_parsing->settings, sizeof(transport_parsing->settings)); transport_parsing->settings_updated = 0; } /* update settings based on ack if received */ if (transport_parsing->settings_ack_received) { - memcpy(transport_global->settings[ACKED_SETTINGS], - transport_global->settings[SENT_SETTINGS], + memcpy(transport_global->settings[GRPC_ACKED_SETTINGS], + transport_global->settings[GRPC_SENT_SETTINGS], GRPC_CHTTP2_NUM_SETTINGS * sizeof(gpr_uint32)); transport_parsing->settings_ack_received = 0; } @@ -238,34 +238,34 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, if (cur == end) return 1; switch (transport_parsing->deframe_state) { - case DTS_CLIENT_PREFIX_0: - case DTS_CLIENT_PREFIX_1: - case DTS_CLIENT_PREFIX_2: - case DTS_CLIENT_PREFIX_3: - case DTS_CLIENT_PREFIX_4: - case DTS_CLIENT_PREFIX_5: - case DTS_CLIENT_PREFIX_6: - case DTS_CLIENT_PREFIX_7: - case DTS_CLIENT_PREFIX_8: - case DTS_CLIENT_PREFIX_9: - case DTS_CLIENT_PREFIX_10: - case DTS_CLIENT_PREFIX_11: - case DTS_CLIENT_PREFIX_12: - case DTS_CLIENT_PREFIX_13: - case DTS_CLIENT_PREFIX_14: - case DTS_CLIENT_PREFIX_15: - case DTS_CLIENT_PREFIX_16: - case DTS_CLIENT_PREFIX_17: - case DTS_CLIENT_PREFIX_18: - case DTS_CLIENT_PREFIX_19: - case DTS_CLIENT_PREFIX_20: - case DTS_CLIENT_PREFIX_21: - case DTS_CLIENT_PREFIX_22: - case DTS_CLIENT_PREFIX_23: - while (cur != end && transport_parsing->deframe_state != DTS_FH_0) { + case GRPC_DTS_CLIENT_PREFIX_0: + case GRPC_DTS_CLIENT_PREFIX_1: + case GRPC_DTS_CLIENT_PREFIX_2: + case GRPC_DTS_CLIENT_PREFIX_3: + case GRPC_DTS_CLIENT_PREFIX_4: + case GRPC_DTS_CLIENT_PREFIX_5: + case GRPC_DTS_CLIENT_PREFIX_6: + case GRPC_DTS_CLIENT_PREFIX_7: + case GRPC_DTS_CLIENT_PREFIX_8: + case GRPC_DTS_CLIENT_PREFIX_9: + case GRPC_DTS_CLIENT_PREFIX_10: + case GRPC_DTS_CLIENT_PREFIX_11: + case GRPC_DTS_CLIENT_PREFIX_12: + case GRPC_DTS_CLIENT_PREFIX_13: + case GRPC_DTS_CLIENT_PREFIX_14: + case GRPC_DTS_CLIENT_PREFIX_15: + case GRPC_DTS_CLIENT_PREFIX_16: + case GRPC_DTS_CLIENT_PREFIX_17: + case GRPC_DTS_CLIENT_PREFIX_18: + case GRPC_DTS_CLIENT_PREFIX_19: + case GRPC_DTS_CLIENT_PREFIX_20: + case GRPC_DTS_CLIENT_PREFIX_21: + case GRPC_DTS_CLIENT_PREFIX_22: + case GRPC_DTS_CLIENT_PREFIX_23: + while (cur != end && transport_parsing->deframe_state != GRPC_DTS_FH_0) { if (*cur != GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_parsing ->deframe_state]) { - gpr_log(GPR_ERROR, + gpr_log(GPR_INFO, "Connect string mismatch: expected '%c' (%d) got '%c' (%d) " "at byte %d", GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_parsing @@ -283,74 +283,74 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, } /* fallthrough */ dts_fh_0: - case DTS_FH_0: + case GRPC_DTS_FH_0: GPR_ASSERT(cur < end); transport_parsing->incoming_frame_size = ((gpr_uint32)*cur) << 16; if (++cur == end) { - transport_parsing->deframe_state = DTS_FH_1; + transport_parsing->deframe_state = GRPC_DTS_FH_1; return 1; } /* fallthrough */ - case DTS_FH_1: + case GRPC_DTS_FH_1: GPR_ASSERT(cur < end); transport_parsing->incoming_frame_size |= ((gpr_uint32)*cur) << 8; if (++cur == end) { - transport_parsing->deframe_state = DTS_FH_2; + transport_parsing->deframe_state = GRPC_DTS_FH_2; return 1; } /* fallthrough */ - case DTS_FH_2: + case GRPC_DTS_FH_2: GPR_ASSERT(cur < end); transport_parsing->incoming_frame_size |= *cur; if (++cur == end) { - transport_parsing->deframe_state = DTS_FH_3; + transport_parsing->deframe_state = GRPC_DTS_FH_3; return 1; } /* fallthrough */ - case DTS_FH_3: + case GRPC_DTS_FH_3: GPR_ASSERT(cur < end); transport_parsing->incoming_frame_type = *cur; if (++cur == end) { - transport_parsing->deframe_state = DTS_FH_4; + transport_parsing->deframe_state = GRPC_DTS_FH_4; return 1; } /* fallthrough */ - case DTS_FH_4: + case GRPC_DTS_FH_4: GPR_ASSERT(cur < end); transport_parsing->incoming_frame_flags = *cur; if (++cur == end) { - transport_parsing->deframe_state = DTS_FH_5; + transport_parsing->deframe_state = GRPC_DTS_FH_5; return 1; } /* fallthrough */ - case DTS_FH_5: + case GRPC_DTS_FH_5: GPR_ASSERT(cur < end); transport_parsing->incoming_stream_id = (((gpr_uint32)*cur) & 0x7f) << 24; if (++cur == end) { - transport_parsing->deframe_state = DTS_FH_6; + transport_parsing->deframe_state = GRPC_DTS_FH_6; return 1; } /* fallthrough */ - case DTS_FH_6: + case GRPC_DTS_FH_6: GPR_ASSERT(cur < end); transport_parsing->incoming_stream_id |= ((gpr_uint32)*cur) << 16; if (++cur == end) { - transport_parsing->deframe_state = DTS_FH_7; + transport_parsing->deframe_state = GRPC_DTS_FH_7; return 1; } /* fallthrough */ - case DTS_FH_7: + case GRPC_DTS_FH_7: GPR_ASSERT(cur < end); transport_parsing->incoming_stream_id |= ((gpr_uint32)*cur) << 8; if (++cur == end) { - transport_parsing->deframe_state = DTS_FH_8; + transport_parsing->deframe_state = GRPC_DTS_FH_8; return 1; } /* fallthrough */ - case DTS_FH_8: + case GRPC_DTS_FH_8: GPR_ASSERT(cur < end); transport_parsing->incoming_stream_id |= ((gpr_uint32)*cur); - transport_parsing->deframe_state = DTS_FRAME; + transport_parsing->deframe_state = GRPC_DTS_FRAME; if (!init_frame_parser(transport_parsing)) { return 0; } @@ -364,7 +364,7 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, } transport_parsing->incoming_stream = NULL; if (++cur == end) { - transport_parsing->deframe_state = DTS_FH_0; + transport_parsing->deframe_state = GRPC_DTS_FH_0; return 1; } goto dts_fh_0; /* loop */ @@ -373,7 +373,7 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, return 1; } /* fallthrough */ - case DTS_FRAME: + case GRPC_DTS_FRAME: GPR_ASSERT(cur < end); if ((gpr_uint32)(end - cur) == transport_parsing->incoming_frame_size) { if (!parse_frame_slice( @@ -381,7 +381,7 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 1)) { return 0; } - transport_parsing->deframe_state = DTS_FH_0; + transport_parsing->deframe_state = GRPC_DTS_FH_0; transport_parsing->incoming_stream = NULL; return 1; } else if ((gpr_uint32)(end - cur) > @@ -582,10 +582,10 @@ static void on_header(void *tp, grpc_mdelem *md) { GPR_ASSERT(stream_parsing); - IF_TRACING(gpr_log(GPR_INFO, "HTTP:%d:HDR: %s: %s", stream_parsing->id, - transport_parsing->is_client ? "CLI" : "SVR", - grpc_mdstr_as_c_string(md->key), - grpc_mdstr_as_c_string(md->value))); + GRPC_CHTTP2_IF_TRACING(gpr_log( + GPR_INFO, "HTTP:%d:HDR: %s: %s", stream_parsing->id, + transport_parsing->is_client ? "CLI" : "SVR", + grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value))); if (md->key == transport_parsing->str_grpc_timeout) { gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout); diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index 706612e4f0..c6ba12fca8 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -344,10 +344,9 @@ void grpc_chttp2_for_all_streams( void (*cb)(grpc_chttp2_transport_global *transport_global, void *user_data, grpc_chttp2_stream_global *stream_global)) { grpc_chttp2_stream *s; - for (s = TRANSPORT_FROM_GLOBAL(transport_global) - ->lists[GRPC_CHTTP2_LIST_ALL_STREAMS] - .head; - s; s = s->links[GRPC_CHTTP2_LIST_ALL_STREAMS].next) { + grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global); + for (s = t->lists[GRPC_CHTTP2_LIST_ALL_STREAMS].head; s != NULL; + s = s->links[GRPC_CHTTP2_LIST_ALL_STREAMS].next) { cb(transport_global, user_data, &s->global); } } diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index a8e87c3fe3..fdcc300099 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -54,10 +54,10 @@ int grpc_chttp2_unlocking_check_writes( !transport_global->sent_local_settings) { gpr_slice_buffer_add( &transport_writing->outbuf, - grpc_chttp2_settings_create(transport_global->settings[SENT_SETTINGS], - transport_global->settings[LOCAL_SETTINGS], - transport_global->force_send_settings, - GRPC_CHTTP2_NUM_SETTINGS)); + grpc_chttp2_settings_create( + transport_global->settings[GRPC_SENT_SETTINGS], + transport_global->settings[GRPC_LOCAL_SETTINGS], + transport_global->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS)); transport_global->force_send_settings = 0; transport_global->dirtied_local_settings = 0; transport_global->sent_local_settings = 1; @@ -84,16 +84,16 @@ int grpc_chttp2_unlocking_check_writes( transport_global->outgoing_window -= window_delta; stream_global->outgoing_window -= window_delta; - if (stream_global->write_state == WRITE_STATE_QUEUED_CLOSE && + if (stream_global->write_state == GRPC_WRITE_STATE_QUEUED_CLOSE && stream_global->outgoing_sopb->nops == 0) { if (!transport_global->is_client && !stream_global->read_closed) { - stream_writing->send_closed = SEND_CLOSED_WITH_RST_STREAM; + stream_writing->send_closed = GRPC_SEND_CLOSED_WITH_RST_STREAM; } else { - stream_writing->send_closed = SEND_CLOSED; + stream_writing->send_closed = GRPC_SEND_CLOSED; } } if (stream_writing->sopb.nops > 0 || - stream_writing->send_closed != DONT_SEND_CLOSED) { + stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) { grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); } @@ -112,7 +112,7 @@ int grpc_chttp2_unlocking_check_writes( while (grpc_chttp2_list_pop_writable_window_update_stream(transport_global, &stream_global)) { window_delta = - transport_global->settings[LOCAL_SETTINGS] + transport_global->settings[GRPC_LOCAL_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] - stream_global->incoming_window; if (!stream_global->read_closed && window_delta > 0) { @@ -128,7 +128,7 @@ int grpc_chttp2_unlocking_check_writes( } /* if the grpc_chttp2_transport is ready to send a window update, do so here - * also */ + also; 3/4 is a magic number that will likely get tuned soon */ if (transport_global->incoming_window < transport_global->connection_window_target * 3 / 4) { window_delta = transport_global->connection_window_target - @@ -174,11 +174,11 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) { while ( grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) { grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops, - stream_writing->send_closed != DONT_SEND_CLOSED, + stream_writing->send_closed != GRPC_DONT_SEND_CLOSED, stream_writing->id, &transport_writing->hpack_compressor, &transport_writing->outbuf); stream_writing->sopb.nops = 0; - if (stream_writing->send_closed == SEND_CLOSED_WITH_RST_STREAM) { + if (stream_writing->send_closed == GRPC_SEND_CLOSED_WITH_RST_STREAM) { gpr_slice_buffer_add(&transport_writing->outbuf, grpc_chttp2_rst_stream_create(stream_writing->id, GRPC_CHTTP2_NO_ERROR)); @@ -201,8 +201,8 @@ void grpc_chttp2_cleanup_writing( while (grpc_chttp2_list_pop_written_stream( transport_global, transport_writing, &stream_global, &stream_writing)) { - if (stream_writing->send_closed != DONT_SEND_CLOSED) { - stream_global->write_state = WRITE_STATE_SENT_CLOSE; + if (stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) { + stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE; if (!transport_global->is_client) { stream_global->read_closed = 1; } diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index e96f6e0cdc..94659a6bdf 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -230,7 +230,8 @@ static void init_transport(grpc_chttp2_transport *t, t->parsing.is_client = is_client; t->parsing.str_grpc_timeout = grpc_mdstr_from_string(t->metadata_context, "grpc-timeout"); - t->parsing.deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0; + t->parsing.deframe_state = + is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; t->writing.is_client = is_client; gpr_slice_buffer_init(&t->global.qbuf); @@ -261,7 +262,7 @@ static void init_transport(grpc_chttp2_transport *t, /* copy in initial settings to all setting sets */ for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) { t->parsing.settings[i] = grpc_chttp2_settings_parameters[i].default_value; - for (j = 0; j < NUM_SETTING_SETS; j++) { + for (j = 0; j < GRPC_NUM_SETTING_SETS; j++) { t->global.settings[j][i] = grpc_chttp2_settings_parameters[i].default_value; } @@ -388,11 +389,11 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, GPR_ASSERT(t->parsing_active); s->global.id = (gpr_uint32)(gpr_uintptr)server_data; s->global.outgoing_window = - t->global - .settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + t->global.settings[GRPC_PEER_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; s->parsing.incoming_window = s->global.incoming_window = - t->global - .settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + t->global.settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; *t->accepting_stream = s; grpc_chttp2_stream_map_add(&t->parsing_stream_map, s->global.id, s); s->global.in_stream_map = 1; @@ -509,8 +510,8 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name, value, use_value); } - if (use_value != t->global.settings[LOCAL_SETTINGS][id]) { - t->global.settings[LOCAL_SETTINGS][id] = use_value; + if (use_value != t->global.settings[GRPC_LOCAL_SETTINGS][id]) { + t->global.settings[GRPC_LOCAL_SETTINGS][id] = use_value; t->global.dirtied_local_settings = 1; } } @@ -569,14 +570,15 @@ static void maybe_start_some_streams( * concurrency */ while (transport_global->next_stream_id <= MAX_CLIENT_STREAM_ID && transport_global->concurrent_stream_count < - transport_global->settings - [PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] && + transport_global + ->settings[GRPC_PEER_SETTINGS] + [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] && grpc_chttp2_list_pop_waiting_for_concurrency(transport_global, &stream_global)) { - IF_TRACING(gpr_log(GPR_DEBUG, - "HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d", - transport_global->is_client ? "CLI" : "SVR", - stream_global, transport_global->next_stream_id)); + GRPC_CHTTP2_IF_TRACING(gpr_log( + GPR_DEBUG, "HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d", + transport_global->is_client ? "CLI" : "SVR", stream_global, + transport_global->next_stream_id)); GPR_ASSERT(stream_global->id == 0); stream_global->id = transport_global->next_stream_id; @@ -589,11 +591,11 @@ static void maybe_start_some_streams( } stream_global->outgoing_window = - transport_global - ->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + transport_global->settings[GRPC_PEER_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; stream_global->incoming_window = - transport_global - ->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + transport_global->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; grpc_chttp2_stream_map_add( &TRANSPORT_FROM_GLOBAL(transport_global)->new_stream_map, stream_global->id, STREAM_FROM_GLOBAL(stream_global)); @@ -623,11 +625,12 @@ static void perform_op_locked(grpc_chttp2_transport_global *transport_global, stream_global->send_done_closure = op->on_done_send; if (!stream_global->cancelled) { stream_global->outgoing_sopb = op->send_ops; - if (op->is_last_send && stream_global->write_state == WRITE_STATE_OPEN) { - stream_global->write_state = WRITE_STATE_QUEUED_CLOSE; + if (op->is_last_send && + stream_global->write_state == GRPC_WRITE_STATE_OPEN) { + stream_global->write_state = GRPC_WRITE_STATE_QUEUED_CLOSE; } if (stream_global->id == 0) { - IF_TRACING(gpr_log( + GRPC_CHTTP2_IF_TRACING(gpr_log( GPR_DEBUG, "HTTP:%s: New grpc_chttp2_stream %p waiting for concurrency", transport_global->is_client ? "CLI" : "SVR", stream_global)); @@ -747,7 +750,7 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) { while (grpc_chttp2_list_pop_closed_waiting_for_parsing(transport_global, &stream_global)) { GPR_ASSERT(stream_global->in_stream_map); - GPR_ASSERT(stream_global->write_state != WRITE_STATE_OPEN); + GPR_ASSERT(stream_global->write_state != GRPC_WRITE_STATE_OPEN); GPR_ASSERT(stream_global->read_closed); remove_stream(t, stream_global->id); grpc_chttp2_list_add_read_write_state_changed(transport_global, @@ -758,7 +761,7 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) { while (grpc_chttp2_list_pop_read_write_state_changed(transport_global, &stream_global)) { if (stream_global->cancelled) { - stream_global->write_state = WRITE_STATE_SENT_CLOSE; + stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE; stream_global->read_closed = 1; if (!stream_global->published_cancelled) { char buffer[GPR_LTOA_MIN_BUFSIZE]; @@ -771,7 +774,7 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) { stream_global->published_cancelled = 1; } } - if (stream_global->write_state == WRITE_STATE_SENT_CLOSE && + if (stream_global->write_state == GRPC_WRITE_STATE_SENT_CLOSE && stream_global->read_closed && stream_global->in_stream_map) { if (t->parsing_active) { grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global, @@ -790,7 +793,8 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) { To fix this will require having an edge after stream-closed indicating that the stream is closed AND safe to delete. */ state = compute_state( - stream_global->write_state == WRITE_STATE_SENT_CLOSE && !stream_global->in_stream_map, + stream_global->write_state == GRPC_WRITE_STATE_SENT_CLOSE && + !stream_global->in_stream_map, stream_global->read_closed); if (stream_global->incoming_sopb.nops == 0 && state == stream_global->published_state) { @@ -842,6 +846,19 @@ static void drop_connection(grpc_chttp2_transport *t) { end_all_the_calls(t); } +/** update window from a settings change */ +static void update_global_window(void *args, gpr_uint32 id, void *stream) { + grpc_chttp2_transport *t = args; + grpc_chttp2_stream *s = stream; + grpc_chttp2_transport_global *transport_global = &t->global; + grpc_chttp2_stream_global *stream_global = &s->global; + + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("settings", transport_global, stream_global, + outgoing_window, + t->parsing.initial_window_update); + stream_global->outgoing_window += t->parsing.initial_window_update; +} + /* tcp read callback */ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, grpc_endpoint_cb_status error) { @@ -888,6 +905,10 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map); t->global.concurrent_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map); + if (t->parsing.initial_window_update != 0) { + grpc_chttp2_stream_map_for_each(&t->parsing_stream_map, + update_global_window, t); + } /* handle higher level things */ grpc_chttp2_publish_reads(&t->global, &t->parsing); t->parsing_active = 0; |