diff options
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 11 | ||||
-rw-r--r-- | src/core/client_config/lb_policy_factory.h | 2 | ||||
-rw-r--r-- | src/core/client_config/resolver_factory.c | 5 | ||||
-rw-r--r-- | src/core/client_config/resolver_factory.h | 15 | ||||
-rw-r--r-- | src/core/client_config/resolver_registry.c | 14 | ||||
-rw-r--r-- | src/core/client_config/resolver_registry.h | 5 | ||||
-rw-r--r-- | src/core/client_config/resolvers/dns_resolver.c | 31 | ||||
-rw-r--r-- | src/core/client_config/resolvers/sockaddr_resolver.c | 30 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 3 | ||||
-rw-r--r-- | src/core/iomgr/endpoint_pair.h | 3 | ||||
-rw-r--r-- | src/core/iomgr/endpoint_pair_posix.c | 11 | ||||
-rw-r--r-- | src/core/iomgr/fd_posix.c | 30 | ||||
-rw-r--r-- | src/core/iomgr/fd_posix.h | 4 | ||||
-rw-r--r-- | src/core/iomgr/iomgr.c | 109 | ||||
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_epoll.c | 2 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 4 | ||||
-rw-r--r-- | src/core/iomgr/tcp_client.h | 1 | ||||
-rw-r--r-- | src/core/iomgr/workqueue_posix.c | 15 | ||||
-rw-r--r-- | src/core/iomgr/workqueue_posix.h | 15 | ||||
-rw-r--r-- | src/core/surface/secure_channel_create.c | 11 |
20 files changed, 136 insertions, 185 deletions
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index c8262e92ef..151b6f12f8 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -52,6 +52,8 @@ typedef struct { /** all our subchannels */ grpc_subchannel **subchannels; size_t num_subchannels; + /** workqueue for async work */ + grpc_workqueue *workqueue; grpc_iomgr_closure connectivity_changed; @@ -102,6 +104,7 @@ void pf_destroy(grpc_lb_policy *pol) { grpc_connectivity_state_destroy(&p->state_tracker); gpr_free(p->subchannels); gpr_mu_destroy(&p->mu); + grpc_workqueue_unref(p->workqueue); gpr_free(p); } @@ -114,7 +117,7 @@ void pf_shutdown(grpc_lb_policy *pol) { while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_iomgr_add_delayed_callback(pp->on_complete, 0); + grpc_workqueue_push(p->workqueue, pp->on_complete, 0); gpr_free(pp); } grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, @@ -196,7 +199,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { p->pending_picks = pp->next; *pp->target = p->selected; grpc_subchannel_del_interested_party(p->selected, pp->pollset); - grpc_iomgr_add_delayed_callback(pp->on_complete, 1); + grpc_workqueue_push(p->workqueue, pp->on_complete, 1); gpr_free(pp); } grpc_subchannel_notify_on_state_change( @@ -241,7 +244,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_iomgr_add_delayed_callback(pp->on_complete, 1); + grpc_workqueue_push(p->workqueue, pp->on_complete, 1); gpr_free(pp); } unref = 1; @@ -327,6 +330,8 @@ static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory, grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable); p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels); p->num_subchannels = args->num_subchannels; + p->workqueue = args->workqueue; + grpc_workqueue_ref(p->workqueue); grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "pick_first"); memcpy(p->subchannels, args->subchannels, diff --git a/src/core/client_config/lb_policy_factory.h b/src/core/client_config/lb_policy_factory.h index 04610316ee..755faf8813 100644 --- a/src/core/client_config/lb_policy_factory.h +++ b/src/core/client_config/lb_policy_factory.h @@ -36,6 +36,7 @@ #include "src/core/client_config/lb_policy.h" #include "src/core/client_config/subchannel.h" +#include "src/core/iomgr/workqueue.h" typedef struct grpc_lb_policy_factory grpc_lb_policy_factory; typedef struct grpc_lb_policy_factory_vtable grpc_lb_policy_factory_vtable; @@ -49,6 +50,7 @@ struct grpc_lb_policy_factory { typedef struct grpc_lb_policy_args { grpc_subchannel **subchannels; size_t num_subchannels; + grpc_workqueue *workqueue; } grpc_lb_policy_args; struct grpc_lb_policy_factory_vtable { diff --git a/src/core/client_config/resolver_factory.c b/src/core/client_config/resolver_factory.c index 5b859a8d10..af04f7d9c7 100644 --- a/src/core/client_config/resolver_factory.c +++ b/src/core/client_config/resolver_factory.c @@ -43,10 +43,9 @@ void grpc_resolver_factory_unref(grpc_resolver_factory *factory) { /** Create a resolver instance for a name */ grpc_resolver *grpc_resolver_factory_create_resolver( - grpc_resolver_factory *factory, grpc_uri *uri, - grpc_subchannel_factory *subchannel_factory) { + grpc_resolver_factory *factory, grpc_resolver_args *args) { if (factory == NULL) return NULL; - return factory->vtable->create_resolver(factory, uri, subchannel_factory); + return factory->vtable->create_resolver(factory, args); } char *grpc_resolver_factory_get_default_authority( diff --git a/src/core/client_config/resolver_factory.h b/src/core/client_config/resolver_factory.h index e243b23988..24da84d89d 100644 --- a/src/core/client_config/resolver_factory.h +++ b/src/core/client_config/resolver_factory.h @@ -37,6 +37,7 @@ #include "src/core/client_config/resolver.h" #include "src/core/client_config/subchannel_factory.h" #include "src/core/client_config/uri_parser.h" +#include "src/core/iomgr/workqueue.h" typedef struct grpc_resolver_factory grpc_resolver_factory; typedef struct grpc_resolver_factory_vtable grpc_resolver_factory_vtable; @@ -47,14 +48,19 @@ struct grpc_resolver_factory { const grpc_resolver_factory_vtable *vtable; }; +typedef struct grpc_resolver_args { + grpc_uri *uri; + grpc_subchannel_factory *subchannel_factory; + grpc_workqueue *workqueue; +} grpc_resolver_args; + struct grpc_resolver_factory_vtable { void (*ref)(grpc_resolver_factory *factory); void (*unref)(grpc_resolver_factory *factory); /** Implementation of grpc_resolver_factory_create_resolver */ - grpc_resolver *(*create_resolver)( - grpc_resolver_factory *factory, grpc_uri *uri, - grpc_subchannel_factory *subchannel_factory); + grpc_resolver *(*create_resolver)(grpc_resolver_factory *factory, + grpc_resolver_args *args); /** Implementation of grpc_resolver_factory_get_default_authority */ char *(*get_default_authority)(grpc_resolver_factory *factory, grpc_uri *uri); @@ -68,8 +74,7 @@ void grpc_resolver_factory_unref(grpc_resolver_factory *resolver); /** Create a resolver instance for a name */ grpc_resolver *grpc_resolver_factory_create_resolver( - grpc_resolver_factory *factory, grpc_uri *uri, - grpc_subchannel_factory *subchannel_factory); + grpc_resolver_factory *factory, grpc_resolver_args *args); /** Return a (freshly allocated with gpr_malloc) string representing the default authority to use for this scheme. */ diff --git a/src/core/client_config/resolver_registry.c b/src/core/client_config/resolver_registry.c index 37979b3b86..3611252bda 100644 --- a/src/core/client_config/resolver_registry.c +++ b/src/core/client_config/resolver_registry.c @@ -114,12 +114,18 @@ static grpc_resolver_factory *resolve_factory(const char *target, return factory; } -grpc_resolver *grpc_resolver_create( - const char *target, grpc_subchannel_factory *subchannel_factory) { +grpc_resolver *grpc_resolver_create(const char *target, + grpc_subchannel_factory *subchannel_factory, + grpc_workqueue *workqueue) { grpc_uri *uri = NULL; grpc_resolver_factory *factory = resolve_factory(target, &uri); - grpc_resolver *resolver = - grpc_resolver_factory_create_resolver(factory, uri, subchannel_factory); + grpc_resolver *resolver; + grpc_resolver_args args; + memset(&args, 0, sizeof(args)); + args.uri = uri; + args.subchannel_factory = subchannel_factory; + args.workqueue = workqueue; + resolver = grpc_resolver_factory_create_resolver(factory, &args); grpc_uri_destroy(uri); return resolver; } diff --git a/src/core/client_config/resolver_registry.h b/src/core/client_config/resolver_registry.h index 5a7193b7ae..9a9a628262 100644 --- a/src/core/client_config/resolver_registry.h +++ b/src/core/client_config/resolver_registry.h @@ -55,8 +55,9 @@ void grpc_register_resolver_type(grpc_resolver_factory *factory); If a resolver factory was found, use it to instantiate a resolver and return it. If a resolver factory was not found, return NULL. */ -grpc_resolver *grpc_resolver_create( - const char *target, grpc_subchannel_factory *subchannel_factory); +grpc_resolver *grpc_resolver_create(const char *target, + grpc_subchannel_factory *subchannel_factory, + grpc_workqueue *workqueue); /** Given a target, return a (freshly allocated with gpr_malloc) string representing the default authority to pass from a client. */ diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c index ccec07a08c..b519ce4174 100644 --- a/src/core/client_config/resolvers/dns_resolver.c +++ b/src/core/client_config/resolvers/dns_resolver.c @@ -49,6 +49,8 @@ typedef struct { grpc_resolver base; /** refcount */ gpr_refcount refs; + /** workqueue */ + grpc_workqueue *workqueue; /** name to resolve */ char *name; /** default port to use */ @@ -94,7 +96,7 @@ static void dns_shutdown(grpc_resolver *resolver) { gpr_mu_lock(&r->mu); if (r->next_completion != NULL) { *r->target_config = NULL; - grpc_iomgr_add_callback(r->next_completion); + grpc_workqueue_push(r->workqueue, r->next_completion, 1); r->next_completion = NULL; } gpr_mu_unlock(&r->mu); @@ -180,7 +182,7 @@ static void dns_maybe_finish_next_locked(dns_resolver *r) { if (r->resolved_config) { grpc_client_config_ref(r->resolved_config); } - grpc_iomgr_add_callback(r->next_completion); + grpc_workqueue_push(r->workqueue, r->next_completion, 1); r->next_completion = NULL; r->published_version = r->resolved_version; } @@ -193,21 +195,21 @@ static void dns_destroy(grpc_resolver *gr) { grpc_client_config_unref(r->resolved_config); } grpc_subchannel_factory_unref(r->subchannel_factory); + grpc_workqueue_unref(r->workqueue); gpr_free(r->name); gpr_free(r->default_port); gpr_free(r->lb_policy_name); gpr_free(r); } -static grpc_resolver *dns_create( - grpc_uri *uri, const char *default_port, - const char* lb_policy_name, - grpc_subchannel_factory *subchannel_factory) { +static grpc_resolver *dns_create(grpc_resolver_args *args, + const char *default_port, + const char *lb_policy_name) { dns_resolver *r; - const char *path = uri->path; + const char *path = args->uri->path; - if (0 != strcmp(uri->authority, "")) { - gpr_log(GPR_ERROR, "authority based uri's not supported"); + if (0 != strcmp(args->uri->authority, "")) { + gpr_log(GPR_ERROR, "authority based dns uri's not supported"); return NULL; } @@ -220,8 +222,10 @@ static grpc_resolver *dns_create( grpc_resolver_init(&r->base, &dns_resolver_vtable); r->name = gpr_strdup(path); r->default_port = gpr_strdup(default_port); - r->subchannel_factory = subchannel_factory; - grpc_subchannel_factory_ref(subchannel_factory); + r->subchannel_factory = args->subchannel_factory; + grpc_subchannel_factory_ref(r->subchannel_factory); + r->workqueue = args->workqueue; + grpc_workqueue_ref(r->workqueue); r->lb_policy_name = gpr_strdup(lb_policy_name); return &r->base; } @@ -235,9 +239,8 @@ static void dns_factory_ref(grpc_resolver_factory *factory) {} static void dns_factory_unref(grpc_resolver_factory *factory) {} static grpc_resolver *dns_factory_create_resolver( - grpc_resolver_factory *factory, grpc_uri *uri, - grpc_subchannel_factory *subchannel_factory) { - return dns_create(uri, "https", "pick_first", subchannel_factory); + grpc_resolver_factory *factory, grpc_resolver_args *args) { + return dns_create(args, "https", "pick_first"); } char *dns_factory_get_default_host_name(grpc_resolver_factory *factory, diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c index 2900df285c..260bee27a9 100644 --- a/src/core/client_config/resolvers/sockaddr_resolver.c +++ b/src/core/client_config/resolvers/sockaddr_resolver.c @@ -56,6 +56,8 @@ typedef struct { gpr_refcount refs; /** subchannel factory */ grpc_subchannel_factory *subchannel_factory; + /** workqueue */ + grpc_workqueue *workqueue; /** load balancing policy name */ char *lb_policy_name; @@ -96,8 +98,7 @@ static void sockaddr_shutdown(grpc_resolver *resolver) { gpr_mu_lock(&r->mu); if (r->next_completion != NULL) { *r->target_config = NULL; - /* TODO(ctiller): add delayed callback */ - grpc_iomgr_add_callback(r->next_completion); + grpc_workqueue_push(r->workqueue, r->next_completion, 1); r->next_completion = NULL; } gpr_mu_unlock(&r->mu); @@ -145,7 +146,7 @@ static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) { GRPC_LB_POLICY_UNREF(lb_policy, "unix"); r->published = 1; *r->target_config = cfg; - grpc_iomgr_add_callback(r->next_completion); + grpc_workqueue_push(r->workqueue, r->next_completion, 1); r->next_completion = NULL; } } @@ -154,6 +155,7 @@ static void sockaddr_destroy(grpc_resolver *gr) { sockaddr_resolver *r = (sockaddr_resolver *)gr; gpr_mu_destroy(&r->mu); grpc_subchannel_factory_unref(r->subchannel_factory); + grpc_workqueue_unref(r->workqueue); gpr_free(r->addrs); gpr_free(r->addrs_len); gpr_free(r->lb_policy_name); @@ -278,8 +280,7 @@ done: static void do_nothing(void *ignored) {} static grpc_resolver *sockaddr_create( - grpc_uri *uri, const char *lb_policy_name, - grpc_subchannel_factory *subchannel_factory, + grpc_resolver_args *args, const char *lb_policy_name, int parse(grpc_uri *uri, struct sockaddr_storage *dst, size_t *len)) { size_t i; int errors_found = 0; /* GPR_FALSE */ @@ -287,7 +288,7 @@ static grpc_resolver *sockaddr_create( gpr_slice path_slice; gpr_slice_buffer path_parts; - if (0 != strcmp(uri->authority, "")) { + if (0 != strcmp(args->uri->authority, "")) { gpr_log(GPR_ERROR, "authority based uri's not supported"); return NULL; } @@ -295,7 +296,8 @@ static grpc_resolver *sockaddr_create( r = gpr_malloc(sizeof(sockaddr_resolver)); memset(r, 0, sizeof(*r)); - path_slice = gpr_slice_new(uri->path, strlen(uri->path), do_nothing); + path_slice = + gpr_slice_new(args->uri->path, strlen(args->uri->path), do_nothing); gpr_slice_buffer_init(&path_parts); gpr_slice_split(path_slice, ",", &path_parts); @@ -304,7 +306,7 @@ static grpc_resolver *sockaddr_create( r->addrs_len = gpr_malloc(sizeof(*r->addrs_len) * r->num_addrs); for(i = 0; i < r->num_addrs; i++) { - grpc_uri ith_uri = *uri; + grpc_uri ith_uri = *args->uri; char* part_str = gpr_dump_slice(path_parts.slices[i], GPR_DUMP_ASCII); ith_uri.path = part_str; if (!parse(&ith_uri, &r->addrs[i], &r->addrs_len[i])) { @@ -324,10 +326,12 @@ static grpc_resolver *sockaddr_create( gpr_ref_init(&r->refs, 1); gpr_mu_init(&r->mu); grpc_resolver_init(&r->base, &sockaddr_resolver_vtable); - r->subchannel_factory = subchannel_factory; + r->subchannel_factory = args->subchannel_factory; + grpc_subchannel_factory_ref(r->subchannel_factory); + r->workqueue = args->workqueue; + grpc_workqueue_ref(r->workqueue); r->lb_policy_name = gpr_strdup(lb_policy_name); - grpc_subchannel_factory_ref(subchannel_factory); return &r->base; } @@ -341,10 +345,8 @@ static void sockaddr_factory_unref(grpc_resolver_factory *factory) {} #define DECL_FACTORY(name) \ static grpc_resolver *name##_factory_create_resolver( \ - grpc_resolver_factory *factory, grpc_uri *uri, \ - grpc_subchannel_factory *subchannel_factory) { \ - return sockaddr_create(uri, "pick_first", \ - subchannel_factory, parse_##name); \ + grpc_resolver_factory *factory, grpc_resolver_args *args) { \ + return sockaddr_create(args, "pick_first", parse_##name); \ } \ static const grpc_resolver_factory_vtable name##_factory_vtable = { \ sockaddr_factory_ref, sockaddr_factory_unref, \ diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index ca52c75beb..0718ffbb8c 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -76,6 +76,7 @@ typedef struct waiting_for_connect { struct grpc_subchannel { grpc_connector *connector; + grpc_workqueue *workqueue; /** non-transport related channel filters */ const grpc_channel_filter **filters; @@ -575,7 +576,7 @@ static void publish_transport(grpc_subchannel *c) { connectivity_state_changed_locked(c, "connected"); while ((w4c = c->waiting)) { c->waiting = w4c->next; - grpc_iomgr_add_callback(&w4c->continuation); + grpc_workqueue_push(c->workqueue, &w4c->continuation, 1); } gpr_mu_unlock(&c->mu); diff --git a/src/core/iomgr/endpoint_pair.h b/src/core/iomgr/endpoint_pair.h index 095ec5fcc9..25ef1891fb 100644 --- a/src/core/iomgr/endpoint_pair.h +++ b/src/core/iomgr/endpoint_pair.h @@ -42,6 +42,7 @@ typedef struct { } grpc_endpoint_pair; grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, - size_t read_slice_size); + size_t read_slice_size, + grpc_workqueue *workqueue); #endif /* GRPC_INTERNAL_CORE_IOMGR_ENDPOINT_PAIR_H */ diff --git a/src/core/iomgr/endpoint_pair_posix.c b/src/core/iomgr/endpoint_pair_posix.c index deae9c6875..dc1f441b4b 100644 --- a/src/core/iomgr/endpoint_pair_posix.c +++ b/src/core/iomgr/endpoint_pair_posix.c @@ -59,19 +59,20 @@ static void create_sockets(int sv[2]) { } grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, - size_t read_slice_size) { + size_t read_slice_size, + grpc_workqueue *workqueue) { int sv[2]; grpc_endpoint_pair p; char *final_name; create_sockets(sv); gpr_asprintf(&final_name, "%s:client", name); - p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), read_slice_size, - "socketpair-server"); + p.client = grpc_tcp_create(grpc_fd_create(sv[1], workqueue, final_name), + read_slice_size, "socketpair-server"); gpr_free(final_name); gpr_asprintf(&final_name, "%s:server", name); - p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), read_slice_size, - "socketpair-client"); + p.server = grpc_tcp_create(grpc_fd_create(sv[0], workqueue, final_name), + read_slice_size, "socketpair-client"); gpr_free(final_name); return p; } diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 2d08a77a70..4bca9f3f6e 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -71,6 +71,9 @@ static grpc_fd *fd_freelist = NULL; static gpr_mu fd_freelist_mu; static void freelist_fd(grpc_fd *fd) { + if (fd->workqueue->wakeup_read_fd != fd) { + grpc_workqueue_unref(fd->workqueue); + } gpr_mu_lock(&fd_freelist_mu); fd->freelist_next = fd_freelist; fd_freelist = fd; @@ -158,8 +161,14 @@ void grpc_fd_global_shutdown(void) { gpr_mu_destroy(&fd_freelist_mu); } -grpc_fd *grpc_fd_create(int fd, const char *name) { +grpc_fd *grpc_fd_create(int fd, grpc_workqueue *workqueue, const char *name) { grpc_fd *r = alloc_fd(fd); + r->workqueue = workqueue; + /* if the wakeup_read_fd is NULL, then the workqueue is under construction + ==> this fd will be the wakeup_read_fd, and we shouldn't take a ref */ + if (workqueue->wakeup_read_fd != NULL) { + grpc_workqueue_ref(workqueue); + } grpc_iomgr_register_object(&r->iomgr_object, name); return r; } @@ -220,7 +229,7 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done, fd->closed = 1; close(fd->fd); if (fd->on_done_closure) { - grpc_iomgr_add_callback(fd->on_done_closure); + grpc_workqueue_push(fd->workqueue, fd->on_done_closure, 1); } } else { wake_all_watchers_locked(fd); @@ -246,19 +255,19 @@ void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); } #endif static void process_callback(grpc_iomgr_closure *closure, int success, - int allow_synchronous_callback) { - if (allow_synchronous_callback) { + grpc_workqueue *optional_workqueue) { + if (optional_workqueue == NULL) { closure->cb(closure->cb_arg, success); } else { - grpc_iomgr_add_delayed_callback(closure, success); + grpc_workqueue_push(optional_workqueue, closure, success); } } static void process_callbacks(grpc_iomgr_closure *callbacks, size_t n, - int success, int allow_synchronous_callback) { + int success, grpc_workqueue *optional_workqueue) { size_t i; for (i = 0; i < n; i++) { - process_callback(callbacks + i, success, allow_synchronous_callback); + process_callback(callbacks + i, success, optional_workqueue); } } @@ -286,7 +295,7 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure, GPR_ASSERT(gpr_atm_no_barrier_load(st) == READY); gpr_atm_rel_store(st, NOT_READY); process_callback(closure, !gpr_atm_acq_load(&fd->shutdown), - allow_synchronous_callback); + allow_synchronous_callback ? NULL : fd->workqueue); return; default: /* WAITING */ /* upcallptr was set to a different closure. This is an error! */ @@ -339,7 +348,8 @@ static void set_ready(grpc_fd *fd, gpr_atm *st, success = !gpr_atm_acq_load(&fd->shutdown); GPR_ASSERT(ncb <= 1); if (ncb > 0) { - process_callbacks(closure, ncb, success, allow_synchronous_callback); + process_callbacks(closure, ncb, success, + allow_synchronous_callback ? NULL : fd->workqueue); } } @@ -441,7 +451,7 @@ void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) { fd->closed = 1; close(fd->fd); if (fd->on_done_closure != NULL) { - grpc_iomgr_add_callback(fd->on_done_closure); + grpc_workqueue_push(fd->workqueue, fd->on_done_closure, 1); } } gpr_mu_unlock(&fd->watcher_mu); diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index 835e9b339a..e5157ad342 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -36,6 +36,7 @@ #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/pollset.h" +#include "src/core/iomgr/workqueue.h" #include <grpc/support/atm.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> @@ -57,6 +58,7 @@ struct grpc_fd { meaning that mostly we ref by two to avoid altering the orphaned bit, and just unref by 1 when we're ready to flag the object as orphaned */ gpr_atm refst; + grpc_workqueue *workqueue; gpr_mu set_state_mu; gpr_atm shutdown; @@ -103,7 +105,7 @@ struct grpc_fd { /* Create a wrapped file descriptor. Requires fd is a non-blocking file descriptor. This takes ownership of closing fd. */ -grpc_fd *grpc_fd_create(int fd, const char *name); +grpc_fd *grpc_fd_create(int fd, grpc_workqueue *workqueue, const char *name); /* Releases fd to be asynchronously destroyed. on_done is called when the underlying file descriptor is definitely close()d. diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index cf74a16c0c..66d7e9b110 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -57,7 +57,6 @@ void grpc_kick_poller(void) { } void grpc_iomgr_init(void) { - gpr_thd_id id; g_shutdown = 0; gpr_mu_init(&g_mu); gpr_cv_init(&g_rcv); @@ -65,8 +64,6 @@ void grpc_iomgr_init(void) { g_root_object.next = g_root_object.prev = &g_root_object; g_root_object.name = "root"; grpc_iomgr_platform_init(); - gpr_event_init(&g_background_callback_executor_done); - gpr_thd_new(&id, background_callback_executor, NULL, NULL); } static size_t count_objects(void) { @@ -86,58 +83,36 @@ static void dump_objects(const char *kind) { } void grpc_iomgr_shutdown(void) { - grpc_iomgr_closure *closure; gpr_timespec shutdown_deadline = gpr_time_add( gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(10, GPR_TIMESPAN)); gpr_timespec last_warning_time = gpr_now(GPR_CLOCK_REALTIME); gpr_mu_lock(&g_mu); g_shutdown = 1; - while (g_cbs_head != NULL || g_root_object.next != &g_root_object) { + while (g_root_object.next != &g_root_object) { if (gpr_time_cmp( gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), last_warning_time), gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) { - if (g_cbs_head != NULL && g_root_object.next != &g_root_object) { - gpr_log(GPR_DEBUG, - "Waiting for %d iomgr objects to be destroyed and executing " - "final callbacks", - count_objects()); - } else if (g_cbs_head != NULL) { - gpr_log(GPR_DEBUG, "Executing final iomgr callbacks"); - } else { + if (g_root_object.next != &g_root_object) { gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed", count_objects()); } last_warning_time = gpr_now(GPR_CLOCK_REALTIME); } - if (g_cbs_head) { - do { - closure = g_cbs_head; - g_cbs_head = closure->next; - if (!g_cbs_head) g_cbs_tail = NULL; - gpr_mu_unlock(&g_mu); - - closure->cb(closure->cb_arg, 0); - gpr_mu_lock(&g_mu); - } while (g_cbs_head); - continue; - } if (grpc_alarm_check(&g_mu, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL)) { continue; } if (g_root_object.next != &g_root_object) { int timeout = 0; - while (g_cbs_head == NULL) { - gpr_timespec short_deadline = gpr_time_add( + gpr_timespec short_deadline = gpr_time_add( gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100, GPR_TIMESPAN)); - if (gpr_cv_wait(&g_rcv, &g_mu, short_deadline) && g_cbs_head == NULL) { - if (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), shutdown_deadline) > 0) { - timeout = 1; - break; - } + if (gpr_cv_wait(&g_rcv, &g_mu, short_deadline)) { + if (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), shutdown_deadline) > 0) { + timeout = 1; + break; } } - if (timeout) { + if (timeout && g_root_object.next != &g_root_object) { gpr_log(GPR_DEBUG, "Failed to free %d iomgr objects before shutdown deadline: " "memory leaks are likely", @@ -149,10 +124,6 @@ void grpc_iomgr_shutdown(void) { } gpr_mu_unlock(&g_mu); - grpc_kick_poller(); - gpr_event_wait(&g_background_callback_executor_done, - gpr_inf_future(GPR_CLOCK_REALTIME)); - grpc_alarm_list_shutdown(); grpc_iomgr_platform_shutdown(); @@ -184,67 +155,3 @@ void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb, closure->cb_arg = cb_arg; closure->next = NULL; } - -static void assert_not_scheduled_locked(grpc_iomgr_closure *closure) { -#ifndef NDEBUG - grpc_iomgr_closure *c; - - for (c = g_cbs_head; c; c = c->next) { - GPR_ASSERT(c != closure); - } -#endif -} - -void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *closure, int success) { - closure->success = success; - GPR_ASSERT(closure->cb); - gpr_mu_lock(&g_mu); - assert_not_scheduled_locked(closure); - closure->next = NULL; - if (!g_cbs_tail) { - g_cbs_head = g_cbs_tail = closure; - } else { - g_cbs_tail->next = closure; - g_cbs_tail = closure; - } - if (g_shutdown) { - gpr_cv_signal(&g_rcv); - } - gpr_mu_unlock(&g_mu); -} - -void grpc_iomgr_add_callback(grpc_iomgr_closure *closure) { - grpc_iomgr_add_delayed_callback(closure, 1 /* GPR_TRUE */); -} - -int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) { - int n = 0; - gpr_mu *retake_mu = NULL; - grpc_iomgr_closure *closure; - for (;;) { - /* check for new work */ - if (!gpr_mu_trylock(&g_mu)) { - break; - } - closure = g_cbs_head; - if (!closure) { - gpr_mu_unlock(&g_mu); - break; - } - g_cbs_head = closure->next; - if (!g_cbs_head) g_cbs_tail = NULL; - gpr_mu_unlock(&g_mu); - /* if we have a mutex to drop, do so before executing work */ - if (drop_mu) { - gpr_mu_unlock(drop_mu); - retake_mu = drop_mu; - drop_mu = NULL; - } - closure->cb(closure->cb_arg, success && closure->success); - n++; - } - if (retake_mu) { - gpr_mu_lock(retake_mu); - } - return n; -} diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 8f62ce2954..971b7b4a01 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -127,7 +127,7 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset, GRPC_FD_REF(fd, "delayed_add"); grpc_iomgr_closure_init(&da->closure, perform_delayed_add, da); pollset->in_flight_cbs++; - grpc_iomgr_add_callback(&da->closure); + grpc_workqueue_push(fd->workqueue, &da->closure, 1); } } diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index b98b40b222..ad9c862d6e 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -293,7 +293,7 @@ static void basic_do_promote(void *args, int success) { /* First we need to ensure that nobody is polling concurrently */ if (grpc_pollset_has_workers(pollset)) { grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); - grpc_iomgr_add_callback(&up_args->promotion_closure); + grpc_workqueue_push(fd->workqueue, &up_args->promotion_closure, 1); gpr_mu_unlock(&pollset->mu); return; } @@ -385,7 +385,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd, up_args->original_vtable = pollset->vtable; up_args->promotion_closure.cb = basic_do_promote; up_args->promotion_closure.cb_arg = up_args; - grpc_iomgr_add_callback(&up_args->promotion_closure); + grpc_workqueue_push(fd->workqueue, &up_args->promotion_closure, 1); grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); diff --git a/src/core/iomgr/tcp_client.h b/src/core/iomgr/tcp_client.h index 12296bd55b..57f80016c2 100644 --- a/src/core/iomgr/tcp_client.h +++ b/src/core/iomgr/tcp_client.h @@ -46,6 +46,7 @@ in this connection being established (in order to continue their work) */ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp), void *arg, grpc_pollset_set *interested_parties, + grpc_workqueue *workqueue, const struct sockaddr *addr, size_t addr_len, gpr_timespec deadline); diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c index df4e0aca75..ef1598c711 100644 --- a/src/core/iomgr/workqueue_posix.c +++ b/src/core/iomgr/workqueue_posix.c @@ -41,21 +41,6 @@ #include <grpc/support/alloc.h> -#include "src/core/iomgr/fd_posix.h" - -struct grpc_workqueue { - gpr_refcount refs; - - gpr_mu mu; - grpc_iomgr_closure head; - grpc_iomgr_closure *tail; - - grpc_wakeup_fd wakeup_fd; - grpc_fd *wakeup_read_fd; - - grpc_iomgr_closure read_closure; -}; - static void on_readable(void *arg, int success); grpc_workqueue *grpc_workqueue_create(void) { diff --git a/src/core/iomgr/workqueue_posix.h b/src/core/iomgr/workqueue_posix.h index ba455522ff..1b3a0e281b 100644 --- a/src/core/iomgr/workqueue_posix.h +++ b/src/core/iomgr/workqueue_posix.h @@ -34,4 +34,19 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H #define GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H +struct grpc_fd; + +struct grpc_workqueue { + gpr_refcount refs; + + gpr_mu mu; + grpc_iomgr_closure head; + grpc_iomgr_closure *tail; + + grpc_wakeup_fd wakeup_fd; + struct grpc_fd *wakeup_read_fd; + + grpc_iomgr_closure read_closure; +}; + #endif /* GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H */ diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index 06ffa8dd6e..d141260421 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -57,6 +57,7 @@ typedef struct { gpr_refcount refs; grpc_channel_security_connector *security_connector; + grpc_workqueue *workqueue; grpc_iomgr_closure *notify; grpc_connect_in_args args; @@ -71,6 +72,7 @@ static void connector_ref(grpc_connector *con) { static void connector_unref(grpc_connector *con) { connector *c = (connector *)con; if (gpr_unref(&c->refs)) { + grpc_workqueue_unref(c->workqueue); gpr_free(c); } } @@ -122,8 +124,8 @@ static void connector_connect(grpc_connector *con, c->notify = notify; c->args = *args; c->result = result; - grpc_tcp_client_connect(connected, c, args->interested_parties, args->addr, - args->addr_len, args->deadline); + grpc_tcp_client_connect(connected, c, args->interested_parties, c->workqueue, + args->addr, args->addr_len, args->deadline); } static const grpc_connector_vtable connector_vtable = { @@ -165,6 +167,8 @@ static grpc_subchannel *subchannel_factory_create_subchannel( memset(c, 0, sizeof(*c)); c->base.vtable = &connector_vtable; c->security_connector = f->security_connector; + c->workqueue = grpc_channel_get_workqueue(f->master); + grpc_workqueue_ref(c->workqueue); gpr_ref_init(&c->refs, 1); args->mdctx = f->mdctx; args->args = final_args; @@ -240,7 +244,8 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, f->merge_args = grpc_channel_args_copy(args_copy); f->master = channel; GRPC_CHANNEL_INTERNAL_REF(channel, "subchannel_factory"); - resolver = grpc_resolver_create(target, &f->base); + resolver = grpc_resolver_create(target, &f->base, + grpc_channel_get_workqueue(channel)); if (!resolver) { return NULL; } |