aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr
diff options
context:
space:
mode:
authorGravatar murgatroid99 <mlumish@google.com>2015-10-06 11:16:49 -0700
committerGravatar murgatroid99 <mlumish@google.com>2015-10-06 11:16:49 -0700
commitcc545461c0bcc819a40370d0259146b9e0a35b6c (patch)
tree508c691c09fa41df12f260735501c44eb681ee27 /src/core/iomgr
parent86ef17ada9a11966ea307c720bfa06e63ce09184 (diff)
parenta4aba6e66876d9f3babda8644949fd3cb4bb9745 (diff)
Resolved merge conflicts with master
Diffstat (limited to 'src/core/iomgr')
-rw-r--r--src/core/iomgr/exec_ctx.h2
-rw-r--r--src/core/iomgr/iocp_windows.c13
-rw-r--r--src/core/iomgr/iomgr.c7
-rw-r--r--src/core/iomgr/iomgr_posix.c7
-rw-r--r--src/core/iomgr/iomgr_windows.c4
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c29
-rw-r--r--src/core/iomgr/pollset_multipoller_with_poll_posix.c22
-rw-r--r--src/core/iomgr/pollset_posix.c54
-rw-r--r--src/core/iomgr/pollset_posix.h1
-rw-r--r--src/core/iomgr/pollset_windows.c76
-rw-r--r--src/core/iomgr/tcp_client_posix.c14
-rw-r--r--src/core/iomgr/tcp_posix.c4
-rw-r--r--src/core/iomgr/tcp_server_windows.c2
-rw-r--r--src/core/iomgr/tcp_windows.c6
-rw-r--r--src/core/iomgr/udp_server.c4
15 files changed, 148 insertions, 97 deletions
diff --git a/src/core/iomgr/exec_ctx.h b/src/core/iomgr/exec_ctx.h
index aa0610cbea..43df488094 100644
--- a/src/core/iomgr/exec_ctx.h
+++ b/src/core/iomgr/exec_ctx.h
@@ -61,7 +61,7 @@ struct grpc_exec_ctx {
{ GRPC_CLOSURE_LIST_INIT }
/** Flush any work that has been enqueued onto this grpc_exec_ctx.
- * Caller must guarantee that no interfering locks are held.
+ * Caller must guarantee that no interfering locks are held.
* Returns 1 if work was performed, 0 otherwise. */
int grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx);
/** Finish any pending work for a grpc_exec_ctx. Must be called before
diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c
index cf33d74366..cebd863924 100644
--- a/src/core/iomgr/iocp_windows.c
+++ b/src/core/iomgr/iocp_windows.c
@@ -62,13 +62,13 @@ static DWORD deadline_to_millis_timeout(gpr_timespec deadline,
return INFINITE;
}
if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
- max_spin_polling_us,
- GPR_TIMESPAN))) <= 0) {
+ max_spin_polling_us,
+ GPR_TIMESPAN))) <= 0) {
return 0;
}
timeout = gpr_time_sub(deadline, now);
return gpr_time_to_millis(gpr_time_add(
- timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
+ timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
}
void grpc_iocp_work(grpc_exec_ctx *exec_ctx, gpr_timespec deadline) {
@@ -80,8 +80,9 @@ void grpc_iocp_work(grpc_exec_ctx *exec_ctx, gpr_timespec deadline) {
grpc_winsocket *socket;
grpc_winsocket_callback_info *info;
grpc_closure *closure = NULL;
- success = GetQueuedCompletionStatus(g_iocp, &bytes, &completion_key,
- &overlapped, deadline_to_millis_timeout(deadline, gpr_now(deadline.clock_type)));
+ success = GetQueuedCompletionStatus(
+ g_iocp, &bytes, &completion_key, &overlapped,
+ deadline_to_millis_timeout(deadline, gpr_now(deadline.clock_type)));
if (success == 0 && overlapped == NULL) {
return;
}
@@ -139,7 +140,7 @@ void grpc_iocp_kick(void) {
void grpc_iocp_flush(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-
+
do {
grpc_iocp_work(&exec_ctx, gpr_inf_past(GPR_CLOCK_MONOTONIC));
} while (grpc_exec_ctx_flush(&exec_ctx));
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c
index a10399311f..e61fc32925 100644
--- a/src/core/iomgr/iomgr.c
+++ b/src/core/iomgr/iomgr.c
@@ -51,13 +51,6 @@ static gpr_cv g_rcv;
static int g_shutdown;
static grpc_iomgr_object g_root_object;
-void grpc_kick_poller(void) {
- /* Empty. The background callback executor polls periodically. The activity
- * the kicker is trying to draw the executor's attention to will be picked up
- * either by one of the periodic wakeups or by one of the polling application
- * threads. */
-}
-
void grpc_iomgr_init(void) {
g_shutdown = 0;
gpr_mu_init(&g_mu);
diff --git a/src/core/iomgr/iomgr_posix.c b/src/core/iomgr/iomgr_posix.c
index f6474b7e6d..fecb7b9760 100644
--- a/src/core/iomgr/iomgr_posix.c
+++ b/src/core/iomgr/iomgr_posix.c
@@ -45,11 +45,8 @@ void grpc_iomgr_platform_init(void) {
grpc_register_tracer("tcp", &grpc_tcp_trace);
}
-void grpc_iomgr_platform_flush(void) {
-}
+void grpc_iomgr_platform_flush(void) {}
-void grpc_iomgr_platform_shutdown(void) {
- grpc_fd_global_shutdown();
-}
+void grpc_iomgr_platform_shutdown(void) { grpc_fd_global_shutdown(); }
#endif /* GRPC_POSIX_SOCKET */
diff --git a/src/core/iomgr/iomgr_windows.c b/src/core/iomgr/iomgr_windows.c
index 93bdc5ec16..14775516bb 100644
--- a/src/core/iomgr/iomgr_windows.c
+++ b/src/core/iomgr/iomgr_windows.c
@@ -63,9 +63,7 @@ void grpc_iomgr_platform_init(void) {
grpc_iocp_init();
}
-void grpc_iomgr_platform_flush(void) {
- grpc_iocp_flush();
-}
+void grpc_iomgr_platform_flush(void) { grpc_iocp_flush(); }
void grpc_iomgr_platform_shutdown(void) {
grpc_iocp_shutdown();
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index b22eaa6288..faf0a6362b 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -198,7 +198,7 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
}
if (pfds[1].revents) {
do {
- /* The following epoll_wait never blocks; it has a timeout of 0 */
+ /* The following epoll_wait never blocks; it has a timeout of 0 */
ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
if (ep_rv < 0) {
if (errno != EINTR) {
@@ -213,11 +213,15 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
int write_ev = ep_ev[i].events & EPOLLOUT;
- if (read_ev || cancel) {
- grpc_fd_become_readable(exec_ctx, fd);
- }
- if (write_ev || cancel) {
- grpc_fd_become_writable(exec_ctx, fd);
+ if (fd == NULL) {
+ grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
+ } else {
+ if (read_ev || cancel) {
+ grpc_fd_become_readable(exec_ctx, fd);
+ }
+ if (write_ev || cancel) {
+ grpc_fd_become_writable(exec_ctx, fd);
+ }
}
}
}
@@ -246,6 +250,8 @@ static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx,
size_t nfds) {
size_t i;
pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr));
+ struct epoll_event ev;
+ int err;
pollset->vtable = &multipoll_with_epoll_pollset;
pollset->data.ptr = h;
@@ -255,6 +261,17 @@ static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_ERROR, "epoll_create1 failed: %s", strerror(errno));
abort();
}
+
+ ev.events = (uint32_t)(EPOLLIN | EPOLLET);
+ ev.data.ptr = NULL;
+ err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD,
+ GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), &ev);
+ if (err < 0) {
+ gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s",
+ GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd),
+ strerror(errno));
+ }
+
for (i = 0; i < nfds; i++) {
multipoll_with_epoll_pollset_add_fd(exec_ctx, pollset, fds[i], 0);
}
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index 63e0b9edb9..1356ebe7a0 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -114,13 +114,16 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
h = pollset->data.ptr;
timeout = grpc_poll_deadline_to_millis_timeout(deadline, now);
/* TODO(ctiller): perform just one malloc here if we exceed the inline case */
- pfds = gpr_malloc(sizeof(*pfds) * (h->fd_count + 1));
- watchers = gpr_malloc(sizeof(*watchers) * (h->fd_count + 1));
+ pfds = gpr_malloc(sizeof(*pfds) * (h->fd_count + 2));
+ watchers = gpr_malloc(sizeof(*watchers) * (h->fd_count + 2));
fd_count = 0;
- pfd_count = 1;
- pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
+ pfd_count = 2;
+ pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
pfds[0].events = POLLIN;
- pfds[0].revents = POLLOUT;
+ pfds[0].revents = 0;
+ pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
+ pfds[1].events = POLLIN;
+ pfds[1].revents = 0;
for (i = 0; i < h->fd_count; i++) {
int remove = grpc_fd_is_orphaned(h->fds[i]);
for (j = 0; !remove && j < h->del_count; j++) {
@@ -143,7 +146,7 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
h->fd_count = fd_count;
gpr_mu_unlock(&pollset->mu);
- for (i = 1; i < pfd_count; i++) {
+ for (i = 2; i < pfd_count; i++) {
pfds[i].events = (short)grpc_fd_begin_poll(watchers[i].fd, pollset, POLLIN,
POLLOUT, &watchers[i]);
}
@@ -154,7 +157,7 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
r = grpc_poll_function(pfds, pfd_count, timeout);
GRPC_SCHEDULING_END_BLOCKING_REGION;
- for (i = 1; i < pfd_count; i++) {
+ for (i = 2; i < pfd_count; i++) {
grpc_fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN,
pfds[i].revents & POLLOUT);
}
@@ -167,9 +170,12 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
/* do nothing */
} else {
if (pfds[0].revents & POLLIN) {
+ grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
+ }
+ if (pfds[1].revents & POLLIN) {
grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
}
- for (i = 1; i < pfd_count; i++) {
+ for (i = 2; i < pfd_count; i++) {
if (watchers[i].fd == NULL) {
continue;
}
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index f9d6aad651..b663780a02 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -57,8 +57,16 @@
GPR_TLS_DECL(g_current_thread_poller);
GPR_TLS_DECL(g_current_thread_worker);
+/** Default poll() function - a pointer so that it can be overridden by some
+ * tests */
grpc_poll_function_type grpc_poll_function = poll;
+/** The alarm system needs to be able to wakeup 'some poller' sometimes
+ * (specifically when a new alarm needs to be triggered earlier than the next
+ * alarm 'epoch').
+ * This wakeup_fd gives us something to alert on when such a case occurs. */
+grpc_wakeup_fd grpc_global_wakeup_fd;
+
static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
worker->prev->next = worker->next;
worker->next->prev = worker->prev;
@@ -121,14 +129,18 @@ void grpc_pollset_global_init(void) {
gpr_tls_init(&g_current_thread_poller);
gpr_tls_init(&g_current_thread_worker);
grpc_wakeup_fd_global_init();
+ grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}
void grpc_pollset_global_shutdown(void) {
+ grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
+ grpc_wakeup_fd_global_destroy();
gpr_tls_destroy(&g_current_thread_poller);
gpr_tls_destroy(&g_current_thread_worker);
- grpc_wakeup_fd_global_destroy();
}
+void grpc_kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); }
+
/* main interface */
static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null);
@@ -193,6 +205,8 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
goto done;
}
if (grpc_alarm_check(exec_ctx, now, &deadline)) {
+ gpr_mu_unlock(&pollset->mu);
+ locked = 0;
goto done;
}
if (pollset->shutting_down) {
@@ -294,7 +308,7 @@ int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline,
}
timeout = gpr_time_sub(deadline, now);
return gpr_time_to_millis(gpr_time_add(
- timeout, gpr_time_from_nanos(GPR_NS_PER_SEC - 1, GPR_TIMESPAN)));
+ timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
}
/*
@@ -338,6 +352,7 @@ static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args, int success) {
if (pollset->shutting_down) {
/* We don't care about this pollset anymore. */
if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) {
+ pollset->called_shutdown = 1;
finish_shutdown(exec_ctx, pollset);
}
} else if (grpc_fd_is_orphaned(fd)) {
@@ -439,7 +454,7 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
grpc_pollset_worker *worker,
gpr_timespec deadline,
gpr_timespec now) {
- struct pollfd pfd[2];
+ struct pollfd pfd[3];
grpc_fd *fd;
grpc_fd_watcher fd_watcher;
int timeout;
@@ -452,17 +467,21 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
fd = pollset->data.ptr = NULL;
}
timeout = grpc_poll_deadline_to_millis_timeout(deadline, now);
- pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
+ pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
pfd[0].events = POLLIN;
pfd[0].revents = 0;
- nfds = 1;
+ pfd[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
+ pfd[1].events = POLLIN;
+ pfd[1].revents = 0;
+ nfds = 2;
if (fd) {
- pfd[1].fd = fd->fd;
- pfd[1].revents = 0;
+ pfd[2].fd = fd->fd;
+ pfd[2].revents = 0;
+ GRPC_FD_REF(fd, "basicpoll_begin");
gpr_mu_unlock(&pollset->mu);
- pfd[1].events =
+ pfd[2].events =
(short)grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT, &fd_watcher);
- if (pfd[1].events != 0) {
+ if (pfd[2].events != 0) {
nfds++;
}
} else {
@@ -479,8 +498,8 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r);
if (fd) {
- grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[1].revents & POLLIN,
- pfd[1].revents & POLLOUT);
+ grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN,
+ pfd[2].revents & POLLOUT);
}
if (r < 0) {
@@ -491,17 +510,24 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
/* do nothing */
} else {
if (pfd[0].revents & POLLIN) {
+ grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
+ }
+ if (pfd[1].revents & POLLIN) {
grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
}
- if (nfds > 1) {
- if (pfd[1].revents & (POLLIN | POLLHUP | POLLERR)) {
+ if (nfds > 2) {
+ if (pfd[2].revents & (POLLIN | POLLHUP | POLLERR)) {
grpc_fd_become_readable(exec_ctx, fd);
}
- if (pfd[1].revents & (POLLOUT | POLLHUP | POLLERR)) {
+ if (pfd[2].revents & (POLLOUT | POLLHUP | POLLERR)) {
grpc_fd_become_writable(exec_ctx, fd);
}
}
}
+
+ if (fd) {
+ GRPC_FD_UNREF(fd, "basicpoll_begin");
+ }
}
static void basic_pollset_destroy(grpc_pollset *pollset) {
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
index f996dd1edf..83c5258539 100644
--- a/src/core/iomgr/pollset_posix.h
+++ b/src/core/iomgr/pollset_posix.h
@@ -129,5 +129,6 @@ int grpc_pollset_has_workers(grpc_pollset *pollset);
/* override to allow tests to hook poll() usage */
typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int);
extern grpc_poll_function_type grpc_poll_function;
+extern grpc_wakeup_fd grpc_global_wakeup_fd;
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */
diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c
index 798b637635..96abaea0b3 100644
--- a/src/core/iomgr/pollset_windows.c
+++ b/src/core/iomgr/pollset_windows.c
@@ -51,22 +51,21 @@ void grpc_pollset_global_init() {
gpr_mu_init(&grpc_polling_mu);
g_active_poller = NULL;
g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next =
- g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev =
- &g_global_root_worker;
+ g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev =
+ &g_global_root_worker;
}
-void grpc_pollset_global_shutdown() {
- gpr_mu_destroy(&grpc_polling_mu);
-}
+void grpc_pollset_global_shutdown() { gpr_mu_destroy(&grpc_polling_mu); }
-static void remove_worker(grpc_pollset_worker *worker,
+static void remove_worker(grpc_pollset_worker *worker,
grpc_pollset_worker_link_type type) {
worker->links[type].prev->links[type].next = worker->links[type].next;
worker->links[type].next->links[type].prev = worker->links[type].prev;
worker->links[type].next = worker->links[type].prev = worker;
}
-static int has_workers(grpc_pollset_worker *root, grpc_pollset_worker_link_type type) {
+static int has_workers(grpc_pollset_worker *root,
+ grpc_pollset_worker_link_type type) {
return root->links[type].next != root;
}
@@ -81,24 +80,22 @@ static grpc_pollset_worker *pop_front_worker(
}
}
-static void push_back_worker(grpc_pollset_worker *root,
- grpc_pollset_worker_link_type type,
+static void push_back_worker(grpc_pollset_worker *root,
+ grpc_pollset_worker_link_type type,
grpc_pollset_worker *worker) {
worker->links[type].next = root;
worker->links[type].prev = worker->links[type].next->links[type].prev;
- worker->links[type].prev->links[type].next =
- worker->links[type].next->links[type].prev =
- worker;
+ worker->links[type].prev->links[type].next =
+ worker->links[type].next->links[type].prev = worker;
}
-static void push_front_worker(grpc_pollset_worker *root,
- grpc_pollset_worker_link_type type,
+static void push_front_worker(grpc_pollset_worker *root,
+ grpc_pollset_worker_link_type type,
grpc_pollset_worker *worker) {
worker->links[type].prev = root;
worker->links[type].next = worker->links[type].prev->links[type].next;
- worker->links[type].prev->links[type].next =
- worker->links[type].next->links[type].prev =
- worker;
+ worker->links[type].prev->links[type].next =
+ worker->links[type].next->links[type].prev = worker;
}
/* There isn't really any such thing as a pollset under Windows, due to the
@@ -108,9 +105,9 @@ static void push_front_worker(grpc_pollset_worker *root,
void grpc_pollset_init(grpc_pollset *pollset) {
memset(pollset, 0, sizeof(*pollset));
- pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next =
- pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev =
- &pollset->root_worker;
+ pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next =
+ pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev =
+ &pollset->root_worker;
}
void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
@@ -126,18 +123,16 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
gpr_mu_unlock(&grpc_polling_mu);
}
-void grpc_pollset_destroy(grpc_pollset *pollset) {
-}
+void grpc_pollset_destroy(grpc_pollset *pollset) {}
void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *worker, gpr_timespec now,
gpr_timespec deadline) {
int added_worker = 0;
- worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next =
- worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev =
- worker->links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next =
- worker->links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev =
- NULL;
+ worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next =
+ worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev =
+ worker->links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next =
+ worker->links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = NULL;
worker->kicked = 0;
worker->pollset = pollset;
gpr_cv_init(&worker->cv);
@@ -157,9 +152,13 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollset->is_iocp_worker = 0;
g_active_poller = NULL;
/* try to get a worker from this pollsets worker list */
- next_worker = pop_front_worker(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET);
- /* try to get a worker from the global list */
- next_worker = pop_front_worker(&g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL);
+ next_worker = pop_front_worker(&pollset->root_worker,
+ GRPC_POLLSET_WORKER_LINK_POLLSET);
+ if (next_worker == NULL) {
+ /* try to get a worker from the global list */
+ next_worker = pop_front_worker(&g_global_root_worker,
+ GRPC_POLLSET_WORKER_LINK_GLOBAL);
+ }
if (next_worker != NULL) {
next_worker->kicked = 1;
gpr_cv_signal(&next_worker->cv);
@@ -171,8 +170,10 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
goto done;
}
- push_front_worker(&g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL, worker);
- push_front_worker(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET, worker);
+ push_front_worker(&g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL,
+ worker);
+ push_front_worker(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET,
+ worker);
added_worker = 1;
while (!worker->kicked) {
if (gpr_cv_wait(&worker->cv, &grpc_polling_mu, deadline)) {
@@ -198,9 +199,11 @@ done:
void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
if (specific_worker != NULL) {
if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
- for (specific_worker = p->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next;
+ for (specific_worker =
+ p->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next;
specific_worker != &p->root_worker;
- specific_worker = specific_worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next) {
+ specific_worker =
+ specific_worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next) {
specific_worker->kicked = 1;
gpr_cv_signal(&specific_worker->cv);
}
@@ -219,7 +222,8 @@ void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
}
}
} else {
- specific_worker = pop_front_worker(&p->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET);
+ specific_worker =
+ pop_front_worker(&p->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET);
if (specific_worker != NULL) {
grpc_pollset_kick(p, specific_worker);
} else if (p->is_iocp_worker) {
@@ -230,4 +234,6 @@ void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
}
}
+void grpc_kick_poller(void) { grpc_iocp_kick(); }
+
#endif /* GPR_WINSOCK_SOCKET */
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c
index 346566866a..aca2691c41 100644
--- a/src/core/iomgr/tcp_client_posix.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -141,7 +141,8 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, int success) {
err = getsockopt(fd->fd, SOL_SOCKET, SO_ERROR, &so_error, &so_error_size);
} while (err < 0 && errno == EINTR);
if (err < 0) {
- gpr_log(GPR_ERROR, "getsockopt(ERROR): %s", strerror(errno));
+ gpr_log(GPR_ERROR, "failed to connect to '%s': getsockopt(ERROR): %s",
+ ac->addr_str, strerror(errno));
goto finish;
} else if (so_error != 0) {
if (so_error == ENOBUFS) {
@@ -166,10 +167,14 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, int success) {
} else {
switch (so_error) {
case ECONNREFUSED:
- gpr_log(GPR_ERROR, "socket error: connection refused");
+ gpr_log(
+ GPR_ERROR,
+ "failed to connect to '%s': socket error: connection refused",
+ ac->addr_str);
break;
default:
- gpr_log(GPR_ERROR, "socket error: %d", so_error);
+ gpr_log(GPR_ERROR, "failed to connect to '%s': socket error: %d",
+ ac->addr_str, so_error);
break;
}
goto finish;
@@ -181,7 +186,8 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, int success) {
goto finish;
}
} else {
- gpr_log(GPR_ERROR, "on_writable failed during connect");
+ gpr_log(GPR_ERROR, "failed to connect to '%s': timeout occurred",
+ ac->addr_str);
goto finish;
}
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index 54ebad7dbc..4a57037a72 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -292,7 +292,7 @@ static flush_result tcp_flush(grpc_tcp *tcp) {
unwind_slice_idx = tcp->outgoing_slice_idx;
unwind_byte_idx = tcp->outgoing_byte_idx;
for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count &&
- iov_size != MAX_WRITE_IOVEC;
+ iov_size != MAX_WRITE_IOVEC;
iov_size++) {
iov[iov_size].iov_base =
GPR_SLICE_START_PTR(
@@ -441,7 +441,7 @@ static char *tcp_get_peer(grpc_endpoint *ep) {
}
static const grpc_endpoint_vtable vtable = {
- tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set,
+ tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set,
tcp_shutdown, tcp_destroy, tcp_get_peer};
grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c
index 4b11ab0f06..db3319b3c6 100644
--- a/src/core/iomgr/tcp_server_windows.c
+++ b/src/core/iomgr/tcp_server_windows.c
@@ -382,7 +382,7 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
if (s->nports == s->port_capacity) {
/* too many ports, and we need to store their address in a closure */
/* TODO(ctiller): make server_port a linked list */
- abort();
+ abort();
}
sp = &s->ports[s->nports++];
sp->server = s;
diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c
index b67683dbfd..9ceffca065 100644
--- a/src/core/iomgr/tcp_windows.c
+++ b/src/core/iomgr/tcp_windows.c
@@ -382,9 +382,9 @@ static char *win_get_peer(grpc_endpoint *ep) {
return gpr_strdup(tcp->peer_string);
}
-static grpc_endpoint_vtable vtable = {
- win_read, win_write, win_add_to_pollset, win_add_to_pollset_set,
- win_shutdown, win_destroy, win_get_peer};
+static grpc_endpoint_vtable vtable = {win_read, win_write, win_add_to_pollset,
+ win_add_to_pollset_set, win_shutdown,
+ win_destroy, win_get_peer};
grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) {
grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c
index a8d611c3f2..1304f2067e 100644
--- a/src/core/iomgr/udp_server.c
+++ b/src/core/iomgr/udp_server.c
@@ -118,7 +118,7 @@ struct grpc_udp_server {
/* number of pollsets in the pollsets array */
size_t pollset_count;
/* The parent grpc server */
- grpc_server* grpc_server;
+ grpc_server *grpc_server;
};
grpc_udp_server *grpc_udp_server_create(void) {
@@ -232,7 +232,7 @@ static int prepare_socket(int fd, const struct sockaddr *addr,
rc = setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &get_local_ip,
sizeof(get_local_ip));
if (rc == 0 && addr->sa_family == AF_INET6) {
-#if !TARGET_OS_IPHONE
+#if !defined(__APPLE__)
rc = setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &get_local_ip,
sizeof(get_local_ip));
#endif