diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-06-02 16:55:54 -0700 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-06-02 16:55:54 -0700 |
commit | a4b89fed1c801cdb31a154c7e1c09e5650898e8a (patch) | |
tree | d0a00563db8df1da5d69dee362664a85112f20cb /src/core/iomgr | |
parent | aaa03d74ceae4d48e31f4ba0619e359acd21505a (diff) | |
parent | c86b1dd59d1e61b9377f2dca0741061b2e5de1da (diff) |
Merge github.com:grpc/grpc into you-complete-me
Conflicts:
src/core/iomgr/resolve_address_posix.c
src/core/surface/server.c
Diffstat (limited to 'src/core/iomgr')
-rw-r--r-- | src/core/iomgr/endpoint_pair.h | 3 | ||||
-rw-r--r-- | src/core/iomgr/endpoint_pair_posix.c | 17 | ||||
-rw-r--r-- | src/core/iomgr/endpoint_pair_windows.c | 6 | ||||
-rw-r--r-- | src/core/iomgr/fd_posix.c | 60 | ||||
-rw-r--r-- | src/core/iomgr/fd_posix.h | 16 | ||||
-rw-r--r-- | src/core/iomgr/iomgr.c | 125 | ||||
-rw-r--r-- | src/core/iomgr/iomgr.h | 37 | ||||
-rw-r--r-- | src/core/iomgr/iomgr_internal.h | 14 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 7 | ||||
-rw-r--r-- | src/core/iomgr/pollset_windows.c | 10 | ||||
-rw-r--r-- | src/core/iomgr/resolve_address_posix.c | 14 | ||||
-rw-r--r-- | src/core/iomgr/resolve_address_windows.c | 8 | ||||
-rw-r--r-- | src/core/iomgr/socket_windows.c | 17 | ||||
-rw-r--r-- | src/core/iomgr/socket_windows.h | 9 | ||||
-rw-r--r-- | src/core/iomgr/tcp_client_posix.c | 22 | ||||
-rw-r--r-- | src/core/iomgr/tcp_client_windows.c | 2 | ||||
-rw-r--r-- | src/core/iomgr/tcp_posix.c | 7 | ||||
-rw-r--r-- | src/core/iomgr/tcp_server_posix.c | 23 | ||||
-rw-r--r-- | src/core/iomgr/tcp_server_windows.c | 5 |
19 files changed, 258 insertions, 144 deletions
diff --git a/src/core/iomgr/endpoint_pair.h b/src/core/iomgr/endpoint_pair.h index dffbd36d4c..25087be0c7 100644 --- a/src/core/iomgr/endpoint_pair.h +++ b/src/core/iomgr/endpoint_pair.h @@ -41,6 +41,7 @@ typedef struct { grpc_endpoint *server; } grpc_endpoint_pair; -grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(size_t read_slice_size); +grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, + size_t read_slice_size); #endif /* GRPC_INTERNAL_CORE_IOMGR_ENDPOINT_PAIR_H */ diff --git a/src/core/iomgr/endpoint_pair_posix.c b/src/core/iomgr/endpoint_pair_posix.c index ac511b97b2..9b3b63f1e7 100644 --- a/src/core/iomgr/endpoint_pair_posix.c +++ b/src/core/iomgr/endpoint_pair_posix.c @@ -44,6 +44,8 @@ #include <sys/socket.h> #include "src/core/iomgr/tcp_posix.h" +#include "src/core/support/string.h" +#include <grpc/support/alloc.h> #include <grpc/support/log.h> static void create_sockets(int sv[2]) { @@ -55,12 +57,21 @@ static void create_sockets(int sv[2]) { GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); } -grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(size_t read_slice_size) { +grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, + size_t read_slice_size) { int sv[2]; grpc_endpoint_pair p; + char *final_name; create_sockets(sv); - p.client = grpc_tcp_create(grpc_fd_create(sv[1]), read_slice_size); - p.server = grpc_tcp_create(grpc_fd_create(sv[0]), read_slice_size); + + gpr_asprintf(&final_name, "%s:client", name); + p.client = + grpc_tcp_create(grpc_fd_create(sv[1], final_name), read_slice_size); + gpr_free(final_name); + gpr_asprintf(&final_name, "%s:server", name); + p.server = + grpc_tcp_create(grpc_fd_create(sv[0], final_name), read_slice_size); + gpr_free(final_name); return p; } diff --git a/src/core/iomgr/endpoint_pair_windows.c b/src/core/iomgr/endpoint_pair_windows.c index 988d622d01..c6790b2937 100644 --- a/src/core/iomgr/endpoint_pair_windows.c +++ b/src/core/iomgr/endpoint_pair_windows.c @@ -77,12 +77,12 @@ static void create_sockets(SOCKET sv[2]) { sv[0] = svr_sock; } -grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(size_t read_slice_size) { +grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, size_t read_slice_size) { SOCKET sv[2]; grpc_endpoint_pair p; create_sockets(sv); - p.client = grpc_tcp_create(grpc_winsocket_create(sv[1])); - p.server = grpc_tcp_create(grpc_winsocket_create(sv[0])); + p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client")); + p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server")); return p; } diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index b697fcc64a..28ed7708f7 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -41,7 +41,6 @@ #include <sys/socket.h> #include <unistd.h> -#include "src/core/iomgr/iomgr_internal.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/useful.h> @@ -91,6 +90,7 @@ static grpc_fd *alloc_fd(int fd) { gpr_mu_init(&r->set_state_mu); gpr_mu_init(&r->watcher_mu); } + gpr_atm_rel_store(&r->refst, 1); gpr_atm_rel_store(&r->readst, NOT_READY); gpr_atm_rel_store(&r->writest, NOT_READY); @@ -116,10 +116,9 @@ static void ref_by(grpc_fd *fd, int n) { static void unref_by(grpc_fd *fd, int n) { gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n); if (old == n) { - close(fd->fd); - grpc_iomgr_add_callback(fd->on_done, fd->on_done_user_data); + grpc_iomgr_add_callback(&fd->on_done_closure); freelist_fd(fd); - grpc_iomgr_unref(); + grpc_iomgr_unregister_object(&fd->iomgr_object); } else { GPR_ASSERT(old > n); } @@ -138,9 +137,9 @@ void grpc_fd_global_shutdown(void) { static void do_nothing(void *ignored, int success) {} -grpc_fd *grpc_fd_create(int fd) { +grpc_fd *grpc_fd_create(int fd, const char *name) { grpc_fd *r = alloc_fd(fd); - grpc_iomgr_ref(); + grpc_iomgr_register_object(&r->iomgr_object, name); grpc_pollset_add_fd(grpc_backup_pollset(), r); return r; } @@ -180,8 +179,8 @@ static void wake_all_watchers_locked(grpc_fd *fd) { } void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) { - fd->on_done = on_done ? on_done : do_nothing; - fd->on_done_user_data = user_data; + grpc_iomgr_closure_init(&fd->on_done_closure, on_done ? on_done : do_nothing, + user_data); shutdown(fd->fd, SHUT_RDWR); ref_by(fd, 1); /* remove active status, but keep referenced */ gpr_mu_lock(&fd->watcher_mu); @@ -195,21 +194,20 @@ void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); } void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); } -static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success, +static void process_callback(grpc_iomgr_closure *closure, int success, int allow_synchronous_callback) { if (allow_synchronous_callback) { - cb(arg, success); + closure->cb(closure->cb_arg, success); } else { - grpc_iomgr_add_delayed_callback(cb, arg, success); + grpc_iomgr_add_delayed_callback(closure, success); } } -static void make_callbacks(grpc_iomgr_closure *callbacks, size_t n, int success, - int allow_synchronous_callback) { +static void process_callbacks(grpc_iomgr_closure *callbacks, size_t n, + int success, int allow_synchronous_callback) { size_t i; for (i = 0; i < n; i++) { - make_callback(callbacks[i].cb, callbacks[i].cb_arg, success, - allow_synchronous_callback); + process_callback(callbacks + i, success, allow_synchronous_callback); } } @@ -234,10 +232,9 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure, /* swap was unsuccessful due to an intervening set_ready call. Fall through to the READY code below */ case READY: - assert(gpr_atm_no_barrier_load(st) == READY); + GPR_ASSERT(gpr_atm_no_barrier_load(st) == READY); gpr_atm_rel_store(st, NOT_READY); - make_callback(closure->cb, closure->cb_arg, - !gpr_atm_acq_load(&fd->shutdown), + process_callback(closure, !gpr_atm_acq_load(&fd->shutdown), allow_synchronous_callback); return; default: /* WAITING */ @@ -251,7 +248,7 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure, abort(); } -static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure *callbacks, +static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure **callbacks, size_t *ncallbacks) { gpr_intptr state = gpr_atm_acq_load(st); @@ -269,9 +266,9 @@ static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure *callbacks, Fall through to the WAITING code below */ state = gpr_atm_acq_load(st); default: /* waiting */ - assert(gpr_atm_no_barrier_load(st) != READY && - gpr_atm_no_barrier_load(st) != NOT_READY); - callbacks[(*ncallbacks)++] = *(grpc_iomgr_closure *)state; + GPR_ASSERT(gpr_atm_no_barrier_load(st) != READY && + gpr_atm_no_barrier_load(st) != NOT_READY); + callbacks[(*ncallbacks)++] = (grpc_iomgr_closure *)state; gpr_atm_rel_store(st, NOT_READY); return; } @@ -282,25 +279,30 @@ static void set_ready(grpc_fd *fd, gpr_atm *st, /* only one set_ready can be active at once (but there may be a racing notify_on) */ int success; - grpc_iomgr_closure cb; + grpc_iomgr_closure* closure; size_t ncb = 0; + gpr_mu_lock(&fd->set_state_mu); - set_ready_locked(st, &cb, &ncb); + set_ready_locked(st, &closure, &ncb); gpr_mu_unlock(&fd->set_state_mu); success = !gpr_atm_acq_load(&fd->shutdown); - make_callbacks(&cb, ncb, success, allow_synchronous_callback); + GPR_ASSERT(ncb <= 1); + if (ncb > 0) { + process_callbacks(closure, ncb, success, allow_synchronous_callback); + } } void grpc_fd_shutdown(grpc_fd *fd) { - grpc_iomgr_closure cb[2]; size_t ncb = 0; gpr_mu_lock(&fd->set_state_mu); GPR_ASSERT(!gpr_atm_no_barrier_load(&fd->shutdown)); gpr_atm_rel_store(&fd->shutdown, 1); - set_ready_locked(&fd->readst, cb, &ncb); - set_ready_locked(&fd->writest, cb, &ncb); + set_ready_locked(&fd->readst, &fd->shutdown_closures[0], &ncb); + set_ready_locked(&fd->writest, &fd->shutdown_closures[0], &ncb); gpr_mu_unlock(&fd->set_state_mu); - make_callbacks(cb, ncb, 0, 0); + GPR_ASSERT(ncb <= 2); + process_callbacks(fd->shutdown_closures[0], ncb, 0 /* GPR_FALSE */, + 0 /* GPR_FALSE */); } void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure) { diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index cfc533b7f5..0fa71850e3 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -34,17 +34,12 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_FD_POSIX_H #define GRPC_INTERNAL_CORE_IOMGR_FD_POSIX_H -#include "src/core/iomgr/iomgr.h" +#include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/pollset.h" #include <grpc/support/atm.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> -typedef struct { - grpc_iomgr_cb_func cb; - void *cb_arg; -} grpc_iomgr_closure; - typedef struct grpc_fd grpc_fd; typedef struct grpc_fd_watcher { @@ -96,15 +91,18 @@ struct grpc_fd { gpr_atm readst; gpr_atm writest; - grpc_iomgr_cb_func on_done; - void *on_done_user_data; struct grpc_fd *freelist_next; + + grpc_iomgr_closure on_done_closure; + grpc_iomgr_closure *shutdown_closures[2]; + + grpc_iomgr_object iomgr_object; }; /* Create a wrapped file descriptor. Requires fd is a non-blocking file descriptor. This takes ownership of closing fd. */ -grpc_fd *grpc_fd_create(int fd); +grpc_fd *grpc_fd_create(int fd, const char *name); /* Releases fd to be asynchronously destroyed. on_done is called when the underlying file descriptor is definitely close()d. diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index d22542fc91..249228a214 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -37,25 +37,19 @@ #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/alarm_internal.h" +#include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/thd.h> #include <grpc/support/sync.h> -typedef struct delayed_callback { - grpc_iomgr_cb_func cb; - void *cb_arg; - int success; - struct delayed_callback *next; -} delayed_callback; - static gpr_mu g_mu; static gpr_cv g_rcv; -static delayed_callback *g_cbs_head = NULL; -static delayed_callback *g_cbs_tail = NULL; +static grpc_iomgr_closure *g_cbs_head = NULL; +static grpc_iomgr_closure *g_cbs_tail = NULL; static int g_shutdown; -static int g_refs; static gpr_event g_background_callback_executor_done; +static grpc_iomgr_object g_root_object; /* Execute followup callbacks continuously. Other threads may check in and help during pollset_work() */ @@ -66,12 +60,11 @@ static void background_callback_executor(void *ignored) { gpr_timespec short_deadline = gpr_time_add(gpr_now(), gpr_time_from_millis(100)); if (g_cbs_head) { - delayed_callback *cb = g_cbs_head; - g_cbs_head = cb->next; + grpc_iomgr_closure *closure = g_cbs_head; + g_cbs_head = closure->next; if (!g_cbs_head) g_cbs_tail = NULL; gpr_mu_unlock(&g_mu); - cb->cb(cb->cb_arg, cb->success); - gpr_free(cb); + closure->cb(closure->cb_arg, closure->success); gpr_mu_lock(&g_mu); } else if (grpc_alarm_check(&g_mu, gpr_now(), &deadline)) { } else { @@ -96,34 +89,48 @@ void grpc_iomgr_init(void) { gpr_mu_init(&g_mu); gpr_cv_init(&g_rcv); grpc_alarm_list_init(gpr_now()); - g_refs = 0; + g_root_object.next = g_root_object.prev = &g_root_object; + g_root_object.name = "root"; grpc_iomgr_platform_init(); gpr_event_init(&g_background_callback_executor_done); gpr_thd_new(&id, background_callback_executor, NULL, NULL); } +static size_t count_objects(void) { + grpc_iomgr_object *obj; + size_t n = 0; + for (obj = g_root_object.next; obj != &g_root_object; obj = obj->next) { + n++; + } + return n; +} + void grpc_iomgr_shutdown(void) { - delayed_callback *cb; + grpc_iomgr_object *obj; + grpc_iomgr_closure *closure; gpr_timespec shutdown_deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(10)); gpr_mu_lock(&g_mu); g_shutdown = 1; - while (g_cbs_head || g_refs) { - gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed%s", g_refs, + while (g_cbs_head || g_root_object.next != &g_root_object) { + size_t nobjs = count_objects(); + gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed%s", nobjs, g_cbs_head ? " and executing final callbacks" : ""); - while (g_cbs_head) { - cb = g_cbs_head; - g_cbs_head = cb->next; - if (!g_cbs_head) g_cbs_tail = NULL; - gpr_mu_unlock(&g_mu); - - cb->cb(cb->cb_arg, 0); - gpr_free(cb); - gpr_mu_lock(&g_mu); + if (g_cbs_head) { + do { + closure = g_cbs_head; + g_cbs_head = closure->next; + if (!g_cbs_head) g_cbs_tail = NULL; + gpr_mu_unlock(&g_mu); + + closure->cb(closure->cb_arg, 0); + gpr_mu_lock(&g_mu); + } while (g_cbs_head); + continue; } - if (g_refs) { + if (nobjs > 0) { int timeout = 0; gpr_timespec short_deadline = gpr_time_add(gpr_now(), gpr_time_from_millis(100)); @@ -137,7 +144,10 @@ void grpc_iomgr_shutdown(void) { gpr_log(GPR_DEBUG, "Failed to free %d iomgr objects before shutdown deadline: " "memory leaks are likely", - g_refs); + count_objects()); + for (obj = g_root_object.next; obj != &g_root_object; obj = obj->next) { + gpr_log(GPR_DEBUG, "LEAKED OBJECT: %s", obj->name); + } break; } } @@ -153,56 +163,66 @@ void grpc_iomgr_shutdown(void) { gpr_cv_destroy(&g_rcv); } -void grpc_iomgr_ref(void) { +void grpc_iomgr_register_object(grpc_iomgr_object *obj, const char *name) { + obj->name = gpr_strdup(name); gpr_mu_lock(&g_mu); - ++g_refs; + obj->next = &g_root_object; + obj->prev = obj->next->prev; + obj->next->prev = obj->prev->next = obj; gpr_mu_unlock(&g_mu); } -void grpc_iomgr_unref(void) { +void grpc_iomgr_unregister_object(grpc_iomgr_object *obj) { + gpr_free(obj->name); gpr_mu_lock(&g_mu); - if (0 == --g_refs) { - gpr_cv_signal(&g_rcv); - } + obj->next->prev = obj->prev; + obj->prev->next = obj->next; + gpr_cv_signal(&g_rcv); gpr_mu_unlock(&g_mu); } -void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg, - int success) { - delayed_callback *dcb = gpr_malloc(sizeof(delayed_callback)); - dcb->cb = cb; - dcb->cb_arg = cb_arg; - dcb->success = success; + +void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb, + void *cb_arg) { + closure->cb = cb; + closure->cb_arg = cb_arg; + closure->next = NULL; +} + +void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *closure, int success) { + closure->success = success; gpr_mu_lock(&g_mu); - dcb->next = NULL; + closure->next = NULL; if (!g_cbs_tail) { - g_cbs_head = g_cbs_tail = dcb; + g_cbs_head = g_cbs_tail = closure; } else { - g_cbs_tail->next = dcb; - g_cbs_tail = dcb; + g_cbs_tail->next = closure; + g_cbs_tail = closure; } gpr_mu_unlock(&g_mu); } -void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg) { - grpc_iomgr_add_delayed_callback(cb, cb_arg, 1); + +void grpc_iomgr_add_callback(grpc_iomgr_closure *closure) { + grpc_iomgr_add_delayed_callback(closure, 1 /* GPR_TRUE */); } + int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) { int n = 0; gpr_mu *retake_mu = NULL; - delayed_callback *cb; + grpc_iomgr_closure *closure; for (;;) { /* check for new work */ if (!gpr_mu_trylock(&g_mu)) { break; } - cb = g_cbs_head; - if (!cb) { + closure = g_cbs_head; + if (!closure) { gpr_mu_unlock(&g_mu); break; } - g_cbs_head = cb->next; + g_cbs_head = closure->next; if (!g_cbs_head) g_cbs_tail = NULL; gpr_mu_unlock(&g_mu); /* if we have a mutex to drop, do so before executing work */ @@ -211,8 +231,7 @@ int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) { retake_mu = drop_mu; drop_mu = NULL; } - cb->cb(cb->cb_arg, success && cb->success); - gpr_free(cb); + closure->cb(closure->cb_arg, success && closure->success); n++; } if (retake_mu) { diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h index 1f5d23fdda..a10e481e48 100644 --- a/src/core/iomgr/iomgr.h +++ b/src/core/iomgr/iomgr.h @@ -34,14 +34,43 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_IOMGR_H #define GRPC_INTERNAL_CORE_IOMGR_IOMGR_H -/* gRPC Callback definition */ +/** gRPC Callback definition. + * + * \param arg Arbitrary input. + * \param success An indication on the state of the iomgr. On false, cleanup + * actions should be taken (eg, shutdown). */ typedef void (*grpc_iomgr_cb_func)(void *arg, int success); +/** A closure over a grpc_iomgr_cb_func. */ +typedef struct grpc_iomgr_closure { + /** Bound callback. */ + grpc_iomgr_cb_func cb; + + /** Arguments to be passed to "cb". */ + void *cb_arg; + + /** Internal. A boolean indication to "cb" on the state of the iomgr. + * For instance, closures created during a shutdown would have this field set + * to false. */ + int success; + + /**< Internal. Do not touch */ + struct grpc_iomgr_closure *next; +} grpc_iomgr_closure; + +/** Initializes \a closure with \a cb and \a cb_arg. */ +void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb, + void *cb_arg); + +/** Initializes the iomgr. */ void grpc_iomgr_init(void); + +/** Signals the intention to shutdown the iomgr. */ void grpc_iomgr_shutdown(void); -/* This function is called from within a callback or from anywhere else - and causes the invocation of a callback at some point in the future */ -void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg); +/** Registers a closure to be invoked at some point in the future. + * + * Can be called from within a callback or from anywhere else */ +void grpc_iomgr_add_callback(grpc_iomgr_closure *closure); #endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_H */ diff --git a/src/core/iomgr/iomgr_internal.h b/src/core/iomgr/iomgr_internal.h index 07923258b9..6c1e0e1799 100644 --- a/src/core/iomgr/iomgr_internal.h +++ b/src/core/iomgr/iomgr_internal.h @@ -35,15 +35,19 @@ #define GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H #include "src/core/iomgr/iomgr.h" -#include "src/core/iomgr/iomgr_internal.h" #include <grpc/support/sync.h> +typedef struct grpc_iomgr_object { + char *name; + struct grpc_iomgr_object *next; + struct grpc_iomgr_object *prev; +} grpc_iomgr_object; + int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success); -void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg, - int success); +void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success); -void grpc_iomgr_ref(void); -void grpc_iomgr_unref(void); +void grpc_iomgr_register_object(grpc_iomgr_object *obj, const char *name); +void grpc_iomgr_unregister_object(grpc_iomgr_object *obj); void grpc_iomgr_platform_init(void); void grpc_iomgr_platform_shutdown(void); diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 6da7daabc8..d2f615271e 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -272,6 +272,7 @@ typedef struct grpc_unary_promote_args { const grpc_pollset_vtable *original_vtable; grpc_pollset *pollset; grpc_fd *fd; + grpc_iomgr_closure promotion_closure; } grpc_unary_promote_args; static void unary_poll_do_promote(void *args, int success) { @@ -294,7 +295,7 @@ static void unary_poll_do_promote(void *args, int success) { /* First we need to ensure that nobody is polling concurrently */ while (pollset->counter != 0) { grpc_pollset_kick(pollset); - grpc_iomgr_add_callback(unary_poll_do_promote, up_args); + grpc_iomgr_add_callback(&up_args->promotion_closure); gpr_mu_unlock(&pollset->mu); return; } @@ -378,7 +379,9 @@ static void unary_poll_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { up_args->pollset = pollset; up_args->fd = fd; up_args->original_vtable = pollset->vtable; - grpc_iomgr_add_callback(unary_poll_do_promote, up_args); + up_args->promotion_closure.cb = unary_poll_do_promote; + up_args->promotion_closure.cb_arg = up_args; + grpc_iomgr_add_callback(&up_args->promotion_closure); grpc_pollset_kick(pollset); } diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index 5af0685f9d..b1f4c09a2c 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -66,15 +66,15 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) { gpr_timespec now; now = gpr_now(); if (gpr_time_cmp(now, deadline) > 0) { - return 0; + return 0 /* GPR_FALSE */; } - if (grpc_maybe_call_delayed_callbacks(NULL, 1)) { - return 1; + if (grpc_maybe_call_delayed_callbacks(NULL, 1 /* GPR_TRUE */)) { + return 1 /* GPR_TRUE */; } if (grpc_alarm_check(NULL, now, &deadline)) { - return 1; + return 1 /* GPR_TRUE */; } - return 0; + return 0 /* GPR_FALSE */; } void grpc_pollset_kick(grpc_pollset *p) { } diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c index 43fd704a6d..fcf48fe0d7 100644 --- a/src/core/iomgr/resolve_address_posix.c +++ b/src/core/iomgr/resolve_address_posix.c @@ -55,6 +55,7 @@ typedef struct { char *default_port; grpc_resolve_cb cb; void *arg; + grpc_iomgr_object iomgr_object; } request; grpc_resolved_addresses *grpc_blocking_resolve_address( @@ -153,9 +154,9 @@ static void do_request(void *rp) { grpc_resolve_cb cb = r->cb; gpr_free(r->name); gpr_free(r->default_port); + grpc_iomgr_unregister_object(&r->iomgr_object); gpr_free(r); cb(arg, resolved); - grpc_iomgr_unref(); } void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) { @@ -166,14 +167,17 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) { void grpc_resolve_address(const char *name, const char *default_port, grpc_resolve_cb cb, void *arg) { request *r = gpr_malloc(sizeof(request)); - /*gpr_thd_id id;*/ - grpc_iomgr_ref(); + gpr_thd_id id; + char *tmp; + gpr_asprintf(&tmp, "resolve_address:name='%s':default_port='%s'", name, + default_port); + grpc_iomgr_register_object(&r->iomgr_object, tmp); + gpr_free(tmp); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); r->cb = cb; r->arg = arg; - /*gpr_thd_new(&id, do_request, r, NULL);*/ - do_request(r); + gpr_thd_new(&id, do_request, r, NULL); } #endif diff --git a/src/core/iomgr/resolve_address_windows.c b/src/core/iomgr/resolve_address_windows.c index 9b416dfe8a..7d0d2f9e7a 100644 --- a/src/core/iomgr/resolve_address_windows.c +++ b/src/core/iomgr/resolve_address_windows.c @@ -54,6 +54,7 @@ typedef struct { char *default_port; grpc_resolve_cb cb; void *arg; + grpc_iomgr_object iomgr_object; } request; grpc_resolved_addresses *grpc_blocking_resolve_address( @@ -135,7 +136,7 @@ static void do_request(void *rp) { gpr_free(r->default_port); gpr_free(r); cb(arg, resolved); - grpc_iomgr_unref(); + grpc_iomgr_unregister_object(&r->iomgr_object); } void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) { @@ -147,7 +148,10 @@ void grpc_resolve_address(const char *name, const char *default_port, grpc_resolve_cb cb, void *arg) { request *r = gpr_malloc(sizeof(request)); gpr_thd_id id; - grpc_iomgr_ref(); + const char *label; + gpr_asprintf(&label, "resolve:%s", name); + grpc_iomgr_register_object(&r->iomgr_object, label); + gpr_free(label); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); r->cb = cb; diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c index ee5150a696..e4ba0a2b66 100644 --- a/src/core/iomgr/socket_windows.c +++ b/src/core/iomgr/socket_windows.c @@ -39,18 +39,17 @@ #include <grpc/support/log.h> #include "src/core/iomgr/iocp_windows.h" -#include "src/core/iomgr/iomgr.h" #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/pollset.h" #include "src/core/iomgr/pollset_windows.h" #include "src/core/iomgr/socket_windows.h" -grpc_winsocket *grpc_winsocket_create(SOCKET socket) { +grpc_winsocket *grpc_winsocket_create(SOCKET socket, const char *name) { grpc_winsocket *r = gpr_malloc(sizeof(grpc_winsocket)); memset(r, 0, sizeof(grpc_winsocket)); r->socket = socket; gpr_mu_init(&r->state_mu); - grpc_iomgr_ref(); + grpc_iomgr_register_object(&r->iomgr_object, name); grpc_iocp_add_socket(r); return r; } @@ -64,13 +63,15 @@ int grpc_winsocket_shutdown(grpc_winsocket *socket) { gpr_mu_lock(&socket->state_mu); if (socket->read_info.cb) { callbacks_set++; - grpc_iomgr_add_delayed_callback(socket->read_info.cb, - socket->read_info.opaque, 0); + grpc_iomgr_closure_init(&socket->shutdown_closure, socket->read_info.cb, + socket->read_info.opaque); + grpc_iomgr_add_delayed_callback(&socket->shutdown_closure, 0); } if (socket->write_info.cb) { callbacks_set++; - grpc_iomgr_add_delayed_callback(socket->write_info.cb, - socket->write_info.opaque, 0); + grpc_iomgr_closure_init(&socket->shutdown_closure, socket->write_info.cb, + socket->write_info.opaque); + grpc_iomgr_add_delayed_callback(&socket->shutdown_closure, 0); } gpr_mu_unlock(&socket->state_mu); return callbacks_set; @@ -90,7 +91,7 @@ void grpc_winsocket_orphan(grpc_winsocket *winsocket) { grpc_winsocket_destroy(winsocket); } closesocket(socket); - grpc_iomgr_unref(); + grpc_iomgr_unregister_object(&winsocket->iomgr_object); } void grpc_winsocket_destroy(grpc_winsocket *winsocket) { diff --git a/src/core/iomgr/socket_windows.h b/src/core/iomgr/socket_windows.h index b27eb14219..7080919af0 100644 --- a/src/core/iomgr/socket_windows.h +++ b/src/core/iomgr/socket_windows.h @@ -39,6 +39,8 @@ #include <grpc/support/sync.h> #include <grpc/support/atm.h> +#include "src/core/iomgr/iomgr_internal.h" + /* This holds the data for an outstanding read or write on a socket. The mutex to protect the concurrent access to that data is the one inside the winsocket wrapper. */ @@ -93,11 +95,16 @@ typedef struct grpc_winsocket { there is a pending operation that the IO Completion Port will have to wait for. The socket will be collected at that time. */ int orphan; + + grpc_iomgr_closure shutdown_closure; + + /* A label for iomgr to track outstanding objects */ + grpc_iomgr_object iomgr_object; } grpc_winsocket; /* Create a wrapped windows handle. This takes ownership of it, meaning that it will be responsible for closing it. */ -grpc_winsocket *grpc_winsocket_create(SOCKET socket); +grpc_winsocket *grpc_winsocket_create(SOCKET socket, const char *name); /* Initiate an asynchronous shutdown of the socket. Will call off any pending operation to cancel them. Returns the number of callbacks that got setup. */ diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index 2401fe00e4..aa21ba9b9e 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -48,6 +48,7 @@ #include "src/core/iomgr/sockaddr_utils.h" #include "src/core/iomgr/socket_utils_posix.h" #include "src/core/iomgr/tcp_posix.h" +#include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/time.h> @@ -185,6 +186,8 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), async_connect *ac; struct sockaddr_in6 addr6_v4mapped; struct sockaddr_in addr4_copy; + char *name; + char *addr_str; /* Use dualstack sockets where available. */ if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { @@ -211,24 +214,27 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), err = connect(fd, addr, addr_len); } while (err < 0 && errno == EINTR); + grpc_sockaddr_to_string(&addr_str, addr, 1); + gpr_asprintf(&name, "tcp-client:%s", addr_str); + if (err >= 0) { gpr_log(GPR_DEBUG, "instant connect"); - cb(arg, - grpc_tcp_create(grpc_fd_create(fd), GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); - return; + cb(arg, grpc_tcp_create(grpc_fd_create(fd, name), + GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); + goto done; } if (errno != EWOULDBLOCK && errno != EINPROGRESS) { - gpr_log(GPR_ERROR, "connect error: %s", strerror(errno)); + gpr_log(GPR_ERROR, "connect error to '%s': %s", addr_str, strerror(errno)); close(fd); cb(arg, NULL); - return; + goto done; } ac = gpr_malloc(sizeof(async_connect)); ac->cb = cb; ac->cb_arg = arg; - ac->fd = grpc_fd_create(fd); + ac->fd = grpc_fd_create(fd, name); gpr_mu_init(&ac->mu); ac->refs = 2; ac->write_closure.cb = on_writable; @@ -236,6 +242,10 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now()); grpc_fd_notify_on_write(ac->fd, &ac->write_closure); + +done: + gpr_free(name); + gpr_free(addr_str); } #endif diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c index cf5174327d..2a040ffc4a 100644 --- a/src/core/iomgr/tcp_client_windows.c +++ b/src/core/iomgr/tcp_client_windows.c @@ -193,7 +193,7 @@ void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp), goto failure; } - socket = grpc_winsocket_create(sock); + socket = grpc_winsocket_create(sock, "client"); info = &socket->write_info; info->outstanding = 1; success = ConnectEx(sock, addr, addr_len, NULL, 0, NULL, &info->overlapped); diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index cd6b2ecae6..2f19f9d442 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -280,6 +280,8 @@ typedef struct { grpc_iomgr_closure read_closure; grpc_iomgr_closure write_closure; + + grpc_iomgr_closure handle_read_closure; } grpc_tcp; static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success); @@ -443,7 +445,8 @@ static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, tcp->finished_edge = 0; grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); } else { - grpc_iomgr_add_callback(grpc_tcp_handle_read, tcp); + tcp->handle_read_closure.cb_arg = tcp; + grpc_iomgr_add_callback(&tcp->handle_read_closure); } } @@ -592,6 +595,8 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) { tcp->read_closure.cb_arg = tcp; tcp->write_closure.cb = grpc_tcp_handle_write; tcp->write_closure.cb_arg = tcp; + + tcp->handle_read_closure.cb = grpc_tcp_handle_read; return &tcp->base; } diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index d1cd8a769c..3cd40faafc 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -60,6 +60,7 @@ #include "src/core/iomgr/sockaddr_utils.h" #include "src/core/iomgr/socket_utils_posix.h" #include "src/core/iomgr/tcp_posix.h" +#include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/sync.h> @@ -281,6 +282,8 @@ static void on_read(void *arg, int success) { for (;;) { struct sockaddr_storage addr; socklen_t addrlen = sizeof(addr); + char *addr_str; + char *name; /* Note: If we ever decide to return this address to the user, remember to strip off the ::ffff:0.0.0.0/96 prefix first. */ int fd = grpc_accept4(sp->fd, (struct sockaddr *)&addr, &addrlen, 1, 1); @@ -299,9 +302,15 @@ static void on_read(void *arg, int success) { grpc_set_socket_no_sigpipe_if_possible(fd); - sp->server->cb( - sp->server->cb_arg, - grpc_tcp_create(grpc_fd_create(fd), GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); + grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1); + gpr_asprintf(&name, "tcp-server-connection:%s", addr_str); + + sp->server->cb(sp->server->cb_arg, + grpc_tcp_create(grpc_fd_create(fd, name), + GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); + + gpr_free(addr_str); + gpr_free(name); } abort(); @@ -318,9 +327,13 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd, const struct sockaddr *addr, int addr_len) { server_port *sp; int port; + char *addr_str; + char *name; port = prepare_socket(fd, addr, addr_len); if (port >= 0) { + grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1); + gpr_asprintf(&name, "tcp-server-listener:%s", addr_str); gpr_mu_lock(&s->mu); GPR_ASSERT(!s->cb && "must add ports before starting server"); /* append it to the list under a lock */ @@ -331,11 +344,13 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd, sp = &s->ports[s->nports++]; sp->server = s; sp->fd = fd; - sp->emfd = grpc_fd_create(fd); + sp->emfd = grpc_fd_create(fd, name); memcpy(sp->addr.untyped, addr, addr_len); sp->addr_len = addr_len; GPR_ASSERT(sp->emfd); gpr_mu_unlock(&s->mu); + gpr_free(addr_str); + gpr_free(name); } return port; diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index d22acc7453..9ef369dfd8 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -270,7 +270,8 @@ static void on_accept(void *arg, int from_iocp) { gpr_free(utf8_message); closesocket(sock); } else { - ep = grpc_tcp_create(grpc_winsocket_create(sock)); + /* TODO(ctiller): add sockaddr address to label */ + ep = grpc_tcp_create(grpc_winsocket_create(sock, "server")); } } else { /* If we're not notified from the IOCP, it means we are asked to shutdown. @@ -336,7 +337,7 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock, } sp = &s->ports[s->nports++]; sp->server = s; - sp->socket = grpc_winsocket_create(sock); + sp->socket = grpc_winsocket_create(sock, "listener"); sp->shutting_down = 0; sp->AcceptEx = AcceptEx; sp->new_socket = INVALID_SOCKET; |