diff options
Diffstat (limited to 'src/core/transport/chttp2_transport.c')
-rw-r--r-- | src/core/transport/chttp2_transport.c | 88 |
1 files changed, 62 insertions, 26 deletions
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 86bd5d2098..9aee7ca4f1 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -91,10 +91,6 @@ typedef enum { /* streams that are waiting to start because there are too many concurrent streams on the connection */ WAITING_FOR_CONCURRENCY, - /* streams that want to callback the application */ - PENDING_CALLBACKS, - /* streams that *ARE* calling back to the application */ - EXECUTING_CALLBACKS, STREAM_LIST_COUNT /* must be last */ } stream_list_id; @@ -182,6 +178,18 @@ typedef struct { gpr_slice debug; } pending_goaway; +typedef struct { + void (*cb)(void *user_data, int success); + void *user_data; + int status; +} op_closure; + +typedef struct { + op_closure *callbacks; + size_t count; + size_t capacity; +} op_closure_array; + struct transport { grpc_transport base; /* must be first */ const grpc_transport_callbacks *cb; @@ -202,6 +210,10 @@ struct transport { gpr_uint8 closed; error_state error_state; + /* queued callbacks */ + op_closure_array pending_callbacks; + op_closure_array executing_callbacks; + /* stream indexing */ gpr_uint32 next_stream_id; gpr_uint32 last_incoming_stream_id; @@ -289,6 +301,9 @@ struct stream { gpr_uint8 allow_window_updates; gpr_uint8 published_close; + op_closure send_done_closure; + op_closure recv_done_closure; + stream_link links[STREAM_LIST_COUNT]; gpr_uint8 included[STREAM_LIST_COUNT]; @@ -416,6 +431,8 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, GPR_ASSERT(strlen(CLIENT_CONNECT_STRING) == CLIENT_CONNECT_STRLEN); + memset(t, 0, sizeof(*t)); + t->base.vtable = &vtable; t->ep = ep; /* one ref is for destroy, the other for when ep becomes NULL */ @@ -427,27 +444,16 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, t->str_grpc_timeout = grpc_mdstr_from_string(t->metadata_context, "grpc-timeout"); t->reading = 1; - t->writing = 0; t->error_state = ERROR_STATE_NONE; t->next_stream_id = is_client ? 1 : 2; - t->last_incoming_stream_id = 0; - t->destroying = 0; - t->closed = 0; t->is_client = is_client; t->outgoing_window = DEFAULT_WINDOW; t->incoming_window = DEFAULT_WINDOW; t->connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET; t->deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0; - t->expect_continuation_stream_id = 0; - t->pings = NULL; - t->ping_count = 0; - t->ping_capacity = 0; t->ping_counter = gpr_now().tv_nsec; grpc_chttp2_hpack_compressor_init(&t->hpack_compressor, mdctx); grpc_chttp2_goaway_parser_init(&t->goaway_parser); - t->pending_goaways = NULL; - t->num_pending_goaways = 0; - t->cap_pending_goaways = 0; gpr_slice_buffer_init(&t->outbuf); gpr_slice_buffer_init(&t->qbuf); grpc_sopb_init(&t->nuke_later_sopb); @@ -462,7 +468,6 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, needed. TODO(ctiller): tune this */ grpc_chttp2_stream_map_init(&t->stream_map, 8); - memset(&t->lists, 0, sizeof(t->lists)); /* copy in initial settings to all setting sets */ for (i = 0; i < NUM_SETTING_SETS; i++) { @@ -708,8 +713,6 @@ static void stream_list_add_tail(transport *t, stream *s, stream_list_id id) { } static void stream_list_join(transport *t, stream *s, stream_list_id id) { - if (id == PENDING_CALLBACKS) - GPR_ASSERT(t->cb != NULL || t->error_state == ERROR_STATE_NONE); if (s->included[id]) { return; } @@ -988,6 +991,32 @@ static void maybe_start_some_streams(transport *t) { } } +static void perform_op(grpc_transport *gt, grpc_stream *gs, grpc_transport_op *op) { + transport *t = (transport *)gt; + stream *s = (stream *)gs; + + lock(t); + + if (op->send_ops) { + abort(); + } + + if (op->recv_ops) { + abort(); + } + + if (op->bind_pollset) { + abort(); + } + + if (op->cancel_with_status) { + abort(); + } + + unlock(t); +} + +#if 0 static void send_batch(grpc_transport *gt, grpc_stream *gs, grpc_stream_op *ops, size_t ops_count, int is_last) { transport *t = (transport *)gt; @@ -1016,6 +1045,7 @@ static void send_batch(grpc_transport *gt, grpc_stream *gs, grpc_stream_op *ops, unlock(t); } +#endif static void abort_stream(grpc_transport *gt, grpc_stream *gs, grpc_status_code status) { @@ -1831,6 +1861,12 @@ static grpc_stream_state compute_state(gpr_uint8 write_closed, } static int prepare_callbacks(transport *t) { + op_closure_array temp = t->pending_callbacks; + t->pending_callbacks = t->executing_callbacks; + t->executing_callbacks = temp; + return t->executing_callbacks.count > 0; + +#if 0 stream *s; int n = 0; while ((s = stream_list_remove_head(t, PENDING_CALLBACKS))) { @@ -1855,16 +1891,16 @@ static int prepare_callbacks(transport *t) { } } return n; +#endif } static void run_callbacks(transport *t, const grpc_transport_callbacks *cb) { - stream *s; - while ((s = stream_list_remove_head(t, EXECUTING_CALLBACKS))) { - size_t nops = s->callback_sopb.nops; - s->callback_sopb.nops = 0; - cb->recv_batch(t->cb_user_data, &t->base, (grpc_stream *)s, - s->callback_sopb.ops, nops, s->callback_state); + size_t i; + for (i = 0; i < t->executing_callbacks.count; i++) { + op_closure c = t->executing_callbacks.callbacks[i]; + c.cb(c.user_data, c.status); } + t->executing_callbacks.count = 0; } static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb) { @@ -1885,8 +1921,8 @@ static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) { */ static const grpc_transport_vtable vtable = { - sizeof(stream), init_stream, send_batch, set_allow_window_updates, - add_to_pollset, destroy_stream, abort_stream, goaway, close_transport, + sizeof(stream), init_stream, perform_op, + add_to_pollset, destroy_stream, goaway, close_transport, send_ping, destroy_transport}; void grpc_create_chttp2_transport(grpc_transport_setup_callback setup, |