aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/channel/client_channel.c5
-rw-r--r--src/core/client_config/connector.h2
-rw-r--r--src/core/client_config/lb_policies/pick_first.c4
-rw-r--r--src/core/client_config/resolvers/zookeeper_resolver.c34
-rw-r--r--src/core/client_config/subchannel.c7
-rw-r--r--src/core/httpcli/httpcli.c73
-rw-r--r--src/core/iomgr/tcp_client_posix.c3
-rw-r--r--src/core/iomgr/tcp_posix.c2
-rw-r--r--src/core/iomgr/tcp_server_posix.c9
-rw-r--r--src/core/iomgr/udp_server.c6
-rw-r--r--src/core/iomgr/workqueue_posix.c6
-rw-r--r--src/core/security/server_secure_chttp2.c11
-rw-r--r--src/core/surface/call.c3
-rw-r--r--src/core/surface/channel.c7
-rw-r--r--src/core/surface/channel.h3
-rw-r--r--src/core/surface/channel_connectivity.c3
-rw-r--r--src/core/surface/channel_create.c17
-rw-r--r--src/core/surface/lame_client.c3
-rw-r--r--src/core/surface/secure_channel_create.c21
-rw-r--r--src/core/surface/server.c31
-rw-r--r--src/core/surface/server.h1
-rw-r--r--src/core/surface/server_chttp2.c9
-rw-r--r--src/core/transport/chttp2/frame_ping.c4
-rw-r--r--src/core/transport/chttp2_transport.c10
-rw-r--r--src/core/transport/chttp2_transport.h3
-rw-r--r--src/core/transport/connectivity_state.c15
-rw-r--r--src/core/transport/connectivity_state.h4
27 files changed, 175 insertions, 121 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 2624fcdd53..6fefdec2f6 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -670,8 +670,9 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed,
chand);
- grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
- "client_channel");
+ grpc_connectivity_state_init(&chand->state_tracker,
+ grpc_channel_get_workqueue(master),
+ GRPC_CHANNEL_IDLE, "client_channel");
}
/* Destructor for channel_data */
diff --git a/src/core/client_config/connector.h b/src/core/client_config/connector.h
index 39f3467990..bdaeda86ae 100644
--- a/src/core/client_config/connector.h
+++ b/src/core/client_config/connector.h
@@ -57,6 +57,8 @@ typedef struct {
const grpc_channel_args *channel_args;
/** metadata context */
grpc_mdctx *metadata_context;
+ /** workqueue */
+ grpc_workqueue *workqueue;
} grpc_connect_in_args;
typedef struct {
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 151b6f12f8..06186403e5 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -332,8 +332,8 @@ static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory,
p->num_subchannels = args->num_subchannels;
p->workqueue = args->workqueue;
grpc_workqueue_ref(p->workqueue);
- grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
- "pick_first");
+ grpc_connectivity_state_init(&p->state_tracker, args->workqueue,
+ GRPC_CHANNEL_IDLE, "pick_first");
memcpy(p->subchannels, args->subchannels,
sizeof(grpc_subchannel *) * args->num_subchannels);
grpc_iomgr_closure_init(&p->connectivity_changed, pf_connectivity_changed, p);
diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c
index 2594e6fae9..bc04203744 100644
--- a/src/core/client_config/resolvers/zookeeper_resolver.c
+++ b/src/core/client_config/resolvers/zookeeper_resolver.c
@@ -61,6 +61,8 @@ typedef struct {
grpc_subchannel_factory *subchannel_factory;
/** load balancing policy name */
char *lb_policy_name;
+ /** work queue */
+ grpc_workqueue *workqueue;
/** mutex guarding the rest of the state */
gpr_mu mu;
@@ -108,7 +110,7 @@ static void zookeeper_shutdown(grpc_resolver *resolver) {
gpr_mu_lock(&r->mu);
if (r->next_completion != NULL) {
*r->target_config = NULL;
- grpc_iomgr_add_callback(r->next_completion);
+ grpc_workqueue_push(r->workqueue, r->next_completion, 1);
r->next_completion = NULL;
}
zookeeper_close(r->zookeeper_handle);
@@ -409,7 +411,7 @@ static void zookeeper_maybe_finish_next_locked(zookeeper_resolver *r) {
if (r->resolved_config != NULL) {
grpc_client_config_ref(r->resolved_config);
}
- grpc_iomgr_add_callback(r->next_completion);
+ grpc_workqueue_push(r->workqueue, r->next_completion, 1);
r->next_completion = NULL;
r->published_version = r->resolved_version;
}
@@ -422,19 +424,19 @@ static void zookeeper_destroy(grpc_resolver *gr) {
grpc_client_config_unref(r->resolved_config);
}
grpc_subchannel_factory_unref(r->subchannel_factory);
+ grpc_workqueue_unref(r->workqueue);
gpr_free(r->name);
gpr_free(r->lb_policy_name);
gpr_free(r);
}
-static grpc_resolver *zookeeper_create(
- grpc_uri *uri, const char *lb_policy_name,
- grpc_subchannel_factory *subchannel_factory) {
+static grpc_resolver *zookeeper_create(grpc_resolver_args *args,
+ const char *lb_policy_name) {
zookeeper_resolver *r;
size_t length;
- char *path = uri->path;
+ char *path = args->uri->path;
- if (0 == strcmp(uri->authority, "")) {
+ if (0 == strcmp(args->uri->authority, "")) {
gpr_log(GPR_ERROR, "No authority specified in zookeeper uri");
return NULL;
}
@@ -452,14 +454,19 @@ static grpc_resolver *zookeeper_create(
grpc_resolver_init(&r->base, &zookeeper_resolver_vtable);
r->name = gpr_strdup(path);
- r->subchannel_factory = subchannel_factory;
+ r->workqueue = args->workqueue;
+ grpc_workqueue_ref(r->workqueue);
+
+ r->subchannel_factory = args->subchannel_factory;
+ grpc_subchannel_factory_ref(r->subchannel_factory);
+
r->lb_policy_name = gpr_strdup(lb_policy_name);
- grpc_subchannel_factory_ref(subchannel_factory);
/** Initializes zookeeper client */
zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
- r->zookeeper_handle = zookeeper_init(uri->authority, zookeeper_global_watcher,
- GRPC_ZOOKEEPER_SESSION_TIMEOUT, 0, 0, 0);
+ r->zookeeper_handle =
+ zookeeper_init(args->uri->authority, zookeeper_global_watcher,
+ GRPC_ZOOKEEPER_SESSION_TIMEOUT, 0, 0, 0);
if (r->zookeeper_handle == NULL) {
gpr_log(GPR_ERROR, "Unable to connect to zookeeper server");
return NULL;
@@ -490,9 +497,8 @@ static char *zookeeper_factory_get_default_hostname(
}
static grpc_resolver *zookeeper_factory_create_resolver(
- grpc_resolver_factory *factory, grpc_uri *uri,
- grpc_subchannel_factory *subchannel_factory) {
- return zookeeper_create(uri, "pick_first", subchannel_factory);
+ grpc_resolver_factory *factory, grpc_resolver_args *args) {
+ return zookeeper_create(args, "pick_first");
}
static const grpc_resolver_factory_vtable zookeeper_factory_vtable = {
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 0718ffbb8c..82212d2c6b 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -260,6 +260,7 @@ static void subchannel_destroy(grpc_subchannel *c) {
grpc_mdctx_unref(c->mdctx);
grpc_connectivity_state_destroy(&c->state_tracker);
grpc_connector_unref(c->connector);
+ grpc_workqueue_unref(c->workqueue);
gpr_free(c);
}
@@ -296,12 +297,14 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
c->args = grpc_channel_args_copy(args->args);
c->mdctx = args->mdctx;
c->master = args->master;
+ c->workqueue = grpc_channel_get_workqueue(c->master);
+ grpc_workqueue_ref(c->workqueue);
c->pollset_set = grpc_client_channel_get_connecting_pollset_set(parent_elem);
c->random = random_seed();
grpc_mdctx_ref(c->mdctx);
grpc_iomgr_closure_init(&c->connected, subchannel_connected, c);
- grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
- "subchannel");
+ grpc_connectivity_state_init(&c->state_tracker, c->workqueue,
+ GRPC_CHANNEL_IDLE, "subchannel");
gpr_mu_init(&c->mu);
return c;
}
diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c
index 1e38479eb1..4bfe3cf973 100644
--- a/src/core/httpcli/httpcli.c
+++ b/src/core/httpcli/httpcli.c
@@ -65,6 +65,7 @@ typedef struct {
gpr_slice_buffer outgoing;
grpc_iomgr_closure on_read;
grpc_iomgr_closure done_write;
+ grpc_workqueue *workqueue;
} internal_request;
static grpc_httpcli_get_override g_get_override = NULL;
@@ -105,6 +106,7 @@ static void finish(internal_request *req, int success) {
grpc_iomgr_unregister_object(&req->iomgr_obj);
gpr_slice_buffer_destroy(&req->incoming);
gpr_slice_buffer_destroy(&req->outgoing);
+ grpc_workqueue_unref(req->workqueue);
gpr_free(req);
}
@@ -202,8 +204,8 @@ static void next_address(internal_request *req) {
}
addr = &req->addresses->addrs[req->next_address++];
grpc_tcp_client_connect(on_connected, req, &req->context->pollset_set,
- (struct sockaddr *)&addr->addr, addr->len,
- req->deadline);
+ req->workqueue, (struct sockaddr *)&addr->addr,
+ addr->len, req->deadline);
}
static void on_resolved(void *arg, grpc_resolved_addresses *addresses) {
@@ -217,19 +219,16 @@ static void on_resolved(void *arg, grpc_resolved_addresses *addresses) {
next_address(req);
}
-void grpc_httpcli_get(grpc_httpcli_context *context, grpc_pollset *pollset,
- const grpc_httpcli_request *request,
- gpr_timespec deadline,
- grpc_httpcli_response_cb on_response, void *user_data) {
- internal_request *req;
- char *name;
- if (g_get_override &&
- g_get_override(request, deadline, on_response, user_data)) {
- return;
- }
- req = gpr_malloc(sizeof(internal_request));
+static void internal_request_begin(grpc_httpcli_context *context,
+ grpc_pollset *pollset,
+ const grpc_httpcli_request *request,
+ gpr_timespec deadline,
+ grpc_httpcli_response_cb on_response,
+ void *user_data, const char *name,
+ gpr_slice request_text) {
+ internal_request *req = gpr_malloc(sizeof(internal_request));
memset(req, 0, sizeof(*req));
- req->request_text = grpc_httpcli_format_get_request(request);
+ req->request_text = request_text;
grpc_httpcli_parser_init(&req->parser);
req->on_response = on_response;
req->user_data = user_data;
@@ -242,51 +241,47 @@ void grpc_httpcli_get(grpc_httpcli_context *context, grpc_pollset *pollset,
grpc_iomgr_closure_init(&req->done_write, done_write, req);
gpr_slice_buffer_init(&req->incoming);
gpr_slice_buffer_init(&req->outgoing);
- gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path);
grpc_iomgr_register_object(&req->iomgr_obj, name);
- gpr_free(name);
req->host = gpr_strdup(request->host);
+ req->workqueue = grpc_workqueue_create();
+ grpc_workqueue_add_to_pollset(req->workqueue, pollset);
grpc_pollset_set_add_pollset(&req->context->pollset_set, req->pollset);
grpc_resolve_address(request->host, req->handshaker->default_port,
on_resolved, req);
}
+void grpc_httpcli_get(grpc_httpcli_context *context, grpc_pollset *pollset,
+ const grpc_httpcli_request *request,
+ gpr_timespec deadline,
+ grpc_httpcli_response_cb on_response, void *user_data) {
+ char *name;
+ if (g_get_override &&
+ g_get_override(request, deadline, on_response, user_data)) {
+ return;
+ }
+ gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path);
+ internal_request_begin(context, pollset, request, deadline, on_response,
+ user_data, name,
+ grpc_httpcli_format_get_request(request));
+ gpr_free(name);
+}
+
void grpc_httpcli_post(grpc_httpcli_context *context, grpc_pollset *pollset,
const grpc_httpcli_request *request,
const char *body_bytes, size_t body_size,
gpr_timespec deadline,
grpc_httpcli_response_cb on_response, void *user_data) {
- internal_request *req;
char *name;
if (g_post_override && g_post_override(request, body_bytes, body_size,
deadline, on_response, user_data)) {
return;
}
- req = gpr_malloc(sizeof(internal_request));
- memset(req, 0, sizeof(*req));
- req->request_text =
- grpc_httpcli_format_post_request(request, body_bytes, body_size);
- grpc_httpcli_parser_init(&req->parser);
- req->on_response = on_response;
- req->user_data = user_data;
- req->deadline = deadline;
- req->handshaker =
- request->handshaker ? request->handshaker : &grpc_httpcli_plaintext;
- req->context = context;
- req->pollset = pollset;
- grpc_iomgr_closure_init(&req->on_read, on_read, req);
- grpc_iomgr_closure_init(&req->done_write, done_write, req);
- gpr_slice_buffer_init(&req->incoming);
- gpr_slice_buffer_init(&req->outgoing);
gpr_asprintf(&name, "HTTP:POST:%s:%s", request->host, request->path);
- grpc_iomgr_register_object(&req->iomgr_obj, name);
+ internal_request_begin(
+ context, pollset, request, deadline, on_response, user_data, name,
+ grpc_httpcli_format_post_request(request, body_bytes, body_size));
gpr_free(name);
- req->host = gpr_strdup(request->host);
-
- grpc_pollset_set_add_pollset(&req->context->pollset_set, req->pollset);
- grpc_resolve_address(request->host, req->handshaker->default_port,
- on_resolved, req);
}
void grpc_httpcli_set_override(grpc_httpcli_get_override get,
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c
index c3668f6a92..8b1a3b0f9e 100644
--- a/src/core/iomgr/tcp_client_posix.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -195,6 +195,7 @@ finish:
void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
void *arg, grpc_pollset_set *interested_parties,
+ grpc_workqueue *workqueue,
const struct sockaddr *addr, size_t addr_len,
gpr_timespec deadline) {
int fd;
@@ -236,7 +237,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
addr_str = grpc_sockaddr_to_uri(addr);
gpr_asprintf(&name, "tcp-client:%s", addr_str);
- fdobj = grpc_fd_create(fd, name);
+ fdobj = grpc_fd_create(fd, workqueue, name);
if (err >= 0) {
cb(arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str));
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index 68f469c368..c539cf2d34 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -261,7 +261,7 @@ static grpc_endpoint_op_status tcp_read(grpc_endpoint *ep,
tcp->finished_edge = 0;
grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
} else {
- grpc_iomgr_add_delayed_callback(&tcp->read_closure, 1);
+ grpc_workqueue_push(tcp->em_fd->workqueue, &tcp->read_closure, 1);
}
/* TODO(ctiller): immediate return */
return GRPC_ENDPOINT_PENDING;
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c
index bcbd0afe6b..02d37350f7 100644
--- a/src/core/iomgr/tcp_server_posix.c
+++ b/src/core/iomgr/tcp_server_posix.c
@@ -124,6 +124,9 @@ struct grpc_tcp_server {
grpc_pollset **pollsets;
/* number of pollsets in the pollsets array */
size_t pollset_count;
+
+ /** workqueue for interally created async work */
+ grpc_workqueue *workqueue;
};
grpc_tcp_server *grpc_tcp_server_create(void) {
@@ -137,6 +140,7 @@ grpc_tcp_server *grpc_tcp_server_create(void) {
s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP);
s->nports = 0;
s->port_capacity = INIT_PORT_CAP;
+ s->workqueue = grpc_workqueue_create();
return s;
}
@@ -147,6 +151,7 @@ static void finish_shutdown(grpc_tcp_server *s) {
gpr_mu_destroy(&s->mu);
gpr_free(s->ports);
+ grpc_workqueue_unref(s->workqueue);
gpr_free(s);
}
@@ -339,7 +344,7 @@ static void on_read(void *arg, int success) {
addr_str = grpc_sockaddr_to_uri((struct sockaddr *)&addr);
gpr_asprintf(&name, "tcp-server-connection:%s", addr_str);
- fdobj = grpc_fd_create(fd, name);
+ fdobj = grpc_fd_create(fd, sp->server->workqueue, name);
/* TODO(ctiller): revise this when we have server-side sharding
of channels -- we certainly should not be automatically adding every
incoming channel to every pollset owned by the server */
@@ -387,7 +392,7 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd,
sp = &s->ports[s->nports++];
sp->server = s;
sp->fd = fd;
- sp->emfd = grpc_fd_create(fd, name);
+ sp->emfd = grpc_fd_create(fd, s->workqueue, name);
memcpy(sp->addr.untyped, addr, addr_len);
sp->addr_len = addr_len;
GPR_ASSERT(sp->emfd);
diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c
index ed9eee8726..d4e8e99bce 100644
--- a/src/core/iomgr/udp_server.c
+++ b/src/core/iomgr/udp_server.c
@@ -121,6 +121,8 @@ struct grpc_udp_server {
grpc_pollset **pollsets;
/* number of pollsets in the pollsets array */
size_t pollset_count;
+
+ grpc_workqueue *workqueue;
};
grpc_udp_server *grpc_udp_server_create(void) {
@@ -135,6 +137,7 @@ grpc_udp_server *grpc_udp_server_create(void) {
s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP);
s->nports = 0;
s->port_capacity = INIT_PORT_CAP;
+ s->workqueue = grpc_workqueue_create();
return s;
}
@@ -146,6 +149,7 @@ static void finish_shutdown(grpc_udp_server *s) {
gpr_cv_destroy(&s->cv);
gpr_free(s->ports);
+ grpc_workqueue_unref(s->workqueue);
gpr_free(s);
}
@@ -310,7 +314,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd,
sp = &s->ports[s->nports++];
sp->server = s;
sp->fd = fd;
- sp->emfd = grpc_fd_create(fd, name);
+ sp->emfd = grpc_fd_create(fd, s->workqueue, name);
memcpy(sp->addr.untyped, addr, addr_len);
sp->addr_len = addr_len;
sp->read_cb = read_cb;
diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c
index ef1598c711..26626bef3b 100644
--- a/src/core/iomgr/workqueue_posix.c
+++ b/src/core/iomgr/workqueue_posix.c
@@ -35,6 +35,7 @@
#ifdef GPR_POSIX_SOCKET
+#include "src/core/iomgr/fd_posix.h"
#include "src/core/iomgr/workqueue.h"
#include <stdio.h>
@@ -52,8 +53,9 @@ grpc_workqueue *grpc_workqueue_create(void) {
workqueue->tail = &workqueue->head;
grpc_wakeup_fd_init(&workqueue->wakeup_fd);
sprintf(name, "workqueue:%p", (void *)workqueue);
- workqueue->wakeup_read_fd =
- grpc_fd_create(GRPC_WAKEUP_FD_GET_READ_FD(&workqueue->wakeup_fd), name);
+ workqueue->wakeup_read_fd = NULL; /* inspected during grpc_fd_create below */
+ workqueue->wakeup_read_fd = grpc_fd_create(
+ GRPC_WAKEUP_FD_GET_READ_FD(&workqueue->wakeup_fd), workqueue, name);
grpc_iomgr_closure_init(&workqueue->read_closure, on_readable, workqueue);
grpc_fd_notify_on_read(workqueue->wakeup_read_fd, &workqueue->read_closure);
return workqueue;
diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c
index 4749f5f516..a6f50712f5 100644
--- a/src/core/security/server_secure_chttp2.c
+++ b/src/core/security/server_secure_chttp2.c
@@ -85,7 +85,7 @@ static void state_unref(grpc_server_secure_state *state) {
}
static void setup_transport(void *statep, grpc_transport *transport,
- grpc_mdctx *mdctx) {
+ grpc_mdctx *mdctx, grpc_workqueue *workqueue) {
static grpc_channel_filter const *extra_filters[] = {
&grpc_server_auth_filter, &grpc_http_server_filter};
grpc_server_secure_state *state = statep;
@@ -98,7 +98,8 @@ static void setup_transport(void *statep, grpc_transport *transport,
grpc_server_get_channel_args(state->server), args_to_add,
GPR_ARRAY_SIZE(args_to_add));
grpc_server_setup_transport(state->server, transport, extra_filters,
- GPR_ARRAY_SIZE(extra_filters), mdctx, args_copy);
+ GPR_ARRAY_SIZE(extra_filters), mdctx, workqueue,
+ args_copy);
grpc_channel_args_destroy(args_copy);
}
@@ -130,15 +131,17 @@ static void on_secure_transport_setup_done(void *statep,
grpc_server_secure_state *state = statep;
grpc_transport *transport;
grpc_mdctx *mdctx;
+ grpc_workqueue *workqueue;
if (status == GRPC_SECURITY_OK) {
gpr_mu_lock(&state->mu);
remove_tcp_from_list_locked(state, wrapped_endpoint);
if (!state->is_shutdown) {
mdctx = grpc_mdctx_create();
+ workqueue = grpc_workqueue_create();
transport = grpc_create_chttp2_transport(
grpc_server_get_channel_args(state->server), secure_endpoint, mdctx,
- 0);
- setup_transport(state, transport, mdctx);
+ workqueue, 0);
+ setup_transport(state, transport, mdctx, workqueue);
grpc_chttp2_transport_start_reading(transport, NULL, 0);
} else {
/* We need to consume this here, because the server may already have gone
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 4168c2ef0c..c2b3040319 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -499,7 +499,8 @@ void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) {
} else {
c->destroy_closure.cb = destroy_call;
c->destroy_closure.cb_arg = c;
- grpc_iomgr_add_callback(&c->destroy_closure);
+ grpc_workqueue_push(grpc_channel_get_workqueue(c->channel),
+ &c->destroy_closure, 1);
}
}
}
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index a89523b3ab..bf4aee190f 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -79,6 +79,7 @@ struct grpc_channel {
registered_call *registered_calls;
grpc_iomgr_closure destroy_closure;
char *target;
+ grpc_workqueue *workqueue;
};
#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1))
@@ -92,7 +93,8 @@ struct grpc_channel {
grpc_channel *grpc_channel_create_from_filters(
const char *target, const grpc_channel_filter **filters, size_t num_filters,
- const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client) {
+ const grpc_channel_args *args, grpc_mdctx *mdctx, grpc_workqueue *workqueue,
+ int is_client) {
size_t i;
size_t size =
sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters);
@@ -104,6 +106,7 @@ grpc_channel *grpc_channel_create_from_filters(
/* decremented by grpc_channel_destroy */
gpr_ref_init(&channel->refs, 1);
channel->metadata_context = mdctx;
+ channel->workqueue = workqueue;
channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status", 0);
channel->grpc_compression_algorithm_string =
grpc_mdstr_from_string(mdctx, "grpc-encoding", 0);
@@ -311,7 +314,7 @@ void grpc_channel_internal_unref(grpc_channel *channel) {
if (gpr_unref(&channel->refs)) {
channel->destroy_closure.cb = destroy_channel;
channel->destroy_closure.cb_arg = channel;
- grpc_iomgr_add_callback(&channel->destroy_closure);
+ grpc_workqueue_push(channel->workqueue, &channel->destroy_closure, 1);
}
}
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index 593faec7df..9fc821d64b 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -40,7 +40,8 @@
grpc_channel *grpc_channel_create_from_filters(
const char *target, const grpc_channel_filter **filters, size_t count,
- const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client);
+ const grpc_channel_args *args, grpc_mdctx *mdctx, grpc_workqueue *workqueue,
+ int is_client);
/** Get a (borrowed) pointer to this channels underlying channel stack */
grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel);
diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c
index 88a7c16598..12b15f353f 100644
--- a/src/core/surface/channel_connectivity.c
+++ b/src/core/surface/channel_connectivity.c
@@ -176,7 +176,8 @@ void grpc_channel_watch_connectivity_state(
"grpc_channel_watch_connectivity_state called on something that is "
"not a client channel, but '%s'",
client_channel_elem->filter->name);
- grpc_iomgr_add_delayed_callback(&w->on_complete, 1);
+ grpc_workqueue_push(grpc_channel_get_workqueue(channel), &w->on_complete,
+ 1);
} else {
GRPC_CHANNEL_INTERNAL_REF(channel, "watch_connectivity");
grpc_client_channel_add_interested_party(client_channel_elem,
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index 9e2cf1cf66..7a4ec00abb 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -74,7 +74,8 @@ static void connected(void *arg, grpc_endpoint *tcp) {
grpc_iomgr_closure *notify;
if (tcp != NULL) {
c->result->transport = grpc_create_chttp2_transport(
- c->args.channel_args, tcp, c->args.metadata_context, 1);
+ c->args.channel_args, tcp, c->args.metadata_context, c->args.workqueue,
+ 1);
grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0);
GPR_ASSERT(c->result->transport);
c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *));
@@ -85,7 +86,7 @@ static void connected(void *arg, grpc_endpoint *tcp) {
}
notify = c->notify;
c->notify = NULL;
- grpc_iomgr_add_callback(notify);
+ notify->cb(notify->cb_arg, 1);
}
static void connector_connect(grpc_connector *con,
@@ -98,8 +99,9 @@ static void connector_connect(grpc_connector *con,
c->notify = notify;
c->args = *args;
c->result = result;
- grpc_tcp_client_connect(connected, c, args->interested_parties, args->addr,
- args->addr_len, args->deadline);
+ grpc_tcp_client_connect(connected, c, args->interested_parties,
+ args->workqueue, args->addr, args->addr_len,
+ args->deadline);
}
static const grpc_connector_vtable connector_vtable = {
@@ -164,6 +166,7 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
grpc_resolver *resolver;
subchannel_factory *f;
grpc_mdctx *mdctx = grpc_mdctx_create();
+ grpc_workqueue *workqueue = grpc_workqueue_create();
size_t n = 0;
GPR_ASSERT(!reserved);
if (grpc_channel_args_is_census_enabled(args)) {
@@ -173,8 +176,8 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
filters[n++] = &grpc_client_channel_filter;
GPR_ASSERT(n <= MAX_FILTERS);
- channel =
- grpc_channel_create_from_filters(target, filters, n, args, mdctx, 1);
+ channel = grpc_channel_create_from_filters(target, filters, n, args, mdctx,
+ workqueue, 1);
f = gpr_malloc(sizeof(*f));
f->base.vtable = &subchannel_factory_vtable;
@@ -184,7 +187,7 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
f->merge_args = grpc_channel_args_copy(args);
f->master = channel;
GRPC_CHANNEL_INTERNAL_REF(f->master, "subchannel_factory");
- resolver = grpc_resolver_create(target, &f->base);
+ resolver = grpc_resolver_create(target, &f->base, workqueue);
if (!resolver) {
return NULL;
}
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index 80704cbf67..a5de900eff 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -150,7 +150,8 @@ grpc_channel *grpc_lame_client_channel_create(const char *target,
channel_data *chand;
static const grpc_channel_filter *filters[] = {&lame_filter};
channel = grpc_channel_create_from_filters(target, filters, 1, NULL,
- grpc_mdctx_create(), 1);
+ grpc_mdctx_create(),
+ grpc_workqueue_create(), 1);
elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
GPR_ASSERT(elem->filter == &lame_filter);
chand = (channel_data *)elem->channel_data;
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index d141260421..ec077af8dd 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -57,7 +57,6 @@ typedef struct {
gpr_refcount refs;
grpc_channel_security_connector *security_connector;
- grpc_workqueue *workqueue;
grpc_iomgr_closure *notify;
grpc_connect_in_args args;
@@ -72,7 +71,6 @@ static void connector_ref(grpc_connector *con) {
static void connector_unref(grpc_connector *con) {
connector *c = (connector *)con;
if (gpr_unref(&c->refs)) {
- grpc_workqueue_unref(c->workqueue);
gpr_free(c);
}
}
@@ -88,7 +86,8 @@ static void on_secure_transport_setup_done(void *arg,
memset(c->result, 0, sizeof(*c->result));
} else {
c->result->transport = grpc_create_chttp2_transport(
- c->args.channel_args, secure_endpoint, c->args.metadata_context, 1);
+ c->args.channel_args, secure_endpoint, c->args.metadata_context,
+ c->args.workqueue, 1);
grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0);
c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *) * 2);
c->result->filters[0] = &grpc_http_client_filter;
@@ -124,8 +123,9 @@ static void connector_connect(grpc_connector *con,
c->notify = notify;
c->args = *args;
c->result = result;
- grpc_tcp_client_connect(connected, c, args->interested_parties, c->workqueue,
- args->addr, args->addr_len, args->deadline);
+ grpc_tcp_client_connect(connected, c, args->interested_parties,
+ args->workqueue, args->addr, args->addr_len,
+ args->deadline);
}
static const grpc_connector_vtable connector_vtable = {
@@ -167,8 +167,6 @@ static grpc_subchannel *subchannel_factory_create_subchannel(
memset(c, 0, sizeof(*c));
c->base.vtable = &connector_vtable;
c->security_connector = f->security_connector;
- c->workqueue = grpc_channel_get_workqueue(f->master);
- grpc_workqueue_ref(c->workqueue);
gpr_ref_init(&c->refs, 1);
args->mdctx = f->mdctx;
args->args = final_args;
@@ -197,6 +195,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
grpc_channel_args *new_args_from_connector;
grpc_channel_security_connector *connector;
grpc_mdctx *mdctx;
+ grpc_workqueue *workqueue;
grpc_resolver *resolver;
subchannel_factory *f;
#define MAX_FILTERS 3
@@ -219,6 +218,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
"Failed to create security connector.");
}
mdctx = grpc_mdctx_create();
+ workqueue = grpc_workqueue_create();
connector_arg = grpc_security_connector_to_arg(&connector->base);
args_copy = grpc_channel_args_copy_and_add(
@@ -231,8 +231,8 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
filters[n++] = &grpc_client_channel_filter;
GPR_ASSERT(n <= MAX_FILTERS);
- channel =
- grpc_channel_create_from_filters(target, filters, n, args_copy, mdctx, 1);
+ channel = grpc_channel_create_from_filters(target, filters, n, args_copy,
+ mdctx, workqueue, 1);
f = gpr_malloc(sizeof(*f));
f->base.vtable = &subchannel_factory_vtable;
@@ -244,8 +244,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
f->merge_args = grpc_channel_args_copy(args_copy);
f->master = channel;
GRPC_CHANNEL_INTERNAL_REF(channel, "subchannel_factory");
- resolver = grpc_resolver_create(target, &f->base,
- grpc_channel_get_workqueue(channel));
+ resolver = grpc_resolver_create(target, &f->base, workqueue);
if (!resolver) {
return NULL;
}
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 3d404f78a4..aba0f94fd4 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -219,6 +219,8 @@ struct grpc_server {
/** when did we print the last shutdown progress message */
gpr_timespec last_shutdown_message_time;
+
+ grpc_workqueue *workqueue;
};
#define SERVER_FROM_CALL_ELEM(elem) \
@@ -314,7 +316,7 @@ static void kill_zombie(void *elem, int success) {
}
static void request_matcher_zombify_all_pending_calls(
- request_matcher *request_matcher) {
+ request_matcher *request_matcher, grpc_workqueue *workqueue) {
while (request_matcher->pending_head) {
call_data *calld = request_matcher->pending_head;
request_matcher->pending_head = calld->pending_next;
@@ -324,7 +326,7 @@ static void request_matcher_zombify_all_pending_calls(
grpc_iomgr_closure_init(
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
- grpc_iomgr_add_callback(&calld->kill_zombie_closure);
+ grpc_workqueue_push(workqueue, &calld->kill_zombie_closure, 1);
}
}
@@ -363,6 +365,7 @@ static void server_delete(grpc_server *server) {
}
request_matcher_destroy(&server->unregistered_request_matcher);
gpr_stack_lockfree_destroy(server->request_freelist);
+ grpc_workqueue_unref(server->workqueue);
gpr_free(server->cqs);
gpr_free(server->pollsets);
gpr_free(server->shutdown_tags);
@@ -401,7 +404,8 @@ static void destroy_channel(channel_data *chand) {
maybe_finish_shutdown(chand->server);
chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
chand->finish_destroy_channel_closure.cb_arg = chand;
- grpc_iomgr_add_callback(&chand->finish_destroy_channel_closure);
+ grpc_workqueue_push(chand->server->workqueue,
+ &chand->finish_destroy_channel_closure, 1);
}
static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem,
@@ -414,7 +418,7 @@ static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem,
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
- grpc_iomgr_add_callback(&calld->kill_zombie_closure);
+ grpc_workqueue_push(server->workqueue, &calld->kill_zombie_closure, 1);
return;
}
@@ -505,10 +509,11 @@ static void kill_pending_work_locked(grpc_server *server) {
registered_method *rm;
request_matcher_kill_requests(server, &server->unregistered_request_matcher);
request_matcher_zombify_all_pending_calls(
- &server->unregistered_request_matcher);
+ &server->unregistered_request_matcher, server->workqueue);
for (rm = server->registered_methods; rm; rm = rm->next) {
request_matcher_kill_requests(server, &rm->request_matcher);
- request_matcher_zombify_all_pending_calls(&rm->request_matcher);
+ request_matcher_zombify_all_pending_calls(&rm->request_matcher,
+ server->workqueue);
}
}
@@ -561,6 +566,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
static void server_on_recv(void *ptr, int success) {
grpc_call_element *elem = ptr;
call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
gpr_timespec op_deadline;
if (success && !calld->got_initial_metadata) {
@@ -595,7 +601,8 @@ static void server_on_recv(void *ptr, int success) {
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
- grpc_iomgr_add_callback(&calld->kill_zombie_closure);
+ grpc_workqueue_push(chand->server->workqueue,
+ &calld->kill_zombie_closure, 1);
} else {
gpr_mu_unlock(&calld->mu_state);
}
@@ -606,7 +613,8 @@ static void server_on_recv(void *ptr, int success) {
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
- grpc_iomgr_add_callback(&calld->kill_zombie_closure);
+ grpc_workqueue_push(chand->server->workqueue,
+ &calld->kill_zombie_closure, 1);
} else if (calld->state == PENDING) {
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
@@ -799,6 +807,7 @@ grpc_server *grpc_server_create_from_filters(
gpr_ref_init(&server->internal_refcount, 1);
server->root_channel_data.next = server->root_channel_data.prev =
&server->root_channel_data;
+ server->workqueue = grpc_workqueue_create();
/* TODO(ctiller): expose a channel_arg for this */
server->max_requested_calls = 32768;
@@ -873,6 +882,7 @@ void grpc_server_start(grpc_server *server) {
server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
for (i = 0; i < server->cq_count; i++) {
server->pollsets[i] = grpc_cq_pollset(server->cqs[i]);
+ grpc_workqueue_add_to_pollset(server->workqueue, server->pollsets[i]);
}
for (l = server->listeners; l; l = l->next) {
@@ -883,6 +893,7 @@ void grpc_server_start(grpc_server *server) {
void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
grpc_channel_filter const **extra_filters,
size_t num_extra_filters, grpc_mdctx *mdctx,
+ grpc_workqueue *workqueue,
const grpc_channel_args *args) {
size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
grpc_channel_filter const **filters =
@@ -917,7 +928,7 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
}
channel = grpc_channel_create_from_filters(NULL, filters, num_filters, args,
- mdctx, 0);
+ mdctx, workqueue, 0);
chand = (channel_data *)grpc_channel_stack_element(
grpc_channel_get_channel_stack(channel), 0)
->channel_data;
@@ -1119,7 +1130,7 @@ static grpc_call_error queue_call_request(grpc_server *server,
grpc_iomgr_closure_init(
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
- grpc_iomgr_add_callback(&calld->kill_zombie_closure);
+ grpc_workqueue_push(server->workqueue, &calld->kill_zombie_closure, 1);
} else {
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;
diff --git a/src/core/surface/server.h b/src/core/surface/server.h
index c638d682bb..1d82d07ced 100644
--- a/src/core/surface/server.h
+++ b/src/core/surface/server.h
@@ -58,6 +58,7 @@ void grpc_server_listener_destroy_done(void *server);
void grpc_server_setup_transport(grpc_server *server, grpc_transport *transport,
grpc_channel_filter const **extra_filters,
size_t num_extra_filters, grpc_mdctx *mdctx,
+ grpc_workqueue *workqueue,
const grpc_channel_args *args);
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server);
diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c
index 4ab845bc00..91cf6ece9c 100644
--- a/src/core/surface/server_chttp2.c
+++ b/src/core/surface/server_chttp2.c
@@ -43,11 +43,11 @@
#include <grpc/support/useful.h>
static void setup_transport(void *server, grpc_transport *transport,
- grpc_mdctx *mdctx) {
+ grpc_mdctx *mdctx, grpc_workqueue *workqueue) {
static grpc_channel_filter const *extra_filters[] = {
&grpc_http_server_filter};
grpc_server_setup_transport(server, transport, extra_filters,
- GPR_ARRAY_SIZE(extra_filters), mdctx,
+ GPR_ARRAY_SIZE(extra_filters), mdctx, workqueue,
grpc_server_get_channel_args(server));
}
@@ -60,9 +60,10 @@ static void new_transport(void *server, grpc_endpoint *tcp) {
* case.
*/
grpc_mdctx *mdctx = grpc_mdctx_create();
+ grpc_workqueue *workqueue = grpc_workqueue_create();
grpc_transport *transport = grpc_create_chttp2_transport(
- grpc_server_get_channel_args(server), tcp, mdctx, 0);
- setup_transport(server, transport, mdctx);
+ grpc_server_get_channel_args(server), tcp, mdctx, workqueue, 0);
+ setup_transport(server, transport, mdctx, workqueue);
grpc_chttp2_transport_start_reading(transport, NULL, 0);
}
diff --git a/src/core/transport/chttp2/frame_ping.c b/src/core/transport/chttp2/frame_ping.c
index 05451c7a8a..10d1e0a523 100644
--- a/src/core/transport/chttp2/frame_ping.c
+++ b/src/core/transport/chttp2/frame_ping.c
@@ -89,7 +89,9 @@ grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse(
for (ping = transport_parsing->pings.next;
ping != &transport_parsing->pings; ping = ping->next) {
if (0 == memcmp(p->opaque_8bytes, ping->id, 8)) {
- grpc_iomgr_add_delayed_callback(ping->on_recv, 1);
+ /* we know no locks are held here, we may as well just call up
+ * directly */
+ ping->on_recv->cb(ping->on_recv->cb_arg, 1);
}
ping->next->prev = ping->prev;
ping->prev->next = ping->next;
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index deb2fedf0c..705a025cca 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -166,7 +166,7 @@ static void destruct_transport(grpc_chttp2_transport *t) {
and maybe they hold resources that need to be freed */
while (t->global.pings.next != &t->global.pings) {
grpc_chttp2_outstanding_ping *ping = t->global.pings.next;
- grpc_iomgr_add_delayed_callback(ping->on_recv, 0);
+ ping->on_recv->cb(ping->on_recv->cb_arg, 0);
ping->next->prev = ping->prev;
ping->prev->next = ping->next;
gpr_free(ping);
@@ -209,7 +209,7 @@ static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); }
static void init_transport(grpc_chttp2_transport *t,
const grpc_channel_args *channel_args,
grpc_endpoint *ep, grpc_mdctx *mdctx,
- gpr_uint8 is_client) {
+ grpc_workqueue *workqueue, gpr_uint8 is_client) {
size_t i;
int j;
@@ -242,7 +242,7 @@ static void init_transport(grpc_chttp2_transport *t,
t->parsing.deframe_state =
is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
t->writing.is_client = is_client;
- grpc_connectivity_state_init(&t->channel_callback.state_tracker,
+ grpc_connectivity_state_init(&t->channel_callback.state_tracker, workqueue,
GRPC_CHANNEL_READY, "transport");
gpr_slice_buffer_init(&t->global.qbuf);
@@ -1280,9 +1280,9 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
grpc_transport *grpc_create_chttp2_transport(
const grpc_channel_args *channel_args, grpc_endpoint *ep, grpc_mdctx *mdctx,
- int is_client) {
+ grpc_workqueue *workqueue, int is_client) {
grpc_chttp2_transport *t = gpr_malloc(sizeof(grpc_chttp2_transport));
- init_transport(t, channel_args, ep, mdctx, is_client != 0);
+ init_transport(t, channel_args, ep, mdctx, workqueue, is_client != 0);
return &t->base;
}
diff --git a/src/core/transport/chttp2_transport.h b/src/core/transport/chttp2_transport.h
index fa0d6e4151..8bd8af6236 100644
--- a/src/core/transport/chttp2_transport.h
+++ b/src/core/transport/chttp2_transport.h
@@ -35,6 +35,7 @@
#define GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_TRANSPORT_H
#include "src/core/iomgr/endpoint.h"
+#include "src/core/iomgr/workqueue.h"
#include "src/core/transport/transport.h"
extern int grpc_http_trace;
@@ -42,7 +43,7 @@ extern int grpc_flowctl_trace;
grpc_transport *grpc_create_chttp2_transport(
const grpc_channel_args *channel_args, grpc_endpoint *ep,
- grpc_mdctx *metadata_context, int is_client);
+ grpc_mdctx *metadata_context, grpc_workqueue *workqueue, int is_client);
void grpc_chttp2_transport_start_reading(grpc_transport *transport,
gpr_slice *slices, size_t nslices);
diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c
index 61d26f06f0..716280505e 100644
--- a/src/core/transport/connectivity_state.c
+++ b/src/core/transport/connectivity_state.c
@@ -56,6 +56,7 @@ const char *grpc_connectivity_state_name(grpc_connectivity_state state) {
}
void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
+ grpc_workqueue *workqueue,
grpc_connectivity_state init_state,
const char *name) {
tracker->current_state = init_state;
@@ -64,16 +65,18 @@ void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
}
void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) {
+ int success;
grpc_connectivity_state_watcher *w;
while ((w = tracker->watchers)) {
tracker->watchers = w->next;
if (GRPC_CHANNEL_FATAL_FAILURE != *w->current) {
*w->current = GRPC_CHANNEL_FATAL_FAILURE;
- grpc_iomgr_add_callback(w->notify);
+ success = 1;
} else {
- grpc_iomgr_add_delayed_callback(w->notify, 0);
+ success = 0;
}
+ grpc_workqueue_push(tracker->workqueue, w->notify, success);
gpr_free(w);
}
gpr_free(tracker->name);
@@ -94,7 +97,7 @@ int grpc_connectivity_state_notify_on_state_change(
}
if (tracker->current_state != *current) {
*current = tracker->current_state;
- grpc_iomgr_add_callback(notify);
+ grpc_workqueue_push(tracker->workqueue, notify, 1);
} else {
grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w));
w->current = current;
@@ -136,13 +139,13 @@ void grpc_connectivity_state_set_with_scheduler(
tracker->watchers = new;
}
-static void default_scheduler(void *ignored, grpc_iomgr_closure *closure) {
- grpc_iomgr_add_callback(closure);
+static void default_scheduler(void *workqueue, grpc_iomgr_closure *closure) {
+ grpc_workqueue_push(workqueue, closure, 1);
}
void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state state,
const char *reason) {
grpc_connectivity_state_set_with_scheduler(tracker, state, default_scheduler,
- NULL, reason);
+ tracker->workqueue, reason);
}
diff --git a/src/core/transport/connectivity_state.h b/src/core/transport/connectivity_state.h
index a3b0b80c98..6c61e02623 100644
--- a/src/core/transport/connectivity_state.h
+++ b/src/core/transport/connectivity_state.h
@@ -36,6 +36,7 @@
#include <grpc/grpc.h>
#include "src/core/iomgr/iomgr.h"
+#include "src/core/iomgr/workqueue.h"
typedef struct grpc_connectivity_state_watcher {
/** we keep watchers in a linked list */
@@ -53,11 +54,14 @@ typedef struct {
grpc_connectivity_state_watcher *watchers;
/** a name to help debugging */
char *name;
+ /** workqueue for async work */
+ grpc_workqueue *workqueue;
} grpc_connectivity_state_tracker;
extern int grpc_connectivity_state_trace;
void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
+ grpc_workqueue *grpc_workqueue,
grpc_connectivity_state init_state,
const char *name);
void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker);