aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-06-11 16:26:03 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-06-11 16:26:03 -0700
commit99f8055965d8577d876763a70d5feb75f5752d24 (patch)
tree05b3cd61daa4e0659ffa5c5f2d4637fe5b764fb8
parent3cbfcb4dcd3cbf785f7391a8e4c5f36e6615c9d4 (diff)
Splitting progress
-rw-r--r--src/core/transport/chttp2_transport.c149
1 files changed, 111 insertions, 38 deletions
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index f3af5b1320..c4fa13c86c 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -221,8 +221,6 @@ typedef struct {
struct transport {
grpc_transport base; /* must be first */
- const grpc_transport_callbacks *cb;
- void *cb_user_data;
grpc_endpoint *ep;
grpc_mdctx *metadata_context;
gpr_refcount refs;
@@ -233,10 +231,6 @@ struct transport {
/* basic state management - what are we doing at the moment? */
gpr_uint8 reading;
- gpr_uint8 parsing;
- gpr_uint8 writing;
- /** are we calling back (via cb) with a channel-level event */
- gpr_uint8 calling_back_channel;
/** are we calling back any grpc_transport_op completion events */
gpr_uint8 calling_back_ops;
gpr_uint8 destroying;
@@ -304,9 +298,6 @@ struct transport {
grpc_chttp2_parse_state *state,
gpr_slice slice, int is_last);
- gpr_slice_buffer outbuf;
- gpr_slice_buffer qbuf;
-
stream_list lists[STREAM_LIST_COUNT];
grpc_chttp2_stream_map stream_map;
@@ -318,9 +309,49 @@ struct transport {
size_t ping_count;
size_t ping_capacity;
gpr_int64 ping_counter;
+
+ struct {
+ /** data to write next write */
+ gpr_slice_buffer qbuf;
+ } global;
+
+ struct {
+ /** is a thread currently writing */
+ gpr_uint8 executing;
+ /** closure to execute this action */
+ grpc_iomgr_closure action;
+ /** data to write now */
+ gpr_slice_buffer outbuf;
+ } writing;
+
+ struct {
+ /** is a thread currently parsing */
+ gpr_uint8 executing;
+ /** data to write later - after parsing */
+ gpr_slice_buffer qbuf;
+ } parsing;
+
+ struct {
+ /** is a thread currently performing channel callbacks */
+ gpr_uint8 executing;
+ const grpc_transport_callbacks *cb;
+ void *cb_user_data;
+ } channel_callback;
};
struct stream {
+ struct {
+ int unused;
+ } global;
+
+ struct {
+ int unused;
+ } writing;
+
+ struct {
+ int unused;
+ } parsing;
+
gpr_uint32 id;
gpr_uint32 incoming_window;
@@ -361,6 +392,13 @@ struct stream {
grpc_stream_op_buffer callback_sopb;
};
+#define MAX_POST_ACTIONS 8
+
+typedef struct {
+ size_t num_post_actions;
+ grpc_iomgr_closure *post_actions[MAX_POST_ACTIONS];
+} unlock_ctx;
+
static const grpc_transport_vtable vtable;
static void push_setting(transport *t, grpc_chttp2_setting_id id,
@@ -376,6 +414,12 @@ static void perform_write(transport *t, grpc_endpoint *ep);
static void lock(transport *t);
static void unlock(transport *t);
+static void unlock_check_writes(transport* t, unlock_ctx *uctx);
+ static void unlock_check_cancellations(transport* t, unlock_ctx *uctx);
+ static void unlock_check_parser(transport* t, unlock_ctx *uctx);
+ static void unlock_check_op_callbacks(transport* t, unlock_ctx *uctx);
+ static void unlock_check_channel_callbacks(transport* t, unlock_ctx *uctx);
+
static void drop_connection(transport *t);
static void end_all_the_calls(transport *t);
@@ -426,8 +470,9 @@ static void destruct_transport(transport *t) {
GPR_ASSERT(t->ep == NULL);
- gpr_slice_buffer_destroy(&t->outbuf);
- gpr_slice_buffer_destroy(&t->qbuf);
+ gpr_slice_buffer_destroy(&t->global.qbuf);
+ gpr_slice_buffer_destroy(&t->writing.outbuf);
+ gpr_slice_buffer_destroy(&t->parsing.qbuf);
grpc_chttp2_hpack_parser_destroy(&t->hpack_parser);
grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor);
grpc_chttp2_goaway_parser_destroy(&t->goaway_parser);
@@ -509,12 +554,13 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
t->ping_counter = gpr_now().tv_nsec;
grpc_chttp2_hpack_compressor_init(&t->hpack_compressor, mdctx);
grpc_chttp2_goaway_parser_init(&t->goaway_parser);
- gpr_slice_buffer_init(&t->outbuf);
- gpr_slice_buffer_init(&t->qbuf);
+ gpr_slice_buffer_init(&t->global.qbuf);
+ gpr_slice_buffer_init(&t->writing.outbuf);
+ gpr_slice_buffer_init(&t->parsing.qbuf);
grpc_sopb_init(&t->nuke_later_sopb);
grpc_chttp2_hpack_parser_init(&t->hpack_parser, t->metadata_context);
if (is_client) {
- gpr_slice_buffer_add(&t->qbuf,
+ gpr_slice_buffer_add(&t->global.qbuf,
gpr_slice_from_copied_string(CLIENT_CONNECT_STRING));
}
/* 8 is a random stab in the dark as to a good initial size: it's small enough
@@ -575,16 +621,16 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
}
gpr_mu_lock(&t->mu);
- t->calling_back_channel = 1;
+ t->channel_callback.executing = 1;
ref_transport(t); /* matches unref at end of this function */
gpr_mu_unlock(&t->mu);
sr = setup(arg, &t->base, t->metadata_context);
lock(t);
- t->cb = sr.callbacks;
- t->cb_user_data = sr.user_data;
- t->calling_back_channel = 0;
+ t->channel_callback.cb = sr.callbacks;
+ t->channel_callback.cb_user_data = sr.user_data;
+ t->channel_callback.executing = 0;
if (t->destroying) gpr_cv_signal(&t->cv);
unlock(t);
@@ -605,7 +651,7 @@ static void destroy_transport(grpc_transport *gt) {
We need to be not writing as cancellation finalization may produce some
callbacks that NEED to be made to close out some streams when t->writing
becomes 0. */
- while (t->calling_back_channel || t->writing) {
+ while (t->channel_callback.executing || t->writing.executing) {
gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future);
}
drop_connection(t);
@@ -618,7 +664,7 @@ static void destroy_transport(grpc_transport *gt) {
It's shutdown path, so I don't believe an extra lock pair is going to be
problematic for performance. */
lock(t);
- GPR_ASSERT(!t->cb);
+ GPR_ASSERT(!t->channel_callback.cb);
unlock(t);
unref_transport(t);
@@ -646,7 +692,7 @@ static void goaway(grpc_transport *gt, grpc_status_code status,
lock(t);
grpc_chttp2_goaway_append(t->last_incoming_stream_id,
grpc_chttp2_grpc_status_to_http2_error(status),
- debug_data, &t->qbuf);
+ debug_data, &t->global.qbuf);
unlock(t);
}
@@ -806,6 +852,26 @@ static void remove_from_stream_map(transport *t, stream *s) {
static void lock(transport *t) { gpr_mu_lock(&t->mu); }
static void unlock(transport *t) {
+ unlock_ctx uctx;
+ size_t i;
+
+ memset(&uctx, 0, sizeof(uctx));
+
+ unlock_check_writes(t, &uctx);
+ unlock_check_cancellations(t, &uctx);
+ unlock_check_parser(t, &uctx);
+ unlock_check_op_callbacks(t, &uctx);
+ unlock_check_channel_callbacks(t, &uctx);
+
+ gpr_mu_unlock(&t->mu);
+
+ for (i = 0; i < uctx.num_post_actions; i++) {
+ grpc_iomgr_closure* closure = uctx.post_actions[i];
+ closure->cb(closure->cb_arg, 1);
+ }
+
+
+#if 0
int start_write = 0;
int perform_callbacks = 0;
int call_closed = 0;
@@ -814,7 +880,7 @@ static void unlock(transport *t) {
pending_goaway *goaways = NULL;
grpc_endpoint *ep = t->ep;
grpc_stream_op_buffer nuke_now;
- const grpc_transport_callbacks *cb = t->cb;
+ const grpc_transport_callbacks *cb = t->channel_callback.cb;
GRPC_TIMER_BEGIN(GRPC_PTAG_HTTP2_UNLOCK, 0);
@@ -824,18 +890,18 @@ static void unlock(transport *t) {
}
/* see if we need to trigger a write - and if so, get the data ready */
- if (ep && !t->writing) {
- t->writing = start_write = prepare_write(t);
+ if (ep && !t->writing.executing) {
+ t->writing.executing = start_write = prepare_write(t);
if (start_write) {
ref_transport(t);
}
}
- if (!t->writing) {
+ if (!t->writing.executing) {
finalize_cancellations(t);
}
- if (!t->parsing) {
+ if (!t->parsing.executing) {
finish_reads(t);
}
@@ -845,8 +911,8 @@ static void unlock(transport *t) {
if (perform_callbacks) ref_transport(t);
}
- if (!t->calling_back_channel && cb) {
- if (t->error_state == ERROR_STATE_SEEN && !t->writing) {
+ if (!t->channel_callback.executing && cb) {
+ if (t->error_state == ERROR_STATE_SEEN && !t->writing.executing) {
call_closed = 1;
t->calling_back_channel = 1;
t->cb = NULL; /* no more callbacks */
@@ -906,6 +972,7 @@ static void unlock(transport *t) {
gpr_free(goaways);
GRPC_TIMER_END(GRPC_PTAG_HTTP2_UNLOCK, 0);
+#endif
}
/*
@@ -927,19 +994,22 @@ static void push_setting(transport *t, grpc_chttp2_setting_id id,
}
}
-static int prepare_write(transport *t) {
+static void unlock_check_writes(transport *t, unlock_ctx *uctx) {
stream *s;
gpr_uint32 window_delta;
- /* simple writes are queued to qbuf, and flushed here */
- if (!t->parsing) {
- gpr_slice_buffer_swap(&t->qbuf, &t->outbuf);
- GPR_ASSERT(t->qbuf.count == 0);
+ /* don't do anything if we are already writing */
+ if (t->writing.executing) {
+ return;
}
+ /* simple writes are queued to qbuf, and flushed here */
+ gpr_slice_buffer_swap(&t->global.qbuf, &t->writing.outbuf);
+ GPR_ASSERT(t->global.qbuf.count == 0);
+
if (t->dirtied_local_settings && !t->sent_local_settings) {
gpr_slice_buffer_add(
- &t->outbuf, grpc_chttp2_settings_create(
+ &t->writing.outbuf, grpc_chttp2_settings_create(
t->settings[SENT_SETTINGS], t->settings[LOCAL_SETTINGS],
t->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS));
t->force_send_settings = 0;
@@ -980,7 +1050,7 @@ static int prepare_write(transport *t) {
}
}
- if (!t->parsing) {
+ if (!t->parsing.executing) {
/* for each stream that wants to update its window, add that window here */
while ((s = stream_list_remove_head(t, WINDOW_UPDATE))) {
window_delta =
@@ -988,7 +1058,7 @@ static int prepare_write(transport *t) {
s->incoming_window;
if (!s->read_closed && window_delta) {
gpr_slice_buffer_add(
- &t->outbuf, grpc_chttp2_window_update_create(s->id, window_delta));
+ &t->writing.outbuf, grpc_chttp2_window_update_create(s->id, window_delta));
FLOWCTL_TRACE(t, s, incoming, s->id, window_delta);
s->incoming_window += window_delta;
}
@@ -997,14 +1067,17 @@ static int prepare_write(transport *t) {
/* if the transport is ready to send a window update, do so here also */
if (t->incoming_window < t->connection_window_target * 3 / 4) {
window_delta = t->connection_window_target - t->incoming_window;
- gpr_slice_buffer_add(&t->outbuf,
+ gpr_slice_buffer_add(&t->writing.outbuf,
grpc_chttp2_window_update_create(0, window_delta));
FLOWCTL_TRACE(t, t, incoming, 0, window_delta);
t->incoming_window += window_delta;
}
}
- return t->outbuf.length > 0 || !stream_list_empty(t, WRITING);
+ if (t->writing.outbuf.length > 0) {
+ uctx->post_actions[uctx->num_post_actions++] = &t->writing.action;
+ t->writing.executing = 1;
+ }
}
static void finalize_outbuf(transport *t) {