aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/transport/chttp2_transport.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/transport/chttp2_transport.c')
-rw-r--r--src/core/transport/chttp2_transport.c88
1 files changed, 62 insertions, 26 deletions
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 86bd5d2098..9aee7ca4f1 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -91,10 +91,6 @@ typedef enum {
/* streams that are waiting to start because there are too many concurrent
streams on the connection */
WAITING_FOR_CONCURRENCY,
- /* streams that want to callback the application */
- PENDING_CALLBACKS,
- /* streams that *ARE* calling back to the application */
- EXECUTING_CALLBACKS,
STREAM_LIST_COUNT /* must be last */
} stream_list_id;
@@ -182,6 +178,18 @@ typedef struct {
gpr_slice debug;
} pending_goaway;
+typedef struct {
+ void (*cb)(void *user_data, int success);
+ void *user_data;
+ int status;
+} op_closure;
+
+typedef struct {
+ op_closure *callbacks;
+ size_t count;
+ size_t capacity;
+} op_closure_array;
+
struct transport {
grpc_transport base; /* must be first */
const grpc_transport_callbacks *cb;
@@ -202,6 +210,10 @@ struct transport {
gpr_uint8 closed;
error_state error_state;
+ /* queued callbacks */
+ op_closure_array pending_callbacks;
+ op_closure_array executing_callbacks;
+
/* stream indexing */
gpr_uint32 next_stream_id;
gpr_uint32 last_incoming_stream_id;
@@ -289,6 +301,9 @@ struct stream {
gpr_uint8 allow_window_updates;
gpr_uint8 published_close;
+ op_closure send_done_closure;
+ op_closure recv_done_closure;
+
stream_link links[STREAM_LIST_COUNT];
gpr_uint8 included[STREAM_LIST_COUNT];
@@ -416,6 +431,8 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
GPR_ASSERT(strlen(CLIENT_CONNECT_STRING) == CLIENT_CONNECT_STRLEN);
+ memset(t, 0, sizeof(*t));
+
t->base.vtable = &vtable;
t->ep = ep;
/* one ref is for destroy, the other for when ep becomes NULL */
@@ -427,27 +444,16 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
t->str_grpc_timeout =
grpc_mdstr_from_string(t->metadata_context, "grpc-timeout");
t->reading = 1;
- t->writing = 0;
t->error_state = ERROR_STATE_NONE;
t->next_stream_id = is_client ? 1 : 2;
- t->last_incoming_stream_id = 0;
- t->destroying = 0;
- t->closed = 0;
t->is_client = is_client;
t->outgoing_window = DEFAULT_WINDOW;
t->incoming_window = DEFAULT_WINDOW;
t->connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET;
t->deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0;
- t->expect_continuation_stream_id = 0;
- t->pings = NULL;
- t->ping_count = 0;
- t->ping_capacity = 0;
t->ping_counter = gpr_now().tv_nsec;
grpc_chttp2_hpack_compressor_init(&t->hpack_compressor, mdctx);
grpc_chttp2_goaway_parser_init(&t->goaway_parser);
- t->pending_goaways = NULL;
- t->num_pending_goaways = 0;
- t->cap_pending_goaways = 0;
gpr_slice_buffer_init(&t->outbuf);
gpr_slice_buffer_init(&t->qbuf);
grpc_sopb_init(&t->nuke_later_sopb);
@@ -462,7 +468,6 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
needed.
TODO(ctiller): tune this */
grpc_chttp2_stream_map_init(&t->stream_map, 8);
- memset(&t->lists, 0, sizeof(t->lists));
/* copy in initial settings to all setting sets */
for (i = 0; i < NUM_SETTING_SETS; i++) {
@@ -708,8 +713,6 @@ static void stream_list_add_tail(transport *t, stream *s, stream_list_id id) {
}
static void stream_list_join(transport *t, stream *s, stream_list_id id) {
- if (id == PENDING_CALLBACKS)
- GPR_ASSERT(t->cb != NULL || t->error_state == ERROR_STATE_NONE);
if (s->included[id]) {
return;
}
@@ -988,6 +991,32 @@ static void maybe_start_some_streams(transport *t) {
}
}
+static void perform_op(grpc_transport *gt, grpc_stream *gs, grpc_transport_op *op) {
+ transport *t = (transport *)gt;
+ stream *s = (stream *)gs;
+
+ lock(t);
+
+ if (op->send_ops) {
+ abort();
+ }
+
+ if (op->recv_ops) {
+ abort();
+ }
+
+ if (op->bind_pollset) {
+ abort();
+ }
+
+ if (op->cancel_with_status) {
+ abort();
+ }
+
+ unlock(t);
+}
+
+#if 0
static void send_batch(grpc_transport *gt, grpc_stream *gs, grpc_stream_op *ops,
size_t ops_count, int is_last) {
transport *t = (transport *)gt;
@@ -1016,6 +1045,7 @@ static void send_batch(grpc_transport *gt, grpc_stream *gs, grpc_stream_op *ops,
unlock(t);
}
+#endif
static void abort_stream(grpc_transport *gt, grpc_stream *gs,
grpc_status_code status) {
@@ -1831,6 +1861,12 @@ static grpc_stream_state compute_state(gpr_uint8 write_closed,
}
static int prepare_callbacks(transport *t) {
+ op_closure_array temp = t->pending_callbacks;
+ t->pending_callbacks = t->executing_callbacks;
+ t->executing_callbacks = temp;
+ return t->executing_callbacks.count > 0;
+
+#if 0
stream *s;
int n = 0;
while ((s = stream_list_remove_head(t, PENDING_CALLBACKS))) {
@@ -1855,16 +1891,16 @@ static int prepare_callbacks(transport *t) {
}
}
return n;
+#endif
}
static void run_callbacks(transport *t, const grpc_transport_callbacks *cb) {
- stream *s;
- while ((s = stream_list_remove_head(t, EXECUTING_CALLBACKS))) {
- size_t nops = s->callback_sopb.nops;
- s->callback_sopb.nops = 0;
- cb->recv_batch(t->cb_user_data, &t->base, (grpc_stream *)s,
- s->callback_sopb.ops, nops, s->callback_state);
+ size_t i;
+ for (i = 0; i < t->executing_callbacks.count; i++) {
+ op_closure c = t->executing_callbacks.callbacks[i];
+ c.cb(c.user_data, c.status);
}
+ t->executing_callbacks.count = 0;
}
static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb) {
@@ -1885,8 +1921,8 @@ static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
*/
static const grpc_transport_vtable vtable = {
- sizeof(stream), init_stream, send_batch, set_allow_window_updates,
- add_to_pollset, destroy_stream, abort_stream, goaway, close_transport,
+ sizeof(stream), init_stream, perform_op,
+ add_to_pollset, destroy_stream, goaway, close_transport,
send_ping, destroy_transport};
void grpc_create_chttp2_transport(grpc_transport_setup_callback setup,