aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/channel/client_channel.c10
-rw-r--r--src/core/channel/http_client_filter.c13
-rw-r--r--src/core/channel/http_server_filter.c11
-rw-r--r--src/core/iomgr/iomgr.h2
-rw-r--r--src/core/surface/call.c37
-rw-r--r--src/core/surface/lame_client.c6
-rw-r--r--src/core/surface/server.c12
-rw-r--r--src/core/transport/chttp2_transport.c330
-rw-r--r--src/core/transport/transport.c6
-rw-r--r--src/core/transport/transport.h9
10 files changed, 243 insertions, 193 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 726196e996..711e105464 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -157,7 +157,7 @@ static void handle_op_after_cancellation(grpc_call_element *elem,
channel_data *chand = elem->channel_data;
if (op->send_ops) {
grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
- op->on_done_send(op->send_user_data, 0);
+ op->on_done_send->cb(op->on_done_send->cb_arg, 0);
}
if (op->recv_ops) {
char status[GPR_LTOA_MIN_BUFSIZE];
@@ -176,10 +176,10 @@ static void handle_op_after_cancellation(grpc_call_element *elem,
mdb.deadline = gpr_inf_future;
grpc_sopb_add_metadata(op->recv_ops, mdb);
*op->recv_state = GRPC_STREAM_CLOSED;
- op->on_done_recv(op->recv_user_data, 1);
+ op->on_done_recv->cb(op->on_done_recv->cb_arg, 1);
}
if (op->on_consumed) {
- op->on_consumed(op->on_consumed_user_data, 0);
+ op->on_consumed->cb(op->on_consumed->cb_arg, 0);
}
}
@@ -266,17 +266,15 @@ static void cc_start_transport_op(grpc_call_element *elem,
calld->s.waiting_op.send_ops = op->send_ops;
calld->s.waiting_op.is_last_send = op->is_last_send;
calld->s.waiting_op.on_done_send = op->on_done_send;
- calld->s.waiting_op.send_user_data = op->send_user_data;
}
if (op->recv_ops) {
calld->s.waiting_op.recv_ops = op->recv_ops;
calld->s.waiting_op.recv_state = op->recv_state;
calld->s.waiting_op.on_done_recv = op->on_done_recv;
- calld->s.waiting_op.recv_user_data = op->recv_user_data;
}
gpr_mu_unlock(&chand->mu);
if (op->on_consumed) {
- op->on_consumed(op->on_consumed_user_data, 0);
+ op->on_consumed->cb(op->on_consumed->cb_arg, 0);
}
}
break;
diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c
index 9805f325a6..62a7a1e7d9 100644
--- a/src/core/channel/http_client_filter.c
+++ b/src/core/channel/http_client_filter.c
@@ -43,8 +43,9 @@ typedef struct call_data {
int got_initial_metadata;
grpc_stream_op_buffer *recv_ops;
- void (*on_done_recv)(void *user_data, int success);
- void *recv_user_data;
+ grpc_iomgr_closure *on_done_recv;
+
+ grpc_iomgr_closure hc_on_recv;
} call_data;
typedef struct channel_data {
@@ -84,7 +85,7 @@ static void hc_on_recv(void *user_data, int success) {
grpc_metadata_batch_filter(&op->data.metadata, client_filter, elem);
}
}
- calld->on_done_recv(calld->recv_user_data, success);
+ calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success);
}
static void hc_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
@@ -117,9 +118,7 @@ static void hc_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
/* substitute our callback for the higher callback */
calld->recv_ops = op->recv_ops;
calld->on_done_recv = op->on_done_recv;
- calld->recv_user_data = op->recv_user_data;
- op->on_done_recv = hc_on_recv;
- op->recv_user_data = elem;
+ op->on_done_recv = &calld->hc_on_recv;
}
}
@@ -154,6 +153,8 @@ static void init_call_elem(grpc_call_element *elem,
call_data *calld = elem->call_data;
calld->sent_initial_metadata = 0;
calld->got_initial_metadata = 0;
+ calld->on_done_recv = NULL;
+ grpc_iomgr_closure_init(&calld->hc_on_recv, hc_on_recv, elem);
if (initial_op) hc_mutate_op(elem, initial_op);
}
diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c
index 11a53b1e70..e5ce7a5dd8 100644
--- a/src/core/channel/http_server_filter.c
+++ b/src/core/channel/http_server_filter.c
@@ -47,8 +47,8 @@ typedef struct call_data {
grpc_linked_mdelem status;
grpc_stream_op_buffer *recv_ops;
- void (*on_done_recv)(void *user_data, int success);
- void *recv_user_data;
+ grpc_iomgr_closure *on_done_recv;
+ grpc_iomgr_closure hs_on_recv;
} call_data;
typedef struct channel_data {
@@ -174,7 +174,7 @@ static void hs_on_recv(void *user_data, int success) {
}
}
}
- calld->on_done_recv(calld->recv_user_data, success);
+ calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success);
}
static void hs_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
@@ -200,9 +200,7 @@ static void hs_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
/* substitute our callback for the higher callback */
calld->recv_ops = op->recv_ops;
calld->on_done_recv = op->on_done_recv;
- calld->recv_user_data = op->recv_user_data;
- op->on_done_recv = hs_on_recv;
- op->recv_user_data = elem;
+ op->on_done_recv = &calld->hs_on_recv;
}
}
@@ -238,6 +236,7 @@ static void init_call_elem(grpc_call_element *elem,
call_data *calld = elem->call_data;
/* initialize members */
memset(calld, 0, sizeof(*calld));
+ grpc_iomgr_closure_init(&calld->hs_on_recv, hs_on_recv, elem);
if (initial_op) hs_mutate_op(elem, initial_op);
}
diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h
index a10e481e48..265c817db1 100644
--- a/src/core/iomgr/iomgr.h
+++ b/src/core/iomgr/iomgr.h
@@ -73,4 +73,6 @@ void grpc_iomgr_shutdown(void);
* Can be called from within a callback or from anywhere else */
void grpc_iomgr_add_callback(grpc_iomgr_closure *closure);
+void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success);
+
#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_H */
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 641c1cd435..872705e996 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -237,6 +237,9 @@ struct grpc_call {
gpr_slice_buffer incoming_message;
gpr_uint32 incoming_message_length;
grpc_iomgr_closure destroy_closure;
+ grpc_iomgr_closure on_done_recv;
+ grpc_iomgr_closure on_done_send;
+ grpc_iomgr_closure on_done_bind;
};
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
@@ -255,6 +258,7 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata);
static void finish_read_ops(grpc_call *call);
static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
const char *description);
+static void finished_loose_op(void *call, int success);
static void lock(grpc_call *call);
static void unlock(grpc_call *call);
@@ -298,6 +302,9 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
grpc_sopb_init(&call->send_ops);
grpc_sopb_init(&call->recv_ops);
gpr_slice_buffer_init(&call->incoming_message);
+ grpc_iomgr_closure_init(&call->on_done_recv, call_on_done_recv, call);
+ grpc_iomgr_closure_init(&call->on_done_send, call_on_done_send, call);
+ grpc_iomgr_closure_init(&call->on_done_bind, finished_loose_op, call);
/* dropped in destroy and when READ_STATE_STREAM_CLOSED received */
gpr_ref_init(&call->internal_refcount, 2);
/* server hack: start reads immediately so we can get initial metadata.
@@ -306,8 +313,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
memset(&initial_op, 0, sizeof(initial_op));
initial_op.recv_ops = &call->recv_ops;
initial_op.recv_state = &call->recv_state;
- initial_op.on_done_recv = call_on_done_recv;
- initial_op.recv_user_data = call;
+ initial_op.on_done_recv = &call->on_done_recv;
initial_op.context = call->context;
call->receiving = 1;
GRPC_CALL_INTERNAL_REF(call, "receiving");
@@ -460,8 +466,7 @@ static void unlock(grpc_call *call) {
if (!call->receiving && need_more_data(call)) {
op.recv_ops = &call->recv_ops;
op.recv_state = &call->recv_state;
- op.on_done_recv = call_on_done_recv;
- op.recv_user_data = call;
+ op.on_done_recv = &call->on_done_recv;
call->receiving = 1;
GRPC_CALL_INTERNAL_REF(call, "receiving");
start_op = 1;
@@ -929,8 +934,7 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
break;
}
if (op->send_ops) {
- op->on_done_send = call_on_done_send;
- op->send_user_data = call;
+ op->on_done_send = &call->on_done_send;
}
return op->send_ops != NULL;
}
@@ -1105,14 +1109,31 @@ static void finished_loose_op(void *call, int success_ignored) {
GRPC_CALL_INTERNAL_UNREF(call, "loose-op", 0);
}
+typedef struct {
+ grpc_call *call;
+ grpc_iomgr_closure closure;
+} finished_loose_op_allocated_args;
+
+static void finished_loose_op_allocated(void *alloc, int success) {
+ finished_loose_op_allocated_args *args = alloc;
+ finished_loose_op(args->call, success);
+ gpr_free(args);
+}
+
static void execute_op(grpc_call *call, grpc_transport_op *op) {
grpc_call_element *elem;
GPR_ASSERT(op->on_consumed == NULL);
if (op->cancel_with_status != GRPC_STATUS_OK || op->bind_pollset) {
GRPC_CALL_INTERNAL_REF(call, "loose-op");
- op->on_consumed = finished_loose_op;
- op->on_consumed_user_data = call;
+ if (op->bind_pollset) {
+ op->on_consumed = &call->on_done_bind;
+ } else {
+ finished_loose_op_allocated_args *args = gpr_malloc(sizeof(*args));
+ args->call = call;
+ grpc_iomgr_closure_init(&args->closure, finished_loose_op_allocated, args);
+ op->on_consumed = &args->closure;
+ }
}
elem = CALL_ELEM_FROM_CALL(call, 0);
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index b667128aef..85e1ab5554 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -56,7 +56,7 @@ static void lame_start_transport_op(grpc_call_element *elem,
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
if (op->send_ops) {
grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
- op->on_done_send(op->send_user_data, 0);
+ op->on_done_send->cb(op->on_done_send->cb_arg, 0);
}
if (op->recv_ops) {
char tmp[GPR_LTOA_MIN_BUFSIZE];
@@ -75,10 +75,10 @@ static void lame_start_transport_op(grpc_call_element *elem,
mdb.deadline = gpr_inf_future;
grpc_sopb_add_metadata(op->recv_ops, mdb);
*op->recv_state = GRPC_STREAM_CLOSED;
- op->on_done_recv(op->recv_user_data, 1);
+ op->on_done_recv->cb(op->on_done_recv->cb_arg, 1);
}
if (op->on_consumed) {
- op->on_consumed(op->on_consumed_user_data, 0);
+ op->on_consumed->cb(op->on_consumed->cb_arg, 0);
}
}
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 3671efe0d0..a76a6c7812 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -183,9 +183,9 @@ struct call_data {
grpc_stream_op_buffer *recv_ops;
grpc_stream_state *recv_state;
- void (*on_done_recv)(void *user_data, int success);
- void *recv_user_data;
+ grpc_iomgr_closure *on_done_recv;
+ grpc_iomgr_closure server_on_recv;
grpc_iomgr_closure kill_zombie_closure;
call_data **root[CALL_LIST_COUNT];
@@ -503,7 +503,7 @@ static void server_on_recv(void *ptr, int success) {
break;
}
- calld->on_done_recv(calld->recv_user_data, success);
+ calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success);
}
static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
@@ -514,9 +514,7 @@ static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
calld->recv_ops = op->recv_ops;
calld->recv_state = op->recv_state;
calld->on_done_recv = op->on_done_recv;
- calld->recv_user_data = op->recv_user_data;
- op->on_done_recv = server_on_recv;
- op->recv_user_data = elem;
+ op->on_done_recv = &calld->server_on_recv;
}
}
@@ -612,6 +610,8 @@ static void init_call_elem(grpc_call_element *elem,
calld->deadline = gpr_inf_future;
calld->call = grpc_call_from_top_element(elem);
+ grpc_iomgr_closure_init(&calld->server_on_recv, server_on_recv, elem);
+
gpr_mu_lock(&chand->server->mu);
call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS);
chand->num_calls++;
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index c4fa13c86c..f354c9441a 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -207,18 +207,6 @@ typedef struct {
gpr_slice debug;
} pending_goaway;
-typedef struct {
- void (*cb)(void *user_data, int success);
- void *user_data;
- int success;
-} op_closure;
-
-typedef struct {
- op_closure *callbacks;
- size_t count;
- size_t capacity;
-} op_closure_array;
-
struct transport {
grpc_transport base; /* must be first */
grpc_endpoint *ep;
@@ -237,10 +225,6 @@ struct transport {
gpr_uint8 closed;
error_state error_state;
- /* queued callbacks */
- op_closure_array pending_callbacks;
- op_closure_array executing_callbacks;
-
/* stream indexing */
gpr_uint32 next_stream_id;
gpr_uint32 last_incoming_stream_id;
@@ -266,9 +250,6 @@ struct transport {
gpr_uint32 incoming_frame_size;
gpr_uint32 incoming_stream_id;
- /* hpack encoding */
- grpc_chttp2_hpack_compressor hpack_compressor;
-
/* various parsers */
grpc_chttp2_hpack_parser hpack_parser;
/* simple one shot parsers */
@@ -313,6 +294,8 @@ struct transport {
struct {
/** data to write next write */
gpr_slice_buffer qbuf;
+ /* queued callbacks */
+ grpc_iomgr_closure *pending_closures;
} global;
struct {
@@ -322,6 +305,8 @@ struct transport {
grpc_iomgr_closure action;
/** data to write now */
gpr_slice_buffer outbuf;
+ /* hpack encoding */
+ grpc_chttp2_hpack_compressor hpack_compressor;
} writing;
struct {
@@ -334,18 +319,26 @@ struct transport {
struct {
/** is a thread currently performing channel callbacks */
gpr_uint8 executing;
+ /** transport channel-level callback */
const grpc_transport_callbacks *cb;
+ /** user data for cb calls */
void *cb_user_data;
+ /** closure for notifying transport closure */
+ grpc_iomgr_closure notify_closed;
} channel_callback;
};
struct stream {
struct {
- int unused;
+ grpc_iomgr_closure *send_done_closure;
+ grpc_iomgr_closure *recv_done_closure;
} global;
struct {
- int unused;
+ /* sops that have passed flow control to be written */
+ grpc_stream_op_buffer sopb;
+ /* how strongly should we indicate closure with the next write */
+ send_closed send_closed;
} writing;
struct {
@@ -361,13 +354,9 @@ struct stream {
'queued'; when the close is flow controlled into the send path, we are
'sending' it; when the write has been performed it is 'sent' */
write_state write_state;
- send_closed send_closed;
gpr_uint8 read_closed;
gpr_uint8 cancelled;
- op_closure send_done_closure;
- op_closure recv_done_closure;
-
stream_link links[STREAM_LIST_COUNT];
gpr_uint8 included[STREAM_LIST_COUNT];
@@ -383,8 +372,6 @@ struct stream {
grpc_stream_op_buffer *incoming_sopb;
grpc_stream_state *publish_state;
grpc_stream_state published_state;
- /* sops that have passed flow control to be written */
- grpc_stream_op_buffer writing_sopb;
grpc_chttp2_data_parser parser;
@@ -392,33 +379,21 @@ struct stream {
grpc_stream_op_buffer callback_sopb;
};
-#define MAX_POST_ACTIONS 8
-
-typedef struct {
- size_t num_post_actions;
- grpc_iomgr_closure *post_actions[MAX_POST_ACTIONS];
-} unlock_ctx;
-
static const grpc_transport_vtable vtable;
static void push_setting(transport *t, grpc_chttp2_setting_id id,
gpr_uint32 value);
-static int prepare_callbacks(transport *t);
-static void run_callbacks(transport *t);
-static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb);
-
-static int prepare_write(transport *t);
-static void perform_write(transport *t, grpc_endpoint *ep);
-
static void lock(transport *t);
static void unlock(transport *t);
-static void unlock_check_writes(transport* t, unlock_ctx *uctx);
- static void unlock_check_cancellations(transport* t, unlock_ctx *uctx);
- static void unlock_check_parser(transport* t, unlock_ctx *uctx);
- static void unlock_check_op_callbacks(transport* t, unlock_ctx *uctx);
- static void unlock_check_channel_callbacks(transport* t, unlock_ctx *uctx);
+static void unlock_check_writes(transport* t);
+ static void unlock_check_cancellations(transport* t);
+ static void unlock_check_parser(transport* t);
+ static void unlock_check_channel_callbacks(transport* t);
+
+static void writing_action(void *t, int iomgr_success_ignored);
+static void notify_closed(void *t, int iomgr_success_ignored);
static void drop_connection(transport *t);
static void end_all_the_calls(transport *t);
@@ -435,7 +410,6 @@ static void cancel_stream(transport *t, stream *s,
grpc_status_code local_status,
grpc_chttp2_error_code error_code,
grpc_mdstr *optional_message, int send_rst);
-static void finalize_cancellations(transport *t);
static stream *lookup_stream(transport *t, gpr_uint32 id);
static void remove_from_stream_map(transport *t, stream *s);
static void maybe_start_some_streams(transport *t);
@@ -445,10 +419,9 @@ static void become_skip_parser(transport *t);
static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
grpc_endpoint_cb_status error);
-static void schedule_cb(transport *t, op_closure closure, int success);
+static void schedule_cb(transport *t, grpc_iomgr_closure *closure, int success);
static void maybe_finish_read(transport *t, stream *s, int is_parser);
static void maybe_join_window_updates(transport *t, stream *s);
-static void finish_reads(transport *t);
static void add_to_pollset_locked(transport *t, grpc_pollset *pollset);
static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op);
static void add_metadata_batch(transport *t, stream *s);
@@ -471,10 +444,12 @@ static void destruct_transport(transport *t) {
GPR_ASSERT(t->ep == NULL);
gpr_slice_buffer_destroy(&t->global.qbuf);
+
gpr_slice_buffer_destroy(&t->writing.outbuf);
+ grpc_chttp2_hpack_compressor_destroy(&t->writing.hpack_compressor);
+
gpr_slice_buffer_destroy(&t->parsing.qbuf);
grpc_chttp2_hpack_parser_destroy(&t->hpack_parser);
- grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor);
grpc_chttp2_goaway_parser_destroy(&t->goaway_parser);
grpc_mdstr_unref(t->str_grpc_timeout);
@@ -499,9 +474,6 @@ static void destruct_transport(transport *t) {
}
gpr_free(t->pings);
- gpr_free(t->pending_callbacks.callbacks);
- gpr_free(t->executing_callbacks.callbacks);
-
for (i = 0; i < t->num_pending_goaways; i++) {
gpr_slice_unref(t->pending_goaways[i].debug);
}
@@ -552,11 +524,13 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
t->connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET;
t->deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0;
t->ping_counter = gpr_now().tv_nsec;
- grpc_chttp2_hpack_compressor_init(&t->hpack_compressor, mdctx);
grpc_chttp2_goaway_parser_init(&t->goaway_parser);
gpr_slice_buffer_init(&t->global.qbuf);
gpr_slice_buffer_init(&t->writing.outbuf);
+ grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor, mdctx);
+ grpc_iomgr_closure_init(&t->writing.action, writing_action, t);
gpr_slice_buffer_init(&t->parsing.qbuf);
+ grpc_iomgr_closure_init(&t->channel_callback.notify_closed, notify_closed, t);
grpc_sopb_init(&t->nuke_later_sopb);
grpc_chttp2_hpack_parser_init(&t->hpack_parser, t->metadata_context);
if (is_client) {
@@ -664,7 +638,7 @@ static void destroy_transport(grpc_transport *gt) {
It's shutdown path, so I don't believe an extra lock pair is going to be
problematic for performance. */
lock(t);
- GPR_ASSERT(!t->channel_callback.cb);
+ GPR_ASSERT(t->error_state == ERROR_STATE_NOTIFIED);
unlock(t);
unref_transport(t);
@@ -722,7 +696,7 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
}
s->incoming_deadline = gpr_inf_future;
- grpc_sopb_init(&s->writing_sopb);
+ grpc_sopb_init(&s->writing.sopb);
grpc_sopb_init(&s->callback_sopb);
grpc_chttp2_data_parser_init(&s->parser);
@@ -755,7 +729,7 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
GPR_ASSERT(s->outgoing_sopb == NULL);
GPR_ASSERT(s->incoming_sopb == NULL);
- grpc_sopb_destroy(&s->writing_sopb);
+ grpc_sopb_destroy(&s->writing.sopb);
grpc_sopb_destroy(&s->callback_sopb);
grpc_chttp2_data_parser_destroy(&s->parser);
for (i = 0; i < s->incoming_metadata_count; i++) {
@@ -771,9 +745,11 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
* LIST MANAGEMENT
*/
+#if 0
static int stream_list_empty(transport *t, stream_list_id id) {
return t->lists[id].head == NULL;
}
+#endif
static stream *stream_list_remove_head(transport *t, stream_list_id id) {
stream *s = t->lists[id].head;
@@ -852,25 +828,24 @@ static void remove_from_stream_map(transport *t, stream *s) {
static void lock(transport *t) { gpr_mu_lock(&t->mu); }
static void unlock(transport *t) {
- unlock_ctx uctx;
- size_t i;
+ grpc_iomgr_closure *run_closures;
- memset(&uctx, 0, sizeof(uctx));
+ unlock_check_writes(t);
+ unlock_check_cancellations(t);
+ unlock_check_parser(t);
+ unlock_check_channel_callbacks(t);
- unlock_check_writes(t, &uctx);
- unlock_check_cancellations(t, &uctx);
- unlock_check_parser(t, &uctx);
- unlock_check_op_callbacks(t, &uctx);
- unlock_check_channel_callbacks(t, &uctx);
+ run_closures = t->global.pending_closures;
+ t->global.pending_closures = NULL;
gpr_mu_unlock(&t->mu);
- for (i = 0; i < uctx.num_post_actions; i++) {
- grpc_iomgr_closure* closure = uctx.post_actions[i];
- closure->cb(closure->cb_arg, 1);
+ while (run_closures) {
+ grpc_iomgr_closure *next = run_closures->next;
+ run_closures->cb(run_closures->cb_arg, run_closures->success);
+ run_closures = next;
}
-
#if 0
int start_write = 0;
int perform_callbacks = 0;
@@ -994,7 +969,7 @@ static void push_setting(transport *t, grpc_chttp2_setting_id id,
}
}
-static void unlock_check_writes(transport *t, unlock_ctx *uctx) {
+static void unlock_check_writes(transport *t) {
stream *s;
gpr_uint32 window_delta;
@@ -1023,7 +998,7 @@ static void unlock_check_writes(transport *t, unlock_ctx *uctx) {
s->outgoing_window > 0) {
window_delta = grpc_chttp2_preencode(
s->outgoing_sopb->ops, &s->outgoing_sopb->nops,
- GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing_sopb);
+ GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing.sopb);
FLOWCTL_TRACE(t, t, outgoing, 0, -(gpr_int64)window_delta);
FLOWCTL_TRACE(t, s, outgoing, s->id, -(gpr_int64)window_delta);
t->outgoing_window -= window_delta;
@@ -1032,19 +1007,19 @@ static void unlock_check_writes(transport *t, unlock_ctx *uctx) {
if (s->write_state == WRITE_STATE_QUEUED_CLOSE &&
s->outgoing_sopb->nops == 0) {
if (!t->is_client && !s->read_closed) {
- s->send_closed = SEND_CLOSED_WITH_RST_STREAM;
+ s->writing.send_closed = SEND_CLOSED_WITH_RST_STREAM;
} else {
- s->send_closed = SEND_CLOSED;
+ s->writing.send_closed = SEND_CLOSED;
}
}
- if (s->writing_sopb.nops > 0 || s->send_closed) {
+ if (s->writing.sopb.nops > 0 || s->writing.send_closed) {
stream_list_join(t, s, WRITING);
}
/* we should either exhaust window or have no ops left, but not both */
if (s->outgoing_sopb->nops == 0) {
s->outgoing_sopb = NULL;
- schedule_cb(t, s->send_done_closure, 1);
+ schedule_cb(t, s->global.send_done_closure, 1);
} else if (s->outgoing_window) {
stream_list_add_tail(t, s, WRITABLE);
}
@@ -1075,30 +1050,31 @@ static void unlock_check_writes(transport *t, unlock_ctx *uctx) {
}
if (t->writing.outbuf.length > 0) {
- uctx->post_actions[uctx->num_post_actions++] = &t->writing.action;
t->writing.executing = 1;
+ ref_transport(t);
+ schedule_cb(t, &t->writing.action, 1);
}
}
-static void finalize_outbuf(transport *t) {
+static void writing_finalize_outbuf(transport *t) {
stream *s;
while ((s = stream_list_remove_head(t, WRITING))) {
- grpc_chttp2_encode(s->writing_sopb.ops, s->writing_sopb.nops,
- s->send_closed != DONT_SEND_CLOSED, s->id,
- &t->hpack_compressor, &t->outbuf);
- s->writing_sopb.nops = 0;
- if (s->send_closed == SEND_CLOSED_WITH_RST_STREAM) {
- gpr_slice_buffer_add(&t->outbuf, grpc_chttp2_rst_stream_create(
+ grpc_chttp2_encode(s->writing.sopb.ops, s->writing.sopb.nops,
+ s->writing.send_closed != DONT_SEND_CLOSED, s->id,
+ &t->writing.hpack_compressor, &t->writing.outbuf);
+ s->writing.sopb.nops = 0;
+ if (s->writing.send_closed == SEND_CLOSED_WITH_RST_STREAM) {
+ gpr_slice_buffer_add(&t->writing.outbuf, grpc_chttp2_rst_stream_create(
s->id, GRPC_CHTTP2_NO_ERROR));
}
- if (s->send_closed != DONT_SEND_CLOSED) {
+ if (s->writing.send_closed != DONT_SEND_CLOSED) {
stream_list_join(t, s, WRITTEN_CLOSED);
}
}
}
-static void finish_write_common(transport *t, int success) {
+static void writing_finish(transport *t, int success) {
stream *s;
lock(t);
@@ -1112,11 +1088,11 @@ static void finish_write_common(transport *t, int success) {
}
maybe_finish_read(t, s, 0);
}
- t->outbuf.count = 0;
- t->outbuf.length = 0;
+ t->writing.outbuf.count = 0;
+ t->writing.outbuf.length = 0;
/* leave the writing flag up on shutdown to prevent further writes in unlock()
from starting */
- t->writing = 0;
+ t->writing.executing = 0;
if (t->destroying) {
gpr_cv_signal(&t->cv);
}
@@ -1130,27 +1106,35 @@ static void finish_write_common(transport *t, int success) {
unref_transport(t);
}
-static void finish_write(void *tp, grpc_endpoint_cb_status error) {
+static void writing_finish_write_cb(void *tp, grpc_endpoint_cb_status error) {
transport *t = tp;
- finish_write_common(t, error == GRPC_ENDPOINT_CB_OK);
+ writing_finish(t, error == GRPC_ENDPOINT_CB_OK);
}
-static void perform_write(transport *t, grpc_endpoint *ep) {
- finalize_outbuf(t);
+static void writing_action(void *gt, int iomgr_success_ignored) {
+ transport *t = gt;
- GPR_ASSERT(t->outbuf.count > 0);
+ writing_finalize_outbuf(t);
- switch (grpc_endpoint_write(ep, t->outbuf.slices, t->outbuf.count,
- finish_write, t)) {
+ GPR_ASSERT(t->writing.outbuf.count > 0);
+
+ switch (grpc_endpoint_write(t->ep, t->writing.outbuf.slices, t->writing.outbuf.count,
+ writing_finish_write_cb, t)) {
case GRPC_ENDPOINT_WRITE_DONE:
- finish_write_common(t, 1);
+ writing_finish(t, 1);
break;
case GRPC_ENDPOINT_WRITE_ERROR:
- finish_write_common(t, 0);
+ writing_finish(t, 0);
break;
case GRPC_ENDPOINT_WRITE_PENDING:
break;
}
+
+ lock(t);
+ t->writing.executing = 0;
+ unlock(t);
+
+ unref_transport(t);
}
static void add_goaway(transport *t, gpr_uint32 goaway_error,
@@ -1168,7 +1152,7 @@ static void add_goaway(transport *t, gpr_uint32 goaway_error,
static void maybe_start_some_streams(transport *t) {
/* start streams where we have free stream ids and free concurrency */
- while (!t->parsing && t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
+ while (!t->parsing.executing && t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
grpc_chttp2_stream_map_size(&t->stream_map) <
t->settings[PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) {
@@ -1216,8 +1200,7 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
if (op->send_ops) {
GPR_ASSERT(s->outgoing_sopb == NULL);
- s->send_done_closure.cb = op->on_done_send;
- s->send_done_closure.user_data = op->send_user_data;
+ s->global.send_done_closure = op->on_done_send;
if (!s->cancelled) {
s->outgoing_sopb = op->send_ops;
if (op->is_last_send && s->write_state == WRITE_STATE_OPEN) {
@@ -1234,15 +1217,14 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
}
} else {
schedule_nuke_sopb(t, op->send_ops);
- schedule_cb(t, s->send_done_closure, 0);
+ schedule_cb(t, s->global.send_done_closure, 0);
}
}
if (op->recv_ops) {
GPR_ASSERT(s->incoming_sopb == NULL);
GPR_ASSERT(s->published_state != GRPC_STREAM_CLOSED);
- s->recv_done_closure.cb = op->on_done_recv;
- s->recv_done_closure.user_data = op->recv_user_data;
+ s->global.recv_done_closure = op->on_done_recv;
s->incoming_sopb = op->recv_ops;
s->incoming_sopb->nops = 0;
s->publish_state = op->recv_state;
@@ -1257,10 +1239,7 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
}
if (op->on_consumed) {
- op_closure c;
- c.cb = op->on_consumed;
- c.user_data = op->on_consumed_user_data;
- schedule_cb(t, c, 1);
+ schedule_cb(t, op->on_consumed, 1);
}
}
@@ -1296,7 +1275,7 @@ static void send_ping(grpc_transport *gt, void (*cb)(void *user_data),
p->id[7] = t->ping_counter & 0xff;
p->cb = cb;
p->user_data = user_data;
- gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_ping_create(0, p->id));
+ gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id));
unlock(t);
}
@@ -1304,9 +1283,13 @@ static void send_ping(grpc_transport *gt, void (*cb)(void *user_data),
* INPUT PROCESSING
*/
-static void finalize_cancellations(transport *t) {
+static void unlock_check_cancellations(transport *t) {
stream *s;
+ if (t->writing.executing) {
+ return;
+ }
+
while ((s = stream_list_remove_head(t, CANCELLED))) {
s->read_closed = 1;
s->write_state = WRITE_STATE_SENT_CLOSE;
@@ -1342,7 +1325,7 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
schedule_nuke_sopb(t, s->outgoing_sopb);
s->outgoing_sopb = NULL;
stream_list_remove(t, s, WRITABLE);
- schedule_cb(t, s->send_done_closure, 0);
+ schedule_cb(t, s->global.send_done_closure, 0);
}
}
if (s->cancelled) {
@@ -1383,7 +1366,7 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
}
if (!id) send_rst = 0;
if (send_rst) {
- gpr_slice_buffer_add(&t->qbuf,
+ gpr_slice_buffer_add(&t->global.qbuf,
grpc_chttp2_rst_stream_create(id, error_code));
}
if (optional_message) {
@@ -1434,7 +1417,7 @@ static void maybe_finish_read(transport *t, stream *s, int is_parser) {
}
static void maybe_join_window_updates(transport *t, stream *s) {
- if (t->parsing) {
+ if (t->parsing.executing) {
stream_list_join(t, s, OTHER_CHECK_WINDOW_UPDATES_AFTER_PARSE);
return;
}
@@ -1610,7 +1593,7 @@ static int init_header_frame_parser(transport *t, int is_continuation) {
}
t->incoming_stream = NULL;
/* if stream is accepted, we set incoming_stream in init_stream */
- t->cb->accept_stream(t->cb_user_data, &t->base,
+ t->channel_callback.cb->accept_stream(t->channel_callback.cb_user_data, &t->base,
(void *)(gpr_uintptr)t->incoming_stream_id);
s = t->incoming_stream;
if (!s) {
@@ -1795,11 +1778,11 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
maybe_finish_read(t, t->incoming_stream, 1);
}
if (st.ack_settings) {
- gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create());
+ gpr_slice_buffer_add(&t->parsing.qbuf, grpc_chttp2_settings_ack_create());
}
if (st.send_ping_ack) {
gpr_slice_buffer_add(
- &t->qbuf,
+ &t->parsing.qbuf,
grpc_chttp2_ping_create(1, t->simple_parsers.ping.opaque_8bytes));
}
if (st.goaway) {
@@ -2056,7 +2039,7 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
lock(t);
drop_connection(t);
t->reading = 0;
- if (!t->writing && t->ep) {
+ if (!t->writing.executing && t->ep) {
grpc_endpoint_destroy(t->ep);
t->ep = NULL;
unref_transport(t); /* safe as we still have a ref for read */
@@ -2065,16 +2048,16 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
unref_transport(t);
break;
case GRPC_ENDPOINT_CB_OK:
- gpr_mu_lock(&t->mu);
- GPR_ASSERT(!t->parsing);
- t->parsing = 1;
- gpr_mu_unlock(&t->mu);
- if (t->cb) {
+ lock(t);
+ GPR_ASSERT(!t->parsing.executing);
+ t->parsing.executing = 1;
+ if (t->error_state == ERROR_STATE_NONE) {
+ gpr_mu_unlock(&t->mu);
for (i = 0; i < nslices && process_read(t, slices[i]); i++)
;
+ gpr_mu_unlock(&t->mu);
}
- lock(t);
- t->parsing = 0;
+ t->parsing.executing = 0;
while ((s = stream_list_remove_head(t, MAYBE_FINISH_READ_AFTER_PARSE))) {
maybe_finish_read(t, s, 0);
}
@@ -2176,9 +2159,13 @@ static void patch_metadata_ops(stream *s) {
}
}
-static void finish_reads(transport *t) {
+static void unlock_check_parser(transport *t) {
stream *s;
+ if (t->parsing.executing) {
+ return;
+ }
+
while ((s = stream_list_remove_head(t, FINISHED_READ_OP)) != NULL) {
int publish = 0;
GPR_ASSERT(s->incoming_sopb);
@@ -2200,42 +2187,87 @@ static void finish_reads(transport *t) {
patch_metadata_ops(s);
}
s->incoming_sopb = NULL;
- schedule_cb(t, s->recv_done_closure, 1);
+ schedule_cb(t, s->global.recv_done_closure, 1);
}
}
}
-static void schedule_cb(transport *t, op_closure closure, int success) {
- if (t->pending_callbacks.capacity == t->pending_callbacks.count) {
- t->pending_callbacks.capacity =
- GPR_MAX(t->pending_callbacks.capacity * 2, 8);
- t->pending_callbacks.callbacks =
- gpr_realloc(t->pending_callbacks.callbacks,
- t->pending_callbacks.capacity *
- sizeof(*t->pending_callbacks.callbacks));
+typedef struct {
+ transport *t;
+ pending_goaway *goaways;
+ size_t num_goaways;
+ grpc_iomgr_closure closure;
+} notify_goaways_args;
+
+static void notify_goaways(void *p, int iomgr_success_ignored) {
+ size_t i;
+ notify_goaways_args *a = p;
+ transport *t = a->t;
+
+ for (i = 0; i < a->num_goaways; i++) {
+ t->channel_callback.cb->goaway(
+ t->channel_callback.cb_user_data,
+ &t->base,
+ a->goaways[i].status,
+ a->goaways[i].debug);
}
- closure.success = success;
- t->pending_callbacks.callbacks[t->pending_callbacks.count++] = closure;
-}
-static int prepare_callbacks(transport *t) {
- op_closure_array temp = t->pending_callbacks;
- t->pending_callbacks = t->executing_callbacks;
- t->executing_callbacks = temp;
- return t->executing_callbacks.count > 0;
+ gpr_free(a->goaways);
+ gpr_free(a);
+
+ lock(t);
+ t->channel_callback.executing = 0;
+ unlock(t);
+
+ unref_transport(t);
}
-static void run_callbacks(transport *t) {
- size_t i;
- for (i = 0; i < t->executing_callbacks.count; i++) {
- op_closure c = t->executing_callbacks.callbacks[i];
- c.cb(c.user_data, c.success);
+static void unlock_check_channel_callbacks(transport *t) {
+ if (t->channel_callback.executing) {
+ return;
}
- t->executing_callbacks.count = 0;
+ if (t->parsing.executing) {
+ return;
+ }
+ if (t->num_pending_goaways) {
+ notify_goaways_args *a = gpr_malloc(sizeof(*a));
+ a->goaways = t->pending_goaways;
+ a->num_goaways = t->num_pending_goaways;
+ t->pending_goaways = NULL;
+ t->num_pending_goaways = 0;
+ t->cap_pending_goaways = 0;
+ t->channel_callback.executing = 1;
+ grpc_iomgr_closure_init(&a->closure, notify_goaways, a);
+ ref_transport(t);
+ schedule_cb(t, &a->closure, 1);
+ return;
+ }
+ if (t->writing.executing) {
+ return;
+ }
+ if (t->error_state == ERROR_STATE_SEEN) {
+ t->error_state = ERROR_STATE_NOTIFIED;
+ t->channel_callback.executing = 1;
+ ref_transport(t);
+ schedule_cb(t, &t->channel_callback.notify_closed, 1);
+ }
+}
+
+static void notify_closed(void *gt, int iomgr_success_ignored) {
+ transport *t = gt;
+ t->channel_callback.cb->closed(t->channel_callback.cb_user_data, &t->base);
+
+ lock(t);
+ t->channel_callback.executing = 0;
+ unlock(t);
+
+ unref_transport(t);
}
-static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb) {
- cb->closed(t->cb_user_data, &t->base);
+static void schedule_cb(transport *t, grpc_iomgr_closure *closure, int success) {
+ closure->success = success;
+ closure->next = t->global.pending_closures;
+ t->global.pending_closures = closure;
}
/*
diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c
index a9948cd4b2..167970d992 100644
--- a/src/core/transport/transport.c
+++ b/src/core/transport/transport.c
@@ -98,13 +98,13 @@ void grpc_transport_setup_del_interested_party(grpc_transport_setup *setup,
void grpc_transport_op_finish_with_failure(grpc_transport_op *op) {
if (op->send_ops) {
- op->on_done_send(op->send_user_data, 0);
+ op->on_done_send->cb(op->on_done_send->cb_arg, 0);
}
if (op->recv_ops) {
- op->on_done_recv(op->recv_user_data, 0);
+ op->on_done_recv->cb(op->on_done_recv->cb_arg, 0);
}
if (op->on_consumed) {
- op->on_consumed(op->on_consumed_user_data, 0);
+ op->on_consumed->cb(op->on_consumed->cb_arg, 0);
}
}
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index 7f60fdc037..9d43581a0a 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -64,18 +64,15 @@ typedef enum grpc_stream_state {
/* Transport op: a set of operations to perform on a transport */
typedef struct grpc_transport_op {
- void (*on_consumed)(void *user_data, int success);
- void *on_consumed_user_data;
+ grpc_iomgr_closure *on_consumed;
grpc_stream_op_buffer *send_ops;
int is_last_send;
- void (*on_done_send)(void *user_data, int success);
- void *send_user_data;
+ grpc_iomgr_closure *on_done_send;
grpc_stream_op_buffer *recv_ops;
grpc_stream_state *recv_state;
- void (*on_done_recv)(void *user_data, int success);
- void *recv_user_data;
+ grpc_iomgr_closure *on_done_recv;
grpc_pollset *bind_pollset;