diff options
author | Craig Tiller <ctiller@google.com> | 2015-09-21 14:39:57 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-09-21 14:39:57 -0700 |
commit | dfff1b8126a1f83833fd99626517f28d1e68453a (patch) | |
tree | 40a9aa8126c08a11fb1a5cdd4058f504e05dca43 /src/core/surface | |
parent | 3ffd8220a17fd2fdf64adc66b03e4e254880471b (diff) |
Call list progress
Diffstat (limited to 'src/core/surface')
-rw-r--r-- | src/core/surface/call.c | 22 | ||||
-rw-r--r-- | src/core/surface/call.h | 3 | ||||
-rw-r--r-- | src/core/surface/channel.c | 19 | ||||
-rw-r--r-- | src/core/surface/channel.h | 14 | ||||
-rw-r--r-- | src/core/surface/channel_connectivity.c | 40 | ||||
-rw-r--r-- | src/core/surface/channel_create.c | 60 | ||||
-rw-r--r-- | src/core/surface/completion_queue.c | 32 | ||||
-rw-r--r-- | src/core/surface/completion_queue.h | 6 | ||||
-rw-r--r-- | src/core/surface/lame_client.c | 40 | ||||
-rw-r--r-- | src/core/surface/secure_channel_create.c | 14 | ||||
-rw-r--r-- | src/core/surface/server.c | 288 | ||||
-rw-r--r-- | src/core/surface/server.h | 9 | ||||
-rw-r--r-- | src/core/surface/server_chttp2.c | 36 |
13 files changed, 347 insertions, 236 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index f2f8f0a6ed..bbaf65759d 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -356,7 +356,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, initial_op_ptr = &initial_op; } grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr, - CALL_STACK_FROM_CALL(call)); + CALL_STACK_FROM_CALL(call), &call_list); if (parent_call != NULL) { GRPC_CALL_INTERNAL_REF(parent_call, "child"); GPR_ASSERT(call->is_client); @@ -459,7 +459,7 @@ static void destroy_call(grpc_call *call, grpc_call_list *call_list) { size_t i; grpc_call *c = call; grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), call_list); - GRPC_CHANNEL_INTERNAL_UNREF(c->channel, "call"); + GRPC_CHANNEL_INTERNAL_UNREF(c->channel, "call", call_list); gpr_mu_destroy(&c->mu); gpr_mu_destroy(&c->completion_mu); for (i = 0; i < STATUS_SOURCE_COUNT; i++) { @@ -673,7 +673,8 @@ static void unlock(grpc_call *call, grpc_call_list *call_list) { if (completing_requests > 0) { for (i = 0; i < completing_requests; i++) { completed_requests[i].on_complete(call, completed_requests[i].success, - completed_requests[i].user_data); + completed_requests[i].user_data, + call_list); } lock(call); call->completing = 0; @@ -1556,14 +1557,16 @@ static void set_cancelled_value(grpc_status_code status, void *dest) { *(grpc_status_code *)dest = (status != GRPC_STATUS_OK); } -static void finish_batch(grpc_call *call, int success, void *tag) { +static void finish_batch(grpc_call *call, int success, void *tag, + grpc_call_list *call_list) { grpc_cq_end_op(call->cq, tag, success, done_completion, call, - allocate_completion(call)); + allocate_completion(call), call_list); } -static void finish_batch_with_close(grpc_call *call, int success, void *tag) { +static void finish_batch_with_close(grpc_call *call, int success, void *tag, + grpc_call_list *call_list) { grpc_cq_end_op(call->cq, tag, 1, done_completion, call, - allocate_completion(call)); + allocate_completion(call), call_list); } static int are_write_flags_valid(gpr_uint32 flags) { @@ -1581,7 +1584,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, size_t out; const grpc_op *op; grpc_ioreq *req; - void (*finish_func)(grpc_call *, int, void *) = finish_batch; + void (*finish_func)(grpc_call *, int, void *, grpc_call_list *) = + finish_batch; grpc_call_error error; grpc_call_list call_list = GRPC_CALL_LIST_INIT; @@ -1596,7 +1600,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, grpc_cq_begin_op(call->cq); GRPC_CALL_INTERNAL_REF(call, "completion"); grpc_cq_end_op(call->cq, tag, 1, done_completion, call, - allocate_completion(call)); + allocate_completion(call), &call_list); error = GRPC_CALL_OK; goto done; } diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 7a7178bc7b..144aa7cef2 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -87,7 +87,8 @@ typedef struct { } grpc_ioreq; typedef void (*grpc_ioreq_completion_func)(grpc_call *call, int success, - void *user_data); + void *user_data, + grpc_call_list *call_list); grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, gpr_uint32 propagation_mask, diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index fdba09fcce..46bea13936 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -272,9 +272,9 @@ void grpc_channel_internal_ref(grpc_channel *c) { gpr_ref(&c->refs); } -static void destroy_channel(grpc_channel *channel) { +static void destroy_channel(grpc_channel *channel, grpc_call_list *call_list) { size_t i; - grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel)); + grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel), call_list); for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) { GRPC_MDELEM_UNREF(channel->grpc_status_elem[i]); } @@ -303,26 +303,31 @@ static void destroy_channel(grpc_channel *channel) { } #ifdef GRPC_CHANNEL_REF_COUNT_DEBUG -void grpc_channel_internal_unref(grpc_channel *channel, const char *reason) { +void grpc_channel_internal_unref(grpc_channel *channel, const char *reason, + grpc_call_list *call_list) { gpr_log(GPR_DEBUG, "CHANNEL: unref %p %d -> %d [%s]", channel, channel->refs.count, channel->refs.count - 1, reason); #else -void grpc_channel_internal_unref(grpc_channel *channel) { +void grpc_channel_internal_unref(grpc_channel *channel, + grpc_call_list *call_list) { #endif if (gpr_unref(&channel->refs)) { - destroy_channel(channel); + destroy_channel(channel, call_list); } } void grpc_channel_destroy(grpc_channel *channel) { grpc_transport_op op; grpc_channel_element *elem; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; memset(&op, 0, sizeof(op)); op.disconnect = 1; elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0); - elem->filter->start_transport_op(elem, &op); + elem->filter->start_transport_op(elem, &op, &call_list); - GRPC_CHANNEL_INTERNAL_UNREF(channel, "channel"); + GRPC_CHANNEL_INTERNAL_UNREF(channel, "channel", &call_list); + + grpc_call_list_run(&call_list); } grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel) { diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index 664ecc1c5a..3f51164fcc 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -67,18 +67,20 @@ grpc_workqueue *grpc_channel_get_workqueue(grpc_channel *channel); #ifdef GRPC_CHANNEL_REF_COUNT_DEBUG void grpc_channel_internal_ref(grpc_channel *channel, const char *reason); -void grpc_channel_internal_unref(grpc_channel *channel, const char *reason); +void grpc_channel_internal_unref(grpc_channel *channel, const char *reason, + grpc_call_list *call_list); #define GRPC_CHANNEL_INTERNAL_REF(channel, reason) \ grpc_channel_internal_ref(channel, reason) -#define GRPC_CHANNEL_INTERNAL_UNREF(channel, reason) \ - grpc_channel_internal_unref(channel, reason) +#define GRPC_CHANNEL_INTERNAL_UNREF(channel, reason, call_list) \ + grpc_channel_internal_unref(channel, reason, call_list) #else void grpc_channel_internal_ref(grpc_channel *channel); -void grpc_channel_internal_unref(grpc_channel *channel); +void grpc_channel_internal_unref(grpc_channel *channel, + grpc_call_list *call_list); #define GRPC_CHANNEL_INTERNAL_REF(channel, reason) \ grpc_channel_internal_ref(channel) -#define GRPC_CHANNEL_INTERNAL_UNREF(channel, reason) \ - grpc_channel_internal_unref(channel) +#define GRPC_CHANNEL_INTERNAL_UNREF(channel, reason, call_list) \ + grpc_channel_internal_unref(channel, call_list) #endif #endif /* GRPC_INTERNAL_CORE_SURFACE_CHANNEL_H */ diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c index 4d0cf1ed8b..7891669e35 100644 --- a/src/core/surface/channel_connectivity.c +++ b/src/core/surface/channel_connectivity.c @@ -56,7 +56,7 @@ grpc_connectivity_state grpc_channel_check_connectivity_state( } state = grpc_client_channel_check_connectivity_state( client_channel_elem, try_to_connect, &call_list); - grpc_call_list_run(call_list); + grpc_call_list_run(&call_list); return state; } @@ -80,17 +80,18 @@ typedef struct { void *tag; } state_watcher; -static void delete_state_watcher(state_watcher *w) { +static void delete_state_watcher(state_watcher *w, grpc_call_list *call_list) { grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element( grpc_channel_get_channel_stack(w->channel)); grpc_client_channel_del_interested_party(client_channel_elem, - grpc_cq_pollset(w->cq)); - GRPC_CHANNEL_INTERNAL_UNREF(w->channel, "watch_connectivity"); + grpc_cq_pollset(w->cq), call_list); + GRPC_CHANNEL_INTERNAL_UNREF(w->channel, "watch_connectivity", call_list); gpr_mu_destroy(&w->mu); gpr_free(w); } -static void finished_completion(void *pw, grpc_cq_completion *ignored) { +static void finished_completion(void *pw, grpc_cq_completion *ignored, + grpc_call_list *call_list) { int delete = 0; state_watcher *w = pw; gpr_mu_lock(&w->mu); @@ -110,18 +111,19 @@ static void finished_completion(void *pw, grpc_cq_completion *ignored) { gpr_mu_unlock(&w->mu); if (delete) { - delete_state_watcher(w); + delete_state_watcher(w, call_list); } } -static void partly_done(state_watcher *w, int due_to_completion) { +static void partly_done(state_watcher *w, int due_to_completion, + grpc_call_list *call_list) { int delete = 0; if (due_to_completion) { gpr_mu_lock(&w->mu); w->success = 1; gpr_mu_unlock(&w->mu); - grpc_alarm_cancel(&w->alarm); + grpc_alarm_cancel(&w->alarm, call_list); } gpr_mu_lock(&w->mu); @@ -129,7 +131,7 @@ static void partly_done(state_watcher *w, int due_to_completion) { case WAITING: w->phase = CALLING_BACK; grpc_cq_end_op(w->cq, w->tag, w->success, finished_completion, w, - &w->completion_storage); + &w->completion_storage, call_list); break; case CALLING_BACK: w->phase = CALLING_BACK_AND_FINISHED; @@ -145,13 +147,17 @@ static void partly_done(state_watcher *w, int due_to_completion) { gpr_mu_unlock(&w->mu); if (delete) { - delete_state_watcher(w); + delete_state_watcher(w, call_list); } } -static void watch_complete(void *pw, int success) { partly_done(pw, 1); } +static void watch_complete(void *pw, int success, grpc_call_list *call_list) { + partly_done(pw, 1, call_list); +} -static void timeout_complete(void *pw, int success) { partly_done(pw, 0); } +static void timeout_complete(void *pw, int success, grpc_call_list *call_list) { + partly_done(pw, 0, call_list); +} void grpc_channel_watch_connectivity_state( grpc_channel *channel, grpc_connectivity_state last_observed_state, @@ -172,9 +178,9 @@ void grpc_channel_watch_connectivity_state( w->tag = tag; w->channel = channel; - grpc_alarm_init(&w->alarm, - gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - timeout_complete, w, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_alarm_init( + &w->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), + timeout_complete, w, gpr_now(GPR_CLOCK_MONOTONIC), &call_list); if (client_channel_elem->filter != &grpc_client_channel_filter) { gpr_log(GPR_ERROR, @@ -185,10 +191,10 @@ void grpc_channel_watch_connectivity_state( } else { GRPC_CHANNEL_INTERNAL_REF(channel, "watch_connectivity"); grpc_client_channel_add_interested_party(client_channel_elem, - grpc_cq_pollset(cq)); + grpc_cq_pollset(cq), &call_list); grpc_client_channel_watch_connectivity_state(client_channel_elem, &w->state, &w->on_complete, &call_list); } - grpc_call_list_run(call_list); + grpc_call_list_run(&call_list); } diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index f6f42b3d7a..7ac76da10a 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -55,6 +55,13 @@ typedef struct { grpc_closure *notify; grpc_connect_in_args args; grpc_connect_out_args *result; + + grpc_endpoint *tcp; + + grpc_mdctx *mdctx; + grpc_workqueue *workqueue; + + grpc_closure connected; } connector; static void connector_ref(grpc_connector *con) { @@ -62,21 +69,24 @@ static void connector_ref(grpc_connector *con) { gpr_ref(&c->refs); } -static void connector_unref(grpc_connector *con) { +static void connector_unref(grpc_connector *con, grpc_call_list *call_list) { connector *c = (connector *)con; if (gpr_unref(&c->refs)) { + grpc_mdctx_unref(c->mdctx); + GRPC_WORKQUEUE_UNREF(c->workqueue, "connector", call_list); gpr_free(c); } } -static void connected(void *arg, grpc_endpoint *tcp) { +static void connected(void *arg, int success, grpc_call_list *call_list) { connector *c = arg; grpc_closure *notify; + grpc_endpoint *tcp = c->tcp; if (tcp != NULL) { c->result->transport = grpc_create_chttp2_transport( - c->args.channel_args, tcp, c->args.metadata_context, c->args.workqueue, - 1); - grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0); + c->args.channel_args, tcp, c->mdctx, 1, call_list); + grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0, + call_list); GPR_ASSERT(c->result->transport); c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *)); c->result->filters[0] = &grpc_http_client_filter; @@ -86,24 +96,27 @@ static void connected(void *arg, grpc_endpoint *tcp) { } notify = c->notify; c->notify = NULL; - notify->cb(notify->cb_arg, 1); + notify->cb(notify->cb_arg, 1, call_list); } -static void connector_shutdown(grpc_connector *con) {} +static void connector_shutdown(grpc_connector *con, grpc_call_list *call_list) { +} static void connector_connect(grpc_connector *con, const grpc_connect_in_args *args, grpc_connect_out_args *result, - grpc_closure *notify) { + grpc_closure *notify, grpc_call_list *call_list) { connector *c = (connector *)con; GPR_ASSERT(c->notify == NULL); GPR_ASSERT(notify->cb); c->notify = notify; c->args = *args; c->result = result; - grpc_tcp_client_connect(connected, c, args->interested_parties, - args->workqueue, args->addr, args->addr_len, - args->deadline); + c->tcp = NULL; + grpc_closure_init(&c->connected, connected, c); + grpc_tcp_client_connect(&c->connected, &c->tcp, args->interested_parties, + args->addr, args->addr_len, args->deadline, + call_list); } static const grpc_connector_vtable connector_vtable = { @@ -122,10 +135,11 @@ static void subchannel_factory_ref(grpc_subchannel_factory *scf) { gpr_ref(&f->refs); } -static void subchannel_factory_unref(grpc_subchannel_factory *scf) { +static void subchannel_factory_unref(grpc_subchannel_factory *scf, + grpc_call_list *call_list) { subchannel_factory *f = (subchannel_factory *)scf; if (gpr_unref(&f->refs)) { - GRPC_CHANNEL_INTERNAL_UNREF(f->master, "subchannel_factory"); + GRPC_CHANNEL_INTERNAL_UNREF(f->master, "subchannel_factory", call_list); grpc_channel_args_destroy(f->merge_args); grpc_mdctx_unref(f->mdctx); gpr_free(f); @@ -133,7 +147,8 @@ static void subchannel_factory_unref(grpc_subchannel_factory *scf) { } static grpc_subchannel *subchannel_factory_create_subchannel( - grpc_subchannel_factory *scf, grpc_subchannel_args *args) { + grpc_subchannel_factory *scf, grpc_subchannel_args *args, + grpc_call_list *call_list) { subchannel_factory *f = (subchannel_factory *)scf; connector *c = gpr_malloc(sizeof(*c)); grpc_channel_args *final_args = @@ -146,7 +161,7 @@ static grpc_subchannel *subchannel_factory_create_subchannel( args->args = final_args; args->master = f->master; s = grpc_subchannel_create(&c->base, args); - grpc_connector_unref(&c->base); + grpc_connector_unref(&c->base, call_list); grpc_channel_args_destroy(final_args); return s; } @@ -168,7 +183,8 @@ grpc_channel *grpc_insecure_channel_create(const char *target, grpc_resolver *resolver; subchannel_factory *f; grpc_mdctx *mdctx = grpc_mdctx_create(); - grpc_workqueue *workqueue = grpc_workqueue_create(); + grpc_call_list call_list = GRPC_CALL_LIST_INIT; + grpc_workqueue *workqueue = grpc_workqueue_create(&call_list); size_t n = 0; GPR_ASSERT(!reserved); if (grpc_channel_args_is_census_enabled(args)) { @@ -179,7 +195,7 @@ grpc_channel *grpc_insecure_channel_create(const char *target, GPR_ASSERT(n <= MAX_FILTERS); channel = grpc_channel_create_from_filters(target, filters, n, args, mdctx, - workqueue, 1); + workqueue, 1, &call_list); f = gpr_malloc(sizeof(*f)); f->base.vtable = &subchannel_factory_vtable; @@ -189,15 +205,17 @@ grpc_channel *grpc_insecure_channel_create(const char *target, f->merge_args = grpc_channel_args_copy(args); f->master = channel; GRPC_CHANNEL_INTERNAL_REF(f->master, "subchannel_factory"); - resolver = grpc_resolver_create(target, &f->base, workqueue); + resolver = grpc_resolver_create(target, &f->base); if (!resolver) { return NULL; } grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel), - resolver); - GRPC_RESOLVER_UNREF(resolver, "create"); - grpc_subchannel_factory_unref(&f->base); + resolver, &call_list); + GRPC_RESOLVER_UNREF(resolver, "create", &call_list); + grpc_subchannel_factory_unref(&f->base, &call_list); + + grpc_call_list_run(&call_list); return channel; } diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index b58115a93f..49dfc3c0e1 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -67,8 +67,12 @@ struct grpc_completion_queue { int is_server_cq; int num_pluckers; plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS]; + grpc_closure pollset_destroy_done; }; +static void on_pollset_destroy_done(void *cc, int success, + grpc_call_list *call_list); + grpc_completion_queue *grpc_completion_queue_create(void *reserved) { grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue)); GPR_ASSERT(!reserved); @@ -80,6 +84,7 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) { grpc_pollset_init(&cc->pollset); cc->completed_tail = &cc->completed_head; cc->completed_head.next = (gpr_uintptr)cc->completed_tail; + grpc_closure_init(&cc->pollset_destroy_done, on_pollset_destroy_done, cc); return cc; } @@ -94,7 +99,8 @@ void grpc_cq_internal_ref(grpc_completion_queue *cc) { gpr_ref(&cc->owning_refs); } -static void on_pollset_destroy_done(void *arg) { +static void on_pollset_destroy_done(void *arg, int success, + grpc_call_list *call_list) { grpc_completion_queue *cc = arg; GRPC_CQ_INTERNAL_UNREF(cc, "pollset_destroy"); } @@ -127,8 +133,10 @@ void grpc_cq_begin_op(grpc_completion_queue *cc) { event, then enter shutdown mode */ /* Queue a GRPC_OP_COMPLETED operation */ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success, - void (*done)(void *done_arg, grpc_cq_completion *storage), - void *done_arg, grpc_cq_completion *storage) { + void (*done)(void *done_arg, grpc_cq_completion *storage, + grpc_call_list *call_list), + void *done_arg, grpc_cq_completion *storage, + grpc_call_list *call_list) { int shutdown; int i; grpc_pollset_worker *pluck_worker; @@ -162,7 +170,7 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success, GPR_ASSERT(cc->shutdown_called); cc->shutdown = 1; gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); - grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc); + grpc_pollset_shutdown(&cc->pollset, &cc->pollset_destroy_done, call_list); } } @@ -172,6 +180,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, grpc_pollset_worker worker; int first_loop = 1; gpr_timespec now; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; GPR_ASSERT(!reserved); @@ -190,7 +199,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; - c->done(c->done_arg, c); + c->done(c->done_arg, c, &call_list); break; } if (cc->shutdown) { @@ -207,10 +216,11 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, break; } first_loop = 0; - grpc_pollset_work(&cc->pollset, &worker, now, deadline); + grpc_pollset_work(&cc->pollset, &worker, now, deadline, &call_list); } GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); GRPC_CQ_INTERNAL_UNREF(cc, "next"); + grpc_call_list_run(&call_list); return ret; } @@ -247,6 +257,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, grpc_pollset_worker worker; gpr_timespec now; int first_loop = 1; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; GPR_ASSERT(!reserved); @@ -268,7 +279,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; - c->done(c->done_arg, c); + c->done(c->done_arg, c, &call_list); goto done; } prev = c; @@ -299,18 +310,20 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, break; } first_loop = 0; - grpc_pollset_work(&cc->pollset, &worker, now, deadline); + grpc_pollset_work(&cc->pollset, &worker, now, deadline, &call_list); del_plucker(cc, tag, &worker); } done: GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); GRPC_CQ_INTERNAL_UNREF(cc, "pluck"); + grpc_call_list_run(&call_list); return ret; } /* Shutdown simply drops a ref that we reserved at creation time; if we drop to zero here, then enter shutdown mode and wake up any waiters */ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); if (cc->shutdown_called) { gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); @@ -324,8 +337,9 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { GPR_ASSERT(!cc->shutdown); cc->shutdown = 1; gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); - grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc); + grpc_pollset_shutdown(&cc->pollset, &cc->pollset_destroy_done, &call_list); } + grpc_call_list_run(&call_list); } void grpc_completion_queue_destroy(grpc_completion_queue *cc) { diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h index 793baff03a..6d8d1ce959 100644 --- a/src/core/surface/completion_queue.h +++ b/src/core/surface/completion_queue.h @@ -44,7 +44,8 @@ typedef struct grpc_cq_completion { void *tag; /** done callback - called when this queue element is no longer needed by the completion queue */ - void (*done)(void *done_arg, struct grpc_cq_completion *c); + void (*done)(void *done_arg, struct grpc_cq_completion *c, + grpc_call_list *call_list); void *done_arg; /** next pointer; low bit is used to indicate success or not */ gpr_uintptr next; @@ -74,7 +75,8 @@ void grpc_cq_begin_op(grpc_completion_queue *cc); void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success, void (*done)(void *done_arg, grpc_cq_completion *storage, grpc_call_list *call_list), - void *done_arg, grpc_cq_completion *storage); + void *done_arg, grpc_cq_completion *storage, + grpc_call_list *call_list); grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc); diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index a5de900eff..c5cf33f1f9 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -55,13 +55,14 @@ typedef struct { } channel_data; static void lame_start_transport_stream_op(grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op *op, + grpc_call_list *call_list) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); if (op->send_ops != NULL) { grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops); - op->on_done_send->cb(op->on_done_send->cb_arg, 0); + op->on_done_send->cb(op->on_done_send->cb_arg, 0, call_list); } if (op->recv_ops != NULL) { char tmp[GPR_LTOA_MIN_BUFSIZE]; @@ -80,44 +81,48 @@ static void lame_start_transport_stream_op(grpc_call_element *elem, mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME); grpc_sopb_add_metadata(op->recv_ops, mdb); *op->recv_state = GRPC_STREAM_CLOSED; - op->on_done_recv->cb(op->on_done_recv->cb_arg, 1); + op->on_done_recv->cb(op->on_done_recv->cb_arg, 1, call_list); } if (op->on_consumed != NULL) { - op->on_consumed->cb(op->on_consumed->cb_arg, 0); + op->on_consumed->cb(op->on_consumed->cb_arg, 0, call_list); } } -static char *lame_get_peer(grpc_call_element *elem) { +static char *lame_get_peer(grpc_call_element *elem, grpc_call_list *call_list) { channel_data *chand = elem->channel_data; return grpc_channel_get_target(chand->master); } static void lame_start_transport_op(grpc_channel_element *elem, - grpc_transport_op *op) { + grpc_transport_op *op, + grpc_call_list *call_list) { if (op->on_connectivity_state_change) { GPR_ASSERT(*op->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE); *op->connectivity_state = GRPC_CHANNEL_FATAL_FAILURE; op->on_connectivity_state_change->cb( - op->on_connectivity_state_change->cb_arg, 1); + op->on_connectivity_state_change->cb_arg, 1, call_list); } if (op->on_consumed != NULL) { - op->on_consumed->cb(op->on_consumed->cb_arg, 1); + op->on_consumed->cb(op->on_consumed->cb_arg, 1, call_list); } } static void init_call_elem(grpc_call_element *elem, const void *transport_server_data, - grpc_transport_stream_op *initial_op) { + grpc_transport_stream_op *initial_op, + grpc_call_list *call_list) { if (initial_op) { - grpc_transport_stream_op_finish_with_failure(initial_op); + grpc_transport_stream_op_finish_with_failure(initial_op, call_list); } } -static void destroy_call_elem(grpc_call_element *elem) {} +static void destroy_call_elem(grpc_call_element *elem, + grpc_call_list *call_list) {} static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *mdctx, - int is_first, int is_last) { + int is_first, int is_last, + grpc_call_list *call_list) { channel_data *chand = elem->channel_data; GPR_ASSERT(is_first); GPR_ASSERT(is_last); @@ -125,7 +130,8 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, chand->master = master; } -static void destroy_channel_elem(grpc_channel_element *elem) {} +static void destroy_channel_elem(grpc_channel_element *elem, + grpc_call_list *call_list) {} static const grpc_channel_filter lame_filter = { lame_start_transport_stream_op, @@ -148,14 +154,16 @@ grpc_channel *grpc_lame_client_channel_create(const char *target, grpc_channel *channel; grpc_channel_element *elem; channel_data *chand; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; static const grpc_channel_filter *filters[] = {&lame_filter}; - channel = grpc_channel_create_from_filters(target, filters, 1, NULL, - grpc_mdctx_create(), - grpc_workqueue_create(), 1); + channel = grpc_channel_create_from_filters( + target, filters, 1, NULL, grpc_mdctx_create(), + grpc_workqueue_create(&call_list), 1, &call_list); elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); GPR_ASSERT(elem->filter == &lame_filter); chand = (channel_data *)elem->channel_data; chand->error_code = error_code; chand->error_message = error_message; + grpc_call_list_run(&call_list); return channel; } diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index 4a75d03f0a..b5b9ee173e 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -108,8 +108,9 @@ static void on_secure_transport_setup_done(void *arg, c->connecting_endpoint = NULL; gpr_mu_unlock(&c->mu); c->result->transport = grpc_create_chttp2_transport( - c->args.channel_args, secure_endpoint, c->mdctx, 1); - grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0); + c->args.channel_args, secure_endpoint, c->mdctx, 1, call_list); + grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0, + call_list); c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *) * 2); c->result->filters[0] = &grpc_http_client_filter; c->result->filters[1] = &grpc_client_auth_filter; @@ -187,12 +188,13 @@ static void subchannel_factory_ref(grpc_subchannel_factory *scf) { gpr_ref(&f->refs); } -static void subchannel_factory_unref(grpc_subchannel_factory *scf) { +static void subchannel_factory_unref(grpc_subchannel_factory *scf, + grpc_call_list *call_list) { subchannel_factory *f = (subchannel_factory *)scf; if (gpr_unref(&f->refs)) { GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base, "subchannel_factory"); - GRPC_CHANNEL_INTERNAL_UNREF(f->master, "subchannel_factory"); + GRPC_CHANNEL_INTERNAL_UNREF(f->master, "subchannel_factory", call_list); grpc_channel_args_destroy(f->merge_args); grpc_mdctx_unref(f->mdctx); gpr_free(f); @@ -279,7 +281,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, GPR_ASSERT(n <= MAX_FILTERS); channel = grpc_channel_create_from_filters(target, filters, n, args_copy, - mdctx, workqueue, 1); + mdctx, workqueue, 1, &call_list); f = gpr_malloc(sizeof(*f)); f->base.vtable = &subchannel_factory_vtable; @@ -299,7 +301,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel), resolver, &call_list); GRPC_RESOLVER_UNREF(resolver, "create", &call_list); - grpc_subchannel_factory_unref(&f->base); + grpc_subchannel_factory_unref(&f->base, &call_list); GRPC_SECURITY_CONNECTOR_UNREF(&connector->base, "channel_create"); grpc_channel_args_destroy(args_copy); diff --git a/src/core/surface/server.c b/src/core/surface/server.c index e38c6028d9..24545c67e1 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -57,9 +57,11 @@ typedef struct listener { void *arg; void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets, - size_t pollset_count); - void (*destroy)(grpc_server *server, void *arg); + size_t pollset_count, grpc_call_list *call_list); + void (*destroy)(grpc_server *server, void *arg, grpc_closure *closure, + grpc_call_list *call_list); struct listener *next; + grpc_closure destroy_done; } listener; typedef struct call_data call_data; @@ -219,19 +221,19 @@ struct grpc_server { /** when did we print the last shutdown progress message */ gpr_timespec last_shutdown_message_time; - - grpc_workqueue *workqueue; }; #define SERVER_FROM_CALL_ELEM(elem) \ (((channel_data *)(elem)->channel_data)->server) static void begin_call(grpc_server *server, call_data *calld, - requested_call *rc); -static void fail_call(grpc_server *server, requested_call *rc); + requested_call *rc, grpc_call_list *call_list); +static void fail_call(grpc_server *server, requested_call *rc, + grpc_call_list *call_list); /* Before calling maybe_finish_shutdown, we must hold mu_global and not hold mu_call */ -static void maybe_finish_shutdown(grpc_server *server); +static void maybe_finish_shutdown(grpc_server *server, + grpc_call_list *call_list); /* * channel broadcaster @@ -258,14 +260,15 @@ struct shutdown_cleanup_args { gpr_slice slice; }; -static void shutdown_cleanup(void *arg, int iomgr_status_ignored) { +static void shutdown_cleanup(void *arg, int iomgr_status_ignored, + grpc_call_list *call_list) { struct shutdown_cleanup_args *a = arg; gpr_slice_unref(a->slice); gpr_free(a); } static void send_shutdown(grpc_channel *channel, int send_goaway, - int send_disconnect) { + int send_disconnect, grpc_call_list *call_list) { grpc_transport_op op; struct shutdown_cleanup_args *sc; grpc_channel_element *elem; @@ -281,17 +284,17 @@ static void send_shutdown(grpc_channel *channel, int send_goaway, op.on_consumed = &sc->closure; elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); - elem->filter->start_transport_op(elem, &op); + elem->filter->start_transport_op(elem, &op, call_list); } static void channel_broadcaster_shutdown(channel_broadcaster *cb, - int send_goaway, - int force_disconnect) { + int send_goaway, int force_disconnect, + grpc_call_list *call_list) { size_t i; for (i = 0; i < cb->num_channels; i++) { - send_shutdown(cb->channels[i], send_goaway, force_disconnect); - GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast"); + send_shutdown(cb->channels[i], send_goaway, force_disconnect, call_list); + GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast", call_list); } gpr_free(cb->channels); } @@ -311,12 +314,12 @@ static void request_matcher_destroy(request_matcher *request_matcher) { gpr_stack_lockfree_destroy(request_matcher->requests); } -static void kill_zombie(void *elem, int success) { +static void kill_zombie(void *elem, int success, grpc_call_list *call_list) { grpc_call_destroy(grpc_call_from_top_element(elem)); } static void request_matcher_zombify_all_pending_calls( - request_matcher *request_matcher, grpc_workqueue *workqueue) { + request_matcher *request_matcher, grpc_call_list *call_list) { while (request_matcher->pending_head) { call_data *calld = request_matcher->pending_head; request_matcher->pending_head = calld->pending_next; @@ -326,15 +329,16 @@ static void request_matcher_zombify_all_pending_calls( grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_workqueue_push(workqueue, &calld->kill_zombie_closure, 1); + grpc_call_list_add(call_list, &calld->kill_zombie_closure, 1); } } static void request_matcher_kill_requests(grpc_server *server, - request_matcher *rm) { + request_matcher *rm, + grpc_call_list *call_list) { int request_id; while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) { - fail_call(server, &server->requested_calls[request_id]); + fail_call(server, &server->requested_calls[request_id], call_list); } } @@ -346,7 +350,7 @@ static void server_ref(grpc_server *server) { gpr_ref(&server->internal_refcount); } -static void server_delete(grpc_server *server) { +static void server_delete(grpc_server *server, grpc_call_list *call_list) { registered_method *rm; size_t i; grpc_channel_args_destroy(server->channel_args); @@ -365,7 +369,6 @@ static void server_delete(grpc_server *server) { } request_matcher_destroy(&server->unregistered_request_matcher); gpr_stack_lockfree_destroy(server->request_freelist); - GRPC_WORKQUEUE_UNREF(server->workqueue, "destroy"); gpr_free(server->cqs); gpr_free(server->pollsets); gpr_free(server->shutdown_tags); @@ -373,9 +376,9 @@ static void server_delete(grpc_server *server) { gpr_free(server); } -static void server_unref(grpc_server *server) { +static void server_unref(grpc_server *server, grpc_call_list *call_list) { if (gpr_unref(&server->internal_refcount)) { - server_delete(server); + server_delete(server, call_list); } } @@ -389,30 +392,29 @@ static void orphan_channel(channel_data *chand) { chand->next = chand->prev = chand; } -static void finish_destroy_channel(void *cd, int success) { +static void finish_destroy_channel(void *cd, int success, + grpc_call_list *call_list) { channel_data *chand = cd; grpc_server *server = chand->server; gpr_log(GPR_DEBUG, "finish_destroy_channel: %p", chand->channel); - GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "server"); - server_unref(server); + GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "server", call_list); + server_unref(server, call_list); } -static void destroy_channel(channel_data *chand) { +static void destroy_channel(channel_data *chand, grpc_call_list *call_list) { if (is_channel_orphaned(chand)) return; GPR_ASSERT(chand->server != NULL); orphan_channel(chand); server_ref(chand->server); - maybe_finish_shutdown(chand->server); + maybe_finish_shutdown(chand->server, call_list); chand->finish_destroy_channel_closure.cb = finish_destroy_channel; chand->finish_destroy_channel_closure.cb_arg = chand; - gpr_log(GPR_DEBUG, "queue finish_destroy_channel: %p on %p", chand->channel, - chand->server->workqueue); - grpc_workqueue_push(chand->server->workqueue, - &chand->finish_destroy_channel_closure, 1); + grpc_call_list_add(call_list, &chand->finish_destroy_channel_closure, 1); } static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem, - request_matcher *request_matcher) { + request_matcher *request_matcher, + grpc_call_list *call_list) { call_data *calld = elem->call_data; int request_id; @@ -421,7 +423,7 @@ static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem, calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_workqueue_push(server->workqueue, &calld->kill_zombie_closure, 1); + grpc_call_list_add(call_list, &calld->kill_zombie_closure, 1); return; } @@ -443,11 +445,11 @@ static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem, gpr_mu_lock(&calld->mu_state); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); - begin_call(server, calld, &server->requested_calls[request_id]); + begin_call(server, calld, &server->requested_calls[request_id], call_list); } } -static void start_new_rpc(grpc_call_element *elem) { +static void start_new_rpc(grpc_call_element *elem, grpc_call_list *call_list) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; grpc_server *server = chand->server; @@ -466,7 +468,8 @@ static void start_new_rpc(grpc_call_element *elem) { if (rm->host != calld->host) continue; if (rm->method != calld->path) continue; finish_start_new_rpc(server, elem, - &rm->server_registered_method->request_matcher); + &rm->server_registered_method->request_matcher, + call_list); return; } /* check for a wildcard method definition (no host set) */ @@ -478,11 +481,13 @@ static void start_new_rpc(grpc_call_element *elem) { if (rm->host != NULL) continue; if (rm->method != calld->path) continue; finish_start_new_rpc(server, elem, - &rm->server_registered_method->request_matcher); + &rm->server_registered_method->request_matcher, + call_list); return; } } - finish_start_new_rpc(server, elem, &server->unregistered_request_matcher); + finish_start_new_rpc(server, elem, &server->unregistered_request_matcher, + call_list); } static int num_listeners(grpc_server *server) { @@ -494,8 +499,9 @@ static int num_listeners(grpc_server *server) { return n; } -static void done_shutdown_event(void *server, grpc_cq_completion *completion) { - server_unref(server); +static void done_shutdown_event(void *server, grpc_cq_completion *completion, + grpc_call_list *call_list) { + server_unref(server, call_list); } static int num_channels(grpc_server *server) { @@ -508,25 +514,27 @@ static int num_channels(grpc_server *server) { return n; } -static void kill_pending_work_locked(grpc_server *server) { +static void kill_pending_work_locked(grpc_server *server, + grpc_call_list *call_list) { registered_method *rm; - request_matcher_kill_requests(server, &server->unregistered_request_matcher); + request_matcher_kill_requests(server, &server->unregistered_request_matcher, + call_list); request_matcher_zombify_all_pending_calls( - &server->unregistered_request_matcher, server->workqueue); + &server->unregistered_request_matcher, call_list); for (rm = server->registered_methods; rm; rm = rm->next) { - request_matcher_kill_requests(server, &rm->request_matcher); - request_matcher_zombify_all_pending_calls(&rm->request_matcher, - server->workqueue); + request_matcher_kill_requests(server, &rm->request_matcher, call_list); + request_matcher_zombify_all_pending_calls(&rm->request_matcher, call_list); } } -static void maybe_finish_shutdown(grpc_server *server) { +static void maybe_finish_shutdown(grpc_server *server, + grpc_call_list *call_list) { size_t i; if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) { return; } - kill_pending_work_locked(server); + kill_pending_work_locked(server, call_list); if (server->root_channel_data.next != &server->root_channel_data || server->listeners_destroyed < num_listeners(server)) { @@ -548,7 +556,7 @@ static void maybe_finish_shutdown(grpc_server *server) { server_ref(server); grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag, 1, done_shutdown_event, server, - &server->shutdown_tags[i].completion); + &server->shutdown_tags[i].completion, call_list); } } @@ -566,10 +574,9 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { return md; } -static void server_on_recv(void *ptr, int success) { +static void server_on_recv(void *ptr, int success, grpc_call_list *call_list) { grpc_call_element *elem = ptr; call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; gpr_timespec op_deadline; if (success && !calld->got_initial_metadata) { @@ -587,7 +594,7 @@ static void server_on_recv(void *ptr, int success) { } if (calld->host && calld->path) { calld->got_initial_metadata = 1; - start_new_rpc(elem); + start_new_rpc(elem, call_list); } break; } @@ -604,8 +611,7 @@ static void server_on_recv(void *ptr, int success) { calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_workqueue_push(chand->server->workqueue, - &calld->kill_zombie_closure, 1); + grpc_call_list_add(call_list, &calld->kill_zombie_closure, 1); } else { gpr_mu_unlock(&calld->mu_state); } @@ -616,8 +622,7 @@ static void server_on_recv(void *ptr, int success) { calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_workqueue_push(chand->server->workqueue, - &calld->kill_zombie_closure, 1); + grpc_call_list_add(call_list, &calld->kill_zombie_closure, 1); } else if (calld->state == PENDING) { calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); @@ -629,7 +634,7 @@ static void server_on_recv(void *ptr, int success) { break; } - calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success); + calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success, call_list); } static void server_mutate_op(grpc_call_element *elem, @@ -646,10 +651,11 @@ static void server_mutate_op(grpc_call_element *elem, } static void server_start_transport_stream_op(grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op *op, + grpc_call_list *call_list) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); server_mutate_op(elem, op); - grpc_call_next_op(elem, op); + grpc_call_next_op(elem, op, call_list); } static void accept_stream(void *cd, grpc_transport *transport, @@ -660,7 +666,8 @@ static void accept_stream(void *cd, grpc_transport *transport, 0, gpr_inf_future(GPR_CLOCK_MONOTONIC)); } -static void channel_connectivity_changed(void *cd, int iomgr_status_ignored) { +static void channel_connectivity_changed(void *cd, int iomgr_status_ignored, + grpc_call_list *call_list) { channel_data *chand = cd; grpc_server *server = chand->server; if (chand->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) { @@ -670,18 +677,19 @@ static void channel_connectivity_changed(void *cd, int iomgr_status_ignored) { op.connectivity_state = &chand->connectivity_state; grpc_channel_next_op(grpc_channel_stack_element( grpc_channel_get_channel_stack(chand->channel), 0), - &op); + &op, call_list); } else { gpr_mu_lock(&server->mu_global); - destroy_channel(chand); + destroy_channel(chand, call_list); gpr_mu_unlock(&server->mu_global); - GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "connectivity"); + GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "connectivity", call_list); } } static void init_call_elem(grpc_call_element *elem, const void *server_transport_data, - grpc_transport_stream_op *initial_op) { + grpc_transport_stream_op *initial_op, + grpc_call_list *call_list) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; memset(calld, 0, sizeof(call_data)); @@ -696,7 +704,8 @@ static void init_call_elem(grpc_call_element *elem, if (initial_op) server_mutate_op(elem, initial_op); } -static void destroy_call_elem(grpc_call_element *elem) { +static void destroy_call_elem(grpc_call_element *elem, + grpc_call_list *call_list) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; @@ -711,13 +720,13 @@ static void destroy_call_elem(grpc_call_element *elem) { gpr_mu_destroy(&calld->mu_state); - server_unref(chand->server); + server_unref(chand->server, call_list); } static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *metadata_context, int is_first, - int is_last) { + int is_last, grpc_call_list *call_list) { channel_data *chand = elem->channel_data; GPR_ASSERT(is_first); GPR_ASSERT(!is_last); @@ -733,7 +742,8 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, channel_connectivity_changed, chand); } -static void destroy_channel_elem(grpc_channel_element *elem) { +static void destroy_channel_elem(grpc_channel_element *elem, + grpc_call_list *call_list) { size_t i; channel_data *chand = elem->channel_data; if (chand->registered_methods) { @@ -752,11 +762,11 @@ static void destroy_channel_elem(grpc_channel_element *elem) { chand->next->prev = chand->prev; chand->prev->next = chand->next; chand->next = chand->prev = chand; - maybe_finish_shutdown(chand->server); + maybe_finish_shutdown(chand->server, call_list); gpr_mu_unlock(&chand->server->mu_global); GRPC_MDSTR_UNREF(chand->path_key); GRPC_MDSTR_UNREF(chand->authority_key); - server_unref(chand->server); + server_unref(chand->server, call_list); } } @@ -810,7 +820,6 @@ grpc_server *grpc_server_create_from_filters( gpr_ref_init(&server->internal_refcount, 1); server->root_channel_data.next = server->root_channel_data.prev = &server->root_channel_data; - server->workqueue = grpc_workqueue_create(); /* TODO(ctiller): expose a channel_arg for this */ server->max_requested_calls = 32768; @@ -881,23 +890,26 @@ void *grpc_server_register_method(grpc_server *server, const char *method, void grpc_server_start(grpc_server *server) { listener *l; size_t i; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count); for (i = 0; i < server->cq_count; i++) { server->pollsets[i] = grpc_cq_pollset(server->cqs[i]); - grpc_workqueue_add_to_pollset(server->workqueue, server->pollsets[i]); } for (l = server->listeners; l; l = l->next) { - l->start(server, l->arg, server->pollsets, server->cq_count); + l->start(server, l->arg, server->pollsets, server->cq_count, &call_list); } + + grpc_call_list_run(&call_list); } void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, grpc_channel_filter const **extra_filters, size_t num_extra_filters, grpc_mdctx *mdctx, grpc_workqueue *workqueue, - const grpc_channel_args *args) { + const grpc_channel_args *args, + grpc_call_list *call_list) { size_t num_filters = s->channel_filter_count + num_extra_filters + 1; grpc_channel_filter const **filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_filters); @@ -927,11 +939,11 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, for (i = 0; i < s->cq_count; i++) { memset(&op, 0, sizeof(op)); op.bind_pollset = grpc_cq_pollset(s->cqs[i]); - grpc_transport_perform_op(transport, &op); + grpc_transport_perform_op(transport, &op, call_list); } channel = grpc_channel_create_from_filters(NULL, filters, num_filters, args, - mdctx, workqueue, 0); + mdctx, workqueue, 0, call_list); chand = (channel_data *)grpc_channel_stack_element( grpc_channel_get_channel_stack(channel), 0) ->channel_data; @@ -987,19 +999,30 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, op.on_connectivity_state_change = &chand->channel_connectivity_changed; op.connectivity_state = &chand->connectivity_state; op.disconnect = gpr_atm_acq_load(&s->shutdown_flag) != 0; - grpc_transport_perform_op(transport, &op); + grpc_transport_perform_op(transport, &op, call_list); } -void done_published_shutdown(void *done_arg, grpc_cq_completion *storage) { +void done_published_shutdown(void *done_arg, grpc_cq_completion *storage, + grpc_call_list *call_list) { (void) done_arg; gpr_free(storage); } +static void listener_destroy_done(void *s, int success, + grpc_call_list *call_list) { + grpc_server *server = s; + gpr_mu_lock(&server->mu_global); + server->listeners_destroyed++; + maybe_finish_shutdown(server, call_list); + gpr_mu_unlock(&server->mu_global); +} + void grpc_server_shutdown_and_notify(grpc_server *server, grpc_completion_queue *cq, void *tag) { listener *l; shutdown_tag *sdt; channel_broadcaster broadcaster; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; GRPC_SERVER_LOG_SHUTDOWN(GPR_INFO, server, cq, tag); @@ -1008,9 +1031,9 @@ void grpc_server_shutdown_and_notify(grpc_server *server, grpc_cq_begin_op(cq); if (server->shutdown_published) { grpc_cq_end_op(cq, tag, 1, done_published_shutdown, NULL, - gpr_malloc(sizeof(grpc_cq_completion))); + gpr_malloc(sizeof(grpc_cq_completion)), &call_list); gpr_mu_unlock(&server->mu_global); - return; + goto done; } server->shutdown_tags = gpr_realloc(server->shutdown_tags, @@ -1020,7 +1043,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server, sdt->cq = cq; if (gpr_atm_acq_load(&server->shutdown_flag)) { gpr_mu_unlock(&server->mu_global); - return; + goto done; } server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME); @@ -1029,41 +1052,40 @@ void grpc_server_shutdown_and_notify(grpc_server *server, /* collect all unregistered then registered calls */ gpr_mu_lock(&server->mu_call); - kill_pending_work_locked(server); + kill_pending_work_locked(server, &call_list); gpr_mu_unlock(&server->mu_call); gpr_atm_rel_store(&server->shutdown_flag, 1); - maybe_finish_shutdown(server); + maybe_finish_shutdown(server, &call_list); gpr_mu_unlock(&server->mu_global); /* Shutdown listeners */ for (l = server->listeners; l; l = l->next) { - l->destroy(server, l->arg); + grpc_closure_init(&l->destroy_done, listener_destroy_done, server); + l->destroy(server, l->arg, &l->destroy_done, &call_list); } - channel_broadcaster_shutdown(&broadcaster, 1, 0); -} + channel_broadcaster_shutdown(&broadcaster, 1, 0, &call_list); -void grpc_server_listener_destroy_done(void *s) { - grpc_server *server = s; - gpr_mu_lock(&server->mu_global); - server->listeners_destroyed++; - maybe_finish_shutdown(server); - gpr_mu_unlock(&server->mu_global); +done: + grpc_call_list_run(&call_list); } void grpc_server_cancel_all_calls(grpc_server *server) { channel_broadcaster broadcaster; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; gpr_mu_lock(&server->mu_global); channel_broadcaster_init(server, &broadcaster); gpr_mu_unlock(&server->mu_global); - channel_broadcaster_shutdown(&broadcaster, 0, 1); + channel_broadcaster_shutdown(&broadcaster, 0, 1, &call_list); + grpc_call_list_run(&call_list); } void grpc_server_destroy(grpc_server *server) { listener *l; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; gpr_mu_lock(&server->mu_global); GPR_ASSERT(gpr_atm_acq_load(&server->shutdown_flag) || !server->listeners); @@ -1077,16 +1099,17 @@ void grpc_server_destroy(grpc_server *server) { gpr_mu_unlock(&server->mu_global); - grpc_workqueue_flush(server->workqueue); - - server_unref(server); + server_unref(server, &call_list); + grpc_call_list_run(&call_list); } -void grpc_server_add_listener(grpc_server *server, void *arg, - void (*start)(grpc_server *server, void *arg, - grpc_pollset **pollsets, - size_t pollset_count), - void (*destroy)(grpc_server *server, void *arg)) { +void grpc_server_add_listener( + grpc_server *server, void *arg, + void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets, + size_t pollset_count, grpc_call_list *call_list), + void (*destroy)(grpc_server *server, void *arg, grpc_closure *on_done, + grpc_call_list *call_list), + grpc_call_list *call_list) { listener *l = gpr_malloc(sizeof(listener)); l->arg = arg; l->start = start; @@ -1096,18 +1119,19 @@ void grpc_server_add_listener(grpc_server *server, void *arg, } static grpc_call_error queue_call_request(grpc_server *server, - requested_call *rc) { + requested_call *rc, + grpc_call_list *call_list) { call_data *calld = NULL; request_matcher *request_matcher = NULL; int request_id; if (gpr_atm_acq_load(&server->shutdown_flag)) { - fail_call(server, rc); + fail_call(server, rc, call_list); return GRPC_CALL_OK; } request_id = gpr_stack_lockfree_pop(server->request_freelist); if (request_id == -1) { /* out of request ids: just fail this one */ - fail_call(server, rc); + fail_call(server, rc, call_list); return GRPC_CALL_OK; } switch (rc->type) { @@ -1135,12 +1159,13 @@ static grpc_call_error queue_call_request(grpc_server *server, grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_workqueue_push(server->workqueue, &calld->kill_zombie_closure, 1); + grpc_call_list_add(call_list, &calld->kill_zombie_closure, 1); } else { GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); - begin_call(server, calld, &server->requested_calls[request_id]); + begin_call(server, calld, &server->requested_calls[request_id], + call_list); } gpr_mu_lock(&server->mu_call); } @@ -1154,13 +1179,16 @@ grpc_call_error grpc_server_request_call( grpc_metadata_array *initial_metadata, grpc_completion_queue *cq_bound_to_call, grpc_completion_queue *cq_for_notification, void *tag) { + grpc_call_error error; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; requested_call *rc = gpr_malloc(sizeof(*rc)); GRPC_SERVER_LOG_REQUEST_CALL(GPR_INFO, server, call, details, initial_metadata, cq_bound_to_call, cq_for_notification, tag); if (!grpc_cq_is_server_cq(cq_for_notification)) { gpr_free(rc); - return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; + error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; + goto done; } grpc_cq_begin_op(cq_for_notification); details->reserved = NULL; @@ -1172,7 +1200,10 @@ grpc_call_error grpc_server_request_call( rc->call = call; rc->data.batch.details = details; rc->data.batch.initial_metadata = initial_metadata; - return queue_call_request(server, rc); + error = queue_call_request(server, rc, &call_list); +done: + grpc_call_list_run(&call_list); + return error; } grpc_call_error grpc_server_request_registered_call( @@ -1180,11 +1211,14 @@ grpc_call_error grpc_server_request_registered_call( grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload, grpc_completion_queue *cq_bound_to_call, grpc_completion_queue *cq_for_notification, void *tag) { + grpc_call_error error; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; requested_call *rc = gpr_malloc(sizeof(*rc)); registered_method *registered_method = rm; if (!grpc_cq_is_server_cq(cq_for_notification)) { gpr_free(rc); - return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; + error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; + goto done; } grpc_cq_begin_op(cq_for_notification); rc->type = REGISTERED_CALL; @@ -1197,12 +1231,16 @@ grpc_call_error grpc_server_request_registered_call( rc->data.registered.deadline = deadline; rc->data.registered.initial_metadata = initial_metadata; rc->data.registered.optional_payload = optional_payload; - return queue_call_request(server, rc); + error = queue_call_request(server, rc, &call_list); +done: + grpc_call_list_run(&call_list); + return error; } -static void publish_registered_or_batch(grpc_call *call, int success, - void *tag); -static void publish_was_not_set(grpc_call *call, int success, void *tag) { +static void publish_registered_or_batch(grpc_call *call, int success, void *tag, + grpc_call_list *call_list); +static void publish_was_not_set(grpc_call *call, int success, void *tag, + grpc_call_list *call_list) { abort(); } @@ -1218,7 +1256,7 @@ static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) { } static void begin_call(grpc_server *server, call_data *calld, - requested_call *rc) { + requested_call *rc, grpc_call_list *call_list) { grpc_ioreq_completion_func publish = publish_was_not_set; grpc_ioreq req[2]; grpc_ioreq *r = req; @@ -1229,7 +1267,7 @@ static void begin_call(grpc_server *server, call_data *calld, fill in the metadata array passed by the client, we need to perform an ioreq op, that should complete immediately. */ - grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call); + grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call, call_list); *rc->call = calld->call; calld->cq_new = rc->cq_for_notification; switch (rc->type) { @@ -1265,10 +1303,11 @@ static void begin_call(grpc_server *server, call_data *calld, GRPC_CALL_INTERNAL_REF(calld->call, "server"); grpc_call_start_ioreq_and_call_back(calld->call, req, (size_t)(r - req), - publish, rc); + publish, rc, call_list); } -static void done_request_event(void *req, grpc_cq_completion *c) { +static void done_request_event(void *req, grpc_cq_completion *c, + grpc_call_list *call_list) { requested_call *rc = req; grpc_server *server = rc->server; @@ -1281,10 +1320,11 @@ static void done_request_event(void *req, grpc_cq_completion *c) { gpr_free(req); } - server_unref(server); + server_unref(server, call_list); } -static void fail_call(grpc_server *server, requested_call *rc) { +static void fail_call(grpc_server *server, requested_call *rc, + grpc_call_list *call_list) { *rc->call = NULL; switch (rc->type) { case BATCH_CALL: @@ -1296,11 +1336,11 @@ static void fail_call(grpc_server *server, requested_call *rc) { } server_ref(server); grpc_cq_end_op(rc->cq_for_notification, rc->tag, 0, done_request_event, rc, - &rc->completion); + &rc->completion, call_list); } -static void publish_registered_or_batch(grpc_call *call, int success, - void *prc) { +static void publish_registered_or_batch(grpc_call *call, int success, void *prc, + grpc_call_list *call_list) { grpc_call_element *elem = grpc_call_stack_element(grpc_call_get_call_stack(call), 0); requested_call *rc = prc; @@ -1308,7 +1348,7 @@ static void publish_registered_or_batch(grpc_call *call, int success, channel_data *chand = elem->channel_data; server_ref(chand->server); grpc_cq_end_op(calld->cq_new, rc->tag, success, done_request_event, rc, - &rc->completion); + &rc->completion, call_list); GRPC_CALL_INTERNAL_UNREF(call, "server", call_list); } diff --git a/src/core/surface/server.h b/src/core/surface/server.h index f87296797c..2f2c5b8948 100644 --- a/src/core/surface/server.h +++ b/src/core/surface/server.h @@ -49,9 +49,9 @@ void grpc_server_add_listener( grpc_server *server, void *listener, void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets, size_t npollsets, grpc_call_list *call_list), - void (*destroy)(grpc_server *server, void *arg, grpc_call_list *call_list)); - -void grpc_server_listener_destroy_done(void *server); + void (*destroy)(grpc_server *server, void *arg, grpc_closure *on_done, + grpc_call_list *call_list), + grpc_call_list *call_list); /* Setup a transport - creates a channel stack, binds the transport to the server */ @@ -59,7 +59,8 @@ void grpc_server_setup_transport(grpc_server *server, grpc_transport *transport, grpc_channel_filter const **extra_filters, size_t num_extra_filters, grpc_mdctx *mdctx, grpc_workqueue *workqueue, - const grpc_channel_args *args); + const grpc_channel_args *args, + grpc_call_list *call_list); const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server); diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c index 91cf6ece9c..df63d99dea 100644 --- a/src/core/surface/server_chttp2.c +++ b/src/core/surface/server_chttp2.c @@ -43,15 +43,17 @@ #include <grpc/support/useful.h> static void setup_transport(void *server, grpc_transport *transport, - grpc_mdctx *mdctx, grpc_workqueue *workqueue) { + grpc_mdctx *mdctx, grpc_workqueue *workqueue, + grpc_call_list *call_list) { static grpc_channel_filter const *extra_filters[] = { &grpc_http_server_filter}; grpc_server_setup_transport(server, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters), mdctx, workqueue, - grpc_server_get_channel_args(server)); + grpc_server_get_channel_args(server), call_list); } -static void new_transport(void *server, grpc_endpoint *tcp) { +static void new_transport(void *server, grpc_endpoint *tcp, + grpc_call_list *call_list) { /* * Beware that the call to grpc_create_chttp2_transport() has to happen before * grpc_tcp_server_destroy(). This is fine here, but similar code @@ -60,25 +62,27 @@ static void new_transport(void *server, grpc_endpoint *tcp) { * case. */ grpc_mdctx *mdctx = grpc_mdctx_create(); - grpc_workqueue *workqueue = grpc_workqueue_create(); + grpc_workqueue *workqueue = grpc_workqueue_create(call_list); grpc_transport *transport = grpc_create_chttp2_transport( - grpc_server_get_channel_args(server), tcp, mdctx, workqueue, 0); - setup_transport(server, transport, mdctx, workqueue); - grpc_chttp2_transport_start_reading(transport, NULL, 0); + grpc_server_get_channel_args(server), tcp, mdctx, 0, call_list); + setup_transport(server, transport, mdctx, workqueue, call_list); + grpc_chttp2_transport_start_reading(transport, NULL, 0, call_list); } /* Server callback: start listening on our ports */ static void start(grpc_server *server, void *tcpp, grpc_pollset **pollsets, - size_t pollset_count) { + size_t pollset_count, grpc_call_list *call_list) { grpc_tcp_server *tcp = tcpp; - grpc_tcp_server_start(tcp, pollsets, pollset_count, new_transport, server); + grpc_tcp_server_start(tcp, pollsets, pollset_count, new_transport, server, + call_list); } /* Server callback: destroy the tcp listener (so we don't generate further callbacks) */ -static void destroy(grpc_server *server, void *tcpp) { +static void destroy(grpc_server *server, void *tcpp, grpc_closure *destroy_done, + grpc_call_list *call_list) { grpc_tcp_server *tcp = tcpp; - grpc_tcp_server_destroy(tcp, grpc_server_listener_destroy_done, server); + grpc_tcp_server_destroy(tcp, destroy_done, call_list); } int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { @@ -88,6 +92,7 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { unsigned count = 0; int port_num = -1; int port_temp; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; resolved = grpc_blocking_resolve_address(addr, "http"); if (!resolved) { @@ -124,9 +129,8 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { grpc_resolved_addresses_destroy(resolved); /* Register with the server only upon success */ - grpc_server_add_listener(server, tcp, start, destroy); - - return port_num; + grpc_server_add_listener(server, tcp, start, destroy, &call_list); + goto done; /* Error path: cleanup and return */ error: @@ -136,5 +140,9 @@ error: if (tcp) { grpc_tcp_server_destroy(tcp, NULL, NULL); } + port_num = 0; + +done: + grpc_call_list_run(&call_list); return 0; } |