aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/census/grpc_filter.c6
-rw-r--r--src/core/channel/channel_stack.c5
-rw-r--r--src/core/channel/channel_stack.h6
-rw-r--r--src/core/channel/client_channel.c9
-rw-r--r--src/core/channel/compress_filter.c3
-rw-r--r--src/core/channel/connected_channel.c19
-rw-r--r--src/core/channel/http_client_filter.c3
-rw-r--r--src/core/channel/http_server_filter.c3
-rw-r--r--src/core/channel/noop_filter.c3
-rw-r--r--src/core/client_config/lb_policies/pick_first.c2
-rw-r--r--src/core/client_config/resolvers/dns_resolver.c2
-rw-r--r--src/core/client_config/resolvers/sockaddr_resolver.c2
-rw-r--r--src/core/client_config/subchannel.c18
-rw-r--r--src/core/client_config/subchannel_factory.c6
-rw-r--r--src/core/client_config/subchannel_factory.h5
-rw-r--r--src/core/client_config/subchannel_factory_decorators/merge_channel_args.c5
-rw-r--r--src/core/httpcli/httpcli.c7
-rw-r--r--src/core/httpcli/httpcli.h6
-rw-r--r--src/core/iomgr/alarm.c3
-rw-r--r--src/core/iomgr/pollset.h7
-rw-r--r--src/core/iomgr/pollset_posix.c20
-rw-r--r--src/core/iomgr/resolve_address.h1
-rw-r--r--src/core/security/client_auth_filter.c3
-rw-r--r--src/core/security/google_default_credentials.c2
-rw-r--r--src/core/security/server_auth_filter.c3
-rw-r--r--src/core/security/server_secure_chttp2.c20
-rw-r--r--src/core/surface/call.c22
-rw-r--r--src/core/surface/call.h3
-rw-r--r--src/core/surface/channel.c19
-rw-r--r--src/core/surface/channel.h14
-rw-r--r--src/core/surface/channel_connectivity.c40
-rw-r--r--src/core/surface/channel_create.c60
-rw-r--r--src/core/surface/completion_queue.c32
-rw-r--r--src/core/surface/completion_queue.h6
-rw-r--r--src/core/surface/lame_client.c40
-rw-r--r--src/core/surface/secure_channel_create.c14
-rw-r--r--src/core/surface/server.c288
-rw-r--r--src/core/surface/server.h9
-rw-r--r--src/core/surface/server_chttp2.c36
-rw-r--r--src/core/transport/chttp2/frame_data.c3
-rw-r--r--src/core/transport/chttp2/frame_data.h4
-rw-r--r--src/core/transport/chttp2/frame_goaway.c3
-rw-r--r--src/core/transport/chttp2/frame_goaway.h4
-rw-r--r--src/core/transport/chttp2/frame_ping.c7
-rw-r--r--src/core/transport/chttp2/frame_ping.h4
-rw-r--r--src/core/transport/chttp2/frame_rst_stream.c3
-rw-r--r--src/core/transport/chttp2/frame_rst_stream.h4
-rw-r--r--src/core/transport/chttp2/frame_settings.c3
-rw-r--r--src/core/transport/chttp2/frame_settings.h4
-rw-r--r--src/core/transport/chttp2/frame_window_update.c3
-rw-r--r--src/core/transport/chttp2/frame_window_update.h4
-rw-r--r--src/core/transport/chttp2/hpack_parser.c3
-rw-r--r--src/core/transport/chttp2/hpack_parser.h4
-rw-r--r--src/core/transport/chttp2/internal.h21
-rw-r--r--src/core/transport/chttp2/parsing.c30
-rw-r--r--src/core/transport/chttp2/writing.c22
-rw-r--r--src/core/transport/chttp2_transport.c284
-rw-r--r--src/core/transport/chttp2_transport.h5
-rw-r--r--src/core/transport/connectivity_state.c5
-rw-r--r--src/core/transport/connectivity_state.h3
-rw-r--r--src/core/transport/transport.c52
-rw-r--r--src/core/transport/transport.h22
-rw-r--r--src/core/transport/transport_impl.h16
63 files changed, 716 insertions, 549 deletions
diff --git a/src/core/census/grpc_filter.c b/src/core/census/grpc_filter.c
index d66f29228b..fa318c4e54 100644
--- a/src/core/census/grpc_filter.c
+++ b/src/core/census/grpc_filter.c
@@ -128,7 +128,8 @@ static void server_start_transport_op(grpc_call_element* elem,
static void client_init_call_elem(grpc_call_element* elem,
const void* server_transport_data,
- grpc_transport_stream_op* initial_op) {
+ grpc_transport_stream_op* initial_op,
+ grpc_call_list* call_list) {
call_data* d = elem->call_data;
GPR_ASSERT(d != NULL);
d->start_ts = gpr_now(GPR_CLOCK_REALTIME);
@@ -144,7 +145,8 @@ static void client_destroy_call_elem(grpc_call_element* elem,
static void server_init_call_elem(grpc_call_element* elem,
const void* server_transport_data,
- grpc_transport_stream_op* initial_op) {
+ grpc_transport_stream_op* initial_op,
+ grpc_call_list* call_list) {
call_data* d = elem->call_data;
GPR_ASSERT(d != NULL);
d->start_ts = gpr_now(GPR_CLOCK_REALTIME);
diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c
index c767a87b20..10fbd520c3 100644
--- a/src/core/channel/channel_stack.c
+++ b/src/core/channel/channel_stack.c
@@ -153,7 +153,8 @@ void grpc_channel_stack_destroy(grpc_channel_stack *stack,
void grpc_call_stack_init(grpc_channel_stack *channel_stack,
const void *transport_server_data,
grpc_transport_stream_op *initial_op,
- grpc_call_stack *call_stack) {
+ grpc_call_stack *call_stack,
+ grpc_call_list *call_list) {
grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
size_t count = channel_stack->count;
grpc_call_element *call_elems;
@@ -171,7 +172,7 @@ void grpc_call_stack_init(grpc_channel_stack *channel_stack,
call_elems[i].channel_data = channel_elems[i].channel_data;
call_elems[i].call_data = user_data;
call_elems[i].filter->init_call_elem(&call_elems[i], transport_server_data,
- initial_op);
+ initial_op, call_list);
user_data +=
ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data);
}
diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h
index 190247b3f1..5afe7f258a 100644
--- a/src/core/channel/channel_stack.h
+++ b/src/core/channel/channel_stack.h
@@ -85,7 +85,8 @@ typedef struct {
argument.*/
void (*init_call_elem)(grpc_call_element *elem,
const void *server_transport_data,
- grpc_transport_stream_op *initial_op);
+ grpc_transport_stream_op *initial_op,
+ grpc_call_list *call_list);
/* Destroy per call data.
The filter does not need to do any chaining */
void (*destroy_call_elem)(grpc_call_element *elem, grpc_call_list *call_list);
@@ -172,7 +173,8 @@ void grpc_channel_stack_destroy(grpc_channel_stack *stack,
void grpc_call_stack_init(grpc_channel_stack *channel_stack,
const void *transport_server_data,
grpc_transport_stream_op *initial_op,
- grpc_call_stack *call_stack);
+ grpc_call_stack *call_stack,
+ grpc_call_list *call_list);
/* Destroy a call stack */
void grpc_call_stack_destroy(grpc_call_stack *stack, grpc_call_list *call_list);
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 6618336e93..62f81daf44 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -459,7 +459,7 @@ static void on_lb_policy_state_changed(void *arg, int iomgr_success,
on_lb_policy_state_changed_locked(w, call_list);
gpr_mu_unlock(&w->chand->mu_config);
- GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy");
+ GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy", call_list);
gpr_free(w);
}
@@ -551,7 +551,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success,
GRPC_LB_POLICY_UNREF(lb_policy, "config_change", call_list);
}
- GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver");
+ GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver", call_list);
}
static void cc_start_transport_op(grpc_channel_element *elem,
@@ -610,7 +610,8 @@ static void cc_start_transport_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
- grpc_transport_stream_op *initial_op) {
+ grpc_transport_stream_op *initial_op,
+ grpc_call_list *call_list) {
call_data *calld = elem->call_data;
/* TODO(ctiller): is there something useful we can do here? */
@@ -688,7 +689,7 @@ static void destroy_channel_elem(grpc_channel_element *elem,
if (chand->lb_policy != NULL) {
GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel", call_list);
}
- grpc_connectivity_state_destroy(&chand->state_tracker);
+ grpc_connectivity_state_destroy(&chand->state_tracker, call_list);
grpc_pollset_set_destroy(&chand->pollset_set);
gpr_mu_destroy(&chand->mu_config);
}
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c
index 5a7403ccfd..e2be603e26 100644
--- a/src/core/channel/compress_filter.c
+++ b/src/core/channel/compress_filter.c
@@ -282,7 +282,8 @@ static void compress_start_transport_stream_op(grpc_call_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
- grpc_transport_stream_op *initial_op) {
+ grpc_transport_stream_op *initial_op,
+ grpc_call_list *call_list) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index 00a4d61afe..1af5eae947 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -69,21 +69,22 @@ static void con_start_transport_stream_op(grpc_call_element *elem,
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- grpc_transport_perform_stream_op(chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld), op);
+ grpc_transport_perform_stream_op(
+ chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), op, call_list);
}
static void con_start_transport_op(grpc_channel_element *elem,
grpc_transport_op *op,
grpc_call_list *call_list) {
channel_data *chand = elem->channel_data;
- grpc_transport_perform_op(chand->transport, op);
+ grpc_transport_perform_op(chand->transport, op, call_list);
}
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
- grpc_transport_stream_op *initial_op) {
+ grpc_transport_stream_op *initial_op,
+ grpc_call_list *call_list) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
int r;
@@ -91,7 +92,7 @@ static void init_call_elem(grpc_call_element *elem,
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
r = grpc_transport_init_stream(chand->transport,
TRANSPORT_STREAM_FROM_CALL_DATA(calld),
- server_transport_data, initial_op);
+ server_transport_data, initial_op, call_list);
GPR_ASSERT(r == 0);
}
@@ -101,8 +102,8 @@ static void destroy_call_elem(grpc_call_element *elem,
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
- grpc_transport_destroy_stream(chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld));
+ grpc_transport_destroy_stream(
+ chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), call_list);
}
/* Constructor for channel_data */
@@ -121,12 +122,12 @@ static void destroy_channel_elem(grpc_channel_element *elem,
grpc_call_list *call_list) {
channel_data *cd = (channel_data *)elem->channel_data;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
- grpc_transport_destroy(cd->transport);
+ grpc_transport_destroy(cd->transport, call_list);
}
static char *con_get_peer(grpc_call_element *elem, grpc_call_list *call_list) {
channel_data *chand = elem->channel_data;
- return grpc_transport_get_peer(chand->transport);
+ return grpc_transport_get_peer(chand->transport, call_list);
}
const grpc_channel_filter grpc_connected_channel_filter = {
diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c
index 4e9eac72ba..d0adafc048 100644
--- a/src/core/channel/http_client_filter.c
+++ b/src/core/channel/http_client_filter.c
@@ -165,7 +165,8 @@ static void hc_start_transport_op(grpc_call_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
- grpc_transport_stream_op *initial_op) {
+ grpc_transport_stream_op *initial_op,
+ grpc_call_list *call_list) {
call_data *calld = elem->call_data;
calld->sent_initial_metadata = 0;
calld->got_initial_metadata = 0;
diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c
index 201adf4f35..70cc4f298a 100644
--- a/src/core/channel/http_server_filter.c
+++ b/src/core/channel/http_server_filter.c
@@ -238,7 +238,8 @@ static void hs_start_transport_op(grpc_call_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
- grpc_transport_stream_op *initial_op) {
+ grpc_transport_stream_op *initial_op,
+ grpc_call_list *call_list) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
/* initialize members */
diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c
index d4f1acb4c5..de75f83654 100644
--- a/src/core/channel/noop_filter.c
+++ b/src/core/channel/noop_filter.c
@@ -74,7 +74,8 @@ static void noop_start_transport_stream_op(grpc_call_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
- grpc_transport_stream_op *initial_op) {
+ grpc_transport_stream_op *initial_op,
+ grpc_call_list *call_list) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 6dc52f43ce..8ea774bebc 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -101,7 +101,7 @@ void pf_destroy(grpc_lb_policy *pol, grpc_call_list *call_list) {
for (i = 0; i < p->num_subchannels; i++) {
GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "pick_first", call_list);
}
- grpc_connectivity_state_destroy(&p->state_tracker);
+ grpc_connectivity_state_destroy(&p->state_tracker, call_list);
gpr_free(p->subchannels);
gpr_mu_destroy(&p->mu);
gpr_free(p);
diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c
index 5750db4b43..0fc89ed864 100644
--- a/src/core/client_config/resolvers/dns_resolver.c
+++ b/src/core/client_config/resolvers/dns_resolver.c
@@ -197,7 +197,7 @@ static void dns_destroy(grpc_resolver *gr, grpc_call_list *call_list) {
if (r->resolved_config) {
grpc_client_config_unref(r->resolved_config, call_list);
}
- grpc_subchannel_factory_unref(r->subchannel_factory);
+ grpc_subchannel_factory_unref(r->subchannel_factory, call_list);
gpr_free(r->name);
gpr_free(r->default_port);
gpr_free(r->lb_policy_name);
diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c
index 38293b0f13..2729036d4f 100644
--- a/src/core/client_config/resolvers/sockaddr_resolver.c
+++ b/src/core/client_config/resolvers/sockaddr_resolver.c
@@ -159,7 +159,7 @@ static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r,
static void sockaddr_destroy(grpc_resolver *gr, grpc_call_list *call_list) {
sockaddr_resolver *r = (sockaddr_resolver *)gr;
gpr_mu_destroy(&r->mu);
- grpc_subchannel_factory_unref(r->subchannel_factory);
+ grpc_subchannel_factory_unref(r->subchannel_factory, call_list);
gpr_free(r->addrs);
gpr_free(r->addrs_len);
gpr_free(r->lb_policy_name);
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index bae705f9c3..48df4bbd31 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -143,7 +143,8 @@ struct grpc_subchannel_call {
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1))
#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1))
-static grpc_subchannel_call *create_call(connection *con);
+static grpc_subchannel_call *create_call(connection *con,
+ grpc_call_list *call_list);
static void connectivity_state_changed_locked(grpc_subchannel *c,
const char *reason,
grpc_call_list *call_list);
@@ -262,7 +263,7 @@ static void subchannel_destroy(grpc_subchannel *c, grpc_call_list *call_list) {
grpc_channel_args_destroy(c->args);
gpr_free(c->addr);
grpc_mdctx_unref(c->mdctx);
- grpc_connectivity_state_destroy(&c->state_tracker);
+ grpc_connectivity_state_destroy(&c->state_tracker, call_list);
grpc_connector_unref(c->connector, call_list);
gpr_free(c);
}
@@ -355,7 +356,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset,
CONNECTION_REF_LOCKED(con, "call");
gpr_mu_unlock(&c->mu);
- *target = create_call(con);
+ *target = create_call(con, call_list);
notify->cb(notify->cb_arg, 1, call_list);
} else {
waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c));
@@ -561,7 +562,7 @@ static void publish_transport(grpc_subchannel *c, grpc_call_list *call_list) {
gpr_free(sw);
gpr_free(filters);
grpc_channel_stack_destroy(stk, call_list);
- GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting");
+ GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting", call_list);
GRPC_SUBCHANNEL_UNREF(c, "connecting", call_list);
return;
}
@@ -582,7 +583,7 @@ static void publish_transport(grpc_subchannel *c, grpc_call_list *call_list) {
op.on_connectivity_state_change = &sw->closure;
op.bind_pollset_set = c->pollset_set;
SUBCHANNEL_REF_LOCKED(c, "state_watcher");
- GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting");
+ GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting", call_list);
GPR_ASSERT(!SUBCHANNEL_UNREF_LOCKED(c, "connecting"));
elem =
grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
@@ -650,7 +651,7 @@ static void on_alarm(void *arg, int iomgr_success, grpc_call_list *call_list) {
update_reconnect_parameters(c);
continue_connect(c, call_list);
} else {
- GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting");
+ GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting", call_list);
GRPC_SUBCHANNEL_UNREF(c, "connecting", call_list);
}
}
@@ -746,13 +747,14 @@ void grpc_subchannel_call_process_op(grpc_subchannel_call *call,
top_elem->filter->start_transport_stream_op(top_elem, op, call_list);
}
-grpc_subchannel_call *create_call(connection *con) {
+static grpc_subchannel_call *create_call(connection *con,
+ grpc_call_list *call_list) {
grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
grpc_subchannel_call *call =
gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call);
call->connection = con;
gpr_ref_init(&call->refs, 1);
- grpc_call_stack_init(chanstk, NULL, NULL, callstk);
+ grpc_call_stack_init(chanstk, NULL, NULL, callstk, call_list);
return call;
}
diff --git a/src/core/client_config/subchannel_factory.c b/src/core/client_config/subchannel_factory.c
index 2a569aba13..e252665000 100644
--- a/src/core/client_config/subchannel_factory.c
+++ b/src/core/client_config/subchannel_factory.c
@@ -36,8 +36,10 @@
void grpc_subchannel_factory_ref(grpc_subchannel_factory *factory) {
factory->vtable->ref(factory);
}
-void grpc_subchannel_factory_unref(grpc_subchannel_factory *factory) {
- factory->vtable->unref(factory);
+
+void grpc_subchannel_factory_unref(grpc_subchannel_factory *factory,
+ grpc_call_list *call_list) {
+ factory->vtable->unref(factory, call_list);
}
grpc_subchannel *grpc_subchannel_factory_create_subchannel(
diff --git a/src/core/client_config/subchannel_factory.h b/src/core/client_config/subchannel_factory.h
index b588580edb..2ff9d3c402 100644
--- a/src/core/client_config/subchannel_factory.h
+++ b/src/core/client_config/subchannel_factory.h
@@ -48,14 +48,15 @@ struct grpc_subchannel_factory {
struct grpc_subchannel_factory_vtable {
void (*ref)(grpc_subchannel_factory *factory);
- void (*unref)(grpc_subchannel_factory *factory);
+ void (*unref)(grpc_subchannel_factory *factory, grpc_call_list *call_list);
grpc_subchannel *(*create_subchannel)(grpc_subchannel_factory *factory,
grpc_subchannel_args *args,
grpc_call_list *call_list);
};
void grpc_subchannel_factory_ref(grpc_subchannel_factory *factory);
-void grpc_subchannel_factory_unref(grpc_subchannel_factory *factory);
+void grpc_subchannel_factory_unref(grpc_subchannel_factory *factory,
+ grpc_call_list *call_list);
/** Create a new grpc_subchannel */
grpc_subchannel *grpc_subchannel_factory_create_subchannel(
diff --git a/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c b/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c
index b2c9797b1a..00dff6343c 100644
--- a/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c
+++ b/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c
@@ -47,10 +47,11 @@ static void merge_args_factory_ref(grpc_subchannel_factory *scf) {
gpr_ref(&f->refs);
}
-static void merge_args_factory_unref(grpc_subchannel_factory *scf) {
+static void merge_args_factory_unref(grpc_subchannel_factory *scf,
+ grpc_call_list *call_list) {
merge_args_factory *f = (merge_args_factory *)scf;
if (gpr_unref(&f->refs)) {
- grpc_subchannel_factory_unref(f->wrapped);
+ grpc_subchannel_factory_unref(f->wrapped, call_list);
grpc_channel_args_destroy(f->merge_args);
gpr_free(f);
}
diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c
index 40ea2e9688..fc45eda2de 100644
--- a/src/core/httpcli/httpcli.c
+++ b/src/core/httpcli/httpcli.c
@@ -246,7 +246,7 @@ void grpc_httpcli_get(grpc_httpcli_context *context, grpc_pollset *pollset,
grpc_call_list *call_list) {
char *name;
if (g_get_override &&
- g_get_override(request, deadline, on_response, user_data)) {
+ g_get_override(request, deadline, on_response, user_data, call_list)) {
return;
}
gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path);
@@ -263,8 +263,9 @@ void grpc_httpcli_post(grpc_httpcli_context *context, grpc_pollset *pollset,
grpc_httpcli_response_cb on_response, void *user_data,
grpc_call_list *call_list) {
char *name;
- if (g_post_override && g_post_override(request, body_bytes, body_size,
- deadline, on_response, user_data)) {
+ if (g_post_override &&
+ g_post_override(request, body_bytes, body_size, deadline, on_response,
+ user_data, call_list)) {
return;
}
gpr_asprintf(&name, "HTTP:POST:%s:%s", request->host, request->path);
diff --git a/src/core/httpcli/httpcli.h b/src/core/httpcli/httpcli.h
index 74bb123042..6d19da37fb 100644
--- a/src/core/httpcli/httpcli.h
+++ b/src/core/httpcli/httpcli.h
@@ -147,13 +147,15 @@ void grpc_httpcli_post(grpc_httpcli_context *context, grpc_pollset *pollset,
typedef int (*grpc_httpcli_get_override)(const grpc_httpcli_request *request,
gpr_timespec deadline,
grpc_httpcli_response_cb on_response,
- void *user_data);
+ void *user_data,
+ grpc_call_list *call_list);
typedef int (*grpc_httpcli_post_override)(const grpc_httpcli_request *request,
const char *body_bytes,
size_t body_size,
gpr_timespec deadline,
grpc_httpcli_response_cb on_response,
- void *user_data);
+ void *user_data,
+ grpc_call_list *call_list);
void grpc_httpcli_set_override(grpc_httpcli_get_override get,
grpc_httpcli_post_override post);
diff --git a/src/core/iomgr/alarm.c b/src/core/iomgr/alarm.c
index 6e0d516f0c..146bda477d 100644
--- a/src/core/iomgr/alarm.c
+++ b/src/core/iomgr/alarm.c
@@ -291,6 +291,7 @@ static size_t pop_alarms(shard_type *shard, gpr_timespec now,
gpr_mu_lock(&shard->mu);
while ((alarm = pop_one(shard, now))) {
grpc_call_list_add(call_list, &alarm->closure, success);
+ n++;
}
*new_min_deadline = compute_min_deadline(shard);
gpr_mu_unlock(&shard->mu);
@@ -332,7 +333,7 @@ static int run_some_expired_alarms(gpr_timespec now, gpr_timespec *next,
gpr_mu_unlock(&g_checker_mu);
}
- return n > 0;
+ return (int)n;
}
int grpc_alarm_check(gpr_timespec now, gpr_timespec *next,
diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h
index 98653412bb..95ba694ff6 100644
--- a/src/core/iomgr/pollset.h
+++ b/src/core/iomgr/pollset.h
@@ -73,9 +73,12 @@ void grpc_pollset_destroy(grpc_pollset *pollset);
grpc_pollset_work, and it is guaranteed that GRPC_POLLSET_MU(pollset) will
not be released by grpc_pollset_work AFTER worker has been destroyed.
- Tries not to block past deadline. */
+ Tries not to block past deadline.
+ May call grpc_call_list_run on grpc_call_list, without holding the pollset
+ lock */
void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
- gpr_timespec now, gpr_timespec deadline);
+ gpr_timespec now, gpr_timespec deadline,
+ grpc_call_list *call_list);
/* Break one polling thread out of polling work for this pollset.
If specific_worker is GRPC_POLLSET_KICK_BROADCAST, kick ALL the workers.
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index 885cb29234..1040716179 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -174,21 +174,21 @@ static void finish_shutdown(grpc_pollset *pollset, grpc_call_list *call_list) {
}
void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
- gpr_timespec now, gpr_timespec deadline) {
+ gpr_timespec now, gpr_timespec deadline,
+ grpc_call_list *call_list) {
/* pollset->mu already held */
int added_worker = 0;
int locked = 1;
- grpc_call_list call_list = GRPC_CALL_LIST_INIT;
/* this must happen before we (potentially) drop pollset->mu */
worker->next = worker->prev = NULL;
/* TODO(ctiller): pool these */
grpc_wakeup_fd_init(&worker->wakeup_fd);
if (!grpc_pollset_has_workers(pollset) &&
!grpc_call_list_empty(pollset->idle_jobs)) {
- grpc_call_list_move(&pollset->idle_jobs, &call_list);
+ grpc_call_list_move(&pollset->idle_jobs, call_list);
goto done;
}
- if (grpc_alarm_check(now, &deadline, &call_list)) {
+ if (grpc_alarm_check(now, &deadline, call_list)) {
goto done;
}
if (pollset->shutting_down) {
@@ -212,14 +212,8 @@ void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
pollset->kicked_without_pollers = 0;
}
done:
- if (!grpc_call_list_empty(call_list)) {
- if (locked) {
- gpr_mu_unlock(&pollset->mu);
- locked = 0;
- }
- grpc_call_list_run(&call_list);
- }
if (!locked) {
+ grpc_call_list_run(call_list);
gpr_mu_lock(&pollset->mu);
locked = 1;
}
@@ -233,8 +227,8 @@ done:
} else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) {
pollset->called_shutdown = 1;
gpr_mu_unlock(&pollset->mu);
- finish_shutdown(pollset, &call_list);
- grpc_call_list_run(&call_list);
+ finish_shutdown(pollset, call_list);
+ grpc_call_list_run(call_list);
/* Continuing to access pollset here is safe -- it is the caller's
* responsibility to not destroy when it has outstanding calls to
* grpc_pollset_work.
diff --git a/src/core/iomgr/resolve_address.h b/src/core/iomgr/resolve_address.h
index ec8d83fffa..72b9c1cc87 100644
--- a/src/core/iomgr/resolve_address.h
+++ b/src/core/iomgr/resolve_address.h
@@ -34,6 +34,7 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_RESOLVE_ADDRESS_H
#define GRPC_INTERNAL_CORE_IOMGR_RESOLVE_ADDRESS_H
+#include <stddef.h>
#include "src/core/iomgr/iomgr.h"
#define GRPC_MAX_SOCKADDR_SIZE 128
diff --git a/src/core/security/client_auth_filter.c b/src/core/security/client_auth_filter.c
index 0a2de4f7bb..1d06df8533 100644
--- a/src/core/security/client_auth_filter.c
+++ b/src/core/security/client_auth_filter.c
@@ -280,7 +280,8 @@ static void auth_start_transport_op(grpc_call_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
- grpc_transport_stream_op *initial_op) {
+ grpc_transport_stream_op *initial_op,
+ grpc_call_list *call_list) {
call_data *calld = elem->call_data;
calld->creds = NULL;
calld->host = NULL;
diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c
index 5da29f5ad5..8c3d85cc58 100644
--- a/src/core/security/google_default_credentials.c
+++ b/src/core/security/google_default_credentials.c
@@ -123,7 +123,7 @@ static int is_stack_running_on_compute_engine(void) {
while (!detector.is_done) {
grpc_pollset_worker worker;
grpc_pollset_work(&detector.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ gpr_inf_future(GPR_CLOCK_MONOTONIC), &call_list);
}
gpr_mu_unlock(GRPC_POLLSET_MU(&detector.pollset));
diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c
index 2376cbbeb3..201288bbdd 100644
--- a/src/core/security/server_auth_filter.c
+++ b/src/core/security/server_auth_filter.c
@@ -200,7 +200,8 @@ static void auth_start_transport_op(grpc_call_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
- grpc_transport_stream_op *initial_op) {
+ grpc_transport_stream_op *initial_op,
+ grpc_call_list *call_list) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c
index e6e2eee658..0662839105 100644
--- a/src/core/security/server_secure_chttp2.c
+++ b/src/core/security/server_secure_chttp2.c
@@ -67,6 +67,7 @@ typedef struct grpc_server_secure_state {
gpr_mu mu;
gpr_refcount refcount;
grpc_closure destroy_closure;
+ grpc_closure *destroy_callback;
} grpc_server_secure_state;
static void state_ref(grpc_server_secure_state *state) {
@@ -86,7 +87,8 @@ static void state_unref(grpc_server_secure_state *state) {
}
static void setup_transport(void *statep, grpc_transport *transport,
- grpc_mdctx *mdctx, grpc_workqueue *workqueue) {
+ grpc_mdctx *mdctx, grpc_workqueue *workqueue,
+ grpc_call_list *call_list) {
static grpc_channel_filter const *extra_filters[] = {
&grpc_server_auth_filter, &grpc_http_server_filter};
grpc_server_secure_state *state = statep;
@@ -100,7 +102,7 @@ static void setup_transport(void *statep, grpc_transport *transport,
GPR_ARRAY_SIZE(args_to_add));
grpc_server_setup_transport(state->server, transport, extra_filters,
GPR_ARRAY_SIZE(extra_filters), mdctx, workqueue,
- args_copy);
+ args_copy, call_list);
grpc_channel_args_destroy(args_copy);
}
@@ -142,9 +144,9 @@ static void on_secure_transport_setup_done(void *statep,
workqueue = grpc_workqueue_create(call_list);
transport = grpc_create_chttp2_transport(
grpc_server_get_channel_args(state->server), secure_endpoint, mdctx,
- 0);
- setup_transport(state, transport, mdctx, workqueue);
- grpc_chttp2_transport_start_reading(transport, NULL, 0);
+ 0, call_list);
+ setup_transport(state, transport, mdctx, workqueue, call_list);
+ grpc_chttp2_transport_start_reading(transport, NULL, 0, call_list);
} else {
/* We need to consume this here, because the server may already have gone
* away. */
@@ -185,7 +187,8 @@ static void start(grpc_server *server, void *statep, grpc_pollset **pollsets,
static void destroy_done(void *statep, int success, grpc_call_list *call_list) {
grpc_server_secure_state *state = statep;
- grpc_server_listener_destroy_done(state->server);
+ state->destroy_callback->cb(state->destroy_callback->cb_arg, success,
+ call_list);
gpr_mu_lock(&state->mu);
while (state->handshaking_tcp_endpoints != NULL) {
grpc_endpoint_shutdown(state->handshaking_tcp_endpoints->tcp_endpoint,
@@ -199,12 +202,13 @@ static void destroy_done(void *statep, int success, grpc_call_list *call_list) {
/* Server callback: destroy the tcp listener (so we don't generate further
callbacks) */
-static void destroy(grpc_server *server, void *statep,
+static void destroy(grpc_server *server, void *statep, grpc_closure *callback,
grpc_call_list *call_list) {
grpc_server_secure_state *state = statep;
grpc_tcp_server *tcp;
gpr_mu_lock(&state->mu);
state->is_shutdown = 1;
+ state->destroy_callback = callback;
tcp = state->tcp;
gpr_mu_unlock(&state->mu);
grpc_closure_init(&state->destroy_closure, destroy_done, state);
@@ -283,7 +287,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
gpr_ref_init(&state->refcount, 1);
/* Register with the server only upon success */
- grpc_server_add_listener(server, state, start, destroy);
+ grpc_server_add_listener(server, state, start, destroy, &call_list);
grpc_call_list_run(&call_list);
return port_num;
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index f2f8f0a6ed..bbaf65759d 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -356,7 +356,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
initial_op_ptr = &initial_op;
}
grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr,
- CALL_STACK_FROM_CALL(call));
+ CALL_STACK_FROM_CALL(call), &call_list);
if (parent_call != NULL) {
GRPC_CALL_INTERNAL_REF(parent_call, "child");
GPR_ASSERT(call->is_client);
@@ -459,7 +459,7 @@ static void destroy_call(grpc_call *call, grpc_call_list *call_list) {
size_t i;
grpc_call *c = call;
grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), call_list);
- GRPC_CHANNEL_INTERNAL_UNREF(c->channel, "call");
+ GRPC_CHANNEL_INTERNAL_UNREF(c->channel, "call", call_list);
gpr_mu_destroy(&c->mu);
gpr_mu_destroy(&c->completion_mu);
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
@@ -673,7 +673,8 @@ static void unlock(grpc_call *call, grpc_call_list *call_list) {
if (completing_requests > 0) {
for (i = 0; i < completing_requests; i++) {
completed_requests[i].on_complete(call, completed_requests[i].success,
- completed_requests[i].user_data);
+ completed_requests[i].user_data,
+ call_list);
}
lock(call);
call->completing = 0;
@@ -1556,14 +1557,16 @@ static void set_cancelled_value(grpc_status_code status, void *dest) {
*(grpc_status_code *)dest = (status != GRPC_STATUS_OK);
}
-static void finish_batch(grpc_call *call, int success, void *tag) {
+static void finish_batch(grpc_call *call, int success, void *tag,
+ grpc_call_list *call_list) {
grpc_cq_end_op(call->cq, tag, success, done_completion, call,
- allocate_completion(call));
+ allocate_completion(call), call_list);
}
-static void finish_batch_with_close(grpc_call *call, int success, void *tag) {
+static void finish_batch_with_close(grpc_call *call, int success, void *tag,
+ grpc_call_list *call_list) {
grpc_cq_end_op(call->cq, tag, 1, done_completion, call,
- allocate_completion(call));
+ allocate_completion(call), call_list);
}
static int are_write_flags_valid(gpr_uint32 flags) {
@@ -1581,7 +1584,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
size_t out;
const grpc_op *op;
grpc_ioreq *req;
- void (*finish_func)(grpc_call *, int, void *) = finish_batch;
+ void (*finish_func)(grpc_call *, int, void *, grpc_call_list *) =
+ finish_batch;
grpc_call_error error;
grpc_call_list call_list = GRPC_CALL_LIST_INIT;
@@ -1596,7 +1600,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
grpc_cq_begin_op(call->cq);
GRPC_CALL_INTERNAL_REF(call, "completion");
grpc_cq_end_op(call->cq, tag, 1, done_completion, call,
- allocate_completion(call));
+ allocate_completion(call), &call_list);
error = GRPC_CALL_OK;
goto done;
}
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index 7a7178bc7b..144aa7cef2 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -87,7 +87,8 @@ typedef struct {
} grpc_ioreq;
typedef void (*grpc_ioreq_completion_func)(grpc_call *call, int success,
- void *user_data);
+ void *user_data,
+ grpc_call_list *call_list);
grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
gpr_uint32 propagation_mask,
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index fdba09fcce..46bea13936 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -272,9 +272,9 @@ void grpc_channel_internal_ref(grpc_channel *c) {
gpr_ref(&c->refs);
}
-static void destroy_channel(grpc_channel *channel) {
+static void destroy_channel(grpc_channel *channel, grpc_call_list *call_list) {
size_t i;
- grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel));
+ grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel), call_list);
for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) {
GRPC_MDELEM_UNREF(channel->grpc_status_elem[i]);
}
@@ -303,26 +303,31 @@ static void destroy_channel(grpc_channel *channel) {
}
#ifdef GRPC_CHANNEL_REF_COUNT_DEBUG
-void grpc_channel_internal_unref(grpc_channel *channel, const char *reason) {
+void grpc_channel_internal_unref(grpc_channel *channel, const char *reason,
+ grpc_call_list *call_list) {
gpr_log(GPR_DEBUG, "CHANNEL: unref %p %d -> %d [%s]", channel,
channel->refs.count, channel->refs.count - 1, reason);
#else
-void grpc_channel_internal_unref(grpc_channel *channel) {
+void grpc_channel_internal_unref(grpc_channel *channel,
+ grpc_call_list *call_list) {
#endif
if (gpr_unref(&channel->refs)) {
- destroy_channel(channel);
+ destroy_channel(channel, call_list);
}
}
void grpc_channel_destroy(grpc_channel *channel) {
grpc_transport_op op;
grpc_channel_element *elem;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
memset(&op, 0, sizeof(op));
op.disconnect = 1;
elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0);
- elem->filter->start_transport_op(elem, &op);
+ elem->filter->start_transport_op(elem, &op, &call_list);
- GRPC_CHANNEL_INTERNAL_UNREF(channel, "channel");
+ GRPC_CHANNEL_INTERNAL_UNREF(channel, "channel", &call_list);
+
+ grpc_call_list_run(&call_list);
}
grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel) {
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index 664ecc1c5a..3f51164fcc 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -67,18 +67,20 @@ grpc_workqueue *grpc_channel_get_workqueue(grpc_channel *channel);
#ifdef GRPC_CHANNEL_REF_COUNT_DEBUG
void grpc_channel_internal_ref(grpc_channel *channel, const char *reason);
-void grpc_channel_internal_unref(grpc_channel *channel, const char *reason);
+void grpc_channel_internal_unref(grpc_channel *channel, const char *reason,
+ grpc_call_list *call_list);
#define GRPC_CHANNEL_INTERNAL_REF(channel, reason) \
grpc_channel_internal_ref(channel, reason)
-#define GRPC_CHANNEL_INTERNAL_UNREF(channel, reason) \
- grpc_channel_internal_unref(channel, reason)
+#define GRPC_CHANNEL_INTERNAL_UNREF(channel, reason, call_list) \
+ grpc_channel_internal_unref(channel, reason, call_list)
#else
void grpc_channel_internal_ref(grpc_channel *channel);
-void grpc_channel_internal_unref(grpc_channel *channel);
+void grpc_channel_internal_unref(grpc_channel *channel,
+ grpc_call_list *call_list);
#define GRPC_CHANNEL_INTERNAL_REF(channel, reason) \
grpc_channel_internal_ref(channel)
-#define GRPC_CHANNEL_INTERNAL_UNREF(channel, reason) \
- grpc_channel_internal_unref(channel)
+#define GRPC_CHANNEL_INTERNAL_UNREF(channel, reason, call_list) \
+ grpc_channel_internal_unref(channel, call_list)
#endif
#endif /* GRPC_INTERNAL_CORE_SURFACE_CHANNEL_H */
diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c
index 4d0cf1ed8b..7891669e35 100644
--- a/src/core/surface/channel_connectivity.c
+++ b/src/core/surface/channel_connectivity.c
@@ -56,7 +56,7 @@ grpc_connectivity_state grpc_channel_check_connectivity_state(
}
state = grpc_client_channel_check_connectivity_state(
client_channel_elem, try_to_connect, &call_list);
- grpc_call_list_run(call_list);
+ grpc_call_list_run(&call_list);
return state;
}
@@ -80,17 +80,18 @@ typedef struct {
void *tag;
} state_watcher;
-static void delete_state_watcher(state_watcher *w) {
+static void delete_state_watcher(state_watcher *w, grpc_call_list *call_list) {
grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(w->channel));
grpc_client_channel_del_interested_party(client_channel_elem,
- grpc_cq_pollset(w->cq));
- GRPC_CHANNEL_INTERNAL_UNREF(w->channel, "watch_connectivity");
+ grpc_cq_pollset(w->cq), call_list);
+ GRPC_CHANNEL_INTERNAL_UNREF(w->channel, "watch_connectivity", call_list);
gpr_mu_destroy(&w->mu);
gpr_free(w);
}
-static void finished_completion(void *pw, grpc_cq_completion *ignored) {
+static void finished_completion(void *pw, grpc_cq_completion *ignored,
+ grpc_call_list *call_list) {
int delete = 0;
state_watcher *w = pw;
gpr_mu_lock(&w->mu);
@@ -110,18 +111,19 @@ static void finished_completion(void *pw, grpc_cq_completion *ignored) {
gpr_mu_unlock(&w->mu);
if (delete) {
- delete_state_watcher(w);
+ delete_state_watcher(w, call_list);
}
}
-static void partly_done(state_watcher *w, int due_to_completion) {
+static void partly_done(state_watcher *w, int due_to_completion,
+ grpc_call_list *call_list) {
int delete = 0;
if (due_to_completion) {
gpr_mu_lock(&w->mu);
w->success = 1;
gpr_mu_unlock(&w->mu);
- grpc_alarm_cancel(&w->alarm);
+ grpc_alarm_cancel(&w->alarm, call_list);
}
gpr_mu_lock(&w->mu);
@@ -129,7 +131,7 @@ static void partly_done(state_watcher *w, int due_to_completion) {
case WAITING:
w->phase = CALLING_BACK;
grpc_cq_end_op(w->cq, w->tag, w->success, finished_completion, w,
- &w->completion_storage);
+ &w->completion_storage, call_list);
break;
case CALLING_BACK:
w->phase = CALLING_BACK_AND_FINISHED;
@@ -145,13 +147,17 @@ static void partly_done(state_watcher *w, int due_to_completion) {
gpr_mu_unlock(&w->mu);
if (delete) {
- delete_state_watcher(w);
+ delete_state_watcher(w, call_list);
}
}
-static void watch_complete(void *pw, int success) { partly_done(pw, 1); }
+static void watch_complete(void *pw, int success, grpc_call_list *call_list) {
+ partly_done(pw, 1, call_list);
+}
-static void timeout_complete(void *pw, int success) { partly_done(pw, 0); }
+static void timeout_complete(void *pw, int success, grpc_call_list *call_list) {
+ partly_done(pw, 0, call_list);
+}
void grpc_channel_watch_connectivity_state(
grpc_channel *channel, grpc_connectivity_state last_observed_state,
@@ -172,9 +178,9 @@ void grpc_channel_watch_connectivity_state(
w->tag = tag;
w->channel = channel;
- grpc_alarm_init(&w->alarm,
- gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
- timeout_complete, w, gpr_now(GPR_CLOCK_MONOTONIC));
+ grpc_alarm_init(
+ &w->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
+ timeout_complete, w, gpr_now(GPR_CLOCK_MONOTONIC), &call_list);
if (client_channel_elem->filter != &grpc_client_channel_filter) {
gpr_log(GPR_ERROR,
@@ -185,10 +191,10 @@ void grpc_channel_watch_connectivity_state(
} else {
GRPC_CHANNEL_INTERNAL_REF(channel, "watch_connectivity");
grpc_client_channel_add_interested_party(client_channel_elem,
- grpc_cq_pollset(cq));
+ grpc_cq_pollset(cq), &call_list);
grpc_client_channel_watch_connectivity_state(client_channel_elem, &w->state,
&w->on_complete, &call_list);
}
- grpc_call_list_run(call_list);
+ grpc_call_list_run(&call_list);
}
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index f6f42b3d7a..7ac76da10a 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -55,6 +55,13 @@ typedef struct {
grpc_closure *notify;
grpc_connect_in_args args;
grpc_connect_out_args *result;
+
+ grpc_endpoint *tcp;
+
+ grpc_mdctx *mdctx;
+ grpc_workqueue *workqueue;
+
+ grpc_closure connected;
} connector;
static void connector_ref(grpc_connector *con) {
@@ -62,21 +69,24 @@ static void connector_ref(grpc_connector *con) {
gpr_ref(&c->refs);
}
-static void connector_unref(grpc_connector *con) {
+static void connector_unref(grpc_connector *con, grpc_call_list *call_list) {
connector *c = (connector *)con;
if (gpr_unref(&c->refs)) {
+ grpc_mdctx_unref(c->mdctx);
+ GRPC_WORKQUEUE_UNREF(c->workqueue, "connector", call_list);
gpr_free(c);
}
}
-static void connected(void *arg, grpc_endpoint *tcp) {
+static void connected(void *arg, int success, grpc_call_list *call_list) {
connector *c = arg;
grpc_closure *notify;
+ grpc_endpoint *tcp = c->tcp;
if (tcp != NULL) {
c->result->transport = grpc_create_chttp2_transport(
- c->args.channel_args, tcp, c->args.metadata_context, c->args.workqueue,
- 1);
- grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0);
+ c->args.channel_args, tcp, c->mdctx, 1, call_list);
+ grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0,
+ call_list);
GPR_ASSERT(c->result->transport);
c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *));
c->result->filters[0] = &grpc_http_client_filter;
@@ -86,24 +96,27 @@ static void connected(void *arg, grpc_endpoint *tcp) {
}
notify = c->notify;
c->notify = NULL;
- notify->cb(notify->cb_arg, 1);
+ notify->cb(notify->cb_arg, 1, call_list);
}
-static void connector_shutdown(grpc_connector *con) {}
+static void connector_shutdown(grpc_connector *con, grpc_call_list *call_list) {
+}
static void connector_connect(grpc_connector *con,
const grpc_connect_in_args *args,
grpc_connect_out_args *result,
- grpc_closure *notify) {
+ grpc_closure *notify, grpc_call_list *call_list) {
connector *c = (connector *)con;
GPR_ASSERT(c->notify == NULL);
GPR_ASSERT(notify->cb);
c->notify = notify;
c->args = *args;
c->result = result;
- grpc_tcp_client_connect(connected, c, args->interested_parties,
- args->workqueue, args->addr, args->addr_len,
- args->deadline);
+ c->tcp = NULL;
+ grpc_closure_init(&c->connected, connected, c);
+ grpc_tcp_client_connect(&c->connected, &c->tcp, args->interested_parties,
+ args->addr, args->addr_len, args->deadline,
+ call_list);
}
static const grpc_connector_vtable connector_vtable = {
@@ -122,10 +135,11 @@ static void subchannel_factory_ref(grpc_subchannel_factory *scf) {
gpr_ref(&f->refs);
}
-static void subchannel_factory_unref(grpc_subchannel_factory *scf) {
+static void subchannel_factory_unref(grpc_subchannel_factory *scf,
+ grpc_call_list *call_list) {
subchannel_factory *f = (subchannel_factory *)scf;
if (gpr_unref(&f->refs)) {
- GRPC_CHANNEL_INTERNAL_UNREF(f->master, "subchannel_factory");
+ GRPC_CHANNEL_INTERNAL_UNREF(f->master, "subchannel_factory", call_list);
grpc_channel_args_destroy(f->merge_args);
grpc_mdctx_unref(f->mdctx);
gpr_free(f);
@@ -133,7 +147,8 @@ static void subchannel_factory_unref(grpc_subchannel_factory *scf) {
}
static grpc_subchannel *subchannel_factory_create_subchannel(
- grpc_subchannel_factory *scf, grpc_subchannel_args *args) {
+ grpc_subchannel_factory *scf, grpc_subchannel_args *args,
+ grpc_call_list *call_list) {
subchannel_factory *f = (subchannel_factory *)scf;
connector *c = gpr_malloc(sizeof(*c));
grpc_channel_args *final_args =
@@ -146,7 +161,7 @@ static grpc_subchannel *subchannel_factory_create_subchannel(
args->args = final_args;
args->master = f->master;
s = grpc_subchannel_create(&c->base, args);
- grpc_connector_unref(&c->base);
+ grpc_connector_unref(&c->base, call_list);
grpc_channel_args_destroy(final_args);
return s;
}
@@ -168,7 +183,8 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
grpc_resolver *resolver;
subchannel_factory *f;
grpc_mdctx *mdctx = grpc_mdctx_create();
- grpc_workqueue *workqueue = grpc_workqueue_create();
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
+ grpc_workqueue *workqueue = grpc_workqueue_create(&call_list);
size_t n = 0;
GPR_ASSERT(!reserved);
if (grpc_channel_args_is_census_enabled(args)) {
@@ -179,7 +195,7 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
GPR_ASSERT(n <= MAX_FILTERS);
channel = grpc_channel_create_from_filters(target, filters, n, args, mdctx,
- workqueue, 1);
+ workqueue, 1, &call_list);
f = gpr_malloc(sizeof(*f));
f->base.vtable = &subchannel_factory_vtable;
@@ -189,15 +205,17 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
f->merge_args = grpc_channel_args_copy(args);
f->master = channel;
GRPC_CHANNEL_INTERNAL_REF(f->master, "subchannel_factory");
- resolver = grpc_resolver_create(target, &f->base, workqueue);
+ resolver = grpc_resolver_create(target, &f->base);
if (!resolver) {
return NULL;
}
grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel),
- resolver);
- GRPC_RESOLVER_UNREF(resolver, "create");
- grpc_subchannel_factory_unref(&f->base);
+ resolver, &call_list);
+ GRPC_RESOLVER_UNREF(resolver, "create", &call_list);
+ grpc_subchannel_factory_unref(&f->base, &call_list);
+
+ grpc_call_list_run(&call_list);
return channel;
}
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index b58115a93f..49dfc3c0e1 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -67,8 +67,12 @@ struct grpc_completion_queue {
int is_server_cq;
int num_pluckers;
plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
+ grpc_closure pollset_destroy_done;
};
+static void on_pollset_destroy_done(void *cc, int success,
+ grpc_call_list *call_list);
+
grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue));
GPR_ASSERT(!reserved);
@@ -80,6 +84,7 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
grpc_pollset_init(&cc->pollset);
cc->completed_tail = &cc->completed_head;
cc->completed_head.next = (gpr_uintptr)cc->completed_tail;
+ grpc_closure_init(&cc->pollset_destroy_done, on_pollset_destroy_done, cc);
return cc;
}
@@ -94,7 +99,8 @@ void grpc_cq_internal_ref(grpc_completion_queue *cc) {
gpr_ref(&cc->owning_refs);
}
-static void on_pollset_destroy_done(void *arg) {
+static void on_pollset_destroy_done(void *arg, int success,
+ grpc_call_list *call_list) {
grpc_completion_queue *cc = arg;
GRPC_CQ_INTERNAL_UNREF(cc, "pollset_destroy");
}
@@ -127,8 +133,10 @@ void grpc_cq_begin_op(grpc_completion_queue *cc) {
event, then enter shutdown mode */
/* Queue a GRPC_OP_COMPLETED operation */
void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
- void (*done)(void *done_arg, grpc_cq_completion *storage),
- void *done_arg, grpc_cq_completion *storage) {
+ void (*done)(void *done_arg, grpc_cq_completion *storage,
+ grpc_call_list *call_list),
+ void *done_arg, grpc_cq_completion *storage,
+ grpc_call_list *call_list) {
int shutdown;
int i;
grpc_pollset_worker *pluck_worker;
@@ -162,7 +170,7 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
GPR_ASSERT(cc->shutdown_called);
cc->shutdown = 1;
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
- grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc);
+ grpc_pollset_shutdown(&cc->pollset, &cc->pollset_destroy_done, call_list);
}
}
@@ -172,6 +180,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
grpc_pollset_worker worker;
int first_loop = 1;
gpr_timespec now;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
GPR_ASSERT(!reserved);
@@ -190,7 +199,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
- c->done(c->done_arg, c);
+ c->done(c->done_arg, c, &call_list);
break;
}
if (cc->shutdown) {
@@ -207,10 +216,11 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
break;
}
first_loop = 0;
- grpc_pollset_work(&cc->pollset, &worker, now, deadline);
+ grpc_pollset_work(&cc->pollset, &worker, now, deadline, &call_list);
}
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
GRPC_CQ_INTERNAL_UNREF(cc, "next");
+ grpc_call_list_run(&call_list);
return ret;
}
@@ -247,6 +257,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
grpc_pollset_worker worker;
gpr_timespec now;
int first_loop = 1;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
GPR_ASSERT(!reserved);
@@ -268,7 +279,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
- c->done(c->done_arg, c);
+ c->done(c->done_arg, c, &call_list);
goto done;
}
prev = c;
@@ -299,18 +310,20 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
break;
}
first_loop = 0;
- grpc_pollset_work(&cc->pollset, &worker, now, deadline);
+ grpc_pollset_work(&cc->pollset, &worker, now, deadline, &call_list);
del_plucker(cc, tag, &worker);
}
done:
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
GRPC_CQ_INTERNAL_UNREF(cc, "pluck");
+ grpc_call_list_run(&call_list);
return ret;
}
/* Shutdown simply drops a ref that we reserved at creation time; if we drop
to zero here, then enter shutdown mode and wake up any waiters */
void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
if (cc->shutdown_called) {
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
@@ -324,8 +337,9 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
GPR_ASSERT(!cc->shutdown);
cc->shutdown = 1;
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
- grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc);
+ grpc_pollset_shutdown(&cc->pollset, &cc->pollset_destroy_done, &call_list);
}
+ grpc_call_list_run(&call_list);
}
void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h
index 793baff03a..6d8d1ce959 100644
--- a/src/core/surface/completion_queue.h
+++ b/src/core/surface/completion_queue.h
@@ -44,7 +44,8 @@ typedef struct grpc_cq_completion {
void *tag;
/** done callback - called when this queue element is no longer
needed by the completion queue */
- void (*done)(void *done_arg, struct grpc_cq_completion *c);
+ void (*done)(void *done_arg, struct grpc_cq_completion *c,
+ grpc_call_list *call_list);
void *done_arg;
/** next pointer; low bit is used to indicate success or not */
gpr_uintptr next;
@@ -74,7 +75,8 @@ void grpc_cq_begin_op(grpc_completion_queue *cc);
void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
void (*done)(void *done_arg, grpc_cq_completion *storage,
grpc_call_list *call_list),
- void *done_arg, grpc_cq_completion *storage);
+ void *done_arg, grpc_cq_completion *storage,
+ grpc_call_list *call_list);
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index a5de900eff..c5cf33f1f9 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -55,13 +55,14 @@ typedef struct {
} channel_data;
static void lame_start_transport_stream_op(grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+ grpc_transport_stream_op *op,
+ grpc_call_list *call_list) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
if (op->send_ops != NULL) {
grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
- op->on_done_send->cb(op->on_done_send->cb_arg, 0);
+ op->on_done_send->cb(op->on_done_send->cb_arg, 0, call_list);
}
if (op->recv_ops != NULL) {
char tmp[GPR_LTOA_MIN_BUFSIZE];
@@ -80,44 +81,48 @@ static void lame_start_transport_stream_op(grpc_call_element *elem,
mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
grpc_sopb_add_metadata(op->recv_ops, mdb);
*op->recv_state = GRPC_STREAM_CLOSED;
- op->on_done_recv->cb(op->on_done_recv->cb_arg, 1);
+ op->on_done_recv->cb(op->on_done_recv->cb_arg, 1, call_list);
}
if (op->on_consumed != NULL) {
- op->on_consumed->cb(op->on_consumed->cb_arg, 0);
+ op->on_consumed->cb(op->on_consumed->cb_arg, 0, call_list);
}
}
-static char *lame_get_peer(grpc_call_element *elem) {
+static char *lame_get_peer(grpc_call_element *elem, grpc_call_list *call_list) {
channel_data *chand = elem->channel_data;
return grpc_channel_get_target(chand->master);
}
static void lame_start_transport_op(grpc_channel_element *elem,
- grpc_transport_op *op) {
+ grpc_transport_op *op,
+ grpc_call_list *call_list) {
if (op->on_connectivity_state_change) {
GPR_ASSERT(*op->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE);
*op->connectivity_state = GRPC_CHANNEL_FATAL_FAILURE;
op->on_connectivity_state_change->cb(
- op->on_connectivity_state_change->cb_arg, 1);
+ op->on_connectivity_state_change->cb_arg, 1, call_list);
}
if (op->on_consumed != NULL) {
- op->on_consumed->cb(op->on_consumed->cb_arg, 1);
+ op->on_consumed->cb(op->on_consumed->cb_arg, 1, call_list);
}
}
static void init_call_elem(grpc_call_element *elem,
const void *transport_server_data,
- grpc_transport_stream_op *initial_op) {
+ grpc_transport_stream_op *initial_op,
+ grpc_call_list *call_list) {
if (initial_op) {
- grpc_transport_stream_op_finish_with_failure(initial_op);
+ grpc_transport_stream_op_finish_with_failure(initial_op, call_list);
}
}
-static void destroy_call_elem(grpc_call_element *elem) {}
+static void destroy_call_elem(grpc_call_element *elem,
+ grpc_call_list *call_list) {}
static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
const grpc_channel_args *args, grpc_mdctx *mdctx,
- int is_first, int is_last) {
+ int is_first, int is_last,
+ grpc_call_list *call_list) {
channel_data *chand = elem->channel_data;
GPR_ASSERT(is_first);
GPR_ASSERT(is_last);
@@ -125,7 +130,8 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
chand->master = master;
}
-static void destroy_channel_elem(grpc_channel_element *elem) {}
+static void destroy_channel_elem(grpc_channel_element *elem,
+ grpc_call_list *call_list) {}
static const grpc_channel_filter lame_filter = {
lame_start_transport_stream_op,
@@ -148,14 +154,16 @@ grpc_channel *grpc_lame_client_channel_create(const char *target,
grpc_channel *channel;
grpc_channel_element *elem;
channel_data *chand;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
static const grpc_channel_filter *filters[] = {&lame_filter};
- channel = grpc_channel_create_from_filters(target, filters, 1, NULL,
- grpc_mdctx_create(),
- grpc_workqueue_create(), 1);
+ channel = grpc_channel_create_from_filters(
+ target, filters, 1, NULL, grpc_mdctx_create(),
+ grpc_workqueue_create(&call_list), 1, &call_list);
elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
GPR_ASSERT(elem->filter == &lame_filter);
chand = (channel_data *)elem->channel_data;
chand->error_code = error_code;
chand->error_message = error_message;
+ grpc_call_list_run(&call_list);
return channel;
}
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index 4a75d03f0a..b5b9ee173e 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -108,8 +108,9 @@ static void on_secure_transport_setup_done(void *arg,
c->connecting_endpoint = NULL;
gpr_mu_unlock(&c->mu);
c->result->transport = grpc_create_chttp2_transport(
- c->args.channel_args, secure_endpoint, c->mdctx, 1);
- grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0);
+ c->args.channel_args, secure_endpoint, c->mdctx, 1, call_list);
+ grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0,
+ call_list);
c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *) * 2);
c->result->filters[0] = &grpc_http_client_filter;
c->result->filters[1] = &grpc_client_auth_filter;
@@ -187,12 +188,13 @@ static void subchannel_factory_ref(grpc_subchannel_factory *scf) {
gpr_ref(&f->refs);
}
-static void subchannel_factory_unref(grpc_subchannel_factory *scf) {
+static void subchannel_factory_unref(grpc_subchannel_factory *scf,
+ grpc_call_list *call_list) {
subchannel_factory *f = (subchannel_factory *)scf;
if (gpr_unref(&f->refs)) {
GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base,
"subchannel_factory");
- GRPC_CHANNEL_INTERNAL_UNREF(f->master, "subchannel_factory");
+ GRPC_CHANNEL_INTERNAL_UNREF(f->master, "subchannel_factory", call_list);
grpc_channel_args_destroy(f->merge_args);
grpc_mdctx_unref(f->mdctx);
gpr_free(f);
@@ -279,7 +281,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
GPR_ASSERT(n <= MAX_FILTERS);
channel = grpc_channel_create_from_filters(target, filters, n, args_copy,
- mdctx, workqueue, 1);
+ mdctx, workqueue, 1, &call_list);
f = gpr_malloc(sizeof(*f));
f->base.vtable = &subchannel_factory_vtable;
@@ -299,7 +301,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel),
resolver, &call_list);
GRPC_RESOLVER_UNREF(resolver, "create", &call_list);
- grpc_subchannel_factory_unref(&f->base);
+ grpc_subchannel_factory_unref(&f->base, &call_list);
GRPC_SECURITY_CONNECTOR_UNREF(&connector->base, "channel_create");
grpc_channel_args_destroy(args_copy);
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index e38c6028d9..24545c67e1 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -57,9 +57,11 @@
typedef struct listener {
void *arg;
void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets,
- size_t pollset_count);
- void (*destroy)(grpc_server *server, void *arg);
+ size_t pollset_count, grpc_call_list *call_list);
+ void (*destroy)(grpc_server *server, void *arg, grpc_closure *closure,
+ grpc_call_list *call_list);
struct listener *next;
+ grpc_closure destroy_done;
} listener;
typedef struct call_data call_data;
@@ -219,19 +221,19 @@ struct grpc_server {
/** when did we print the last shutdown progress message */
gpr_timespec last_shutdown_message_time;
-
- grpc_workqueue *workqueue;
};
#define SERVER_FROM_CALL_ELEM(elem) \
(((channel_data *)(elem)->channel_data)->server)
static void begin_call(grpc_server *server, call_data *calld,
- requested_call *rc);
-static void fail_call(grpc_server *server, requested_call *rc);
+ requested_call *rc, grpc_call_list *call_list);
+static void fail_call(grpc_server *server, requested_call *rc,
+ grpc_call_list *call_list);
/* Before calling maybe_finish_shutdown, we must hold mu_global and not
hold mu_call */
-static void maybe_finish_shutdown(grpc_server *server);
+static void maybe_finish_shutdown(grpc_server *server,
+ grpc_call_list *call_list);
/*
* channel broadcaster
@@ -258,14 +260,15 @@ struct shutdown_cleanup_args {
gpr_slice slice;
};
-static void shutdown_cleanup(void *arg, int iomgr_status_ignored) {
+static void shutdown_cleanup(void *arg, int iomgr_status_ignored,
+ grpc_call_list *call_list) {
struct shutdown_cleanup_args *a = arg;
gpr_slice_unref(a->slice);
gpr_free(a);
}
static void send_shutdown(grpc_channel *channel, int send_goaway,
- int send_disconnect) {
+ int send_disconnect, grpc_call_list *call_list) {
grpc_transport_op op;
struct shutdown_cleanup_args *sc;
grpc_channel_element *elem;
@@ -281,17 +284,17 @@ static void send_shutdown(grpc_channel *channel, int send_goaway,
op.on_consumed = &sc->closure;
elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
- elem->filter->start_transport_op(elem, &op);
+ elem->filter->start_transport_op(elem, &op, call_list);
}
static void channel_broadcaster_shutdown(channel_broadcaster *cb,
- int send_goaway,
- int force_disconnect) {
+ int send_goaway, int force_disconnect,
+ grpc_call_list *call_list) {
size_t i;
for (i = 0; i < cb->num_channels; i++) {
- send_shutdown(cb->channels[i], send_goaway, force_disconnect);
- GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast");
+ send_shutdown(cb->channels[i], send_goaway, force_disconnect, call_list);
+ GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast", call_list);
}
gpr_free(cb->channels);
}
@@ -311,12 +314,12 @@ static void request_matcher_destroy(request_matcher *request_matcher) {
gpr_stack_lockfree_destroy(request_matcher->requests);
}
-static void kill_zombie(void *elem, int success) {
+static void kill_zombie(void *elem, int success, grpc_call_list *call_list) {
grpc_call_destroy(grpc_call_from_top_element(elem));
}
static void request_matcher_zombify_all_pending_calls(
- request_matcher *request_matcher, grpc_workqueue *workqueue) {
+ request_matcher *request_matcher, grpc_call_list *call_list) {
while (request_matcher->pending_head) {
call_data *calld = request_matcher->pending_head;
request_matcher->pending_head = calld->pending_next;
@@ -326,15 +329,16 @@ static void request_matcher_zombify_all_pending_calls(
grpc_closure_init(
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
- grpc_workqueue_push(workqueue, &calld->kill_zombie_closure, 1);
+ grpc_call_list_add(call_list, &calld->kill_zombie_closure, 1);
}
}
static void request_matcher_kill_requests(grpc_server *server,
- request_matcher *rm) {
+ request_matcher *rm,
+ grpc_call_list *call_list) {
int request_id;
while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) {
- fail_call(server, &server->requested_calls[request_id]);
+ fail_call(server, &server->requested_calls[request_id], call_list);
}
}
@@ -346,7 +350,7 @@ static void server_ref(grpc_server *server) {
gpr_ref(&server->internal_refcount);
}
-static void server_delete(grpc_server *server) {
+static void server_delete(grpc_server *server, grpc_call_list *call_list) {
registered_method *rm;
size_t i;
grpc_channel_args_destroy(server->channel_args);
@@ -365,7 +369,6 @@ static void server_delete(grpc_server *server) {
}
request_matcher_destroy(&server->unregistered_request_matcher);
gpr_stack_lockfree_destroy(server->request_freelist);
- GRPC_WORKQUEUE_UNREF(server->workqueue, "destroy");
gpr_free(server->cqs);
gpr_free(server->pollsets);
gpr_free(server->shutdown_tags);
@@ -373,9 +376,9 @@ static void server_delete(grpc_server *server) {
gpr_free(server);
}
-static void server_unref(grpc_server *server) {
+static void server_unref(grpc_server *server, grpc_call_list *call_list) {
if (gpr_unref(&server->internal_refcount)) {
- server_delete(server);
+ server_delete(server, call_list);
}
}
@@ -389,30 +392,29 @@ static void orphan_channel(channel_data *chand) {
chand->next = chand->prev = chand;
}
-static void finish_destroy_channel(void *cd, int success) {
+static void finish_destroy_channel(void *cd, int success,
+ grpc_call_list *call_list) {
channel_data *chand = cd;
grpc_server *server = chand->server;
gpr_log(GPR_DEBUG, "finish_destroy_channel: %p", chand->channel);
- GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "server");
- server_unref(server);
+ GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "server", call_list);
+ server_unref(server, call_list);
}
-static void destroy_channel(channel_data *chand) {
+static void destroy_channel(channel_data *chand, grpc_call_list *call_list) {
if (is_channel_orphaned(chand)) return;
GPR_ASSERT(chand->server != NULL);
orphan_channel(chand);
server_ref(chand->server);
- maybe_finish_shutdown(chand->server);
+ maybe_finish_shutdown(chand->server, call_list);
chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
chand->finish_destroy_channel_closure.cb_arg = chand;
- gpr_log(GPR_DEBUG, "queue finish_destroy_channel: %p on %p", chand->channel,
- chand->server->workqueue);
- grpc_workqueue_push(chand->server->workqueue,
- &chand->finish_destroy_channel_closure, 1);
+ grpc_call_list_add(call_list, &chand->finish_destroy_channel_closure, 1);
}
static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem,
- request_matcher *request_matcher) {
+ request_matcher *request_matcher,
+ grpc_call_list *call_list) {
call_data *calld = elem->call_data;
int request_id;
@@ -421,7 +423,7 @@ static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem,
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
- grpc_workqueue_push(server->workqueue, &calld->kill_zombie_closure, 1);
+ grpc_call_list_add(call_list, &calld->kill_zombie_closure, 1);
return;
}
@@ -443,11 +445,11 @@ static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem,
gpr_mu_lock(&calld->mu_state);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
- begin_call(server, calld, &server->requested_calls[request_id]);
+ begin_call(server, calld, &server->requested_calls[request_id], call_list);
}
}
-static void start_new_rpc(grpc_call_element *elem) {
+static void start_new_rpc(grpc_call_element *elem, grpc_call_list *call_list) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
grpc_server *server = chand->server;
@@ -466,7 +468,8 @@ static void start_new_rpc(grpc_call_element *elem) {
if (rm->host != calld->host) continue;
if (rm->method != calld->path) continue;
finish_start_new_rpc(server, elem,
- &rm->server_registered_method->request_matcher);
+ &rm->server_registered_method->request_matcher,
+ call_list);
return;
}
/* check for a wildcard method definition (no host set) */
@@ -478,11 +481,13 @@ static void start_new_rpc(grpc_call_element *elem) {
if (rm->host != NULL) continue;
if (rm->method != calld->path) continue;
finish_start_new_rpc(server, elem,
- &rm->server_registered_method->request_matcher);
+ &rm->server_registered_method->request_matcher,
+ call_list);
return;
}
}
- finish_start_new_rpc(server, elem, &server->unregistered_request_matcher);
+ finish_start_new_rpc(server, elem, &server->unregistered_request_matcher,
+ call_list);
}
static int num_listeners(grpc_server *server) {
@@ -494,8 +499,9 @@ static int num_listeners(grpc_server *server) {
return n;
}
-static void done_shutdown_event(void *server, grpc_cq_completion *completion) {
- server_unref(server);
+static void done_shutdown_event(void *server, grpc_cq_completion *completion,
+ grpc_call_list *call_list) {
+ server_unref(server, call_list);
}
static int num_channels(grpc_server *server) {
@@ -508,25 +514,27 @@ static int num_channels(grpc_server *server) {
return n;
}
-static void kill_pending_work_locked(grpc_server *server) {
+static void kill_pending_work_locked(grpc_server *server,
+ grpc_call_list *call_list) {
registered_method *rm;
- request_matcher_kill_requests(server, &server->unregistered_request_matcher);
+ request_matcher_kill_requests(server, &server->unregistered_request_matcher,
+ call_list);
request_matcher_zombify_all_pending_calls(
- &server->unregistered_request_matcher, server->workqueue);
+ &server->unregistered_request_matcher, call_list);
for (rm = server->registered_methods; rm; rm = rm->next) {
- request_matcher_kill_requests(server, &rm->request_matcher);
- request_matcher_zombify_all_pending_calls(&rm->request_matcher,
- server->workqueue);
+ request_matcher_kill_requests(server, &rm->request_matcher, call_list);
+ request_matcher_zombify_all_pending_calls(&rm->request_matcher, call_list);
}
}
-static void maybe_finish_shutdown(grpc_server *server) {
+static void maybe_finish_shutdown(grpc_server *server,
+ grpc_call_list *call_list) {
size_t i;
if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) {
return;
}
- kill_pending_work_locked(server);
+ kill_pending_work_locked(server, call_list);
if (server->root_channel_data.next != &server->root_channel_data ||
server->listeners_destroyed < num_listeners(server)) {
@@ -548,7 +556,7 @@ static void maybe_finish_shutdown(grpc_server *server) {
server_ref(server);
grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag, 1,
done_shutdown_event, server,
- &server->shutdown_tags[i].completion);
+ &server->shutdown_tags[i].completion, call_list);
}
}
@@ -566,10 +574,9 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
return md;
}
-static void server_on_recv(void *ptr, int success) {
+static void server_on_recv(void *ptr, int success, grpc_call_list *call_list) {
grpc_call_element *elem = ptr;
call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
gpr_timespec op_deadline;
if (success && !calld->got_initial_metadata) {
@@ -587,7 +594,7 @@ static void server_on_recv(void *ptr, int success) {
}
if (calld->host && calld->path) {
calld->got_initial_metadata = 1;
- start_new_rpc(elem);
+ start_new_rpc(elem, call_list);
}
break;
}
@@ -604,8 +611,7 @@ static void server_on_recv(void *ptr, int success) {
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
- grpc_workqueue_push(chand->server->workqueue,
- &calld->kill_zombie_closure, 1);
+ grpc_call_list_add(call_list, &calld->kill_zombie_closure, 1);
} else {
gpr_mu_unlock(&calld->mu_state);
}
@@ -616,8 +622,7 @@ static void server_on_recv(void *ptr, int success) {
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
- grpc_workqueue_push(chand->server->workqueue,
- &calld->kill_zombie_closure, 1);
+ grpc_call_list_add(call_list, &calld->kill_zombie_closure, 1);
} else if (calld->state == PENDING) {
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
@@ -629,7 +634,7 @@ static void server_on_recv(void *ptr, int success) {
break;
}
- calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success);
+ calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success, call_list);
}
static void server_mutate_op(grpc_call_element *elem,
@@ -646,10 +651,11 @@ static void server_mutate_op(grpc_call_element *elem,
}
static void server_start_transport_stream_op(grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+ grpc_transport_stream_op *op,
+ grpc_call_list *call_list) {
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
server_mutate_op(elem, op);
- grpc_call_next_op(elem, op);
+ grpc_call_next_op(elem, op, call_list);
}
static void accept_stream(void *cd, grpc_transport *transport,
@@ -660,7 +666,8 @@ static void accept_stream(void *cd, grpc_transport *transport,
0, gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
-static void channel_connectivity_changed(void *cd, int iomgr_status_ignored) {
+static void channel_connectivity_changed(void *cd, int iomgr_status_ignored,
+ grpc_call_list *call_list) {
channel_data *chand = cd;
grpc_server *server = chand->server;
if (chand->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) {
@@ -670,18 +677,19 @@ static void channel_connectivity_changed(void *cd, int iomgr_status_ignored) {
op.connectivity_state = &chand->connectivity_state;
grpc_channel_next_op(grpc_channel_stack_element(
grpc_channel_get_channel_stack(chand->channel), 0),
- &op);
+ &op, call_list);
} else {
gpr_mu_lock(&server->mu_global);
- destroy_channel(chand);
+ destroy_channel(chand, call_list);
gpr_mu_unlock(&server->mu_global);
- GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "connectivity");
+ GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "connectivity", call_list);
}
}
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
- grpc_transport_stream_op *initial_op) {
+ grpc_transport_stream_op *initial_op,
+ grpc_call_list *call_list) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
memset(calld, 0, sizeof(call_data));
@@ -696,7 +704,8 @@ static void init_call_elem(grpc_call_element *elem,
if (initial_op) server_mutate_op(elem, initial_op);
}
-static void destroy_call_elem(grpc_call_element *elem) {
+static void destroy_call_elem(grpc_call_element *elem,
+ grpc_call_list *call_list) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
@@ -711,13 +720,13 @@ static void destroy_call_elem(grpc_call_element *elem) {
gpr_mu_destroy(&calld->mu_state);
- server_unref(chand->server);
+ server_unref(chand->server, call_list);
}
static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
const grpc_channel_args *args,
grpc_mdctx *metadata_context, int is_first,
- int is_last) {
+ int is_last, grpc_call_list *call_list) {
channel_data *chand = elem->channel_data;
GPR_ASSERT(is_first);
GPR_ASSERT(!is_last);
@@ -733,7 +742,8 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
channel_connectivity_changed, chand);
}
-static void destroy_channel_elem(grpc_channel_element *elem) {
+static void destroy_channel_elem(grpc_channel_element *elem,
+ grpc_call_list *call_list) {
size_t i;
channel_data *chand = elem->channel_data;
if (chand->registered_methods) {
@@ -752,11 +762,11 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
chand->next->prev = chand->prev;
chand->prev->next = chand->next;
chand->next = chand->prev = chand;
- maybe_finish_shutdown(chand->server);
+ maybe_finish_shutdown(chand->server, call_list);
gpr_mu_unlock(&chand->server->mu_global);
GRPC_MDSTR_UNREF(chand->path_key);
GRPC_MDSTR_UNREF(chand->authority_key);
- server_unref(chand->server);
+ server_unref(chand->server, call_list);
}
}
@@ -810,7 +820,6 @@ grpc_server *grpc_server_create_from_filters(
gpr_ref_init(&server->internal_refcount, 1);
server->root_channel_data.next = server->root_channel_data.prev =
&server->root_channel_data;
- server->workqueue = grpc_workqueue_create();
/* TODO(ctiller): expose a channel_arg for this */
server->max_requested_calls = 32768;
@@ -881,23 +890,26 @@ void *grpc_server_register_method(grpc_server *server, const char *method,
void grpc_server_start(grpc_server *server) {
listener *l;
size_t i;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
for (i = 0; i < server->cq_count; i++) {
server->pollsets[i] = grpc_cq_pollset(server->cqs[i]);
- grpc_workqueue_add_to_pollset(server->workqueue, server->pollsets[i]);
}
for (l = server->listeners; l; l = l->next) {
- l->start(server, l->arg, server->pollsets, server->cq_count);
+ l->start(server, l->arg, server->pollsets, server->cq_count, &call_list);
}
+
+ grpc_call_list_run(&call_list);
}
void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
grpc_channel_filter const **extra_filters,
size_t num_extra_filters, grpc_mdctx *mdctx,
grpc_workqueue *workqueue,
- const grpc_channel_args *args) {
+ const grpc_channel_args *args,
+ grpc_call_list *call_list) {
size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
grpc_channel_filter const **filters =
gpr_malloc(sizeof(grpc_channel_filter *) * num_filters);
@@ -927,11 +939,11 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
for (i = 0; i < s->cq_count; i++) {
memset(&op, 0, sizeof(op));
op.bind_pollset = grpc_cq_pollset(s->cqs[i]);
- grpc_transport_perform_op(transport, &op);
+ grpc_transport_perform_op(transport, &op, call_list);
}
channel = grpc_channel_create_from_filters(NULL, filters, num_filters, args,
- mdctx, workqueue, 0);
+ mdctx, workqueue, 0, call_list);
chand = (channel_data *)grpc_channel_stack_element(
grpc_channel_get_channel_stack(channel), 0)
->channel_data;
@@ -987,19 +999,30 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
op.on_connectivity_state_change = &chand->channel_connectivity_changed;
op.connectivity_state = &chand->connectivity_state;
op.disconnect = gpr_atm_acq_load(&s->shutdown_flag) != 0;
- grpc_transport_perform_op(transport, &op);
+ grpc_transport_perform_op(transport, &op, call_list);
}
-void done_published_shutdown(void *done_arg, grpc_cq_completion *storage) {
+void done_published_shutdown(void *done_arg, grpc_cq_completion *storage,
+ grpc_call_list *call_list) {
(void) done_arg;
gpr_free(storage);
}
+static void listener_destroy_done(void *s, int success,
+ grpc_call_list *call_list) {
+ grpc_server *server = s;
+ gpr_mu_lock(&server->mu_global);
+ server->listeners_destroyed++;
+ maybe_finish_shutdown(server, call_list);
+ gpr_mu_unlock(&server->mu_global);
+}
+
void grpc_server_shutdown_and_notify(grpc_server *server,
grpc_completion_queue *cq, void *tag) {
listener *l;
shutdown_tag *sdt;
channel_broadcaster broadcaster;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
GRPC_SERVER_LOG_SHUTDOWN(GPR_INFO, server, cq, tag);
@@ -1008,9 +1031,9 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
grpc_cq_begin_op(cq);
if (server->shutdown_published) {
grpc_cq_end_op(cq, tag, 1, done_published_shutdown, NULL,
- gpr_malloc(sizeof(grpc_cq_completion)));
+ gpr_malloc(sizeof(grpc_cq_completion)), &call_list);
gpr_mu_unlock(&server->mu_global);
- return;
+ goto done;
}
server->shutdown_tags =
gpr_realloc(server->shutdown_tags,
@@ -1020,7 +1043,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
sdt->cq = cq;
if (gpr_atm_acq_load(&server->shutdown_flag)) {
gpr_mu_unlock(&server->mu_global);
- return;
+ goto done;
}
server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
@@ -1029,41 +1052,40 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
/* collect all unregistered then registered calls */
gpr_mu_lock(&server->mu_call);
- kill_pending_work_locked(server);
+ kill_pending_work_locked(server, &call_list);
gpr_mu_unlock(&server->mu_call);
gpr_atm_rel_store(&server->shutdown_flag, 1);
- maybe_finish_shutdown(server);
+ maybe_finish_shutdown(server, &call_list);
gpr_mu_unlock(&server->mu_global);
/* Shutdown listeners */
for (l = server->listeners; l; l = l->next) {
- l->destroy(server, l->arg);
+ grpc_closure_init(&l->destroy_done, listener_destroy_done, server);
+ l->destroy(server, l->arg, &l->destroy_done, &call_list);
}
- channel_broadcaster_shutdown(&broadcaster, 1, 0);
-}
+ channel_broadcaster_shutdown(&broadcaster, 1, 0, &call_list);
-void grpc_server_listener_destroy_done(void *s) {
- grpc_server *server = s;
- gpr_mu_lock(&server->mu_global);
- server->listeners_destroyed++;
- maybe_finish_shutdown(server);
- gpr_mu_unlock(&server->mu_global);
+done:
+ grpc_call_list_run(&call_list);
}
void grpc_server_cancel_all_calls(grpc_server *server) {
channel_broadcaster broadcaster;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
gpr_mu_lock(&server->mu_global);
channel_broadcaster_init(server, &broadcaster);
gpr_mu_unlock(&server->mu_global);
- channel_broadcaster_shutdown(&broadcaster, 0, 1);
+ channel_broadcaster_shutdown(&broadcaster, 0, 1, &call_list);
+ grpc_call_list_run(&call_list);
}
void grpc_server_destroy(grpc_server *server) {
listener *l;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
gpr_mu_lock(&server->mu_global);
GPR_ASSERT(gpr_atm_acq_load(&server->shutdown_flag) || !server->listeners);
@@ -1077,16 +1099,17 @@ void grpc_server_destroy(grpc_server *server) {
gpr_mu_unlock(&server->mu_global);
- grpc_workqueue_flush(server->workqueue);
-
- server_unref(server);
+ server_unref(server, &call_list);
+ grpc_call_list_run(&call_list);
}
-void grpc_server_add_listener(grpc_server *server, void *arg,
- void (*start)(grpc_server *server, void *arg,
- grpc_pollset **pollsets,
- size_t pollset_count),
- void (*destroy)(grpc_server *server, void *arg)) {
+void grpc_server_add_listener(
+ grpc_server *server, void *arg,
+ void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets,
+ size_t pollset_count, grpc_call_list *call_list),
+ void (*destroy)(grpc_server *server, void *arg, grpc_closure *on_done,
+ grpc_call_list *call_list),
+ grpc_call_list *call_list) {
listener *l = gpr_malloc(sizeof(listener));
l->arg = arg;
l->start = start;
@@ -1096,18 +1119,19 @@ void grpc_server_add_listener(grpc_server *server, void *arg,
}
static grpc_call_error queue_call_request(grpc_server *server,
- requested_call *rc) {
+ requested_call *rc,
+ grpc_call_list *call_list) {
call_data *calld = NULL;
request_matcher *request_matcher = NULL;
int request_id;
if (gpr_atm_acq_load(&server->shutdown_flag)) {
- fail_call(server, rc);
+ fail_call(server, rc, call_list);
return GRPC_CALL_OK;
}
request_id = gpr_stack_lockfree_pop(server->request_freelist);
if (request_id == -1) {
/* out of request ids: just fail this one */
- fail_call(server, rc);
+ fail_call(server, rc, call_list);
return GRPC_CALL_OK;
}
switch (rc->type) {
@@ -1135,12 +1159,13 @@ static grpc_call_error queue_call_request(grpc_server *server,
grpc_closure_init(
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
- grpc_workqueue_push(server->workqueue, &calld->kill_zombie_closure, 1);
+ grpc_call_list_add(call_list, &calld->kill_zombie_closure, 1);
} else {
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
- begin_call(server, calld, &server->requested_calls[request_id]);
+ begin_call(server, calld, &server->requested_calls[request_id],
+ call_list);
}
gpr_mu_lock(&server->mu_call);
}
@@ -1154,13 +1179,16 @@ grpc_call_error grpc_server_request_call(
grpc_metadata_array *initial_metadata,
grpc_completion_queue *cq_bound_to_call,
grpc_completion_queue *cq_for_notification, void *tag) {
+ grpc_call_error error;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
requested_call *rc = gpr_malloc(sizeof(*rc));
GRPC_SERVER_LOG_REQUEST_CALL(GPR_INFO, server, call, details,
initial_metadata, cq_bound_to_call,
cq_for_notification, tag);
if (!grpc_cq_is_server_cq(cq_for_notification)) {
gpr_free(rc);
- return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
+ error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
+ goto done;
}
grpc_cq_begin_op(cq_for_notification);
details->reserved = NULL;
@@ -1172,7 +1200,10 @@ grpc_call_error grpc_server_request_call(
rc->call = call;
rc->data.batch.details = details;
rc->data.batch.initial_metadata = initial_metadata;
- return queue_call_request(server, rc);
+ error = queue_call_request(server, rc, &call_list);
+done:
+ grpc_call_list_run(&call_list);
+ return error;
}
grpc_call_error grpc_server_request_registered_call(
@@ -1180,11 +1211,14 @@ grpc_call_error grpc_server_request_registered_call(
grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload,
grpc_completion_queue *cq_bound_to_call,
grpc_completion_queue *cq_for_notification, void *tag) {
+ grpc_call_error error;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
requested_call *rc = gpr_malloc(sizeof(*rc));
registered_method *registered_method = rm;
if (!grpc_cq_is_server_cq(cq_for_notification)) {
gpr_free(rc);
- return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
+ error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
+ goto done;
}
grpc_cq_begin_op(cq_for_notification);
rc->type = REGISTERED_CALL;
@@ -1197,12 +1231,16 @@ grpc_call_error grpc_server_request_registered_call(
rc->data.registered.deadline = deadline;
rc->data.registered.initial_metadata = initial_metadata;
rc->data.registered.optional_payload = optional_payload;
- return queue_call_request(server, rc);
+ error = queue_call_request(server, rc, &call_list);
+done:
+ grpc_call_list_run(&call_list);
+ return error;
}
-static void publish_registered_or_batch(grpc_call *call, int success,
- void *tag);
-static void publish_was_not_set(grpc_call *call, int success, void *tag) {
+static void publish_registered_or_batch(grpc_call *call, int success, void *tag,
+ grpc_call_list *call_list);
+static void publish_was_not_set(grpc_call *call, int success, void *tag,
+ grpc_call_list *call_list) {
abort();
}
@@ -1218,7 +1256,7 @@ static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
}
static void begin_call(grpc_server *server, call_data *calld,
- requested_call *rc) {
+ requested_call *rc, grpc_call_list *call_list) {
grpc_ioreq_completion_func publish = publish_was_not_set;
grpc_ioreq req[2];
grpc_ioreq *r = req;
@@ -1229,7 +1267,7 @@ static void begin_call(grpc_server *server, call_data *calld,
fill in the metadata array passed by the client, we need to perform
an ioreq op, that should complete immediately. */
- grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call);
+ grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call, call_list);
*rc->call = calld->call;
calld->cq_new = rc->cq_for_notification;
switch (rc->type) {
@@ -1265,10 +1303,11 @@ static void begin_call(grpc_server *server, call_data *calld,
GRPC_CALL_INTERNAL_REF(calld->call, "server");
grpc_call_start_ioreq_and_call_back(calld->call, req, (size_t)(r - req),
- publish, rc);
+ publish, rc, call_list);
}
-static void done_request_event(void *req, grpc_cq_completion *c) {
+static void done_request_event(void *req, grpc_cq_completion *c,
+ grpc_call_list *call_list) {
requested_call *rc = req;
grpc_server *server = rc->server;
@@ -1281,10 +1320,11 @@ static void done_request_event(void *req, grpc_cq_completion *c) {
gpr_free(req);
}
- server_unref(server);
+ server_unref(server, call_list);
}
-static void fail_call(grpc_server *server, requested_call *rc) {
+static void fail_call(grpc_server *server, requested_call *rc,
+ grpc_call_list *call_list) {
*rc->call = NULL;
switch (rc->type) {
case BATCH_CALL:
@@ -1296,11 +1336,11 @@ static void fail_call(grpc_server *server, requested_call *rc) {
}
server_ref(server);
grpc_cq_end_op(rc->cq_for_notification, rc->tag, 0, done_request_event, rc,
- &rc->completion);
+ &rc->completion, call_list);
}
-static void publish_registered_or_batch(grpc_call *call, int success,
- void *prc) {
+static void publish_registered_or_batch(grpc_call *call, int success, void *prc,
+ grpc_call_list *call_list) {
grpc_call_element *elem =
grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
requested_call *rc = prc;
@@ -1308,7 +1348,7 @@ static void publish_registered_or_batch(grpc_call *call, int success,
channel_data *chand = elem->channel_data;
server_ref(chand->server);
grpc_cq_end_op(calld->cq_new, rc->tag, success, done_request_event, rc,
- &rc->completion);
+ &rc->completion, call_list);
GRPC_CALL_INTERNAL_UNREF(call, "server", call_list);
}
diff --git a/src/core/surface/server.h b/src/core/surface/server.h
index f87296797c..2f2c5b8948 100644
--- a/src/core/surface/server.h
+++ b/src/core/surface/server.h
@@ -49,9 +49,9 @@ void grpc_server_add_listener(
grpc_server *server, void *listener,
void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets,
size_t npollsets, grpc_call_list *call_list),
- void (*destroy)(grpc_server *server, void *arg, grpc_call_list *call_list));
-
-void grpc_server_listener_destroy_done(void *server);
+ void (*destroy)(grpc_server *server, void *arg, grpc_closure *on_done,
+ grpc_call_list *call_list),
+ grpc_call_list *call_list);
/* Setup a transport - creates a channel stack, binds the transport to the
server */
@@ -59,7 +59,8 @@ void grpc_server_setup_transport(grpc_server *server, grpc_transport *transport,
grpc_channel_filter const **extra_filters,
size_t num_extra_filters, grpc_mdctx *mdctx,
grpc_workqueue *workqueue,
- const grpc_channel_args *args);
+ const grpc_channel_args *args,
+ grpc_call_list *call_list);
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server);
diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c
index 91cf6ece9c..df63d99dea 100644
--- a/src/core/surface/server_chttp2.c
+++ b/src/core/surface/server_chttp2.c
@@ -43,15 +43,17 @@
#include <grpc/support/useful.h>
static void setup_transport(void *server, grpc_transport *transport,
- grpc_mdctx *mdctx, grpc_workqueue *workqueue) {
+ grpc_mdctx *mdctx, grpc_workqueue *workqueue,
+ grpc_call_list *call_list) {
static grpc_channel_filter const *extra_filters[] = {
&grpc_http_server_filter};
grpc_server_setup_transport(server, transport, extra_filters,
GPR_ARRAY_SIZE(extra_filters), mdctx, workqueue,
- grpc_server_get_channel_args(server));
+ grpc_server_get_channel_args(server), call_list);
}
-static void new_transport(void *server, grpc_endpoint *tcp) {
+static void new_transport(void *server, grpc_endpoint *tcp,
+ grpc_call_list *call_list) {
/*
* Beware that the call to grpc_create_chttp2_transport() has to happen before
* grpc_tcp_server_destroy(). This is fine here, but similar code
@@ -60,25 +62,27 @@ static void new_transport(void *server, grpc_endpoint *tcp) {
* case.
*/
grpc_mdctx *mdctx = grpc_mdctx_create();
- grpc_workqueue *workqueue = grpc_workqueue_create();
+ grpc_workqueue *workqueue = grpc_workqueue_create(call_list);
grpc_transport *transport = grpc_create_chttp2_transport(
- grpc_server_get_channel_args(server), tcp, mdctx, workqueue, 0);
- setup_transport(server, transport, mdctx, workqueue);
- grpc_chttp2_transport_start_reading(transport, NULL, 0);
+ grpc_server_get_channel_args(server), tcp, mdctx, 0, call_list);
+ setup_transport(server, transport, mdctx, workqueue, call_list);
+ grpc_chttp2_transport_start_reading(transport, NULL, 0, call_list);
}
/* Server callback: start listening on our ports */
static void start(grpc_server *server, void *tcpp, grpc_pollset **pollsets,
- size_t pollset_count) {
+ size_t pollset_count, grpc_call_list *call_list) {
grpc_tcp_server *tcp = tcpp;
- grpc_tcp_server_start(tcp, pollsets, pollset_count, new_transport, server);
+ grpc_tcp_server_start(tcp, pollsets, pollset_count, new_transport, server,
+ call_list);
}
/* Server callback: destroy the tcp listener (so we don't generate further
callbacks) */
-static void destroy(grpc_server *server, void *tcpp) {
+static void destroy(grpc_server *server, void *tcpp, grpc_closure *destroy_done,
+ grpc_call_list *call_list) {
grpc_tcp_server *tcp = tcpp;
- grpc_tcp_server_destroy(tcp, grpc_server_listener_destroy_done, server);
+ grpc_tcp_server_destroy(tcp, destroy_done, call_list);
}
int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
@@ -88,6 +92,7 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
unsigned count = 0;
int port_num = -1;
int port_temp;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
resolved = grpc_blocking_resolve_address(addr, "http");
if (!resolved) {
@@ -124,9 +129,8 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
grpc_resolved_addresses_destroy(resolved);
/* Register with the server only upon success */
- grpc_server_add_listener(server, tcp, start, destroy);
-
- return port_num;
+ grpc_server_add_listener(server, tcp, start, destroy, &call_list);
+ goto done;
/* Error path: cleanup and return */
error:
@@ -136,5 +140,9 @@ error:
if (tcp) {
grpc_tcp_server_destroy(tcp, NULL, NULL);
}
+ port_num = 0;
+
+done:
+ grpc_call_list_run(&call_list);
return 0;
}
diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c
index 403358016d..ef12c910cd 100644
--- a/src/core/transport/chttp2/frame_data.c
+++ b/src/core/transport/chttp2/frame_data.c
@@ -71,7 +71,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame(
grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
void *parser, grpc_chttp2_transport_parsing *transport_parsing,
- grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
+ grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last,
+ grpc_call_list *call_list) {
gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice);
gpr_uint8 *const end = GPR_SLICE_END_PTR(slice);
gpr_uint8 *cur = beg;
diff --git a/src/core/transport/chttp2/frame_data.h b/src/core/transport/chttp2/frame_data.h
index 23957b05ad..7530f0f644 100644
--- a/src/core/transport/chttp2/frame_data.h
+++ b/src/core/transport/chttp2/frame_data.h
@@ -36,6 +36,7 @@
/* Parser for GRPC streams embedded in DATA frames */
+#include "src/core/iomgr/iomgr.h"
#include <grpc/support/slice.h>
#include <grpc/support/slice_buffer.h>
#include "src/core/transport/stream_op.h"
@@ -74,7 +75,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame(
frame */
grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
void *parser, grpc_chttp2_transport_parsing *transport_parsing,
- grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
+ grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last,
+ grpc_call_list *call_list);
/* create a slice with an empty data frame and is_last set */
gpr_slice grpc_chttp2_data_frame_create_empty_close(gpr_uint32 id);
diff --git a/src/core/transport/chttp2/frame_goaway.c b/src/core/transport/chttp2/frame_goaway.c
index 09d4da234c..1c2bce6736 100644
--- a/src/core/transport/chttp2/frame_goaway.c
+++ b/src/core/transport/chttp2/frame_goaway.c
@@ -64,7 +64,8 @@ grpc_chttp2_parse_error grpc_chttp2_goaway_parser_begin_frame(
grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse(
void *parser, grpc_chttp2_transport_parsing *transport_parsing,
- grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
+ grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last,
+ grpc_call_list *call_list) {
gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice);
gpr_uint8 *const end = GPR_SLICE_END_PTR(slice);
gpr_uint8 *cur = beg;
diff --git a/src/core/transport/chttp2/frame_goaway.h b/src/core/transport/chttp2/frame_goaway.h
index 9c5edfc821..ec991f4350 100644
--- a/src/core/transport/chttp2/frame_goaway.h
+++ b/src/core/transport/chttp2/frame_goaway.h
@@ -34,6 +34,7 @@
#ifndef GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_GOAWAY_H
#define GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_GOAWAY_H
+#include "src/core/iomgr/iomgr.h"
#include "src/core/transport/chttp2/frame.h"
#include <grpc/support/port_platform.h>
#include <grpc/support/slice.h>
@@ -66,7 +67,8 @@ grpc_chttp2_parse_error grpc_chttp2_goaway_parser_begin_frame(
grpc_chttp2_goaway_parser *parser, gpr_uint32 length, gpr_uint8 flags);
grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse(
void *parser, grpc_chttp2_transport_parsing *transport_parsing,
- grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
+ grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last,
+ grpc_call_list *call_list);
void grpc_chttp2_goaway_append(gpr_uint32 last_stream_id, gpr_uint32 error_code,
gpr_slice debug_data,
diff --git a/src/core/transport/chttp2/frame_ping.c b/src/core/transport/chttp2/frame_ping.c
index 10d1e0a523..2fb8850a45 100644
--- a/src/core/transport/chttp2/frame_ping.c
+++ b/src/core/transport/chttp2/frame_ping.c
@@ -70,7 +70,8 @@ grpc_chttp2_parse_error grpc_chttp2_ping_parser_begin_frame(
grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse(
void *parser, grpc_chttp2_transport_parsing *transport_parsing,
- grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
+ grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last,
+ grpc_call_list *call_list) {
gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice);
gpr_uint8 *const end = GPR_SLICE_END_PTR(slice);
gpr_uint8 *cur = beg;
@@ -89,9 +90,7 @@ grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse(
for (ping = transport_parsing->pings.next;
ping != &transport_parsing->pings; ping = ping->next) {
if (0 == memcmp(p->opaque_8bytes, ping->id, 8)) {
- /* we know no locks are held here, we may as well just call up
- * directly */
- ping->on_recv->cb(ping->on_recv->cb_arg, 1);
+ grpc_call_list_add(call_list, ping->on_recv, 1);
}
ping->next->prev = ping->prev;
ping->prev->next = ping->next;
diff --git a/src/core/transport/chttp2/frame_ping.h b/src/core/transport/chttp2/frame_ping.h
index 99197e8352..70e19eb8ab 100644
--- a/src/core/transport/chttp2/frame_ping.h
+++ b/src/core/transport/chttp2/frame_ping.h
@@ -35,6 +35,7 @@
#define GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_PING_H
#include <grpc/support/slice.h>
+#include "src/core/iomgr/iomgr.h"
#include "src/core/transport/chttp2/frame.h"
typedef struct {
@@ -49,6 +50,7 @@ grpc_chttp2_parse_error grpc_chttp2_ping_parser_begin_frame(
grpc_chttp2_ping_parser *parser, gpr_uint32 length, gpr_uint8 flags);
grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse(
void *parser, grpc_chttp2_transport_parsing *transport_parsing,
- grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
+ grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last,
+ grpc_call_list *call_list);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_PING_H */
diff --git a/src/core/transport/chttp2/frame_rst_stream.c b/src/core/transport/chttp2/frame_rst_stream.c
index 67da245239..7cf8abe88f 100644
--- a/src/core/transport/chttp2/frame_rst_stream.c
+++ b/src/core/transport/chttp2/frame_rst_stream.c
@@ -72,7 +72,8 @@ grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame(
grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse(
void *parser, grpc_chttp2_transport_parsing *transport_parsing,
- grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
+ grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last,
+ grpc_call_list *call_list) {
gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice);
gpr_uint8 *const end = GPR_SLICE_END_PTR(slice);
gpr_uint8 *cur = beg;
diff --git a/src/core/transport/chttp2/frame_rst_stream.h b/src/core/transport/chttp2/frame_rst_stream.h
index ed69e588af..17d57fae5e 100644
--- a/src/core/transport/chttp2/frame_rst_stream.h
+++ b/src/core/transport/chttp2/frame_rst_stream.h
@@ -36,6 +36,7 @@
#include <grpc/support/slice.h>
#include "src/core/transport/chttp2/frame.h"
+#include "src/core/iomgr/iomgr.h"
typedef struct {
gpr_uint8 byte;
@@ -48,6 +49,7 @@ grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame(
grpc_chttp2_rst_stream_parser *parser, gpr_uint32 length, gpr_uint8 flags);
grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse(
void *parser, grpc_chttp2_transport_parsing *transport_parsing,
- grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
+ grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last,
+ grpc_call_list *call_list);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_RST_STREAM_H */
diff --git a/src/core/transport/chttp2/frame_settings.c b/src/core/transport/chttp2/frame_settings.c
index 54d3694a5c..78bd4bb09d 100644
--- a/src/core/transport/chttp2/frame_settings.c
+++ b/src/core/transport/chttp2/frame_settings.c
@@ -139,7 +139,8 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_begin_frame(
grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse(
void *p, grpc_chttp2_transport_parsing *transport_parsing,
- grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
+ grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last,
+ grpc_call_list *call_list) {
grpc_chttp2_settings_parser *parser = p;
const gpr_uint8 *cur = GPR_SLICE_START_PTR(slice);
const gpr_uint8 *end = GPR_SLICE_END_PTR(slice);
diff --git a/src/core/transport/chttp2/frame_settings.h b/src/core/transport/chttp2/frame_settings.h
index 0ac68a9fa8..a04a28b7da 100644
--- a/src/core/transport/chttp2/frame_settings.h
+++ b/src/core/transport/chttp2/frame_settings.h
@@ -37,6 +37,7 @@
#include <grpc/support/port_platform.h>
#include <grpc/support/slice.h>
#include "src/core/transport/chttp2/frame.h"
+#include "src/core/iomgr/iomgr.h"
typedef enum {
GRPC_CHTTP2_SPS_ID0,
@@ -95,6 +96,7 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_begin_frame(
gpr_uint32 *settings);
grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse(
void *parser, grpc_chttp2_transport_parsing *transport_parsing,
- grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
+ grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last,
+ grpc_call_list *call_list);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_SETTINGS_H */
diff --git a/src/core/transport/chttp2/frame_window_update.c b/src/core/transport/chttp2/frame_window_update.c
index ea13969e8c..51eb261346 100644
--- a/src/core/transport/chttp2/frame_window_update.c
+++ b/src/core/transport/chttp2/frame_window_update.c
@@ -75,7 +75,8 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_begin_frame(
grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse(
void *parser, grpc_chttp2_transport_parsing *transport_parsing,
- grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
+ grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last,
+ grpc_call_list *call_list) {
gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice);
gpr_uint8 *const end = GPR_SLICE_END_PTR(slice);
gpr_uint8 *cur = beg;
diff --git a/src/core/transport/chttp2/frame_window_update.h b/src/core/transport/chttp2/frame_window_update.h
index deba801d00..7f1168e551 100644
--- a/src/core/transport/chttp2/frame_window_update.h
+++ b/src/core/transport/chttp2/frame_window_update.h
@@ -34,6 +34,7 @@
#ifndef GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_WINDOW_UPDATE_H
#define GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_WINDOW_UPDATE_H
+#include "src/core/iomgr/iomgr.h"
#include <grpc/support/slice.h>
#include "src/core/transport/chttp2/frame.h"
@@ -51,6 +52,7 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_begin_frame(
gpr_uint8 flags);
grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse(
void *parser, grpc_chttp2_transport_parsing *transport_parsing,
- grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
+ grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last,
+ grpc_call_list *call_list);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_WINDOW_UPDATE_H */
diff --git a/src/core/transport/chttp2/hpack_parser.c b/src/core/transport/chttp2/hpack_parser.c
index 9c40e8a4e6..e3b8e54e8d 100644
--- a/src/core/transport/chttp2/hpack_parser.c
+++ b/src/core/transport/chttp2/hpack_parser.c
@@ -1379,7 +1379,8 @@ int grpc_chttp2_hpack_parser_parse(grpc_chttp2_hpack_parser *p,
grpc_chttp2_parse_error grpc_chttp2_header_parser_parse(
void *hpack_parser, grpc_chttp2_transport_parsing *transport_parsing,
- grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
+ grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last,
+ grpc_call_list *call_list) {
grpc_chttp2_hpack_parser *parser = hpack_parser;
if (!grpc_chttp2_hpack_parser_parse(parser, GPR_SLICE_START_PTR(slice),
GPR_SLICE_END_PTR(slice))) {
diff --git a/src/core/transport/chttp2/hpack_parser.h b/src/core/transport/chttp2/hpack_parser.h
index 4f489d67fb..c9ae6a9767 100644
--- a/src/core/transport/chttp2/hpack_parser.h
+++ b/src/core/transport/chttp2/hpack_parser.h
@@ -37,6 +37,7 @@
#include <stddef.h>
#include <grpc/support/port_platform.h>
+#include "src/core/iomgr/iomgr.h"
#include "src/core/transport/chttp2/frame.h"
#include "src/core/transport/chttp2/hpack_table.h"
#include "src/core/transport/metadata.h"
@@ -108,6 +109,7 @@ int grpc_chttp2_hpack_parser_parse(grpc_chttp2_hpack_parser *p,
the transport */
grpc_chttp2_parse_error grpc_chttp2_header_parser_parse(
void *hpack_parser, grpc_chttp2_transport_parsing *transport_parsing,
- grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
+ grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last,
+ grpc_call_list *call_list);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_HPACK_PARSER_H */
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index 386f2dd315..b9dbbc25ee 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -163,8 +163,6 @@ typedef struct grpc_chttp2_outstanding_ping {
typedef struct {
/** data to write next write */
gpr_slice_buffer qbuf;
- /** queued callbacks */
- grpc_call_list run_at_unlock;
/** window available for us to send to peer */
gpr_int64 outgoing_window;
@@ -269,7 +267,8 @@ struct grpc_chttp2_transport_parsing {
grpc_chttp2_stream_parsing *incoming_stream;
grpc_chttp2_parse_error (*parser)(
void *parser_user_data, grpc_chttp2_transport_parsing *transport_parsing,
- grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
+ grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last,
+ grpc_call_list *call_list);
/* received settings */
gpr_uint32 settings[GRPC_CHTTP2_NUM_SETTINGS];
@@ -469,19 +468,23 @@ struct grpc_chttp2_stream {
int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *global,
grpc_chttp2_transport_writing *writing);
void grpc_chttp2_perform_writes(
- grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint);
-void grpc_chttp2_terminate_writing(void *transport_writing, int success);
+ grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint,
+ grpc_call_list *call_list);
+void grpc_chttp2_terminate_writing(void *transport_writing, int success,
+ grpc_call_list *call_list);
void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_global *global,
- grpc_chttp2_transport_writing *writing);
+ grpc_chttp2_transport_writing *writing,
+ grpc_call_list *call_list);
void grpc_chttp2_prepare_to_read(grpc_chttp2_transport_global *global,
grpc_chttp2_transport_parsing *parsing);
/** Process one slice of incoming data; return 1 if the connection is still
viable after reading, or 0 if the connection should be torn down */
int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing,
- gpr_slice slice);
+ gpr_slice slice, grpc_call_list *call_list);
void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *global,
- grpc_chttp2_transport_parsing *parsing);
+ grpc_chttp2_transport_parsing *parsing,
+ grpc_call_list *call_list);
/** Get a writable stream
returns non-zero if there was a stream available */
@@ -574,7 +577,7 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
void grpc_chttp2_add_incoming_goaway(
grpc_chttp2_transport_global *transport_global, gpr_uint32 goaway_error,
- gpr_slice goaway_text);
+ gpr_slice goaway_text, grpc_call_list *call_list);
void grpc_chttp2_register_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c
index f26f446787..2d95963f1f 100644
--- a/src/core/transport/chttp2/parsing.c
+++ b/src/core/transport/chttp2/parsing.c
@@ -59,7 +59,8 @@ static int init_skip_frame_parser(
grpc_chttp2_transport_parsing *transport_parsing, int is_header);
static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing,
- gpr_slice slice, int is_last);
+ gpr_slice slice, int is_last,
+ grpc_call_list *call_list);
void grpc_chttp2_prepare_to_read(
grpc_chttp2_transport_global *transport_global,
@@ -90,9 +91,9 @@ void grpc_chttp2_prepare_to_read(
}
}
-void grpc_chttp2_publish_reads(
- grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_transport_parsing *transport_parsing) {
+void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_transport_parsing *transport_parsing,
+ grpc_call_list *call_list) {
grpc_chttp2_stream_global *stream_global;
grpc_chttp2_stream_parsing *stream_parsing;
@@ -132,7 +133,7 @@ void grpc_chttp2_publish_reads(
if (transport_parsing->goaway_received) {
grpc_chttp2_add_incoming_goaway(transport_global,
(gpr_uint32)transport_parsing->goaway_error,
- transport_parsing->goaway_text);
+ transport_parsing->goaway_text, call_list);
transport_parsing->goaway_text = gpr_empty_slice();
transport_parsing->goaway_received = 0;
}
@@ -235,7 +236,7 @@ void grpc_chttp2_publish_reads(
}
int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing,
- gpr_slice slice) {
+ gpr_slice slice, grpc_call_list *call_list) {
gpr_uint8 *beg = GPR_SLICE_START_PTR(slice);
gpr_uint8 *end = GPR_SLICE_END_PTR(slice);
gpr_uint8 *cur = beg;
@@ -364,7 +365,8 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing,
transport_parsing->incoming_stream_id;
}
if (transport_parsing->incoming_frame_size == 0) {
- if (!parse_frame_slice(transport_parsing, gpr_empty_slice(), 1)) {
+ if (!parse_frame_slice(transport_parsing, gpr_empty_slice(), 1,
+ call_list)) {
return 0;
}
transport_parsing->incoming_stream = NULL;
@@ -384,7 +386,7 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing,
if (!parse_frame_slice(transport_parsing,
gpr_slice_sub_no_ref(slice, (size_t)(cur - beg),
(size_t)(end - beg)),
- 1)) {
+ 1, call_list)) {
return 0;
}
transport_parsing->deframe_state = GRPC_DTS_FH_0;
@@ -398,7 +400,7 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing,
gpr_slice_sub_no_ref(
slice, cur_offset,
cur_offset + transport_parsing->incoming_frame_size),
- 1)) {
+ 1, call_list)) {
return 0;
}
cur += transport_parsing->incoming_frame_size;
@@ -408,7 +410,7 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing,
if (!parse_frame_slice(transport_parsing,
gpr_slice_sub_no_ref(slice, (size_t)(cur - beg),
(size_t)(end - beg)),
- 0)) {
+ 0, call_list)) {
return 0;
}
transport_parsing->incoming_frame_size -= (gpr_uint32)(end - cur);
@@ -470,7 +472,8 @@ static int init_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) {
static grpc_chttp2_parse_error skip_parser(
void *parser, grpc_chttp2_transport_parsing *transport_parsing,
- grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
+ grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last,
+ grpc_call_list *call_list) {
return GRPC_CHTTP2_PARSE_OK;
}
@@ -789,12 +792,13 @@ static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) {
*/
static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing,
- gpr_slice slice, int is_last) {
+ gpr_slice slice, int is_last,
+ grpc_call_list *call_list) {
grpc_chttp2_stream_parsing *stream_parsing =
transport_parsing->incoming_stream;
switch (transport_parsing->parser(transport_parsing->parser_data,
transport_parsing, stream_parsing, slice,
- is_last)) {
+ is_last, call_list)) {
case GRPC_CHTTP2_PARSE_OK:
if (stream_parsing) {
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing,
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index db6715b43a..18f4bfbc77 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -163,7 +163,8 @@ int grpc_chttp2_unlocking_check_writes(
}
void grpc_chttp2_perform_writes(
- grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint) {
+ grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint,
+ grpc_call_list *call_list) {
GPR_ASSERT(transport_writing->outbuf.count > 0 ||
grpc_chttp2_list_have_writing_streams(transport_writing));
@@ -172,17 +173,8 @@ void grpc_chttp2_perform_writes(
GPR_ASSERT(transport_writing->outbuf.count > 0);
GPR_ASSERT(endpoint);
- switch (grpc_endpoint_write(endpoint, &transport_writing->outbuf,
- &transport_writing->done_cb)) {
- case GRPC_ENDPOINT_DONE:
- grpc_chttp2_terminate_writing(transport_writing, 1);
- break;
- case GRPC_ENDPOINT_ERROR:
- grpc_chttp2_terminate_writing(transport_writing, 0);
- break;
- case GRPC_ENDPOINT_PENDING:
- break;
- }
+ grpc_endpoint_write(endpoint, &transport_writing->outbuf,
+ &transport_writing->done_cb, call_list);
}
static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) {
@@ -220,7 +212,8 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) {
void grpc_chttp2_cleanup_writing(
grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_transport_writing *transport_writing) {
+ grpc_chttp2_transport_writing *transport_writing,
+ grpc_call_list *call_list) {
grpc_chttp2_stream_writing *stream_writing;
grpc_chttp2_stream_global *stream_global;
@@ -238,8 +231,7 @@ void grpc_chttp2_cleanup_writing(
stream_global->outgoing_sopb->nops == 0) {
GPR_ASSERT(stream_global->write_state != GRPC_WRITE_STATE_QUEUED_CLOSE);
stream_global->outgoing_sopb = NULL;
- grpc_call_list_add(&transport_global->run_at_unlock,
- stream_global->send_done_closure, 1);
+ grpc_call_list_add(call_list, stream_global->send_done_closure, 1);
}
}
stream_global->writing_now = 0;
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index bc056ac0b8..acd7cbdc1d 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -78,27 +78,31 @@ int grpc_flowctl_trace = 0;
static const grpc_transport_vtable vtable;
static void lock(grpc_chttp2_transport *t);
-static void unlock(grpc_chttp2_transport *t);
+static void unlock(grpc_chttp2_transport *t, grpc_call_list *call_list);
-static void unlock_check_read_write_state(grpc_chttp2_transport *t);
+static void unlock_check_read_write_state(grpc_chttp2_transport *t,
+ grpc_call_list *call_list);
/* forward declarations of various callbacks that we'll build closures around */
-static void writing_action(void *t, int iomgr_success_ignored);
+static void writing_action(void *t, int iomgr_success_ignored,
+ grpc_call_list *call_list);
/** Set a transport level setting, and push it to our peer */
static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
gpr_uint32 value);
/** Endpoint callback to process incoming data */
-static void recv_data(void *tp, int success);
+static void recv_data(void *tp, int success, grpc_call_list *call_list);
/** Start disconnection chain */
-static void drop_connection(grpc_chttp2_transport *t);
+static void drop_connection(grpc_chttp2_transport *t,
+ grpc_call_list *call_list);
/** Perform a transport_op */
static void perform_stream_op_locked(
grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op);
+ grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op,
+ grpc_call_list *call_list);
/** Cancel a stream: coming from the transport API */
static void cancel_from_api(grpc_chttp2_transport_global *transport_global,
@@ -112,23 +116,27 @@ static void close_from_api(grpc_chttp2_transport_global *transport_global,
/** Add endpoint from this transport to pollset */
static void add_to_pollset_locked(grpc_chttp2_transport *t,
- grpc_pollset *pollset);
+ grpc_pollset *pollset,
+ grpc_call_list *call_list);
static void add_to_pollset_set_locked(grpc_chttp2_transport *t,
- grpc_pollset_set *pollset_set);
+ grpc_pollset_set *pollset_set,
+ grpc_call_list *call_list);
/** Start new streams that have been created if we can */
static void maybe_start_some_streams(
- grpc_chttp2_transport_global *transport_global);
+ grpc_chttp2_transport_global *transport_global, grpc_call_list *call_list);
static void connectivity_state_set(
grpc_chttp2_transport_global *transport_global,
- grpc_connectivity_state state, const char *reason);
+ grpc_connectivity_state state, const char *reason,
+ grpc_call_list *call_list);
/*
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/
-static void destruct_transport(grpc_chttp2_transport *t) {
+static void destruct_transport(grpc_chttp2_transport *t,
+ grpc_call_list *call_list) {
size_t i;
gpr_mu_lock(&t->mu);
@@ -157,7 +165,8 @@ static void destruct_transport(grpc_chttp2_transport *t) {
grpc_chttp2_stream_map_destroy(&t->parsing_stream_map);
grpc_chttp2_stream_map_destroy(&t->new_stream_map);
- grpc_connectivity_state_destroy(&t->channel_callback.state_tracker);
+ grpc_connectivity_state_destroy(&t->channel_callback.state_tracker,
+ call_list);
gpr_mu_unlock(&t->mu);
gpr_mu_destroy(&t->mu);
@@ -166,7 +175,7 @@ static void destruct_transport(grpc_chttp2_transport *t) {
and maybe they hold resources that need to be freed */
while (t->global.pings.next != &t->global.pings) {
grpc_chttp2_outstanding_ping *ping = t->global.pings.next;
- ping->on_recv->cb(ping->on_recv->cb_arg, 0);
+ grpc_call_list_add(call_list, ping->on_recv, 0);
ping->next->prev = ping->prev;
ping->prev->next = ping->next;
gpr_free(ping);
@@ -180,13 +189,13 @@ static void destruct_transport(grpc_chttp2_transport *t) {
#ifdef REFCOUNTING_DEBUG
#define REF_TRANSPORT(t, r) ref_transport(t, r, __FILE__, __LINE__)
-#define UNREF_TRANSPORT(t, r) unref_transport(t, r, __FILE__, __LINE__)
-static void unref_transport(grpc_chttp2_transport *t, const char *reason,
- const char *file, int line) {
+#define UNREF_TRANSPORT(t, r, cl) unref_transport(t, cl, r, __FILE__, __LINE__)
+static void unref_transport(grpc_chttp2_transport *t, grpc_call_list *call_list,
+ const char *reason, const char *file, int line) {
gpr_log(GPR_DEBUG, "chttp2:unref:%p %d->%d %s [%s:%d]", t, t->refs.count,
t->refs.count - 1, reason, file, line);
if (!gpr_unref(&t->refs)) return;
- destruct_transport(t);
+ destruct_transport(t, call_list);
}
static void ref_transport(grpc_chttp2_transport *t, const char *reason,
@@ -197,10 +206,11 @@ static void ref_transport(grpc_chttp2_transport *t, const char *reason,
}
#else
#define REF_TRANSPORT(t, r) ref_transport(t)
-#define UNREF_TRANSPORT(t, r) unref_transport(t)
-static void unref_transport(grpc_chttp2_transport *t) {
+#define UNREF_TRANSPORT(t, r, cl) unref_transport(t, cl)
+static void unref_transport(grpc_chttp2_transport *t,
+ grpc_call_list *call_list) {
if (!gpr_unref(&t->refs)) return;
- destruct_transport(t);
+ destruct_transport(t, call_list);
}
static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); }
@@ -209,7 +219,7 @@ static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); }
static void init_transport(grpc_chttp2_transport *t,
const grpc_channel_args *channel_args,
grpc_endpoint *ep, grpc_mdctx *mdctx,
- grpc_workqueue *workqueue, gpr_uint8 is_client) {
+ gpr_uint8 is_client, grpc_call_list *call_list) {
size_t i;
int j;
@@ -329,15 +339,15 @@ static void init_transport(grpc_chttp2_transport *t,
}
}
-static void destroy_transport(grpc_transport *gt) {
+static void destroy_transport(grpc_transport *gt, grpc_call_list *call_list) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
lock(t);
t->destroying = 1;
- drop_connection(t);
- unlock(t);
+ drop_connection(t, call_list);
+ unlock(t, call_list);
- UNREF_TRANSPORT(t, "destroy");
+ UNREF_TRANSPORT(t, "destroy", call_list);
}
/** block grpc_endpoint_shutdown being called until a paired
@@ -347,45 +357,50 @@ static void prevent_endpoint_shutdown(grpc_chttp2_transport *t) {
gpr_ref(&t->shutdown_ep_refs);
}
-static void allow_endpoint_shutdown_locked(grpc_chttp2_transport *t) {
+static void allow_endpoint_shutdown_locked(grpc_chttp2_transport *t,
+ grpc_call_list *call_list) {
if (gpr_unref(&t->shutdown_ep_refs)) {
if (t->ep) {
- grpc_endpoint_shutdown(t->ep);
+ grpc_endpoint_shutdown(t->ep, call_list);
}
}
}
-static void allow_endpoint_shutdown_unlocked(grpc_chttp2_transport *t) {
+static void allow_endpoint_shutdown_unlocked(grpc_chttp2_transport *t,
+ grpc_call_list *call_list) {
if (gpr_unref(&t->shutdown_ep_refs)) {
gpr_mu_lock(&t->mu);
if (t->ep) {
- grpc_endpoint_shutdown(t->ep);
+ grpc_endpoint_shutdown(t->ep, call_list);
}
gpr_mu_unlock(&t->mu);
}
}
-static void destroy_endpoint(grpc_chttp2_transport *t) {
- grpc_endpoint_destroy(t->ep);
+static void destroy_endpoint(grpc_chttp2_transport *t,
+ grpc_call_list *call_list) {
+ grpc_endpoint_destroy(t->ep, call_list);
t->ep = NULL;
- UNREF_TRANSPORT(
- t, "disconnect"); /* safe because we'll still have the ref for write */
+ /* safe because we'll still have the ref for write */
+ UNREF_TRANSPORT(t, "disconnect", call_list);
}
-static void close_transport_locked(grpc_chttp2_transport *t) {
+static void close_transport_locked(grpc_chttp2_transport *t,
+ grpc_call_list *call_list) {
if (!t->closed) {
t->closed = 1;
connectivity_state_set(&t->global, GRPC_CHANNEL_FATAL_FAILURE,
- "close_transport");
+ "close_transport", call_list);
if (t->ep) {
- allow_endpoint_shutdown_locked(t);
+ allow_endpoint_shutdown_locked(t, call_list);
}
}
}
static int init_stream(grpc_transport *gt, grpc_stream *gs,
const void *server_data,
- grpc_transport_stream_op *initial_op) {
+ grpc_transport_stream_op *initial_op,
+ grpc_call_list *call_list) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
@@ -416,13 +431,15 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
s->global.in_stream_map = 1;
}
- if (initial_op) perform_stream_op_locked(&t->global, &s->global, initial_op);
- unlock(t);
+ if (initial_op)
+ perform_stream_op_locked(&t->global, &s->global, initial_op, call_list);
+ unlock(t, call_list);
return 0;
}
-static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
+static void destroy_stream(grpc_transport *gt, grpc_stream *gs,
+ grpc_call_list *call_list) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
int i;
@@ -433,7 +450,7 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
s->global.id == 0);
GPR_ASSERT(!s->global.in_stream_map);
if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
- close_transport_locked(t);
+ close_transport_locked(t, call_list);
}
if (!t->parsing_active && s->global.id) {
GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map,
@@ -463,7 +480,7 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
grpc_chttp2_incoming_metadata_live_op_buffer_end(
&s->global.outstanding_metadata);
- UNREF_TRANSPORT(t, "stream");
+ UNREF_TRANSPORT(t, "stream", call_list);
}
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(
@@ -498,21 +515,17 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); }
-static void unlock(grpc_chttp2_transport *t) {
- grpc_call_list run = GRPC_CALL_LIST_INIT;
-
- unlock_check_read_write_state(t);
+static void unlock(grpc_chttp2_transport *t, grpc_call_list *call_list) {
+ unlock_check_read_write_state(t, call_list);
if (!t->writing_active && !t->closed &&
grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) {
t->writing_active = 1;
REF_TRANSPORT(t, "writing");
- grpc_call_list_add(&t->global.run_at_unlock, &t->writing_action, 1);
+ grpc_call_list_add(call_list, &t->writing_action, 1);
prevent_endpoint_shutdown(t);
}
- GPR_SWAP(grpc_call_list, run, t->global.run_at_unlock);
gpr_mu_unlock(&t->mu);
- grpc_call_list_run(run);
}
/*
@@ -534,52 +547,54 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
}
}
-void grpc_chttp2_terminate_writing(void *transport_writing_ptr, int success) {
+void grpc_chttp2_terminate_writing(void *transport_writing_ptr, int success,
+ grpc_call_list *call_list) {
grpc_chttp2_transport_writing *transport_writing = transport_writing_ptr;
grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing);
lock(t);
- allow_endpoint_shutdown_locked(t);
+ allow_endpoint_shutdown_locked(t, call_list);
if (!success) {
- drop_connection(t);
+ drop_connection(t, call_list);
}
/* cleanup writing related jazz */
- grpc_chttp2_cleanup_writing(&t->global, &t->writing);
+ grpc_chttp2_cleanup_writing(&t->global, &t->writing, call_list);
/* leave the writing flag up on shutdown to prevent further writes in unlock()
from starting */
t->writing_active = 0;
if (t->ep && !t->endpoint_reading) {
- destroy_endpoint(t);
+ destroy_endpoint(t, call_list);
}
- unlock(t);
+ unlock(t, call_list);
- UNREF_TRANSPORT(t, "writing");
+ UNREF_TRANSPORT(t, "writing", call_list);
}
-static void writing_action(void *gt, int iomgr_success_ignored) {
+static void writing_action(void *gt, int iomgr_success_ignored,
+ grpc_call_list *call_list) {
grpc_chttp2_transport *t = gt;
- grpc_chttp2_perform_writes(&t->writing, t->ep);
+ grpc_chttp2_perform_writes(&t->writing, t->ep, call_list);
}
void grpc_chttp2_add_incoming_goaway(
grpc_chttp2_transport_global *transport_global, gpr_uint32 goaway_error,
- gpr_slice goaway_text) {
+ gpr_slice goaway_text, grpc_call_list *call_list) {
char *msg = gpr_dump_slice(goaway_text, GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg);
gpr_free(msg);
gpr_slice_unref(goaway_text);
transport_global->seen_goaway = 1;
connectivity_state_set(transport_global, GRPC_CHANNEL_FATAL_FAILURE,
- "got_goaway");
+ "got_goaway", call_list);
}
static void maybe_start_some_streams(
- grpc_chttp2_transport_global *transport_global) {
+ grpc_chttp2_transport_global *transport_global, grpc_call_list *call_list) {
grpc_chttp2_stream_global *stream_global;
/* start streams where we have free grpc_chttp2_stream ids and free
* concurrency */
@@ -601,7 +616,7 @@ static void maybe_start_some_streams(
if (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID) {
connectivity_state_set(transport_global, GRPC_CHANNEL_TRANSIENT_FAILURE,
- "no_more_stream_ids");
+ "no_more_stream_ids", call_list);
}
stream_global->outgoing_window =
@@ -631,7 +646,8 @@ static void maybe_start_some_streams(
static void perform_stream_op_locked(
grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op) {
+ grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op,
+ grpc_call_list *call_list) {
if (op->cancel_with_status != GRPC_STATUS_OK) {
cancel_from_api(transport_global, stream_global, op->cancel_with_status);
}
@@ -658,14 +674,13 @@ static void perform_stream_op_locked(
transport_global->is_client ? "CLI" : "SVR", stream_global));
grpc_chttp2_list_add_waiting_for_concurrency(transport_global,
stream_global);
- maybe_start_some_streams(transport_global);
+ maybe_start_some_streams(transport_global, call_list);
} else if (stream_global->outgoing_window > 0) {
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
}
} else {
grpc_sopb_reset(op->send_ops);
- grpc_call_list_add(&transport_global->run_at_unlock,
- stream_global->send_done_closure, 0);
+ grpc_call_list_add(call_list, stream_global->send_done_closure, 0);
}
}
@@ -700,20 +715,21 @@ static void perform_stream_op_locked(
if (op->bind_pollset) {
add_to_pollset_locked(TRANSPORT_FROM_GLOBAL(transport_global),
- op->bind_pollset);
+ op->bind_pollset, call_list);
}
- grpc_call_list_add(&transport_global->run_at_unlock, op->on_consumed, 1);
+ grpc_call_list_add(call_list, op->on_consumed, 1);
}
static void perform_stream_op(grpc_transport *gt, grpc_stream *gs,
- grpc_transport_stream_op *op) {
+ grpc_transport_stream_op *op,
+ grpc_call_list *call_list) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
lock(t);
- perform_stream_op_locked(&t->global, &s->global, op);
- unlock(t);
+ perform_stream_op_locked(&t->global, &s->global, op, call_list);
+ unlock(t, call_list);
}
static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) {
@@ -733,18 +749,19 @@ static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) {
gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id));
}
-static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) {
+static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op,
+ grpc_call_list *call_list) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
int close_transport = 0;
lock(t);
- grpc_call_list_add(&t->global.run_at_unlock, op->on_consumed, 1);
+ grpc_call_list_add(call_list, op->on_consumed, 1);
if (op->on_connectivity_state_change) {
grpc_connectivity_state_notify_on_state_change(
&t->channel_callback.state_tracker, op->connectivity_state,
- op->on_connectivity_state_change, &t->global.run_at_unlock);
+ op->on_connectivity_state_change, call_list);
}
if (op->send_goaway) {
@@ -763,11 +780,11 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) {
}
if (op->bind_pollset) {
- add_to_pollset_locked(t, op->bind_pollset);
+ add_to_pollset_locked(t, op->bind_pollset, call_list);
}
if (op->bind_pollset_set) {
- add_to_pollset_set_locked(t, op->bind_pollset_set);
+ add_to_pollset_set_locked(t, op->bind_pollset_set, call_list);
}
if (op->send_ping) {
@@ -775,15 +792,15 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) {
}
if (op->disconnect) {
- close_transport_locked(t);
+ close_transport_locked(t, call_list);
}
- unlock(t);
+ unlock(t, call_list);
if (close_transport) {
lock(t);
- close_transport_locked(t);
- unlock(t);
+ close_transport_locked(t, call_list);
+ unlock(t, call_list);
}
}
@@ -799,7 +816,8 @@ static grpc_stream_state compute_state(gpr_uint8 write_closed,
return GRPC_STREAM_OPEN;
}
-static void remove_stream(grpc_chttp2_transport *t, gpr_uint32 id) {
+static void remove_stream(grpc_chttp2_transport *t, gpr_uint32 id,
+ grpc_call_list *call_list) {
size_t new_stream_count;
grpc_chttp2_stream *s =
grpc_chttp2_stream_map_delete(&t->parsing_stream_map, id);
@@ -814,7 +832,7 @@ static void remove_stream(grpc_chttp2_transport *t, gpr_uint32 id) {
grpc_chttp2_parsing_become_skip_parser(&t->parsing);
}
if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
- close_transport_locked(t);
+ close_transport_locked(t, call_list);
}
new_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map) +
@@ -822,11 +840,12 @@ static void remove_stream(grpc_chttp2_transport *t, gpr_uint32 id) {
GPR_ASSERT(new_stream_count <= GPR_UINT32_MAX);
if (new_stream_count != t->global.concurrent_stream_count) {
t->global.concurrent_stream_count = (gpr_uint32)new_stream_count;
- maybe_start_some_streams(&t->global);
+ maybe_start_some_streams(&t->global, call_list);
}
}
-static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
+static void unlock_check_read_write_state(grpc_chttp2_transport *t,
+ grpc_call_list *call_list) {
grpc_chttp2_transport_global *transport_global = &t->global;
grpc_chttp2_stream_global *stream_global;
grpc_stream_state state;
@@ -840,7 +859,7 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
GPR_ASSERT(stream_global->in_stream_map);
GPR_ASSERT(stream_global->write_state != GRPC_WRITE_STATE_OPEN);
GPR_ASSERT(stream_global->read_closed);
- remove_stream(t, stream_global->id);
+ remove_stream(t, stream_global->id, call_list);
grpc_chttp2_list_add_read_write_state_changed(transport_global,
stream_global);
}
@@ -866,8 +885,7 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
if (stream_global->outgoing_sopb != NULL) {
grpc_sopb_reset(stream_global->outgoing_sopb);
stream_global->outgoing_sopb = NULL;
- grpc_call_list_add(&transport_global->run_at_unlock,
- stream_global->send_done_closure, 1);
+ grpc_call_list_add(call_list, stream_global->send_done_closure, 1);
}
stream_global->read_closed = 1;
if (!stream_global->published_cancelled) {
@@ -889,7 +907,7 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global,
stream_global);
} else {
- remove_stream(t, stream_global->id);
+ remove_stream(t, stream_global->id, call_list);
}
}
if (!stream_global->publish_sopb) {
@@ -917,8 +935,7 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
&stream_global->outstanding_metadata);
grpc_sopb_swap(stream_global->publish_sopb, &stream_global->incoming_sopb);
stream_global->published_state = *stream_global->publish_state = state;
- grpc_call_list_add(&transport_global->run_at_unlock,
- stream_global->recv_done_closure, 1);
+ grpc_call_list_add(call_list, stream_global->recv_done_closure, 1);
stream_global->recv_done_closure = NULL;
stream_global->publish_sopb = NULL;
stream_global->publish_state = NULL;
@@ -1053,8 +1070,9 @@ static void end_all_the_calls(grpc_chttp2_transport *t) {
grpc_chttp2_for_all_streams(&t->global, NULL, cancel_stream_cb);
}
-static void drop_connection(grpc_chttp2_transport *t) {
- close_transport_locked(t);
+static void drop_connection(grpc_chttp2_transport *t,
+ grpc_call_list *call_list) {
+ close_transport_locked(t, call_list);
end_all_the_calls(t);
}
@@ -1079,17 +1097,19 @@ static void update_global_window(void *args, gpr_uint32 id, void *stream) {
}
}
-static void read_error_locked(grpc_chttp2_transport *t) {
+static void read_error_locked(grpc_chttp2_transport *t,
+ grpc_call_list *call_list) {
t->endpoint_reading = 0;
if (!t->writing_active && t->ep) {
- destroy_endpoint(t);
+ destroy_endpoint(t, call_list);
}
}
/* tcp read callback */
-static int recv_data_loop(grpc_chttp2_transport *t, int *success) {
+static void recv_data(void *tp, int success, grpc_call_list *call_list) {
size_t i;
int keep_reading = 0;
+ grpc_chttp2_transport *t = tp;
lock(t);
i = 0;
@@ -1102,12 +1122,13 @@ static int recv_data_loop(grpc_chttp2_transport *t, int *success) {
grpc_chttp2_prepare_to_read(&t->global, &t->parsing);
gpr_mu_unlock(&t->mu);
for (; i < t->read_buffer.count &&
- grpc_chttp2_perform_read(&t->parsing, t->read_buffer.slices[i]);
+ grpc_chttp2_perform_read(&t->parsing, t->read_buffer.slices[i],
+ call_list);
i++)
;
gpr_mu_lock(&t->mu);
if (i != t->read_buffer.count) {
- drop_connection(t);
+ drop_connection(t, call_list);
}
/* merge stream lists */
grpc_chttp2_stream_map_move_into(&t->new_stream_map,
@@ -1120,52 +1141,27 @@ static int recv_data_loop(grpc_chttp2_transport *t, int *success) {
t->parsing.initial_window_update = 0;
}
/* handle higher level things */
- grpc_chttp2_publish_reads(&t->global, &t->parsing);
+ grpc_chttp2_publish_reads(&t->global, &t->parsing, call_list);
t->parsing_active = 0;
}
- if (!*success || i != t->read_buffer.count) {
- drop_connection(t);
- read_error_locked(t);
+ if (!success || i != t->read_buffer.count) {
+ drop_connection(t, call_list);
+ read_error_locked(t, call_list);
} else if (!t->closed) {
keep_reading = 1;
REF_TRANSPORT(t, "keep_reading");
prevent_endpoint_shutdown(t);
}
gpr_slice_buffer_reset_and_unref(&t->read_buffer);
- unlock(t);
+ unlock(t, call_list);
if (keep_reading) {
- int ret = -1;
- switch (grpc_endpoint_read(t->ep, &t->read_buffer, &t->recv_data)) {
- case GRPC_ENDPOINT_DONE:
- *success = 1;
- ret = 1;
- break;
- case GRPC_ENDPOINT_ERROR:
- *success = 0;
- ret = 1;
- break;
- case GRPC_ENDPOINT_PENDING:
- ret = 0;
- break;
- }
- allow_endpoint_shutdown_unlocked(t);
- UNREF_TRANSPORT(t, "keep_reading");
- return ret;
+ grpc_endpoint_read(t->ep, &t->read_buffer, &t->recv_data, call_list);
+ allow_endpoint_shutdown_unlocked(t, call_list);
+ UNREF_TRANSPORT(t, "keep_reading", call_list);
} else {
- UNREF_TRANSPORT(t, "recv_data");
- return 0;
+ UNREF_TRANSPORT(t, "recv_data", call_list);
}
-
- gpr_log(GPR_ERROR, "should never reach here");
- abort();
-}
-
-static void recv_data(void *tp, int success) {
- grpc_chttp2_transport *t = tp;
-
- while (recv_data_loop(t, &success))
- ;
}
/*
@@ -1174,12 +1170,13 @@ static void recv_data(void *tp, int success) {
static void connectivity_state_set(
grpc_chttp2_transport_global *transport_global,
- grpc_connectivity_state state, const char *reason) {
+ grpc_connectivity_state state, const char *reason,
+ grpc_call_list *call_list) {
GRPC_CHTTP2_IF_TRACING(
gpr_log(GPR_DEBUG, "set connectivity_state=%d", state));
grpc_connectivity_state_set(
&TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker,
- state, reason, &transport_global->run_at_unlock);
+ state, reason, call_list);
}
/*
@@ -1187,16 +1184,18 @@ static void connectivity_state_set(
*/
static void add_to_pollset_locked(grpc_chttp2_transport *t,
- grpc_pollset *pollset) {
+ grpc_pollset *pollset,
+ grpc_call_list *call_list) {
if (t->ep) {
- grpc_endpoint_add_to_pollset(t->ep, pollset);
+ grpc_endpoint_add_to_pollset(t->ep, pollset, call_list);
}
}
static void add_to_pollset_set_locked(grpc_chttp2_transport *t,
- grpc_pollset_set *pollset_set) {
+ grpc_pollset_set *pollset_set,
+ grpc_call_list *call_list) {
if (t->ep) {
- grpc_endpoint_add_to_pollset_set(t->ep, pollset_set);
+ grpc_endpoint_add_to_pollset_set(t->ep, pollset_set, call_list);
}
}
@@ -1235,7 +1234,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason,
* INTEGRATION GLUE
*/
-static char *chttp2_get_peer(grpc_transport *t) {
+static char *chttp2_get_peer(grpc_transport *t, grpc_call_list *call_list) {
return gpr_strdup(((grpc_chttp2_transport *)t)->peer_string);
}
@@ -1249,16 +1248,17 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
grpc_transport *grpc_create_chttp2_transport(
const grpc_channel_args *channel_args, grpc_endpoint *ep, grpc_mdctx *mdctx,
- grpc_workqueue *workqueue, int is_client) {
+ int is_client, grpc_call_list *call_list) {
grpc_chttp2_transport *t = gpr_malloc(sizeof(grpc_chttp2_transport));
- init_transport(t, channel_args, ep, mdctx, workqueue, is_client != 0);
+ init_transport(t, channel_args, ep, mdctx, is_client != 0, call_list);
return &t->base;
}
void grpc_chttp2_transport_start_reading(grpc_transport *transport,
- gpr_slice *slices, size_t nslices) {
+ gpr_slice *slices, size_t nslices,
+ grpc_call_list *call_list) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport;
REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */
gpr_slice_buffer_addn(&t->read_buffer, slices, nslices);
- recv_data(t, 1);
+ recv_data(t, 1, call_list);
}
diff --git a/src/core/transport/chttp2_transport.h b/src/core/transport/chttp2_transport.h
index fa0d6e4151..e963e16707 100644
--- a/src/core/transport/chttp2_transport.h
+++ b/src/core/transport/chttp2_transport.h
@@ -42,9 +42,10 @@ extern int grpc_flowctl_trace;
grpc_transport *grpc_create_chttp2_transport(
const grpc_channel_args *channel_args, grpc_endpoint *ep,
- grpc_mdctx *metadata_context, int is_client);
+ grpc_mdctx *metadata_context, int is_client, grpc_call_list *call_list);
void grpc_chttp2_transport_start_reading(grpc_transport *transport,
- gpr_slice *slices, size_t nslices);
+ gpr_slice *slices, size_t nslices,
+ grpc_call_list *call_list);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_TRANSPORT_H */
diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c
index 8b2e1b9b02..dc8392159b 100644
--- a/src/core/transport/connectivity_state.c
+++ b/src/core/transport/connectivity_state.c
@@ -66,7 +66,8 @@ void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
tracker->name = gpr_strdup(name);
}
-void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) {
+void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker,
+ grpc_call_list *call_list) {
int success;
grpc_connectivity_state_watcher *w;
while ((w = tracker->watchers)) {
@@ -78,7 +79,7 @@ void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) {
} else {
success = 0;
}
- w->notify->cb(w->notify->cb_arg, success);
+ grpc_call_list_add(call_list, w->notify, success);
gpr_free(w);
}
gpr_free(tracker->name);
diff --git a/src/core/transport/connectivity_state.h b/src/core/transport/connectivity_state.h
index a29c655b4e..3f92b22d5d 100644
--- a/src/core/transport/connectivity_state.h
+++ b/src/core/transport/connectivity_state.h
@@ -61,7 +61,8 @@ extern int grpc_connectivity_state_trace;
void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state init_state,
const char *name);
-void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker);
+void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker,
+ grpc_call_list *call_list);
/** Set connectivity state; not thread safe; access must be serialized with an
* external lock */
diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c
index 3f6b93c3e8..f7c87c1e90 100644
--- a/src/core/transport/transport.c
+++ b/src/core/transport/transport.c
@@ -40,48 +40,47 @@ size_t grpc_transport_stream_size(grpc_transport *transport) {
return transport->vtable->sizeof_stream;
}
-void grpc_transport_destroy(grpc_transport *transport) {
- transport->vtable->destroy(transport);
+void grpc_transport_destroy(grpc_transport *transport,
+ grpc_call_list *call_list) {
+ transport->vtable->destroy(transport, call_list);
}
int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream,
const void *server_data,
- grpc_transport_stream_op *initial_op) {
+ grpc_transport_stream_op *initial_op,
+ grpc_call_list *call_list) {
return transport->vtable->init_stream(transport, stream, server_data,
- initial_op);
+ initial_op, call_list);
}
void grpc_transport_perform_stream_op(grpc_transport *transport,
grpc_stream *stream,
- grpc_transport_stream_op *op) {
- transport->vtable->perform_stream_op(transport, stream, op);
+ grpc_transport_stream_op *op,
+ grpc_call_list *call_list) {
+ transport->vtable->perform_stream_op(transport, stream, op, call_list);
}
-void grpc_transport_perform_op(grpc_transport *transport,
- grpc_transport_op *op) {
- transport->vtable->perform_op(transport, op);
+void grpc_transport_perform_op(grpc_transport *transport, grpc_transport_op *op,
+ grpc_call_list *call_list) {
+ transport->vtable->perform_op(transport, op, call_list);
}
void grpc_transport_destroy_stream(grpc_transport *transport,
- grpc_stream *stream) {
- transport->vtable->destroy_stream(transport, stream);
+ grpc_stream *stream,
+ grpc_call_list *call_list) {
+ transport->vtable->destroy_stream(transport, stream, call_list);
}
-char *grpc_transport_get_peer(grpc_transport *transport) {
- return transport->vtable->get_peer(transport);
+char *grpc_transport_get_peer(grpc_transport *transport,
+ grpc_call_list *call_list) {
+ return transport->vtable->get_peer(transport, call_list);
}
-void grpc_transport_stream_op_finish_with_failure(
- grpc_transport_stream_op *op) {
- if (op->send_ops) {
- op->on_done_send->cb(op->on_done_send->cb_arg, 0);
- }
- if (op->recv_ops) {
- op->on_done_recv->cb(op->on_done_recv->cb_arg, 0);
- }
- if (op->on_consumed) {
- op->on_consumed->cb(op->on_consumed->cb_arg, 0);
- }
+void grpc_transport_stream_op_finish_with_failure(grpc_transport_stream_op *op,
+ grpc_call_list *call_list) {
+ grpc_call_list_add(call_list, op->on_done_recv, 0);
+ grpc_call_list_add(call_list, op->on_done_send, 0);
+ grpc_call_list_add(call_list, op->on_consumed, 0);
}
void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op,
@@ -105,11 +104,12 @@ typedef struct {
grpc_closure closure;
} close_message_data;
-static void free_message(void *p, int iomgr_success) {
+static void free_message(void *p, int iomgr_success,
+ grpc_call_list *call_list) {
close_message_data *cmd = p;
gpr_slice_unref(cmd->message);
if (cmd->then_call != NULL) {
- cmd->then_call->cb(cmd->then_call->cb_arg, iomgr_success);
+ cmd->then_call->cb(cmd->then_call->cb_arg, iomgr_success, call_list);
}
gpr_free(cmd);
}
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index 271891d430..d5383ad11a 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -136,7 +136,8 @@ size_t grpc_transport_stream_size(grpc_transport *transport);
supplied from the accept_stream callback function */
int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream,
const void *server_data,
- grpc_transport_stream_op *initial_op);
+ grpc_transport_stream_op *initial_op,
+ grpc_call_list *call_list);
/* Destroy transport data for a stream.
@@ -149,9 +150,11 @@ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream,
stream - the grpc_stream to destroy (memory is still owned by the
caller, but any child memory must be cleaned up) */
void grpc_transport_destroy_stream(grpc_transport *transport,
- grpc_stream *stream);
+ grpc_stream *stream,
+ grpc_call_list *call_list);
-void grpc_transport_stream_op_finish_with_failure(grpc_transport_stream_op *op);
+void grpc_transport_stream_op_finish_with_failure(grpc_transport_stream_op *op,
+ grpc_call_list *call_list);
void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op,
grpc_status_code status);
@@ -173,10 +176,11 @@ char *grpc_transport_stream_op_string(grpc_transport_stream_op *op);
op - a grpc_transport_stream_op specifying the op to perform */
void grpc_transport_perform_stream_op(grpc_transport *transport,
grpc_stream *stream,
- grpc_transport_stream_op *op);
+ grpc_transport_stream_op *op,
+ grpc_call_list *call_list);
-void grpc_transport_perform_op(grpc_transport *transport,
- grpc_transport_op *op);
+void grpc_transport_perform_op(grpc_transport *transport, grpc_transport_op *op,
+ grpc_call_list *call_list);
/* Send a ping on a transport
@@ -191,9 +195,11 @@ void grpc_transport_goaway(grpc_transport *transport, grpc_status_code status,
void grpc_transport_close(grpc_transport *transport);
/* Destroy the transport */
-void grpc_transport_destroy(grpc_transport *transport);
+void grpc_transport_destroy(grpc_transport *transport,
+ grpc_call_list *call_list);
/* Get the transports peer */
-char *grpc_transport_get_peer(grpc_transport *transport);
+char *grpc_transport_get_peer(grpc_transport *transport,
+ grpc_call_list *call_list);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_H */
diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h
index d3bbdf6c27..9adb16b941 100644
--- a/src/core/transport/transport_impl.h
+++ b/src/core/transport/transport_impl.h
@@ -44,23 +44,27 @@ typedef struct grpc_transport_vtable {
/* implementation of grpc_transport_init_stream */
int (*init_stream)(grpc_transport *self, grpc_stream *stream,
const void *server_data,
- grpc_transport_stream_op *initial_op);
+ grpc_transport_stream_op *initial_op,
+ grpc_call_list *call_list);
/* implementation of grpc_transport_perform_stream_op */
void (*perform_stream_op)(grpc_transport *self, grpc_stream *stream,
- grpc_transport_stream_op *op);
+ grpc_transport_stream_op *op,
+ grpc_call_list *call_list);
/* implementation of grpc_transport_perform_op */
- void (*perform_op)(grpc_transport *self, grpc_transport_op *op);
+ void (*perform_op)(grpc_transport *self, grpc_transport_op *op,
+ grpc_call_list *call_list);
/* implementation of grpc_transport_destroy_stream */
- void (*destroy_stream)(grpc_transport *self, grpc_stream *stream);
+ void (*destroy_stream)(grpc_transport *self, grpc_stream *stream,
+ grpc_call_list *call_list);
/* implementation of grpc_transport_destroy */
- void (*destroy)(grpc_transport *self);
+ void (*destroy)(grpc_transport *self, grpc_call_list *call_list);
/* implementation of grpc_transport_get_peer */
- char *(*get_peer)(grpc_transport *self);
+ char *(*get_peer)(grpc_transport *self, grpc_call_list *call_list);
} grpc_transport_vtable;
/* an instance of a grpc transport */