aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-07-05 21:08:33 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-07-05 21:08:33 -0700
commit27166a6f65581a9ee820882e5d7506d6bd07ade6 (patch)
tree0e89c5e09a955b2a074a8b34e0f3fe0a4135f346 /src/core/iomgr
parent4efb6966bdfb62c725c6614b0d85ea374250bb51 (diff)
parentd1dd3a68a2d4af56f1409327c197590dac6968cb (diff)
Merge github.com:grpc/grpc into flow-like-lava-to-a-barnyard
Conflicts: src/core/surface/call.c src/core/transport/chttp2_transport.c src/core/transport/transport.h
Diffstat (limited to 'src/core/iomgr')
-rw-r--r--src/core/iomgr/endpoint_pair_posix.c1
-rw-r--r--src/core/iomgr/fd_posix.c53
-rw-r--r--src/core/iomgr/fd_posix.h8
-rw-r--r--src/core/iomgr/iomgr.c10
-rw-r--r--src/core/iomgr/iomgr.h4
-rw-r--r--src/core/iomgr/pollset.h10
-rw-r--r--src/core/iomgr/pollset_kick_posix.c14
-rw-r--r--src/core/iomgr/pollset_kick_posix.h22
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c50
-rw-r--r--src/core/iomgr/pollset_multipoller_with_poll_posix.c29
-rw-r--r--src/core/iomgr/pollset_posix.c61
-rw-r--r--src/core/iomgr/pollset_posix.h14
-rw-r--r--src/core/iomgr/pollset_set.h2
-rw-r--r--src/core/iomgr/pollset_set_posix.c6
-rw-r--r--src/core/iomgr/pollset_set_windows.c3
-rw-r--r--src/core/iomgr/pollset_set_windows.h4
-rw-r--r--src/core/iomgr/pollset_windows.c23
-rw-r--r--src/core/iomgr/pollset_windows.h8
-rw-r--r--src/core/iomgr/resolve_address_posix.c3
-rw-r--r--src/core/iomgr/resolve_address_windows.c3
-rw-r--r--src/core/iomgr/sockaddr_utils.c1
-rw-r--r--src/core/iomgr/socket_windows.c2
-rw-r--r--src/core/iomgr/tcp_client.h6
-rw-r--r--src/core/iomgr/tcp_client_posix.c4
-rw-r--r--src/core/iomgr/tcp_client_windows.c18
-rw-r--r--src/core/iomgr/tcp_posix.c5
-rw-r--r--src/core/iomgr/tcp_server_posix.c12
-rw-r--r--src/core/iomgr/tcp_server_windows.c1
-rw-r--r--src/core/iomgr/tcp_windows.c3
29 files changed, 239 insertions, 141 deletions
diff --git a/src/core/iomgr/endpoint_pair_posix.c b/src/core/iomgr/endpoint_pair_posix.c
index 9b3b63f1e7..fa2d2555d6 100644
--- a/src/core/iomgr/endpoint_pair_posix.c
+++ b/src/core/iomgr/endpoint_pair_posix.c
@@ -47,6 +47,7 @@
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
static void create_sockets(int sv[2]) {
int flags;
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index 2ac1866a66..632d2a4609 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -74,6 +74,7 @@ static void freelist_fd(grpc_fd *fd) {
gpr_mu_lock(&fd_freelist_mu);
fd->freelist_next = fd_freelist;
fd_freelist = fd;
+ grpc_iomgr_unregister_object(&fd->iomgr_object);
gpr_mu_unlock(&fd_freelist_mu);
}
@@ -100,6 +101,7 @@ static grpc_fd *alloc_fd(int fd) {
&r->inactive_watcher_root;
r->freelist_next = NULL;
r->read_watcher = r->write_watcher = NULL;
+ r->on_done_closure = NULL;
return r;
}
@@ -112,7 +114,8 @@ static void destroy(grpc_fd *fd) {
#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
-static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file, int line) {
+static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
+ int line) {
gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
gpr_atm_no_barrier_load(&fd->refst),
gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
@@ -125,7 +128,8 @@ static void ref_by(grpc_fd *fd, int n) {
}
#ifdef GRPC_FD_REF_COUNT_DEBUG
-static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file, int line) {
+static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
+ int line) {
gpr_atm old;
gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
gpr_atm_no_barrier_load(&fd->refst),
@@ -136,10 +140,6 @@ static void unref_by(grpc_fd *fd, int n) {
#endif
old = gpr_atm_full_fetch_add(&fd->refst, -n);
if (old == n) {
- if (fd->on_done_closure) {
- grpc_iomgr_add_callback(fd->on_done_closure);
- }
- grpc_iomgr_unregister_object(&fd->iomgr_object);
freelist_fd(fd);
} else {
GPR_ASSERT(old > n);
@@ -197,13 +197,24 @@ static void wake_all_watchers_locked(grpc_fd *fd) {
}
}
+static int has_watchers(grpc_fd *fd) {
+ return fd->read_watcher != NULL || fd->write_watcher != NULL || fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
+}
+
void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done,
const char *reason) {
fd->on_done_closure = on_done;
shutdown(fd->fd, SHUT_RDWR);
REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
gpr_mu_lock(&fd->watcher_mu);
- wake_all_watchers_locked(fd);
+ if (!has_watchers(fd)) {
+ close(fd->fd);
+ if (fd->on_done_closure) {
+ grpc_iomgr_add_callback(fd->on_done_closure);
+ }
+ } else {
+ wake_all_watchers_locked(fd);
+ }
gpr_mu_unlock(&fd->watcher_mu);
UNREF_BY(fd, 2, reason); /* drop the reference */
}
@@ -225,7 +236,7 @@ void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
static void process_callback(grpc_iomgr_closure *closure, int success,
- int allow_synchronous_callback) {
+ int allow_synchronous_callback) {
if (allow_synchronous_callback) {
closure->cb(closure->cb_arg, success);
} else {
@@ -265,7 +276,7 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure,
GPR_ASSERT(gpr_atm_no_barrier_load(st) == READY);
gpr_atm_rel_store(st, NOT_READY);
process_callback(closure, !gpr_atm_acq_load(&fd->shutdown),
- allow_synchronous_callback);
+ allow_synchronous_callback);
return;
default: /* WAITING */
/* upcallptr was set to a different closure. This is an error! */
@@ -309,7 +320,7 @@ static void set_ready(grpc_fd *fd, gpr_atm *st,
/* only one set_ready can be active at once (but there may be a racing
notify_on) */
int success;
- grpc_iomgr_closure* closure;
+ grpc_iomgr_closure *closure;
size_t ncb = 0;
gpr_mu_lock(&fd->set_state_mu);
@@ -352,14 +363,22 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
GRPC_FD_REF(fd, "poll");
gpr_mu_lock(&fd->watcher_mu);
+ /* if we are shutdown, then don't add to the watcher set */
+ if (gpr_atm_no_barrier_load(&fd->shutdown)) {
+ watcher->fd = NULL;
+ watcher->pollset = NULL;
+ gpr_mu_unlock(&fd->watcher_mu);
+ GRPC_FD_UNREF(fd, "poll");
+ return 0;
+ }
/* if there is nobody polling for read, but we need to, then start doing so */
- if (!fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) {
+ if (read_mask && !fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) {
fd->read_watcher = watcher;
mask |= read_mask;
}
/* if there is nobody polling for write, but we need to, then start doing so
*/
- if (!fd->write_watcher && gpr_atm_acq_load(&fd->writest) > READY) {
+ if (write_mask && !fd->write_watcher && gpr_atm_acq_load(&fd->writest) > READY) {
fd->write_watcher = watcher;
mask |= write_mask;
}
@@ -381,6 +400,10 @@ void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) {
int kick = 0;
grpc_fd *fd = watcher->fd;
+ if (fd == NULL) {
+ return;
+ }
+
gpr_mu_lock(&fd->watcher_mu);
if (watcher == fd->read_watcher) {
/* remove read watcher, kick if we still need a read */
@@ -402,6 +425,12 @@ void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) {
if (kick) {
maybe_wake_one_watcher_locked(fd);
}
+ if (grpc_fd_is_orphaned(fd) && !has_watchers(fd)) {
+ close(fd->fd);
+ if (fd->on_done_closure != NULL) {
+ grpc_iomgr_add_callback(fd->on_done_closure);
+ }
+ }
gpr_mu_unlock(&fd->watcher_mu);
GRPC_FD_UNREF(fd, "poll");
diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h
index 523d040d17..94d0019fa4 100644
--- a/src/core/iomgr/fd_posix.h
+++ b/src/core/iomgr/fd_posix.h
@@ -62,12 +62,12 @@ struct grpc_fd {
gpr_atm shutdown;
/* The watcher list.
-
+
The following watcher related fields are protected by watcher_mu.
-
+
An fd_watcher is an ephemeral object created when an fd wants to
begin polling, and destroyed after the poll.
-
+
It denotes the fd's interest in whether to read poll or write poll
or both or neither on this fd.
@@ -175,4 +175,4 @@ void grpc_fd_unref(grpc_fd *fd);
void grpc_fd_global_init(void);
void grpc_fd_global_shutdown(void);
-#endif /* GRPC_INTERNAL_CORE_IOMGR_FD_POSIX_H */
+#endif /* GRPC_INTERNAL_CORE_IOMGR_FD_POSIX_H */
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c
index c6c44658df..c47528aa94 100644
--- a/src/core/iomgr/iomgr.c
+++ b/src/core/iomgr/iomgr.c
@@ -40,8 +40,9 @@
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include <grpc/support/thd.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
+#include <grpc/support/thd.h>
static gpr_mu g_mu;
static gpr_cv g_rcv;
@@ -143,8 +144,8 @@ void grpc_iomgr_shutdown(void) {
}
if (g_root_object.next != &g_root_object) {
int timeout = 0;
- gpr_timespec short_deadline = gpr_time_add(gpr_now(),
- gpr_time_from_millis(100));
+ gpr_timespec short_deadline =
+ gpr_time_add(gpr_now(), gpr_time_from_millis(100));
while (gpr_cv_wait(&g_rcv, &g_mu, short_deadline) && g_cbs_head == NULL) {
if (gpr_time_cmp(gpr_now(), shutdown_deadline) > 0) {
timeout = 1;
@@ -193,7 +194,6 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object *obj) {
gpr_mu_unlock(&g_mu);
}
-
void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb,
void *cb_arg) {
closure->cb = cb;
@@ -217,12 +217,10 @@ void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *closure, int success) {
gpr_mu_unlock(&g_mu);
}
-
void grpc_iomgr_add_callback(grpc_iomgr_closure *closure) {
grpc_iomgr_add_delayed_callback(closure, 1 /* GPR_TRUE */);
}
-
int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) {
int n = 0;
gpr_mu *retake_mu = NULL;
diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h
index a10e481e48..6d4a82917b 100644
--- a/src/core/iomgr/iomgr.h
+++ b/src/core/iomgr/iomgr.h
@@ -73,4 +73,8 @@ void grpc_iomgr_shutdown(void);
* Can be called from within a callback or from anywhere else */
void grpc_iomgr_add_callback(grpc_iomgr_closure *closure);
+/** As per grpc_iomgr_add_callback, with the ability to set the success
+ argument. */
+void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success);
+
#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_H */
diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h
index 334e0ebde1..c40188b3c9 100644
--- a/src/core/iomgr/pollset.h
+++ b/src/core/iomgr/pollset.h
@@ -58,16 +58,18 @@ void grpc_pollset_shutdown(grpc_pollset *pollset,
void *shutdown_done_arg);
void grpc_pollset_destroy(grpc_pollset *pollset);
-
/* Do some work on a pollset.
May involve invoking asynchronous callbacks, or actually polling file
descriptors.
Requires GRPC_POLLSET_MU(pollset) locked.
- May unlock GRPC_POLLSET_MU(pollset) during its execution. */
+ May unlock GRPC_POLLSET_MU(pollset) during its execution.
+
+ Returns true if some work has been done, and false if the deadline
+ got attained. */
int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline);
-/* Break a pollset out of polling work
+/* Break one polling thread out of polling work for this pollset.
Requires GRPC_POLLSET_MU(pollset) locked. */
void grpc_pollset_kick(grpc_pollset *pollset);
-#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_H */
+#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_H */
diff --git a/src/core/iomgr/pollset_kick_posix.c b/src/core/iomgr/pollset_kick_posix.c
index 21c9bb6542..51021784f2 100644
--- a/src/core/iomgr/pollset_kick_posix.c
+++ b/src/core/iomgr/pollset_kick_posix.c
@@ -73,7 +73,7 @@ static grpc_kick_fd_info *allocate_wfd(void) {
return info;
}
-static void destroy_wfd(grpc_kick_fd_info* wfd) {
+static void destroy_wfd(grpc_kick_fd_info *wfd) {
grpc_wakeup_fd_destroy(&wfd->wakeup_fd);
gpr_free(wfd);
}
@@ -104,7 +104,8 @@ void grpc_pollset_kick_destroy(grpc_pollset_kick_state *kick_state) {
GPR_ASSERT(kick_state->fd_list.next == &kick_state->fd_list);
}
-grpc_kick_fd_info *grpc_pollset_kick_pre_poll(grpc_pollset_kick_state *kick_state) {
+grpc_kick_fd_info *grpc_pollset_kick_pre_poll(
+ grpc_pollset_kick_state *kick_state) {
grpc_kick_fd_info *fd_info;
gpr_mu_lock(&kick_state->mu);
if (kick_state->kicked) {
@@ -120,11 +121,13 @@ grpc_kick_fd_info *grpc_pollset_kick_pre_poll(grpc_pollset_kick_state *kick_stat
return fd_info;
}
-void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state, grpc_kick_fd_info *fd_info) {
+void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state,
+ grpc_kick_fd_info *fd_info) {
grpc_wakeup_fd_consume_wakeup(&fd_info->wakeup_fd);
}
-void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state, grpc_kick_fd_info *fd_info) {
+void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state,
+ grpc_kick_fd_info *fd_info) {
gpr_mu_lock(&kick_state->mu);
fd_info->next->prev = fd_info->prev;
fd_info->prev->next = fd_info->next;
@@ -162,5 +165,4 @@ void grpc_pollset_kick_global_destroy(void) {
gpr_mu_destroy(&fd_freelist_mu);
}
-
-#endif /* GPR_POSIX_SOCKET */
+#endif /* GPR_POSIX_SOCKET */
diff --git a/src/core/iomgr/pollset_kick_posix.h b/src/core/iomgr/pollset_kick_posix.h
index b35c2cfbe0..77e32a8d51 100644
--- a/src/core/iomgr/pollset_kick_posix.h
+++ b/src/core/iomgr/pollset_kick_posix.h
@@ -37,6 +37,11 @@
#include "src/core/iomgr/wakeup_fd_posix.h"
#include <grpc/support/sync.h>
+/* pollset kicking allows breaking a thread out of polling work for
+ a given pollset.
+ writing a byte to a pipe is used as a posix-ly portable base
+ mechanism, and eventfds are utilized on Linux for better performance. */
+
typedef struct grpc_kick_fd_info {
grpc_wakeup_fd_info wakeup_fd;
/* used for polling list and free list */
@@ -51,7 +56,8 @@ typedef struct grpc_pollset_kick_state {
struct grpc_kick_fd_info fd_list;
} grpc_pollset_kick_state;
-#define GRPC_POLLSET_KICK_GET_FD(kick_fd_info) GRPC_WAKEUP_FD_GET_READ_FD(&(kick_fd_info)->wakeup_fd)
+#define GRPC_POLLSET_KICK_GET_FD(kick_fd_info) \
+ GRPC_WAKEUP_FD_GET_READ_FD(&(kick_fd_info)->wakeup_fd)
/* This is an abstraction around the typical pipe mechanism for waking up a
thread sitting in a poll() style call. */
@@ -66,18 +72,22 @@ void grpc_pollset_kick_destroy(grpc_pollset_kick_state *kick_state);
* applicable. Intended for testing. */
void grpc_pollset_kick_global_init_fallback_fd(void);
-/* Must be called before entering poll(). If return value is -1, this consumed
+/* Must be called before entering poll(). If return value is NULL, this consumed
an existing kick. Otherwise the return value is an FD to add to the poll set.
*/
-grpc_kick_fd_info *grpc_pollset_kick_pre_poll(grpc_pollset_kick_state *kick_state);
+grpc_kick_fd_info *grpc_pollset_kick_pre_poll(
+ grpc_pollset_kick_state *kick_state);
/* Consume an existing kick. Must be called after poll returns that the fd was
readable, and before calling kick_post_poll. */
-void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state, grpc_kick_fd_info *fd_info);
+void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state,
+ grpc_kick_fd_info *fd_info);
/* Must be called after pre_poll, and after consume if applicable */
-void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state, grpc_kick_fd_info *fd_info);
+void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state,
+ grpc_kick_fd_info *fd_info);
+/* Actually kick */
void grpc_pollset_kick_kick(grpc_pollset_kick_state *kick_state);
-#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_KICK_POSIX_H */
+#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_KICK_POSIX_H */
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index 05b522bbf2..1900bbf9e1 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -54,17 +54,25 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
pollset_hdr *h = pollset->data.ptr;
struct epoll_event ev;
int err;
-
- ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
- ev.data.ptr = fd;
- err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
- if (err < 0) {
- /* FDs may be added to a pollset multiple times, so EEXIST is normal. */
- if (errno != EEXIST) {
- gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd,
- strerror(errno));
+ grpc_fd_watcher watcher;
+
+ /* We pretend to be polling whilst adding an fd to keep the fd from being
+ closed during the add. This may result in a spurious wakeup being assigned
+ to this pollset whilst adding, but that should be benign. */
+ GPR_ASSERT(grpc_fd_begin_poll(fd, pollset, 0, 0, &watcher) == 0);
+ if (watcher.fd != NULL) {
+ ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
+ ev.data.ptr = fd;
+ err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
+ if (err < 0) {
+ /* FDs may be added to a pollset multiple times, so EEXIST is normal. */
+ if (errno != EEXIST) {
+ gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd,
+ strerror(errno));
+ }
}
}
+ grpc_fd_end_poll(&watcher, 0, 0);
}
static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset,
@@ -83,7 +91,7 @@ static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset,
/* TODO(klempner): We probably want to turn this down a bit */
#define GRPC_EPOLL_MAX_EVENTS 1000
-static int multipoll_with_epoll_pollset_maybe_work(
+static void multipoll_with_epoll_pollset_maybe_work(
grpc_pollset *pollset, gpr_timespec deadline, gpr_timespec now,
int allow_synchronous_callback) {
struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
@@ -97,14 +105,7 @@ static int multipoll_with_epoll_pollset_maybe_work(
* here.
*/
- if (gpr_time_cmp(deadline, gpr_inf_future) == 0) {
- timeout_ms = -1;
- } else {
- timeout_ms = gpr_time_to_millis(gpr_time_sub(deadline, now));
- if (timeout_ms <= 0) {
- return 1;
- }
- }
+ timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now);
pollset->counter += 1;
gpr_mu_unlock(&pollset->mu);
@@ -140,10 +141,10 @@ static int multipoll_with_epoll_pollset_maybe_work(
gpr_mu_lock(&pollset->mu);
pollset->counter -= 1;
- return 1;
}
-static void multipoll_with_epoll_pollset_finish_shutdown(grpc_pollset *pollset) {}
+static void multipoll_with_epoll_pollset_finish_shutdown(
+ grpc_pollset *pollset) {}
static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) {
pollset_hdr *h = pollset->data.ptr;
@@ -158,9 +159,12 @@ static void epoll_kick(grpc_pollset *pollset) {
}
static const grpc_pollset_vtable multipoll_with_epoll_pollset = {
- multipoll_with_epoll_pollset_add_fd, multipoll_with_epoll_pollset_del_fd,
- multipoll_with_epoll_pollset_maybe_work, epoll_kick,
- multipoll_with_epoll_pollset_finish_shutdown, multipoll_with_epoll_pollset_destroy};
+ multipoll_with_epoll_pollset_add_fd,
+ multipoll_with_epoll_pollset_del_fd,
+ multipoll_with_epoll_pollset_maybe_work,
+ epoll_kick,
+ multipoll_with_epoll_pollset_finish_shutdown,
+ multipoll_with_epoll_pollset_destroy};
static void epoll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
size_t nfds) {
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index afce5f65db..7b717bd159 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -103,7 +103,7 @@ static void end_polling(grpc_pollset *pollset) {
}
}
-static int multipoll_with_poll_pollset_maybe_work(
+static void multipoll_with_poll_pollset_maybe_work(
grpc_pollset *pollset, gpr_timespec deadline, gpr_timespec now,
int allow_synchronous_callback) {
int timeout;
@@ -113,14 +113,7 @@ static int multipoll_with_poll_pollset_maybe_work(
grpc_kick_fd_info *kfd;
h = pollset->data.ptr;
- if (gpr_time_cmp(deadline, gpr_inf_future) == 0) {
- timeout = -1;
- } else {
- timeout = gpr_time_to_millis(gpr_time_sub(deadline, now));
- if (timeout <= 0) {
- return 1;
- }
- }
+ timeout = grpc_poll_deadline_to_millis_timeout(deadline, now);
if (h->pfd_capacity < h->fd_count + 1) {
h->pfd_capacity = GPR_MAX(h->pfd_capacity * 3 / 2, h->fd_count + 1);
gpr_free(h->pfds);
@@ -133,7 +126,7 @@ static int multipoll_with_poll_pollset_maybe_work(
kfd = grpc_pollset_kick_pre_poll(&pollset->kick_state);
if (kfd == NULL) {
/* Already kicked */
- return 1;
+ return;
}
h->pfds[0].fd = GRPC_POLLSET_KICK_GET_FD(kfd);
h->pfds[0].events = POLLIN;
@@ -161,7 +154,7 @@ static int multipoll_with_poll_pollset_maybe_work(
h->del_count = 0;
if (h->pfd_count == 0) {
end_polling(pollset);
- return 0;
+ return;
}
pollset->counter++;
gpr_mu_unlock(&pollset->mu);
@@ -186,6 +179,9 @@ static int multipoll_with_poll_pollset_maybe_work(
grpc_pollset_kick_consume(&pollset->kick_state, kfd);
}
for (i = 1; i < np; i++) {
+ if (h->watchers[i].fd == NULL) {
+ continue;
+ }
if (h->pfds[i].revents & (POLLIN | POLLHUP | POLLERR)) {
grpc_fd_become_readable(h->watchers[i].fd, allow_synchronous_callback);
}
@@ -198,8 +194,6 @@ static int multipoll_with_poll_pollset_maybe_work(
gpr_mu_lock(&pollset->mu);
pollset->counter--;
-
- return 1;
}
static void multipoll_with_poll_pollset_kick(grpc_pollset *p) {
@@ -231,9 +225,12 @@ static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) {
}
static const grpc_pollset_vtable multipoll_with_poll_pollset = {
- multipoll_with_poll_pollset_add_fd, multipoll_with_poll_pollset_del_fd,
- multipoll_with_poll_pollset_maybe_work, multipoll_with_poll_pollset_kick,
- multipoll_with_poll_pollset_finish_shutdown, multipoll_with_poll_pollset_destroy};
+ multipoll_with_poll_pollset_add_fd,
+ multipoll_with_poll_pollset_del_fd,
+ multipoll_with_poll_pollset_maybe_work,
+ multipoll_with_poll_pollset_kick,
+ multipoll_with_poll_pollset_finish_shutdown,
+ multipoll_with_poll_pollset_destroy};
void grpc_poll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
size_t nfds) {
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index 796b1b9ebf..15ed8e75e6 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -99,6 +99,7 @@ void grpc_pollset_init(grpc_pollset *pollset) {
grpc_pollset_kick_init(&pollset->kick_state);
pollset->in_flight_cbs = 0;
pollset->shutting_down = 0;
+ pollset->called_shutdown = 0;
become_basic_pollset(pollset, NULL);
}
@@ -122,7 +123,6 @@ static void finish_shutdown(grpc_pollset *pollset) {
int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
/* pollset->mu already held */
gpr_timespec now = gpr_now();
- int r;
if (gpr_time_cmp(now, deadline) > 0) {
return 0;
}
@@ -136,12 +136,13 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
return 1;
}
gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset);
- r = pollset->vtable->maybe_work(pollset, deadline, now, 1);
+ pollset->vtable->maybe_work(pollset, deadline, now, 1);
gpr_tls_set(&g_current_thread_poller, 0);
if (pollset->shutting_down) {
if (pollset->counter > 0) {
grpc_pollset_kick(pollset);
- } else if (pollset->in_flight_cbs == 0) {
+ } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) {
+ pollset->called_shutdown = 1;
gpr_mu_unlock(&pollset->mu);
finish_shutdown(pollset);
/* Continuing to access pollset here is safe -- it is the caller's
@@ -151,27 +152,29 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
gpr_mu_lock(&pollset->mu);
}
}
- return r;
+ return 1;
}
void grpc_pollset_shutdown(grpc_pollset *pollset,
void (*shutdown_done)(void *arg),
void *shutdown_done_arg) {
- int in_flight_cbs;
- int counter;
+ int call_shutdown = 0;
gpr_mu_lock(&pollset->mu);
GPR_ASSERT(!pollset->shutting_down);
pollset->shutting_down = 1;
- in_flight_cbs = pollset->in_flight_cbs;
- counter = pollset->counter;
+ if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 &&
+ pollset->counter == 0) {
+ pollset->called_shutdown = 1;
+ call_shutdown = 1;
+ }
pollset->shutdown_done_cb = shutdown_done;
pollset->shutdown_done_arg = shutdown_done_arg;
- if (counter > 0) {
+ if (pollset->counter > 0) {
grpc_pollset_kick(pollset);
}
gpr_mu_unlock(&pollset->mu);
- if (in_flight_cbs == 0 && counter == 0) {
+ if (call_shutdown) {
finish_shutdown(pollset);
}
}
@@ -184,6 +187,22 @@ void grpc_pollset_destroy(grpc_pollset *pollset) {
gpr_mu_destroy(&pollset->mu);
}
+int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, gpr_timespec now) {
+ gpr_timespec timeout;
+ static const int max_spin_polling_us = 10;
+ if (gpr_time_cmp(deadline, gpr_inf_future) == 0) {
+ return -1;
+ }
+ if (gpr_time_cmp(
+ deadline,
+ gpr_time_add(now, gpr_time_from_micros(max_spin_polling_us))) <= 0) {
+ return 0;
+ }
+ timeout = gpr_time_sub(deadline, now);
+ return gpr_time_to_millis(
+ gpr_time_add(timeout, gpr_time_from_nanos(GPR_NS_PER_SEC - 1)));
+}
+
/*
* basic_pollset - a vtable that provides polling for zero or one file
* descriptor via poll()
@@ -318,9 +337,9 @@ static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {
}
}
-static int basic_pollset_maybe_work(grpc_pollset *pollset,
- gpr_timespec deadline, gpr_timespec now,
- int allow_synchronous_callback) {
+static void basic_pollset_maybe_work(grpc_pollset *pollset,
+ gpr_timespec deadline, gpr_timespec now,
+ int allow_synchronous_callback) {
struct pollfd pfd[2];
grpc_fd *fd;
grpc_fd_watcher fd_watcher;
@@ -333,25 +352,18 @@ static int basic_pollset_maybe_work(grpc_pollset *pollset,
/* Give do_promote priority so we don't starve it out */
gpr_mu_unlock(&pollset->mu);
gpr_mu_lock(&pollset->mu);
- return 1;
+ return;
}
fd = pollset->data.ptr;
if (fd && grpc_fd_is_orphaned(fd)) {
GRPC_FD_UNREF(fd, "basicpoll");
fd = pollset->data.ptr = NULL;
}
- if (gpr_time_cmp(deadline, gpr_inf_future) == 0) {
- timeout = -1;
- } else {
- timeout = gpr_time_to_millis(gpr_time_sub(deadline, now));
- if (timeout <= 0) {
- return 1;
- }
- }
+ timeout = grpc_poll_deadline_to_millis_timeout(deadline, now);
kfd = grpc_pollset_kick_pre_poll(&pollset->kick_state);
if (kfd == NULL) {
/* Already kicked */
- return 1;
+ return;
}
pfd[0].fd = GRPC_POLLSET_KICK_GET_FD(kfd);
pfd[0].events = POLLIN;
@@ -405,7 +417,6 @@ static int basic_pollset_maybe_work(grpc_pollset *pollset,
gpr_mu_lock(&pollset->mu);
pollset->counter--;
- return 1;
}
static void basic_pollset_destroy(grpc_pollset *pollset) {
@@ -417,7 +428,7 @@ static void basic_pollset_destroy(grpc_pollset *pollset) {
}
static const grpc_pollset_vtable basic_pollset = {
- basic_pollset_add_fd, basic_pollset_del_fd, basic_pollset_maybe_work,
+ basic_pollset_add_fd, basic_pollset_del_fd, basic_pollset_maybe_work,
kick_using_pollset_kick, basic_pollset_destroy, basic_pollset_destroy};
static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) {
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
index 2b897caa4b..53585a2886 100644
--- a/src/core/iomgr/pollset_posix.h
+++ b/src/core/iomgr/pollset_posix.h
@@ -56,6 +56,7 @@ typedef struct grpc_pollset {
int counter;
int in_flight_cbs;
int shutting_down;
+ int called_shutdown;
void (*shutdown_done_cb)(void *arg);
void *shutdown_done_arg;
union {
@@ -67,8 +68,8 @@ typedef struct grpc_pollset {
struct grpc_pollset_vtable {
void (*add_fd)(grpc_pollset *pollset, struct grpc_fd *fd);
void (*del_fd)(grpc_pollset *pollset, struct grpc_fd *fd);
- int (*maybe_work)(grpc_pollset *pollset, gpr_timespec deadline,
- gpr_timespec now, int allow_synchronous_callback);
+ void (*maybe_work)(grpc_pollset *pollset, gpr_timespec deadline,
+ gpr_timespec now, int allow_synchronous_callback);
void (*kick)(grpc_pollset *pollset);
void (*finish_shutdown)(grpc_pollset *pollset);
void (*destroy)(grpc_pollset *pollset);
@@ -93,6 +94,15 @@ int grpc_kick_read_fd(grpc_pollset *p);
/* Call after polling has been kicked to leave the kicked state */
void grpc_kick_drain(grpc_pollset *p);
+/* Convert a timespec to milliseconds:
+ - very small or negative poll times are clamped to zero to do a
+ non-blocking poll (which becomes spin polling)
+ - other small values are rounded up to one millisecond
+ - longer than a millisecond polls are rounded up to the next nearest
+ millisecond to avoid spinning
+ - infinite timeouts are converted to -1 */
+int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, gpr_timespec now);
+
/* turn a pollset into a multipoller: platform specific */
typedef void (*grpc_platform_become_multipoller_type)(grpc_pollset *pollset,
struct grpc_fd **fds,
diff --git a/src/core/iomgr/pollset_set.h b/src/core/iomgr/pollset_set.h
index 335ffb21b9..98e3b552a7 100644
--- a/src/core/iomgr/pollset_set.h
+++ b/src/core/iomgr/pollset_set.h
@@ -39,7 +39,7 @@
/* A grpc_pollset_set is a set of pollsets that are interested in an
action. Adding a pollset to a pollset_set automatically adds any
fd's (etc) that have been registered with the set_set with that pollset.
- Registering fd's automatically iterates all current pollsets. */
+ Registering fd's automatically adds them to all current pollsets. */
#ifdef GPR_POSIX_SOCKET
#include "src/core/iomgr/pollset_set_posix.h"
diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c
index c00d6e6e57..005e938398 100644
--- a/src/core/iomgr/pollset_set_posix.c
+++ b/src/core/iomgr/pollset_set_posix.c
@@ -49,7 +49,11 @@ void grpc_pollset_set_init(grpc_pollset_set *pollset_set) {
}
void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) {
+ size_t i;
gpr_mu_destroy(&pollset_set->mu);
+ for (i = 0; i < pollset_set->fd_count; i++) {
+ GRPC_FD_UNREF(pollset_set->fds[i], "pollset");
+ }
gpr_free(pollset_set->pollsets);
gpr_free(pollset_set->fds);
}
@@ -95,6 +99,7 @@ void grpc_pollset_set_add_fd(grpc_pollset_set *pollset_set, grpc_fd *fd) {
pollset_set->fds = gpr_realloc(
pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
}
+ GRPC_FD_REF(fd, "pollset_set");
pollset_set->fds[pollset_set->fd_count++] = fd;
for (i = 0; i < pollset_set->pollset_count; i++) {
grpc_pollset_add_fd(pollset_set->pollsets[i], fd);
@@ -110,6 +115,7 @@ void grpc_pollset_set_del_fd(grpc_pollset_set *pollset_set, grpc_fd *fd) {
pollset_set->fd_count--;
GPR_SWAP(grpc_fd *, pollset_set->fds[i],
pollset_set->fds[pollset_set->pollset_count]);
+ GRPC_FD_UNREF(fd, "pollset_set");
break;
}
}
diff --git a/src/core/iomgr/pollset_set_windows.c b/src/core/iomgr/pollset_set_windows.c
index d295c64b5e..b9c209cd2c 100644
--- a/src/core/iomgr/pollset_set_windows.c
+++ b/src/core/iomgr/pollset_set_windows.c
@@ -44,4 +44,7 @@ void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) {}
void grpc_pollset_set_add_pollset(grpc_pollset_set *pollset_set,
grpc_pollset *pollset) {}
+void grpc_pollset_set_del_pollset(grpc_pollset_set *pollset_set,
+ grpc_pollset *pollset) {}
+
#endif /* GPR_WINSOCK_SOCKET */
diff --git a/src/core/iomgr/pollset_set_windows.h b/src/core/iomgr/pollset_set_windows.h
index 0f91f1ede7..cada0d2b61 100644
--- a/src/core/iomgr/pollset_set_windows.h
+++ b/src/core/iomgr/pollset_set_windows.h
@@ -34,8 +34,6 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_WINDOWS_H
#define GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_WINDOWS_H
-typedef struct grpc_pollset_set {
- void *unused;
-} grpc_pollset_set;
+typedef struct grpc_pollset_set { void *unused; } grpc_pollset_set;
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */
diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c
index 88db774cc4..8d6bc79c96 100644
--- a/src/core/iomgr/pollset_windows.c
+++ b/src/core/iomgr/pollset_windows.c
@@ -39,6 +39,7 @@
#include "src/core/iomgr/alarm_internal.h"
#include "src/core/iomgr/iomgr_internal.h"
+#include "src/core/iomgr/pollset.h"
#include "src/core/iomgr/pollset_windows.h"
/* There isn't really any such thing as a pollset under Windows, due to the
@@ -47,17 +48,24 @@
won't actually do any polling, and return as quickly as possible. */
void grpc_pollset_init(grpc_pollset *pollset) {
+ memset(pollset, 0, sizeof(*pollset));
gpr_mu_init(&pollset->mu);
+ gpr_cv_init(&pollset->cv);
}
void grpc_pollset_shutdown(grpc_pollset *pollset,
void (*shutdown_done)(void *arg),
void *shutdown_done_arg) {
+ gpr_mu_lock(&pollset->mu);
+ pollset->shutting_down = 1;
+ gpr_cv_broadcast(&pollset->cv);
+ gpr_mu_unlock(&pollset->mu);
shutdown_done(shutdown_done_arg);
}
void grpc_pollset_destroy(grpc_pollset *pollset) {
gpr_mu_destroy(&pollset->mu);
+ gpr_cv_destroy(&pollset->cv);
}
int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
@@ -66,15 +74,20 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
if (gpr_time_cmp(now, deadline) > 0) {
return 0 /* GPR_FALSE */;
}
- if (grpc_maybe_call_delayed_callbacks(NULL, 1 /* GPR_TRUE */)) {
+ if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1 /* GPR_TRUE */)) {
return 1 /* GPR_TRUE */;
}
- if (grpc_alarm_check(NULL, now, &deadline)) {
+ if (grpc_alarm_check(&pollset->mu, now, &deadline)) {
return 1 /* GPR_TRUE */;
}
- return 0 /* GPR_FALSE */;
+ if (!pollset->shutting_down) {
+ gpr_cv_wait(&pollset->cv, &pollset->mu, deadline);
+ }
+ return 1 /* GPR_TRUE */;
}
-void grpc_pollset_kick(grpc_pollset *p) { }
+void grpc_pollset_kick(grpc_pollset *p) {
+ gpr_cv_signal(&p->cv);
+}
-#endif /* GPR_WINSOCK_SOCKET */
+#endif /* GPR_WINSOCK_SOCKET */
diff --git a/src/core/iomgr/pollset_windows.h b/src/core/iomgr/pollset_windows.h
index acd82d0a0a..57a2907926 100644
--- a/src/core/iomgr/pollset_windows.h
+++ b/src/core/iomgr/pollset_windows.h
@@ -37,18 +37,18 @@
#include <windows.h>
#include <grpc/support/sync.h>
-#include "src/core/iomgr/pollset_kick.h"
#include "src/core/iomgr/socket_windows.h"
/* There isn't really any such thing as a pollset under Windows, due to the
nature of the IO completion ports. A Windows "pollset" is merely a mutex
- and a condition variable, as this is the minimal set of features we need
- implemented for the rest of grpc. But we won't use them directly. */
+ and a condition variable, used to synchronize with the IOCP. */
typedef struct grpc_pollset {
gpr_mu mu;
+ gpr_cv cv;
+ int shutting_down;
} grpc_pollset;
#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu)
-#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */
+#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */
diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c
index fcf48fe0d7..dbf884c769 100644
--- a/src/core/iomgr/resolve_address_posix.c
+++ b/src/core/iomgr/resolve_address_posix.c
@@ -47,6 +47,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
@@ -154,9 +155,9 @@ static void do_request(void *rp) {
grpc_resolve_cb cb = r->cb;
gpr_free(r->name);
gpr_free(r->default_port);
+ cb(arg, resolved);
grpc_iomgr_unregister_object(&r->iomgr_object);
gpr_free(r);
- cb(arg, resolved);
}
void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
diff --git a/src/core/iomgr/resolve_address_windows.c b/src/core/iomgr/resolve_address_windows.c
index 7d0d2f9e7a..fb5fd0d4f6 100644
--- a/src/core/iomgr/resolve_address_windows.c
+++ b/src/core/iomgr/resolve_address_windows.c
@@ -46,6 +46,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
@@ -134,9 +135,9 @@ static void do_request(void *rp) {
grpc_resolve_cb cb = r->cb;
gpr_free(r->name);
gpr_free(r->default_port);
+ grpc_iomgr_unregister_object(&r->iomgr_object);
gpr_free(r);
cb(arg, resolved);
- grpc_iomgr_unregister_object(&r->iomgr_object);
}
void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
diff --git a/src/core/iomgr/sockaddr_utils.c b/src/core/iomgr/sockaddr_utils.c
index 3d202a5cc8..e91b94f8c8 100644
--- a/src/core/iomgr/sockaddr_utils.c
+++ b/src/core/iomgr/sockaddr_utils.c
@@ -40,6 +40,7 @@
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
+#include <grpc/support/string_util.h>
static const gpr_uint8 kV4MappedPrefix[] = {0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0xff, 0xff};
diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c
index e4ba0a2b66..fbf3fdc949 100644
--- a/src/core/iomgr/socket_windows.c
+++ b/src/core/iomgr/socket_windows.c
@@ -85,13 +85,13 @@ int grpc_winsocket_shutdown(grpc_winsocket *socket) {
both memory and sockets. */
void grpc_winsocket_orphan(grpc_winsocket *winsocket) {
SOCKET socket = winsocket->socket;
+ grpc_iomgr_unregister_object(&winsocket->iomgr_object);
if (winsocket->read_info.outstanding || winsocket->write_info.outstanding) {
grpc_iocp_socket_orphan(winsocket);
} else {
grpc_winsocket_destroy(winsocket);
}
closesocket(socket);
- grpc_iomgr_unregister_object(&winsocket->iomgr_object);
}
void grpc_winsocket_destroy(grpc_winsocket *winsocket) {
diff --git a/src/core/iomgr/tcp_client.h b/src/core/iomgr/tcp_client.h
index e1fdf235ec..0fa08b52b0 100644
--- a/src/core/iomgr/tcp_client.h
+++ b/src/core/iomgr/tcp_client.h
@@ -41,10 +41,12 @@
/* Asynchronously connect to an address (specified as (addr, len)), and call
cb with arg and the completed connection when done (or call cb with arg and
- NULL on failure) */
+ NULL on failure).
+ interested_parties points to a set of pollsets that would be interested
+ in this connection being established (in order to continue their work) */
void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp),
void *arg, grpc_pollset_set *interested_parties,
const struct sockaddr *addr, int addr_len,
gpr_timespec deadline);
-#endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_CLIENT_H */
+#endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_CLIENT_H */
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c
index 2cd4aa2f45..bbf7711588 100644
--- a/src/core/iomgr/tcp_client_posix.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -51,6 +51,7 @@
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
typedef struct {
@@ -222,8 +223,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
fdobj = grpc_fd_create(fd, name);
if (err >= 0) {
- cb(arg,
- grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
+ cb(arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
goto done;
}
diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c
index 88141a3dc2..b1a169b519 100644
--- a/src/core/iomgr/tcp_client_windows.c
+++ b/src/core/iomgr/tcp_client_windows.c
@@ -52,7 +52,7 @@
#include "src/core/iomgr/socket_windows.h"
typedef struct {
- void(*cb)(void *arg, grpc_endpoint *tcp);
+ void (*cb)(void *arg, grpc_endpoint *tcp);
void *cb_arg;
gpr_mu mu;
grpc_winsocket *socket;
@@ -86,7 +86,7 @@ static void on_connect(void *acp, int from_iocp) {
SOCKET sock = ac->socket->socket;
grpc_endpoint *ep = NULL;
grpc_winsocket_callback_info *info = &ac->socket->write_info;
- void(*cb)(void *arg, grpc_endpoint *tcp) = ac->cb;
+ void (*cb)(void *arg, grpc_endpoint *tcp) = ac->cb;
void *cb_arg = ac->cb_arg;
int aborted;
@@ -99,8 +99,7 @@ static void on_connect(void *acp, int from_iocp) {
DWORD transfered_bytes = 0;
DWORD flags;
BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
- &transfered_bytes, FALSE,
- &flags);
+ &transfered_bytes, FALSE, &flags);
info->outstanding = 0;
GPR_ASSERT(transfered_bytes == 0);
if (!wsa_success) {
@@ -176,9 +175,9 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp),
/* Grab the function pointer for ConnectEx for that specific socket.
It may change depending on the interface. */
- status = WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER,
- &guid, sizeof(guid), &ConnectEx, sizeof(ConnectEx),
- &ioctl_num_bytes, NULL, NULL);
+ status =
+ WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
+ &ConnectEx, sizeof(ConnectEx), &ioctl_num_bytes, NULL, NULL);
if (status != 0) {
message = "Unable to retrieve ConnectEx pointer: %s";
@@ -187,8 +186,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp),
grpc_sockaddr_make_wildcard6(0, &local_address);
- status = bind(sock, (struct sockaddr *) &local_address,
- sizeof(local_address));
+ status = bind(sock, (struct sockaddr *)&local_address, sizeof(local_address));
if (status != 0) {
message = "Unable to bind socket: %s";
goto failure;
@@ -234,4 +232,4 @@ failure:
cb(arg, NULL);
}
-#endif /* GPR_WINSOCK_SOCKET */
+#endif /* GPR_WINSOCK_SOCKET */
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index e3289f6806..9ad089af66 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -266,7 +266,7 @@ typedef struct {
grpc_endpoint base;
grpc_fd *em_fd;
int fd;
- int iov_size; /* Number of slices to allocate per read attempt */
+ int iov_size; /* Number of slices to allocate per read attempt */
int finished_edge;
size_t slice_size;
gpr_refcount refcount;
@@ -412,8 +412,7 @@ static void grpc_tcp_continue_read(grpc_tcp *tcp) {
++tcp->iov_size;
}
GPR_ASSERT(slice_state_has_available(&read_state));
- slice_state_transfer_ownership(&read_state, &final_slices,
- &final_nslices);
+ slice_state_transfer_ownership(&read_state, &final_slices, &final_nslices);
call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK);
slice_state_destroy(&read_state);
grpc_tcp_unref(tcp);
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c
index 759a493795..5854031c9b 100644
--- a/src/core/iomgr/tcp_server_posix.c
+++ b/src/core/iomgr/tcp_server_posix.c
@@ -63,6 +63,7 @@
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
@@ -107,6 +108,7 @@ struct grpc_tcp_server {
/* destroyed port count: how many ports are completely destroyed */
size_t destroyed_ports;
+ /* is this server shutting down? (boolean) */
int shutdown;
/* all listening ports */
@@ -118,7 +120,9 @@ struct grpc_tcp_server {
void (*shutdown_complete)(void *);
void *shutdown_complete_arg;
+ /* all pollsets interested in new connections */
grpc_pollset **pollsets;
+ /* number of pollsets in the pollsets array */
size_t pollset_count;
};
@@ -159,6 +163,9 @@ static void destroyed_port(void *server, int success) {
static void dont_care_about_shutdown_completion(void *ignored) {}
+/* called when all listening endpoints have been shutdown, so no further
+ events will be received on them - at this point it's safe to destroy
+ things */
static void deactivated_all_ports(grpc_tcp_server *s) {
size_t i;
@@ -335,9 +342,8 @@ static void on_read(void *arg, int success) {
for (i = 0; i < sp->server->pollset_count; i++) {
grpc_pollset_add_fd(sp->server->pollsets[i], fdobj);
}
- sp->server->cb(
- sp->server->cb_arg,
- grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
+ sp->server->cb(sp->server->cb_arg,
+ grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
gpr_free(name);
gpr_free(addr_str);
diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c
index 9ef369dfd8..d70968de88 100644
--- a/src/core/iomgr/tcp_server_windows.c
+++ b/src/core/iomgr/tcp_server_windows.c
@@ -41,6 +41,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/log_win32.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c
index 3341f558a3..15759c398a 100644
--- a/src/core/iomgr/tcp_windows.c
+++ b/src/core/iomgr/tcp_windows.c
@@ -41,6 +41,7 @@
#include <grpc/support/log.h>
#include <grpc/support/log_win32.h>
#include <grpc/support/slice_buffer.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
#include "src/core/iomgr/alarm.h"
@@ -153,7 +154,7 @@ static void on_read(void *tcpp, int from_iocp) {
status = GRPC_ENDPOINT_CB_ERROR;
} else {
if (info->bytes_transfered != 0) {
- sub = gpr_slice_sub(tcp->read_slice, 0, info->bytes_transfered);
+ sub = gpr_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered);
status = GRPC_ENDPOINT_CB_OK;
slice = &sub;
nslices = 1;