diff options
Diffstat (limited to 'src/core')
63 files changed, 716 insertions, 549 deletions
diff --git a/src/core/census/grpc_filter.c b/src/core/census/grpc_filter.c index d66f29228b..fa318c4e54 100644 --- a/src/core/census/grpc_filter.c +++ b/src/core/census/grpc_filter.c @@ -128,7 +128,8 @@ static void server_start_transport_op(grpc_call_element* elem, static void client_init_call_elem(grpc_call_element* elem, const void* server_transport_data, - grpc_transport_stream_op* initial_op) { + grpc_transport_stream_op* initial_op, + grpc_call_list* call_list) { call_data* d = elem->call_data; GPR_ASSERT(d != NULL); d->start_ts = gpr_now(GPR_CLOCK_REALTIME); @@ -144,7 +145,8 @@ static void client_destroy_call_elem(grpc_call_element* elem, static void server_init_call_elem(grpc_call_element* elem, const void* server_transport_data, - grpc_transport_stream_op* initial_op) { + grpc_transport_stream_op* initial_op, + grpc_call_list* call_list) { call_data* d = elem->call_data; GPR_ASSERT(d != NULL); d->start_ts = gpr_now(GPR_CLOCK_REALTIME); diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index c767a87b20..10fbd520c3 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -153,7 +153,8 @@ void grpc_channel_stack_destroy(grpc_channel_stack *stack, void grpc_call_stack_init(grpc_channel_stack *channel_stack, const void *transport_server_data, grpc_transport_stream_op *initial_op, - grpc_call_stack *call_stack) { + grpc_call_stack *call_stack, + grpc_call_list *call_list) { grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack); size_t count = channel_stack->count; grpc_call_element *call_elems; @@ -171,7 +172,7 @@ void grpc_call_stack_init(grpc_channel_stack *channel_stack, call_elems[i].channel_data = channel_elems[i].channel_data; call_elems[i].call_data = user_data; call_elems[i].filter->init_call_elem(&call_elems[i], transport_server_data, - initial_op); + initial_op, call_list); user_data += ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data); } diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index 190247b3f1..5afe7f258a 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -85,7 +85,8 @@ typedef struct { argument.*/ void (*init_call_elem)(grpc_call_element *elem, const void *server_transport_data, - grpc_transport_stream_op *initial_op); + grpc_transport_stream_op *initial_op, + grpc_call_list *call_list); /* Destroy per call data. The filter does not need to do any chaining */ void (*destroy_call_elem)(grpc_call_element *elem, grpc_call_list *call_list); @@ -172,7 +173,8 @@ void grpc_channel_stack_destroy(grpc_channel_stack *stack, void grpc_call_stack_init(grpc_channel_stack *channel_stack, const void *transport_server_data, grpc_transport_stream_op *initial_op, - grpc_call_stack *call_stack); + grpc_call_stack *call_stack, + grpc_call_list *call_list); /* Destroy a call stack */ void grpc_call_stack_destroy(grpc_call_stack *stack, grpc_call_list *call_list); diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 6618336e93..62f81daf44 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -459,7 +459,7 @@ static void on_lb_policy_state_changed(void *arg, int iomgr_success, on_lb_policy_state_changed_locked(w, call_list); gpr_mu_unlock(&w->chand->mu_config); - GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy"); + GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy", call_list); gpr_free(w); } @@ -551,7 +551,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success, GRPC_LB_POLICY_UNREF(lb_policy, "config_change", call_list); } - GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver"); + GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver", call_list); } static void cc_start_transport_op(grpc_channel_element *elem, @@ -610,7 +610,8 @@ static void cc_start_transport_op(grpc_channel_element *elem, /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, const void *server_transport_data, - grpc_transport_stream_op *initial_op) { + grpc_transport_stream_op *initial_op, + grpc_call_list *call_list) { call_data *calld = elem->call_data; /* TODO(ctiller): is there something useful we can do here? */ @@ -688,7 +689,7 @@ static void destroy_channel_elem(grpc_channel_element *elem, if (chand->lb_policy != NULL) { GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel", call_list); } - grpc_connectivity_state_destroy(&chand->state_tracker); + grpc_connectivity_state_destroy(&chand->state_tracker, call_list); grpc_pollset_set_destroy(&chand->pollset_set); gpr_mu_destroy(&chand->mu_config); } diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index 5a7403ccfd..e2be603e26 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -282,7 +282,8 @@ static void compress_start_transport_stream_op(grpc_call_element *elem, /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, const void *server_transport_data, - grpc_transport_stream_op *initial_op) { + grpc_transport_stream_op *initial_op, + grpc_call_list *call_list) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 00a4d61afe..1af5eae947 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -69,21 +69,22 @@ static void con_start_transport_stream_op(grpc_call_element *elem, GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - grpc_transport_perform_stream_op(chand->transport, - TRANSPORT_STREAM_FROM_CALL_DATA(calld), op); + grpc_transport_perform_stream_op( + chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), op, call_list); } static void con_start_transport_op(grpc_channel_element *elem, grpc_transport_op *op, grpc_call_list *call_list) { channel_data *chand = elem->channel_data; - grpc_transport_perform_op(chand->transport, op); + grpc_transport_perform_op(chand->transport, op, call_list); } /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, const void *server_transport_data, - grpc_transport_stream_op *initial_op) { + grpc_transport_stream_op *initial_op, + grpc_call_list *call_list) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; int r; @@ -91,7 +92,7 @@ static void init_call_elem(grpc_call_element *elem, GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); r = grpc_transport_init_stream(chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), - server_transport_data, initial_op); + server_transport_data, initial_op, call_list); GPR_ASSERT(r == 0); } @@ -101,8 +102,8 @@ static void destroy_call_elem(grpc_call_element *elem, call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); - grpc_transport_destroy_stream(chand->transport, - TRANSPORT_STREAM_FROM_CALL_DATA(calld)); + grpc_transport_destroy_stream( + chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), call_list); } /* Constructor for channel_data */ @@ -121,12 +122,12 @@ static void destroy_channel_elem(grpc_channel_element *elem, grpc_call_list *call_list) { channel_data *cd = (channel_data *)elem->channel_data; GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); - grpc_transport_destroy(cd->transport); + grpc_transport_destroy(cd->transport, call_list); } static char *con_get_peer(grpc_call_element *elem, grpc_call_list *call_list) { channel_data *chand = elem->channel_data; - return grpc_transport_get_peer(chand->transport); + return grpc_transport_get_peer(chand->transport, call_list); } const grpc_channel_filter grpc_connected_channel_filter = { diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index 4e9eac72ba..d0adafc048 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -165,7 +165,8 @@ static void hc_start_transport_op(grpc_call_element *elem, /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, const void *server_transport_data, - grpc_transport_stream_op *initial_op) { + grpc_transport_stream_op *initial_op, + grpc_call_list *call_list) { call_data *calld = elem->call_data; calld->sent_initial_metadata = 0; calld->got_initial_metadata = 0; diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index 201adf4f35..70cc4f298a 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -238,7 +238,8 @@ static void hs_start_transport_op(grpc_call_element *elem, /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, const void *server_transport_data, - grpc_transport_stream_op *initial_op) { + grpc_transport_stream_op *initial_op, + grpc_call_list *call_list) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; /* initialize members */ diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c index d4f1acb4c5..de75f83654 100644 --- a/src/core/channel/noop_filter.c +++ b/src/core/channel/noop_filter.c @@ -74,7 +74,8 @@ static void noop_start_transport_stream_op(grpc_call_element *elem, /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, const void *server_transport_data, - grpc_transport_stream_op *initial_op) { + grpc_transport_stream_op *initial_op, + grpc_call_list *call_list) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 6dc52f43ce..8ea774bebc 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -101,7 +101,7 @@ void pf_destroy(grpc_lb_policy *pol, grpc_call_list *call_list) { for (i = 0; i < p->num_subchannels; i++) { GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "pick_first", call_list); } - grpc_connectivity_state_destroy(&p->state_tracker); + grpc_connectivity_state_destroy(&p->state_tracker, call_list); gpr_free(p->subchannels); gpr_mu_destroy(&p->mu); gpr_free(p); diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c index 5750db4b43..0fc89ed864 100644 --- a/src/core/client_config/resolvers/dns_resolver.c +++ b/src/core/client_config/resolvers/dns_resolver.c @@ -197,7 +197,7 @@ static void dns_destroy(grpc_resolver *gr, grpc_call_list *call_list) { if (r->resolved_config) { grpc_client_config_unref(r->resolved_config, call_list); } - grpc_subchannel_factory_unref(r->subchannel_factory); + grpc_subchannel_factory_unref(r->subchannel_factory, call_list); gpr_free(r->name); gpr_free(r->default_port); gpr_free(r->lb_policy_name); diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c index 38293b0f13..2729036d4f 100644 --- a/src/core/client_config/resolvers/sockaddr_resolver.c +++ b/src/core/client_config/resolvers/sockaddr_resolver.c @@ -159,7 +159,7 @@ static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r, static void sockaddr_destroy(grpc_resolver *gr, grpc_call_list *call_list) { sockaddr_resolver *r = (sockaddr_resolver *)gr; gpr_mu_destroy(&r->mu); - grpc_subchannel_factory_unref(r->subchannel_factory); + grpc_subchannel_factory_unref(r->subchannel_factory, call_list); gpr_free(r->addrs); gpr_free(r->addrs_len); gpr_free(r->lb_policy_name); diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index bae705f9c3..48df4bbd31 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -143,7 +143,8 @@ struct grpc_subchannel_call { #define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1)) #define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1)) -static grpc_subchannel_call *create_call(connection *con); +static grpc_subchannel_call *create_call(connection *con, + grpc_call_list *call_list); static void connectivity_state_changed_locked(grpc_subchannel *c, const char *reason, grpc_call_list *call_list); @@ -262,7 +263,7 @@ static void subchannel_destroy(grpc_subchannel *c, grpc_call_list *call_list) { grpc_channel_args_destroy(c->args); gpr_free(c->addr); grpc_mdctx_unref(c->mdctx); - grpc_connectivity_state_destroy(&c->state_tracker); + grpc_connectivity_state_destroy(&c->state_tracker, call_list); grpc_connector_unref(c->connector, call_list); gpr_free(c); } @@ -355,7 +356,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset, CONNECTION_REF_LOCKED(con, "call"); gpr_mu_unlock(&c->mu); - *target = create_call(con); + *target = create_call(con, call_list); notify->cb(notify->cb_arg, 1, call_list); } else { waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c)); @@ -561,7 +562,7 @@ static void publish_transport(grpc_subchannel *c, grpc_call_list *call_list) { gpr_free(sw); gpr_free(filters); grpc_channel_stack_destroy(stk, call_list); - GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting"); + GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting", call_list); GRPC_SUBCHANNEL_UNREF(c, "connecting", call_list); return; } @@ -582,7 +583,7 @@ static void publish_transport(grpc_subchannel *c, grpc_call_list *call_list) { op.on_connectivity_state_change = &sw->closure; op.bind_pollset_set = c->pollset_set; SUBCHANNEL_REF_LOCKED(c, "state_watcher"); - GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting"); + GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting", call_list); GPR_ASSERT(!SUBCHANNEL_UNREF_LOCKED(c, "connecting")); elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0); @@ -650,7 +651,7 @@ static void on_alarm(void *arg, int iomgr_success, grpc_call_list *call_list) { update_reconnect_parameters(c); continue_connect(c, call_list); } else { - GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting"); + GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting", call_list); GRPC_SUBCHANNEL_UNREF(c, "connecting", call_list); } } @@ -746,13 +747,14 @@ void grpc_subchannel_call_process_op(grpc_subchannel_call *call, top_elem->filter->start_transport_stream_op(top_elem, op, call_list); } -grpc_subchannel_call *create_call(connection *con) { +static grpc_subchannel_call *create_call(connection *con, + grpc_call_list *call_list) { grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); grpc_subchannel_call *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call); call->connection = con; gpr_ref_init(&call->refs, 1); - grpc_call_stack_init(chanstk, NULL, NULL, callstk); + grpc_call_stack_init(chanstk, NULL, NULL, callstk, call_list); return call; } diff --git a/src/core/client_config/subchannel_factory.c b/src/core/client_config/subchannel_factory.c index 2a569aba13..e252665000 100644 --- a/src/core/client_config/subchannel_factory.c +++ b/src/core/client_config/subchannel_factory.c @@ -36,8 +36,10 @@ void grpc_subchannel_factory_ref(grpc_subchannel_factory *factory) { factory->vtable->ref(factory); } -void grpc_subchannel_factory_unref(grpc_subchannel_factory *factory) { - factory->vtable->unref(factory); + +void grpc_subchannel_factory_unref(grpc_subchannel_factory *factory, + grpc_call_list *call_list) { + factory->vtable->unref(factory, call_list); } grpc_subchannel *grpc_subchannel_factory_create_subchannel( diff --git a/src/core/client_config/subchannel_factory.h b/src/core/client_config/subchannel_factory.h index b588580edb..2ff9d3c402 100644 --- a/src/core/client_config/subchannel_factory.h +++ b/src/core/client_config/subchannel_factory.h @@ -48,14 +48,15 @@ struct grpc_subchannel_factory { struct grpc_subchannel_factory_vtable { void (*ref)(grpc_subchannel_factory *factory); - void (*unref)(grpc_subchannel_factory *factory); + void (*unref)(grpc_subchannel_factory *factory, grpc_call_list *call_list); grpc_subchannel *(*create_subchannel)(grpc_subchannel_factory *factory, grpc_subchannel_args *args, grpc_call_list *call_list); }; void grpc_subchannel_factory_ref(grpc_subchannel_factory *factory); -void grpc_subchannel_factory_unref(grpc_subchannel_factory *factory); +void grpc_subchannel_factory_unref(grpc_subchannel_factory *factory, + grpc_call_list *call_list); /** Create a new grpc_subchannel */ grpc_subchannel *grpc_subchannel_factory_create_subchannel( diff --git a/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c b/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c index b2c9797b1a..00dff6343c 100644 --- a/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c +++ b/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c @@ -47,10 +47,11 @@ static void merge_args_factory_ref(grpc_subchannel_factory *scf) { gpr_ref(&f->refs); } -static void merge_args_factory_unref(grpc_subchannel_factory *scf) { +static void merge_args_factory_unref(grpc_subchannel_factory *scf, + grpc_call_list *call_list) { merge_args_factory *f = (merge_args_factory *)scf; if (gpr_unref(&f->refs)) { - grpc_subchannel_factory_unref(f->wrapped); + grpc_subchannel_factory_unref(f->wrapped, call_list); grpc_channel_args_destroy(f->merge_args); gpr_free(f); } diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c index 40ea2e9688..fc45eda2de 100644 --- a/src/core/httpcli/httpcli.c +++ b/src/core/httpcli/httpcli.c @@ -246,7 +246,7 @@ void grpc_httpcli_get(grpc_httpcli_context *context, grpc_pollset *pollset, grpc_call_list *call_list) { char *name; if (g_get_override && - g_get_override(request, deadline, on_response, user_data)) { + g_get_override(request, deadline, on_response, user_data, call_list)) { return; } gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path); @@ -263,8 +263,9 @@ void grpc_httpcli_post(grpc_httpcli_context *context, grpc_pollset *pollset, grpc_httpcli_response_cb on_response, void *user_data, grpc_call_list *call_list) { char *name; - if (g_post_override && g_post_override(request, body_bytes, body_size, - deadline, on_response, user_data)) { + if (g_post_override && + g_post_override(request, body_bytes, body_size, deadline, on_response, + user_data, call_list)) { return; } gpr_asprintf(&name, "HTTP:POST:%s:%s", request->host, request->path); diff --git a/src/core/httpcli/httpcli.h b/src/core/httpcli/httpcli.h index 74bb123042..6d19da37fb 100644 --- a/src/core/httpcli/httpcli.h +++ b/src/core/httpcli/httpcli.h @@ -147,13 +147,15 @@ void grpc_httpcli_post(grpc_httpcli_context *context, grpc_pollset *pollset, typedef int (*grpc_httpcli_get_override)(const grpc_httpcli_request *request, gpr_timespec deadline, grpc_httpcli_response_cb on_response, - void *user_data); + void *user_data, + grpc_call_list *call_list); typedef int (*grpc_httpcli_post_override)(const grpc_httpcli_request *request, const char *body_bytes, size_t body_size, gpr_timespec deadline, grpc_httpcli_response_cb on_response, - void *user_data); + void *user_data, + grpc_call_list *call_list); void grpc_httpcli_set_override(grpc_httpcli_get_override get, grpc_httpcli_post_override post); diff --git a/src/core/iomgr/alarm.c b/src/core/iomgr/alarm.c index 6e0d516f0c..146bda477d 100644 --- a/src/core/iomgr/alarm.c +++ b/src/core/iomgr/alarm.c @@ -291,6 +291,7 @@ static size_t pop_alarms(shard_type *shard, gpr_timespec now, gpr_mu_lock(&shard->mu); while ((alarm = pop_one(shard, now))) { grpc_call_list_add(call_list, &alarm->closure, success); + n++; } *new_min_deadline = compute_min_deadline(shard); gpr_mu_unlock(&shard->mu); @@ -332,7 +333,7 @@ static int run_some_expired_alarms(gpr_timespec now, gpr_timespec *next, gpr_mu_unlock(&g_checker_mu); } - return n > 0; + return (int)n; } int grpc_alarm_check(gpr_timespec now, gpr_timespec *next, diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h index 98653412bb..95ba694ff6 100644 --- a/src/core/iomgr/pollset.h +++ b/src/core/iomgr/pollset.h @@ -73,9 +73,12 @@ void grpc_pollset_destroy(grpc_pollset *pollset); grpc_pollset_work, and it is guaranteed that GRPC_POLLSET_MU(pollset) will not be released by grpc_pollset_work AFTER worker has been destroyed. - Tries not to block past deadline. */ + Tries not to block past deadline. + May call grpc_call_list_run on grpc_call_list, without holding the pollset + lock */ void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, - gpr_timespec now, gpr_timespec deadline); + gpr_timespec now, gpr_timespec deadline, + grpc_call_list *call_list); /* Break one polling thread out of polling work for this pollset. If specific_worker is GRPC_POLLSET_KICK_BROADCAST, kick ALL the workers. diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 885cb29234..1040716179 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -174,21 +174,21 @@ static void finish_shutdown(grpc_pollset *pollset, grpc_call_list *call_list) { } void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, - gpr_timespec now, gpr_timespec deadline) { + gpr_timespec now, gpr_timespec deadline, + grpc_call_list *call_list) { /* pollset->mu already held */ int added_worker = 0; int locked = 1; - grpc_call_list call_list = GRPC_CALL_LIST_INIT; /* this must happen before we (potentially) drop pollset->mu */ worker->next = worker->prev = NULL; /* TODO(ctiller): pool these */ grpc_wakeup_fd_init(&worker->wakeup_fd); if (!grpc_pollset_has_workers(pollset) && !grpc_call_list_empty(pollset->idle_jobs)) { - grpc_call_list_move(&pollset->idle_jobs, &call_list); + grpc_call_list_move(&pollset->idle_jobs, call_list); goto done; } - if (grpc_alarm_check(now, &deadline, &call_list)) { + if (grpc_alarm_check(now, &deadline, call_list)) { goto done; } if (pollset->shutting_down) { @@ -212,14 +212,8 @@ void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, pollset->kicked_without_pollers = 0; } done: - if (!grpc_call_list_empty(call_list)) { - if (locked) { - gpr_mu_unlock(&pollset->mu); - locked = 0; - } - grpc_call_list_run(&call_list); - } if (!locked) { + grpc_call_list_run(call_list); gpr_mu_lock(&pollset->mu); locked = 1; } @@ -233,8 +227,8 @@ done: } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) { pollset->called_shutdown = 1; gpr_mu_unlock(&pollset->mu); - finish_shutdown(pollset, &call_list); - grpc_call_list_run(&call_list); + finish_shutdown(pollset, call_list); + grpc_call_list_run(call_list); /* Continuing to access pollset here is safe -- it is the caller's * responsibility to not destroy when it has outstanding calls to * grpc_pollset_work. diff --git a/src/core/iomgr/resolve_address.h b/src/core/iomgr/resolve_address.h index ec8d83fffa..72b9c1cc87 100644 --- a/src/core/iomgr/resolve_address.h +++ b/src/core/iomgr/resolve_address.h @@ -34,6 +34,7 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_RESOLVE_ADDRESS_H #define GRPC_INTERNAL_CORE_IOMGR_RESOLVE_ADDRESS_H +#include <stddef.h> #include "src/core/iomgr/iomgr.h" #define GRPC_MAX_SOCKADDR_SIZE 128 diff --git a/src/core/security/client_auth_filter.c b/src/core/security/client_auth_filter.c index 0a2de4f7bb..1d06df8533 100644 --- a/src/core/security/client_auth_filter.c +++ b/src/core/security/client_auth_filter.c @@ -280,7 +280,8 @@ static void auth_start_transport_op(grpc_call_element *elem, /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, const void *server_transport_data, - grpc_transport_stream_op *initial_op) { + grpc_transport_stream_op *initial_op, + grpc_call_list *call_list) { call_data *calld = elem->call_data; calld->creds = NULL; calld->host = NULL; diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c index 5da29f5ad5..8c3d85cc58 100644 --- a/src/core/security/google_default_credentials.c +++ b/src/core/security/google_default_credentials.c @@ -123,7 +123,7 @@ static int is_stack_running_on_compute_engine(void) { while (!detector.is_done) { grpc_pollset_worker worker; grpc_pollset_work(&detector.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), - gpr_inf_future(GPR_CLOCK_MONOTONIC)); + gpr_inf_future(GPR_CLOCK_MONOTONIC), &call_list); } gpr_mu_unlock(GRPC_POLLSET_MU(&detector.pollset)); diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c index 2376cbbeb3..201288bbdd 100644 --- a/src/core/security/server_auth_filter.c +++ b/src/core/security/server_auth_filter.c @@ -200,7 +200,8 @@ static void auth_start_transport_op(grpc_call_element *elem, /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, const void *server_transport_data, - grpc_transport_stream_op *initial_op) { + grpc_transport_stream_op *initial_op, + grpc_call_list *call_list) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c index e6e2eee658..0662839105 100644 --- a/src/core/security/server_secure_chttp2.c +++ b/src/core/security/server_secure_chttp2.c @@ -67,6 +67,7 @@ typedef struct grpc_server_secure_state { gpr_mu mu; gpr_refcount refcount; grpc_closure destroy_closure; + grpc_closure *destroy_callback; } grpc_server_secure_state; static void state_ref(grpc_server_secure_state *state) { @@ -86,7 +87,8 @@ static void state_unref(grpc_server_secure_state *state) { } static void setup_transport(void *statep, grpc_transport *transport, - grpc_mdctx *mdctx, grpc_workqueue *workqueue) { + grpc_mdctx *mdctx, grpc_workqueue *workqueue, + grpc_call_list *call_list) { static grpc_channel_filter const *extra_filters[] = { &grpc_server_auth_filter, &grpc_http_server_filter}; grpc_server_secure_state *state = statep; @@ -100,7 +102,7 @@ static void setup_transport(void *statep, grpc_transport *transport, GPR_ARRAY_SIZE(args_to_add)); grpc_server_setup_transport(state->server, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters), mdctx, workqueue, - args_copy); + args_copy, call_list); grpc_channel_args_destroy(args_copy); } @@ -142,9 +144,9 @@ static void on_secure_transport_setup_done(void *statep, workqueue = grpc_workqueue_create(call_list); transport = grpc_create_chttp2_transport( grpc_server_get_channel_args(state->server), secure_endpoint, mdctx, - 0); - setup_transport(state, transport, mdctx, workqueue); - grpc_chttp2_transport_start_reading(transport, NULL, 0); + 0, call_list); + setup_transport(state, transport, mdctx, workqueue, call_list); + grpc_chttp2_transport_start_reading(transport, NULL, 0, call_list); } else { /* We need to consume this here, because the server may already have gone * away. */ @@ -185,7 +187,8 @@ static void start(grpc_server *server, void *statep, grpc_pollset **pollsets, static void destroy_done(void *statep, int success, grpc_call_list *call_list) { grpc_server_secure_state *state = statep; - grpc_server_listener_destroy_done(state->server); + state->destroy_callback->cb(state->destroy_callback->cb_arg, success, + call_list); gpr_mu_lock(&state->mu); while (state->handshaking_tcp_endpoints != NULL) { grpc_endpoint_shutdown(state->handshaking_tcp_endpoints->tcp_endpoint, @@ -199,12 +202,13 @@ static void destroy_done(void *statep, int success, grpc_call_list *call_list) { /* Server callback: destroy the tcp listener (so we don't generate further callbacks) */ -static void destroy(grpc_server *server, void *statep, +static void destroy(grpc_server *server, void *statep, grpc_closure *callback, grpc_call_list *call_list) { grpc_server_secure_state *state = statep; grpc_tcp_server *tcp; gpr_mu_lock(&state->mu); state->is_shutdown = 1; + state->destroy_callback = callback; tcp = state->tcp; gpr_mu_unlock(&state->mu); grpc_closure_init(&state->destroy_closure, destroy_done, state); @@ -283,7 +287,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, gpr_ref_init(&state->refcount, 1); /* Register with the server only upon success */ - grpc_server_add_listener(server, state, start, destroy); + grpc_server_add_listener(server, state, start, destroy, &call_list); grpc_call_list_run(&call_list); return port_num; diff --git a/src/core/surface/call.c b/src/core/surface/call.c index f2f8f0a6ed..bbaf65759d 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -356,7 +356,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, initial_op_ptr = &initial_op; } grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr, - CALL_STACK_FROM_CALL(call)); + CALL_STACK_FROM_CALL(call), &call_list); if (parent_call != NULL) { GRPC_CALL_INTERNAL_REF(parent_call, "child"); GPR_ASSERT(call->is_client); @@ -459,7 +459,7 @@ static void destroy_call(grpc_call *call, grpc_call_list *call_list) { size_t i; grpc_call *c = call; grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), call_list); - GRPC_CHANNEL_INTERNAL_UNREF(c->channel, "call"); + GRPC_CHANNEL_INTERNAL_UNREF(c->channel, "call", call_list); gpr_mu_destroy(&c->mu); gpr_mu_destroy(&c->completion_mu); for (i = 0; i < STATUS_SOURCE_COUNT; i++) { @@ -673,7 +673,8 @@ static void unlock(grpc_call *call, grpc_call_list *call_list) { if (completing_requests > 0) { for (i = 0; i < completing_requests; i++) { completed_requests[i].on_complete(call, completed_requests[i].success, - completed_requests[i].user_data); + completed_requests[i].user_data, + call_list); } lock(call); call->completing = 0; @@ -1556,14 +1557,16 @@ static void set_cancelled_value(grpc_status_code status, void *dest) { *(grpc_status_code *)dest = (status != GRPC_STATUS_OK); } -static void finish_batch(grpc_call *call, int success, void *tag) { +static void finish_batch(grpc_call *call, int success, void *tag, + grpc_call_list *call_list) { grpc_cq_end_op(call->cq, tag, success, done_completion, call, - allocate_completion(call)); + allocate_completion(call), call_list); } -static void finish_batch_with_close(grpc_call *call, int success, void *tag) { +static void finish_batch_with_close(grpc_call *call, int success, void *tag, + grpc_call_list *call_list) { grpc_cq_end_op(call->cq, tag, 1, done_completion, call, - allocate_completion(call)); + allocate_completion(call), call_list); } static int are_write_flags_valid(gpr_uint32 flags) { @@ -1581,7 +1584,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, size_t out; const grpc_op *op; grpc_ioreq *req; - void (*finish_func)(grpc_call *, int, void *) = finish_batch; + void (*finish_func)(grpc_call *, int, void *, grpc_call_list *) = + finish_batch; grpc_call_error error; grpc_call_list call_list = GRPC_CALL_LIST_INIT; @@ -1596,7 +1600,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, grpc_cq_begin_op(call->cq); GRPC_CALL_INTERNAL_REF(call, "completion"); grpc_cq_end_op(call->cq, tag, 1, done_completion, call, - allocate_completion(call)); + allocate_completion(call), &call_list); error = GRPC_CALL_OK; goto done; } diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 7a7178bc7b..144aa7cef2 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -87,7 +87,8 @@ typedef struct { } grpc_ioreq; typedef void (*grpc_ioreq_completion_func)(grpc_call *call, int success, - void *user_data); + void *user_data, + grpc_call_list *call_list); grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, gpr_uint32 propagation_mask, diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index fdba09fcce..46bea13936 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -272,9 +272,9 @@ void grpc_channel_internal_ref(grpc_channel *c) { gpr_ref(&c->refs); } -static void destroy_channel(grpc_channel *channel) { +static void destroy_channel(grpc_channel *channel, grpc_call_list *call_list) { size_t i; - grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel)); + grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel), call_list); for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) { GRPC_MDELEM_UNREF(channel->grpc_status_elem[i]); } @@ -303,26 +303,31 @@ static void destroy_channel(grpc_channel *channel) { } #ifdef GRPC_CHANNEL_REF_COUNT_DEBUG -void grpc_channel_internal_unref(grpc_channel *channel, const char *reason) { +void grpc_channel_internal_unref(grpc_channel *channel, const char *reason, + grpc_call_list *call_list) { gpr_log(GPR_DEBUG, "CHANNEL: unref %p %d -> %d [%s]", channel, channel->refs.count, channel->refs.count - 1, reason); #else -void grpc_channel_internal_unref(grpc_channel *channel) { +void grpc_channel_internal_unref(grpc_channel *channel, + grpc_call_list *call_list) { #endif if (gpr_unref(&channel->refs)) { - destroy_channel(channel); + destroy_channel(channel, call_list); } } void grpc_channel_destroy(grpc_channel *channel) { grpc_transport_op op; grpc_channel_element *elem; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; memset(&op, 0, sizeof(op)); op.disconnect = 1; elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0); - elem->filter->start_transport_op(elem, &op); + elem->filter->start_transport_op(elem, &op, &call_list); - GRPC_CHANNEL_INTERNAL_UNREF(channel, "channel"); + GRPC_CHANNEL_INTERNAL_UNREF(channel, "channel", &call_list); + + grpc_call_list_run(&call_list); } grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel) { diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index 664ecc1c5a..3f51164fcc 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -67,18 +67,20 @@ grpc_workqueue *grpc_channel_get_workqueue(grpc_channel *channel); #ifdef GRPC_CHANNEL_REF_COUNT_DEBUG void grpc_channel_internal_ref(grpc_channel *channel, const char *reason); -void grpc_channel_internal_unref(grpc_channel *channel, const char *reason); +void grpc_channel_internal_unref(grpc_channel *channel, const char *reason, + grpc_call_list *call_list); #define GRPC_CHANNEL_INTERNAL_REF(channel, reason) \ grpc_channel_internal_ref(channel, reason) -#define GRPC_CHANNEL_INTERNAL_UNREF(channel, reason) \ - grpc_channel_internal_unref(channel, reason) +#define GRPC_CHANNEL_INTERNAL_UNREF(channel, reason, call_list) \ + grpc_channel_internal_unref(channel, reason, call_list) #else void grpc_channel_internal_ref(grpc_channel *channel); -void grpc_channel_internal_unref(grpc_channel *channel); +void grpc_channel_internal_unref(grpc_channel *channel, + grpc_call_list *call_list); #define GRPC_CHANNEL_INTERNAL_REF(channel, reason) \ grpc_channel_internal_ref(channel) -#define GRPC_CHANNEL_INTERNAL_UNREF(channel, reason) \ - grpc_channel_internal_unref(channel) +#define GRPC_CHANNEL_INTERNAL_UNREF(channel, reason, call_list) \ + grpc_channel_internal_unref(channel, call_list) #endif #endif /* GRPC_INTERNAL_CORE_SURFACE_CHANNEL_H */ diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c index 4d0cf1ed8b..7891669e35 100644 --- a/src/core/surface/channel_connectivity.c +++ b/src/core/surface/channel_connectivity.c @@ -56,7 +56,7 @@ grpc_connectivity_state grpc_channel_check_connectivity_state( } state = grpc_client_channel_check_connectivity_state( client_channel_elem, try_to_connect, &call_list); - grpc_call_list_run(call_list); + grpc_call_list_run(&call_list); return state; } @@ -80,17 +80,18 @@ typedef struct { void *tag; } state_watcher; -static void delete_state_watcher(state_watcher *w) { +static void delete_state_watcher(state_watcher *w, grpc_call_list *call_list) { grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element( grpc_channel_get_channel_stack(w->channel)); grpc_client_channel_del_interested_party(client_channel_elem, - grpc_cq_pollset(w->cq)); - GRPC_CHANNEL_INTERNAL_UNREF(w->channel, "watch_connectivity"); + grpc_cq_pollset(w->cq), call_list); + GRPC_CHANNEL_INTERNAL_UNREF(w->channel, "watch_connectivity", call_list); gpr_mu_destroy(&w->mu); gpr_free(w); } -static void finished_completion(void *pw, grpc_cq_completion *ignored) { +static void finished_completion(void *pw, grpc_cq_completion *ignored, + grpc_call_list *call_list) { int delete = 0; state_watcher *w = pw; gpr_mu_lock(&w->mu); @@ -110,18 +111,19 @@ static void finished_completion(void *pw, grpc_cq_completion *ignored) { gpr_mu_unlock(&w->mu); if (delete) { - delete_state_watcher(w); + delete_state_watcher(w, call_list); } } -static void partly_done(state_watcher *w, int due_to_completion) { +static void partly_done(state_watcher *w, int due_to_completion, + grpc_call_list *call_list) { int delete = 0; if (due_to_completion) { gpr_mu_lock(&w->mu); w->success = 1; gpr_mu_unlock(&w->mu); - grpc_alarm_cancel(&w->alarm); + grpc_alarm_cancel(&w->alarm, call_list); } gpr_mu_lock(&w->mu); @@ -129,7 +131,7 @@ static void partly_done(state_watcher *w, int due_to_completion) { case WAITING: w->phase = CALLING_BACK; grpc_cq_end_op(w->cq, w->tag, w->success, finished_completion, w, - &w->completion_storage); + &w->completion_storage, call_list); break; case CALLING_BACK: w->phase = CALLING_BACK_AND_FINISHED; @@ -145,13 +147,17 @@ static void partly_done(state_watcher *w, int due_to_completion) { gpr_mu_unlock(&w->mu); if (delete) { - delete_state_watcher(w); + delete_state_watcher(w, call_list); } } -static void watch_complete(void *pw, int success) { partly_done(pw, 1); } +static void watch_complete(void *pw, int success, grpc_call_list *call_list) { + partly_done(pw, 1, call_list); +} -static void timeout_complete(void *pw, int success) { partly_done(pw, 0); } +static void timeout_complete(void *pw, int success, grpc_call_list *call_list) { + partly_done(pw, 0, call_list); +} void grpc_channel_watch_connectivity_state( grpc_channel *channel, grpc_connectivity_state last_observed_state, @@ -172,9 +178,9 @@ void grpc_channel_watch_connectivity_state( w->tag = tag; w->channel = channel; - grpc_alarm_init(&w->alarm, - gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - timeout_complete, w, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_alarm_init( + &w->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), + timeout_complete, w, gpr_now(GPR_CLOCK_MONOTONIC), &call_list); if (client_channel_elem->filter != &grpc_client_channel_filter) { gpr_log(GPR_ERROR, @@ -185,10 +191,10 @@ void grpc_channel_watch_connectivity_state( } else { GRPC_CHANNEL_INTERNAL_REF(channel, "watch_connectivity"); grpc_client_channel_add_interested_party(client_channel_elem, - grpc_cq_pollset(cq)); + grpc_cq_pollset(cq), &call_list); grpc_client_channel_watch_connectivity_state(client_channel_elem, &w->state, &w->on_complete, &call_list); } - grpc_call_list_run(call_list); + grpc_call_list_run(&call_list); } diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index f6f42b3d7a..7ac76da10a 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -55,6 +55,13 @@ typedef struct { grpc_closure *notify; grpc_connect_in_args args; grpc_connect_out_args *result; + + grpc_endpoint *tcp; + + grpc_mdctx *mdctx; + grpc_workqueue *workqueue; + + grpc_closure connected; } connector; static void connector_ref(grpc_connector *con) { @@ -62,21 +69,24 @@ static void connector_ref(grpc_connector *con) { gpr_ref(&c->refs); } -static void connector_unref(grpc_connector *con) { +static void connector_unref(grpc_connector *con, grpc_call_list *call_list) { connector *c = (connector *)con; if (gpr_unref(&c->refs)) { + grpc_mdctx_unref(c->mdctx); + GRPC_WORKQUEUE_UNREF(c->workqueue, "connector", call_list); gpr_free(c); } } -static void connected(void *arg, grpc_endpoint *tcp) { +static void connected(void *arg, int success, grpc_call_list *call_list) { connector *c = arg; grpc_closure *notify; + grpc_endpoint *tcp = c->tcp; if (tcp != NULL) { c->result->transport = grpc_create_chttp2_transport( - c->args.channel_args, tcp, c->args.metadata_context, c->args.workqueue, - 1); - grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0); + c->args.channel_args, tcp, c->mdctx, 1, call_list); + grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0, + call_list); GPR_ASSERT(c->result->transport); c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *)); c->result->filters[0] = &grpc_http_client_filter; @@ -86,24 +96,27 @@ static void connected(void *arg, grpc_endpoint *tcp) { } notify = c->notify; c->notify = NULL; - notify->cb(notify->cb_arg, 1); + notify->cb(notify->cb_arg, 1, call_list); } -static void connector_shutdown(grpc_connector *con) {} +static void connector_shutdown(grpc_connector *con, grpc_call_list *call_list) { +} static void connector_connect(grpc_connector *con, const grpc_connect_in_args *args, grpc_connect_out_args *result, - grpc_closure *notify) { + grpc_closure *notify, grpc_call_list *call_list) { connector *c = (connector *)con; GPR_ASSERT(c->notify == NULL); GPR_ASSERT(notify->cb); c->notify = notify; c->args = *args; c->result = result; - grpc_tcp_client_connect(connected, c, args->interested_parties, - args->workqueue, args->addr, args->addr_len, - args->deadline); + c->tcp = NULL; + grpc_closure_init(&c->connected, connected, c); + grpc_tcp_client_connect(&c->connected, &c->tcp, args->interested_parties, + args->addr, args->addr_len, args->deadline, + call_list); } static const grpc_connector_vtable connector_vtable = { @@ -122,10 +135,11 @@ static void subchannel_factory_ref(grpc_subchannel_factory *scf) { gpr_ref(&f->refs); } -static void subchannel_factory_unref(grpc_subchannel_factory *scf) { +static void subchannel_factory_unref(grpc_subchannel_factory *scf, + grpc_call_list *call_list) { subchannel_factory *f = (subchannel_factory *)scf; if (gpr_unref(&f->refs)) { - GRPC_CHANNEL_INTERNAL_UNREF(f->master, "subchannel_factory"); + GRPC_CHANNEL_INTERNAL_UNREF(f->master, "subchannel_factory", call_list); grpc_channel_args_destroy(f->merge_args); grpc_mdctx_unref(f->mdctx); gpr_free(f); @@ -133,7 +147,8 @@ static void subchannel_factory_unref(grpc_subchannel_factory *scf) { } static grpc_subchannel *subchannel_factory_create_subchannel( - grpc_subchannel_factory *scf, grpc_subchannel_args *args) { + grpc_subchannel_factory *scf, grpc_subchannel_args *args, + grpc_call_list *call_list) { subchannel_factory *f = (subchannel_factory *)scf; connector *c = gpr_malloc(sizeof(*c)); grpc_channel_args *final_args = @@ -146,7 +161,7 @@ static grpc_subchannel *subchannel_factory_create_subchannel( args->args = final_args; args->master = f->master; s = grpc_subchannel_create(&c->base, args); - grpc_connector_unref(&c->base); + grpc_connector_unref(&c->base, call_list); grpc_channel_args_destroy(final_args); return s; } @@ -168,7 +183,8 @@ grpc_channel *grpc_insecure_channel_create(const char *target, grpc_resolver *resolver; subchannel_factory *f; grpc_mdctx *mdctx = grpc_mdctx_create(); - grpc_workqueue *workqueue = grpc_workqueue_create(); + grpc_call_list call_list = GRPC_CALL_LIST_INIT; + grpc_workqueue *workqueue = grpc_workqueue_create(&call_list); size_t n = 0; GPR_ASSERT(!reserved); if (grpc_channel_args_is_census_enabled(args)) { @@ -179,7 +195,7 @@ grpc_channel *grpc_insecure_channel_create(const char *target, GPR_ASSERT(n <= MAX_FILTERS); channel = grpc_channel_create_from_filters(target, filters, n, args, mdctx, - workqueue, 1); + workqueue, 1, &call_list); f = gpr_malloc(sizeof(*f)); f->base.vtable = &subchannel_factory_vtable; @@ -189,15 +205,17 @@ grpc_channel *grpc_insecure_channel_create(const char *target, f->merge_args = grpc_channel_args_copy(args); f->master = channel; GRPC_CHANNEL_INTERNAL_REF(f->master, "subchannel_factory"); - resolver = grpc_resolver_create(target, &f->base, workqueue); + resolver = grpc_resolver_create(target, &f->base); if (!resolver) { return NULL; } grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel), - resolver); - GRPC_RESOLVER_UNREF(resolver, "create"); - grpc_subchannel_factory_unref(&f->base); + resolver, &call_list); + GRPC_RESOLVER_UNREF(resolver, "create", &call_list); + grpc_subchannel_factory_unref(&f->base, &call_list); + + grpc_call_list_run(&call_list); return channel; } diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index b58115a93f..49dfc3c0e1 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -67,8 +67,12 @@ struct grpc_completion_queue { int is_server_cq; int num_pluckers; plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS]; + grpc_closure pollset_destroy_done; }; +static void on_pollset_destroy_done(void *cc, int success, + grpc_call_list *call_list); + grpc_completion_queue *grpc_completion_queue_create(void *reserved) { grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue)); GPR_ASSERT(!reserved); @@ -80,6 +84,7 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) { grpc_pollset_init(&cc->pollset); cc->completed_tail = &cc->completed_head; cc->completed_head.next = (gpr_uintptr)cc->completed_tail; + grpc_closure_init(&cc->pollset_destroy_done, on_pollset_destroy_done, cc); return cc; } @@ -94,7 +99,8 @@ void grpc_cq_internal_ref(grpc_completion_queue *cc) { gpr_ref(&cc->owning_refs); } -static void on_pollset_destroy_done(void *arg) { +static void on_pollset_destroy_done(void *arg, int success, + grpc_call_list *call_list) { grpc_completion_queue *cc = arg; GRPC_CQ_INTERNAL_UNREF(cc, "pollset_destroy"); } @@ -127,8 +133,10 @@ void grpc_cq_begin_op(grpc_completion_queue *cc) { event, then enter shutdown mode */ /* Queue a GRPC_OP_COMPLETED operation */ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success, - void (*done)(void *done_arg, grpc_cq_completion *storage), - void *done_arg, grpc_cq_completion *storage) { + void (*done)(void *done_arg, grpc_cq_completion *storage, + grpc_call_list *call_list), + void *done_arg, grpc_cq_completion *storage, + grpc_call_list *call_list) { int shutdown; int i; grpc_pollset_worker *pluck_worker; @@ -162,7 +170,7 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success, GPR_ASSERT(cc->shutdown_called); cc->shutdown = 1; gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); - grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc); + grpc_pollset_shutdown(&cc->pollset, &cc->pollset_destroy_done, call_list); } } @@ -172,6 +180,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, grpc_pollset_worker worker; int first_loop = 1; gpr_timespec now; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; GPR_ASSERT(!reserved); @@ -190,7 +199,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; - c->done(c->done_arg, c); + c->done(c->done_arg, c, &call_list); break; } if (cc->shutdown) { @@ -207,10 +216,11 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, break; } first_loop = 0; - grpc_pollset_work(&cc->pollset, &worker, now, deadline); + grpc_pollset_work(&cc->pollset, &worker, now, deadline, &call_list); } GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); GRPC_CQ_INTERNAL_UNREF(cc, "next"); + grpc_call_list_run(&call_list); return ret; } @@ -247,6 +257,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, grpc_pollset_worker worker; gpr_timespec now; int first_loop = 1; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; GPR_ASSERT(!reserved); @@ -268,7 +279,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; - c->done(c->done_arg, c); + c->done(c->done_arg, c, &call_list); goto done; } prev = c; @@ -299,18 +310,20 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, break; } first_loop = 0; - grpc_pollset_work(&cc->pollset, &worker, now, deadline); + grpc_pollset_work(&cc->pollset, &worker, now, deadline, &call_list); del_plucker(cc, tag, &worker); } done: GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); GRPC_CQ_INTERNAL_UNREF(cc, "pluck"); + grpc_call_list_run(&call_list); return ret; } /* Shutdown simply drops a ref that we reserved at creation time; if we drop to zero here, then enter shutdown mode and wake up any waiters */ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); if (cc->shutdown_called) { gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); @@ -324,8 +337,9 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { GPR_ASSERT(!cc->shutdown); cc->shutdown = 1; gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); - grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc); + grpc_pollset_shutdown(&cc->pollset, &cc->pollset_destroy_done, &call_list); } + grpc_call_list_run(&call_list); } void grpc_completion_queue_destroy(grpc_completion_queue *cc) { diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h index 793baff03a..6d8d1ce959 100644 --- a/src/core/surface/completion_queue.h +++ b/src/core/surface/completion_queue.h @@ -44,7 +44,8 @@ typedef struct grpc_cq_completion { void *tag; /** done callback - called when this queue element is no longer needed by the completion queue */ - void (*done)(void *done_arg, struct grpc_cq_completion *c); + void (*done)(void *done_arg, struct grpc_cq_completion *c, + grpc_call_list *call_list); void *done_arg; /** next pointer; low bit is used to indicate success or not */ gpr_uintptr next; @@ -74,7 +75,8 @@ void grpc_cq_begin_op(grpc_completion_queue *cc); void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success, void (*done)(void *done_arg, grpc_cq_completion *storage, grpc_call_list *call_list), - void *done_arg, grpc_cq_completion *storage); + void *done_arg, grpc_cq_completion *storage, + grpc_call_list *call_list); grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc); diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index a5de900eff..c5cf33f1f9 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -55,13 +55,14 @@ typedef struct { } channel_data; static void lame_start_transport_stream_op(grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op *op, + grpc_call_list *call_list) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); if (op->send_ops != NULL) { grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops); - op->on_done_send->cb(op->on_done_send->cb_arg, 0); + op->on_done_send->cb(op->on_done_send->cb_arg, 0, call_list); } if (op->recv_ops != NULL) { char tmp[GPR_LTOA_MIN_BUFSIZE]; @@ -80,44 +81,48 @@ static void lame_start_transport_stream_op(grpc_call_element *elem, mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME); grpc_sopb_add_metadata(op->recv_ops, mdb); *op->recv_state = GRPC_STREAM_CLOSED; - op->on_done_recv->cb(op->on_done_recv->cb_arg, 1); + op->on_done_recv->cb(op->on_done_recv->cb_arg, 1, call_list); } if (op->on_consumed != NULL) { - op->on_consumed->cb(op->on_consumed->cb_arg, 0); + op->on_consumed->cb(op->on_consumed->cb_arg, 0, call_list); } } -static char *lame_get_peer(grpc_call_element *elem) { +static char *lame_get_peer(grpc_call_element *elem, grpc_call_list *call_list) { channel_data *chand = elem->channel_data; return grpc_channel_get_target(chand->master); } static void lame_start_transport_op(grpc_channel_element *elem, - grpc_transport_op *op) { + grpc_transport_op *op, + grpc_call_list *call_list) { if (op->on_connectivity_state_change) { GPR_ASSERT(*op->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE); *op->connectivity_state = GRPC_CHANNEL_FATAL_FAILURE; op->on_connectivity_state_change->cb( - op->on_connectivity_state_change->cb_arg, 1); + op->on_connectivity_state_change->cb_arg, 1, call_list); } if (op->on_consumed != NULL) { - op->on_consumed->cb(op->on_consumed->cb_arg, 1); + op->on_consumed->cb(op->on_consumed->cb_arg, 1, call_list); } } static void init_call_elem(grpc_call_element *elem, const void *transport_server_data, - grpc_transport_stream_op *initial_op) { + grpc_transport_stream_op *initial_op, + grpc_call_list *call_list) { if (initial_op) { - grpc_transport_stream_op_finish_with_failure(initial_op); + grpc_transport_stream_op_finish_with_failure(initial_op, call_list); } } -static void destroy_call_elem(grpc_call_element *elem) {} +static void destroy_call_elem(grpc_call_element *elem, + grpc_call_list *call_list) {} static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *mdctx, - int is_first, int is_last) { + int is_first, int is_last, + grpc_call_list *call_list) { channel_data *chand = elem->channel_data; GPR_ASSERT(is_first); GPR_ASSERT(is_last); @@ -125,7 +130,8 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, chand->master = master; } -static void destroy_channel_elem(grpc_channel_element *elem) {} +static void destroy_channel_elem(grpc_channel_element *elem, + grpc_call_list *call_list) {} static const grpc_channel_filter lame_filter = { lame_start_transport_stream_op, @@ -148,14 +154,16 @@ grpc_channel *grpc_lame_client_channel_create(const char *target, grpc_channel *channel; grpc_channel_element *elem; channel_data *chand; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; static const grpc_channel_filter *filters[] = {&lame_filter}; - channel = grpc_channel_create_from_filters(target, filters, 1, NULL, - grpc_mdctx_create(), - grpc_workqueue_create(), 1); + channel = grpc_channel_create_from_filters( + target, filters, 1, NULL, grpc_mdctx_create(), + grpc_workqueue_create(&call_list), 1, &call_list); elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); GPR_ASSERT(elem->filter == &lame_filter); chand = (channel_data *)elem->channel_data; chand->error_code = error_code; chand->error_message = error_message; + grpc_call_list_run(&call_list); return channel; } diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index 4a75d03f0a..b5b9ee173e 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -108,8 +108,9 @@ static void on_secure_transport_setup_done(void *arg, c->connecting_endpoint = NULL; gpr_mu_unlock(&c->mu); c->result->transport = grpc_create_chttp2_transport( - c->args.channel_args, secure_endpoint, c->mdctx, 1); - grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0); + c->args.channel_args, secure_endpoint, c->mdctx, 1, call_list); + grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0, + call_list); c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *) * 2); c->result->filters[0] = &grpc_http_client_filter; c->result->filters[1] = &grpc_client_auth_filter; @@ -187,12 +188,13 @@ static void subchannel_factory_ref(grpc_subchannel_factory *scf) { gpr_ref(&f->refs); } -static void subchannel_factory_unref(grpc_subchannel_factory *scf) { +static void subchannel_factory_unref(grpc_subchannel_factory *scf, + grpc_call_list *call_list) { subchannel_factory *f = (subchannel_factory *)scf; if (gpr_unref(&f->refs)) { GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base, "subchannel_factory"); - GRPC_CHANNEL_INTERNAL_UNREF(f->master, "subchannel_factory"); + GRPC_CHANNEL_INTERNAL_UNREF(f->master, "subchannel_factory", call_list); grpc_channel_args_destroy(f->merge_args); grpc_mdctx_unref(f->mdctx); gpr_free(f); @@ -279,7 +281,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, GPR_ASSERT(n <= MAX_FILTERS); channel = grpc_channel_create_from_filters(target, filters, n, args_copy, - mdctx, workqueue, 1); + mdctx, workqueue, 1, &call_list); f = gpr_malloc(sizeof(*f)); f->base.vtable = &subchannel_factory_vtable; @@ -299,7 +301,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel), resolver, &call_list); GRPC_RESOLVER_UNREF(resolver, "create", &call_list); - grpc_subchannel_factory_unref(&f->base); + grpc_subchannel_factory_unref(&f->base, &call_list); GRPC_SECURITY_CONNECTOR_UNREF(&connector->base, "channel_create"); grpc_channel_args_destroy(args_copy); diff --git a/src/core/surface/server.c b/src/core/surface/server.c index e38c6028d9..24545c67e1 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -57,9 +57,11 @@ typedef struct listener { void *arg; void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets, - size_t pollset_count); - void (*destroy)(grpc_server *server, void *arg); + size_t pollset_count, grpc_call_list *call_list); + void (*destroy)(grpc_server *server, void *arg, grpc_closure *closure, + grpc_call_list *call_list); struct listener *next; + grpc_closure destroy_done; } listener; typedef struct call_data call_data; @@ -219,19 +221,19 @@ struct grpc_server { /** when did we print the last shutdown progress message */ gpr_timespec last_shutdown_message_time; - - grpc_workqueue *workqueue; }; #define SERVER_FROM_CALL_ELEM(elem) \ (((channel_data *)(elem)->channel_data)->server) static void begin_call(grpc_server *server, call_data *calld, - requested_call *rc); -static void fail_call(grpc_server *server, requested_call *rc); + requested_call *rc, grpc_call_list *call_list); +static void fail_call(grpc_server *server, requested_call *rc, + grpc_call_list *call_list); /* Before calling maybe_finish_shutdown, we must hold mu_global and not hold mu_call */ -static void maybe_finish_shutdown(grpc_server *server); +static void maybe_finish_shutdown(grpc_server *server, + grpc_call_list *call_list); /* * channel broadcaster @@ -258,14 +260,15 @@ struct shutdown_cleanup_args { gpr_slice slice; }; -static void shutdown_cleanup(void *arg, int iomgr_status_ignored) { +static void shutdown_cleanup(void *arg, int iomgr_status_ignored, + grpc_call_list *call_list) { struct shutdown_cleanup_args *a = arg; gpr_slice_unref(a->slice); gpr_free(a); } static void send_shutdown(grpc_channel *channel, int send_goaway, - int send_disconnect) { + int send_disconnect, grpc_call_list *call_list) { grpc_transport_op op; struct shutdown_cleanup_args *sc; grpc_channel_element *elem; @@ -281,17 +284,17 @@ static void send_shutdown(grpc_channel *channel, int send_goaway, op.on_consumed = &sc->closure; elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); - elem->filter->start_transport_op(elem, &op); + elem->filter->start_transport_op(elem, &op, call_list); } static void channel_broadcaster_shutdown(channel_broadcaster *cb, - int send_goaway, - int force_disconnect) { + int send_goaway, int force_disconnect, + grpc_call_list *call_list) { size_t i; for (i = 0; i < cb->num_channels; i++) { - send_shutdown(cb->channels[i], send_goaway, force_disconnect); - GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast"); + send_shutdown(cb->channels[i], send_goaway, force_disconnect, call_list); + GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast", call_list); } gpr_free(cb->channels); } @@ -311,12 +314,12 @@ static void request_matcher_destroy(request_matcher *request_matcher) { gpr_stack_lockfree_destroy(request_matcher->requests); } -static void kill_zombie(void *elem, int success) { +static void kill_zombie(void *elem, int success, grpc_call_list *call_list) { grpc_call_destroy(grpc_call_from_top_element(elem)); } static void request_matcher_zombify_all_pending_calls( - request_matcher *request_matcher, grpc_workqueue *workqueue) { + request_matcher *request_matcher, grpc_call_list *call_list) { while (request_matcher->pending_head) { call_data *calld = request_matcher->pending_head; request_matcher->pending_head = calld->pending_next; @@ -326,15 +329,16 @@ static void request_matcher_zombify_all_pending_calls( grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_workqueue_push(workqueue, &calld->kill_zombie_closure, 1); + grpc_call_list_add(call_list, &calld->kill_zombie_closure, 1); } } static void request_matcher_kill_requests(grpc_server *server, - request_matcher *rm) { + request_matcher *rm, + grpc_call_list *call_list) { int request_id; while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) { - fail_call(server, &server->requested_calls[request_id]); + fail_call(server, &server->requested_calls[request_id], call_list); } } @@ -346,7 +350,7 @@ static void server_ref(grpc_server *server) { gpr_ref(&server->internal_refcount); } -static void server_delete(grpc_server *server) { +static void server_delete(grpc_server *server, grpc_call_list *call_list) { registered_method *rm; size_t i; grpc_channel_args_destroy(server->channel_args); @@ -365,7 +369,6 @@ static void server_delete(grpc_server *server) { } request_matcher_destroy(&server->unregistered_request_matcher); gpr_stack_lockfree_destroy(server->request_freelist); - GRPC_WORKQUEUE_UNREF(server->workqueue, "destroy"); gpr_free(server->cqs); gpr_free(server->pollsets); gpr_free(server->shutdown_tags); @@ -373,9 +376,9 @@ static void server_delete(grpc_server *server) { gpr_free(server); } -static void server_unref(grpc_server *server) { +static void server_unref(grpc_server *server, grpc_call_list *call_list) { if (gpr_unref(&server->internal_refcount)) { - server_delete(server); + server_delete(server, call_list); } } @@ -389,30 +392,29 @@ static void orphan_channel(channel_data *chand) { chand->next = chand->prev = chand; } -static void finish_destroy_channel(void *cd, int success) { +static void finish_destroy_channel(void *cd, int success, + grpc_call_list *call_list) { channel_data *chand = cd; grpc_server *server = chand->server; gpr_log(GPR_DEBUG, "finish_destroy_channel: %p", chand->channel); - GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "server"); - server_unref(server); + GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "server", call_list); + server_unref(server, call_list); } -static void destroy_channel(channel_data *chand) { +static void destroy_channel(channel_data *chand, grpc_call_list *call_list) { if (is_channel_orphaned(chand)) return; GPR_ASSERT(chand->server != NULL); orphan_channel(chand); server_ref(chand->server); - maybe_finish_shutdown(chand->server); + maybe_finish_shutdown(chand->server, call_list); chand->finish_destroy_channel_closure.cb = finish_destroy_channel; chand->finish_destroy_channel_closure.cb_arg = chand; - gpr_log(GPR_DEBUG, "queue finish_destroy_channel: %p on %p", chand->channel, - chand->server->workqueue); - grpc_workqueue_push(chand->server->workqueue, - &chand->finish_destroy_channel_closure, 1); + grpc_call_list_add(call_list, &chand->finish_destroy_channel_closure, 1); } static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem, - request_matcher *request_matcher) { + request_matcher *request_matcher, + grpc_call_list *call_list) { call_data *calld = elem->call_data; int request_id; @@ -421,7 +423,7 @@ static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem, calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_workqueue_push(server->workqueue, &calld->kill_zombie_closure, 1); + grpc_call_list_add(call_list, &calld->kill_zombie_closure, 1); return; } @@ -443,11 +445,11 @@ static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem, gpr_mu_lock(&calld->mu_state); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); - begin_call(server, calld, &server->requested_calls[request_id]); + begin_call(server, calld, &server->requested_calls[request_id], call_list); } } -static void start_new_rpc(grpc_call_element *elem) { +static void start_new_rpc(grpc_call_element *elem, grpc_call_list *call_list) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; grpc_server *server = chand->server; @@ -466,7 +468,8 @@ static void start_new_rpc(grpc_call_element *elem) { if (rm->host != calld->host) continue; if (rm->method != calld->path) continue; finish_start_new_rpc(server, elem, - &rm->server_registered_method->request_matcher); + &rm->server_registered_method->request_matcher, + call_list); return; } /* check for a wildcard method definition (no host set) */ @@ -478,11 +481,13 @@ static void start_new_rpc(grpc_call_element *elem) { if (rm->host != NULL) continue; if (rm->method != calld->path) continue; finish_start_new_rpc(server, elem, - &rm->server_registered_method->request_matcher); + &rm->server_registered_method->request_matcher, + call_list); return; } } - finish_start_new_rpc(server, elem, &server->unregistered_request_matcher); + finish_start_new_rpc(server, elem, &server->unregistered_request_matcher, + call_list); } static int num_listeners(grpc_server *server) { @@ -494,8 +499,9 @@ static int num_listeners(grpc_server *server) { return n; } -static void done_shutdown_event(void *server, grpc_cq_completion *completion) { - server_unref(server); +static void done_shutdown_event(void *server, grpc_cq_completion *completion, + grpc_call_list *call_list) { + server_unref(server, call_list); } static int num_channels(grpc_server *server) { @@ -508,25 +514,27 @@ static int num_channels(grpc_server *server) { return n; } -static void kill_pending_work_locked(grpc_server *server) { +static void kill_pending_work_locked(grpc_server *server, + grpc_call_list *call_list) { registered_method *rm; - request_matcher_kill_requests(server, &server->unregistered_request_matcher); + request_matcher_kill_requests(server, &server->unregistered_request_matcher, + call_list); request_matcher_zombify_all_pending_calls( - &server->unregistered_request_matcher, server->workqueue); + &server->unregistered_request_matcher, call_list); for (rm = server->registered_methods; rm; rm = rm->next) { - request_matcher_kill_requests(server, &rm->request_matcher); - request_matcher_zombify_all_pending_calls(&rm->request_matcher, - server->workqueue); + request_matcher_kill_requests(server, &rm->request_matcher, call_list); + request_matcher_zombify_all_pending_calls(&rm->request_matcher, call_list); } } -static void maybe_finish_shutdown(grpc_server *server) { +static void maybe_finish_shutdown(grpc_server *server, + grpc_call_list *call_list) { size_t i; if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) { return; } - kill_pending_work_locked(server); + kill_pending_work_locked(server, call_list); if (server->root_channel_data.next != &server->root_channel_data || server->listeners_destroyed < num_listeners(server)) { @@ -548,7 +556,7 @@ static void maybe_finish_shutdown(grpc_server *server) { server_ref(server); grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag, 1, done_shutdown_event, server, - &server->shutdown_tags[i].completion); + &server->shutdown_tags[i].completion, call_list); } } @@ -566,10 +574,9 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { return md; } -static void server_on_recv(void *ptr, int success) { +static void server_on_recv(void *ptr, int success, grpc_call_list *call_list) { grpc_call_element *elem = ptr; call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; gpr_timespec op_deadline; if (success && !calld->got_initial_metadata) { @@ -587,7 +594,7 @@ static void server_on_recv(void *ptr, int success) { } if (calld->host && calld->path) { calld->got_initial_metadata = 1; - start_new_rpc(elem); + start_new_rpc(elem, call_list); } break; } @@ -604,8 +611,7 @@ static void server_on_recv(void *ptr, int success) { calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_workqueue_push(chand->server->workqueue, - &calld->kill_zombie_closure, 1); + grpc_call_list_add(call_list, &calld->kill_zombie_closure, 1); } else { gpr_mu_unlock(&calld->mu_state); } @@ -616,8 +622,7 @@ static void server_on_recv(void *ptr, int success) { calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_workqueue_push(chand->server->workqueue, - &calld->kill_zombie_closure, 1); + grpc_call_list_add(call_list, &calld->kill_zombie_closure, 1); } else if (calld->state == PENDING) { calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); @@ -629,7 +634,7 @@ static void server_on_recv(void *ptr, int success) { break; } - calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success); + calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success, call_list); } static void server_mutate_op(grpc_call_element *elem, @@ -646,10 +651,11 @@ static void server_mutate_op(grpc_call_element *elem, } static void server_start_transport_stream_op(grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op *op, + grpc_call_list *call_list) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); server_mutate_op(elem, op); - grpc_call_next_op(elem, op); + grpc_call_next_op(elem, op, call_list); } static void accept_stream(void *cd, grpc_transport *transport, @@ -660,7 +666,8 @@ static void accept_stream(void *cd, grpc_transport *transport, 0, gpr_inf_future(GPR_CLOCK_MONOTONIC)); } -static void channel_connectivity_changed(void *cd, int iomgr_status_ignored) { +static void channel_connectivity_changed(void *cd, int iomgr_status_ignored, + grpc_call_list *call_list) { channel_data *chand = cd; grpc_server *server = chand->server; if (chand->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) { @@ -670,18 +677,19 @@ static void channel_connectivity_changed(void *cd, int iomgr_status_ignored) { op.connectivity_state = &chand->connectivity_state; grpc_channel_next_op(grpc_channel_stack_element( grpc_channel_get_channel_stack(chand->channel), 0), - &op); + &op, call_list); } else { gpr_mu_lock(&server->mu_global); - destroy_channel(chand); + destroy_channel(chand, call_list); gpr_mu_unlock(&server->mu_global); - GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "connectivity"); + GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "connectivity", call_list); } } static void init_call_elem(grpc_call_element *elem, const void *server_transport_data, - grpc_transport_stream_op *initial_op) { + grpc_transport_stream_op *initial_op, + grpc_call_list *call_list) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; memset(calld, 0, sizeof(call_data)); @@ -696,7 +704,8 @@ static void init_call_elem(grpc_call_element *elem, if (initial_op) server_mutate_op(elem, initial_op); } -static void destroy_call_elem(grpc_call_element *elem) { +static void destroy_call_elem(grpc_call_element *elem, + grpc_call_list *call_list) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; @@ -711,13 +720,13 @@ static void destroy_call_elem(grpc_call_element *elem) { gpr_mu_destroy(&calld->mu_state); - server_unref(chand->server); + server_unref(chand->server, call_list); } static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *metadata_context, int is_first, - int is_last) { + int is_last, grpc_call_list *call_list) { channel_data *chand = elem->channel_data; GPR_ASSERT(is_first); GPR_ASSERT(!is_last); @@ -733,7 +742,8 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, channel_connectivity_changed, chand); } -static void destroy_channel_elem(grpc_channel_element *elem) { +static void destroy_channel_elem(grpc_channel_element *elem, + grpc_call_list *call_list) { size_t i; channel_data *chand = elem->channel_data; if (chand->registered_methods) { @@ -752,11 +762,11 @@ static void destroy_channel_elem(grpc_channel_element *elem) { chand->next->prev = chand->prev; chand->prev->next = chand->next; chand->next = chand->prev = chand; - maybe_finish_shutdown(chand->server); + maybe_finish_shutdown(chand->server, call_list); gpr_mu_unlock(&chand->server->mu_global); GRPC_MDSTR_UNREF(chand->path_key); GRPC_MDSTR_UNREF(chand->authority_key); - server_unref(chand->server); + server_unref(chand->server, call_list); } } @@ -810,7 +820,6 @@ grpc_server *grpc_server_create_from_filters( gpr_ref_init(&server->internal_refcount, 1); server->root_channel_data.next = server->root_channel_data.prev = &server->root_channel_data; - server->workqueue = grpc_workqueue_create(); /* TODO(ctiller): expose a channel_arg for this */ server->max_requested_calls = 32768; @@ -881,23 +890,26 @@ void *grpc_server_register_method(grpc_server *server, const char *method, void grpc_server_start(grpc_server *server) { listener *l; size_t i; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count); for (i = 0; i < server->cq_count; i++) { server->pollsets[i] = grpc_cq_pollset(server->cqs[i]); - grpc_workqueue_add_to_pollset(server->workqueue, server->pollsets[i]); } for (l = server->listeners; l; l = l->next) { - l->start(server, l->arg, server->pollsets, server->cq_count); + l->start(server, l->arg, server->pollsets, server->cq_count, &call_list); } + + grpc_call_list_run(&call_list); } void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, grpc_channel_filter const **extra_filters, size_t num_extra_filters, grpc_mdctx *mdctx, grpc_workqueue *workqueue, - const grpc_channel_args *args) { + const grpc_channel_args *args, + grpc_call_list *call_list) { size_t num_filters = s->channel_filter_count + num_extra_filters + 1; grpc_channel_filter const **filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_filters); @@ -927,11 +939,11 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, for (i = 0; i < s->cq_count; i++) { memset(&op, 0, sizeof(op)); op.bind_pollset = grpc_cq_pollset(s->cqs[i]); - grpc_transport_perform_op(transport, &op); + grpc_transport_perform_op(transport, &op, call_list); } channel = grpc_channel_create_from_filters(NULL, filters, num_filters, args, - mdctx, workqueue, 0); + mdctx, workqueue, 0, call_list); chand = (channel_data *)grpc_channel_stack_element( grpc_channel_get_channel_stack(channel), 0) ->channel_data; @@ -987,19 +999,30 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, op.on_connectivity_state_change = &chand->channel_connectivity_changed; op.connectivity_state = &chand->connectivity_state; op.disconnect = gpr_atm_acq_load(&s->shutdown_flag) != 0; - grpc_transport_perform_op(transport, &op); + grpc_transport_perform_op(transport, &op, call_list); } -void done_published_shutdown(void *done_arg, grpc_cq_completion *storage) { +void done_published_shutdown(void *done_arg, grpc_cq_completion *storage, + grpc_call_list *call_list) { (void) done_arg; gpr_free(storage); } +static void listener_destroy_done(void *s, int success, + grpc_call_list *call_list) { + grpc_server *server = s; + gpr_mu_lock(&server->mu_global); + server->listeners_destroyed++; + maybe_finish_shutdown(server, call_list); + gpr_mu_unlock(&server->mu_global); +} + void grpc_server_shutdown_and_notify(grpc_server *server, grpc_completion_queue *cq, void *tag) { listener *l; shutdown_tag *sdt; channel_broadcaster broadcaster; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; GRPC_SERVER_LOG_SHUTDOWN(GPR_INFO, server, cq, tag); @@ -1008,9 +1031,9 @@ void grpc_server_shutdown_and_notify(grpc_server *server, grpc_cq_begin_op(cq); if (server->shutdown_published) { grpc_cq_end_op(cq, tag, 1, done_published_shutdown, NULL, - gpr_malloc(sizeof(grpc_cq_completion))); + gpr_malloc(sizeof(grpc_cq_completion)), &call_list); gpr_mu_unlock(&server->mu_global); - return; + goto done; } server->shutdown_tags = gpr_realloc(server->shutdown_tags, @@ -1020,7 +1043,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server, sdt->cq = cq; if (gpr_atm_acq_load(&server->shutdown_flag)) { gpr_mu_unlock(&server->mu_global); - return; + goto done; } server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME); @@ -1029,41 +1052,40 @@ void grpc_server_shutdown_and_notify(grpc_server *server, /* collect all unregistered then registered calls */ gpr_mu_lock(&server->mu_call); - kill_pending_work_locked(server); + kill_pending_work_locked(server, &call_list); gpr_mu_unlock(&server->mu_call); gpr_atm_rel_store(&server->shutdown_flag, 1); - maybe_finish_shutdown(server); + maybe_finish_shutdown(server, &call_list); gpr_mu_unlock(&server->mu_global); /* Shutdown listeners */ for (l = server->listeners; l; l = l->next) { - l->destroy(server, l->arg); + grpc_closure_init(&l->destroy_done, listener_destroy_done, server); + l->destroy(server, l->arg, &l->destroy_done, &call_list); } - channel_broadcaster_shutdown(&broadcaster, 1, 0); -} + channel_broadcaster_shutdown(&broadcaster, 1, 0, &call_list); -void grpc_server_listener_destroy_done(void *s) { - grpc_server *server = s; - gpr_mu_lock(&server->mu_global); - server->listeners_destroyed++; - maybe_finish_shutdown(server); - gpr_mu_unlock(&server->mu_global); +done: + grpc_call_list_run(&call_list); } void grpc_server_cancel_all_calls(grpc_server *server) { channel_broadcaster broadcaster; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; gpr_mu_lock(&server->mu_global); channel_broadcaster_init(server, &broadcaster); gpr_mu_unlock(&server->mu_global); - channel_broadcaster_shutdown(&broadcaster, 0, 1); + channel_broadcaster_shutdown(&broadcaster, 0, 1, &call_list); + grpc_call_list_run(&call_list); } void grpc_server_destroy(grpc_server *server) { listener *l; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; gpr_mu_lock(&server->mu_global); GPR_ASSERT(gpr_atm_acq_load(&server->shutdown_flag) || !server->listeners); @@ -1077,16 +1099,17 @@ void grpc_server_destroy(grpc_server *server) { gpr_mu_unlock(&server->mu_global); - grpc_workqueue_flush(server->workqueue); - - server_unref(server); + server_unref(server, &call_list); + grpc_call_list_run(&call_list); } -void grpc_server_add_listener(grpc_server *server, void *arg, - void (*start)(grpc_server *server, void *arg, - grpc_pollset **pollsets, - size_t pollset_count), - void (*destroy)(grpc_server *server, void *arg)) { +void grpc_server_add_listener( + grpc_server *server, void *arg, + void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets, + size_t pollset_count, grpc_call_list *call_list), + void (*destroy)(grpc_server *server, void *arg, grpc_closure *on_done, + grpc_call_list *call_list), + grpc_call_list *call_list) { listener *l = gpr_malloc(sizeof(listener)); l->arg = arg; l->start = start; @@ -1096,18 +1119,19 @@ void grpc_server_add_listener(grpc_server *server, void *arg, } static grpc_call_error queue_call_request(grpc_server *server, - requested_call *rc) { + requested_call *rc, + grpc_call_list *call_list) { call_data *calld = NULL; request_matcher *request_matcher = NULL; int request_id; if (gpr_atm_acq_load(&server->shutdown_flag)) { - fail_call(server, rc); + fail_call(server, rc, call_list); return GRPC_CALL_OK; } request_id = gpr_stack_lockfree_pop(server->request_freelist); if (request_id == -1) { /* out of request ids: just fail this one */ - fail_call(server, rc); + fail_call(server, rc, call_list); return GRPC_CALL_OK; } switch (rc->type) { @@ -1135,12 +1159,13 @@ static grpc_call_error queue_call_request(grpc_server *server, grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_workqueue_push(server->workqueue, &calld->kill_zombie_closure, 1); + grpc_call_list_add(call_list, &calld->kill_zombie_closure, 1); } else { GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); - begin_call(server, calld, &server->requested_calls[request_id]); + begin_call(server, calld, &server->requested_calls[request_id], + call_list); } gpr_mu_lock(&server->mu_call); } @@ -1154,13 +1179,16 @@ grpc_call_error grpc_server_request_call( grpc_metadata_array *initial_metadata, grpc_completion_queue *cq_bound_to_call, grpc_completion_queue *cq_for_notification, void *tag) { + grpc_call_error error; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; requested_call *rc = gpr_malloc(sizeof(*rc)); GRPC_SERVER_LOG_REQUEST_CALL(GPR_INFO, server, call, details, initial_metadata, cq_bound_to_call, cq_for_notification, tag); if (!grpc_cq_is_server_cq(cq_for_notification)) { gpr_free(rc); - return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; + error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; + goto done; } grpc_cq_begin_op(cq_for_notification); details->reserved = NULL; @@ -1172,7 +1200,10 @@ grpc_call_error grpc_server_request_call( rc->call = call; rc->data.batch.details = details; rc->data.batch.initial_metadata = initial_metadata; - return queue_call_request(server, rc); + error = queue_call_request(server, rc, &call_list); +done: + grpc_call_list_run(&call_list); + return error; } grpc_call_error grpc_server_request_registered_call( @@ -1180,11 +1211,14 @@ grpc_call_error grpc_server_request_registered_call( grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload, grpc_completion_queue *cq_bound_to_call, grpc_completion_queue *cq_for_notification, void *tag) { + grpc_call_error error; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; requested_call *rc = gpr_malloc(sizeof(*rc)); registered_method *registered_method = rm; if (!grpc_cq_is_server_cq(cq_for_notification)) { gpr_free(rc); - return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; + error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; + goto done; } grpc_cq_begin_op(cq_for_notification); rc->type = REGISTERED_CALL; @@ -1197,12 +1231,16 @@ grpc_call_error grpc_server_request_registered_call( rc->data.registered.deadline = deadline; rc->data.registered.initial_metadata = initial_metadata; rc->data.registered.optional_payload = optional_payload; - return queue_call_request(server, rc); + error = queue_call_request(server, rc, &call_list); +done: + grpc_call_list_run(&call_list); + return error; } -static void publish_registered_or_batch(grpc_call *call, int success, - void *tag); -static void publish_was_not_set(grpc_call *call, int success, void *tag) { +static void publish_registered_or_batch(grpc_call *call, int success, void *tag, + grpc_call_list *call_list); +static void publish_was_not_set(grpc_call *call, int success, void *tag, + grpc_call_list *call_list) { abort(); } @@ -1218,7 +1256,7 @@ static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) { } static void begin_call(grpc_server *server, call_data *calld, - requested_call *rc) { + requested_call *rc, grpc_call_list *call_list) { grpc_ioreq_completion_func publish = publish_was_not_set; grpc_ioreq req[2]; grpc_ioreq *r = req; @@ -1229,7 +1267,7 @@ static void begin_call(grpc_server *server, call_data *calld, fill in the metadata array passed by the client, we need to perform an ioreq op, that should complete immediately. */ - grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call); + grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call, call_list); *rc->call = calld->call; calld->cq_new = rc->cq_for_notification; switch (rc->type) { @@ -1265,10 +1303,11 @@ static void begin_call(grpc_server *server, call_data *calld, GRPC_CALL_INTERNAL_REF(calld->call, "server"); grpc_call_start_ioreq_and_call_back(calld->call, req, (size_t)(r - req), - publish, rc); + publish, rc, call_list); } -static void done_request_event(void *req, grpc_cq_completion *c) { +static void done_request_event(void *req, grpc_cq_completion *c, + grpc_call_list *call_list) { requested_call *rc = req; grpc_server *server = rc->server; @@ -1281,10 +1320,11 @@ static void done_request_event(void *req, grpc_cq_completion *c) { gpr_free(req); } - server_unref(server); + server_unref(server, call_list); } -static void fail_call(grpc_server *server, requested_call *rc) { +static void fail_call(grpc_server *server, requested_call *rc, + grpc_call_list *call_list) { *rc->call = NULL; switch (rc->type) { case BATCH_CALL: @@ -1296,11 +1336,11 @@ static void fail_call(grpc_server *server, requested_call *rc) { } server_ref(server); grpc_cq_end_op(rc->cq_for_notification, rc->tag, 0, done_request_event, rc, - &rc->completion); + &rc->completion, call_list); } -static void publish_registered_or_batch(grpc_call *call, int success, - void *prc) { +static void publish_registered_or_batch(grpc_call *call, int success, void *prc, + grpc_call_list *call_list) { grpc_call_element *elem = grpc_call_stack_element(grpc_call_get_call_stack(call), 0); requested_call *rc = prc; @@ -1308,7 +1348,7 @@ static void publish_registered_or_batch(grpc_call *call, int success, channel_data *chand = elem->channel_data; server_ref(chand->server); grpc_cq_end_op(calld->cq_new, rc->tag, success, done_request_event, rc, - &rc->completion); + &rc->completion, call_list); GRPC_CALL_INTERNAL_UNREF(call, "server", call_list); } diff --git a/src/core/surface/server.h b/src/core/surface/server.h index f87296797c..2f2c5b8948 100644 --- a/src/core/surface/server.h +++ b/src/core/surface/server.h @@ -49,9 +49,9 @@ void grpc_server_add_listener( grpc_server *server, void *listener, void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets, size_t npollsets, grpc_call_list *call_list), - void (*destroy)(grpc_server *server, void *arg, grpc_call_list *call_list)); - -void grpc_server_listener_destroy_done(void *server); + void (*destroy)(grpc_server *server, void *arg, grpc_closure *on_done, + grpc_call_list *call_list), + grpc_call_list *call_list); /* Setup a transport - creates a channel stack, binds the transport to the server */ @@ -59,7 +59,8 @@ void grpc_server_setup_transport(grpc_server *server, grpc_transport *transport, grpc_channel_filter const **extra_filters, size_t num_extra_filters, grpc_mdctx *mdctx, grpc_workqueue *workqueue, - const grpc_channel_args *args); + const grpc_channel_args *args, + grpc_call_list *call_list); const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server); diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c index 91cf6ece9c..df63d99dea 100644 --- a/src/core/surface/server_chttp2.c +++ b/src/core/surface/server_chttp2.c @@ -43,15 +43,17 @@ #include <grpc/support/useful.h> static void setup_transport(void *server, grpc_transport *transport, - grpc_mdctx *mdctx, grpc_workqueue *workqueue) { + grpc_mdctx *mdctx, grpc_workqueue *workqueue, + grpc_call_list *call_list) { static grpc_channel_filter const *extra_filters[] = { &grpc_http_server_filter}; grpc_server_setup_transport(server, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters), mdctx, workqueue, - grpc_server_get_channel_args(server)); + grpc_server_get_channel_args(server), call_list); } -static void new_transport(void *server, grpc_endpoint *tcp) { +static void new_transport(void *server, grpc_endpoint *tcp, + grpc_call_list *call_list) { /* * Beware that the call to grpc_create_chttp2_transport() has to happen before * grpc_tcp_server_destroy(). This is fine here, but similar code @@ -60,25 +62,27 @@ static void new_transport(void *server, grpc_endpoint *tcp) { * case. */ grpc_mdctx *mdctx = grpc_mdctx_create(); - grpc_workqueue *workqueue = grpc_workqueue_create(); + grpc_workqueue *workqueue = grpc_workqueue_create(call_list); grpc_transport *transport = grpc_create_chttp2_transport( - grpc_server_get_channel_args(server), tcp, mdctx, workqueue, 0); - setup_transport(server, transport, mdctx, workqueue); - grpc_chttp2_transport_start_reading(transport, NULL, 0); + grpc_server_get_channel_args(server), tcp, mdctx, 0, call_list); + setup_transport(server, transport, mdctx, workqueue, call_list); + grpc_chttp2_transport_start_reading(transport, NULL, 0, call_list); } /* Server callback: start listening on our ports */ static void start(grpc_server *server, void *tcpp, grpc_pollset **pollsets, - size_t pollset_count) { + size_t pollset_count, grpc_call_list *call_list) { grpc_tcp_server *tcp = tcpp; - grpc_tcp_server_start(tcp, pollsets, pollset_count, new_transport, server); + grpc_tcp_server_start(tcp, pollsets, pollset_count, new_transport, server, + call_list); } /* Server callback: destroy the tcp listener (so we don't generate further callbacks) */ -static void destroy(grpc_server *server, void *tcpp) { +static void destroy(grpc_server *server, void *tcpp, grpc_closure *destroy_done, + grpc_call_list *call_list) { grpc_tcp_server *tcp = tcpp; - grpc_tcp_server_destroy(tcp, grpc_server_listener_destroy_done, server); + grpc_tcp_server_destroy(tcp, destroy_done, call_list); } int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { @@ -88,6 +92,7 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { unsigned count = 0; int port_num = -1; int port_temp; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; resolved = grpc_blocking_resolve_address(addr, "http"); if (!resolved) { @@ -124,9 +129,8 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { grpc_resolved_addresses_destroy(resolved); /* Register with the server only upon success */ - grpc_server_add_listener(server, tcp, start, destroy); - - return port_num; + grpc_server_add_listener(server, tcp, start, destroy, &call_list); + goto done; /* Error path: cleanup and return */ error: @@ -136,5 +140,9 @@ error: if (tcp) { grpc_tcp_server_destroy(tcp, NULL, NULL); } + port_num = 0; + +done: + grpc_call_list_run(&call_list); return 0; } diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c index 403358016d..ef12c910cd 100644 --- a/src/core/transport/chttp2/frame_data.c +++ b/src/core/transport/chttp2/frame_data.c @@ -71,7 +71,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame( grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( void *parser, grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) { + grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last, + grpc_call_list *call_list) { gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice); gpr_uint8 *const end = GPR_SLICE_END_PTR(slice); gpr_uint8 *cur = beg; diff --git a/src/core/transport/chttp2/frame_data.h b/src/core/transport/chttp2/frame_data.h index 23957b05ad..7530f0f644 100644 --- a/src/core/transport/chttp2/frame_data.h +++ b/src/core/transport/chttp2/frame_data.h @@ -36,6 +36,7 @@ /* Parser for GRPC streams embedded in DATA frames */ +#include "src/core/iomgr/iomgr.h" #include <grpc/support/slice.h> #include <grpc/support/slice_buffer.h> #include "src/core/transport/stream_op.h" @@ -74,7 +75,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame( frame */ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( void *parser, grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); + grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last, + grpc_call_list *call_list); /* create a slice with an empty data frame and is_last set */ gpr_slice grpc_chttp2_data_frame_create_empty_close(gpr_uint32 id); diff --git a/src/core/transport/chttp2/frame_goaway.c b/src/core/transport/chttp2/frame_goaway.c index 09d4da234c..1c2bce6736 100644 --- a/src/core/transport/chttp2/frame_goaway.c +++ b/src/core/transport/chttp2/frame_goaway.c @@ -64,7 +64,8 @@ grpc_chttp2_parse_error grpc_chttp2_goaway_parser_begin_frame( grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse( void *parser, grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) { + grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last, + grpc_call_list *call_list) { gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice); gpr_uint8 *const end = GPR_SLICE_END_PTR(slice); gpr_uint8 *cur = beg; diff --git a/src/core/transport/chttp2/frame_goaway.h b/src/core/transport/chttp2/frame_goaway.h index 9c5edfc821..ec991f4350 100644 --- a/src/core/transport/chttp2/frame_goaway.h +++ b/src/core/transport/chttp2/frame_goaway.h @@ -34,6 +34,7 @@ #ifndef GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_GOAWAY_H #define GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_GOAWAY_H +#include "src/core/iomgr/iomgr.h" #include "src/core/transport/chttp2/frame.h" #include <grpc/support/port_platform.h> #include <grpc/support/slice.h> @@ -66,7 +67,8 @@ grpc_chttp2_parse_error grpc_chttp2_goaway_parser_begin_frame( grpc_chttp2_goaway_parser *parser, gpr_uint32 length, gpr_uint8 flags); grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse( void *parser, grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); + grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last, + grpc_call_list *call_list); void grpc_chttp2_goaway_append(gpr_uint32 last_stream_id, gpr_uint32 error_code, gpr_slice debug_data, diff --git a/src/core/transport/chttp2/frame_ping.c b/src/core/transport/chttp2/frame_ping.c index 10d1e0a523..2fb8850a45 100644 --- a/src/core/transport/chttp2/frame_ping.c +++ b/src/core/transport/chttp2/frame_ping.c @@ -70,7 +70,8 @@ grpc_chttp2_parse_error grpc_chttp2_ping_parser_begin_frame( grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse( void *parser, grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) { + grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last, + grpc_call_list *call_list) { gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice); gpr_uint8 *const end = GPR_SLICE_END_PTR(slice); gpr_uint8 *cur = beg; @@ -89,9 +90,7 @@ grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse( for (ping = transport_parsing->pings.next; ping != &transport_parsing->pings; ping = ping->next) { if (0 == memcmp(p->opaque_8bytes, ping->id, 8)) { - /* we know no locks are held here, we may as well just call up - * directly */ - ping->on_recv->cb(ping->on_recv->cb_arg, 1); + grpc_call_list_add(call_list, ping->on_recv, 1); } ping->next->prev = ping->prev; ping->prev->next = ping->next; diff --git a/src/core/transport/chttp2/frame_ping.h b/src/core/transport/chttp2/frame_ping.h index 99197e8352..70e19eb8ab 100644 --- a/src/core/transport/chttp2/frame_ping.h +++ b/src/core/transport/chttp2/frame_ping.h @@ -35,6 +35,7 @@ #define GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_PING_H #include <grpc/support/slice.h> +#include "src/core/iomgr/iomgr.h" #include "src/core/transport/chttp2/frame.h" typedef struct { @@ -49,6 +50,7 @@ grpc_chttp2_parse_error grpc_chttp2_ping_parser_begin_frame( grpc_chttp2_ping_parser *parser, gpr_uint32 length, gpr_uint8 flags); grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse( void *parser, grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); + grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last, + grpc_call_list *call_list); #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_PING_H */ diff --git a/src/core/transport/chttp2/frame_rst_stream.c b/src/core/transport/chttp2/frame_rst_stream.c index 67da245239..7cf8abe88f 100644 --- a/src/core/transport/chttp2/frame_rst_stream.c +++ b/src/core/transport/chttp2/frame_rst_stream.c @@ -72,7 +72,8 @@ grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame( grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse( void *parser, grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) { + grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last, + grpc_call_list *call_list) { gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice); gpr_uint8 *const end = GPR_SLICE_END_PTR(slice); gpr_uint8 *cur = beg; diff --git a/src/core/transport/chttp2/frame_rst_stream.h b/src/core/transport/chttp2/frame_rst_stream.h index ed69e588af..17d57fae5e 100644 --- a/src/core/transport/chttp2/frame_rst_stream.h +++ b/src/core/transport/chttp2/frame_rst_stream.h @@ -36,6 +36,7 @@ #include <grpc/support/slice.h> #include "src/core/transport/chttp2/frame.h" +#include "src/core/iomgr/iomgr.h" typedef struct { gpr_uint8 byte; @@ -48,6 +49,7 @@ grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame( grpc_chttp2_rst_stream_parser *parser, gpr_uint32 length, gpr_uint8 flags); grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse( void *parser, grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); + grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last, + grpc_call_list *call_list); #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_RST_STREAM_H */ diff --git a/src/core/transport/chttp2/frame_settings.c b/src/core/transport/chttp2/frame_settings.c index 54d3694a5c..78bd4bb09d 100644 --- a/src/core/transport/chttp2/frame_settings.c +++ b/src/core/transport/chttp2/frame_settings.c @@ -139,7 +139,8 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_begin_frame( grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse( void *p, grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) { + grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last, + grpc_call_list *call_list) { grpc_chttp2_settings_parser *parser = p; const gpr_uint8 *cur = GPR_SLICE_START_PTR(slice); const gpr_uint8 *end = GPR_SLICE_END_PTR(slice); diff --git a/src/core/transport/chttp2/frame_settings.h b/src/core/transport/chttp2/frame_settings.h index 0ac68a9fa8..a04a28b7da 100644 --- a/src/core/transport/chttp2/frame_settings.h +++ b/src/core/transport/chttp2/frame_settings.h @@ -37,6 +37,7 @@ #include <grpc/support/port_platform.h> #include <grpc/support/slice.h> #include "src/core/transport/chttp2/frame.h" +#include "src/core/iomgr/iomgr.h" typedef enum { GRPC_CHTTP2_SPS_ID0, @@ -95,6 +96,7 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_begin_frame( gpr_uint32 *settings); grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse( void *parser, grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); + grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last, + grpc_call_list *call_list); #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_SETTINGS_H */ diff --git a/src/core/transport/chttp2/frame_window_update.c b/src/core/transport/chttp2/frame_window_update.c index ea13969e8c..51eb261346 100644 --- a/src/core/transport/chttp2/frame_window_update.c +++ b/src/core/transport/chttp2/frame_window_update.c @@ -75,7 +75,8 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_begin_frame( grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse( void *parser, grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) { + grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last, + grpc_call_list *call_list) { gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice); gpr_uint8 *const end = GPR_SLICE_END_PTR(slice); gpr_uint8 *cur = beg; diff --git a/src/core/transport/chttp2/frame_window_update.h b/src/core/transport/chttp2/frame_window_update.h index deba801d00..7f1168e551 100644 --- a/src/core/transport/chttp2/frame_window_update.h +++ b/src/core/transport/chttp2/frame_window_update.h @@ -34,6 +34,7 @@ #ifndef GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_WINDOW_UPDATE_H #define GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_WINDOW_UPDATE_H +#include "src/core/iomgr/iomgr.h" #include <grpc/support/slice.h> #include "src/core/transport/chttp2/frame.h" @@ -51,6 +52,7 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_begin_frame( gpr_uint8 flags); grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse( void *parser, grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); + grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last, + grpc_call_list *call_list); #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_WINDOW_UPDATE_H */ diff --git a/src/core/transport/chttp2/hpack_parser.c b/src/core/transport/chttp2/hpack_parser.c index 9c40e8a4e6..e3b8e54e8d 100644 --- a/src/core/transport/chttp2/hpack_parser.c +++ b/src/core/transport/chttp2/hpack_parser.c @@ -1379,7 +1379,8 @@ int grpc_chttp2_hpack_parser_parse(grpc_chttp2_hpack_parser *p, grpc_chttp2_parse_error grpc_chttp2_header_parser_parse( void *hpack_parser, grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) { + grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last, + grpc_call_list *call_list) { grpc_chttp2_hpack_parser *parser = hpack_parser; if (!grpc_chttp2_hpack_parser_parse(parser, GPR_SLICE_START_PTR(slice), GPR_SLICE_END_PTR(slice))) { diff --git a/src/core/transport/chttp2/hpack_parser.h b/src/core/transport/chttp2/hpack_parser.h index 4f489d67fb..c9ae6a9767 100644 --- a/src/core/transport/chttp2/hpack_parser.h +++ b/src/core/transport/chttp2/hpack_parser.h @@ -37,6 +37,7 @@ #include <stddef.h> #include <grpc/support/port_platform.h> +#include "src/core/iomgr/iomgr.h" #include "src/core/transport/chttp2/frame.h" #include "src/core/transport/chttp2/hpack_table.h" #include "src/core/transport/metadata.h" @@ -108,6 +109,7 @@ int grpc_chttp2_hpack_parser_parse(grpc_chttp2_hpack_parser *p, the transport */ grpc_chttp2_parse_error grpc_chttp2_header_parser_parse( void *hpack_parser, grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); + grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last, + grpc_call_list *call_list); #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_HPACK_PARSER_H */ diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 386f2dd315..b9dbbc25ee 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -163,8 +163,6 @@ typedef struct grpc_chttp2_outstanding_ping { typedef struct { /** data to write next write */ gpr_slice_buffer qbuf; - /** queued callbacks */ - grpc_call_list run_at_unlock; /** window available for us to send to peer */ gpr_int64 outgoing_window; @@ -269,7 +267,8 @@ struct grpc_chttp2_transport_parsing { grpc_chttp2_stream_parsing *incoming_stream; grpc_chttp2_parse_error (*parser)( void *parser_user_data, grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); + grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last, + grpc_call_list *call_list); /* received settings */ gpr_uint32 settings[GRPC_CHTTP2_NUM_SETTINGS]; @@ -469,19 +468,23 @@ struct grpc_chttp2_stream { int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing); void grpc_chttp2_perform_writes( - grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint); -void grpc_chttp2_terminate_writing(void *transport_writing, int success); + grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint, + grpc_call_list *call_list); +void grpc_chttp2_terminate_writing(void *transport_writing, int success, + grpc_call_list *call_list); void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_global *global, - grpc_chttp2_transport_writing *writing); + grpc_chttp2_transport_writing *writing, + grpc_call_list *call_list); void grpc_chttp2_prepare_to_read(grpc_chttp2_transport_global *global, grpc_chttp2_transport_parsing *parsing); /** Process one slice of incoming data; return 1 if the connection is still viable after reading, or 0 if the connection should be torn down */ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, - gpr_slice slice); + gpr_slice slice, grpc_call_list *call_list); void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *global, - grpc_chttp2_transport_parsing *parsing); + grpc_chttp2_transport_parsing *parsing, + grpc_call_list *call_list); /** Get a writable stream returns non-zero if there was a stream available */ @@ -574,7 +577,7 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( void grpc_chttp2_add_incoming_goaway( grpc_chttp2_transport_global *transport_global, gpr_uint32 goaway_error, - gpr_slice goaway_text); + gpr_slice goaway_text, grpc_call_list *call_list); void grpc_chttp2_register_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s); diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index f26f446787..2d95963f1f 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -59,7 +59,8 @@ static int init_skip_frame_parser( grpc_chttp2_transport_parsing *transport_parsing, int is_header); static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing, - gpr_slice slice, int is_last); + gpr_slice slice, int is_last, + grpc_call_list *call_list); void grpc_chttp2_prepare_to_read( grpc_chttp2_transport_global *transport_global, @@ -90,9 +91,9 @@ void grpc_chttp2_prepare_to_read( } } -void grpc_chttp2_publish_reads( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_transport_parsing *transport_parsing) { +void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *transport_global, + grpc_chttp2_transport_parsing *transport_parsing, + grpc_call_list *call_list) { grpc_chttp2_stream_global *stream_global; grpc_chttp2_stream_parsing *stream_parsing; @@ -132,7 +133,7 @@ void grpc_chttp2_publish_reads( if (transport_parsing->goaway_received) { grpc_chttp2_add_incoming_goaway(transport_global, (gpr_uint32)transport_parsing->goaway_error, - transport_parsing->goaway_text); + transport_parsing->goaway_text, call_list); transport_parsing->goaway_text = gpr_empty_slice(); transport_parsing->goaway_received = 0; } @@ -235,7 +236,7 @@ void grpc_chttp2_publish_reads( } int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, - gpr_slice slice) { + gpr_slice slice, grpc_call_list *call_list) { gpr_uint8 *beg = GPR_SLICE_START_PTR(slice); gpr_uint8 *end = GPR_SLICE_END_PTR(slice); gpr_uint8 *cur = beg; @@ -364,7 +365,8 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, transport_parsing->incoming_stream_id; } if (transport_parsing->incoming_frame_size == 0) { - if (!parse_frame_slice(transport_parsing, gpr_empty_slice(), 1)) { + if (!parse_frame_slice(transport_parsing, gpr_empty_slice(), 1, + call_list)) { return 0; } transport_parsing->incoming_stream = NULL; @@ -384,7 +386,7 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, if (!parse_frame_slice(transport_parsing, gpr_slice_sub_no_ref(slice, (size_t)(cur - beg), (size_t)(end - beg)), - 1)) { + 1, call_list)) { return 0; } transport_parsing->deframe_state = GRPC_DTS_FH_0; @@ -398,7 +400,7 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice_sub_no_ref( slice, cur_offset, cur_offset + transport_parsing->incoming_frame_size), - 1)) { + 1, call_list)) { return 0; } cur += transport_parsing->incoming_frame_size; @@ -408,7 +410,7 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, if (!parse_frame_slice(transport_parsing, gpr_slice_sub_no_ref(slice, (size_t)(cur - beg), (size_t)(end - beg)), - 0)) { + 0, call_list)) { return 0; } transport_parsing->incoming_frame_size -= (gpr_uint32)(end - cur); @@ -470,7 +472,8 @@ static int init_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) { static grpc_chttp2_parse_error skip_parser( void *parser, grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) { + grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last, + grpc_call_list *call_list) { return GRPC_CHTTP2_PARSE_OK; } @@ -789,12 +792,13 @@ static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) { */ static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing, - gpr_slice slice, int is_last) { + gpr_slice slice, int is_last, + grpc_call_list *call_list) { grpc_chttp2_stream_parsing *stream_parsing = transport_parsing->incoming_stream; switch (transport_parsing->parser(transport_parsing->parser_data, transport_parsing, stream_parsing, slice, - is_last)) { + is_last, call_list)) { case GRPC_CHTTP2_PARSE_OK: if (stream_parsing) { grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index db6715b43a..18f4bfbc77 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -163,7 +163,8 @@ int grpc_chttp2_unlocking_check_writes( } void grpc_chttp2_perform_writes( - grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint) { + grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint, + grpc_call_list *call_list) { GPR_ASSERT(transport_writing->outbuf.count > 0 || grpc_chttp2_list_have_writing_streams(transport_writing)); @@ -172,17 +173,8 @@ void grpc_chttp2_perform_writes( GPR_ASSERT(transport_writing->outbuf.count > 0); GPR_ASSERT(endpoint); - switch (grpc_endpoint_write(endpoint, &transport_writing->outbuf, - &transport_writing->done_cb)) { - case GRPC_ENDPOINT_DONE: - grpc_chttp2_terminate_writing(transport_writing, 1); - break; - case GRPC_ENDPOINT_ERROR: - grpc_chttp2_terminate_writing(transport_writing, 0); - break; - case GRPC_ENDPOINT_PENDING: - break; - } + grpc_endpoint_write(endpoint, &transport_writing->outbuf, + &transport_writing->done_cb, call_list); } static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) { @@ -220,7 +212,8 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) { void grpc_chttp2_cleanup_writing( grpc_chttp2_transport_global *transport_global, - grpc_chttp2_transport_writing *transport_writing) { + grpc_chttp2_transport_writing *transport_writing, + grpc_call_list *call_list) { grpc_chttp2_stream_writing *stream_writing; grpc_chttp2_stream_global *stream_global; @@ -238,8 +231,7 @@ void grpc_chttp2_cleanup_writing( stream_global->outgoing_sopb->nops == 0) { GPR_ASSERT(stream_global->write_state != GRPC_WRITE_STATE_QUEUED_CLOSE); stream_global->outgoing_sopb = NULL; - grpc_call_list_add(&transport_global->run_at_unlock, - stream_global->send_done_closure, 1); + grpc_call_list_add(call_list, stream_global->send_done_closure, 1); } } stream_global->writing_now = 0; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index bc056ac0b8..acd7cbdc1d 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -78,27 +78,31 @@ int grpc_flowctl_trace = 0; static const grpc_transport_vtable vtable; static void lock(grpc_chttp2_transport *t); -static void unlock(grpc_chttp2_transport *t); +static void unlock(grpc_chttp2_transport *t, grpc_call_list *call_list); -static void unlock_check_read_write_state(grpc_chttp2_transport *t); +static void unlock_check_read_write_state(grpc_chttp2_transport *t, + grpc_call_list *call_list); /* forward declarations of various callbacks that we'll build closures around */ -static void writing_action(void *t, int iomgr_success_ignored); +static void writing_action(void *t, int iomgr_success_ignored, + grpc_call_list *call_list); /** Set a transport level setting, and push it to our peer */ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, gpr_uint32 value); /** Endpoint callback to process incoming data */ -static void recv_data(void *tp, int success); +static void recv_data(void *tp, int success, grpc_call_list *call_list); /** Start disconnection chain */ -static void drop_connection(grpc_chttp2_transport *t); +static void drop_connection(grpc_chttp2_transport *t, + grpc_call_list *call_list); /** Perform a transport_op */ static void perform_stream_op_locked( grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op); + grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op, + grpc_call_list *call_list); /** Cancel a stream: coming from the transport API */ static void cancel_from_api(grpc_chttp2_transport_global *transport_global, @@ -112,23 +116,27 @@ static void close_from_api(grpc_chttp2_transport_global *transport_global, /** Add endpoint from this transport to pollset */ static void add_to_pollset_locked(grpc_chttp2_transport *t, - grpc_pollset *pollset); + grpc_pollset *pollset, + grpc_call_list *call_list); static void add_to_pollset_set_locked(grpc_chttp2_transport *t, - grpc_pollset_set *pollset_set); + grpc_pollset_set *pollset_set, + grpc_call_list *call_list); /** Start new streams that have been created if we can */ static void maybe_start_some_streams( - grpc_chttp2_transport_global *transport_global); + grpc_chttp2_transport_global *transport_global, grpc_call_list *call_list); static void connectivity_state_set( grpc_chttp2_transport_global *transport_global, - grpc_connectivity_state state, const char *reason); + grpc_connectivity_state state, const char *reason, + grpc_call_list *call_list); /* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ -static void destruct_transport(grpc_chttp2_transport *t) { +static void destruct_transport(grpc_chttp2_transport *t, + grpc_call_list *call_list) { size_t i; gpr_mu_lock(&t->mu); @@ -157,7 +165,8 @@ static void destruct_transport(grpc_chttp2_transport *t) { grpc_chttp2_stream_map_destroy(&t->parsing_stream_map); grpc_chttp2_stream_map_destroy(&t->new_stream_map); - grpc_connectivity_state_destroy(&t->channel_callback.state_tracker); + grpc_connectivity_state_destroy(&t->channel_callback.state_tracker, + call_list); gpr_mu_unlock(&t->mu); gpr_mu_destroy(&t->mu); @@ -166,7 +175,7 @@ static void destruct_transport(grpc_chttp2_transport *t) { and maybe they hold resources that need to be freed */ while (t->global.pings.next != &t->global.pings) { grpc_chttp2_outstanding_ping *ping = t->global.pings.next; - ping->on_recv->cb(ping->on_recv->cb_arg, 0); + grpc_call_list_add(call_list, ping->on_recv, 0); ping->next->prev = ping->prev; ping->prev->next = ping->next; gpr_free(ping); @@ -180,13 +189,13 @@ static void destruct_transport(grpc_chttp2_transport *t) { #ifdef REFCOUNTING_DEBUG #define REF_TRANSPORT(t, r) ref_transport(t, r, __FILE__, __LINE__) -#define UNREF_TRANSPORT(t, r) unref_transport(t, r, __FILE__, __LINE__) -static void unref_transport(grpc_chttp2_transport *t, const char *reason, - const char *file, int line) { +#define UNREF_TRANSPORT(t, r, cl) unref_transport(t, cl, r, __FILE__, __LINE__) +static void unref_transport(grpc_chttp2_transport *t, grpc_call_list *call_list, + const char *reason, const char *file, int line) { gpr_log(GPR_DEBUG, "chttp2:unref:%p %d->%d %s [%s:%d]", t, t->refs.count, t->refs.count - 1, reason, file, line); if (!gpr_unref(&t->refs)) return; - destruct_transport(t); + destruct_transport(t, call_list); } static void ref_transport(grpc_chttp2_transport *t, const char *reason, @@ -197,10 +206,11 @@ static void ref_transport(grpc_chttp2_transport *t, const char *reason, } #else #define REF_TRANSPORT(t, r) ref_transport(t) -#define UNREF_TRANSPORT(t, r) unref_transport(t) -static void unref_transport(grpc_chttp2_transport *t) { +#define UNREF_TRANSPORT(t, r, cl) unref_transport(t, cl) +static void unref_transport(grpc_chttp2_transport *t, + grpc_call_list *call_list) { if (!gpr_unref(&t->refs)) return; - destruct_transport(t); + destruct_transport(t, call_list); } static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); } @@ -209,7 +219,7 @@ static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); } static void init_transport(grpc_chttp2_transport *t, const grpc_channel_args *channel_args, grpc_endpoint *ep, grpc_mdctx *mdctx, - grpc_workqueue *workqueue, gpr_uint8 is_client) { + gpr_uint8 is_client, grpc_call_list *call_list) { size_t i; int j; @@ -329,15 +339,15 @@ static void init_transport(grpc_chttp2_transport *t, } } -static void destroy_transport(grpc_transport *gt) { +static void destroy_transport(grpc_transport *gt, grpc_call_list *call_list) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; lock(t); t->destroying = 1; - drop_connection(t); - unlock(t); + drop_connection(t, call_list); + unlock(t, call_list); - UNREF_TRANSPORT(t, "destroy"); + UNREF_TRANSPORT(t, "destroy", call_list); } /** block grpc_endpoint_shutdown being called until a paired @@ -347,45 +357,50 @@ static void prevent_endpoint_shutdown(grpc_chttp2_transport *t) { gpr_ref(&t->shutdown_ep_refs); } -static void allow_endpoint_shutdown_locked(grpc_chttp2_transport *t) { +static void allow_endpoint_shutdown_locked(grpc_chttp2_transport *t, + grpc_call_list *call_list) { if (gpr_unref(&t->shutdown_ep_refs)) { if (t->ep) { - grpc_endpoint_shutdown(t->ep); + grpc_endpoint_shutdown(t->ep, call_list); } } } -static void allow_endpoint_shutdown_unlocked(grpc_chttp2_transport *t) { +static void allow_endpoint_shutdown_unlocked(grpc_chttp2_transport *t, + grpc_call_list *call_list) { if (gpr_unref(&t->shutdown_ep_refs)) { gpr_mu_lock(&t->mu); if (t->ep) { - grpc_endpoint_shutdown(t->ep); + grpc_endpoint_shutdown(t->ep, call_list); } gpr_mu_unlock(&t->mu); } } -static void destroy_endpoint(grpc_chttp2_transport *t) { - grpc_endpoint_destroy(t->ep); +static void destroy_endpoint(grpc_chttp2_transport *t, + grpc_call_list *call_list) { + grpc_endpoint_destroy(t->ep, call_list); t->ep = NULL; - UNREF_TRANSPORT( - t, "disconnect"); /* safe because we'll still have the ref for write */ + /* safe because we'll still have the ref for write */ + UNREF_TRANSPORT(t, "disconnect", call_list); } -static void close_transport_locked(grpc_chttp2_transport *t) { +static void close_transport_locked(grpc_chttp2_transport *t, + grpc_call_list *call_list) { if (!t->closed) { t->closed = 1; connectivity_state_set(&t->global, GRPC_CHANNEL_FATAL_FAILURE, - "close_transport"); + "close_transport", call_list); if (t->ep) { - allow_endpoint_shutdown_locked(t); + allow_endpoint_shutdown_locked(t, call_list); } } } static int init_stream(grpc_transport *gt, grpc_stream *gs, const void *server_data, - grpc_transport_stream_op *initial_op) { + grpc_transport_stream_op *initial_op, + grpc_call_list *call_list) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; @@ -416,13 +431,15 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, s->global.in_stream_map = 1; } - if (initial_op) perform_stream_op_locked(&t->global, &s->global, initial_op); - unlock(t); + if (initial_op) + perform_stream_op_locked(&t->global, &s->global, initial_op, call_list); + unlock(t, call_list); return 0; } -static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { +static void destroy_stream(grpc_transport *gt, grpc_stream *gs, + grpc_call_list *call_list) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; int i; @@ -433,7 +450,7 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { s->global.id == 0); GPR_ASSERT(!s->global.in_stream_map); if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) { - close_transport_locked(t); + close_transport_locked(t, call_list); } if (!t->parsing_active && s->global.id) { GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map, @@ -463,7 +480,7 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { grpc_chttp2_incoming_metadata_live_op_buffer_end( &s->global.outstanding_metadata); - UNREF_TRANSPORT(t, "stream"); + UNREF_TRANSPORT(t, "stream", call_list); } grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream( @@ -498,21 +515,17 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); } -static void unlock(grpc_chttp2_transport *t) { - grpc_call_list run = GRPC_CALL_LIST_INIT; - - unlock_check_read_write_state(t); +static void unlock(grpc_chttp2_transport *t, grpc_call_list *call_list) { + unlock_check_read_write_state(t, call_list); if (!t->writing_active && !t->closed && grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) { t->writing_active = 1; REF_TRANSPORT(t, "writing"); - grpc_call_list_add(&t->global.run_at_unlock, &t->writing_action, 1); + grpc_call_list_add(call_list, &t->writing_action, 1); prevent_endpoint_shutdown(t); } - GPR_SWAP(grpc_call_list, run, t->global.run_at_unlock); gpr_mu_unlock(&t->mu); - grpc_call_list_run(run); } /* @@ -534,52 +547,54 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, } } -void grpc_chttp2_terminate_writing(void *transport_writing_ptr, int success) { +void grpc_chttp2_terminate_writing(void *transport_writing_ptr, int success, + grpc_call_list *call_list) { grpc_chttp2_transport_writing *transport_writing = transport_writing_ptr; grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing); lock(t); - allow_endpoint_shutdown_locked(t); + allow_endpoint_shutdown_locked(t, call_list); if (!success) { - drop_connection(t); + drop_connection(t, call_list); } /* cleanup writing related jazz */ - grpc_chttp2_cleanup_writing(&t->global, &t->writing); + grpc_chttp2_cleanup_writing(&t->global, &t->writing, call_list); /* leave the writing flag up on shutdown to prevent further writes in unlock() from starting */ t->writing_active = 0; if (t->ep && !t->endpoint_reading) { - destroy_endpoint(t); + destroy_endpoint(t, call_list); } - unlock(t); + unlock(t, call_list); - UNREF_TRANSPORT(t, "writing"); + UNREF_TRANSPORT(t, "writing", call_list); } -static void writing_action(void *gt, int iomgr_success_ignored) { +static void writing_action(void *gt, int iomgr_success_ignored, + grpc_call_list *call_list) { grpc_chttp2_transport *t = gt; - grpc_chttp2_perform_writes(&t->writing, t->ep); + grpc_chttp2_perform_writes(&t->writing, t->ep, call_list); } void grpc_chttp2_add_incoming_goaway( grpc_chttp2_transport_global *transport_global, gpr_uint32 goaway_error, - gpr_slice goaway_text) { + gpr_slice goaway_text, grpc_call_list *call_list) { char *msg = gpr_dump_slice(goaway_text, GPR_DUMP_HEX | GPR_DUMP_ASCII); gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg); gpr_free(msg); gpr_slice_unref(goaway_text); transport_global->seen_goaway = 1; connectivity_state_set(transport_global, GRPC_CHANNEL_FATAL_FAILURE, - "got_goaway"); + "got_goaway", call_list); } static void maybe_start_some_streams( - grpc_chttp2_transport_global *transport_global) { + grpc_chttp2_transport_global *transport_global, grpc_call_list *call_list) { grpc_chttp2_stream_global *stream_global; /* start streams where we have free grpc_chttp2_stream ids and free * concurrency */ @@ -601,7 +616,7 @@ static void maybe_start_some_streams( if (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID) { connectivity_state_set(transport_global, GRPC_CHANNEL_TRANSIENT_FAILURE, - "no_more_stream_ids"); + "no_more_stream_ids", call_list); } stream_global->outgoing_window = @@ -631,7 +646,8 @@ static void maybe_start_some_streams( static void perform_stream_op_locked( grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op) { + grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op, + grpc_call_list *call_list) { if (op->cancel_with_status != GRPC_STATUS_OK) { cancel_from_api(transport_global, stream_global, op->cancel_with_status); } @@ -658,14 +674,13 @@ static void perform_stream_op_locked( transport_global->is_client ? "CLI" : "SVR", stream_global)); grpc_chttp2_list_add_waiting_for_concurrency(transport_global, stream_global); - maybe_start_some_streams(transport_global); + maybe_start_some_streams(transport_global, call_list); } else if (stream_global->outgoing_window > 0) { grpc_chttp2_list_add_writable_stream(transport_global, stream_global); } } else { grpc_sopb_reset(op->send_ops); - grpc_call_list_add(&transport_global->run_at_unlock, - stream_global->send_done_closure, 0); + grpc_call_list_add(call_list, stream_global->send_done_closure, 0); } } @@ -700,20 +715,21 @@ static void perform_stream_op_locked( if (op->bind_pollset) { add_to_pollset_locked(TRANSPORT_FROM_GLOBAL(transport_global), - op->bind_pollset); + op->bind_pollset, call_list); } - grpc_call_list_add(&transport_global->run_at_unlock, op->on_consumed, 1); + grpc_call_list_add(call_list, op->on_consumed, 1); } static void perform_stream_op(grpc_transport *gt, grpc_stream *gs, - grpc_transport_stream_op *op) { + grpc_transport_stream_op *op, + grpc_call_list *call_list) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; lock(t); - perform_stream_op_locked(&t->global, &s->global, op); - unlock(t); + perform_stream_op_locked(&t->global, &s->global, op, call_list); + unlock(t, call_list); } static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) { @@ -733,18 +749,19 @@ static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) { gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id)); } -static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) { +static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op, + grpc_call_list *call_list) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; int close_transport = 0; lock(t); - grpc_call_list_add(&t->global.run_at_unlock, op->on_consumed, 1); + grpc_call_list_add(call_list, op->on_consumed, 1); if (op->on_connectivity_state_change) { grpc_connectivity_state_notify_on_state_change( &t->channel_callback.state_tracker, op->connectivity_state, - op->on_connectivity_state_change, &t->global.run_at_unlock); + op->on_connectivity_state_change, call_list); } if (op->send_goaway) { @@ -763,11 +780,11 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) { } if (op->bind_pollset) { - add_to_pollset_locked(t, op->bind_pollset); + add_to_pollset_locked(t, op->bind_pollset, call_list); } if (op->bind_pollset_set) { - add_to_pollset_set_locked(t, op->bind_pollset_set); + add_to_pollset_set_locked(t, op->bind_pollset_set, call_list); } if (op->send_ping) { @@ -775,15 +792,15 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) { } if (op->disconnect) { - close_transport_locked(t); + close_transport_locked(t, call_list); } - unlock(t); + unlock(t, call_list); if (close_transport) { lock(t); - close_transport_locked(t); - unlock(t); + close_transport_locked(t, call_list); + unlock(t, call_list); } } @@ -799,7 +816,8 @@ static grpc_stream_state compute_state(gpr_uint8 write_closed, return GRPC_STREAM_OPEN; } -static void remove_stream(grpc_chttp2_transport *t, gpr_uint32 id) { +static void remove_stream(grpc_chttp2_transport *t, gpr_uint32 id, + grpc_call_list *call_list) { size_t new_stream_count; grpc_chttp2_stream *s = grpc_chttp2_stream_map_delete(&t->parsing_stream_map, id); @@ -814,7 +832,7 @@ static void remove_stream(grpc_chttp2_transport *t, gpr_uint32 id) { grpc_chttp2_parsing_become_skip_parser(&t->parsing); } if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) { - close_transport_locked(t); + close_transport_locked(t, call_list); } new_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map) + @@ -822,11 +840,12 @@ static void remove_stream(grpc_chttp2_transport *t, gpr_uint32 id) { GPR_ASSERT(new_stream_count <= GPR_UINT32_MAX); if (new_stream_count != t->global.concurrent_stream_count) { t->global.concurrent_stream_count = (gpr_uint32)new_stream_count; - maybe_start_some_streams(&t->global); + maybe_start_some_streams(&t->global, call_list); } } -static void unlock_check_read_write_state(grpc_chttp2_transport *t) { +static void unlock_check_read_write_state(grpc_chttp2_transport *t, + grpc_call_list *call_list) { grpc_chttp2_transport_global *transport_global = &t->global; grpc_chttp2_stream_global *stream_global; grpc_stream_state state; @@ -840,7 +859,7 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) { GPR_ASSERT(stream_global->in_stream_map); GPR_ASSERT(stream_global->write_state != GRPC_WRITE_STATE_OPEN); GPR_ASSERT(stream_global->read_closed); - remove_stream(t, stream_global->id); + remove_stream(t, stream_global->id, call_list); grpc_chttp2_list_add_read_write_state_changed(transport_global, stream_global); } @@ -866,8 +885,7 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) { if (stream_global->outgoing_sopb != NULL) { grpc_sopb_reset(stream_global->outgoing_sopb); stream_global->outgoing_sopb = NULL; - grpc_call_list_add(&transport_global->run_at_unlock, - stream_global->send_done_closure, 1); + grpc_call_list_add(call_list, stream_global->send_done_closure, 1); } stream_global->read_closed = 1; if (!stream_global->published_cancelled) { @@ -889,7 +907,7 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) { grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global, stream_global); } else { - remove_stream(t, stream_global->id); + remove_stream(t, stream_global->id, call_list); } } if (!stream_global->publish_sopb) { @@ -917,8 +935,7 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) { &stream_global->outstanding_metadata); grpc_sopb_swap(stream_global->publish_sopb, &stream_global->incoming_sopb); stream_global->published_state = *stream_global->publish_state = state; - grpc_call_list_add(&transport_global->run_at_unlock, - stream_global->recv_done_closure, 1); + grpc_call_list_add(call_list, stream_global->recv_done_closure, 1); stream_global->recv_done_closure = NULL; stream_global->publish_sopb = NULL; stream_global->publish_state = NULL; @@ -1053,8 +1070,9 @@ static void end_all_the_calls(grpc_chttp2_transport *t) { grpc_chttp2_for_all_streams(&t->global, NULL, cancel_stream_cb); } -static void drop_connection(grpc_chttp2_transport *t) { - close_transport_locked(t); +static void drop_connection(grpc_chttp2_transport *t, + grpc_call_list *call_list) { + close_transport_locked(t, call_list); end_all_the_calls(t); } @@ -1079,17 +1097,19 @@ static void update_global_window(void *args, gpr_uint32 id, void *stream) { } } -static void read_error_locked(grpc_chttp2_transport *t) { +static void read_error_locked(grpc_chttp2_transport *t, + grpc_call_list *call_list) { t->endpoint_reading = 0; if (!t->writing_active && t->ep) { - destroy_endpoint(t); + destroy_endpoint(t, call_list); } } /* tcp read callback */ -static int recv_data_loop(grpc_chttp2_transport *t, int *success) { +static void recv_data(void *tp, int success, grpc_call_list *call_list) { size_t i; int keep_reading = 0; + grpc_chttp2_transport *t = tp; lock(t); i = 0; @@ -1102,12 +1122,13 @@ static int recv_data_loop(grpc_chttp2_transport *t, int *success) { grpc_chttp2_prepare_to_read(&t->global, &t->parsing); gpr_mu_unlock(&t->mu); for (; i < t->read_buffer.count && - grpc_chttp2_perform_read(&t->parsing, t->read_buffer.slices[i]); + grpc_chttp2_perform_read(&t->parsing, t->read_buffer.slices[i], + call_list); i++) ; gpr_mu_lock(&t->mu); if (i != t->read_buffer.count) { - drop_connection(t); + drop_connection(t, call_list); } /* merge stream lists */ grpc_chttp2_stream_map_move_into(&t->new_stream_map, @@ -1120,52 +1141,27 @@ static int recv_data_loop(grpc_chttp2_transport *t, int *success) { t->parsing.initial_window_update = 0; } /* handle higher level things */ - grpc_chttp2_publish_reads(&t->global, &t->parsing); + grpc_chttp2_publish_reads(&t->global, &t->parsing, call_list); t->parsing_active = 0; } - if (!*success || i != t->read_buffer.count) { - drop_connection(t); - read_error_locked(t); + if (!success || i != t->read_buffer.count) { + drop_connection(t, call_list); + read_error_locked(t, call_list); } else if (!t->closed) { keep_reading = 1; REF_TRANSPORT(t, "keep_reading"); prevent_endpoint_shutdown(t); } gpr_slice_buffer_reset_and_unref(&t->read_buffer); - unlock(t); + unlock(t, call_list); if (keep_reading) { - int ret = -1; - switch (grpc_endpoint_read(t->ep, &t->read_buffer, &t->recv_data)) { - case GRPC_ENDPOINT_DONE: - *success = 1; - ret = 1; - break; - case GRPC_ENDPOINT_ERROR: - *success = 0; - ret = 1; - break; - case GRPC_ENDPOINT_PENDING: - ret = 0; - break; - } - allow_endpoint_shutdown_unlocked(t); - UNREF_TRANSPORT(t, "keep_reading"); - return ret; + grpc_endpoint_read(t->ep, &t->read_buffer, &t->recv_data, call_list); + allow_endpoint_shutdown_unlocked(t, call_list); + UNREF_TRANSPORT(t, "keep_reading", call_list); } else { - UNREF_TRANSPORT(t, "recv_data"); - return 0; + UNREF_TRANSPORT(t, "recv_data", call_list); } - - gpr_log(GPR_ERROR, "should never reach here"); - abort(); -} - -static void recv_data(void *tp, int success) { - grpc_chttp2_transport *t = tp; - - while (recv_data_loop(t, &success)) - ; } /* @@ -1174,12 +1170,13 @@ static void recv_data(void *tp, int success) { static void connectivity_state_set( grpc_chttp2_transport_global *transport_global, - grpc_connectivity_state state, const char *reason) { + grpc_connectivity_state state, const char *reason, + grpc_call_list *call_list) { GRPC_CHTTP2_IF_TRACING( gpr_log(GPR_DEBUG, "set connectivity_state=%d", state)); grpc_connectivity_state_set( &TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker, - state, reason, &transport_global->run_at_unlock); + state, reason, call_list); } /* @@ -1187,16 +1184,18 @@ static void connectivity_state_set( */ static void add_to_pollset_locked(grpc_chttp2_transport *t, - grpc_pollset *pollset) { + grpc_pollset *pollset, + grpc_call_list *call_list) { if (t->ep) { - grpc_endpoint_add_to_pollset(t->ep, pollset); + grpc_endpoint_add_to_pollset(t->ep, pollset, call_list); } } static void add_to_pollset_set_locked(grpc_chttp2_transport *t, - grpc_pollset_set *pollset_set) { + grpc_pollset_set *pollset_set, + grpc_call_list *call_list) { if (t->ep) { - grpc_endpoint_add_to_pollset_set(t->ep, pollset_set); + grpc_endpoint_add_to_pollset_set(t->ep, pollset_set, call_list); } } @@ -1235,7 +1234,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason, * INTEGRATION GLUE */ -static char *chttp2_get_peer(grpc_transport *t) { +static char *chttp2_get_peer(grpc_transport *t, grpc_call_list *call_list) { return gpr_strdup(((grpc_chttp2_transport *)t)->peer_string); } @@ -1249,16 +1248,17 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), grpc_transport *grpc_create_chttp2_transport( const grpc_channel_args *channel_args, grpc_endpoint *ep, grpc_mdctx *mdctx, - grpc_workqueue *workqueue, int is_client) { + int is_client, grpc_call_list *call_list) { grpc_chttp2_transport *t = gpr_malloc(sizeof(grpc_chttp2_transport)); - init_transport(t, channel_args, ep, mdctx, workqueue, is_client != 0); + init_transport(t, channel_args, ep, mdctx, is_client != 0, call_list); return &t->base; } void grpc_chttp2_transport_start_reading(grpc_transport *transport, - gpr_slice *slices, size_t nslices) { + gpr_slice *slices, size_t nslices, + grpc_call_list *call_list) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport; REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */ gpr_slice_buffer_addn(&t->read_buffer, slices, nslices); - recv_data(t, 1); + recv_data(t, 1, call_list); } diff --git a/src/core/transport/chttp2_transport.h b/src/core/transport/chttp2_transport.h index fa0d6e4151..e963e16707 100644 --- a/src/core/transport/chttp2_transport.h +++ b/src/core/transport/chttp2_transport.h @@ -42,9 +42,10 @@ extern int grpc_flowctl_trace; grpc_transport *grpc_create_chttp2_transport( const grpc_channel_args *channel_args, grpc_endpoint *ep, - grpc_mdctx *metadata_context, int is_client); + grpc_mdctx *metadata_context, int is_client, grpc_call_list *call_list); void grpc_chttp2_transport_start_reading(grpc_transport *transport, - gpr_slice *slices, size_t nslices); + gpr_slice *slices, size_t nslices, + grpc_call_list *call_list); #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_TRANSPORT_H */ diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c index 8b2e1b9b02..dc8392159b 100644 --- a/src/core/transport/connectivity_state.c +++ b/src/core/transport/connectivity_state.c @@ -66,7 +66,8 @@ void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, tracker->name = gpr_strdup(name); } -void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) { +void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker, + grpc_call_list *call_list) { int success; grpc_connectivity_state_watcher *w; while ((w = tracker->watchers)) { @@ -78,7 +79,7 @@ void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) { } else { success = 0; } - w->notify->cb(w->notify->cb_arg, success); + grpc_call_list_add(call_list, w->notify, success); gpr_free(w); } gpr_free(tracker->name); diff --git a/src/core/transport/connectivity_state.h b/src/core/transport/connectivity_state.h index a29c655b4e..3f92b22d5d 100644 --- a/src/core/transport/connectivity_state.h +++ b/src/core/transport/connectivity_state.h @@ -61,7 +61,8 @@ extern int grpc_connectivity_state_trace; void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state init_state, const char *name); -void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker); +void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker, + grpc_call_list *call_list); /** Set connectivity state; not thread safe; access must be serialized with an * external lock */ diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index 3f6b93c3e8..f7c87c1e90 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -40,48 +40,47 @@ size_t grpc_transport_stream_size(grpc_transport *transport) { return transport->vtable->sizeof_stream; } -void grpc_transport_destroy(grpc_transport *transport) { - transport->vtable->destroy(transport); +void grpc_transport_destroy(grpc_transport *transport, + grpc_call_list *call_list) { + transport->vtable->destroy(transport, call_list); } int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, const void *server_data, - grpc_transport_stream_op *initial_op) { + grpc_transport_stream_op *initial_op, + grpc_call_list *call_list) { return transport->vtable->init_stream(transport, stream, server_data, - initial_op); + initial_op, call_list); } void grpc_transport_perform_stream_op(grpc_transport *transport, grpc_stream *stream, - grpc_transport_stream_op *op) { - transport->vtable->perform_stream_op(transport, stream, op); + grpc_transport_stream_op *op, + grpc_call_list *call_list) { + transport->vtable->perform_stream_op(transport, stream, op, call_list); } -void grpc_transport_perform_op(grpc_transport *transport, - grpc_transport_op *op) { - transport->vtable->perform_op(transport, op); +void grpc_transport_perform_op(grpc_transport *transport, grpc_transport_op *op, + grpc_call_list *call_list) { + transport->vtable->perform_op(transport, op, call_list); } void grpc_transport_destroy_stream(grpc_transport *transport, - grpc_stream *stream) { - transport->vtable->destroy_stream(transport, stream); + grpc_stream *stream, + grpc_call_list *call_list) { + transport->vtable->destroy_stream(transport, stream, call_list); } -char *grpc_transport_get_peer(grpc_transport *transport) { - return transport->vtable->get_peer(transport); +char *grpc_transport_get_peer(grpc_transport *transport, + grpc_call_list *call_list) { + return transport->vtable->get_peer(transport, call_list); } -void grpc_transport_stream_op_finish_with_failure( - grpc_transport_stream_op *op) { - if (op->send_ops) { - op->on_done_send->cb(op->on_done_send->cb_arg, 0); - } - if (op->recv_ops) { - op->on_done_recv->cb(op->on_done_recv->cb_arg, 0); - } - if (op->on_consumed) { - op->on_consumed->cb(op->on_consumed->cb_arg, 0); - } +void grpc_transport_stream_op_finish_with_failure(grpc_transport_stream_op *op, + grpc_call_list *call_list) { + grpc_call_list_add(call_list, op->on_done_recv, 0); + grpc_call_list_add(call_list, op->on_done_send, 0); + grpc_call_list_add(call_list, op->on_consumed, 0); } void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op, @@ -105,11 +104,12 @@ typedef struct { grpc_closure closure; } close_message_data; -static void free_message(void *p, int iomgr_success) { +static void free_message(void *p, int iomgr_success, + grpc_call_list *call_list) { close_message_data *cmd = p; gpr_slice_unref(cmd->message); if (cmd->then_call != NULL) { - cmd->then_call->cb(cmd->then_call->cb_arg, iomgr_success); + cmd->then_call->cb(cmd->then_call->cb_arg, iomgr_success, call_list); } gpr_free(cmd); } diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 271891d430..d5383ad11a 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -136,7 +136,8 @@ size_t grpc_transport_stream_size(grpc_transport *transport); supplied from the accept_stream callback function */ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, const void *server_data, - grpc_transport_stream_op *initial_op); + grpc_transport_stream_op *initial_op, + grpc_call_list *call_list); /* Destroy transport data for a stream. @@ -149,9 +150,11 @@ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, stream - the grpc_stream to destroy (memory is still owned by the caller, but any child memory must be cleaned up) */ void grpc_transport_destroy_stream(grpc_transport *transport, - grpc_stream *stream); + grpc_stream *stream, + grpc_call_list *call_list); -void grpc_transport_stream_op_finish_with_failure(grpc_transport_stream_op *op); +void grpc_transport_stream_op_finish_with_failure(grpc_transport_stream_op *op, + grpc_call_list *call_list); void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op, grpc_status_code status); @@ -173,10 +176,11 @@ char *grpc_transport_stream_op_string(grpc_transport_stream_op *op); op - a grpc_transport_stream_op specifying the op to perform */ void grpc_transport_perform_stream_op(grpc_transport *transport, grpc_stream *stream, - grpc_transport_stream_op *op); + grpc_transport_stream_op *op, + grpc_call_list *call_list); -void grpc_transport_perform_op(grpc_transport *transport, - grpc_transport_op *op); +void grpc_transport_perform_op(grpc_transport *transport, grpc_transport_op *op, + grpc_call_list *call_list); /* Send a ping on a transport @@ -191,9 +195,11 @@ void grpc_transport_goaway(grpc_transport *transport, grpc_status_code status, void grpc_transport_close(grpc_transport *transport); /* Destroy the transport */ -void grpc_transport_destroy(grpc_transport *transport); +void grpc_transport_destroy(grpc_transport *transport, + grpc_call_list *call_list); /* Get the transports peer */ -char *grpc_transport_get_peer(grpc_transport *transport); +char *grpc_transport_get_peer(grpc_transport *transport, + grpc_call_list *call_list); #endif /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_H */ diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h index d3bbdf6c27..9adb16b941 100644 --- a/src/core/transport/transport_impl.h +++ b/src/core/transport/transport_impl.h @@ -44,23 +44,27 @@ typedef struct grpc_transport_vtable { /* implementation of grpc_transport_init_stream */ int (*init_stream)(grpc_transport *self, grpc_stream *stream, const void *server_data, - grpc_transport_stream_op *initial_op); + grpc_transport_stream_op *initial_op, + grpc_call_list *call_list); /* implementation of grpc_transport_perform_stream_op */ void (*perform_stream_op)(grpc_transport *self, grpc_stream *stream, - grpc_transport_stream_op *op); + grpc_transport_stream_op *op, + grpc_call_list *call_list); /* implementation of grpc_transport_perform_op */ - void (*perform_op)(grpc_transport *self, grpc_transport_op *op); + void (*perform_op)(grpc_transport *self, grpc_transport_op *op, + grpc_call_list *call_list); /* implementation of grpc_transport_destroy_stream */ - void (*destroy_stream)(grpc_transport *self, grpc_stream *stream); + void (*destroy_stream)(grpc_transport *self, grpc_stream *stream, + grpc_call_list *call_list); /* implementation of grpc_transport_destroy */ - void (*destroy)(grpc_transport *self); + void (*destroy)(grpc_transport *self, grpc_call_list *call_list); /* implementation of grpc_transport_get_peer */ - char *(*get_peer)(grpc_transport *self); + char *(*get_peer)(grpc_transport *self, grpc_call_list *call_list); } grpc_transport_vtable; /* an instance of a grpc transport */ |