aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/transport/chttp2_transport.c
diff options
context:
space:
mode:
authorGravatar Yang Gao <yangg@google.com>2015-05-01 10:57:14 -0700
committerGravatar Yang Gao <yangg@google.com>2015-05-01 10:57:14 -0700
commit4db2954d1e78c9ecfd7843a7a9b87a3d9a583d32 (patch)
treee8f23d5e23ce0c4d0dd278ed140631ebc42ba8c0 /src/core/transport/chttp2_transport.c
parentc9e39c0728da270c1bced4938ee9745aeb208890 (diff)
parente217307a6276cb5f08b07728197348a48595d0d6 (diff)
merge with head
Diffstat (limited to 'src/core/transport/chttp2_transport.c')
-rw-r--r--src/core/transport/chttp2_transport.c504
1 files changed, 321 insertions, 183 deletions
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 786d9cf6eb..0bee37c614 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -91,10 +91,9 @@ typedef enum {
/* streams that are waiting to start because there are too many concurrent
streams on the connection */
WAITING_FOR_CONCURRENCY,
- /* streams that want to callback the application */
- PENDING_CALLBACKS,
- /* streams that *ARE* calling back to the application */
- EXECUTING_CALLBACKS,
+ /* streams that have finished reading: we wait until unlock to coalesce
+ all changes into one callback */
+ FINISHED_READ_OP,
STREAM_LIST_COUNT /* must be last */
} stream_list_id;
@@ -141,6 +140,12 @@ typedef enum {
DTS_FRAME
} deframe_transport_state;
+typedef enum {
+ WRITE_STATE_OPEN,
+ WRITE_STATE_QUEUED_CLOSE,
+ WRITE_STATE_SENT_CLOSE
+} WRITE_STATE;
+
typedef struct {
stream *head;
stream *tail;
@@ -182,6 +187,18 @@ typedef struct {
gpr_slice debug;
} pending_goaway;
+typedef struct {
+ void (*cb)(void *user_data, int success);
+ void *user_data;
+ int success;
+} op_closure;
+
+typedef struct {
+ op_closure *callbacks;
+ size_t count;
+ size_t capacity;
+} op_closure_array;
+
struct transport {
grpc_transport base; /* must be first */
const grpc_transport_callbacks *cb;
@@ -202,6 +219,10 @@ struct transport {
gpr_uint8 closed;
error_state error_state;
+ /* queued callbacks */
+ op_closure_array pending_callbacks;
+ op_closure_array executing_callbacks;
+
/* stream indexing */
gpr_uint32 next_stream_id;
gpr_uint32 last_incoming_stream_id;
@@ -281,13 +302,13 @@ struct stream {
/* when the application requests writes be closed, the write_closed is
'queued'; when the close is flow controlled into the send path, we are
'sending' it; when the write has been performed it is 'sent' */
- gpr_uint8 queued_write_closed;
- gpr_uint8 sending_write_closed;
- gpr_uint8 sent_write_closed;
+ WRITE_STATE write_state;
+ gpr_uint8 send_closed;
gpr_uint8 read_closed;
gpr_uint8 cancelled;
- gpr_uint8 allow_window_updates;
- gpr_uint8 published_close;
+
+ op_closure send_done_closure;
+ op_closure recv_done_closure;
stream_link links[STREAM_LIST_COUNT];
gpr_uint8 included[STREAM_LIST_COUNT];
@@ -296,10 +317,14 @@ struct stream {
grpc_linked_mdelem *incoming_metadata;
size_t incoming_metadata_count;
size_t incoming_metadata_capacity;
+ grpc_linked_mdelem *old_incoming_metadata;
gpr_timespec incoming_deadline;
/* sops from application */
- grpc_stream_op_buffer outgoing_sopb;
+ grpc_stream_op_buffer *outgoing_sopb;
+ grpc_stream_op_buffer *incoming_sopb;
+ grpc_stream_state *publish_state;
+ grpc_stream_state published_state;
/* sops that have passed flow control to be written */
grpc_stream_op_buffer writing_sopb;
@@ -337,7 +362,8 @@ static void cancel_stream_id(transport *t, gpr_uint32 id,
grpc_chttp2_error_code error_code, int send_rst);
static void cancel_stream(transport *t, stream *s,
grpc_status_code local_status,
- grpc_chttp2_error_code error_code, int send_rst);
+ grpc_chttp2_error_code error_code,
+ grpc_mdstr *optional_message, int send_rst);
static void finalize_cancellations(transport *t);
static stream *lookup_stream(transport *t, gpr_uint32 id);
static void remove_from_stream_map(transport *t, stream *s);
@@ -348,6 +374,14 @@ static void become_skip_parser(transport *t);
static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
grpc_endpoint_cb_status error);
+static void schedule_cb(transport *t, op_closure closure, int success);
+static void maybe_finish_read(transport *t, stream *s);
+static void maybe_join_window_updates(transport *t, stream *s);
+static void finish_reads(transport *t);
+static void add_to_pollset_locked(transport *t, grpc_pollset *pollset);
+static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op);
+static void add_metadata_batch(transport *t, stream *s);
+
/*
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/
@@ -387,6 +421,9 @@ static void destruct_transport(transport *t) {
}
gpr_free(t->pings);
+ gpr_free(t->pending_callbacks.callbacks);
+ gpr_free(t->executing_callbacks.callbacks);
+
for (i = 0; i < t->num_pending_goaways; i++) {
gpr_slice_unref(t->pending_goaways[i].debug);
}
@@ -416,6 +453,8 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
GPR_ASSERT(strlen(CLIENT_CONNECT_STRING) == CLIENT_CONNECT_STRLEN);
+ memset(t, 0, sizeof(*t));
+
t->base.vtable = &vtable;
t->ep = ep;
/* one ref is for destroy, the other for when ep becomes NULL */
@@ -427,27 +466,16 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
t->str_grpc_timeout =
grpc_mdstr_from_string(t->metadata_context, "grpc-timeout");
t->reading = 1;
- t->writing = 0;
t->error_state = ERROR_STATE_NONE;
t->next_stream_id = is_client ? 1 : 2;
- t->last_incoming_stream_id = 0;
- t->destroying = 0;
- t->closed = 0;
t->is_client = is_client;
t->outgoing_window = DEFAULT_WINDOW;
t->incoming_window = DEFAULT_WINDOW;
t->connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET;
t->deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0;
- t->expect_continuation_stream_id = 0;
- t->pings = NULL;
- t->ping_count = 0;
- t->ping_capacity = 0;
t->ping_counter = gpr_now().tv_nsec;
grpc_chttp2_hpack_compressor_init(&t->hpack_compressor, mdctx);
grpc_chttp2_goaway_parser_init(&t->goaway_parser);
- t->pending_goaways = NULL;
- t->num_pending_goaways = 0;
- t->cap_pending_goaways = 0;
gpr_slice_buffer_init(&t->outbuf);
gpr_slice_buffer_init(&t->qbuf);
grpc_sopb_init(&t->nuke_later_sopb);
@@ -462,7 +490,6 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
needed.
TODO(ctiller): tune this */
grpc_chttp2_stream_map_init(&t->stream_map, 8);
- memset(&t->lists, 0, sizeof(t->lists));
/* copy in initial settings to all setting sets */
for (i = 0; i < NUM_SETTING_SETS; i++) {
@@ -503,7 +530,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
gpr_mu_lock(&t->mu);
t->calling_back = 1;
- ref_transport(t);
+ ref_transport(t); /* matches unref at end of this function */
gpr_mu_unlock(&t->mu);
sr = setup(arg, &t->base, t->metadata_context);
@@ -515,7 +542,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
if (t->destroying) gpr_cv_signal(&t->cv);
unlock(t);
- ref_transport(t);
+ ref_transport(t); /* matches unref inside recv_data */
recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK);
unref_transport(t);
@@ -573,10 +600,12 @@ static void goaway(grpc_transport *gt, grpc_status_code status,
}
static int init_stream(grpc_transport *gt, grpc_stream *gs,
- const void *server_data) {
+ const void *server_data, grpc_transport_op *initial_op) {
transport *t = (transport *)gt;
stream *s = (stream *)gs;
+ memset(s, 0, sizeof(*s));
+
ref_transport(t);
if (!server_data) {
@@ -585,6 +614,7 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
s->outgoing_window = 0;
s->incoming_window = 0;
} else {
+ /* already locked */
s->id = (gpr_uint32)(gpr_uintptr)server_data;
s->outgoing_window =
t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
@@ -594,24 +624,13 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
}
- s->queued_write_closed = 0;
- s->sending_write_closed = 0;
- s->sent_write_closed = 0;
- s->read_closed = 0;
- s->cancelled = 0;
- s->allow_window_updates = 0;
- s->published_close = 0;
- s->incoming_metadata_count = 0;
- s->incoming_metadata_capacity = 0;
- s->incoming_metadata = NULL;
s->incoming_deadline = gpr_inf_future;
- memset(&s->links, 0, sizeof(s->links));
- memset(&s->included, 0, sizeof(s->included));
- grpc_sopb_init(&s->outgoing_sopb);
grpc_sopb_init(&s->writing_sopb);
grpc_sopb_init(&s->callback_sopb);
grpc_chttp2_data_parser_init(&s->parser);
+ if (initial_op) perform_op_locked(t, s, initial_op);
+
if (!server_data) {
unlock(t);
}
@@ -644,10 +663,16 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
gpr_mu_unlock(&t->mu);
- grpc_sopb_destroy(&s->outgoing_sopb);
+ GPR_ASSERT(s->outgoing_sopb == NULL);
+ GPR_ASSERT(s->incoming_sopb == NULL);
grpc_sopb_destroy(&s->writing_sopb);
grpc_sopb_destroy(&s->callback_sopb);
grpc_chttp2_data_parser_destroy(&s->parser);
+ for (i = 0; i < s->incoming_metadata_count; i++) {
+ grpc_mdelem_unref(s->incoming_metadata[i].md);
+ }
+ gpr_free(s->incoming_metadata);
+ gpr_free(s->old_incoming_metadata);
unref_transport(t);
}
@@ -710,8 +735,6 @@ static void stream_list_add_tail(transport *t, stream *s, stream_list_id id) {
}
static void stream_list_join(transport *t, stream *s, stream_list_id id) {
- if (id == PENDING_CALLBACKS)
- GPR_ASSERT(t->cb != NULL || t->error_state == ERROR_STATE_NONE);
if (s->included[id]) {
return;
}
@@ -720,6 +743,8 @@ static void stream_list_join(transport *t, stream *s, stream_list_id id) {
static void remove_from_stream_map(transport *t, stream *s) {
if (s->id == 0) return;
+ IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Removing stream %d",
+ t->is_client ? "CLI" : "SVR", s->id));
if (grpc_chttp2_stream_map_delete(&t->stream_map, s->id)) {
maybe_start_some_streams(t);
}
@@ -764,6 +789,8 @@ static void unlock(transport *t) {
finalize_cancellations(t);
}
+ finish_reads(t);
+
/* gather any callbacks that need to be made */
if (!t->calling_back && cb) {
perform_callbacks = prepare_callbacks(t);
@@ -867,21 +894,24 @@ static int prepare_write(transport *t) {
while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE)) &&
s->outgoing_window > 0) {
window_delta = grpc_chttp2_preencode(
- s->outgoing_sopb.ops, &s->outgoing_sopb.nops,
+ s->outgoing_sopb->ops, &s->outgoing_sopb->nops,
GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing_sopb);
t->outgoing_window -= window_delta;
s->outgoing_window -= window_delta;
- s->sending_write_closed =
- s->queued_write_closed && s->outgoing_sopb.nops == 0;
- if (s->writing_sopb.nops > 0 || s->sending_write_closed) {
+ if (s->write_state == WRITE_STATE_QUEUED_CLOSE &&
+ s->outgoing_sopb->nops == 0) {
+ s->send_closed = 1;
+ }
+ if (s->writing_sopb.nops > 0 || s->send_closed) {
stream_list_join(t, s, WRITING);
}
- /* if there are still writes to do and the stream still has window
- available, then schedule a further write */
- if (s->outgoing_sopb.nops > 0 && s->outgoing_window > 0) {
- GPR_ASSERT(!t->outgoing_window);
+ /* we should either exhaust window or have no ops left, but not both */
+ if (s->outgoing_sopb->nops == 0) {
+ s->outgoing_sopb = NULL;
+ schedule_cb(t, s->send_done_closure, 1);
+ } else if (s->outgoing_window) {
stream_list_add_tail(t, s, WRITABLE);
}
}
@@ -914,10 +944,9 @@ static void finalize_outbuf(transport *t) {
while ((s = stream_list_remove_head(t, WRITING))) {
grpc_chttp2_encode(s->writing_sopb.ops, s->writing_sopb.nops,
- s->sending_write_closed, s->id, &t->hpack_compressor,
- &t->outbuf);
+ s->send_closed, s->id, &t->hpack_compressor, &t->outbuf);
s->writing_sopb.nops = 0;
- if (s->sending_write_closed) {
+ if (s->send_closed) {
stream_list_join(t, s, WRITTEN_CLOSED);
}
}
@@ -931,8 +960,10 @@ static void finish_write_common(transport *t, int success) {
drop_connection(t);
}
while ((s = stream_list_remove_head(t, WRITTEN_CLOSED))) {
- s->sent_write_closed = 1;
- if (!s->cancelled) stream_list_join(t, s, PENDING_CALLBACKS);
+ s->write_state = WRITE_STATE_SENT_CLOSE;
+ if (1||!s->cancelled) {
+ maybe_finish_read(t, s);
+ }
}
t->outbuf.count = 0;
t->outbuf.length = 0;
@@ -982,6 +1013,9 @@ static void maybe_start_some_streams(transport *t) {
stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY);
if (!s) break;
+ IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Allocating new stream %p to id %d",
+ t->is_client ? "CLI" : "SVR", s, t->next_stream_id));
+
GPR_ASSERT(s->id == 0);
s->id = t->next_stream_id;
t->next_stream_id += 2;
@@ -994,43 +1028,63 @@ static void maybe_start_some_streams(transport *t) {
}
}
-static void send_batch(grpc_transport *gt, grpc_stream *gs, grpc_stream_op *ops,
- size_t ops_count, int is_last) {
- transport *t = (transport *)gt;
- stream *s = (stream *)gs;
-
- lock(t);
-
- if (is_last) {
- s->queued_write_closed = 1;
- }
- if (!s->cancelled) {
- grpc_sopb_append(&s->outgoing_sopb, ops, ops_count);
- if (s->id == 0) {
- stream_list_join(t, s, WAITING_FOR_CONCURRENCY);
- maybe_start_some_streams(t);
+static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
+ if (op->cancel_with_status != GRPC_STATUS_OK) {
+ cancel_stream(
+ t, s, op->cancel_with_status,
+ grpc_chttp2_grpc_status_to_http2_error(op->cancel_with_status),
+ op->cancel_message, 1);
+ }
+
+ if (op->send_ops) {
+ GPR_ASSERT(s->outgoing_sopb == NULL);
+ s->send_done_closure.cb = op->on_done_send;
+ s->send_done_closure.user_data = op->send_user_data;
+ if (!s->cancelled) {
+ s->outgoing_sopb = op->send_ops;
+ if (op->is_last_send && s->write_state == WRITE_STATE_OPEN) {
+ s->write_state = WRITE_STATE_QUEUED_CLOSE;
+ }
+ if (s->id == 0) {
+ IF_TRACING(gpr_log(GPR_DEBUG,
+ "HTTP:%s: New stream %p waiting for concurrency",
+ t->is_client ? "CLI" : "SVR", s));
+ stream_list_join(t, s, WAITING_FOR_CONCURRENCY);
+ maybe_start_some_streams(t);
+ } else if (s->outgoing_window > 0) {
+ stream_list_join(t, s, WRITABLE);
+ }
} else {
- stream_list_join(t, s, WRITABLE);
+ schedule_nuke_sopb(t, op->send_ops);
+ schedule_cb(t, s->send_done_closure, 0);
}
- } else {
- grpc_sopb_append(&t->nuke_later_sopb, ops, ops_count);
}
- if (is_last && s->outgoing_sopb.nops == 0 && s->read_closed &&
- !s->published_close) {
- stream_list_join(t, s, PENDING_CALLBACKS);
+
+ if (op->recv_ops) {
+ GPR_ASSERT(s->incoming_sopb == NULL);
+ s->recv_done_closure.cb = op->on_done_recv;
+ s->recv_done_closure.user_data = op->recv_user_data;
+ s->incoming_sopb = op->recv_ops;
+ s->incoming_sopb->nops = 0;
+ s->publish_state = op->recv_state;
+ gpr_free(s->old_incoming_metadata);
+ s->old_incoming_metadata = NULL;
+ maybe_finish_read(t, s);
+ maybe_join_window_updates(t, s);
}
- unlock(t);
+ if (op->bind_pollset) {
+ add_to_pollset_locked(t, op->bind_pollset);
+ }
}
-static void abort_stream(grpc_transport *gt, grpc_stream *gs,
- grpc_status_code status) {
+static void perform_op(grpc_transport *gt, grpc_stream *gs,
+ grpc_transport_op *op) {
transport *t = (transport *)gt;
stream *s = (stream *)gs;
lock(t);
- cancel_stream(t, s, status, grpc_chttp2_grpc_status_to_http2_error(status),
- 1);
+ perform_op_locked(t, s, op);
unlock(t);
}
@@ -1069,8 +1123,8 @@ static void finalize_cancellations(transport *t) {
while ((s = stream_list_remove_head(t, CANCELLED))) {
s->read_closed = 1;
- s->sent_write_closed = 1;
- stream_list_join(t, s, PENDING_CALLBACKS);
+ s->write_state = WRITE_STATE_SENT_CLOSE;
+ maybe_finish_read(t, s);
}
}
@@ -1088,18 +1142,24 @@ static void add_incoming_metadata(transport *t, stream *s, grpc_mdelem *elem) {
static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
grpc_status_code local_status,
grpc_chttp2_error_code error_code,
- int send_rst) {
+ grpc_mdstr *optional_message, int send_rst) {
int had_outgoing;
char buffer[GPR_LTOA_MIN_BUFSIZE];
if (s) {
/* clear out any unreported input & output: nobody cares anymore */
- had_outgoing = s->outgoing_sopb.nops != 0;
+ had_outgoing = s->outgoing_sopb && s->outgoing_sopb->nops != 0;
schedule_nuke_sopb(t, &s->parser.incoming_sopb);
- schedule_nuke_sopb(t, &s->outgoing_sopb);
+ if (s->outgoing_sopb) {
+ schedule_nuke_sopb(t, s->outgoing_sopb);
+ s->outgoing_sopb = NULL;
+ stream_list_remove(t, s, WRITABLE);
+ schedule_cb(t, s->send_done_closure, 0);
+ }
if (s->cancelled) {
send_rst = 0;
- } else if (!s->read_closed || !s->sent_write_closed || had_outgoing) {
+ } else if (!s->read_closed || s->write_state != WRITE_STATE_SENT_CLOSE ||
+ had_outgoing) {
s->cancelled = 1;
stream_list_join(t, s, CANCELLED);
@@ -1107,17 +1167,26 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
add_incoming_metadata(
t, s,
grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer));
- switch (local_status) {
- case GRPC_STATUS_CANCELLED:
- add_incoming_metadata(
- t, s, grpc_mdelem_from_strings(t->metadata_context,
- "grpc-message", "Cancelled"));
- break;
- default:
- break;
+ if (!optional_message) {
+ switch (local_status) {
+ case GRPC_STATUS_CANCELLED:
+ add_incoming_metadata(
+ t, s, grpc_mdelem_from_strings(t->metadata_context,
+ "grpc-message", "Cancelled"));
+ break;
+ default:
+ break;
+ }
+ } else {
+ add_incoming_metadata(
+ t, s,
+ grpc_mdelem_from_metadata_strings(
+ t->metadata_context,
+ grpc_mdstr_from_string(t->metadata_context, "grpc-message"),
+ grpc_mdstr_ref(optional_message)));
}
-
- stream_list_join(t, s, PENDING_CALLBACKS);
+ add_metadata_batch(t, s);
+ maybe_finish_read(t, s);
}
}
if (!id) send_rst = 0;
@@ -1125,24 +1194,29 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
gpr_slice_buffer_add(&t->qbuf,
grpc_chttp2_rst_stream_create(id, error_code));
}
+ if (optional_message) {
+ grpc_mdstr_unref(optional_message);
+ }
}
static void cancel_stream_id(transport *t, gpr_uint32 id,
grpc_status_code local_status,
grpc_chttp2_error_code error_code, int send_rst) {
cancel_stream_inner(t, lookup_stream(t, id), id, local_status, error_code,
- send_rst);
+ NULL, send_rst);
}
static void cancel_stream(transport *t, stream *s,
grpc_status_code local_status,
- grpc_chttp2_error_code error_code, int send_rst) {
- cancel_stream_inner(t, s, s->id, local_status, error_code, send_rst);
+ grpc_chttp2_error_code error_code,
+ grpc_mdstr *optional_message, int send_rst) {
+ cancel_stream_inner(t, s, s->id, local_status, error_code, optional_message,
+ send_rst);
}
static void cancel_stream_cb(void *user_data, gpr_uint32 id, void *stream) {
cancel_stream(user_data, stream, GRPC_STATUS_UNAVAILABLE,
- GRPC_CHTTP2_INTERNAL_ERROR, 0);
+ GRPC_CHTTP2_INTERNAL_ERROR, NULL, 0);
}
static void end_all_the_calls(transport *t) {
@@ -1156,8 +1230,14 @@ static void drop_connection(transport *t) {
end_all_the_calls(t);
}
+static void maybe_finish_read(transport *t, stream *s) {
+ if (s->incoming_sopb) {
+ stream_list_join(t, s, FINISHED_READ_OP);
+ }
+}
+
static void maybe_join_window_updates(transport *t, stream *s) {
- if (s->allow_window_updates &&
+ if (s->incoming_sopb != NULL &&
s->incoming_window <
t->settings[LOCAL_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] *
@@ -1166,21 +1246,6 @@ static void maybe_join_window_updates(transport *t, stream *s) {
}
}
-static void set_allow_window_updates(grpc_transport *tp, grpc_stream *sp,
- int allow) {
- transport *t = (transport *)tp;
- stream *s = (stream *)sp;
-
- lock(t);
- s->allow_window_updates = allow;
- if (allow) {
- maybe_join_window_updates(t, s);
- } else {
- stream_list_remove(t, s, WINDOW_UPDATE);
- }
- unlock(t);
-}
-
static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) {
if (t->incoming_frame_size > t->incoming_window) {
gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d",
@@ -1254,7 +1319,7 @@ static int init_data_frame_parser(transport *t) {
case GRPC_CHTTP2_STREAM_ERROR:
cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status(
GRPC_CHTTP2_INTERNAL_ERROR),
- GRPC_CHTTP2_INTERNAL_ERROR, 1);
+ GRPC_CHTTP2_INTERNAL_ERROR, NULL, 1);
return init_skip_frame(t, 0);
case GRPC_CHTTP2_CONNECTION_ERROR:
drop_connection(t);
@@ -1273,11 +1338,10 @@ static void on_header(void *tp, grpc_mdelem *md) {
GPR_ASSERT(s);
- IF_TRACING(gpr_log(GPR_INFO, "HTTP:%d:HDR: %s: %s", s->id,
- grpc_mdstr_as_c_string(md->key),
- grpc_mdstr_as_c_string(md->value)));
+ IF_TRACING(gpr_log(
+ GPR_INFO, "HTTP:%d:%s:HDR: %s: %s", s->id, t->is_client ? "CLI" : "SVR",
+ grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)));
- stream_list_join(t, s, PENDING_CALLBACKS);
if (md->key == t->str_grpc_timeout) {
gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout);
if (!cached_timeout) {
@@ -1296,6 +1360,7 @@ static void on_header(void *tp, grpc_mdelem *md) {
} else {
add_incoming_metadata(t, s, md);
}
+ maybe_finish_read(t, s);
}
static int init_header_frame_parser(transport *t, int is_continuation) {
@@ -1333,7 +1398,10 @@ static int init_header_frame_parser(transport *t, int is_continuation) {
gpr_log(GPR_ERROR,
"ignoring out of order new stream request on server; last stream "
"id=%d, new stream id=%d",
- t->last_incoming_stream_id, t->incoming_stream);
+ t->last_incoming_stream_id, t->incoming_stream_id);
+ return init_skip_frame(t, 1);
+ } else if ((t->incoming_stream_id & 1) == 0) {
+ gpr_log(GPR_ERROR, "ignoring stream with non-client generated index %d", t->incoming_stream_id);
return init_skip_frame(t, 1);
}
t->incoming_stream = NULL;
@@ -1470,33 +1538,20 @@ static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) {
return window + window_update < MAX_WINDOW;
}
-static void free_md(void *p, grpc_op_error result) { gpr_free(p); }
-
static void add_metadata_batch(transport *t, stream *s) {
grpc_metadata_batch b;
- size_t i;
- b.list.head = &s->incoming_metadata[0];
- b.list.tail = &s->incoming_metadata[s->incoming_metadata_count - 1];
+ b.list.head = NULL;
+ /* Store away the last element of the list, so that in patch_metadata_ops
+ we can reconstitute the list.
+ We can't do list building here as later incoming metadata may reallocate
+ the underlying array. */
+ b.list.tail = (void*)(gpr_intptr)s->incoming_metadata_count;
b.garbage.head = b.garbage.tail = NULL;
b.deadline = s->incoming_deadline;
-
- for (i = 1; i < s->incoming_metadata_count; i++) {
- s->incoming_metadata[i].prev = &s->incoming_metadata[i - 1];
- s->incoming_metadata[i - 1].next = &s->incoming_metadata[i];
- }
- s->incoming_metadata[0].prev = NULL;
- s->incoming_metadata[s->incoming_metadata_count - 1].next = NULL;
+ s->incoming_deadline = gpr_inf_future;
grpc_sopb_add_metadata(&s->parser.incoming_sopb, b);
- grpc_sopb_add_flow_ctl_cb(&s->parser.incoming_sopb, free_md,
- s->incoming_metadata);
-
- /* reset */
- s->incoming_deadline = gpr_inf_future;
- s->incoming_metadata = NULL;
- s->incoming_metadata_count = 0;
- s->incoming_metadata_capacity = 0;
}
static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
@@ -1507,14 +1562,14 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
case GRPC_CHTTP2_PARSE_OK:
if (st.end_of_stream) {
t->incoming_stream->read_closed = 1;
- stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
+ maybe_finish_read(t, t->incoming_stream);
}
if (st.need_flush_reads) {
- stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
+ maybe_finish_read(t, t->incoming_stream);
}
if (st.metadata_boundary) {
add_metadata_batch(t, t->incoming_stream);
- stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
+ maybe_finish_read(t, t->incoming_stream);
}
if (st.ack_settings) {
gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create());
@@ -1551,11 +1606,11 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
}
if (st.initial_window_update) {
for (i = 0; i < t->stream_map.count; i++) {
- stream *s = (stream*)(t->stream_map.values[i]);
+ stream *s = (stream *)(t->stream_map.values[i]);
int was_window_empty = s->outgoing_window <= 0;
s->outgoing_window += st.initial_window_update;
- if (was_window_empty && s->outgoing_window > 0 &&
- s->outgoing_sopb.nops > 0) {
+ if (was_window_empty && s->outgoing_window > 0 && s->outgoing_sopb &&
+ s->outgoing_sopb->nops > 0) {
stream_list_join(t, s, WRITABLE);
}
}
@@ -1569,12 +1624,13 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
if (!is_window_update_legal(st.window_update, s->outgoing_window)) {
cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status(
GRPC_CHTTP2_FLOW_CONTROL_ERROR),
- GRPC_CHTTP2_FLOW_CONTROL_ERROR, 1);
+ GRPC_CHTTP2_FLOW_CONTROL_ERROR, NULL, 1);
} else {
s->outgoing_window += st.window_update;
/* if this window update makes outgoing ops writable again,
flag that */
- if (was_window_empty && s->outgoing_sopb.nops) {
+ if (was_window_empty && s->outgoing_sopb &&
+ s->outgoing_sopb->nops > 0) {
stream_list_join(t, s, WRITABLE);
}
}
@@ -1836,53 +1892,135 @@ static grpc_stream_state compute_state(gpr_uint8 write_closed,
return GRPC_STREAM_OPEN;
}
-static int prepare_callbacks(transport *t) {
- stream *s;
- int n = 0;
- while ((s = stream_list_remove_head(t, PENDING_CALLBACKS))) {
- int execute = 1;
-
- s->callback_state = compute_state(s->sent_write_closed, s->read_closed);
- if (s->callback_state == GRPC_STREAM_CLOSED) {
- remove_from_stream_map(t, s);
- if (s->published_close) {
- execute = 0;
- } else if (s->incoming_metadata_count) {
- add_metadata_batch(t, s);
- }
- s->published_close = 1;
+static void patch_metadata_ops(stream *s) {
+ grpc_stream_op *ops = s->incoming_sopb->ops;
+ size_t nops = s->incoming_sopb->nops;
+ size_t i;
+ size_t j;
+ size_t mdidx = 0;
+ size_t last_mdidx;
+ int found_metadata = 0;
+
+ /* rework the array of metadata into a linked list, making use
+ of the breadcrumbs we left in metadata batches during
+ add_metadata_batch */
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
+ found_metadata = 1;
+ /* we left a breadcrumb indicating where the end of this list is,
+ and since we add sequentially, we know from the end of the last
+ segment where this segment begins */
+ last_mdidx = (size_t)(gpr_intptr)(op->data.metadata.list.tail);
+ GPR_ASSERT(last_mdidx > mdidx);
+ GPR_ASSERT(last_mdidx <= s->incoming_metadata_count);
+ /* turn the array into a doubly linked list */
+ op->data.metadata.list.head = &s->incoming_metadata[mdidx];
+ op->data.metadata.list.tail = &s->incoming_metadata[last_mdidx - 1];
+ for (j = mdidx + 1; j < last_mdidx; j++) {
+ s->incoming_metadata[j].prev = &s->incoming_metadata[j-1];
+ s->incoming_metadata[j-1].next = &s->incoming_metadata[j];
+ }
+ s->incoming_metadata[mdidx].prev = NULL;
+ s->incoming_metadata[last_mdidx-1].next = NULL;
+ /* track where we're up to */
+ mdidx = last_mdidx;
+ }
+ if (found_metadata) {
+ s->old_incoming_metadata = s->incoming_metadata;
+ if (mdidx != s->incoming_metadata_count) {
+ /* we have a partially read metadata batch still in incoming_metadata */
+ size_t new_count = s->incoming_metadata_count - mdidx;
+ size_t copy_bytes = sizeof(*s->incoming_metadata) * new_count;
+ GPR_ASSERT(mdidx < s->incoming_metadata_count);
+ s->incoming_metadata = gpr_malloc(copy_bytes);
+ memcpy(s->old_incoming_metadata + mdidx, s->incoming_metadata, copy_bytes);
+ s->incoming_metadata_count = s->incoming_metadata_capacity = new_count;
+ } else {
+ s->incoming_metadata = NULL;
+ s->incoming_metadata_count = 0;
+ s->incoming_metadata_capacity = 0;
}
+ }
+}
- grpc_sopb_swap(&s->parser.incoming_sopb, &s->callback_sopb);
+static void finish_reads(transport *t) {
+ stream *s;
- if (execute) {
- stream_list_add_tail(t, s, EXECUTING_CALLBACKS);
- n = 1;
+ while ((s = stream_list_remove_head(t, FINISHED_READ_OP)) != NULL) {
+ int publish = 0;
+ GPR_ASSERT(s->incoming_sopb);
+ *s->publish_state =
+ compute_state(s->write_state == WRITE_STATE_SENT_CLOSE, s->read_closed);
+ if (*s->publish_state != s->published_state) {
+ s->published_state = *s->publish_state;
+ publish = 1;
+ if (s->published_state == GRPC_STREAM_CLOSED) {
+ remove_from_stream_map(t, s);
+ }
+ }
+ if (s->parser.incoming_sopb.nops > 0) {
+ grpc_sopb_swap(s->incoming_sopb, &s->parser.incoming_sopb);
+ publish = 1;
+ }
+ if (publish) {
+ if (s->incoming_metadata_count > 0) {
+ patch_metadata_ops(s);
+ }
+ s->incoming_sopb = NULL;
+ schedule_cb(t, s->recv_done_closure, 1);
}
}
- return n;
+
+}
+
+static void schedule_cb(transport *t, op_closure closure, int success) {
+ if (t->pending_callbacks.capacity == t->pending_callbacks.count) {
+ t->pending_callbacks.capacity =
+ GPR_MAX(t->pending_callbacks.capacity * 2, 8);
+ t->pending_callbacks.callbacks =
+ gpr_realloc(t->pending_callbacks.callbacks,
+ t->pending_callbacks.capacity *
+ sizeof(*t->pending_callbacks.callbacks));
+ }
+ closure.success = success;
+ t->pending_callbacks.callbacks[t->pending_callbacks.count++] = closure;
+}
+
+static int prepare_callbacks(transport *t) {
+ op_closure_array temp = t->pending_callbacks;
+ t->pending_callbacks = t->executing_callbacks;
+ t->executing_callbacks = temp;
+ return t->executing_callbacks.count > 0;
}
static void run_callbacks(transport *t, const grpc_transport_callbacks *cb) {
- stream *s;
- while ((s = stream_list_remove_head(t, EXECUTING_CALLBACKS))) {
- size_t nops = s->callback_sopb.nops;
- s->callback_sopb.nops = 0;
- cb->recv_batch(t->cb_user_data, &t->base, (grpc_stream *)s,
- s->callback_sopb.ops, nops, s->callback_state);
+ size_t i;
+ for (i = 0; i < t->executing_callbacks.count; i++) {
+ op_closure c = t->executing_callbacks.callbacks[i];
+ c.cb(c.user_data, c.success);
}
+ t->executing_callbacks.count = 0;
}
static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb) {
cb->closed(t->cb_user_data, &t->base);
}
-static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
- transport *t = (transport *)gt;
- lock(t);
+/*
+ * POLLSET STUFF
+ */
+
+static void add_to_pollset_locked(transport *t, grpc_pollset *pollset) {
if (t->ep) {
grpc_endpoint_add_to_pollset(t->ep, pollset);
}
+}
+
+static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
+ transport *t = (transport *)gt;
+ lock(t);
+ add_to_pollset_locked(t, pollset);
unlock(t);
}
@@ -1891,9 +2029,9 @@ static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
*/
static const grpc_transport_vtable vtable = {
- sizeof(stream), init_stream, send_batch, set_allow_window_updates,
- add_to_pollset, destroy_stream, abort_stream, goaway, close_transport,
- send_ping, destroy_transport};
+ sizeof(stream), init_stream, perform_op,
+ add_to_pollset, destroy_stream, goaway,
+ close_transport, send_ping, destroy_transport};
void grpc_create_chttp2_transport(grpc_transport_setup_callback setup,
void *arg,