diff options
Diffstat (limited to 'src/core/transport/chttp2_transport.c')
-rw-r--r-- | src/core/transport/chttp2_transport.c | 350 |
1 files changed, 115 insertions, 235 deletions
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 13ddeacc02..5db9b92727 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -138,34 +138,31 @@ static void destruct_transport(grpc_chttp2_transport *t) { grpc_chttp2_hpack_parser_destroy(&t->parsing.hpack_parser); grpc_chttp2_goaway_parser_destroy(&t->parsing.goaway_parser); - grpc_mdstr_unref(t->constants.str_grpc_timeout); + grpc_mdstr_unref(t->parsing.str_grpc_timeout); for (i = 0; i < STREAM_LIST_COUNT; i++) { GPR_ASSERT(t->lists[i].head == NULL); GPR_ASSERT(t->lists[i].tail == NULL); } - GPR_ASSERT(grpc_chttp2_stream_map_size(&t->stream_map) == 0); + GPR_ASSERT(grpc_chttp2_stream_map_size(&t->parsing_stream_map) == 0); + GPR_ASSERT(grpc_chttp2_stream_map_size(&t->new_stream_map) == 0); - grpc_chttp2_stream_map_destroy(&t->stream_map); + grpc_chttp2_stream_map_destroy(&t->parsing_stream_map); + grpc_chttp2_stream_map_destroy(&t->new_stream_map); gpr_mu_unlock(&t->mu); gpr_mu_destroy(&t->mu); - gpr_cv_destroy(&t->cv); /* callback remaining pings: they're not allowed to call into the transpot, and maybe they hold resources that need to be freed */ - for (i = 0; i < t->ping_count; i++) { - t->pings[i].cb(t->pings[i].user_data); + while (t->global.pings.next != &t->global.pings) { + grpc_chttp2_outstanding_ping *ping = t->global.pings.next; + grpc_iomgr_add_delayed_callback(ping->on_recv, 0); + ping->next->prev = ping->prev; + ping->prev->next = ping->next; + gpr_free(ping); } - gpr_free(t->pings); - - for (i = 0; i < t->num_pending_goaways; i++) { - gpr_slice_unref(t->pending_goaways[i].debug); - } - gpr_free(t->pending_goaways); - - grpc_sopb_destroy(&t->nuke_later_sopb); grpc_mdctx_unref(t->metadata_context); @@ -187,7 +184,7 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba int j; grpc_transport_setup_result sr; - GPR_ASSERT(strlen(CLIENT_CONNECT_STRING) == CLIENT_CONNECT_STRLEN); + GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); memset(t, 0, sizeof(*t)); @@ -196,20 +193,20 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba /* one ref is for destroy, the other for when ep becomes NULL */ gpr_ref_init(&t->refs, 2); gpr_mu_init(&t->mu); - gpr_cv_init(&t->cv); grpc_mdctx_ref(mdctx); t->metadata_context = mdctx; - t->constants.str_grpc_timeout = - grpc_mdstr_from_string(t->metadata_context, "grpc-timeout"); - t->reading = 1; - t->error_state = ERROR_STATE_NONE; - t->next_stream_id = is_client ? 1 : 2; - t->constants.is_client = is_client; + t->endpoint_reading = 1; + t->global.error_state = GRPC_CHTTP2_ERROR_STATE_NONE; + t->global.next_stream_id = is_client ? 1 : 2; + t->global.is_client = is_client; t->global.outgoing_window = DEFAULT_WINDOW; t->global.incoming_window = DEFAULT_WINDOW; t->global.connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET; - t->deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0; - t->ping_counter = gpr_now().tv_nsec; + t->global.ping_counter = 1; + t->parsing.is_client = is_client; + t->parsing.str_grpc_timeout = + grpc_mdstr_from_string(t->metadata_context, "grpc-timeout"); + t->parsing.deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0; gpr_slice_buffer_init(&t->global.qbuf); @@ -222,17 +219,17 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser, t->metadata_context); grpc_iomgr_closure_init(&t->channel_callback.notify_closed, notify_closed, t); - grpc_sopb_init(&t->nuke_later_sopb); if (is_client) { gpr_slice_buffer_add(&t->global.qbuf, - gpr_slice_from_copied_string(CLIENT_CONNECT_STRING)); + gpr_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING)); } /* 8 is a random stab in the dark as to a good initial size: it's small enough that it shouldn't waste memory for infrequently used connections, yet large enough that the exponential growth should happen nicely when it's needed. TODO(ctiller): tune this */ - grpc_chttp2_stream_map_init(&t->stream_map, 8); + grpc_chttp2_stream_map_init(&t->parsing_stream_map, 8); + grpc_chttp2_stream_map_init(&t->new_stream_map, 8); /* copy in initial settings to all setting sets */ for (i = 0; i < NUM_SETTING_SETS; i++) { @@ -247,7 +244,7 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba t->global.sent_local_settings = 0; /* configure http2 the way we like it */ - if (t->constants.is_client) { + if (is_client) { push_setting(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0); push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0); } @@ -257,7 +254,7 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba for (i = 0; i < channel_args->num_args; i++) { if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_MAX_CONCURRENT_STREAMS)) { - if (t->constants.is_client) { + if (is_client) { gpr_log(GPR_ERROR, "%s: is ignored on the client", GRPC_ARG_MAX_CONCURRENT_STREAMS); } else if (channel_args->args[i].type != GRPC_ARG_INTEGER) { @@ -272,13 +269,13 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba if (channel_args->args[i].type != GRPC_ARG_INTEGER) { gpr_log(GPR_ERROR, "%s: must be an integer", GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER); - } else if ((t->next_stream_id & 1) != + } else if ((t->global.next_stream_id & 1) != (channel_args->args[i].value.integer & 1)) { gpr_log(GPR_ERROR, "%s: low bit must be %d on %s", - GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->next_stream_id & 1, - t->constants.is_client ? "client" : "server"); + GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->global.next_stream_id & 1, + is_client ? "client" : "server"); } else { - t->next_stream_id = channel_args->args[i].value.integer; + t->global.next_stream_id = channel_args->args[i].value.integer; } } } @@ -295,7 +292,6 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba t->channel_callback.cb = sr.callbacks; t->channel_callback.cb_user_data = sr.user_data; t->channel_callback.executing = 0; - if (t->destroying) gpr_cv_signal(&t->cv); unlock(t); ref_transport(t); /* matches unref inside recv_data */ @@ -309,28 +305,9 @@ static void destroy_transport(grpc_transport *gt) { lock(t); t->destroying = 1; - /* Wait for pending stuff to finish. - We need to be not calling back to ensure that closed() gets a chance to - trigger if needed during unlock() before we die. - We need to be not writing as cancellation finalization may produce some - callbacks that NEED to be made to close out some streams when t->writing - becomes 0. */ - while (t->channel_callback.executing || t->writing_active) { - gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future); - } drop_connection(t); unlock(t); - /* The drop_connection() above puts the grpc_chttp2_transport into an error state, and - the follow-up unlock should then (as part of the cleanup work it does) - ensure that cb is NULL, and therefore not call back anything further. - This check validates this very subtle behavior. - It's shutdown path, so I don't believe an extra lock pair is going to be - problematic for performance. */ - lock(t); - GPR_ASSERT(t->error_state == ERROR_STATE_NOTIFIED); - unlock(t); - unref_transport(t); } @@ -354,7 +331,7 @@ static void goaway(grpc_transport *gt, grpc_status_code status, gpr_slice debug_data) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; lock(t); - grpc_chttp2_goaway_append(t->last_incoming_stream_id, + grpc_chttp2_goaway_append(t->global.last_incoming_stream_id, grpc_chttp2_grpc_status_to_http2_error(status), debug_data, &t->global.qbuf); unlock(t); @@ -367,41 +344,30 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, memset(s, 0, sizeof(*s)); + s->parsing.incoming_deadline = gpr_inf_future; + grpc_sopb_init(&s->writing.sopb); + grpc_chttp2_data_parser_init(&s->parsing.data_parser); + ref_transport(t); lock(t); - if (!server_data) { - s->global.id = 0; - s->global.outgoing_window = 0; - s->global.incoming_window = 0; - } else { - /* already locked */ + if (server_data) { + GPR_ASSERT(t->parsing_active); s->global.id = (gpr_uint32)(gpr_uintptr)server_data; s->global.outgoing_window = t->global.settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; s->global.incoming_window = t->global.settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - t->incoming_stream = s; - grpc_chttp2_stream_map_add(&t->stream_map, s->global.id, s); + *t->accepting_stream = s; + grpc_chttp2_stream_map_add(&t->new_stream_map, s->global.id, s); } - s->incoming_deadline = gpr_inf_future; - grpc_sopb_init(&s->writing.sopb); - grpc_sopb_init(&s->callback_sopb); - grpc_chttp2_data_parser_init(&s->parser); - if (initial_op) perform_op_locked(t, s, initial_op); - unlock(t); return 0; } -static void schedule_nuke_sopb(grpc_chttp2_transport *t, grpc_stream_op_buffer *sopb) { - grpc_sopb_append(&t->nuke_later_sopb, sopb->ops, sopb->nops); - sopb->nops = 0; -} - static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; @@ -409,7 +375,7 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { gpr_mu_lock(&t->mu); - GPR_ASSERT(s->published_state == GRPC_STREAM_CLOSED || s->global.id == 0); + GPR_ASSERT(s->global.published_state == GRPC_STREAM_CLOSED || s->global.id == 0); for (i = 0; i < STREAM_LIST_COUNT; i++) { stream_list_remove(t, s, i); @@ -418,15 +384,14 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { gpr_mu_unlock(&t->mu); GPR_ASSERT(s->global.outgoing_sopb == NULL); - GPR_ASSERT(s->incoming_sopb == NULL); + GPR_ASSERT(s->global.incoming_sopb == NULL); grpc_sopb_destroy(&s->writing.sopb); - grpc_sopb_destroy(&s->callback_sopb); - grpc_chttp2_data_parser_destroy(&s->parser); - for (i = 0; i < s->incoming_metadata_count; i++) { - grpc_mdelem_unref(s->incoming_metadata[i].md); + grpc_chttp2_data_parser_destroy(&s->parsing.data_parser); + for (i = 0; i < s->parsing.incoming_metadata_count; i++) { + grpc_mdelem_unref(s->parsing.incoming_metadata[i].md); } - gpr_free(s->incoming_metadata); - gpr_free(s->old_incoming_metadata); + gpr_free(s->parsing.incoming_metadata); + gpr_free(s->parsing.old_incoming_metadata); unref_transport(t); } @@ -495,14 +460,16 @@ static void stream_list_join(grpc_chttp2_transport *t, grpc_chttp2_stream *s, gr stream_list_add_tail(t, s, id); } +#if 0 static void remove_from_stream_map(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { if (s->global.id == 0) return; IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Removing grpc_chttp2_stream %d", - t->constants.is_client ? "CLI" : "SVR", s->global.id)); + t->global.is_client ? "CLI" : "SVR", s->global.id)); if (grpc_chttp2_stream_map_delete(&t->stream_map, s->global.id)) { maybe_start_some_streams(t); } } +#endif /* * LOCK MANAGEMENT @@ -518,7 +485,7 @@ static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); } static void unlock(grpc_chttp2_transport *t) { grpc_iomgr_closure *run_closures; - if (!t->writing_active && grpc_chttp2_unlocking_check_writes(&t->constants, &t->global, &t->writing)) { + if (!t->writing_active && grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) { t->writing_active = 1; ref_transport(t); schedule_cb(t, &t->writing_action, 1); @@ -568,15 +535,12 @@ void grpc_chttp2_terminate_writing(grpc_chttp2_transport_writing *transport_writ } /* cleanup writing related jazz */ - grpc_chttp2_cleanup_writing(&t->constants, &t->global, &t->writing); + grpc_chttp2_cleanup_writing(&t->global, &t->writing); /* leave the writing flag up on shutdown to prevent further writes in unlock() from starting */ t->writing_active = 0; - if (t->destroying) { - gpr_cv_signal(&t->cv); - } - if (!t->reading) { + if (!t->endpoint_reading) { grpc_endpoint_destroy(t->ep); t->ep = NULL; unref_transport(t); /* safe because we'll still have the ref for write */ @@ -595,50 +559,42 @@ static void writing_action(void *gt, int iomgr_success_ignored) { static void add_goaway(grpc_chttp2_transport *t, gpr_uint32 goaway_error, gpr_slice goaway_text) { - if (t->num_pending_goaways == t->cap_pending_goaways) { - t->cap_pending_goaways = GPR_MAX(1, t->cap_pending_goaways * 2); - t->pending_goaways = gpr_realloc( - t->pending_goaways, sizeof(grpc_chttp2_pending_goaway) * t->cap_pending_goaways); - } - t->pending_goaways[t->num_pending_goaways].status = - grpc_chttp2_http2_error_to_grpc_status(goaway_error); - t->pending_goaways[t->num_pending_goaways].debug = goaway_text; - t->num_pending_goaways++; + gpr_slice_unref(t->channel_callback.goaway_text); + t->channel_callback.have_goaway = 1; + t->channel_callback.goaway_text = goaway_text; + t->channel_callback.goaway_error = goaway_error; } static void maybe_start_some_streams(grpc_chttp2_transport *t) { + grpc_chttp2_stream *s; /* start streams where we have free grpc_chttp2_stream ids and free concurrency */ - while (!t->parsing.executing && t->next_stream_id <= MAX_CLIENT_STREAM_ID && - grpc_chttp2_stream_map_size(&t->stream_map) < + while (t->global.next_stream_id <= MAX_CLIENT_STREAM_ID && + t->global.concurrent_stream_count < t->global.settings[PEER_SETTINGS] - [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) { - grpc_chttp2_stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY); - if (!s) return; - + [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] && + (s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY))) { IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d", - t->constants.is_client ? "CLI" : "SVR", s, t->next_stream_id)); + t->global.is_client ? "CLI" : "SVR", s, t->global.next_stream_id)); - if (t->next_stream_id == MAX_CLIENT_STREAM_ID) { + if (t->global.next_stream_id == MAX_CLIENT_STREAM_ID) { add_goaway( t, GRPC_CHTTP2_NO_ERROR, gpr_slice_from_copied_string("Exceeded sequence number limit")); } GPR_ASSERT(s->global.id == 0); - s->global.id = t->next_stream_id; - t->next_stream_id += 2; + s->global.id = t->global.next_stream_id; + t->global.next_stream_id += 2; s->global.outgoing_window = t->global.settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; s->global.incoming_window = t->global.settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - grpc_chttp2_stream_map_add(&t->stream_map, s->global.id, s); + grpc_chttp2_stream_map_add(&t->new_stream_map, s->global.id, s); + t->global.concurrent_stream_count++; stream_list_join(t, s, WRITABLE); } /* cancel out streams that will never be started */ - while (t->next_stream_id > MAX_CLIENT_STREAM_ID) { - grpc_chttp2_stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY); - if (!s) return; - + while (t->global.next_stream_id > MAX_CLIENT_STREAM_ID && (s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY))) { cancel_stream( t, s, GRPC_STATUS_UNAVAILABLE, grpc_chttp2_grpc_status_to_http2_error(GRPC_STATUS_UNAVAILABLE), NULL, @@ -646,6 +602,7 @@ static void maybe_start_some_streams(grpc_chttp2_transport *t) { } } +#if 0 static void perform_op_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_transport_op *op) { if (op->cancel_with_status != GRPC_STATUS_OK) { cancel_stream( @@ -665,27 +622,27 @@ static void perform_op_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s, g if (s->global.id == 0) { IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: New grpc_chttp2_stream %p waiting for concurrency", - t->constants.is_client ? "CLI" : "SVR", s)); + t->global.is_client ? "CLI" : "SVR", s)); stream_list_join(t, s, WAITING_FOR_CONCURRENCY); maybe_start_some_streams(t); } else if (s->global.outgoing_window > 0) { stream_list_join(t, s, WRITABLE); } } else { - schedule_nuke_sopb(t, op->send_ops); + grpc_sopb_reset(op->send_ops); schedule_cb(t, s->global.send_done_closure, 0); } } if (op->recv_ops) { - GPR_ASSERT(s->incoming_sopb == NULL); - GPR_ASSERT(s->published_state != GRPC_STREAM_CLOSED); + GPR_ASSERT(s->global.incoming_sopb == NULL); + GPR_ASSERT(s->global.published_state != GRPC_STREAM_CLOSED); s->global.recv_done_closure = op->on_done_recv; - s->incoming_sopb = op->recv_ops; - s->incoming_sopb->nops = 0; - s->publish_state = op->recv_state; - gpr_free(s->old_incoming_metadata); - s->old_incoming_metadata = NULL; + s->global.incoming_sopb = op->recv_ops; + s->global.incoming_sopb->nops = 0; + s->global.publish_state = op->recv_state; + gpr_free(s->global.old_incoming_metadata); + s->global.old_incoming_metadata = NULL; maybe_finish_read(t, s, 0); maybe_join_window_updates(t, s); } @@ -698,6 +655,7 @@ static void perform_op_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s, g schedule_cb(t, op->on_consumed, 1); } } +#endif static void perform_op(grpc_transport *gt, grpc_stream *gs, grpc_transport_op *op) { @@ -709,28 +667,23 @@ static void perform_op(grpc_transport *gt, grpc_stream *gs, unlock(t); } -static void send_ping(grpc_transport *gt, void (*cb)(void *user_data), - void *user_data) { +static void send_ping(grpc_transport *gt, grpc_iomgr_closure *on_recv) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - grpc_chttp2_outstanding_ping *p; + grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p)); lock(t); - if (t->ping_capacity == t->ping_count) { - t->ping_capacity = GPR_MAX(1, t->ping_capacity * 3 / 2); - t->pings = - gpr_realloc(t->pings, sizeof(grpc_chttp2_outstanding_ping) * t->ping_capacity); - } - p = &t->pings[t->ping_count++]; - p->id[0] = (t->ping_counter >> 56) & 0xff; - p->id[1] = (t->ping_counter >> 48) & 0xff; - p->id[2] = (t->ping_counter >> 40) & 0xff; - p->id[3] = (t->ping_counter >> 32) & 0xff; - p->id[4] = (t->ping_counter >> 24) & 0xff; - p->id[5] = (t->ping_counter >> 16) & 0xff; - p->id[6] = (t->ping_counter >> 8) & 0xff; - p->id[7] = t->ping_counter & 0xff; - p->cb = cb; - p->user_data = user_data; + p->next = &t->global.pings; + p->prev = p->next->prev; + p->prev->next = p->next->prev = p; + p->id[0] = (t->global.ping_counter >> 56) & 0xff; + p->id[1] = (t->global.ping_counter >> 48) & 0xff; + p->id[2] = (t->global.ping_counter >> 40) & 0xff; + p->id[3] = (t->global.ping_counter >> 32) & 0xff; + p->id[4] = (t->global.ping_counter >> 24) & 0xff; + p->id[5] = (t->global.ping_counter >> 16) & 0xff; + p->id[6] = (t->global.ping_counter >> 8) & 0xff; + p->id[7] = t->global.ping_counter & 0xff; + p->on_recv = on_recv; gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id)); unlock(t); } @@ -753,6 +706,7 @@ static void unlock_check_cancellations(grpc_chttp2_transport *t) { } } +#if 0 static void cancel_stream_inner(grpc_chttp2_transport *t, grpc_chttp2_stream *s, gpr_uint32 id, grpc_status_code local_status, grpc_chttp2_error_code error_code, @@ -844,15 +798,17 @@ static void cancel_stream_cb(void *user_data, gpr_uint32 id, void *grpc_chttp2_s static void end_all_the_calls(grpc_chttp2_transport *t) { grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, t); } +#endif static void drop_connection(grpc_chttp2_transport *t) { - if (t->error_state == ERROR_STATE_NONE) { - t->error_state = ERROR_STATE_SEEN; + if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE) { + t->global.error_state = GRPC_CHTTP2_ERROR_STATE_SEEN; } close_transport_locked(t); end_all_the_calls(t); } +#if 0 static void maybe_finish_read(grpc_chttp2_transport *t, grpc_chttp2_stream *s, int is_parser) { if (is_parser) { stream_list_join(t, s, MAYBE_FINISH_READ_AFTER_PARSE); @@ -860,6 +816,7 @@ static void maybe_finish_read(grpc_chttp2_transport *t, grpc_chttp2_stream *s, i stream_list_join(t, s, FINISHED_READ_OP); } } +#endif static void maybe_join_window_updates(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { if (t->parsing.executing) { @@ -875,15 +832,16 @@ static void maybe_join_window_updates(grpc_chttp2_transport *t, grpc_chttp2_stre } } +#if 0 static grpc_chttp2_stream *lookup_stream(grpc_chttp2_transport *t, gpr_uint32 id) { return grpc_chttp2_stream_map_find(&t->stream_map, id); } +#endif /* tcp read callback */ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, grpc_endpoint_cb_status error) { grpc_chttp2_transport *t = tp; - grpc_chttp2_stream *s; size_t i; int keep_reading = 0; @@ -893,8 +851,8 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, case GRPC_ENDPOINT_CB_ERROR: lock(t); drop_connection(t); - t->reading = 0; - if (!t->writing.executing && t->ep) { + t->endpoint_reading = 0; + if (!t->writing_active && t->ep) { grpc_endpoint_destroy(t->ep); t->ep = NULL; unref_transport(t); /* safe as we still have a ref for read */ @@ -904,9 +862,10 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, break; case GRPC_ENDPOINT_CB_OK: lock(t); - GPR_ASSERT(!t->parsing.executing); - if (t->error_state == ERROR_STATE_NONE) { - t->parsing.executing = 1; + GPR_ASSERT(!t->parsing_active); + if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE) { + t->parsing_active = 1; + grpc_chttp2_prepare_to_read(&t->global, &t->parsing); gpr_mu_unlock(&t->mu); for (i = 0; i < nslices && grpc_chttp2_perform_read(&t->parsing, slices[i]); i++) ; @@ -914,8 +873,14 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, if (i != nslices) { drop_connection(t); } - t->parsing.executing = 0; + /* merge stream lists */ + grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map); + t->global.concurrent_stream_count = grpc_stream_map_size(&t->parsing_stream_map); + /* handle higher level things */ + grpc_chttp2_publish_reads(&t->global, &t->parsing); + t->parsing_active = 0; } +#if 0 while ((s = stream_list_remove_head(t, MAYBE_FINISH_READ_AFTER_PARSE))) { maybe_finish_read(t, s, 0); } @@ -940,6 +905,7 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, t->global.outgoing_window += t->global.outgoing_window_update; t->global.outgoing_window_update = 0; maybe_start_some_streams(t); +#endif unlock(t); keep_reading = 1; break; @@ -964,92 +930,6 @@ static grpc_stream_state compute_state(gpr_uint8 write_closed, return GRPC_STREAM_OPEN; } -static void patch_metadata_ops(grpc_chttp2_stream *s) { - grpc_stream_op *ops = s->incoming_sopb->ops; - size_t nops = s->incoming_sopb->nops; - size_t i; - size_t j; - size_t mdidx = 0; - size_t last_mdidx; - int found_metadata = 0; - - /* rework the array of metadata into a linked list, making use - of the breadcrumbs we left in metadata batches during - add_metadata_batch */ - for (i = 0; i < nops; i++) { - grpc_stream_op *op = &ops[i]; - if (op->type != GRPC_OP_METADATA) continue; - found_metadata = 1; - /* we left a breadcrumb indicating where the end of this list is, - and since we add sequentially, we know from the end of the last - segment where this segment begins */ - last_mdidx = (size_t)(gpr_intptr)(op->data.metadata.list.tail); - GPR_ASSERT(last_mdidx > mdidx); - GPR_ASSERT(last_mdidx <= s->incoming_metadata_count); - /* turn the array into a doubly linked list */ - op->data.metadata.list.head = &s->incoming_metadata[mdidx]; - op->data.metadata.list.tail = &s->incoming_metadata[last_mdidx - 1]; - for (j = mdidx + 1; j < last_mdidx; j++) { - s->incoming_metadata[j].prev = &s->incoming_metadata[j - 1]; - s->incoming_metadata[j - 1].next = &s->incoming_metadata[j]; - } - s->incoming_metadata[mdidx].prev = NULL; - s->incoming_metadata[last_mdidx - 1].next = NULL; - /* track where we're up to */ - mdidx = last_mdidx; - } - if (found_metadata) { - s->old_incoming_metadata = s->incoming_metadata; - if (mdidx != s->incoming_metadata_count) { - /* we have a partially read metadata batch still in incoming_metadata */ - size_t new_count = s->incoming_metadata_count - mdidx; - size_t copy_bytes = sizeof(*s->incoming_metadata) * new_count; - GPR_ASSERT(mdidx < s->incoming_metadata_count); - s->incoming_metadata = gpr_malloc(copy_bytes); - memcpy(s->old_incoming_metadata + mdidx, s->incoming_metadata, - copy_bytes); - s->incoming_metadata_count = s->incoming_metadata_capacity = new_count; - } else { - s->incoming_metadata = NULL; - s->incoming_metadata_count = 0; - s->incoming_metadata_capacity = 0; - } - } -} - -static void unlock_check_parser(grpc_chttp2_transport *t) { - grpc_chttp2_stream *s; - - if (t->parsing.executing) { - return; - } - - while ((s = stream_list_remove_head(t, FINISHED_READ_OP)) != NULL) { - int publish = 0; - GPR_ASSERT(s->incoming_sopb); - *s->publish_state = - compute_state(s->write_state == WRITE_STATE_SENT_CLOSE, s->read_closed); - if (*s->publish_state != s->published_state) { - s->published_state = *s->publish_state; - publish = 1; - if (s->published_state == GRPC_STREAM_CLOSED) { - remove_from_stream_map(t, s); - } - } - if (s->parser.incoming_sopb.nops > 0) { - grpc_sopb_swap(s->incoming_sopb, &s->parser.incoming_sopb); - publish = 1; - } - if (publish) { - if (s->incoming_metadata_count > 0) { - patch_metadata_ops(s); - } - s->incoming_sopb = NULL; - schedule_cb(t, s->global.recv_done_closure, 1); - } - } -} - typedef struct { grpc_chttp2_transport *t; grpc_chttp2_pending_goaway *goaways; |