aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-09-21 14:39:57 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-09-21 14:39:57 -0700
commitdfff1b8126a1f83833fd99626517f28d1e68453a (patch)
tree40a9aa8126c08a11fb1a5cdd4058f504e05dca43 /src/core/surface
parent3ffd8220a17fd2fdf64adc66b03e4e254880471b (diff)
Call list progress
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/call.c22
-rw-r--r--src/core/surface/call.h3
-rw-r--r--src/core/surface/channel.c19
-rw-r--r--src/core/surface/channel.h14
-rw-r--r--src/core/surface/channel_connectivity.c40
-rw-r--r--src/core/surface/channel_create.c60
-rw-r--r--src/core/surface/completion_queue.c32
-rw-r--r--src/core/surface/completion_queue.h6
-rw-r--r--src/core/surface/lame_client.c40
-rw-r--r--src/core/surface/secure_channel_create.c14
-rw-r--r--src/core/surface/server.c288
-rw-r--r--src/core/surface/server.h9
-rw-r--r--src/core/surface/server_chttp2.c36
13 files changed, 347 insertions, 236 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index f2f8f0a6ed..bbaf65759d 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -356,7 +356,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
initial_op_ptr = &initial_op;
}
grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr,
- CALL_STACK_FROM_CALL(call));
+ CALL_STACK_FROM_CALL(call), &call_list);
if (parent_call != NULL) {
GRPC_CALL_INTERNAL_REF(parent_call, "child");
GPR_ASSERT(call->is_client);
@@ -459,7 +459,7 @@ static void destroy_call(grpc_call *call, grpc_call_list *call_list) {
size_t i;
grpc_call *c = call;
grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), call_list);
- GRPC_CHANNEL_INTERNAL_UNREF(c->channel, "call");
+ GRPC_CHANNEL_INTERNAL_UNREF(c->channel, "call", call_list);
gpr_mu_destroy(&c->mu);
gpr_mu_destroy(&c->completion_mu);
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
@@ -673,7 +673,8 @@ static void unlock(grpc_call *call, grpc_call_list *call_list) {
if (completing_requests > 0) {
for (i = 0; i < completing_requests; i++) {
completed_requests[i].on_complete(call, completed_requests[i].success,
- completed_requests[i].user_data);
+ completed_requests[i].user_data,
+ call_list);
}
lock(call);
call->completing = 0;
@@ -1556,14 +1557,16 @@ static void set_cancelled_value(grpc_status_code status, void *dest) {
*(grpc_status_code *)dest = (status != GRPC_STATUS_OK);
}
-static void finish_batch(grpc_call *call, int success, void *tag) {
+static void finish_batch(grpc_call *call, int success, void *tag,
+ grpc_call_list *call_list) {
grpc_cq_end_op(call->cq, tag, success, done_completion, call,
- allocate_completion(call));
+ allocate_completion(call), call_list);
}
-static void finish_batch_with_close(grpc_call *call, int success, void *tag) {
+static void finish_batch_with_close(grpc_call *call, int success, void *tag,
+ grpc_call_list *call_list) {
grpc_cq_end_op(call->cq, tag, 1, done_completion, call,
- allocate_completion(call));
+ allocate_completion(call), call_list);
}
static int are_write_flags_valid(gpr_uint32 flags) {
@@ -1581,7 +1584,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
size_t out;
const grpc_op *op;
grpc_ioreq *req;
- void (*finish_func)(grpc_call *, int, void *) = finish_batch;
+ void (*finish_func)(grpc_call *, int, void *, grpc_call_list *) =
+ finish_batch;
grpc_call_error error;
grpc_call_list call_list = GRPC_CALL_LIST_INIT;
@@ -1596,7 +1600,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
grpc_cq_begin_op(call->cq);
GRPC_CALL_INTERNAL_REF(call, "completion");
grpc_cq_end_op(call->cq, tag, 1, done_completion, call,
- allocate_completion(call));
+ allocate_completion(call), &call_list);
error = GRPC_CALL_OK;
goto done;
}
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index 7a7178bc7b..144aa7cef2 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -87,7 +87,8 @@ typedef struct {
} grpc_ioreq;
typedef void (*grpc_ioreq_completion_func)(grpc_call *call, int success,
- void *user_data);
+ void *user_data,
+ grpc_call_list *call_list);
grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
gpr_uint32 propagation_mask,
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index fdba09fcce..46bea13936 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -272,9 +272,9 @@ void grpc_channel_internal_ref(grpc_channel *c) {
gpr_ref(&c->refs);
}
-static void destroy_channel(grpc_channel *channel) {
+static void destroy_channel(grpc_channel *channel, grpc_call_list *call_list) {
size_t i;
- grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel));
+ grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel), call_list);
for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) {
GRPC_MDELEM_UNREF(channel->grpc_status_elem[i]);
}
@@ -303,26 +303,31 @@ static void destroy_channel(grpc_channel *channel) {
}
#ifdef GRPC_CHANNEL_REF_COUNT_DEBUG
-void grpc_channel_internal_unref(grpc_channel *channel, const char *reason) {
+void grpc_channel_internal_unref(grpc_channel *channel, const char *reason,
+ grpc_call_list *call_list) {
gpr_log(GPR_DEBUG, "CHANNEL: unref %p %d -> %d [%s]", channel,
channel->refs.count, channel->refs.count - 1, reason);
#else
-void grpc_channel_internal_unref(grpc_channel *channel) {
+void grpc_channel_internal_unref(grpc_channel *channel,
+ grpc_call_list *call_list) {
#endif
if (gpr_unref(&channel->refs)) {
- destroy_channel(channel);
+ destroy_channel(channel, call_list);
}
}
void grpc_channel_destroy(grpc_channel *channel) {
grpc_transport_op op;
grpc_channel_element *elem;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
memset(&op, 0, sizeof(op));
op.disconnect = 1;
elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0);
- elem->filter->start_transport_op(elem, &op);
+ elem->filter->start_transport_op(elem, &op, &call_list);
- GRPC_CHANNEL_INTERNAL_UNREF(channel, "channel");
+ GRPC_CHANNEL_INTERNAL_UNREF(channel, "channel", &call_list);
+
+ grpc_call_list_run(&call_list);
}
grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel) {
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index 664ecc1c5a..3f51164fcc 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -67,18 +67,20 @@ grpc_workqueue *grpc_channel_get_workqueue(grpc_channel *channel);
#ifdef GRPC_CHANNEL_REF_COUNT_DEBUG
void grpc_channel_internal_ref(grpc_channel *channel, const char *reason);
-void grpc_channel_internal_unref(grpc_channel *channel, const char *reason);
+void grpc_channel_internal_unref(grpc_channel *channel, const char *reason,
+ grpc_call_list *call_list);
#define GRPC_CHANNEL_INTERNAL_REF(channel, reason) \
grpc_channel_internal_ref(channel, reason)
-#define GRPC_CHANNEL_INTERNAL_UNREF(channel, reason) \
- grpc_channel_internal_unref(channel, reason)
+#define GRPC_CHANNEL_INTERNAL_UNREF(channel, reason, call_list) \
+ grpc_channel_internal_unref(channel, reason, call_list)
#else
void grpc_channel_internal_ref(grpc_channel *channel);
-void grpc_channel_internal_unref(grpc_channel *channel);
+void grpc_channel_internal_unref(grpc_channel *channel,
+ grpc_call_list *call_list);
#define GRPC_CHANNEL_INTERNAL_REF(channel, reason) \
grpc_channel_internal_ref(channel)
-#define GRPC_CHANNEL_INTERNAL_UNREF(channel, reason) \
- grpc_channel_internal_unref(channel)
+#define GRPC_CHANNEL_INTERNAL_UNREF(channel, reason, call_list) \
+ grpc_channel_internal_unref(channel, call_list)
#endif
#endif /* GRPC_INTERNAL_CORE_SURFACE_CHANNEL_H */
diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c
index 4d0cf1ed8b..7891669e35 100644
--- a/src/core/surface/channel_connectivity.c
+++ b/src/core/surface/channel_connectivity.c
@@ -56,7 +56,7 @@ grpc_connectivity_state grpc_channel_check_connectivity_state(
}
state = grpc_client_channel_check_connectivity_state(
client_channel_elem, try_to_connect, &call_list);
- grpc_call_list_run(call_list);
+ grpc_call_list_run(&call_list);
return state;
}
@@ -80,17 +80,18 @@ typedef struct {
void *tag;
} state_watcher;
-static void delete_state_watcher(state_watcher *w) {
+static void delete_state_watcher(state_watcher *w, grpc_call_list *call_list) {
grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(w->channel));
grpc_client_channel_del_interested_party(client_channel_elem,
- grpc_cq_pollset(w->cq));
- GRPC_CHANNEL_INTERNAL_UNREF(w->channel, "watch_connectivity");
+ grpc_cq_pollset(w->cq), call_list);
+ GRPC_CHANNEL_INTERNAL_UNREF(w->channel, "watch_connectivity", call_list);
gpr_mu_destroy(&w->mu);
gpr_free(w);
}
-static void finished_completion(void *pw, grpc_cq_completion *ignored) {
+static void finished_completion(void *pw, grpc_cq_completion *ignored,
+ grpc_call_list *call_list) {
int delete = 0;
state_watcher *w = pw;
gpr_mu_lock(&w->mu);
@@ -110,18 +111,19 @@ static void finished_completion(void *pw, grpc_cq_completion *ignored) {
gpr_mu_unlock(&w->mu);
if (delete) {
- delete_state_watcher(w);
+ delete_state_watcher(w, call_list);
}
}
-static void partly_done(state_watcher *w, int due_to_completion) {
+static void partly_done(state_watcher *w, int due_to_completion,
+ grpc_call_list *call_list) {
int delete = 0;
if (due_to_completion) {
gpr_mu_lock(&w->mu);
w->success = 1;
gpr_mu_unlock(&w->mu);
- grpc_alarm_cancel(&w->alarm);
+ grpc_alarm_cancel(&w->alarm, call_list);
}
gpr_mu_lock(&w->mu);
@@ -129,7 +131,7 @@ static void partly_done(state_watcher *w, int due_to_completion) {
case WAITING:
w->phase = CALLING_BACK;
grpc_cq_end_op(w->cq, w->tag, w->success, finished_completion, w,
- &w->completion_storage);
+ &w->completion_storage, call_list);
break;
case CALLING_BACK:
w->phase = CALLING_BACK_AND_FINISHED;
@@ -145,13 +147,17 @@ static void partly_done(state_watcher *w, int due_to_completion) {
gpr_mu_unlock(&w->mu);
if (delete) {
- delete_state_watcher(w);
+ delete_state_watcher(w, call_list);
}
}
-static void watch_complete(void *pw, int success) { partly_done(pw, 1); }
+static void watch_complete(void *pw, int success, grpc_call_list *call_list) {
+ partly_done(pw, 1, call_list);
+}
-static void timeout_complete(void *pw, int success) { partly_done(pw, 0); }
+static void timeout_complete(void *pw, int success, grpc_call_list *call_list) {
+ partly_done(pw, 0, call_list);
+}
void grpc_channel_watch_connectivity_state(
grpc_channel *channel, grpc_connectivity_state last_observed_state,
@@ -172,9 +178,9 @@ void grpc_channel_watch_connectivity_state(
w->tag = tag;
w->channel = channel;
- grpc_alarm_init(&w->alarm,
- gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
- timeout_complete, w, gpr_now(GPR_CLOCK_MONOTONIC));
+ grpc_alarm_init(
+ &w->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
+ timeout_complete, w, gpr_now(GPR_CLOCK_MONOTONIC), &call_list);
if (client_channel_elem->filter != &grpc_client_channel_filter) {
gpr_log(GPR_ERROR,
@@ -185,10 +191,10 @@ void grpc_channel_watch_connectivity_state(
} else {
GRPC_CHANNEL_INTERNAL_REF(channel, "watch_connectivity");
grpc_client_channel_add_interested_party(client_channel_elem,
- grpc_cq_pollset(cq));
+ grpc_cq_pollset(cq), &call_list);
grpc_client_channel_watch_connectivity_state(client_channel_elem, &w->state,
&w->on_complete, &call_list);
}
- grpc_call_list_run(call_list);
+ grpc_call_list_run(&call_list);
}
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index f6f42b3d7a..7ac76da10a 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -55,6 +55,13 @@ typedef struct {
grpc_closure *notify;
grpc_connect_in_args args;
grpc_connect_out_args *result;
+
+ grpc_endpoint *tcp;
+
+ grpc_mdctx *mdctx;
+ grpc_workqueue *workqueue;
+
+ grpc_closure connected;
} connector;
static void connector_ref(grpc_connector *con) {
@@ -62,21 +69,24 @@ static void connector_ref(grpc_connector *con) {
gpr_ref(&c->refs);
}
-static void connector_unref(grpc_connector *con) {
+static void connector_unref(grpc_connector *con, grpc_call_list *call_list) {
connector *c = (connector *)con;
if (gpr_unref(&c->refs)) {
+ grpc_mdctx_unref(c->mdctx);
+ GRPC_WORKQUEUE_UNREF(c->workqueue, "connector", call_list);
gpr_free(c);
}
}
-static void connected(void *arg, grpc_endpoint *tcp) {
+static void connected(void *arg, int success, grpc_call_list *call_list) {
connector *c = arg;
grpc_closure *notify;
+ grpc_endpoint *tcp = c->tcp;
if (tcp != NULL) {
c->result->transport = grpc_create_chttp2_transport(
- c->args.channel_args, tcp, c->args.metadata_context, c->args.workqueue,
- 1);
- grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0);
+ c->args.channel_args, tcp, c->mdctx, 1, call_list);
+ grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0,
+ call_list);
GPR_ASSERT(c->result->transport);
c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *));
c->result->filters[0] = &grpc_http_client_filter;
@@ -86,24 +96,27 @@ static void connected(void *arg, grpc_endpoint *tcp) {
}
notify = c->notify;
c->notify = NULL;
- notify->cb(notify->cb_arg, 1);
+ notify->cb(notify->cb_arg, 1, call_list);
}
-static void connector_shutdown(grpc_connector *con) {}
+static void connector_shutdown(grpc_connector *con, grpc_call_list *call_list) {
+}
static void connector_connect(grpc_connector *con,
const grpc_connect_in_args *args,
grpc_connect_out_args *result,
- grpc_closure *notify) {
+ grpc_closure *notify, grpc_call_list *call_list) {
connector *c = (connector *)con;
GPR_ASSERT(c->notify == NULL);
GPR_ASSERT(notify->cb);
c->notify = notify;
c->args = *args;
c->result = result;
- grpc_tcp_client_connect(connected, c, args->interested_parties,
- args->workqueue, args->addr, args->addr_len,
- args->deadline);
+ c->tcp = NULL;
+ grpc_closure_init(&c->connected, connected, c);
+ grpc_tcp_client_connect(&c->connected, &c->tcp, args->interested_parties,
+ args->addr, args->addr_len, args->deadline,
+ call_list);
}
static const grpc_connector_vtable connector_vtable = {
@@ -122,10 +135,11 @@ static void subchannel_factory_ref(grpc_subchannel_factory *scf) {
gpr_ref(&f->refs);
}
-static void subchannel_factory_unref(grpc_subchannel_factory *scf) {
+static void subchannel_factory_unref(grpc_subchannel_factory *scf,
+ grpc_call_list *call_list) {
subchannel_factory *f = (subchannel_factory *)scf;
if (gpr_unref(&f->refs)) {
- GRPC_CHANNEL_INTERNAL_UNREF(f->master, "subchannel_factory");
+ GRPC_CHANNEL_INTERNAL_UNREF(f->master, "subchannel_factory", call_list);
grpc_channel_args_destroy(f->merge_args);
grpc_mdctx_unref(f->mdctx);
gpr_free(f);
@@ -133,7 +147,8 @@ static void subchannel_factory_unref(grpc_subchannel_factory *scf) {
}
static grpc_subchannel *subchannel_factory_create_subchannel(
- grpc_subchannel_factory *scf, grpc_subchannel_args *args) {
+ grpc_subchannel_factory *scf, grpc_subchannel_args *args,
+ grpc_call_list *call_list) {
subchannel_factory *f = (subchannel_factory *)scf;
connector *c = gpr_malloc(sizeof(*c));
grpc_channel_args *final_args =
@@ -146,7 +161,7 @@ static grpc_subchannel *subchannel_factory_create_subchannel(
args->args = final_args;
args->master = f->master;
s = grpc_subchannel_create(&c->base, args);
- grpc_connector_unref(&c->base);
+ grpc_connector_unref(&c->base, call_list);
grpc_channel_args_destroy(final_args);
return s;
}
@@ -168,7 +183,8 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
grpc_resolver *resolver;
subchannel_factory *f;
grpc_mdctx *mdctx = grpc_mdctx_create();
- grpc_workqueue *workqueue = grpc_workqueue_create();
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
+ grpc_workqueue *workqueue = grpc_workqueue_create(&call_list);
size_t n = 0;
GPR_ASSERT(!reserved);
if (grpc_channel_args_is_census_enabled(args)) {
@@ -179,7 +195,7 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
GPR_ASSERT(n <= MAX_FILTERS);
channel = grpc_channel_create_from_filters(target, filters, n, args, mdctx,
- workqueue, 1);
+ workqueue, 1, &call_list);
f = gpr_malloc(sizeof(*f));
f->base.vtable = &subchannel_factory_vtable;
@@ -189,15 +205,17 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
f->merge_args = grpc_channel_args_copy(args);
f->master = channel;
GRPC_CHANNEL_INTERNAL_REF(f->master, "subchannel_factory");
- resolver = grpc_resolver_create(target, &f->base, workqueue);
+ resolver = grpc_resolver_create(target, &f->base);
if (!resolver) {
return NULL;
}
grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel),
- resolver);
- GRPC_RESOLVER_UNREF(resolver, "create");
- grpc_subchannel_factory_unref(&f->base);
+ resolver, &call_list);
+ GRPC_RESOLVER_UNREF(resolver, "create", &call_list);
+ grpc_subchannel_factory_unref(&f->base, &call_list);
+
+ grpc_call_list_run(&call_list);
return channel;
}
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index b58115a93f..49dfc3c0e1 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -67,8 +67,12 @@ struct grpc_completion_queue {
int is_server_cq;
int num_pluckers;
plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
+ grpc_closure pollset_destroy_done;
};
+static void on_pollset_destroy_done(void *cc, int success,
+ grpc_call_list *call_list);
+
grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue));
GPR_ASSERT(!reserved);
@@ -80,6 +84,7 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
grpc_pollset_init(&cc->pollset);
cc->completed_tail = &cc->completed_head;
cc->completed_head.next = (gpr_uintptr)cc->completed_tail;
+ grpc_closure_init(&cc->pollset_destroy_done, on_pollset_destroy_done, cc);
return cc;
}
@@ -94,7 +99,8 @@ void grpc_cq_internal_ref(grpc_completion_queue *cc) {
gpr_ref(&cc->owning_refs);
}
-static void on_pollset_destroy_done(void *arg) {
+static void on_pollset_destroy_done(void *arg, int success,
+ grpc_call_list *call_list) {
grpc_completion_queue *cc = arg;
GRPC_CQ_INTERNAL_UNREF(cc, "pollset_destroy");
}
@@ -127,8 +133,10 @@ void grpc_cq_begin_op(grpc_completion_queue *cc) {
event, then enter shutdown mode */
/* Queue a GRPC_OP_COMPLETED operation */
void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
- void (*done)(void *done_arg, grpc_cq_completion *storage),
- void *done_arg, grpc_cq_completion *storage) {
+ void (*done)(void *done_arg, grpc_cq_completion *storage,
+ grpc_call_list *call_list),
+ void *done_arg, grpc_cq_completion *storage,
+ grpc_call_list *call_list) {
int shutdown;
int i;
grpc_pollset_worker *pluck_worker;
@@ -162,7 +170,7 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
GPR_ASSERT(cc->shutdown_called);
cc->shutdown = 1;
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
- grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc);
+ grpc_pollset_shutdown(&cc->pollset, &cc->pollset_destroy_done, call_list);
}
}
@@ -172,6 +180,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
grpc_pollset_worker worker;
int first_loop = 1;
gpr_timespec now;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
GPR_ASSERT(!reserved);
@@ -190,7 +199,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
- c->done(c->done_arg, c);
+ c->done(c->done_arg, c, &call_list);
break;
}
if (cc->shutdown) {
@@ -207,10 +216,11 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
break;
}
first_loop = 0;
- grpc_pollset_work(&cc->pollset, &worker, now, deadline);
+ grpc_pollset_work(&cc->pollset, &worker, now, deadline, &call_list);
}
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
GRPC_CQ_INTERNAL_UNREF(cc, "next");
+ grpc_call_list_run(&call_list);
return ret;
}
@@ -247,6 +257,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
grpc_pollset_worker worker;
gpr_timespec now;
int first_loop = 1;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
GPR_ASSERT(!reserved);
@@ -268,7 +279,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
- c->done(c->done_arg, c);
+ c->done(c->done_arg, c, &call_list);
goto done;
}
prev = c;
@@ -299,18 +310,20 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
break;
}
first_loop = 0;
- grpc_pollset_work(&cc->pollset, &worker, now, deadline);
+ grpc_pollset_work(&cc->pollset, &worker, now, deadline, &call_list);
del_plucker(cc, tag, &worker);
}
done:
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
GRPC_CQ_INTERNAL_UNREF(cc, "pluck");
+ grpc_call_list_run(&call_list);
return ret;
}
/* Shutdown simply drops a ref that we reserved at creation time; if we drop
to zero here, then enter shutdown mode and wake up any waiters */
void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
if (cc->shutdown_called) {
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
@@ -324,8 +337,9 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
GPR_ASSERT(!cc->shutdown);
cc->shutdown = 1;
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
- grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc);
+ grpc_pollset_shutdown(&cc->pollset, &cc->pollset_destroy_done, &call_list);
}
+ grpc_call_list_run(&call_list);
}
void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h
index 793baff03a..6d8d1ce959 100644
--- a/src/core/surface/completion_queue.h
+++ b/src/core/surface/completion_queue.h
@@ -44,7 +44,8 @@ typedef struct grpc_cq_completion {
void *tag;
/** done callback - called when this queue element is no longer
needed by the completion queue */
- void (*done)(void *done_arg, struct grpc_cq_completion *c);
+ void (*done)(void *done_arg, struct grpc_cq_completion *c,
+ grpc_call_list *call_list);
void *done_arg;
/** next pointer; low bit is used to indicate success or not */
gpr_uintptr next;
@@ -74,7 +75,8 @@ void grpc_cq_begin_op(grpc_completion_queue *cc);
void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
void (*done)(void *done_arg, grpc_cq_completion *storage,
grpc_call_list *call_list),
- void *done_arg, grpc_cq_completion *storage);
+ void *done_arg, grpc_cq_completion *storage,
+ grpc_call_list *call_list);
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index a5de900eff..c5cf33f1f9 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -55,13 +55,14 @@ typedef struct {
} channel_data;
static void lame_start_transport_stream_op(grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+ grpc_transport_stream_op *op,
+ grpc_call_list *call_list) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
if (op->send_ops != NULL) {
grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
- op->on_done_send->cb(op->on_done_send->cb_arg, 0);
+ op->on_done_send->cb(op->on_done_send->cb_arg, 0, call_list);
}
if (op->recv_ops != NULL) {
char tmp[GPR_LTOA_MIN_BUFSIZE];
@@ -80,44 +81,48 @@ static void lame_start_transport_stream_op(grpc_call_element *elem,
mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
grpc_sopb_add_metadata(op->recv_ops, mdb);
*op->recv_state = GRPC_STREAM_CLOSED;
- op->on_done_recv->cb(op->on_done_recv->cb_arg, 1);
+ op->on_done_recv->cb(op->on_done_recv->cb_arg, 1, call_list);
}
if (op->on_consumed != NULL) {
- op->on_consumed->cb(op->on_consumed->cb_arg, 0);
+ op->on_consumed->cb(op->on_consumed->cb_arg, 0, call_list);
}
}
-static char *lame_get_peer(grpc_call_element *elem) {
+static char *lame_get_peer(grpc_call_element *elem, grpc_call_list *call_list) {
channel_data *chand = elem->channel_data;
return grpc_channel_get_target(chand->master);
}
static void lame_start_transport_op(grpc_channel_element *elem,
- grpc_transport_op *op) {
+ grpc_transport_op *op,
+ grpc_call_list *call_list) {
if (op->on_connectivity_state_change) {
GPR_ASSERT(*op->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE);
*op->connectivity_state = GRPC_CHANNEL_FATAL_FAILURE;
op->on_connectivity_state_change->cb(
- op->on_connectivity_state_change->cb_arg, 1);
+ op->on_connectivity_state_change->cb_arg, 1, call_list);
}
if (op->on_consumed != NULL) {
- op->on_consumed->cb(op->on_consumed->cb_arg, 1);
+ op->on_consumed->cb(op->on_consumed->cb_arg, 1, call_list);
}
}
static void init_call_elem(grpc_call_element *elem,
const void *transport_server_data,
- grpc_transport_stream_op *initial_op) {
+ grpc_transport_stream_op *initial_op,
+ grpc_call_list *call_list) {
if (initial_op) {
- grpc_transport_stream_op_finish_with_failure(initial_op);
+ grpc_transport_stream_op_finish_with_failure(initial_op, call_list);
}
}
-static void destroy_call_elem(grpc_call_element *elem) {}
+static void destroy_call_elem(grpc_call_element *elem,
+ grpc_call_list *call_list) {}
static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
const grpc_channel_args *args, grpc_mdctx *mdctx,
- int is_first, int is_last) {
+ int is_first, int is_last,
+ grpc_call_list *call_list) {
channel_data *chand = elem->channel_data;
GPR_ASSERT(is_first);
GPR_ASSERT(is_last);
@@ -125,7 +130,8 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
chand->master = master;
}
-static void destroy_channel_elem(grpc_channel_element *elem) {}
+static void destroy_channel_elem(grpc_channel_element *elem,
+ grpc_call_list *call_list) {}
static const grpc_channel_filter lame_filter = {
lame_start_transport_stream_op,
@@ -148,14 +154,16 @@ grpc_channel *grpc_lame_client_channel_create(const char *target,
grpc_channel *channel;
grpc_channel_element *elem;
channel_data *chand;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
static const grpc_channel_filter *filters[] = {&lame_filter};
- channel = grpc_channel_create_from_filters(target, filters, 1, NULL,
- grpc_mdctx_create(),
- grpc_workqueue_create(), 1);
+ channel = grpc_channel_create_from_filters(
+ target, filters, 1, NULL, grpc_mdctx_create(),
+ grpc_workqueue_create(&call_list), 1, &call_list);
elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
GPR_ASSERT(elem->filter == &lame_filter);
chand = (channel_data *)elem->channel_data;
chand->error_code = error_code;
chand->error_message = error_message;
+ grpc_call_list_run(&call_list);
return channel;
}
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index 4a75d03f0a..b5b9ee173e 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -108,8 +108,9 @@ static void on_secure_transport_setup_done(void *arg,
c->connecting_endpoint = NULL;
gpr_mu_unlock(&c->mu);
c->result->transport = grpc_create_chttp2_transport(
- c->args.channel_args, secure_endpoint, c->mdctx, 1);
- grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0);
+ c->args.channel_args, secure_endpoint, c->mdctx, 1, call_list);
+ grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0,
+ call_list);
c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *) * 2);
c->result->filters[0] = &grpc_http_client_filter;
c->result->filters[1] = &grpc_client_auth_filter;
@@ -187,12 +188,13 @@ static void subchannel_factory_ref(grpc_subchannel_factory *scf) {
gpr_ref(&f->refs);
}
-static void subchannel_factory_unref(grpc_subchannel_factory *scf) {
+static void subchannel_factory_unref(grpc_subchannel_factory *scf,
+ grpc_call_list *call_list) {
subchannel_factory *f = (subchannel_factory *)scf;
if (gpr_unref(&f->refs)) {
GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base,
"subchannel_factory");
- GRPC_CHANNEL_INTERNAL_UNREF(f->master, "subchannel_factory");
+ GRPC_CHANNEL_INTERNAL_UNREF(f->master, "subchannel_factory", call_list);
grpc_channel_args_destroy(f->merge_args);
grpc_mdctx_unref(f->mdctx);
gpr_free(f);
@@ -279,7 +281,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
GPR_ASSERT(n <= MAX_FILTERS);
channel = grpc_channel_create_from_filters(target, filters, n, args_copy,
- mdctx, workqueue, 1);
+ mdctx, workqueue, 1, &call_list);
f = gpr_malloc(sizeof(*f));
f->base.vtable = &subchannel_factory_vtable;
@@ -299,7 +301,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel),
resolver, &call_list);
GRPC_RESOLVER_UNREF(resolver, "create", &call_list);
- grpc_subchannel_factory_unref(&f->base);
+ grpc_subchannel_factory_unref(&f->base, &call_list);
GRPC_SECURITY_CONNECTOR_UNREF(&connector->base, "channel_create");
grpc_channel_args_destroy(args_copy);
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index e38c6028d9..24545c67e1 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -57,9 +57,11 @@
typedef struct listener {
void *arg;
void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets,
- size_t pollset_count);
- void (*destroy)(grpc_server *server, void *arg);
+ size_t pollset_count, grpc_call_list *call_list);
+ void (*destroy)(grpc_server *server, void *arg, grpc_closure *closure,
+ grpc_call_list *call_list);
struct listener *next;
+ grpc_closure destroy_done;
} listener;
typedef struct call_data call_data;
@@ -219,19 +221,19 @@ struct grpc_server {
/** when did we print the last shutdown progress message */
gpr_timespec last_shutdown_message_time;
-
- grpc_workqueue *workqueue;
};
#define SERVER_FROM_CALL_ELEM(elem) \
(((channel_data *)(elem)->channel_data)->server)
static void begin_call(grpc_server *server, call_data *calld,
- requested_call *rc);
-static void fail_call(grpc_server *server, requested_call *rc);
+ requested_call *rc, grpc_call_list *call_list);
+static void fail_call(grpc_server *server, requested_call *rc,
+ grpc_call_list *call_list);
/* Before calling maybe_finish_shutdown, we must hold mu_global and not
hold mu_call */
-static void maybe_finish_shutdown(grpc_server *server);
+static void maybe_finish_shutdown(grpc_server *server,
+ grpc_call_list *call_list);
/*
* channel broadcaster
@@ -258,14 +260,15 @@ struct shutdown_cleanup_args {
gpr_slice slice;
};
-static void shutdown_cleanup(void *arg, int iomgr_status_ignored) {
+static void shutdown_cleanup(void *arg, int iomgr_status_ignored,
+ grpc_call_list *call_list) {
struct shutdown_cleanup_args *a = arg;
gpr_slice_unref(a->slice);
gpr_free(a);
}
static void send_shutdown(grpc_channel *channel, int send_goaway,
- int send_disconnect) {
+ int send_disconnect, grpc_call_list *call_list) {
grpc_transport_op op;
struct shutdown_cleanup_args *sc;
grpc_channel_element *elem;
@@ -281,17 +284,17 @@ static void send_shutdown(grpc_channel *channel, int send_goaway,
op.on_consumed = &sc->closure;
elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
- elem->filter->start_transport_op(elem, &op);
+ elem->filter->start_transport_op(elem, &op, call_list);
}
static void channel_broadcaster_shutdown(channel_broadcaster *cb,
- int send_goaway,
- int force_disconnect) {
+ int send_goaway, int force_disconnect,
+ grpc_call_list *call_list) {
size_t i;
for (i = 0; i < cb->num_channels; i++) {
- send_shutdown(cb->channels[i], send_goaway, force_disconnect);
- GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast");
+ send_shutdown(cb->channels[i], send_goaway, force_disconnect, call_list);
+ GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast", call_list);
}
gpr_free(cb->channels);
}
@@ -311,12 +314,12 @@ static void request_matcher_destroy(request_matcher *request_matcher) {
gpr_stack_lockfree_destroy(request_matcher->requests);
}
-static void kill_zombie(void *elem, int success) {
+static void kill_zombie(void *elem, int success, grpc_call_list *call_list) {
grpc_call_destroy(grpc_call_from_top_element(elem));
}
static void request_matcher_zombify_all_pending_calls(
- request_matcher *request_matcher, grpc_workqueue *workqueue) {
+ request_matcher *request_matcher, grpc_call_list *call_list) {
while (request_matcher->pending_head) {
call_data *calld = request_matcher->pending_head;
request_matcher->pending_head = calld->pending_next;
@@ -326,15 +329,16 @@ static void request_matcher_zombify_all_pending_calls(
grpc_closure_init(
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
- grpc_workqueue_push(workqueue, &calld->kill_zombie_closure, 1);
+ grpc_call_list_add(call_list, &calld->kill_zombie_closure, 1);
}
}
static void request_matcher_kill_requests(grpc_server *server,
- request_matcher *rm) {
+ request_matcher *rm,
+ grpc_call_list *call_list) {
int request_id;
while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) {
- fail_call(server, &server->requested_calls[request_id]);
+ fail_call(server, &server->requested_calls[request_id], call_list);
}
}
@@ -346,7 +350,7 @@ static void server_ref(grpc_server *server) {
gpr_ref(&server->internal_refcount);
}
-static void server_delete(grpc_server *server) {
+static void server_delete(grpc_server *server, grpc_call_list *call_list) {
registered_method *rm;
size_t i;
grpc_channel_args_destroy(server->channel_args);
@@ -365,7 +369,6 @@ static void server_delete(grpc_server *server) {
}
request_matcher_destroy(&server->unregistered_request_matcher);
gpr_stack_lockfree_destroy(server->request_freelist);
- GRPC_WORKQUEUE_UNREF(server->workqueue, "destroy");
gpr_free(server->cqs);
gpr_free(server->pollsets);
gpr_free(server->shutdown_tags);
@@ -373,9 +376,9 @@ static void server_delete(grpc_server *server) {
gpr_free(server);
}
-static void server_unref(grpc_server *server) {
+static void server_unref(grpc_server *server, grpc_call_list *call_list) {
if (gpr_unref(&server->internal_refcount)) {
- server_delete(server);
+ server_delete(server, call_list);
}
}
@@ -389,30 +392,29 @@ static void orphan_channel(channel_data *chand) {
chand->next = chand->prev = chand;
}
-static void finish_destroy_channel(void *cd, int success) {
+static void finish_destroy_channel(void *cd, int success,
+ grpc_call_list *call_list) {
channel_data *chand = cd;
grpc_server *server = chand->server;
gpr_log(GPR_DEBUG, "finish_destroy_channel: %p", chand->channel);
- GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "server");
- server_unref(server);
+ GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "server", call_list);
+ server_unref(server, call_list);
}
-static void destroy_channel(channel_data *chand) {
+static void destroy_channel(channel_data *chand, grpc_call_list *call_list) {
if (is_channel_orphaned(chand)) return;
GPR_ASSERT(chand->server != NULL);
orphan_channel(chand);
server_ref(chand->server);
- maybe_finish_shutdown(chand->server);
+ maybe_finish_shutdown(chand->server, call_list);
chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
chand->finish_destroy_channel_closure.cb_arg = chand;
- gpr_log(GPR_DEBUG, "queue finish_destroy_channel: %p on %p", chand->channel,
- chand->server->workqueue);
- grpc_workqueue_push(chand->server->workqueue,
- &chand->finish_destroy_channel_closure, 1);
+ grpc_call_list_add(call_list, &chand->finish_destroy_channel_closure, 1);
}
static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem,
- request_matcher *request_matcher) {
+ request_matcher *request_matcher,
+ grpc_call_list *call_list) {
call_data *calld = elem->call_data;
int request_id;
@@ -421,7 +423,7 @@ static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem,
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
- grpc_workqueue_push(server->workqueue, &calld->kill_zombie_closure, 1);
+ grpc_call_list_add(call_list, &calld->kill_zombie_closure, 1);
return;
}
@@ -443,11 +445,11 @@ static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem,
gpr_mu_lock(&calld->mu_state);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
- begin_call(server, calld, &server->requested_calls[request_id]);
+ begin_call(server, calld, &server->requested_calls[request_id], call_list);
}
}
-static void start_new_rpc(grpc_call_element *elem) {
+static void start_new_rpc(grpc_call_element *elem, grpc_call_list *call_list) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
grpc_server *server = chand->server;
@@ -466,7 +468,8 @@ static void start_new_rpc(grpc_call_element *elem) {
if (rm->host != calld->host) continue;
if (rm->method != calld->path) continue;
finish_start_new_rpc(server, elem,
- &rm->server_registered_method->request_matcher);
+ &rm->server_registered_method->request_matcher,
+ call_list);
return;
}
/* check for a wildcard method definition (no host set) */
@@ -478,11 +481,13 @@ static void start_new_rpc(grpc_call_element *elem) {
if (rm->host != NULL) continue;
if (rm->method != calld->path) continue;
finish_start_new_rpc(server, elem,
- &rm->server_registered_method->request_matcher);
+ &rm->server_registered_method->request_matcher,
+ call_list);
return;
}
}
- finish_start_new_rpc(server, elem, &server->unregistered_request_matcher);
+ finish_start_new_rpc(server, elem, &server->unregistered_request_matcher,
+ call_list);
}
static int num_listeners(grpc_server *server) {
@@ -494,8 +499,9 @@ static int num_listeners(grpc_server *server) {
return n;
}
-static void done_shutdown_event(void *server, grpc_cq_completion *completion) {
- server_unref(server);
+static void done_shutdown_event(void *server, grpc_cq_completion *completion,
+ grpc_call_list *call_list) {
+ server_unref(server, call_list);
}
static int num_channels(grpc_server *server) {
@@ -508,25 +514,27 @@ static int num_channels(grpc_server *server) {
return n;
}
-static void kill_pending_work_locked(grpc_server *server) {
+static void kill_pending_work_locked(grpc_server *server,
+ grpc_call_list *call_list) {
registered_method *rm;
- request_matcher_kill_requests(server, &server->unregistered_request_matcher);
+ request_matcher_kill_requests(server, &server->unregistered_request_matcher,
+ call_list);
request_matcher_zombify_all_pending_calls(
- &server->unregistered_request_matcher, server->workqueue);
+ &server->unregistered_request_matcher, call_list);
for (rm = server->registered_methods; rm; rm = rm->next) {
- request_matcher_kill_requests(server, &rm->request_matcher);
- request_matcher_zombify_all_pending_calls(&rm->request_matcher,
- server->workqueue);
+ request_matcher_kill_requests(server, &rm->request_matcher, call_list);
+ request_matcher_zombify_all_pending_calls(&rm->request_matcher, call_list);
}
}
-static void maybe_finish_shutdown(grpc_server *server) {
+static void maybe_finish_shutdown(grpc_server *server,
+ grpc_call_list *call_list) {
size_t i;
if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) {
return;
}
- kill_pending_work_locked(server);
+ kill_pending_work_locked(server, call_list);
if (server->root_channel_data.next != &server->root_channel_data ||
server->listeners_destroyed < num_listeners(server)) {
@@ -548,7 +556,7 @@ static void maybe_finish_shutdown(grpc_server *server) {
server_ref(server);
grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag, 1,
done_shutdown_event, server,
- &server->shutdown_tags[i].completion);
+ &server->shutdown_tags[i].completion, call_list);
}
}
@@ -566,10 +574,9 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
return md;
}
-static void server_on_recv(void *ptr, int success) {
+static void server_on_recv(void *ptr, int success, grpc_call_list *call_list) {
grpc_call_element *elem = ptr;
call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
gpr_timespec op_deadline;
if (success && !calld->got_initial_metadata) {
@@ -587,7 +594,7 @@ static void server_on_recv(void *ptr, int success) {
}
if (calld->host && calld->path) {
calld->got_initial_metadata = 1;
- start_new_rpc(elem);
+ start_new_rpc(elem, call_list);
}
break;
}
@@ -604,8 +611,7 @@ static void server_on_recv(void *ptr, int success) {
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
- grpc_workqueue_push(chand->server->workqueue,
- &calld->kill_zombie_closure, 1);
+ grpc_call_list_add(call_list, &calld->kill_zombie_closure, 1);
} else {
gpr_mu_unlock(&calld->mu_state);
}
@@ -616,8 +622,7 @@ static void server_on_recv(void *ptr, int success) {
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
- grpc_workqueue_push(chand->server->workqueue,
- &calld->kill_zombie_closure, 1);
+ grpc_call_list_add(call_list, &calld->kill_zombie_closure, 1);
} else if (calld->state == PENDING) {
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
@@ -629,7 +634,7 @@ static void server_on_recv(void *ptr, int success) {
break;
}
- calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success);
+ calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success, call_list);
}
static void server_mutate_op(grpc_call_element *elem,
@@ -646,10 +651,11 @@ static void server_mutate_op(grpc_call_element *elem,
}
static void server_start_transport_stream_op(grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+ grpc_transport_stream_op *op,
+ grpc_call_list *call_list) {
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
server_mutate_op(elem, op);
- grpc_call_next_op(elem, op);
+ grpc_call_next_op(elem, op, call_list);
}
static void accept_stream(void *cd, grpc_transport *transport,
@@ -660,7 +666,8 @@ static void accept_stream(void *cd, grpc_transport *transport,
0, gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
-static void channel_connectivity_changed(void *cd, int iomgr_status_ignored) {
+static void channel_connectivity_changed(void *cd, int iomgr_status_ignored,
+ grpc_call_list *call_list) {
channel_data *chand = cd;
grpc_server *server = chand->server;
if (chand->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) {
@@ -670,18 +677,19 @@ static void channel_connectivity_changed(void *cd, int iomgr_status_ignored) {
op.connectivity_state = &chand->connectivity_state;
grpc_channel_next_op(grpc_channel_stack_element(
grpc_channel_get_channel_stack(chand->channel), 0),
- &op);
+ &op, call_list);
} else {
gpr_mu_lock(&server->mu_global);
- destroy_channel(chand);
+ destroy_channel(chand, call_list);
gpr_mu_unlock(&server->mu_global);
- GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "connectivity");
+ GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "connectivity", call_list);
}
}
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
- grpc_transport_stream_op *initial_op) {
+ grpc_transport_stream_op *initial_op,
+ grpc_call_list *call_list) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
memset(calld, 0, sizeof(call_data));
@@ -696,7 +704,8 @@ static void init_call_elem(grpc_call_element *elem,
if (initial_op) server_mutate_op(elem, initial_op);
}
-static void destroy_call_elem(grpc_call_element *elem) {
+static void destroy_call_elem(grpc_call_element *elem,
+ grpc_call_list *call_list) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
@@ -711,13 +720,13 @@ static void destroy_call_elem(grpc_call_element *elem) {
gpr_mu_destroy(&calld->mu_state);
- server_unref(chand->server);
+ server_unref(chand->server, call_list);
}
static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
const grpc_channel_args *args,
grpc_mdctx *metadata_context, int is_first,
- int is_last) {
+ int is_last, grpc_call_list *call_list) {
channel_data *chand = elem->channel_data;
GPR_ASSERT(is_first);
GPR_ASSERT(!is_last);
@@ -733,7 +742,8 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
channel_connectivity_changed, chand);
}
-static void destroy_channel_elem(grpc_channel_element *elem) {
+static void destroy_channel_elem(grpc_channel_element *elem,
+ grpc_call_list *call_list) {
size_t i;
channel_data *chand = elem->channel_data;
if (chand->registered_methods) {
@@ -752,11 +762,11 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
chand->next->prev = chand->prev;
chand->prev->next = chand->next;
chand->next = chand->prev = chand;
- maybe_finish_shutdown(chand->server);
+ maybe_finish_shutdown(chand->server, call_list);
gpr_mu_unlock(&chand->server->mu_global);
GRPC_MDSTR_UNREF(chand->path_key);
GRPC_MDSTR_UNREF(chand->authority_key);
- server_unref(chand->server);
+ server_unref(chand->server, call_list);
}
}
@@ -810,7 +820,6 @@ grpc_server *grpc_server_create_from_filters(
gpr_ref_init(&server->internal_refcount, 1);
server->root_channel_data.next = server->root_channel_data.prev =
&server->root_channel_data;
- server->workqueue = grpc_workqueue_create();
/* TODO(ctiller): expose a channel_arg for this */
server->max_requested_calls = 32768;
@@ -881,23 +890,26 @@ void *grpc_server_register_method(grpc_server *server, const char *method,
void grpc_server_start(grpc_server *server) {
listener *l;
size_t i;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
for (i = 0; i < server->cq_count; i++) {
server->pollsets[i] = grpc_cq_pollset(server->cqs[i]);
- grpc_workqueue_add_to_pollset(server->workqueue, server->pollsets[i]);
}
for (l = server->listeners; l; l = l->next) {
- l->start(server, l->arg, server->pollsets, server->cq_count);
+ l->start(server, l->arg, server->pollsets, server->cq_count, &call_list);
}
+
+ grpc_call_list_run(&call_list);
}
void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
grpc_channel_filter const **extra_filters,
size_t num_extra_filters, grpc_mdctx *mdctx,
grpc_workqueue *workqueue,
- const grpc_channel_args *args) {
+ const grpc_channel_args *args,
+ grpc_call_list *call_list) {
size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
grpc_channel_filter const **filters =
gpr_malloc(sizeof(grpc_channel_filter *) * num_filters);
@@ -927,11 +939,11 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
for (i = 0; i < s->cq_count; i++) {
memset(&op, 0, sizeof(op));
op.bind_pollset = grpc_cq_pollset(s->cqs[i]);
- grpc_transport_perform_op(transport, &op);
+ grpc_transport_perform_op(transport, &op, call_list);
}
channel = grpc_channel_create_from_filters(NULL, filters, num_filters, args,
- mdctx, workqueue, 0);
+ mdctx, workqueue, 0, call_list);
chand = (channel_data *)grpc_channel_stack_element(
grpc_channel_get_channel_stack(channel), 0)
->channel_data;
@@ -987,19 +999,30 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
op.on_connectivity_state_change = &chand->channel_connectivity_changed;
op.connectivity_state = &chand->connectivity_state;
op.disconnect = gpr_atm_acq_load(&s->shutdown_flag) != 0;
- grpc_transport_perform_op(transport, &op);
+ grpc_transport_perform_op(transport, &op, call_list);
}
-void done_published_shutdown(void *done_arg, grpc_cq_completion *storage) {
+void done_published_shutdown(void *done_arg, grpc_cq_completion *storage,
+ grpc_call_list *call_list) {
(void) done_arg;
gpr_free(storage);
}
+static void listener_destroy_done(void *s, int success,
+ grpc_call_list *call_list) {
+ grpc_server *server = s;
+ gpr_mu_lock(&server->mu_global);
+ server->listeners_destroyed++;
+ maybe_finish_shutdown(server, call_list);
+ gpr_mu_unlock(&server->mu_global);
+}
+
void grpc_server_shutdown_and_notify(grpc_server *server,
grpc_completion_queue *cq, void *tag) {
listener *l;
shutdown_tag *sdt;
channel_broadcaster broadcaster;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
GRPC_SERVER_LOG_SHUTDOWN(GPR_INFO, server, cq, tag);
@@ -1008,9 +1031,9 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
grpc_cq_begin_op(cq);
if (server->shutdown_published) {
grpc_cq_end_op(cq, tag, 1, done_published_shutdown, NULL,
- gpr_malloc(sizeof(grpc_cq_completion)));
+ gpr_malloc(sizeof(grpc_cq_completion)), &call_list);
gpr_mu_unlock(&server->mu_global);
- return;
+ goto done;
}
server->shutdown_tags =
gpr_realloc(server->shutdown_tags,
@@ -1020,7 +1043,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
sdt->cq = cq;
if (gpr_atm_acq_load(&server->shutdown_flag)) {
gpr_mu_unlock(&server->mu_global);
- return;
+ goto done;
}
server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
@@ -1029,41 +1052,40 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
/* collect all unregistered then registered calls */
gpr_mu_lock(&server->mu_call);
- kill_pending_work_locked(server);
+ kill_pending_work_locked(server, &call_list);
gpr_mu_unlock(&server->mu_call);
gpr_atm_rel_store(&server->shutdown_flag, 1);
- maybe_finish_shutdown(server);
+ maybe_finish_shutdown(server, &call_list);
gpr_mu_unlock(&server->mu_global);
/* Shutdown listeners */
for (l = server->listeners; l; l = l->next) {
- l->destroy(server, l->arg);
+ grpc_closure_init(&l->destroy_done, listener_destroy_done, server);
+ l->destroy(server, l->arg, &l->destroy_done, &call_list);
}
- channel_broadcaster_shutdown(&broadcaster, 1, 0);
-}
+ channel_broadcaster_shutdown(&broadcaster, 1, 0, &call_list);
-void grpc_server_listener_destroy_done(void *s) {
- grpc_server *server = s;
- gpr_mu_lock(&server->mu_global);
- server->listeners_destroyed++;
- maybe_finish_shutdown(server);
- gpr_mu_unlock(&server->mu_global);
+done:
+ grpc_call_list_run(&call_list);
}
void grpc_server_cancel_all_calls(grpc_server *server) {
channel_broadcaster broadcaster;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
gpr_mu_lock(&server->mu_global);
channel_broadcaster_init(server, &broadcaster);
gpr_mu_unlock(&server->mu_global);
- channel_broadcaster_shutdown(&broadcaster, 0, 1);
+ channel_broadcaster_shutdown(&broadcaster, 0, 1, &call_list);
+ grpc_call_list_run(&call_list);
}
void grpc_server_destroy(grpc_server *server) {
listener *l;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
gpr_mu_lock(&server->mu_global);
GPR_ASSERT(gpr_atm_acq_load(&server->shutdown_flag) || !server->listeners);
@@ -1077,16 +1099,17 @@ void grpc_server_destroy(grpc_server *server) {
gpr_mu_unlock(&server->mu_global);
- grpc_workqueue_flush(server->workqueue);
-
- server_unref(server);
+ server_unref(server, &call_list);
+ grpc_call_list_run(&call_list);
}
-void grpc_server_add_listener(grpc_server *server, void *arg,
- void (*start)(grpc_server *server, void *arg,
- grpc_pollset **pollsets,
- size_t pollset_count),
- void (*destroy)(grpc_server *server, void *arg)) {
+void grpc_server_add_listener(
+ grpc_server *server, void *arg,
+ void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets,
+ size_t pollset_count, grpc_call_list *call_list),
+ void (*destroy)(grpc_server *server, void *arg, grpc_closure *on_done,
+ grpc_call_list *call_list),
+ grpc_call_list *call_list) {
listener *l = gpr_malloc(sizeof(listener));
l->arg = arg;
l->start = start;
@@ -1096,18 +1119,19 @@ void grpc_server_add_listener(grpc_server *server, void *arg,
}
static grpc_call_error queue_call_request(grpc_server *server,
- requested_call *rc) {
+ requested_call *rc,
+ grpc_call_list *call_list) {
call_data *calld = NULL;
request_matcher *request_matcher = NULL;
int request_id;
if (gpr_atm_acq_load(&server->shutdown_flag)) {
- fail_call(server, rc);
+ fail_call(server, rc, call_list);
return GRPC_CALL_OK;
}
request_id = gpr_stack_lockfree_pop(server->request_freelist);
if (request_id == -1) {
/* out of request ids: just fail this one */
- fail_call(server, rc);
+ fail_call(server, rc, call_list);
return GRPC_CALL_OK;
}
switch (rc->type) {
@@ -1135,12 +1159,13 @@ static grpc_call_error queue_call_request(grpc_server *server,
grpc_closure_init(
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
- grpc_workqueue_push(server->workqueue, &calld->kill_zombie_closure, 1);
+ grpc_call_list_add(call_list, &calld->kill_zombie_closure, 1);
} else {
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
- begin_call(server, calld, &server->requested_calls[request_id]);
+ begin_call(server, calld, &server->requested_calls[request_id],
+ call_list);
}
gpr_mu_lock(&server->mu_call);
}
@@ -1154,13 +1179,16 @@ grpc_call_error grpc_server_request_call(
grpc_metadata_array *initial_metadata,
grpc_completion_queue *cq_bound_to_call,
grpc_completion_queue *cq_for_notification, void *tag) {
+ grpc_call_error error;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
requested_call *rc = gpr_malloc(sizeof(*rc));
GRPC_SERVER_LOG_REQUEST_CALL(GPR_INFO, server, call, details,
initial_metadata, cq_bound_to_call,
cq_for_notification, tag);
if (!grpc_cq_is_server_cq(cq_for_notification)) {
gpr_free(rc);
- return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
+ error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
+ goto done;
}
grpc_cq_begin_op(cq_for_notification);
details->reserved = NULL;
@@ -1172,7 +1200,10 @@ grpc_call_error grpc_server_request_call(
rc->call = call;
rc->data.batch.details = details;
rc->data.batch.initial_metadata = initial_metadata;
- return queue_call_request(server, rc);
+ error = queue_call_request(server, rc, &call_list);
+done:
+ grpc_call_list_run(&call_list);
+ return error;
}
grpc_call_error grpc_server_request_registered_call(
@@ -1180,11 +1211,14 @@ grpc_call_error grpc_server_request_registered_call(
grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload,
grpc_completion_queue *cq_bound_to_call,
grpc_completion_queue *cq_for_notification, void *tag) {
+ grpc_call_error error;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
requested_call *rc = gpr_malloc(sizeof(*rc));
registered_method *registered_method = rm;
if (!grpc_cq_is_server_cq(cq_for_notification)) {
gpr_free(rc);
- return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
+ error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
+ goto done;
}
grpc_cq_begin_op(cq_for_notification);
rc->type = REGISTERED_CALL;
@@ -1197,12 +1231,16 @@ grpc_call_error grpc_server_request_registered_call(
rc->data.registered.deadline = deadline;
rc->data.registered.initial_metadata = initial_metadata;
rc->data.registered.optional_payload = optional_payload;
- return queue_call_request(server, rc);
+ error = queue_call_request(server, rc, &call_list);
+done:
+ grpc_call_list_run(&call_list);
+ return error;
}
-static void publish_registered_or_batch(grpc_call *call, int success,
- void *tag);
-static void publish_was_not_set(grpc_call *call, int success, void *tag) {
+static void publish_registered_or_batch(grpc_call *call, int success, void *tag,
+ grpc_call_list *call_list);
+static void publish_was_not_set(grpc_call *call, int success, void *tag,
+ grpc_call_list *call_list) {
abort();
}
@@ -1218,7 +1256,7 @@ static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
}
static void begin_call(grpc_server *server, call_data *calld,
- requested_call *rc) {
+ requested_call *rc, grpc_call_list *call_list) {
grpc_ioreq_completion_func publish = publish_was_not_set;
grpc_ioreq req[2];
grpc_ioreq *r = req;
@@ -1229,7 +1267,7 @@ static void begin_call(grpc_server *server, call_data *calld,
fill in the metadata array passed by the client, we need to perform
an ioreq op, that should complete immediately. */
- grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call);
+ grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call, call_list);
*rc->call = calld->call;
calld->cq_new = rc->cq_for_notification;
switch (rc->type) {
@@ -1265,10 +1303,11 @@ static void begin_call(grpc_server *server, call_data *calld,
GRPC_CALL_INTERNAL_REF(calld->call, "server");
grpc_call_start_ioreq_and_call_back(calld->call, req, (size_t)(r - req),
- publish, rc);
+ publish, rc, call_list);
}
-static void done_request_event(void *req, grpc_cq_completion *c) {
+static void done_request_event(void *req, grpc_cq_completion *c,
+ grpc_call_list *call_list) {
requested_call *rc = req;
grpc_server *server = rc->server;
@@ -1281,10 +1320,11 @@ static void done_request_event(void *req, grpc_cq_completion *c) {
gpr_free(req);
}
- server_unref(server);
+ server_unref(server, call_list);
}
-static void fail_call(grpc_server *server, requested_call *rc) {
+static void fail_call(grpc_server *server, requested_call *rc,
+ grpc_call_list *call_list) {
*rc->call = NULL;
switch (rc->type) {
case BATCH_CALL:
@@ -1296,11 +1336,11 @@ static void fail_call(grpc_server *server, requested_call *rc) {
}
server_ref(server);
grpc_cq_end_op(rc->cq_for_notification, rc->tag, 0, done_request_event, rc,
- &rc->completion);
+ &rc->completion, call_list);
}
-static void publish_registered_or_batch(grpc_call *call, int success,
- void *prc) {
+static void publish_registered_or_batch(grpc_call *call, int success, void *prc,
+ grpc_call_list *call_list) {
grpc_call_element *elem =
grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
requested_call *rc = prc;
@@ -1308,7 +1348,7 @@ static void publish_registered_or_batch(grpc_call *call, int success,
channel_data *chand = elem->channel_data;
server_ref(chand->server);
grpc_cq_end_op(calld->cq_new, rc->tag, success, done_request_event, rc,
- &rc->completion);
+ &rc->completion, call_list);
GRPC_CALL_INTERNAL_UNREF(call, "server", call_list);
}
diff --git a/src/core/surface/server.h b/src/core/surface/server.h
index f87296797c..2f2c5b8948 100644
--- a/src/core/surface/server.h
+++ b/src/core/surface/server.h
@@ -49,9 +49,9 @@ void grpc_server_add_listener(
grpc_server *server, void *listener,
void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets,
size_t npollsets, grpc_call_list *call_list),
- void (*destroy)(grpc_server *server, void *arg, grpc_call_list *call_list));
-
-void grpc_server_listener_destroy_done(void *server);
+ void (*destroy)(grpc_server *server, void *arg, grpc_closure *on_done,
+ grpc_call_list *call_list),
+ grpc_call_list *call_list);
/* Setup a transport - creates a channel stack, binds the transport to the
server */
@@ -59,7 +59,8 @@ void grpc_server_setup_transport(grpc_server *server, grpc_transport *transport,
grpc_channel_filter const **extra_filters,
size_t num_extra_filters, grpc_mdctx *mdctx,
grpc_workqueue *workqueue,
- const grpc_channel_args *args);
+ const grpc_channel_args *args,
+ grpc_call_list *call_list);
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server);
diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c
index 91cf6ece9c..df63d99dea 100644
--- a/src/core/surface/server_chttp2.c
+++ b/src/core/surface/server_chttp2.c
@@ -43,15 +43,17 @@
#include <grpc/support/useful.h>
static void setup_transport(void *server, grpc_transport *transport,
- grpc_mdctx *mdctx, grpc_workqueue *workqueue) {
+ grpc_mdctx *mdctx, grpc_workqueue *workqueue,
+ grpc_call_list *call_list) {
static grpc_channel_filter const *extra_filters[] = {
&grpc_http_server_filter};
grpc_server_setup_transport(server, transport, extra_filters,
GPR_ARRAY_SIZE(extra_filters), mdctx, workqueue,
- grpc_server_get_channel_args(server));
+ grpc_server_get_channel_args(server), call_list);
}
-static void new_transport(void *server, grpc_endpoint *tcp) {
+static void new_transport(void *server, grpc_endpoint *tcp,
+ grpc_call_list *call_list) {
/*
* Beware that the call to grpc_create_chttp2_transport() has to happen before
* grpc_tcp_server_destroy(). This is fine here, but similar code
@@ -60,25 +62,27 @@ static void new_transport(void *server, grpc_endpoint *tcp) {
* case.
*/
grpc_mdctx *mdctx = grpc_mdctx_create();
- grpc_workqueue *workqueue = grpc_workqueue_create();
+ grpc_workqueue *workqueue = grpc_workqueue_create(call_list);
grpc_transport *transport = grpc_create_chttp2_transport(
- grpc_server_get_channel_args(server), tcp, mdctx, workqueue, 0);
- setup_transport(server, transport, mdctx, workqueue);
- grpc_chttp2_transport_start_reading(transport, NULL, 0);
+ grpc_server_get_channel_args(server), tcp, mdctx, 0, call_list);
+ setup_transport(server, transport, mdctx, workqueue, call_list);
+ grpc_chttp2_transport_start_reading(transport, NULL, 0, call_list);
}
/* Server callback: start listening on our ports */
static void start(grpc_server *server, void *tcpp, grpc_pollset **pollsets,
- size_t pollset_count) {
+ size_t pollset_count, grpc_call_list *call_list) {
grpc_tcp_server *tcp = tcpp;
- grpc_tcp_server_start(tcp, pollsets, pollset_count, new_transport, server);
+ grpc_tcp_server_start(tcp, pollsets, pollset_count, new_transport, server,
+ call_list);
}
/* Server callback: destroy the tcp listener (so we don't generate further
callbacks) */
-static void destroy(grpc_server *server, void *tcpp) {
+static void destroy(grpc_server *server, void *tcpp, grpc_closure *destroy_done,
+ grpc_call_list *call_list) {
grpc_tcp_server *tcp = tcpp;
- grpc_tcp_server_destroy(tcp, grpc_server_listener_destroy_done, server);
+ grpc_tcp_server_destroy(tcp, destroy_done, call_list);
}
int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
@@ -88,6 +92,7 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
unsigned count = 0;
int port_num = -1;
int port_temp;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
resolved = grpc_blocking_resolve_address(addr, "http");
if (!resolved) {
@@ -124,9 +129,8 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
grpc_resolved_addresses_destroy(resolved);
/* Register with the server only upon success */
- grpc_server_add_listener(server, tcp, start, destroy);
-
- return port_num;
+ grpc_server_add_listener(server, tcp, start, destroy, &call_list);
+ goto done;
/* Error path: cleanup and return */
error:
@@ -136,5 +140,9 @@ error:
if (tcp) {
grpc_tcp_server_destroy(tcp, NULL, NULL);
}
+ port_num = 0;
+
+done:
+ grpc_call_list_run(&call_list);
return 0;
}