diff options
Diffstat (limited to 'src/core/iomgr')
29 files changed, 239 insertions, 141 deletions
diff --git a/src/core/iomgr/endpoint_pair_posix.c b/src/core/iomgr/endpoint_pair_posix.c index 9b3b63f1e7..fa2d2555d6 100644 --- a/src/core/iomgr/endpoint_pair_posix.c +++ b/src/core/iomgr/endpoint_pair_posix.c @@ -47,6 +47,7 @@ #include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> static void create_sockets(int sv[2]) { int flags; diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 2ac1866a66..632d2a4609 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -74,6 +74,7 @@ static void freelist_fd(grpc_fd *fd) { gpr_mu_lock(&fd_freelist_mu); fd->freelist_next = fd_freelist; fd_freelist = fd; + grpc_iomgr_unregister_object(&fd->iomgr_object); gpr_mu_unlock(&fd_freelist_mu); } @@ -100,6 +101,7 @@ static grpc_fd *alloc_fd(int fd) { &r->inactive_watcher_root; r->freelist_next = NULL; r->read_watcher = r->write_watcher = NULL; + r->on_done_closure = NULL; return r; } @@ -112,7 +114,8 @@ static void destroy(grpc_fd *fd) { #ifdef GRPC_FD_REF_COUNT_DEBUG #define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__) #define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__) -static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file, int line) { +static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file, + int line) { gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst), gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line); @@ -125,7 +128,8 @@ static void ref_by(grpc_fd *fd, int n) { } #ifdef GRPC_FD_REF_COUNT_DEBUG -static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file, int line) { +static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file, + int line) { gpr_atm old; gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst), @@ -136,10 +140,6 @@ static void unref_by(grpc_fd *fd, int n) { #endif old = gpr_atm_full_fetch_add(&fd->refst, -n); if (old == n) { - if (fd->on_done_closure) { - grpc_iomgr_add_callback(fd->on_done_closure); - } - grpc_iomgr_unregister_object(&fd->iomgr_object); freelist_fd(fd); } else { GPR_ASSERT(old > n); @@ -197,13 +197,24 @@ static void wake_all_watchers_locked(grpc_fd *fd) { } } +static int has_watchers(grpc_fd *fd) { + return fd->read_watcher != NULL || fd->write_watcher != NULL || fd->inactive_watcher_root.next != &fd->inactive_watcher_root; +} + void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done, const char *reason) { fd->on_done_closure = on_done; shutdown(fd->fd, SHUT_RDWR); REF_BY(fd, 1, reason); /* remove active status, but keep referenced */ gpr_mu_lock(&fd->watcher_mu); - wake_all_watchers_locked(fd); + if (!has_watchers(fd)) { + close(fd->fd); + if (fd->on_done_closure) { + grpc_iomgr_add_callback(fd->on_done_closure); + } + } else { + wake_all_watchers_locked(fd); + } gpr_mu_unlock(&fd->watcher_mu); UNREF_BY(fd, 2, reason); /* drop the reference */ } @@ -225,7 +236,7 @@ void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); } #endif static void process_callback(grpc_iomgr_closure *closure, int success, - int allow_synchronous_callback) { + int allow_synchronous_callback) { if (allow_synchronous_callback) { closure->cb(closure->cb_arg, success); } else { @@ -265,7 +276,7 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure, GPR_ASSERT(gpr_atm_no_barrier_load(st) == READY); gpr_atm_rel_store(st, NOT_READY); process_callback(closure, !gpr_atm_acq_load(&fd->shutdown), - allow_synchronous_callback); + allow_synchronous_callback); return; default: /* WAITING */ /* upcallptr was set to a different closure. This is an error! */ @@ -309,7 +320,7 @@ static void set_ready(grpc_fd *fd, gpr_atm *st, /* only one set_ready can be active at once (but there may be a racing notify_on) */ int success; - grpc_iomgr_closure* closure; + grpc_iomgr_closure *closure; size_t ncb = 0; gpr_mu_lock(&fd->set_state_mu); @@ -352,14 +363,22 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, GRPC_FD_REF(fd, "poll"); gpr_mu_lock(&fd->watcher_mu); + /* if we are shutdown, then don't add to the watcher set */ + if (gpr_atm_no_barrier_load(&fd->shutdown)) { + watcher->fd = NULL; + watcher->pollset = NULL; + gpr_mu_unlock(&fd->watcher_mu); + GRPC_FD_UNREF(fd, "poll"); + return 0; + } /* if there is nobody polling for read, but we need to, then start doing so */ - if (!fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) { + if (read_mask && !fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) { fd->read_watcher = watcher; mask |= read_mask; } /* if there is nobody polling for write, but we need to, then start doing so */ - if (!fd->write_watcher && gpr_atm_acq_load(&fd->writest) > READY) { + if (write_mask && !fd->write_watcher && gpr_atm_acq_load(&fd->writest) > READY) { fd->write_watcher = watcher; mask |= write_mask; } @@ -381,6 +400,10 @@ void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) { int kick = 0; grpc_fd *fd = watcher->fd; + if (fd == NULL) { + return; + } + gpr_mu_lock(&fd->watcher_mu); if (watcher == fd->read_watcher) { /* remove read watcher, kick if we still need a read */ @@ -402,6 +425,12 @@ void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) { if (kick) { maybe_wake_one_watcher_locked(fd); } + if (grpc_fd_is_orphaned(fd) && !has_watchers(fd)) { + close(fd->fd); + if (fd->on_done_closure != NULL) { + grpc_iomgr_add_callback(fd->on_done_closure); + } + } gpr_mu_unlock(&fd->watcher_mu); GRPC_FD_UNREF(fd, "poll"); diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index 523d040d17..94d0019fa4 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -62,12 +62,12 @@ struct grpc_fd { gpr_atm shutdown; /* The watcher list. - + The following watcher related fields are protected by watcher_mu. - + An fd_watcher is an ephemeral object created when an fd wants to begin polling, and destroyed after the poll. - + It denotes the fd's interest in whether to read poll or write poll or both or neither on this fd. @@ -175,4 +175,4 @@ void grpc_fd_unref(grpc_fd *fd); void grpc_fd_global_init(void); void grpc_fd_global_shutdown(void); -#endif /* GRPC_INTERNAL_CORE_IOMGR_FD_POSIX_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_FD_POSIX_H */ diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index c6c44658df..c47528aa94 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -40,8 +40,9 @@ #include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include <grpc/support/thd.h> +#include <grpc/support/string_util.h> #include <grpc/support/sync.h> +#include <grpc/support/thd.h> static gpr_mu g_mu; static gpr_cv g_rcv; @@ -143,8 +144,8 @@ void grpc_iomgr_shutdown(void) { } if (g_root_object.next != &g_root_object) { int timeout = 0; - gpr_timespec short_deadline = gpr_time_add(gpr_now(), - gpr_time_from_millis(100)); + gpr_timespec short_deadline = + gpr_time_add(gpr_now(), gpr_time_from_millis(100)); while (gpr_cv_wait(&g_rcv, &g_mu, short_deadline) && g_cbs_head == NULL) { if (gpr_time_cmp(gpr_now(), shutdown_deadline) > 0) { timeout = 1; @@ -193,7 +194,6 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object *obj) { gpr_mu_unlock(&g_mu); } - void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb, void *cb_arg) { closure->cb = cb; @@ -217,12 +217,10 @@ void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *closure, int success) { gpr_mu_unlock(&g_mu); } - void grpc_iomgr_add_callback(grpc_iomgr_closure *closure) { grpc_iomgr_add_delayed_callback(closure, 1 /* GPR_TRUE */); } - int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) { int n = 0; gpr_mu *retake_mu = NULL; diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h index a10e481e48..6d4a82917b 100644 --- a/src/core/iomgr/iomgr.h +++ b/src/core/iomgr/iomgr.h @@ -73,4 +73,8 @@ void grpc_iomgr_shutdown(void); * Can be called from within a callback or from anywhere else */ void grpc_iomgr_add_callback(grpc_iomgr_closure *closure); +/** As per grpc_iomgr_add_callback, with the ability to set the success + argument. */ +void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success); + #endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_H */ diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h index 334e0ebde1..c40188b3c9 100644 --- a/src/core/iomgr/pollset.h +++ b/src/core/iomgr/pollset.h @@ -58,16 +58,18 @@ void grpc_pollset_shutdown(grpc_pollset *pollset, void *shutdown_done_arg); void grpc_pollset_destroy(grpc_pollset *pollset); - /* Do some work on a pollset. May involve invoking asynchronous callbacks, or actually polling file descriptors. Requires GRPC_POLLSET_MU(pollset) locked. - May unlock GRPC_POLLSET_MU(pollset) during its execution. */ + May unlock GRPC_POLLSET_MU(pollset) during its execution. + + Returns true if some work has been done, and false if the deadline + got attained. */ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline); -/* Break a pollset out of polling work +/* Break one polling thread out of polling work for this pollset. Requires GRPC_POLLSET_MU(pollset) locked. */ void grpc_pollset_kick(grpc_pollset *pollset); -#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_H */ diff --git a/src/core/iomgr/pollset_kick_posix.c b/src/core/iomgr/pollset_kick_posix.c index 21c9bb6542..51021784f2 100644 --- a/src/core/iomgr/pollset_kick_posix.c +++ b/src/core/iomgr/pollset_kick_posix.c @@ -73,7 +73,7 @@ static grpc_kick_fd_info *allocate_wfd(void) { return info; } -static void destroy_wfd(grpc_kick_fd_info* wfd) { +static void destroy_wfd(grpc_kick_fd_info *wfd) { grpc_wakeup_fd_destroy(&wfd->wakeup_fd); gpr_free(wfd); } @@ -104,7 +104,8 @@ void grpc_pollset_kick_destroy(grpc_pollset_kick_state *kick_state) { GPR_ASSERT(kick_state->fd_list.next == &kick_state->fd_list); } -grpc_kick_fd_info *grpc_pollset_kick_pre_poll(grpc_pollset_kick_state *kick_state) { +grpc_kick_fd_info *grpc_pollset_kick_pre_poll( + grpc_pollset_kick_state *kick_state) { grpc_kick_fd_info *fd_info; gpr_mu_lock(&kick_state->mu); if (kick_state->kicked) { @@ -120,11 +121,13 @@ grpc_kick_fd_info *grpc_pollset_kick_pre_poll(grpc_pollset_kick_state *kick_stat return fd_info; } -void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state, grpc_kick_fd_info *fd_info) { +void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state, + grpc_kick_fd_info *fd_info) { grpc_wakeup_fd_consume_wakeup(&fd_info->wakeup_fd); } -void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state, grpc_kick_fd_info *fd_info) { +void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state, + grpc_kick_fd_info *fd_info) { gpr_mu_lock(&kick_state->mu); fd_info->next->prev = fd_info->prev; fd_info->prev->next = fd_info->next; @@ -162,5 +165,4 @@ void grpc_pollset_kick_global_destroy(void) { gpr_mu_destroy(&fd_freelist_mu); } - -#endif /* GPR_POSIX_SOCKET */ +#endif /* GPR_POSIX_SOCKET */ diff --git a/src/core/iomgr/pollset_kick_posix.h b/src/core/iomgr/pollset_kick_posix.h index b35c2cfbe0..77e32a8d51 100644 --- a/src/core/iomgr/pollset_kick_posix.h +++ b/src/core/iomgr/pollset_kick_posix.h @@ -37,6 +37,11 @@ #include "src/core/iomgr/wakeup_fd_posix.h" #include <grpc/support/sync.h> +/* pollset kicking allows breaking a thread out of polling work for + a given pollset. + writing a byte to a pipe is used as a posix-ly portable base + mechanism, and eventfds are utilized on Linux for better performance. */ + typedef struct grpc_kick_fd_info { grpc_wakeup_fd_info wakeup_fd; /* used for polling list and free list */ @@ -51,7 +56,8 @@ typedef struct grpc_pollset_kick_state { struct grpc_kick_fd_info fd_list; } grpc_pollset_kick_state; -#define GRPC_POLLSET_KICK_GET_FD(kick_fd_info) GRPC_WAKEUP_FD_GET_READ_FD(&(kick_fd_info)->wakeup_fd) +#define GRPC_POLLSET_KICK_GET_FD(kick_fd_info) \ + GRPC_WAKEUP_FD_GET_READ_FD(&(kick_fd_info)->wakeup_fd) /* This is an abstraction around the typical pipe mechanism for waking up a thread sitting in a poll() style call. */ @@ -66,18 +72,22 @@ void grpc_pollset_kick_destroy(grpc_pollset_kick_state *kick_state); * applicable. Intended for testing. */ void grpc_pollset_kick_global_init_fallback_fd(void); -/* Must be called before entering poll(). If return value is -1, this consumed +/* Must be called before entering poll(). If return value is NULL, this consumed an existing kick. Otherwise the return value is an FD to add to the poll set. */ -grpc_kick_fd_info *grpc_pollset_kick_pre_poll(grpc_pollset_kick_state *kick_state); +grpc_kick_fd_info *grpc_pollset_kick_pre_poll( + grpc_pollset_kick_state *kick_state); /* Consume an existing kick. Must be called after poll returns that the fd was readable, and before calling kick_post_poll. */ -void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state, grpc_kick_fd_info *fd_info); +void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state, + grpc_kick_fd_info *fd_info); /* Must be called after pre_poll, and after consume if applicable */ -void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state, grpc_kick_fd_info *fd_info); +void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state, + grpc_kick_fd_info *fd_info); +/* Actually kick */ void grpc_pollset_kick_kick(grpc_pollset_kick_state *kick_state); -#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_KICK_POSIX_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_KICK_POSIX_H */ diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 05b522bbf2..1900bbf9e1 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -54,17 +54,25 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset, pollset_hdr *h = pollset->data.ptr; struct epoll_event ev; int err; - - ev.events = EPOLLIN | EPOLLOUT | EPOLLET; - ev.data.ptr = fd; - err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev); - if (err < 0) { - /* FDs may be added to a pollset multiple times, so EEXIST is normal. */ - if (errno != EEXIST) { - gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd, - strerror(errno)); + grpc_fd_watcher watcher; + + /* We pretend to be polling whilst adding an fd to keep the fd from being + closed during the add. This may result in a spurious wakeup being assigned + to this pollset whilst adding, but that should be benign. */ + GPR_ASSERT(grpc_fd_begin_poll(fd, pollset, 0, 0, &watcher) == 0); + if (watcher.fd != NULL) { + ev.events = EPOLLIN | EPOLLOUT | EPOLLET; + ev.data.ptr = fd; + err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev); + if (err < 0) { + /* FDs may be added to a pollset multiple times, so EEXIST is normal. */ + if (errno != EEXIST) { + gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd, + strerror(errno)); + } } } + grpc_fd_end_poll(&watcher, 0, 0); } static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset, @@ -83,7 +91,7 @@ static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset, /* TODO(klempner): We probably want to turn this down a bit */ #define GRPC_EPOLL_MAX_EVENTS 1000 -static int multipoll_with_epoll_pollset_maybe_work( +static void multipoll_with_epoll_pollset_maybe_work( grpc_pollset *pollset, gpr_timespec deadline, gpr_timespec now, int allow_synchronous_callback) { struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS]; @@ -97,14 +105,7 @@ static int multipoll_with_epoll_pollset_maybe_work( * here. */ - if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { - timeout_ms = -1; - } else { - timeout_ms = gpr_time_to_millis(gpr_time_sub(deadline, now)); - if (timeout_ms <= 0) { - return 1; - } - } + timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now); pollset->counter += 1; gpr_mu_unlock(&pollset->mu); @@ -140,10 +141,10 @@ static int multipoll_with_epoll_pollset_maybe_work( gpr_mu_lock(&pollset->mu); pollset->counter -= 1; - return 1; } -static void multipoll_with_epoll_pollset_finish_shutdown(grpc_pollset *pollset) {} +static void multipoll_with_epoll_pollset_finish_shutdown( + grpc_pollset *pollset) {} static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) { pollset_hdr *h = pollset->data.ptr; @@ -158,9 +159,12 @@ static void epoll_kick(grpc_pollset *pollset) { } static const grpc_pollset_vtable multipoll_with_epoll_pollset = { - multipoll_with_epoll_pollset_add_fd, multipoll_with_epoll_pollset_del_fd, - multipoll_with_epoll_pollset_maybe_work, epoll_kick, - multipoll_with_epoll_pollset_finish_shutdown, multipoll_with_epoll_pollset_destroy}; + multipoll_with_epoll_pollset_add_fd, + multipoll_with_epoll_pollset_del_fd, + multipoll_with_epoll_pollset_maybe_work, + epoll_kick, + multipoll_with_epoll_pollset_finish_shutdown, + multipoll_with_epoll_pollset_destroy}; static void epoll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds, size_t nfds) { diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index afce5f65db..7b717bd159 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -103,7 +103,7 @@ static void end_polling(grpc_pollset *pollset) { } } -static int multipoll_with_poll_pollset_maybe_work( +static void multipoll_with_poll_pollset_maybe_work( grpc_pollset *pollset, gpr_timespec deadline, gpr_timespec now, int allow_synchronous_callback) { int timeout; @@ -113,14 +113,7 @@ static int multipoll_with_poll_pollset_maybe_work( grpc_kick_fd_info *kfd; h = pollset->data.ptr; - if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { - timeout = -1; - } else { - timeout = gpr_time_to_millis(gpr_time_sub(deadline, now)); - if (timeout <= 0) { - return 1; - } - } + timeout = grpc_poll_deadline_to_millis_timeout(deadline, now); if (h->pfd_capacity < h->fd_count + 1) { h->pfd_capacity = GPR_MAX(h->pfd_capacity * 3 / 2, h->fd_count + 1); gpr_free(h->pfds); @@ -133,7 +126,7 @@ static int multipoll_with_poll_pollset_maybe_work( kfd = grpc_pollset_kick_pre_poll(&pollset->kick_state); if (kfd == NULL) { /* Already kicked */ - return 1; + return; } h->pfds[0].fd = GRPC_POLLSET_KICK_GET_FD(kfd); h->pfds[0].events = POLLIN; @@ -161,7 +154,7 @@ static int multipoll_with_poll_pollset_maybe_work( h->del_count = 0; if (h->pfd_count == 0) { end_polling(pollset); - return 0; + return; } pollset->counter++; gpr_mu_unlock(&pollset->mu); @@ -186,6 +179,9 @@ static int multipoll_with_poll_pollset_maybe_work( grpc_pollset_kick_consume(&pollset->kick_state, kfd); } for (i = 1; i < np; i++) { + if (h->watchers[i].fd == NULL) { + continue; + } if (h->pfds[i].revents & (POLLIN | POLLHUP | POLLERR)) { grpc_fd_become_readable(h->watchers[i].fd, allow_synchronous_callback); } @@ -198,8 +194,6 @@ static int multipoll_with_poll_pollset_maybe_work( gpr_mu_lock(&pollset->mu); pollset->counter--; - - return 1; } static void multipoll_with_poll_pollset_kick(grpc_pollset *p) { @@ -231,9 +225,12 @@ static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) { } static const grpc_pollset_vtable multipoll_with_poll_pollset = { - multipoll_with_poll_pollset_add_fd, multipoll_with_poll_pollset_del_fd, - multipoll_with_poll_pollset_maybe_work, multipoll_with_poll_pollset_kick, - multipoll_with_poll_pollset_finish_shutdown, multipoll_with_poll_pollset_destroy}; + multipoll_with_poll_pollset_add_fd, + multipoll_with_poll_pollset_del_fd, + multipoll_with_poll_pollset_maybe_work, + multipoll_with_poll_pollset_kick, + multipoll_with_poll_pollset_finish_shutdown, + multipoll_with_poll_pollset_destroy}; void grpc_poll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds, size_t nfds) { diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 796b1b9ebf..15ed8e75e6 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -99,6 +99,7 @@ void grpc_pollset_init(grpc_pollset *pollset) { grpc_pollset_kick_init(&pollset->kick_state); pollset->in_flight_cbs = 0; pollset->shutting_down = 0; + pollset->called_shutdown = 0; become_basic_pollset(pollset, NULL); } @@ -122,7 +123,6 @@ static void finish_shutdown(grpc_pollset *pollset) { int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) { /* pollset->mu already held */ gpr_timespec now = gpr_now(); - int r; if (gpr_time_cmp(now, deadline) > 0) { return 0; } @@ -136,12 +136,13 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) { return 1; } gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset); - r = pollset->vtable->maybe_work(pollset, deadline, now, 1); + pollset->vtable->maybe_work(pollset, deadline, now, 1); gpr_tls_set(&g_current_thread_poller, 0); if (pollset->shutting_down) { if (pollset->counter > 0) { grpc_pollset_kick(pollset); - } else if (pollset->in_flight_cbs == 0) { + } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) { + pollset->called_shutdown = 1; gpr_mu_unlock(&pollset->mu); finish_shutdown(pollset); /* Continuing to access pollset here is safe -- it is the caller's @@ -151,27 +152,29 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) { gpr_mu_lock(&pollset->mu); } } - return r; + return 1; } void grpc_pollset_shutdown(grpc_pollset *pollset, void (*shutdown_done)(void *arg), void *shutdown_done_arg) { - int in_flight_cbs; - int counter; + int call_shutdown = 0; gpr_mu_lock(&pollset->mu); GPR_ASSERT(!pollset->shutting_down); pollset->shutting_down = 1; - in_flight_cbs = pollset->in_flight_cbs; - counter = pollset->counter; + if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 && + pollset->counter == 0) { + pollset->called_shutdown = 1; + call_shutdown = 1; + } pollset->shutdown_done_cb = shutdown_done; pollset->shutdown_done_arg = shutdown_done_arg; - if (counter > 0) { + if (pollset->counter > 0) { grpc_pollset_kick(pollset); } gpr_mu_unlock(&pollset->mu); - if (in_flight_cbs == 0 && counter == 0) { + if (call_shutdown) { finish_shutdown(pollset); } } @@ -184,6 +187,22 @@ void grpc_pollset_destroy(grpc_pollset *pollset) { gpr_mu_destroy(&pollset->mu); } +int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, gpr_timespec now) { + gpr_timespec timeout; + static const int max_spin_polling_us = 10; + if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { + return -1; + } + if (gpr_time_cmp( + deadline, + gpr_time_add(now, gpr_time_from_micros(max_spin_polling_us))) <= 0) { + return 0; + } + timeout = gpr_time_sub(deadline, now); + return gpr_time_to_millis( + gpr_time_add(timeout, gpr_time_from_nanos(GPR_NS_PER_SEC - 1))); +} + /* * basic_pollset - a vtable that provides polling for zero or one file * descriptor via poll() @@ -318,9 +337,9 @@ static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) { } } -static int basic_pollset_maybe_work(grpc_pollset *pollset, - gpr_timespec deadline, gpr_timespec now, - int allow_synchronous_callback) { +static void basic_pollset_maybe_work(grpc_pollset *pollset, + gpr_timespec deadline, gpr_timespec now, + int allow_synchronous_callback) { struct pollfd pfd[2]; grpc_fd *fd; grpc_fd_watcher fd_watcher; @@ -333,25 +352,18 @@ static int basic_pollset_maybe_work(grpc_pollset *pollset, /* Give do_promote priority so we don't starve it out */ gpr_mu_unlock(&pollset->mu); gpr_mu_lock(&pollset->mu); - return 1; + return; } fd = pollset->data.ptr; if (fd && grpc_fd_is_orphaned(fd)) { GRPC_FD_UNREF(fd, "basicpoll"); fd = pollset->data.ptr = NULL; } - if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { - timeout = -1; - } else { - timeout = gpr_time_to_millis(gpr_time_sub(deadline, now)); - if (timeout <= 0) { - return 1; - } - } + timeout = grpc_poll_deadline_to_millis_timeout(deadline, now); kfd = grpc_pollset_kick_pre_poll(&pollset->kick_state); if (kfd == NULL) { /* Already kicked */ - return 1; + return; } pfd[0].fd = GRPC_POLLSET_KICK_GET_FD(kfd); pfd[0].events = POLLIN; @@ -405,7 +417,6 @@ static int basic_pollset_maybe_work(grpc_pollset *pollset, gpr_mu_lock(&pollset->mu); pollset->counter--; - return 1; } static void basic_pollset_destroy(grpc_pollset *pollset) { @@ -417,7 +428,7 @@ static void basic_pollset_destroy(grpc_pollset *pollset) { } static const grpc_pollset_vtable basic_pollset = { - basic_pollset_add_fd, basic_pollset_del_fd, basic_pollset_maybe_work, + basic_pollset_add_fd, basic_pollset_del_fd, basic_pollset_maybe_work, kick_using_pollset_kick, basic_pollset_destroy, basic_pollset_destroy}; static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) { diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index 2b897caa4b..53585a2886 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -56,6 +56,7 @@ typedef struct grpc_pollset { int counter; int in_flight_cbs; int shutting_down; + int called_shutdown; void (*shutdown_done_cb)(void *arg); void *shutdown_done_arg; union { @@ -67,8 +68,8 @@ typedef struct grpc_pollset { struct grpc_pollset_vtable { void (*add_fd)(grpc_pollset *pollset, struct grpc_fd *fd); void (*del_fd)(grpc_pollset *pollset, struct grpc_fd *fd); - int (*maybe_work)(grpc_pollset *pollset, gpr_timespec deadline, - gpr_timespec now, int allow_synchronous_callback); + void (*maybe_work)(grpc_pollset *pollset, gpr_timespec deadline, + gpr_timespec now, int allow_synchronous_callback); void (*kick)(grpc_pollset *pollset); void (*finish_shutdown)(grpc_pollset *pollset); void (*destroy)(grpc_pollset *pollset); @@ -93,6 +94,15 @@ int grpc_kick_read_fd(grpc_pollset *p); /* Call after polling has been kicked to leave the kicked state */ void grpc_kick_drain(grpc_pollset *p); +/* Convert a timespec to milliseconds: + - very small or negative poll times are clamped to zero to do a + non-blocking poll (which becomes spin polling) + - other small values are rounded up to one millisecond + - longer than a millisecond polls are rounded up to the next nearest + millisecond to avoid spinning + - infinite timeouts are converted to -1 */ +int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, gpr_timespec now); + /* turn a pollset into a multipoller: platform specific */ typedef void (*grpc_platform_become_multipoller_type)(grpc_pollset *pollset, struct grpc_fd **fds, diff --git a/src/core/iomgr/pollset_set.h b/src/core/iomgr/pollset_set.h index 335ffb21b9..98e3b552a7 100644 --- a/src/core/iomgr/pollset_set.h +++ b/src/core/iomgr/pollset_set.h @@ -39,7 +39,7 @@ /* A grpc_pollset_set is a set of pollsets that are interested in an action. Adding a pollset to a pollset_set automatically adds any fd's (etc) that have been registered with the set_set with that pollset. - Registering fd's automatically iterates all current pollsets. */ + Registering fd's automatically adds them to all current pollsets. */ #ifdef GPR_POSIX_SOCKET #include "src/core/iomgr/pollset_set_posix.h" diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c index c00d6e6e57..005e938398 100644 --- a/src/core/iomgr/pollset_set_posix.c +++ b/src/core/iomgr/pollset_set_posix.c @@ -49,7 +49,11 @@ void grpc_pollset_set_init(grpc_pollset_set *pollset_set) { } void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) { + size_t i; gpr_mu_destroy(&pollset_set->mu); + for (i = 0; i < pollset_set->fd_count; i++) { + GRPC_FD_UNREF(pollset_set->fds[i], "pollset"); + } gpr_free(pollset_set->pollsets); gpr_free(pollset_set->fds); } @@ -95,6 +99,7 @@ void grpc_pollset_set_add_fd(grpc_pollset_set *pollset_set, grpc_fd *fd) { pollset_set->fds = gpr_realloc( pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds)); } + GRPC_FD_REF(fd, "pollset_set"); pollset_set->fds[pollset_set->fd_count++] = fd; for (i = 0; i < pollset_set->pollset_count; i++) { grpc_pollset_add_fd(pollset_set->pollsets[i], fd); @@ -110,6 +115,7 @@ void grpc_pollset_set_del_fd(grpc_pollset_set *pollset_set, grpc_fd *fd) { pollset_set->fd_count--; GPR_SWAP(grpc_fd *, pollset_set->fds[i], pollset_set->fds[pollset_set->pollset_count]); + GRPC_FD_UNREF(fd, "pollset_set"); break; } } diff --git a/src/core/iomgr/pollset_set_windows.c b/src/core/iomgr/pollset_set_windows.c index d295c64b5e..b9c209cd2c 100644 --- a/src/core/iomgr/pollset_set_windows.c +++ b/src/core/iomgr/pollset_set_windows.c @@ -44,4 +44,7 @@ void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) {} void grpc_pollset_set_add_pollset(grpc_pollset_set *pollset_set, grpc_pollset *pollset) {} +void grpc_pollset_set_del_pollset(grpc_pollset_set *pollset_set, + grpc_pollset *pollset) {} + #endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/iomgr/pollset_set_windows.h b/src/core/iomgr/pollset_set_windows.h index 0f91f1ede7..cada0d2b61 100644 --- a/src/core/iomgr/pollset_set_windows.h +++ b/src/core/iomgr/pollset_set_windows.h @@ -34,8 +34,6 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_WINDOWS_H #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_WINDOWS_H -typedef struct grpc_pollset_set { - void *unused; -} grpc_pollset_set; +typedef struct grpc_pollset_set { void *unused; } grpc_pollset_set; #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */ diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index 88db774cc4..8d6bc79c96 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -39,6 +39,7 @@ #include "src/core/iomgr/alarm_internal.h" #include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/pollset.h" #include "src/core/iomgr/pollset_windows.h" /* There isn't really any such thing as a pollset under Windows, due to the @@ -47,17 +48,24 @@ won't actually do any polling, and return as quickly as possible. */ void grpc_pollset_init(grpc_pollset *pollset) { + memset(pollset, 0, sizeof(*pollset)); gpr_mu_init(&pollset->mu); + gpr_cv_init(&pollset->cv); } void grpc_pollset_shutdown(grpc_pollset *pollset, void (*shutdown_done)(void *arg), void *shutdown_done_arg) { + gpr_mu_lock(&pollset->mu); + pollset->shutting_down = 1; + gpr_cv_broadcast(&pollset->cv); + gpr_mu_unlock(&pollset->mu); shutdown_done(shutdown_done_arg); } void grpc_pollset_destroy(grpc_pollset *pollset) { gpr_mu_destroy(&pollset->mu); + gpr_cv_destroy(&pollset->cv); } int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) { @@ -66,15 +74,20 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) { if (gpr_time_cmp(now, deadline) > 0) { return 0 /* GPR_FALSE */; } - if (grpc_maybe_call_delayed_callbacks(NULL, 1 /* GPR_TRUE */)) { + if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1 /* GPR_TRUE */)) { return 1 /* GPR_TRUE */; } - if (grpc_alarm_check(NULL, now, &deadline)) { + if (grpc_alarm_check(&pollset->mu, now, &deadline)) { return 1 /* GPR_TRUE */; } - return 0 /* GPR_FALSE */; + if (!pollset->shutting_down) { + gpr_cv_wait(&pollset->cv, &pollset->mu, deadline); + } + return 1 /* GPR_TRUE */; } -void grpc_pollset_kick(grpc_pollset *p) { } +void grpc_pollset_kick(grpc_pollset *p) { + gpr_cv_signal(&p->cv); +} -#endif /* GPR_WINSOCK_SOCKET */ +#endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/iomgr/pollset_windows.h b/src/core/iomgr/pollset_windows.h index acd82d0a0a..57a2907926 100644 --- a/src/core/iomgr/pollset_windows.h +++ b/src/core/iomgr/pollset_windows.h @@ -37,18 +37,18 @@ #include <windows.h> #include <grpc/support/sync.h> -#include "src/core/iomgr/pollset_kick.h" #include "src/core/iomgr/socket_windows.h" /* There isn't really any such thing as a pollset under Windows, due to the nature of the IO completion ports. A Windows "pollset" is merely a mutex - and a condition variable, as this is the minimal set of features we need - implemented for the rest of grpc. But we won't use them directly. */ + and a condition variable, used to synchronize with the IOCP. */ typedef struct grpc_pollset { gpr_mu mu; + gpr_cv cv; + int shutting_down; } grpc_pollset; #define GRPC_POLLSET_MU(pollset) (&(pollset)->mu) -#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */ diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c index fcf48fe0d7..dbf884c769 100644 --- a/src/core/iomgr/resolve_address_posix.c +++ b/src/core/iomgr/resolve_address_posix.c @@ -47,6 +47,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include <grpc/support/thd.h> #include <grpc/support/time.h> @@ -154,9 +155,9 @@ static void do_request(void *rp) { grpc_resolve_cb cb = r->cb; gpr_free(r->name); gpr_free(r->default_port); + cb(arg, resolved); grpc_iomgr_unregister_object(&r->iomgr_object); gpr_free(r); - cb(arg, resolved); } void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) { diff --git a/src/core/iomgr/resolve_address_windows.c b/src/core/iomgr/resolve_address_windows.c index 7d0d2f9e7a..fb5fd0d4f6 100644 --- a/src/core/iomgr/resolve_address_windows.c +++ b/src/core/iomgr/resolve_address_windows.c @@ -46,6 +46,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include <grpc/support/thd.h> #include <grpc/support/time.h> @@ -134,9 +135,9 @@ static void do_request(void *rp) { grpc_resolve_cb cb = r->cb; gpr_free(r->name); gpr_free(r->default_port); + grpc_iomgr_unregister_object(&r->iomgr_object); gpr_free(r); cb(arg, resolved); - grpc_iomgr_unregister_object(&r->iomgr_object); } void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) { diff --git a/src/core/iomgr/sockaddr_utils.c b/src/core/iomgr/sockaddr_utils.c index 3d202a5cc8..e91b94f8c8 100644 --- a/src/core/iomgr/sockaddr_utils.c +++ b/src/core/iomgr/sockaddr_utils.c @@ -40,6 +40,7 @@ #include <grpc/support/host_port.h> #include <grpc/support/log.h> #include <grpc/support/port_platform.h> +#include <grpc/support/string_util.h> static const gpr_uint8 kV4MappedPrefix[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff}; diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c index e4ba0a2b66..fbf3fdc949 100644 --- a/src/core/iomgr/socket_windows.c +++ b/src/core/iomgr/socket_windows.c @@ -85,13 +85,13 @@ int grpc_winsocket_shutdown(grpc_winsocket *socket) { both memory and sockets. */ void grpc_winsocket_orphan(grpc_winsocket *winsocket) { SOCKET socket = winsocket->socket; + grpc_iomgr_unregister_object(&winsocket->iomgr_object); if (winsocket->read_info.outstanding || winsocket->write_info.outstanding) { grpc_iocp_socket_orphan(winsocket); } else { grpc_winsocket_destroy(winsocket); } closesocket(socket); - grpc_iomgr_unregister_object(&winsocket->iomgr_object); } void grpc_winsocket_destroy(grpc_winsocket *winsocket) { diff --git a/src/core/iomgr/tcp_client.h b/src/core/iomgr/tcp_client.h index e1fdf235ec..0fa08b52b0 100644 --- a/src/core/iomgr/tcp_client.h +++ b/src/core/iomgr/tcp_client.h @@ -41,10 +41,12 @@ /* Asynchronously connect to an address (specified as (addr, len)), and call cb with arg and the completed connection when done (or call cb with arg and - NULL on failure) */ + NULL on failure). + interested_parties points to a set of pollsets that would be interested + in this connection being established (in order to continue their work) */ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp), void *arg, grpc_pollset_set *interested_parties, const struct sockaddr *addr, int addr_len, gpr_timespec deadline); -#endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_CLIENT_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_CLIENT_H */ diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index 2cd4aa2f45..bbf7711588 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -51,6 +51,7 @@ #include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include <grpc/support/time.h> typedef struct { @@ -222,8 +223,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), fdobj = grpc_fd_create(fd, name); if (err >= 0) { - cb(arg, - grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); + cb(arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); goto done; } diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c index 88141a3dc2..b1a169b519 100644 --- a/src/core/iomgr/tcp_client_windows.c +++ b/src/core/iomgr/tcp_client_windows.c @@ -52,7 +52,7 @@ #include "src/core/iomgr/socket_windows.h" typedef struct { - void(*cb)(void *arg, grpc_endpoint *tcp); + void (*cb)(void *arg, grpc_endpoint *tcp); void *cb_arg; gpr_mu mu; grpc_winsocket *socket; @@ -86,7 +86,7 @@ static void on_connect(void *acp, int from_iocp) { SOCKET sock = ac->socket->socket; grpc_endpoint *ep = NULL; grpc_winsocket_callback_info *info = &ac->socket->write_info; - void(*cb)(void *arg, grpc_endpoint *tcp) = ac->cb; + void (*cb)(void *arg, grpc_endpoint *tcp) = ac->cb; void *cb_arg = ac->cb_arg; int aborted; @@ -99,8 +99,7 @@ static void on_connect(void *acp, int from_iocp) { DWORD transfered_bytes = 0; DWORD flags; BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped, - &transfered_bytes, FALSE, - &flags); + &transfered_bytes, FALSE, &flags); info->outstanding = 0; GPR_ASSERT(transfered_bytes == 0); if (!wsa_success) { @@ -176,9 +175,9 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp), /* Grab the function pointer for ConnectEx for that specific socket. It may change depending on the interface. */ - status = WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, - &guid, sizeof(guid), &ConnectEx, sizeof(ConnectEx), - &ioctl_num_bytes, NULL, NULL); + status = + WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), + &ConnectEx, sizeof(ConnectEx), &ioctl_num_bytes, NULL, NULL); if (status != 0) { message = "Unable to retrieve ConnectEx pointer: %s"; @@ -187,8 +186,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp), grpc_sockaddr_make_wildcard6(0, &local_address); - status = bind(sock, (struct sockaddr *) &local_address, - sizeof(local_address)); + status = bind(sock, (struct sockaddr *)&local_address, sizeof(local_address)); if (status != 0) { message = "Unable to bind socket: %s"; goto failure; @@ -234,4 +232,4 @@ failure: cb(arg, NULL); } -#endif /* GPR_WINSOCK_SOCKET */ +#endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index e3289f6806..9ad089af66 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -266,7 +266,7 @@ typedef struct { grpc_endpoint base; grpc_fd *em_fd; int fd; - int iov_size; /* Number of slices to allocate per read attempt */ + int iov_size; /* Number of slices to allocate per read attempt */ int finished_edge; size_t slice_size; gpr_refcount refcount; @@ -412,8 +412,7 @@ static void grpc_tcp_continue_read(grpc_tcp *tcp) { ++tcp->iov_size; } GPR_ASSERT(slice_state_has_available(&read_state)); - slice_state_transfer_ownership(&read_state, &final_slices, - &final_nslices); + slice_state_transfer_ownership(&read_state, &final_slices, &final_nslices); call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK); slice_state_destroy(&read_state); grpc_tcp_unref(tcp); diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 759a493795..5854031c9b 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -63,6 +63,7 @@ #include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> @@ -107,6 +108,7 @@ struct grpc_tcp_server { /* destroyed port count: how many ports are completely destroyed */ size_t destroyed_ports; + /* is this server shutting down? (boolean) */ int shutdown; /* all listening ports */ @@ -118,7 +120,9 @@ struct grpc_tcp_server { void (*shutdown_complete)(void *); void *shutdown_complete_arg; + /* all pollsets interested in new connections */ grpc_pollset **pollsets; + /* number of pollsets in the pollsets array */ size_t pollset_count; }; @@ -159,6 +163,9 @@ static void destroyed_port(void *server, int success) { static void dont_care_about_shutdown_completion(void *ignored) {} +/* called when all listening endpoints have been shutdown, so no further + events will be received on them - at this point it's safe to destroy + things */ static void deactivated_all_ports(grpc_tcp_server *s) { size_t i; @@ -335,9 +342,8 @@ static void on_read(void *arg, int success) { for (i = 0; i < sp->server->pollset_count; i++) { grpc_pollset_add_fd(sp->server->pollsets[i], fdobj); } - sp->server->cb( - sp->server->cb_arg, - grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); + sp->server->cb(sp->server->cb_arg, + grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); gpr_free(name); gpr_free(addr_str); diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index 9ef369dfd8..d70968de88 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -41,6 +41,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/log_win32.h> +#include <grpc/support/string_util.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c index 3341f558a3..15759c398a 100644 --- a/src/core/iomgr/tcp_windows.c +++ b/src/core/iomgr/tcp_windows.c @@ -41,6 +41,7 @@ #include <grpc/support/log.h> #include <grpc/support/log_win32.h> #include <grpc/support/slice_buffer.h> +#include <grpc/support/string_util.h> #include <grpc/support/useful.h> #include "src/core/iomgr/alarm.h" @@ -153,7 +154,7 @@ static void on_read(void *tcpp, int from_iocp) { status = GRPC_ENDPOINT_CB_ERROR; } else { if (info->bytes_transfered != 0) { - sub = gpr_slice_sub(tcp->read_slice, 0, info->bytes_transfered); + sub = gpr_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered); status = GRPC_ENDPOINT_CB_OK; slice = ⊂ nslices = 1; |