aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/transport/chttp2_transport.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/transport/chttp2_transport.c')
-rw-r--r--src/core/transport/chttp2_transport.c350
1 files changed, 115 insertions, 235 deletions
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 13ddeacc02..5db9b92727 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -138,34 +138,31 @@ static void destruct_transport(grpc_chttp2_transport *t) {
grpc_chttp2_hpack_parser_destroy(&t->parsing.hpack_parser);
grpc_chttp2_goaway_parser_destroy(&t->parsing.goaway_parser);
- grpc_mdstr_unref(t->constants.str_grpc_timeout);
+ grpc_mdstr_unref(t->parsing.str_grpc_timeout);
for (i = 0; i < STREAM_LIST_COUNT; i++) {
GPR_ASSERT(t->lists[i].head == NULL);
GPR_ASSERT(t->lists[i].tail == NULL);
}
- GPR_ASSERT(grpc_chttp2_stream_map_size(&t->stream_map) == 0);
+ GPR_ASSERT(grpc_chttp2_stream_map_size(&t->parsing_stream_map) == 0);
+ GPR_ASSERT(grpc_chttp2_stream_map_size(&t->new_stream_map) == 0);
- grpc_chttp2_stream_map_destroy(&t->stream_map);
+ grpc_chttp2_stream_map_destroy(&t->parsing_stream_map);
+ grpc_chttp2_stream_map_destroy(&t->new_stream_map);
gpr_mu_unlock(&t->mu);
gpr_mu_destroy(&t->mu);
- gpr_cv_destroy(&t->cv);
/* callback remaining pings: they're not allowed to call into the transpot,
and maybe they hold resources that need to be freed */
- for (i = 0; i < t->ping_count; i++) {
- t->pings[i].cb(t->pings[i].user_data);
+ while (t->global.pings.next != &t->global.pings) {
+ grpc_chttp2_outstanding_ping *ping = t->global.pings.next;
+ grpc_iomgr_add_delayed_callback(ping->on_recv, 0);
+ ping->next->prev = ping->prev;
+ ping->prev->next = ping->next;
+ gpr_free(ping);
}
- gpr_free(t->pings);
-
- for (i = 0; i < t->num_pending_goaways; i++) {
- gpr_slice_unref(t->pending_goaways[i].debug);
- }
- gpr_free(t->pending_goaways);
-
- grpc_sopb_destroy(&t->nuke_later_sopb);
grpc_mdctx_unref(t->metadata_context);
@@ -187,7 +184,7 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba
int j;
grpc_transport_setup_result sr;
- GPR_ASSERT(strlen(CLIENT_CONNECT_STRING) == CLIENT_CONNECT_STRLEN);
+ GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
memset(t, 0, sizeof(*t));
@@ -196,20 +193,20 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba
/* one ref is for destroy, the other for when ep becomes NULL */
gpr_ref_init(&t->refs, 2);
gpr_mu_init(&t->mu);
- gpr_cv_init(&t->cv);
grpc_mdctx_ref(mdctx);
t->metadata_context = mdctx;
- t->constants.str_grpc_timeout =
- grpc_mdstr_from_string(t->metadata_context, "grpc-timeout");
- t->reading = 1;
- t->error_state = ERROR_STATE_NONE;
- t->next_stream_id = is_client ? 1 : 2;
- t->constants.is_client = is_client;
+ t->endpoint_reading = 1;
+ t->global.error_state = GRPC_CHTTP2_ERROR_STATE_NONE;
+ t->global.next_stream_id = is_client ? 1 : 2;
+ t->global.is_client = is_client;
t->global.outgoing_window = DEFAULT_WINDOW;
t->global.incoming_window = DEFAULT_WINDOW;
t->global.connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET;
- t->deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0;
- t->ping_counter = gpr_now().tv_nsec;
+ t->global.ping_counter = 1;
+ t->parsing.is_client = is_client;
+ t->parsing.str_grpc_timeout =
+ grpc_mdstr_from_string(t->metadata_context, "grpc-timeout");
+ t->parsing.deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0;
gpr_slice_buffer_init(&t->global.qbuf);
@@ -222,17 +219,17 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba
grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser, t->metadata_context);
grpc_iomgr_closure_init(&t->channel_callback.notify_closed, notify_closed, t);
- grpc_sopb_init(&t->nuke_later_sopb);
if (is_client) {
gpr_slice_buffer_add(&t->global.qbuf,
- gpr_slice_from_copied_string(CLIENT_CONNECT_STRING));
+ gpr_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING));
}
/* 8 is a random stab in the dark as to a good initial size: it's small enough
that it shouldn't waste memory for infrequently used connections, yet
large enough that the exponential growth should happen nicely when it's
needed.
TODO(ctiller): tune this */
- grpc_chttp2_stream_map_init(&t->stream_map, 8);
+ grpc_chttp2_stream_map_init(&t->parsing_stream_map, 8);
+ grpc_chttp2_stream_map_init(&t->new_stream_map, 8);
/* copy in initial settings to all setting sets */
for (i = 0; i < NUM_SETTING_SETS; i++) {
@@ -247,7 +244,7 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba
t->global.sent_local_settings = 0;
/* configure http2 the way we like it */
- if (t->constants.is_client) {
+ if (is_client) {
push_setting(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
}
@@ -257,7 +254,7 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba
for (i = 0; i < channel_args->num_args; i++) {
if (0 ==
strcmp(channel_args->args[i].key, GRPC_ARG_MAX_CONCURRENT_STREAMS)) {
- if (t->constants.is_client) {
+ if (is_client) {
gpr_log(GPR_ERROR, "%s: is ignored on the client",
GRPC_ARG_MAX_CONCURRENT_STREAMS);
} else if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
@@ -272,13 +269,13 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba
if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
gpr_log(GPR_ERROR, "%s: must be an integer",
GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER);
- } else if ((t->next_stream_id & 1) !=
+ } else if ((t->global.next_stream_id & 1) !=
(channel_args->args[i].value.integer & 1)) {
gpr_log(GPR_ERROR, "%s: low bit must be %d on %s",
- GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->next_stream_id & 1,
- t->constants.is_client ? "client" : "server");
+ GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->global.next_stream_id & 1,
+ is_client ? "client" : "server");
} else {
- t->next_stream_id = channel_args->args[i].value.integer;
+ t->global.next_stream_id = channel_args->args[i].value.integer;
}
}
}
@@ -295,7 +292,6 @@ static void init_transport(grpc_chttp2_transport *t, grpc_transport_setup_callba
t->channel_callback.cb = sr.callbacks;
t->channel_callback.cb_user_data = sr.user_data;
t->channel_callback.executing = 0;
- if (t->destroying) gpr_cv_signal(&t->cv);
unlock(t);
ref_transport(t); /* matches unref inside recv_data */
@@ -309,28 +305,9 @@ static void destroy_transport(grpc_transport *gt) {
lock(t);
t->destroying = 1;
- /* Wait for pending stuff to finish.
- We need to be not calling back to ensure that closed() gets a chance to
- trigger if needed during unlock() before we die.
- We need to be not writing as cancellation finalization may produce some
- callbacks that NEED to be made to close out some streams when t->writing
- becomes 0. */
- while (t->channel_callback.executing || t->writing_active) {
- gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future);
- }
drop_connection(t);
unlock(t);
- /* The drop_connection() above puts the grpc_chttp2_transport into an error state, and
- the follow-up unlock should then (as part of the cleanup work it does)
- ensure that cb is NULL, and therefore not call back anything further.
- This check validates this very subtle behavior.
- It's shutdown path, so I don't believe an extra lock pair is going to be
- problematic for performance. */
- lock(t);
- GPR_ASSERT(t->error_state == ERROR_STATE_NOTIFIED);
- unlock(t);
-
unref_transport(t);
}
@@ -354,7 +331,7 @@ static void goaway(grpc_transport *gt, grpc_status_code status,
gpr_slice debug_data) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
lock(t);
- grpc_chttp2_goaway_append(t->last_incoming_stream_id,
+ grpc_chttp2_goaway_append(t->global.last_incoming_stream_id,
grpc_chttp2_grpc_status_to_http2_error(status),
debug_data, &t->global.qbuf);
unlock(t);
@@ -367,41 +344,30 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
memset(s, 0, sizeof(*s));
+ s->parsing.incoming_deadline = gpr_inf_future;
+ grpc_sopb_init(&s->writing.sopb);
+ grpc_chttp2_data_parser_init(&s->parsing.data_parser);
+
ref_transport(t);
lock(t);
- if (!server_data) {
- s->global.id = 0;
- s->global.outgoing_window = 0;
- s->global.incoming_window = 0;
- } else {
- /* already locked */
+ if (server_data) {
+ GPR_ASSERT(t->parsing_active);
s->global.id = (gpr_uint32)(gpr_uintptr)server_data;
s->global.outgoing_window =
t->global.settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
s->global.incoming_window =
t->global.settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- t->incoming_stream = s;
- grpc_chttp2_stream_map_add(&t->stream_map, s->global.id, s);
+ *t->accepting_stream = s;
+ grpc_chttp2_stream_map_add(&t->new_stream_map, s->global.id, s);
}
- s->incoming_deadline = gpr_inf_future;
- grpc_sopb_init(&s->writing.sopb);
- grpc_sopb_init(&s->callback_sopb);
- grpc_chttp2_data_parser_init(&s->parser);
-
if (initial_op) perform_op_locked(t, s, initial_op);
-
unlock(t);
return 0;
}
-static void schedule_nuke_sopb(grpc_chttp2_transport *t, grpc_stream_op_buffer *sopb) {
- grpc_sopb_append(&t->nuke_later_sopb, sopb->ops, sopb->nops);
- sopb->nops = 0;
-}
-
static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
@@ -409,7 +375,7 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
gpr_mu_lock(&t->mu);
- GPR_ASSERT(s->published_state == GRPC_STREAM_CLOSED || s->global.id == 0);
+ GPR_ASSERT(s->global.published_state == GRPC_STREAM_CLOSED || s->global.id == 0);
for (i = 0; i < STREAM_LIST_COUNT; i++) {
stream_list_remove(t, s, i);
@@ -418,15 +384,14 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
gpr_mu_unlock(&t->mu);
GPR_ASSERT(s->global.outgoing_sopb == NULL);
- GPR_ASSERT(s->incoming_sopb == NULL);
+ GPR_ASSERT(s->global.incoming_sopb == NULL);
grpc_sopb_destroy(&s->writing.sopb);
- grpc_sopb_destroy(&s->callback_sopb);
- grpc_chttp2_data_parser_destroy(&s->parser);
- for (i = 0; i < s->incoming_metadata_count; i++) {
- grpc_mdelem_unref(s->incoming_metadata[i].md);
+ grpc_chttp2_data_parser_destroy(&s->parsing.data_parser);
+ for (i = 0; i < s->parsing.incoming_metadata_count; i++) {
+ grpc_mdelem_unref(s->parsing.incoming_metadata[i].md);
}
- gpr_free(s->incoming_metadata);
- gpr_free(s->old_incoming_metadata);
+ gpr_free(s->parsing.incoming_metadata);
+ gpr_free(s->parsing.old_incoming_metadata);
unref_transport(t);
}
@@ -495,14 +460,16 @@ static void stream_list_join(grpc_chttp2_transport *t, grpc_chttp2_stream *s, gr
stream_list_add_tail(t, s, id);
}
+#if 0
static void remove_from_stream_map(grpc_chttp2_transport *t, grpc_chttp2_stream *s) {
if (s->global.id == 0) return;
IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Removing grpc_chttp2_stream %d",
- t->constants.is_client ? "CLI" : "SVR", s->global.id));
+ t->global.is_client ? "CLI" : "SVR", s->global.id));
if (grpc_chttp2_stream_map_delete(&t->stream_map, s->global.id)) {
maybe_start_some_streams(t);
}
}
+#endif
/*
* LOCK MANAGEMENT
@@ -518,7 +485,7 @@ static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); }
static void unlock(grpc_chttp2_transport *t) {
grpc_iomgr_closure *run_closures;
- if (!t->writing_active && grpc_chttp2_unlocking_check_writes(&t->constants, &t->global, &t->writing)) {
+ if (!t->writing_active && grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) {
t->writing_active = 1;
ref_transport(t);
schedule_cb(t, &t->writing_action, 1);
@@ -568,15 +535,12 @@ void grpc_chttp2_terminate_writing(grpc_chttp2_transport_writing *transport_writ
}
/* cleanup writing related jazz */
- grpc_chttp2_cleanup_writing(&t->constants, &t->global, &t->writing);
+ grpc_chttp2_cleanup_writing(&t->global, &t->writing);
/* leave the writing flag up on shutdown to prevent further writes in unlock()
from starting */
t->writing_active = 0;
- if (t->destroying) {
- gpr_cv_signal(&t->cv);
- }
- if (!t->reading) {
+ if (!t->endpoint_reading) {
grpc_endpoint_destroy(t->ep);
t->ep = NULL;
unref_transport(t); /* safe because we'll still have the ref for write */
@@ -595,50 +559,42 @@ static void writing_action(void *gt, int iomgr_success_ignored) {
static void add_goaway(grpc_chttp2_transport *t, gpr_uint32 goaway_error,
gpr_slice goaway_text) {
- if (t->num_pending_goaways == t->cap_pending_goaways) {
- t->cap_pending_goaways = GPR_MAX(1, t->cap_pending_goaways * 2);
- t->pending_goaways = gpr_realloc(
- t->pending_goaways, sizeof(grpc_chttp2_pending_goaway) * t->cap_pending_goaways);
- }
- t->pending_goaways[t->num_pending_goaways].status =
- grpc_chttp2_http2_error_to_grpc_status(goaway_error);
- t->pending_goaways[t->num_pending_goaways].debug = goaway_text;
- t->num_pending_goaways++;
+ gpr_slice_unref(t->channel_callback.goaway_text);
+ t->channel_callback.have_goaway = 1;
+ t->channel_callback.goaway_text = goaway_text;
+ t->channel_callback.goaway_error = goaway_error;
}
static void maybe_start_some_streams(grpc_chttp2_transport *t) {
+ grpc_chttp2_stream *s;
/* start streams where we have free grpc_chttp2_stream ids and free concurrency */
- while (!t->parsing.executing && t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
- grpc_chttp2_stream_map_size(&t->stream_map) <
+ while (t->global.next_stream_id <= MAX_CLIENT_STREAM_ID &&
+ t->global.concurrent_stream_count <
t->global.settings[PEER_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) {
- grpc_chttp2_stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY);
- if (!s) return;
-
+ [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] &&
+ (s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY))) {
IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d",
- t->constants.is_client ? "CLI" : "SVR", s, t->next_stream_id));
+ t->global.is_client ? "CLI" : "SVR", s, t->global.next_stream_id));
- if (t->next_stream_id == MAX_CLIENT_STREAM_ID) {
+ if (t->global.next_stream_id == MAX_CLIENT_STREAM_ID) {
add_goaway(
t, GRPC_CHTTP2_NO_ERROR,
gpr_slice_from_copied_string("Exceeded sequence number limit"));
}
GPR_ASSERT(s->global.id == 0);
- s->global.id = t->next_stream_id;
- t->next_stream_id += 2;
+ s->global.id = t->global.next_stream_id;
+ t->global.next_stream_id += 2;
s->global.outgoing_window =
t->global.settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
s->global.incoming_window =
t->global.settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- grpc_chttp2_stream_map_add(&t->stream_map, s->global.id, s);
+ grpc_chttp2_stream_map_add(&t->new_stream_map, s->global.id, s);
+ t->global.concurrent_stream_count++;
stream_list_join(t, s, WRITABLE);
}
/* cancel out streams that will never be started */
- while (t->next_stream_id > MAX_CLIENT_STREAM_ID) {
- grpc_chttp2_stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY);
- if (!s) return;
-
+ while (t->global.next_stream_id > MAX_CLIENT_STREAM_ID && (s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY))) {
cancel_stream(
t, s, GRPC_STATUS_UNAVAILABLE,
grpc_chttp2_grpc_status_to_http2_error(GRPC_STATUS_UNAVAILABLE), NULL,
@@ -646,6 +602,7 @@ static void maybe_start_some_streams(grpc_chttp2_transport *t) {
}
}
+#if 0
static void perform_op_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_transport_op *op) {
if (op->cancel_with_status != GRPC_STATUS_OK) {
cancel_stream(
@@ -665,27 +622,27 @@ static void perform_op_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s, g
if (s->global.id == 0) {
IF_TRACING(gpr_log(GPR_DEBUG,
"HTTP:%s: New grpc_chttp2_stream %p waiting for concurrency",
- t->constants.is_client ? "CLI" : "SVR", s));
+ t->global.is_client ? "CLI" : "SVR", s));
stream_list_join(t, s, WAITING_FOR_CONCURRENCY);
maybe_start_some_streams(t);
} else if (s->global.outgoing_window > 0) {
stream_list_join(t, s, WRITABLE);
}
} else {
- schedule_nuke_sopb(t, op->send_ops);
+ grpc_sopb_reset(op->send_ops);
schedule_cb(t, s->global.send_done_closure, 0);
}
}
if (op->recv_ops) {
- GPR_ASSERT(s->incoming_sopb == NULL);
- GPR_ASSERT(s->published_state != GRPC_STREAM_CLOSED);
+ GPR_ASSERT(s->global.incoming_sopb == NULL);
+ GPR_ASSERT(s->global.published_state != GRPC_STREAM_CLOSED);
s->global.recv_done_closure = op->on_done_recv;
- s->incoming_sopb = op->recv_ops;
- s->incoming_sopb->nops = 0;
- s->publish_state = op->recv_state;
- gpr_free(s->old_incoming_metadata);
- s->old_incoming_metadata = NULL;
+ s->global.incoming_sopb = op->recv_ops;
+ s->global.incoming_sopb->nops = 0;
+ s->global.publish_state = op->recv_state;
+ gpr_free(s->global.old_incoming_metadata);
+ s->global.old_incoming_metadata = NULL;
maybe_finish_read(t, s, 0);
maybe_join_window_updates(t, s);
}
@@ -698,6 +655,7 @@ static void perform_op_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s, g
schedule_cb(t, op->on_consumed, 1);
}
}
+#endif
static void perform_op(grpc_transport *gt, grpc_stream *gs,
grpc_transport_op *op) {
@@ -709,28 +667,23 @@ static void perform_op(grpc_transport *gt, grpc_stream *gs,
unlock(t);
}
-static void send_ping(grpc_transport *gt, void (*cb)(void *user_data),
- void *user_data) {
+static void send_ping(grpc_transport *gt, grpc_iomgr_closure *on_recv) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
- grpc_chttp2_outstanding_ping *p;
+ grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p));
lock(t);
- if (t->ping_capacity == t->ping_count) {
- t->ping_capacity = GPR_MAX(1, t->ping_capacity * 3 / 2);
- t->pings =
- gpr_realloc(t->pings, sizeof(grpc_chttp2_outstanding_ping) * t->ping_capacity);
- }
- p = &t->pings[t->ping_count++];
- p->id[0] = (t->ping_counter >> 56) & 0xff;
- p->id[1] = (t->ping_counter >> 48) & 0xff;
- p->id[2] = (t->ping_counter >> 40) & 0xff;
- p->id[3] = (t->ping_counter >> 32) & 0xff;
- p->id[4] = (t->ping_counter >> 24) & 0xff;
- p->id[5] = (t->ping_counter >> 16) & 0xff;
- p->id[6] = (t->ping_counter >> 8) & 0xff;
- p->id[7] = t->ping_counter & 0xff;
- p->cb = cb;
- p->user_data = user_data;
+ p->next = &t->global.pings;
+ p->prev = p->next->prev;
+ p->prev->next = p->next->prev = p;
+ p->id[0] = (t->global.ping_counter >> 56) & 0xff;
+ p->id[1] = (t->global.ping_counter >> 48) & 0xff;
+ p->id[2] = (t->global.ping_counter >> 40) & 0xff;
+ p->id[3] = (t->global.ping_counter >> 32) & 0xff;
+ p->id[4] = (t->global.ping_counter >> 24) & 0xff;
+ p->id[5] = (t->global.ping_counter >> 16) & 0xff;
+ p->id[6] = (t->global.ping_counter >> 8) & 0xff;
+ p->id[7] = t->global.ping_counter & 0xff;
+ p->on_recv = on_recv;
gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id));
unlock(t);
}
@@ -753,6 +706,7 @@ static void unlock_check_cancellations(grpc_chttp2_transport *t) {
}
}
+#if 0
static void cancel_stream_inner(grpc_chttp2_transport *t, grpc_chttp2_stream *s, gpr_uint32 id,
grpc_status_code local_status,
grpc_chttp2_error_code error_code,
@@ -844,15 +798,17 @@ static void cancel_stream_cb(void *user_data, gpr_uint32 id, void *grpc_chttp2_s
static void end_all_the_calls(grpc_chttp2_transport *t) {
grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, t);
}
+#endif
static void drop_connection(grpc_chttp2_transport *t) {
- if (t->error_state == ERROR_STATE_NONE) {
- t->error_state = ERROR_STATE_SEEN;
+ if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE) {
+ t->global.error_state = GRPC_CHTTP2_ERROR_STATE_SEEN;
}
close_transport_locked(t);
end_all_the_calls(t);
}
+#if 0
static void maybe_finish_read(grpc_chttp2_transport *t, grpc_chttp2_stream *s, int is_parser) {
if (is_parser) {
stream_list_join(t, s, MAYBE_FINISH_READ_AFTER_PARSE);
@@ -860,6 +816,7 @@ static void maybe_finish_read(grpc_chttp2_transport *t, grpc_chttp2_stream *s, i
stream_list_join(t, s, FINISHED_READ_OP);
}
}
+#endif
static void maybe_join_window_updates(grpc_chttp2_transport *t, grpc_chttp2_stream *s) {
if (t->parsing.executing) {
@@ -875,15 +832,16 @@ static void maybe_join_window_updates(grpc_chttp2_transport *t, grpc_chttp2_stre
}
}
+#if 0
static grpc_chttp2_stream *lookup_stream(grpc_chttp2_transport *t, gpr_uint32 id) {
return grpc_chttp2_stream_map_find(&t->stream_map, id);
}
+#endif
/* tcp read callback */
static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
grpc_endpoint_cb_status error) {
grpc_chttp2_transport *t = tp;
- grpc_chttp2_stream *s;
size_t i;
int keep_reading = 0;
@@ -893,8 +851,8 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
case GRPC_ENDPOINT_CB_ERROR:
lock(t);
drop_connection(t);
- t->reading = 0;
- if (!t->writing.executing && t->ep) {
+ t->endpoint_reading = 0;
+ if (!t->writing_active && t->ep) {
grpc_endpoint_destroy(t->ep);
t->ep = NULL;
unref_transport(t); /* safe as we still have a ref for read */
@@ -904,9 +862,10 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
break;
case GRPC_ENDPOINT_CB_OK:
lock(t);
- GPR_ASSERT(!t->parsing.executing);
- if (t->error_state == ERROR_STATE_NONE) {
- t->parsing.executing = 1;
+ GPR_ASSERT(!t->parsing_active);
+ if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE) {
+ t->parsing_active = 1;
+ grpc_chttp2_prepare_to_read(&t->global, &t->parsing);
gpr_mu_unlock(&t->mu);
for (i = 0; i < nslices && grpc_chttp2_perform_read(&t->parsing, slices[i]); i++)
;
@@ -914,8 +873,14 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
if (i != nslices) {
drop_connection(t);
}
- t->parsing.executing = 0;
+ /* merge stream lists */
+ grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map);
+ t->global.concurrent_stream_count = grpc_stream_map_size(&t->parsing_stream_map);
+ /* handle higher level things */
+ grpc_chttp2_publish_reads(&t->global, &t->parsing);
+ t->parsing_active = 0;
}
+#if 0
while ((s = stream_list_remove_head(t, MAYBE_FINISH_READ_AFTER_PARSE))) {
maybe_finish_read(t, s, 0);
}
@@ -940,6 +905,7 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
t->global.outgoing_window += t->global.outgoing_window_update;
t->global.outgoing_window_update = 0;
maybe_start_some_streams(t);
+#endif
unlock(t);
keep_reading = 1;
break;
@@ -964,92 +930,6 @@ static grpc_stream_state compute_state(gpr_uint8 write_closed,
return GRPC_STREAM_OPEN;
}
-static void patch_metadata_ops(grpc_chttp2_stream *s) {
- grpc_stream_op *ops = s->incoming_sopb->ops;
- size_t nops = s->incoming_sopb->nops;
- size_t i;
- size_t j;
- size_t mdidx = 0;
- size_t last_mdidx;
- int found_metadata = 0;
-
- /* rework the array of metadata into a linked list, making use
- of the breadcrumbs we left in metadata batches during
- add_metadata_batch */
- for (i = 0; i < nops; i++) {
- grpc_stream_op *op = &ops[i];
- if (op->type != GRPC_OP_METADATA) continue;
- found_metadata = 1;
- /* we left a breadcrumb indicating where the end of this list is,
- and since we add sequentially, we know from the end of the last
- segment where this segment begins */
- last_mdidx = (size_t)(gpr_intptr)(op->data.metadata.list.tail);
- GPR_ASSERT(last_mdidx > mdidx);
- GPR_ASSERT(last_mdidx <= s->incoming_metadata_count);
- /* turn the array into a doubly linked list */
- op->data.metadata.list.head = &s->incoming_metadata[mdidx];
- op->data.metadata.list.tail = &s->incoming_metadata[last_mdidx - 1];
- for (j = mdidx + 1; j < last_mdidx; j++) {
- s->incoming_metadata[j].prev = &s->incoming_metadata[j - 1];
- s->incoming_metadata[j - 1].next = &s->incoming_metadata[j];
- }
- s->incoming_metadata[mdidx].prev = NULL;
- s->incoming_metadata[last_mdidx - 1].next = NULL;
- /* track where we're up to */
- mdidx = last_mdidx;
- }
- if (found_metadata) {
- s->old_incoming_metadata = s->incoming_metadata;
- if (mdidx != s->incoming_metadata_count) {
- /* we have a partially read metadata batch still in incoming_metadata */
- size_t new_count = s->incoming_metadata_count - mdidx;
- size_t copy_bytes = sizeof(*s->incoming_metadata) * new_count;
- GPR_ASSERT(mdidx < s->incoming_metadata_count);
- s->incoming_metadata = gpr_malloc(copy_bytes);
- memcpy(s->old_incoming_metadata + mdidx, s->incoming_metadata,
- copy_bytes);
- s->incoming_metadata_count = s->incoming_metadata_capacity = new_count;
- } else {
- s->incoming_metadata = NULL;
- s->incoming_metadata_count = 0;
- s->incoming_metadata_capacity = 0;
- }
- }
-}
-
-static void unlock_check_parser(grpc_chttp2_transport *t) {
- grpc_chttp2_stream *s;
-
- if (t->parsing.executing) {
- return;
- }
-
- while ((s = stream_list_remove_head(t, FINISHED_READ_OP)) != NULL) {
- int publish = 0;
- GPR_ASSERT(s->incoming_sopb);
- *s->publish_state =
- compute_state(s->write_state == WRITE_STATE_SENT_CLOSE, s->read_closed);
- if (*s->publish_state != s->published_state) {
- s->published_state = *s->publish_state;
- publish = 1;
- if (s->published_state == GRPC_STREAM_CLOSED) {
- remove_from_stream_map(t, s);
- }
- }
- if (s->parser.incoming_sopb.nops > 0) {
- grpc_sopb_swap(s->incoming_sopb, &s->parser.incoming_sopb);
- publish = 1;
- }
- if (publish) {
- if (s->incoming_metadata_count > 0) {
- patch_metadata_ops(s);
- }
- s->incoming_sopb = NULL;
- schedule_cb(t, s->global.recv_done_closure, 1);
- }
- }
-}
-
typedef struct {
grpc_chttp2_transport *t;
grpc_chttp2_pending_goaway *goaways;