aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-09-15 07:41:28 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-09-15 07:41:28 -0700
commit06a43f5d7e7ec90dfe133c5dfa1bb2c3acb85059 (patch)
tree3280f1a990f3bf9f8d86fe899fc70b66641482fc
parent1701b093339fc124bd9c7f08eb7d8511799281ec (diff)
Progress towards workqueue transition
-rw-r--r--src/core/client_config/lb_policies/pick_first.c11
-rw-r--r--src/core/client_config/lb_policy_factory.h2
-rw-r--r--src/core/client_config/resolver_factory.c5
-rw-r--r--src/core/client_config/resolver_factory.h15
-rw-r--r--src/core/client_config/resolver_registry.c14
-rw-r--r--src/core/client_config/resolver_registry.h5
-rw-r--r--src/core/client_config/resolvers/dns_resolver.c31
-rw-r--r--src/core/client_config/resolvers/sockaddr_resolver.c30
-rw-r--r--src/core/client_config/subchannel.c3
-rw-r--r--src/core/iomgr/endpoint_pair.h3
-rw-r--r--src/core/iomgr/endpoint_pair_posix.c11
-rw-r--r--src/core/iomgr/fd_posix.c30
-rw-r--r--src/core/iomgr/fd_posix.h4
-rw-r--r--src/core/iomgr/iomgr.c109
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c2
-rw-r--r--src/core/iomgr/pollset_posix.c4
-rw-r--r--src/core/iomgr/tcp_client.h1
-rw-r--r--src/core/iomgr/workqueue_posix.c15
-rw-r--r--src/core/iomgr/workqueue_posix.h15
-rw-r--r--src/core/surface/secure_channel_create.c11
20 files changed, 136 insertions, 185 deletions
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index c8262e92ef..151b6f12f8 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -52,6 +52,8 @@ typedef struct {
/** all our subchannels */
grpc_subchannel **subchannels;
size_t num_subchannels;
+ /** workqueue for async work */
+ grpc_workqueue *workqueue;
grpc_iomgr_closure connectivity_changed;
@@ -102,6 +104,7 @@ void pf_destroy(grpc_lb_policy *pol) {
grpc_connectivity_state_destroy(&p->state_tracker);
gpr_free(p->subchannels);
gpr_mu_destroy(&p->mu);
+ grpc_workqueue_unref(p->workqueue);
gpr_free(p);
}
@@ -114,7 +117,7 @@ void pf_shutdown(grpc_lb_policy *pol) {
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_iomgr_add_delayed_callback(pp->on_complete, 0);
+ grpc_workqueue_push(p->workqueue, pp->on_complete, 0);
gpr_free(pp);
}
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE,
@@ -196,7 +199,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
p->pending_picks = pp->next;
*pp->target = p->selected;
grpc_subchannel_del_interested_party(p->selected, pp->pollset);
- grpc_iomgr_add_delayed_callback(pp->on_complete, 1);
+ grpc_workqueue_push(p->workqueue, pp->on_complete, 1);
gpr_free(pp);
}
grpc_subchannel_notify_on_state_change(
@@ -241,7 +244,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_iomgr_add_delayed_callback(pp->on_complete, 1);
+ grpc_workqueue_push(p->workqueue, pp->on_complete, 1);
gpr_free(pp);
}
unref = 1;
@@ -327,6 +330,8 @@ static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory,
grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable);
p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels);
p->num_subchannels = args->num_subchannels;
+ p->workqueue = args->workqueue;
+ grpc_workqueue_ref(p->workqueue);
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
"pick_first");
memcpy(p->subchannels, args->subchannels,
diff --git a/src/core/client_config/lb_policy_factory.h b/src/core/client_config/lb_policy_factory.h
index 04610316ee..755faf8813 100644
--- a/src/core/client_config/lb_policy_factory.h
+++ b/src/core/client_config/lb_policy_factory.h
@@ -36,6 +36,7 @@
#include "src/core/client_config/lb_policy.h"
#include "src/core/client_config/subchannel.h"
+#include "src/core/iomgr/workqueue.h"
typedef struct grpc_lb_policy_factory grpc_lb_policy_factory;
typedef struct grpc_lb_policy_factory_vtable grpc_lb_policy_factory_vtable;
@@ -49,6 +50,7 @@ struct grpc_lb_policy_factory {
typedef struct grpc_lb_policy_args {
grpc_subchannel **subchannels;
size_t num_subchannels;
+ grpc_workqueue *workqueue;
} grpc_lb_policy_args;
struct grpc_lb_policy_factory_vtable {
diff --git a/src/core/client_config/resolver_factory.c b/src/core/client_config/resolver_factory.c
index 5b859a8d10..af04f7d9c7 100644
--- a/src/core/client_config/resolver_factory.c
+++ b/src/core/client_config/resolver_factory.c
@@ -43,10 +43,9 @@ void grpc_resolver_factory_unref(grpc_resolver_factory *factory) {
/** Create a resolver instance for a name */
grpc_resolver *grpc_resolver_factory_create_resolver(
- grpc_resolver_factory *factory, grpc_uri *uri,
- grpc_subchannel_factory *subchannel_factory) {
+ grpc_resolver_factory *factory, grpc_resolver_args *args) {
if (factory == NULL) return NULL;
- return factory->vtable->create_resolver(factory, uri, subchannel_factory);
+ return factory->vtable->create_resolver(factory, args);
}
char *grpc_resolver_factory_get_default_authority(
diff --git a/src/core/client_config/resolver_factory.h b/src/core/client_config/resolver_factory.h
index e243b23988..24da84d89d 100644
--- a/src/core/client_config/resolver_factory.h
+++ b/src/core/client_config/resolver_factory.h
@@ -37,6 +37,7 @@
#include "src/core/client_config/resolver.h"
#include "src/core/client_config/subchannel_factory.h"
#include "src/core/client_config/uri_parser.h"
+#include "src/core/iomgr/workqueue.h"
typedef struct grpc_resolver_factory grpc_resolver_factory;
typedef struct grpc_resolver_factory_vtable grpc_resolver_factory_vtable;
@@ -47,14 +48,19 @@ struct grpc_resolver_factory {
const grpc_resolver_factory_vtable *vtable;
};
+typedef struct grpc_resolver_args {
+ grpc_uri *uri;
+ grpc_subchannel_factory *subchannel_factory;
+ grpc_workqueue *workqueue;
+} grpc_resolver_args;
+
struct grpc_resolver_factory_vtable {
void (*ref)(grpc_resolver_factory *factory);
void (*unref)(grpc_resolver_factory *factory);
/** Implementation of grpc_resolver_factory_create_resolver */
- grpc_resolver *(*create_resolver)(
- grpc_resolver_factory *factory, grpc_uri *uri,
- grpc_subchannel_factory *subchannel_factory);
+ grpc_resolver *(*create_resolver)(grpc_resolver_factory *factory,
+ grpc_resolver_args *args);
/** Implementation of grpc_resolver_factory_get_default_authority */
char *(*get_default_authority)(grpc_resolver_factory *factory, grpc_uri *uri);
@@ -68,8 +74,7 @@ void grpc_resolver_factory_unref(grpc_resolver_factory *resolver);
/** Create a resolver instance for a name */
grpc_resolver *grpc_resolver_factory_create_resolver(
- grpc_resolver_factory *factory, grpc_uri *uri,
- grpc_subchannel_factory *subchannel_factory);
+ grpc_resolver_factory *factory, grpc_resolver_args *args);
/** Return a (freshly allocated with gpr_malloc) string representing
the default authority to use for this scheme. */
diff --git a/src/core/client_config/resolver_registry.c b/src/core/client_config/resolver_registry.c
index 37979b3b86..3611252bda 100644
--- a/src/core/client_config/resolver_registry.c
+++ b/src/core/client_config/resolver_registry.c
@@ -114,12 +114,18 @@ static grpc_resolver_factory *resolve_factory(const char *target,
return factory;
}
-grpc_resolver *grpc_resolver_create(
- const char *target, grpc_subchannel_factory *subchannel_factory) {
+grpc_resolver *grpc_resolver_create(const char *target,
+ grpc_subchannel_factory *subchannel_factory,
+ grpc_workqueue *workqueue) {
grpc_uri *uri = NULL;
grpc_resolver_factory *factory = resolve_factory(target, &uri);
- grpc_resolver *resolver =
- grpc_resolver_factory_create_resolver(factory, uri, subchannel_factory);
+ grpc_resolver *resolver;
+ grpc_resolver_args args;
+ memset(&args, 0, sizeof(args));
+ args.uri = uri;
+ args.subchannel_factory = subchannel_factory;
+ args.workqueue = workqueue;
+ resolver = grpc_resolver_factory_create_resolver(factory, &args);
grpc_uri_destroy(uri);
return resolver;
}
diff --git a/src/core/client_config/resolver_registry.h b/src/core/client_config/resolver_registry.h
index 5a7193b7ae..9a9a628262 100644
--- a/src/core/client_config/resolver_registry.h
+++ b/src/core/client_config/resolver_registry.h
@@ -55,8 +55,9 @@ void grpc_register_resolver_type(grpc_resolver_factory *factory);
If a resolver factory was found, use it to instantiate a resolver and
return it.
If a resolver factory was not found, return NULL. */
-grpc_resolver *grpc_resolver_create(
- const char *target, grpc_subchannel_factory *subchannel_factory);
+grpc_resolver *grpc_resolver_create(const char *target,
+ grpc_subchannel_factory *subchannel_factory,
+ grpc_workqueue *workqueue);
/** Given a target, return a (freshly allocated with gpr_malloc) string
representing the default authority to pass from a client. */
diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c
index ccec07a08c..b519ce4174 100644
--- a/src/core/client_config/resolvers/dns_resolver.c
+++ b/src/core/client_config/resolvers/dns_resolver.c
@@ -49,6 +49,8 @@ typedef struct {
grpc_resolver base;
/** refcount */
gpr_refcount refs;
+ /** workqueue */
+ grpc_workqueue *workqueue;
/** name to resolve */
char *name;
/** default port to use */
@@ -94,7 +96,7 @@ static void dns_shutdown(grpc_resolver *resolver) {
gpr_mu_lock(&r->mu);
if (r->next_completion != NULL) {
*r->target_config = NULL;
- grpc_iomgr_add_callback(r->next_completion);
+ grpc_workqueue_push(r->workqueue, r->next_completion, 1);
r->next_completion = NULL;
}
gpr_mu_unlock(&r->mu);
@@ -180,7 +182,7 @@ static void dns_maybe_finish_next_locked(dns_resolver *r) {
if (r->resolved_config) {
grpc_client_config_ref(r->resolved_config);
}
- grpc_iomgr_add_callback(r->next_completion);
+ grpc_workqueue_push(r->workqueue, r->next_completion, 1);
r->next_completion = NULL;
r->published_version = r->resolved_version;
}
@@ -193,21 +195,21 @@ static void dns_destroy(grpc_resolver *gr) {
grpc_client_config_unref(r->resolved_config);
}
grpc_subchannel_factory_unref(r->subchannel_factory);
+ grpc_workqueue_unref(r->workqueue);
gpr_free(r->name);
gpr_free(r->default_port);
gpr_free(r->lb_policy_name);
gpr_free(r);
}
-static grpc_resolver *dns_create(
- grpc_uri *uri, const char *default_port,
- const char* lb_policy_name,
- grpc_subchannel_factory *subchannel_factory) {
+static grpc_resolver *dns_create(grpc_resolver_args *args,
+ const char *default_port,
+ const char *lb_policy_name) {
dns_resolver *r;
- const char *path = uri->path;
+ const char *path = args->uri->path;
- if (0 != strcmp(uri->authority, "")) {
- gpr_log(GPR_ERROR, "authority based uri's not supported");
+ if (0 != strcmp(args->uri->authority, "")) {
+ gpr_log(GPR_ERROR, "authority based dns uri's not supported");
return NULL;
}
@@ -220,8 +222,10 @@ static grpc_resolver *dns_create(
grpc_resolver_init(&r->base, &dns_resolver_vtable);
r->name = gpr_strdup(path);
r->default_port = gpr_strdup(default_port);
- r->subchannel_factory = subchannel_factory;
- grpc_subchannel_factory_ref(subchannel_factory);
+ r->subchannel_factory = args->subchannel_factory;
+ grpc_subchannel_factory_ref(r->subchannel_factory);
+ r->workqueue = args->workqueue;
+ grpc_workqueue_ref(r->workqueue);
r->lb_policy_name = gpr_strdup(lb_policy_name);
return &r->base;
}
@@ -235,9 +239,8 @@ static void dns_factory_ref(grpc_resolver_factory *factory) {}
static void dns_factory_unref(grpc_resolver_factory *factory) {}
static grpc_resolver *dns_factory_create_resolver(
- grpc_resolver_factory *factory, grpc_uri *uri,
- grpc_subchannel_factory *subchannel_factory) {
- return dns_create(uri, "https", "pick_first", subchannel_factory);
+ grpc_resolver_factory *factory, grpc_resolver_args *args) {
+ return dns_create(args, "https", "pick_first");
}
char *dns_factory_get_default_host_name(grpc_resolver_factory *factory,
diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c
index 2900df285c..260bee27a9 100644
--- a/src/core/client_config/resolvers/sockaddr_resolver.c
+++ b/src/core/client_config/resolvers/sockaddr_resolver.c
@@ -56,6 +56,8 @@ typedef struct {
gpr_refcount refs;
/** subchannel factory */
grpc_subchannel_factory *subchannel_factory;
+ /** workqueue */
+ grpc_workqueue *workqueue;
/** load balancing policy name */
char *lb_policy_name;
@@ -96,8 +98,7 @@ static void sockaddr_shutdown(grpc_resolver *resolver) {
gpr_mu_lock(&r->mu);
if (r->next_completion != NULL) {
*r->target_config = NULL;
- /* TODO(ctiller): add delayed callback */
- grpc_iomgr_add_callback(r->next_completion);
+ grpc_workqueue_push(r->workqueue, r->next_completion, 1);
r->next_completion = NULL;
}
gpr_mu_unlock(&r->mu);
@@ -145,7 +146,7 @@ static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) {
GRPC_LB_POLICY_UNREF(lb_policy, "unix");
r->published = 1;
*r->target_config = cfg;
- grpc_iomgr_add_callback(r->next_completion);
+ grpc_workqueue_push(r->workqueue, r->next_completion, 1);
r->next_completion = NULL;
}
}
@@ -154,6 +155,7 @@ static void sockaddr_destroy(grpc_resolver *gr) {
sockaddr_resolver *r = (sockaddr_resolver *)gr;
gpr_mu_destroy(&r->mu);
grpc_subchannel_factory_unref(r->subchannel_factory);
+ grpc_workqueue_unref(r->workqueue);
gpr_free(r->addrs);
gpr_free(r->addrs_len);
gpr_free(r->lb_policy_name);
@@ -278,8 +280,7 @@ done:
static void do_nothing(void *ignored) {}
static grpc_resolver *sockaddr_create(
- grpc_uri *uri, const char *lb_policy_name,
- grpc_subchannel_factory *subchannel_factory,
+ grpc_resolver_args *args, const char *lb_policy_name,
int parse(grpc_uri *uri, struct sockaddr_storage *dst, size_t *len)) {
size_t i;
int errors_found = 0; /* GPR_FALSE */
@@ -287,7 +288,7 @@ static grpc_resolver *sockaddr_create(
gpr_slice path_slice;
gpr_slice_buffer path_parts;
- if (0 != strcmp(uri->authority, "")) {
+ if (0 != strcmp(args->uri->authority, "")) {
gpr_log(GPR_ERROR, "authority based uri's not supported");
return NULL;
}
@@ -295,7 +296,8 @@ static grpc_resolver *sockaddr_create(
r = gpr_malloc(sizeof(sockaddr_resolver));
memset(r, 0, sizeof(*r));
- path_slice = gpr_slice_new(uri->path, strlen(uri->path), do_nothing);
+ path_slice =
+ gpr_slice_new(args->uri->path, strlen(args->uri->path), do_nothing);
gpr_slice_buffer_init(&path_parts);
gpr_slice_split(path_slice, ",", &path_parts);
@@ -304,7 +306,7 @@ static grpc_resolver *sockaddr_create(
r->addrs_len = gpr_malloc(sizeof(*r->addrs_len) * r->num_addrs);
for(i = 0; i < r->num_addrs; i++) {
- grpc_uri ith_uri = *uri;
+ grpc_uri ith_uri = *args->uri;
char* part_str = gpr_dump_slice(path_parts.slices[i], GPR_DUMP_ASCII);
ith_uri.path = part_str;
if (!parse(&ith_uri, &r->addrs[i], &r->addrs_len[i])) {
@@ -324,10 +326,12 @@ static grpc_resolver *sockaddr_create(
gpr_ref_init(&r->refs, 1);
gpr_mu_init(&r->mu);
grpc_resolver_init(&r->base, &sockaddr_resolver_vtable);
- r->subchannel_factory = subchannel_factory;
+ r->subchannel_factory = args->subchannel_factory;
+ grpc_subchannel_factory_ref(r->subchannel_factory);
+ r->workqueue = args->workqueue;
+ grpc_workqueue_ref(r->workqueue);
r->lb_policy_name = gpr_strdup(lb_policy_name);
- grpc_subchannel_factory_ref(subchannel_factory);
return &r->base;
}
@@ -341,10 +345,8 @@ static void sockaddr_factory_unref(grpc_resolver_factory *factory) {}
#define DECL_FACTORY(name) \
static grpc_resolver *name##_factory_create_resolver( \
- grpc_resolver_factory *factory, grpc_uri *uri, \
- grpc_subchannel_factory *subchannel_factory) { \
- return sockaddr_create(uri, "pick_first", \
- subchannel_factory, parse_##name); \
+ grpc_resolver_factory *factory, grpc_resolver_args *args) { \
+ return sockaddr_create(args, "pick_first", parse_##name); \
} \
static const grpc_resolver_factory_vtable name##_factory_vtable = { \
sockaddr_factory_ref, sockaddr_factory_unref, \
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index ca52c75beb..0718ffbb8c 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -76,6 +76,7 @@ typedef struct waiting_for_connect {
struct grpc_subchannel {
grpc_connector *connector;
+ grpc_workqueue *workqueue;
/** non-transport related channel filters */
const grpc_channel_filter **filters;
@@ -575,7 +576,7 @@ static void publish_transport(grpc_subchannel *c) {
connectivity_state_changed_locked(c, "connected");
while ((w4c = c->waiting)) {
c->waiting = w4c->next;
- grpc_iomgr_add_callback(&w4c->continuation);
+ grpc_workqueue_push(c->workqueue, &w4c->continuation, 1);
}
gpr_mu_unlock(&c->mu);
diff --git a/src/core/iomgr/endpoint_pair.h b/src/core/iomgr/endpoint_pair.h
index 095ec5fcc9..25ef1891fb 100644
--- a/src/core/iomgr/endpoint_pair.h
+++ b/src/core/iomgr/endpoint_pair.h
@@ -42,6 +42,7 @@ typedef struct {
} grpc_endpoint_pair;
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
- size_t read_slice_size);
+ size_t read_slice_size,
+ grpc_workqueue *workqueue);
#endif /* GRPC_INTERNAL_CORE_IOMGR_ENDPOINT_PAIR_H */
diff --git a/src/core/iomgr/endpoint_pair_posix.c b/src/core/iomgr/endpoint_pair_posix.c
index deae9c6875..dc1f441b4b 100644
--- a/src/core/iomgr/endpoint_pair_posix.c
+++ b/src/core/iomgr/endpoint_pair_posix.c
@@ -59,19 +59,20 @@ static void create_sockets(int sv[2]) {
}
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
- size_t read_slice_size) {
+ size_t read_slice_size,
+ grpc_workqueue *workqueue) {
int sv[2];
grpc_endpoint_pair p;
char *final_name;
create_sockets(sv);
gpr_asprintf(&final_name, "%s:client", name);
- p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), read_slice_size,
- "socketpair-server");
+ p.client = grpc_tcp_create(grpc_fd_create(sv[1], workqueue, final_name),
+ read_slice_size, "socketpair-server");
gpr_free(final_name);
gpr_asprintf(&final_name, "%s:server", name);
- p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), read_slice_size,
- "socketpair-client");
+ p.server = grpc_tcp_create(grpc_fd_create(sv[0], workqueue, final_name),
+ read_slice_size, "socketpair-client");
gpr_free(final_name);
return p;
}
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index 2d08a77a70..4bca9f3f6e 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -71,6 +71,9 @@ static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;
static void freelist_fd(grpc_fd *fd) {
+ if (fd->workqueue->wakeup_read_fd != fd) {
+ grpc_workqueue_unref(fd->workqueue);
+ }
gpr_mu_lock(&fd_freelist_mu);
fd->freelist_next = fd_freelist;
fd_freelist = fd;
@@ -158,8 +161,14 @@ void grpc_fd_global_shutdown(void) {
gpr_mu_destroy(&fd_freelist_mu);
}
-grpc_fd *grpc_fd_create(int fd, const char *name) {
+grpc_fd *grpc_fd_create(int fd, grpc_workqueue *workqueue, const char *name) {
grpc_fd *r = alloc_fd(fd);
+ r->workqueue = workqueue;
+ /* if the wakeup_read_fd is NULL, then the workqueue is under construction
+ ==> this fd will be the wakeup_read_fd, and we shouldn't take a ref */
+ if (workqueue->wakeup_read_fd != NULL) {
+ grpc_workqueue_ref(workqueue);
+ }
grpc_iomgr_register_object(&r->iomgr_object, name);
return r;
}
@@ -220,7 +229,7 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done,
fd->closed = 1;
close(fd->fd);
if (fd->on_done_closure) {
- grpc_iomgr_add_callback(fd->on_done_closure);
+ grpc_workqueue_push(fd->workqueue, fd->on_done_closure, 1);
}
} else {
wake_all_watchers_locked(fd);
@@ -246,19 +255,19 @@ void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
static void process_callback(grpc_iomgr_closure *closure, int success,
- int allow_synchronous_callback) {
- if (allow_synchronous_callback) {
+ grpc_workqueue *optional_workqueue) {
+ if (optional_workqueue == NULL) {
closure->cb(closure->cb_arg, success);
} else {
- grpc_iomgr_add_delayed_callback(closure, success);
+ grpc_workqueue_push(optional_workqueue, closure, success);
}
}
static void process_callbacks(grpc_iomgr_closure *callbacks, size_t n,
- int success, int allow_synchronous_callback) {
+ int success, grpc_workqueue *optional_workqueue) {
size_t i;
for (i = 0; i < n; i++) {
- process_callback(callbacks + i, success, allow_synchronous_callback);
+ process_callback(callbacks + i, success, optional_workqueue);
}
}
@@ -286,7 +295,7 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure,
GPR_ASSERT(gpr_atm_no_barrier_load(st) == READY);
gpr_atm_rel_store(st, NOT_READY);
process_callback(closure, !gpr_atm_acq_load(&fd->shutdown),
- allow_synchronous_callback);
+ allow_synchronous_callback ? NULL : fd->workqueue);
return;
default: /* WAITING */
/* upcallptr was set to a different closure. This is an error! */
@@ -339,7 +348,8 @@ static void set_ready(grpc_fd *fd, gpr_atm *st,
success = !gpr_atm_acq_load(&fd->shutdown);
GPR_ASSERT(ncb <= 1);
if (ncb > 0) {
- process_callbacks(closure, ncb, success, allow_synchronous_callback);
+ process_callbacks(closure, ncb, success,
+ allow_synchronous_callback ? NULL : fd->workqueue);
}
}
@@ -441,7 +451,7 @@ void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) {
fd->closed = 1;
close(fd->fd);
if (fd->on_done_closure != NULL) {
- grpc_iomgr_add_callback(fd->on_done_closure);
+ grpc_workqueue_push(fd->workqueue, fd->on_done_closure, 1);
}
}
gpr_mu_unlock(&fd->watcher_mu);
diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h
index 835e9b339a..e5157ad342 100644
--- a/src/core/iomgr/fd_posix.h
+++ b/src/core/iomgr/fd_posix.h
@@ -36,6 +36,7 @@
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/pollset.h"
+#include "src/core/iomgr/workqueue.h"
#include <grpc/support/atm.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
@@ -57,6 +58,7 @@ struct grpc_fd {
meaning that mostly we ref by two to avoid altering the orphaned bit,
and just unref by 1 when we're ready to flag the object as orphaned */
gpr_atm refst;
+ grpc_workqueue *workqueue;
gpr_mu set_state_mu;
gpr_atm shutdown;
@@ -103,7 +105,7 @@ struct grpc_fd {
/* Create a wrapped file descriptor.
Requires fd is a non-blocking file descriptor.
This takes ownership of closing fd. */
-grpc_fd *grpc_fd_create(int fd, const char *name);
+grpc_fd *grpc_fd_create(int fd, grpc_workqueue *workqueue, const char *name);
/* Releases fd to be asynchronously destroyed.
on_done is called when the underlying file descriptor is definitely close()d.
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c
index cf74a16c0c..66d7e9b110 100644
--- a/src/core/iomgr/iomgr.c
+++ b/src/core/iomgr/iomgr.c
@@ -57,7 +57,6 @@ void grpc_kick_poller(void) {
}
void grpc_iomgr_init(void) {
- gpr_thd_id id;
g_shutdown = 0;
gpr_mu_init(&g_mu);
gpr_cv_init(&g_rcv);
@@ -65,8 +64,6 @@ void grpc_iomgr_init(void) {
g_root_object.next = g_root_object.prev = &g_root_object;
g_root_object.name = "root";
grpc_iomgr_platform_init();
- gpr_event_init(&g_background_callback_executor_done);
- gpr_thd_new(&id, background_callback_executor, NULL, NULL);
}
static size_t count_objects(void) {
@@ -86,58 +83,36 @@ static void dump_objects(const char *kind) {
}
void grpc_iomgr_shutdown(void) {
- grpc_iomgr_closure *closure;
gpr_timespec shutdown_deadline = gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(10, GPR_TIMESPAN));
gpr_timespec last_warning_time = gpr_now(GPR_CLOCK_REALTIME);
gpr_mu_lock(&g_mu);
g_shutdown = 1;
- while (g_cbs_head != NULL || g_root_object.next != &g_root_object) {
+ while (g_root_object.next != &g_root_object) {
if (gpr_time_cmp(
gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), last_warning_time),
gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {
- if (g_cbs_head != NULL && g_root_object.next != &g_root_object) {
- gpr_log(GPR_DEBUG,
- "Waiting for %d iomgr objects to be destroyed and executing "
- "final callbacks",
- count_objects());
- } else if (g_cbs_head != NULL) {
- gpr_log(GPR_DEBUG, "Executing final iomgr callbacks");
- } else {
+ if (g_root_object.next != &g_root_object) {
gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed",
count_objects());
}
last_warning_time = gpr_now(GPR_CLOCK_REALTIME);
}
- if (g_cbs_head) {
- do {
- closure = g_cbs_head;
- g_cbs_head = closure->next;
- if (!g_cbs_head) g_cbs_tail = NULL;
- gpr_mu_unlock(&g_mu);
-
- closure->cb(closure->cb_arg, 0);
- gpr_mu_lock(&g_mu);
- } while (g_cbs_head);
- continue;
- }
if (grpc_alarm_check(&g_mu, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL)) {
continue;
}
if (g_root_object.next != &g_root_object) {
int timeout = 0;
- while (g_cbs_head == NULL) {
- gpr_timespec short_deadline = gpr_time_add(
+ gpr_timespec short_deadline = gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100, GPR_TIMESPAN));
- if (gpr_cv_wait(&g_rcv, &g_mu, short_deadline) && g_cbs_head == NULL) {
- if (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), shutdown_deadline) > 0) {
- timeout = 1;
- break;
- }
+ if (gpr_cv_wait(&g_rcv, &g_mu, short_deadline)) {
+ if (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), shutdown_deadline) > 0) {
+ timeout = 1;
+ break;
}
}
- if (timeout) {
+ if (timeout && g_root_object.next != &g_root_object) {
gpr_log(GPR_DEBUG,
"Failed to free %d iomgr objects before shutdown deadline: "
"memory leaks are likely",
@@ -149,10 +124,6 @@ void grpc_iomgr_shutdown(void) {
}
gpr_mu_unlock(&g_mu);
- grpc_kick_poller();
- gpr_event_wait(&g_background_callback_executor_done,
- gpr_inf_future(GPR_CLOCK_REALTIME));
-
grpc_alarm_list_shutdown();
grpc_iomgr_platform_shutdown();
@@ -184,67 +155,3 @@ void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb,
closure->cb_arg = cb_arg;
closure->next = NULL;
}
-
-static void assert_not_scheduled_locked(grpc_iomgr_closure *closure) {
-#ifndef NDEBUG
- grpc_iomgr_closure *c;
-
- for (c = g_cbs_head; c; c = c->next) {
- GPR_ASSERT(c != closure);
- }
-#endif
-}
-
-void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *closure, int success) {
- closure->success = success;
- GPR_ASSERT(closure->cb);
- gpr_mu_lock(&g_mu);
- assert_not_scheduled_locked(closure);
- closure->next = NULL;
- if (!g_cbs_tail) {
- g_cbs_head = g_cbs_tail = closure;
- } else {
- g_cbs_tail->next = closure;
- g_cbs_tail = closure;
- }
- if (g_shutdown) {
- gpr_cv_signal(&g_rcv);
- }
- gpr_mu_unlock(&g_mu);
-}
-
-void grpc_iomgr_add_callback(grpc_iomgr_closure *closure) {
- grpc_iomgr_add_delayed_callback(closure, 1 /* GPR_TRUE */);
-}
-
-int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) {
- int n = 0;
- gpr_mu *retake_mu = NULL;
- grpc_iomgr_closure *closure;
- for (;;) {
- /* check for new work */
- if (!gpr_mu_trylock(&g_mu)) {
- break;
- }
- closure = g_cbs_head;
- if (!closure) {
- gpr_mu_unlock(&g_mu);
- break;
- }
- g_cbs_head = closure->next;
- if (!g_cbs_head) g_cbs_tail = NULL;
- gpr_mu_unlock(&g_mu);
- /* if we have a mutex to drop, do so before executing work */
- if (drop_mu) {
- gpr_mu_unlock(drop_mu);
- retake_mu = drop_mu;
- drop_mu = NULL;
- }
- closure->cb(closure->cb_arg, success && closure->success);
- n++;
- }
- if (retake_mu) {
- gpr_mu_lock(retake_mu);
- }
- return n;
-}
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index 8f62ce2954..971b7b4a01 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -127,7 +127,7 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
GRPC_FD_REF(fd, "delayed_add");
grpc_iomgr_closure_init(&da->closure, perform_delayed_add, da);
pollset->in_flight_cbs++;
- grpc_iomgr_add_callback(&da->closure);
+ grpc_workqueue_push(fd->workqueue, &da->closure, 1);
}
}
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index b98b40b222..ad9c862d6e 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -293,7 +293,7 @@ static void basic_do_promote(void *args, int success) {
/* First we need to ensure that nobody is polling concurrently */
if (grpc_pollset_has_workers(pollset)) {
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
- grpc_iomgr_add_callback(&up_args->promotion_closure);
+ grpc_workqueue_push(fd->workqueue, &up_args->promotion_closure, 1);
gpr_mu_unlock(&pollset->mu);
return;
}
@@ -385,7 +385,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd,
up_args->original_vtable = pollset->vtable;
up_args->promotion_closure.cb = basic_do_promote;
up_args->promotion_closure.cb_arg = up_args;
- grpc_iomgr_add_callback(&up_args->promotion_closure);
+ grpc_workqueue_push(fd->workqueue, &up_args->promotion_closure, 1);
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
diff --git a/src/core/iomgr/tcp_client.h b/src/core/iomgr/tcp_client.h
index 12296bd55b..57f80016c2 100644
--- a/src/core/iomgr/tcp_client.h
+++ b/src/core/iomgr/tcp_client.h
@@ -46,6 +46,7 @@
in this connection being established (in order to continue their work) */
void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp),
void *arg, grpc_pollset_set *interested_parties,
+ grpc_workqueue *workqueue,
const struct sockaddr *addr, size_t addr_len,
gpr_timespec deadline);
diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c
index df4e0aca75..ef1598c711 100644
--- a/src/core/iomgr/workqueue_posix.c
+++ b/src/core/iomgr/workqueue_posix.c
@@ -41,21 +41,6 @@
#include <grpc/support/alloc.h>
-#include "src/core/iomgr/fd_posix.h"
-
-struct grpc_workqueue {
- gpr_refcount refs;
-
- gpr_mu mu;
- grpc_iomgr_closure head;
- grpc_iomgr_closure *tail;
-
- grpc_wakeup_fd wakeup_fd;
- grpc_fd *wakeup_read_fd;
-
- grpc_iomgr_closure read_closure;
-};
-
static void on_readable(void *arg, int success);
grpc_workqueue *grpc_workqueue_create(void) {
diff --git a/src/core/iomgr/workqueue_posix.h b/src/core/iomgr/workqueue_posix.h
index ba455522ff..1b3a0e281b 100644
--- a/src/core/iomgr/workqueue_posix.h
+++ b/src/core/iomgr/workqueue_posix.h
@@ -34,4 +34,19 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H
#define GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H
+struct grpc_fd;
+
+struct grpc_workqueue {
+ gpr_refcount refs;
+
+ gpr_mu mu;
+ grpc_iomgr_closure head;
+ grpc_iomgr_closure *tail;
+
+ grpc_wakeup_fd wakeup_fd;
+ struct grpc_fd *wakeup_read_fd;
+
+ grpc_iomgr_closure read_closure;
+};
+
#endif /* GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H */
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index 06ffa8dd6e..d141260421 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -57,6 +57,7 @@ typedef struct {
gpr_refcount refs;
grpc_channel_security_connector *security_connector;
+ grpc_workqueue *workqueue;
grpc_iomgr_closure *notify;
grpc_connect_in_args args;
@@ -71,6 +72,7 @@ static void connector_ref(grpc_connector *con) {
static void connector_unref(grpc_connector *con) {
connector *c = (connector *)con;
if (gpr_unref(&c->refs)) {
+ grpc_workqueue_unref(c->workqueue);
gpr_free(c);
}
}
@@ -122,8 +124,8 @@ static void connector_connect(grpc_connector *con,
c->notify = notify;
c->args = *args;
c->result = result;
- grpc_tcp_client_connect(connected, c, args->interested_parties, args->addr,
- args->addr_len, args->deadline);
+ grpc_tcp_client_connect(connected, c, args->interested_parties, c->workqueue,
+ args->addr, args->addr_len, args->deadline);
}
static const grpc_connector_vtable connector_vtable = {
@@ -165,6 +167,8 @@ static grpc_subchannel *subchannel_factory_create_subchannel(
memset(c, 0, sizeof(*c));
c->base.vtable = &connector_vtable;
c->security_connector = f->security_connector;
+ c->workqueue = grpc_channel_get_workqueue(f->master);
+ grpc_workqueue_ref(c->workqueue);
gpr_ref_init(&c->refs, 1);
args->mdctx = f->mdctx;
args->args = final_args;
@@ -240,7 +244,8 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
f->merge_args = grpc_channel_args_copy(args_copy);
f->master = channel;
GRPC_CHANNEL_INTERNAL_REF(channel, "subchannel_factory");
- resolver = grpc_resolver_create(target, &f->base);
+ resolver = grpc_resolver_create(target, &f->base,
+ grpc_channel_get_workqueue(channel));
if (!resolver) {
return NULL;
}