diff options
author | murgatroid99 <mlumish@google.com> | 2015-10-06 11:16:49 -0700 |
---|---|---|
committer | murgatroid99 <mlumish@google.com> | 2015-10-06 11:16:49 -0700 |
commit | cc545461c0bcc819a40370d0259146b9e0a35b6c (patch) | |
tree | 508c691c09fa41df12f260735501c44eb681ee27 /src/core/iomgr | |
parent | 86ef17ada9a11966ea307c720bfa06e63ce09184 (diff) | |
parent | a4aba6e66876d9f3babda8644949fd3cb4bb9745 (diff) |
Resolved merge conflicts with master
Diffstat (limited to 'src/core/iomgr')
-rw-r--r-- | src/core/iomgr/exec_ctx.h | 2 | ||||
-rw-r--r-- | src/core/iomgr/iocp_windows.c | 13 | ||||
-rw-r--r-- | src/core/iomgr/iomgr.c | 7 | ||||
-rw-r--r-- | src/core/iomgr/iomgr_posix.c | 7 | ||||
-rw-r--r-- | src/core/iomgr/iomgr_windows.c | 4 | ||||
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_epoll.c | 29 | ||||
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_poll_posix.c | 22 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 54 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.h | 1 | ||||
-rw-r--r-- | src/core/iomgr/pollset_windows.c | 76 | ||||
-rw-r--r-- | src/core/iomgr/tcp_client_posix.c | 14 | ||||
-rw-r--r-- | src/core/iomgr/tcp_posix.c | 4 | ||||
-rw-r--r-- | src/core/iomgr/tcp_server_windows.c | 2 | ||||
-rw-r--r-- | src/core/iomgr/tcp_windows.c | 6 | ||||
-rw-r--r-- | src/core/iomgr/udp_server.c | 4 |
15 files changed, 148 insertions, 97 deletions
diff --git a/src/core/iomgr/exec_ctx.h b/src/core/iomgr/exec_ctx.h index aa0610cbea..43df488094 100644 --- a/src/core/iomgr/exec_ctx.h +++ b/src/core/iomgr/exec_ctx.h @@ -61,7 +61,7 @@ struct grpc_exec_ctx { { GRPC_CLOSURE_LIST_INIT } /** Flush any work that has been enqueued onto this grpc_exec_ctx. - * Caller must guarantee that no interfering locks are held. + * Caller must guarantee that no interfering locks are held. * Returns 1 if work was performed, 0 otherwise. */ int grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx); /** Finish any pending work for a grpc_exec_ctx. Must be called before diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c index cf33d74366..cebd863924 100644 --- a/src/core/iomgr/iocp_windows.c +++ b/src/core/iomgr/iocp_windows.c @@ -62,13 +62,13 @@ static DWORD deadline_to_millis_timeout(gpr_timespec deadline, return INFINITE; } if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros( - max_spin_polling_us, - GPR_TIMESPAN))) <= 0) { + max_spin_polling_us, + GPR_TIMESPAN))) <= 0) { return 0; } timeout = gpr_time_sub(deadline, now); return gpr_time_to_millis(gpr_time_add( - timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); + timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); } void grpc_iocp_work(grpc_exec_ctx *exec_ctx, gpr_timespec deadline) { @@ -80,8 +80,9 @@ void grpc_iocp_work(grpc_exec_ctx *exec_ctx, gpr_timespec deadline) { grpc_winsocket *socket; grpc_winsocket_callback_info *info; grpc_closure *closure = NULL; - success = GetQueuedCompletionStatus(g_iocp, &bytes, &completion_key, - &overlapped, deadline_to_millis_timeout(deadline, gpr_now(deadline.clock_type))); + success = GetQueuedCompletionStatus( + g_iocp, &bytes, &completion_key, &overlapped, + deadline_to_millis_timeout(deadline, gpr_now(deadline.clock_type))); if (success == 0 && overlapped == NULL) { return; } @@ -139,7 +140,7 @@ void grpc_iocp_kick(void) { void grpc_iocp_flush(void) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - + do { grpc_iocp_work(&exec_ctx, gpr_inf_past(GPR_CLOCK_MONOTONIC)); } while (grpc_exec_ctx_flush(&exec_ctx)); diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index a10399311f..e61fc32925 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -51,13 +51,6 @@ static gpr_cv g_rcv; static int g_shutdown; static grpc_iomgr_object g_root_object; -void grpc_kick_poller(void) { - /* Empty. The background callback executor polls periodically. The activity - * the kicker is trying to draw the executor's attention to will be picked up - * either by one of the periodic wakeups or by one of the polling application - * threads. */ -} - void grpc_iomgr_init(void) { g_shutdown = 0; gpr_mu_init(&g_mu); diff --git a/src/core/iomgr/iomgr_posix.c b/src/core/iomgr/iomgr_posix.c index f6474b7e6d..fecb7b9760 100644 --- a/src/core/iomgr/iomgr_posix.c +++ b/src/core/iomgr/iomgr_posix.c @@ -45,11 +45,8 @@ void grpc_iomgr_platform_init(void) { grpc_register_tracer("tcp", &grpc_tcp_trace); } -void grpc_iomgr_platform_flush(void) { -} +void grpc_iomgr_platform_flush(void) {} -void grpc_iomgr_platform_shutdown(void) { - grpc_fd_global_shutdown(); -} +void grpc_iomgr_platform_shutdown(void) { grpc_fd_global_shutdown(); } #endif /* GRPC_POSIX_SOCKET */ diff --git a/src/core/iomgr/iomgr_windows.c b/src/core/iomgr/iomgr_windows.c index 93bdc5ec16..14775516bb 100644 --- a/src/core/iomgr/iomgr_windows.c +++ b/src/core/iomgr/iomgr_windows.c @@ -63,9 +63,7 @@ void grpc_iomgr_platform_init(void) { grpc_iocp_init(); } -void grpc_iomgr_platform_flush(void) { - grpc_iocp_flush(); -} +void grpc_iomgr_platform_flush(void) { grpc_iocp_flush(); } void grpc_iomgr_platform_shutdown(void) { grpc_iocp_shutdown(); diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index b22eaa6288..faf0a6362b 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -198,7 +198,7 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( } if (pfds[1].revents) { do { - /* The following epoll_wait never blocks; it has a timeout of 0 */ + /* The following epoll_wait never blocks; it has a timeout of 0 */ ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0); if (ep_rv < 0) { if (errno != EINTR) { @@ -213,11 +213,15 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP); int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI); int write_ev = ep_ev[i].events & EPOLLOUT; - if (read_ev || cancel) { - grpc_fd_become_readable(exec_ctx, fd); - } - if (write_ev || cancel) { - grpc_fd_become_writable(exec_ctx, fd); + if (fd == NULL) { + grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); + } else { + if (read_ev || cancel) { + grpc_fd_become_readable(exec_ctx, fd); + } + if (write_ev || cancel) { + grpc_fd_become_writable(exec_ctx, fd); + } } } } @@ -246,6 +250,8 @@ static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx, size_t nfds) { size_t i; pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr)); + struct epoll_event ev; + int err; pollset->vtable = &multipoll_with_epoll_pollset; pollset->data.ptr = h; @@ -255,6 +261,17 @@ static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx, gpr_log(GPR_ERROR, "epoll_create1 failed: %s", strerror(errno)); abort(); } + + ev.events = (uint32_t)(EPOLLIN | EPOLLET); + ev.data.ptr = NULL; + err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, + GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), &ev); + if (err < 0) { + gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", + GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), + strerror(errno)); + } + for (i = 0; i < nfds; i++) { multipoll_with_epoll_pollset_add_fd(exec_ctx, pollset, fds[i], 0); } diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index 63e0b9edb9..1356ebe7a0 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -114,13 +114,16 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( h = pollset->data.ptr; timeout = grpc_poll_deadline_to_millis_timeout(deadline, now); /* TODO(ctiller): perform just one malloc here if we exceed the inline case */ - pfds = gpr_malloc(sizeof(*pfds) * (h->fd_count + 1)); - watchers = gpr_malloc(sizeof(*watchers) * (h->fd_count + 1)); + pfds = gpr_malloc(sizeof(*pfds) * (h->fd_count + 2)); + watchers = gpr_malloc(sizeof(*watchers) * (h->fd_count + 2)); fd_count = 0; - pfd_count = 1; - pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd); + pfd_count = 2; + pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd); pfds[0].events = POLLIN; - pfds[0].revents = POLLOUT; + pfds[0].revents = 0; + pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd); + pfds[1].events = POLLIN; + pfds[1].revents = 0; for (i = 0; i < h->fd_count; i++) { int remove = grpc_fd_is_orphaned(h->fds[i]); for (j = 0; !remove && j < h->del_count; j++) { @@ -143,7 +146,7 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( h->fd_count = fd_count; gpr_mu_unlock(&pollset->mu); - for (i = 1; i < pfd_count; i++) { + for (i = 2; i < pfd_count; i++) { pfds[i].events = (short)grpc_fd_begin_poll(watchers[i].fd, pollset, POLLIN, POLLOUT, &watchers[i]); } @@ -154,7 +157,7 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( r = grpc_poll_function(pfds, pfd_count, timeout); GRPC_SCHEDULING_END_BLOCKING_REGION; - for (i = 1; i < pfd_count; i++) { + for (i = 2; i < pfd_count; i++) { grpc_fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN, pfds[i].revents & POLLOUT); } @@ -167,9 +170,12 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( /* do nothing */ } else { if (pfds[0].revents & POLLIN) { + grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); + } + if (pfds[1].revents & POLLIN) { grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd); } - for (i = 1; i < pfd_count; i++) { + for (i = 2; i < pfd_count; i++) { if (watchers[i].fd == NULL) { continue; } diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index f9d6aad651..b663780a02 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -57,8 +57,16 @@ GPR_TLS_DECL(g_current_thread_poller); GPR_TLS_DECL(g_current_thread_worker); +/** Default poll() function - a pointer so that it can be overridden by some + * tests */ grpc_poll_function_type grpc_poll_function = poll; +/** The alarm system needs to be able to wakeup 'some poller' sometimes + * (specifically when a new alarm needs to be triggered earlier than the next + * alarm 'epoch'). + * This wakeup_fd gives us something to alert on when such a case occurs. */ +grpc_wakeup_fd grpc_global_wakeup_fd; + static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) { worker->prev->next = worker->next; worker->next->prev = worker->prev; @@ -121,14 +129,18 @@ void grpc_pollset_global_init(void) { gpr_tls_init(&g_current_thread_poller); gpr_tls_init(&g_current_thread_worker); grpc_wakeup_fd_global_init(); + grpc_wakeup_fd_init(&grpc_global_wakeup_fd); } void grpc_pollset_global_shutdown(void) { + grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd); + grpc_wakeup_fd_global_destroy(); gpr_tls_destroy(&g_current_thread_poller); gpr_tls_destroy(&g_current_thread_worker); - grpc_wakeup_fd_global_destroy(); } +void grpc_kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); } + /* main interface */ static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null); @@ -193,6 +205,8 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, goto done; } if (grpc_alarm_check(exec_ctx, now, &deadline)) { + gpr_mu_unlock(&pollset->mu); + locked = 0; goto done; } if (pollset->shutting_down) { @@ -294,7 +308,7 @@ int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, } timeout = gpr_time_sub(deadline, now); return gpr_time_to_millis(gpr_time_add( - timeout, gpr_time_from_nanos(GPR_NS_PER_SEC - 1, GPR_TIMESPAN))); + timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); } /* @@ -338,6 +352,7 @@ static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args, int success) { if (pollset->shutting_down) { /* We don't care about this pollset anymore. */ if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) { + pollset->called_shutdown = 1; finish_shutdown(exec_ctx, pollset); } } else if (grpc_fd_is_orphaned(fd)) { @@ -439,7 +454,7 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, grpc_pollset_worker *worker, gpr_timespec deadline, gpr_timespec now) { - struct pollfd pfd[2]; + struct pollfd pfd[3]; grpc_fd *fd; grpc_fd_watcher fd_watcher; int timeout; @@ -452,17 +467,21 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, fd = pollset->data.ptr = NULL; } timeout = grpc_poll_deadline_to_millis_timeout(deadline, now); - pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd); + pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd); pfd[0].events = POLLIN; pfd[0].revents = 0; - nfds = 1; + pfd[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd); + pfd[1].events = POLLIN; + pfd[1].revents = 0; + nfds = 2; if (fd) { - pfd[1].fd = fd->fd; - pfd[1].revents = 0; + pfd[2].fd = fd->fd; + pfd[2].revents = 0; + GRPC_FD_REF(fd, "basicpoll_begin"); gpr_mu_unlock(&pollset->mu); - pfd[1].events = + pfd[2].events = (short)grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT, &fd_watcher); - if (pfd[1].events != 0) { + if (pfd[2].events != 0) { nfds++; } } else { @@ -479,8 +498,8 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r); if (fd) { - grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[1].revents & POLLIN, - pfd[1].revents & POLLOUT); + grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN, + pfd[2].revents & POLLOUT); } if (r < 0) { @@ -491,17 +510,24 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, /* do nothing */ } else { if (pfd[0].revents & POLLIN) { + grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); + } + if (pfd[1].revents & POLLIN) { grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd); } - if (nfds > 1) { - if (pfd[1].revents & (POLLIN | POLLHUP | POLLERR)) { + if (nfds > 2) { + if (pfd[2].revents & (POLLIN | POLLHUP | POLLERR)) { grpc_fd_become_readable(exec_ctx, fd); } - if (pfd[1].revents & (POLLOUT | POLLHUP | POLLERR)) { + if (pfd[2].revents & (POLLOUT | POLLHUP | POLLERR)) { grpc_fd_become_writable(exec_ctx, fd); } } } + + if (fd) { + GRPC_FD_UNREF(fd, "basicpoll_begin"); + } } static void basic_pollset_destroy(grpc_pollset *pollset) { diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index f996dd1edf..83c5258539 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -129,5 +129,6 @@ int grpc_pollset_has_workers(grpc_pollset *pollset); /* override to allow tests to hook poll() usage */ typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int); extern grpc_poll_function_type grpc_poll_function; +extern grpc_wakeup_fd grpc_global_wakeup_fd; #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */ diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index 798b637635..96abaea0b3 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -51,22 +51,21 @@ void grpc_pollset_global_init() { gpr_mu_init(&grpc_polling_mu); g_active_poller = NULL; g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next = - g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = - &g_global_root_worker; + g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = + &g_global_root_worker; } -void grpc_pollset_global_shutdown() { - gpr_mu_destroy(&grpc_polling_mu); -} +void grpc_pollset_global_shutdown() { gpr_mu_destroy(&grpc_polling_mu); } -static void remove_worker(grpc_pollset_worker *worker, +static void remove_worker(grpc_pollset_worker *worker, grpc_pollset_worker_link_type type) { worker->links[type].prev->links[type].next = worker->links[type].next; worker->links[type].next->links[type].prev = worker->links[type].prev; worker->links[type].next = worker->links[type].prev = worker; } -static int has_workers(grpc_pollset_worker *root, grpc_pollset_worker_link_type type) { +static int has_workers(grpc_pollset_worker *root, + grpc_pollset_worker_link_type type) { return root->links[type].next != root; } @@ -81,24 +80,22 @@ static grpc_pollset_worker *pop_front_worker( } } -static void push_back_worker(grpc_pollset_worker *root, - grpc_pollset_worker_link_type type, +static void push_back_worker(grpc_pollset_worker *root, + grpc_pollset_worker_link_type type, grpc_pollset_worker *worker) { worker->links[type].next = root; worker->links[type].prev = worker->links[type].next->links[type].prev; - worker->links[type].prev->links[type].next = - worker->links[type].next->links[type].prev = - worker; + worker->links[type].prev->links[type].next = + worker->links[type].next->links[type].prev = worker; } -static void push_front_worker(grpc_pollset_worker *root, - grpc_pollset_worker_link_type type, +static void push_front_worker(grpc_pollset_worker *root, + grpc_pollset_worker_link_type type, grpc_pollset_worker *worker) { worker->links[type].prev = root; worker->links[type].next = worker->links[type].prev->links[type].next; - worker->links[type].prev->links[type].next = - worker->links[type].next->links[type].prev = - worker; + worker->links[type].prev->links[type].next = + worker->links[type].next->links[type].prev = worker; } /* There isn't really any such thing as a pollset under Windows, due to the @@ -108,9 +105,9 @@ static void push_front_worker(grpc_pollset_worker *root, void grpc_pollset_init(grpc_pollset *pollset) { memset(pollset, 0, sizeof(*pollset)); - pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next = - pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev = - &pollset->root_worker; + pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next = + pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev = + &pollset->root_worker; } void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -126,18 +123,16 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_mu_unlock(&grpc_polling_mu); } -void grpc_pollset_destroy(grpc_pollset *pollset) { -} +void grpc_pollset_destroy(grpc_pollset *pollset) {} void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec now, gpr_timespec deadline) { int added_worker = 0; - worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next = - worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev = - worker->links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next = - worker->links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = - NULL; + worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next = + worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev = + worker->links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next = + worker->links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = NULL; worker->kicked = 0; worker->pollset = pollset; gpr_cv_init(&worker->cv); @@ -157,9 +152,13 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset->is_iocp_worker = 0; g_active_poller = NULL; /* try to get a worker from this pollsets worker list */ - next_worker = pop_front_worker(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET); - /* try to get a worker from the global list */ - next_worker = pop_front_worker(&g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL); + next_worker = pop_front_worker(&pollset->root_worker, + GRPC_POLLSET_WORKER_LINK_POLLSET); + if (next_worker == NULL) { + /* try to get a worker from the global list */ + next_worker = pop_front_worker(&g_global_root_worker, + GRPC_POLLSET_WORKER_LINK_GLOBAL); + } if (next_worker != NULL) { next_worker->kicked = 1; gpr_cv_signal(&next_worker->cv); @@ -171,8 +170,10 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } goto done; } - push_front_worker(&g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL, worker); - push_front_worker(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET, worker); + push_front_worker(&g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL, + worker); + push_front_worker(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET, + worker); added_worker = 1; while (!worker->kicked) { if (gpr_cv_wait(&worker->cv, &grpc_polling_mu, deadline)) { @@ -198,9 +199,11 @@ done: void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { if (specific_worker != NULL) { if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) { - for (specific_worker = p->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next; + for (specific_worker = + p->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next; specific_worker != &p->root_worker; - specific_worker = specific_worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next) { + specific_worker = + specific_worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next) { specific_worker->kicked = 1; gpr_cv_signal(&specific_worker->cv); } @@ -219,7 +222,8 @@ void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { } } } else { - specific_worker = pop_front_worker(&p->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET); + specific_worker = + pop_front_worker(&p->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET); if (specific_worker != NULL) { grpc_pollset_kick(p, specific_worker); } else if (p->is_iocp_worker) { @@ -230,4 +234,6 @@ void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { } } +void grpc_kick_poller(void) { grpc_iocp_kick(); } + #endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index 346566866a..aca2691c41 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -141,7 +141,8 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, int success) { err = getsockopt(fd->fd, SOL_SOCKET, SO_ERROR, &so_error, &so_error_size); } while (err < 0 && errno == EINTR); if (err < 0) { - gpr_log(GPR_ERROR, "getsockopt(ERROR): %s", strerror(errno)); + gpr_log(GPR_ERROR, "failed to connect to '%s': getsockopt(ERROR): %s", + ac->addr_str, strerror(errno)); goto finish; } else if (so_error != 0) { if (so_error == ENOBUFS) { @@ -166,10 +167,14 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, int success) { } else { switch (so_error) { case ECONNREFUSED: - gpr_log(GPR_ERROR, "socket error: connection refused"); + gpr_log( + GPR_ERROR, + "failed to connect to '%s': socket error: connection refused", + ac->addr_str); break; default: - gpr_log(GPR_ERROR, "socket error: %d", so_error); + gpr_log(GPR_ERROR, "failed to connect to '%s': socket error: %d", + ac->addr_str, so_error); break; } goto finish; @@ -181,7 +186,8 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, int success) { goto finish; } } else { - gpr_log(GPR_ERROR, "on_writable failed during connect"); + gpr_log(GPR_ERROR, "failed to connect to '%s': timeout occurred", + ac->addr_str); goto finish; } diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 54ebad7dbc..4a57037a72 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -292,7 +292,7 @@ static flush_result tcp_flush(grpc_tcp *tcp) { unwind_slice_idx = tcp->outgoing_slice_idx; unwind_byte_idx = tcp->outgoing_byte_idx; for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count && - iov_size != MAX_WRITE_IOVEC; + iov_size != MAX_WRITE_IOVEC; iov_size++) { iov[iov_size].iov_base = GPR_SLICE_START_PTR( @@ -441,7 +441,7 @@ static char *tcp_get_peer(grpc_endpoint *ep) { } static const grpc_endpoint_vtable vtable = { - tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set, + tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set, tcp_shutdown, tcp_destroy, tcp_get_peer}; grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size, diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index 4b11ab0f06..db3319b3c6 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -382,7 +382,7 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock, if (s->nports == s->port_capacity) { /* too many ports, and we need to store their address in a closure */ /* TODO(ctiller): make server_port a linked list */ - abort(); + abort(); } sp = &s->ports[s->nports++]; sp->server = s; diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c index b67683dbfd..9ceffca065 100644 --- a/src/core/iomgr/tcp_windows.c +++ b/src/core/iomgr/tcp_windows.c @@ -382,9 +382,9 @@ static char *win_get_peer(grpc_endpoint *ep) { return gpr_strdup(tcp->peer_string); } -static grpc_endpoint_vtable vtable = { - win_read, win_write, win_add_to_pollset, win_add_to_pollset_set, - win_shutdown, win_destroy, win_get_peer}; +static grpc_endpoint_vtable vtable = {win_read, win_write, win_add_to_pollset, + win_add_to_pollset_set, win_shutdown, + win_destroy, win_get_peer}; grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) { grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c index a8d611c3f2..1304f2067e 100644 --- a/src/core/iomgr/udp_server.c +++ b/src/core/iomgr/udp_server.c @@ -118,7 +118,7 @@ struct grpc_udp_server { /* number of pollsets in the pollsets array */ size_t pollset_count; /* The parent grpc server */ - grpc_server* grpc_server; + grpc_server *grpc_server; }; grpc_udp_server *grpc_udp_server_create(void) { @@ -232,7 +232,7 @@ static int prepare_socket(int fd, const struct sockaddr *addr, rc = setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &get_local_ip, sizeof(get_local_ip)); if (rc == 0 && addr->sa_family == AF_INET6) { -#if !TARGET_OS_IPHONE +#if !defined(__APPLE__) rc = setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &get_local_ip, sizeof(get_local_ip)); #endif |