diff options
author | David Garcia Quintas <dgq@google.com> | 2015-12-16 17:36:04 -0800 |
---|---|---|
committer | David Garcia Quintas <dgq@google.com> | 2015-12-16 17:36:04 -0800 |
commit | 7052ac25e60e137514d9a201a86eeb9b29b03d24 (patch) | |
tree | 2ce8f32319129e346a27d3b29a9b8d6b440cdd6c /src/core/iomgr | |
parent | 886b7d19bafbb61e84141e66a040da8c27781c44 (diff) | |
parent | 788767a18f918131268ca88985b3547a8257e973 (diff) |
Merge branch 'master' of github.com:grpc/grpc into grpclb_api
Diffstat (limited to 'src/core/iomgr')
-rw-r--r-- | src/core/iomgr/endpoint_pair_posix.c | 3 | ||||
-rw-r--r-- | src/core/iomgr/fd_posix.c | 6 | ||||
-rw-r--r-- | src/core/iomgr/fd_posix.h | 1 | ||||
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_epoll.c | 22 | ||||
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_poll_posix.c | 19 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 34 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.h | 6 | ||||
-rw-r--r-- | src/core/iomgr/pollset_set.h | 22 | ||||
-rw-r--r-- | src/core/iomgr/pollset_set_posix.c | 51 | ||||
-rw-r--r-- | src/core/iomgr/pollset_set_posix.h | 4 | ||||
-rw-r--r-- | src/core/iomgr/pollset_set_windows.c | 8 | ||||
-rw-r--r-- | src/core/iomgr/tcp_server_posix.c | 18 | ||||
-rw-r--r-- | src/core/iomgr/tcp_server_windows.c | 8 | ||||
-rw-r--r-- | src/core/iomgr/tcp_windows.c | 14 | ||||
-rw-r--r-- | src/core/iomgr/timer.c | 12 | ||||
-rw-r--r-- | src/core/iomgr/timer_internal.h | 2 | ||||
-rw-r--r-- | src/core/iomgr/udp_server.c | 2 | ||||
-rw-r--r-- | src/core/iomgr/wakeup_fd_posix.c | 8 | ||||
-rw-r--r-- | src/core/iomgr/wakeup_fd_posix.h | 2 | ||||
-rw-r--r-- | src/core/iomgr/workqueue_posix.c | 3 |
20 files changed, 126 insertions, 119 deletions
diff --git a/src/core/iomgr/endpoint_pair_posix.c b/src/core/iomgr/endpoint_pair_posix.c index deae9c6875..56f6f146fd 100644 --- a/src/core/iomgr/endpoint_pair_posix.c +++ b/src/core/iomgr/endpoint_pair_posix.c @@ -36,6 +36,7 @@ #ifdef GPR_POSIX_SOCKET #include "src/core/iomgr/endpoint_pair.h" +#include "src/core/iomgr/socket_utils_posix.h" #include <errno.h> #include <fcntl.h> @@ -56,6 +57,8 @@ static void create_sockets(int sv[2]) { GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0); flags = fcntl(sv[1], F_GETFL, 0); GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); + GPR_ASSERT(grpc_set_socket_no_sigpipe_if_possible(sv[0])); + GPR_ASSERT(grpc_set_socket_no_sigpipe_if_possible(sv[1])); } grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 2be0ea235f..00710d83bd 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -43,6 +43,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include <grpc/support/useful.h> #define CLOSURE_NOT_READY ((grpc_closure *)0) @@ -158,7 +159,10 @@ void grpc_fd_global_shutdown(void) { grpc_fd *grpc_fd_create(int fd, const char *name) { grpc_fd *r = alloc_fd(fd); - grpc_iomgr_register_object(&r->iomgr_object, name); + char *name2; + gpr_asprintf(&name2, "%s fd=%d", name, fd); + grpc_iomgr_register_object(&r->iomgr_object, name2); + gpr_free(name2); #ifdef GRPC_FD_REF_COUNT_DEBUG gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, r, name); #endif diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index d628ef3aaf..df4eb64d4c 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -170,6 +170,7 @@ void grpc_fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd); void grpc_fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd); /* Reference counting for fds */ +/*#define GRPC_FD_REF_COUNT_DEBUG*/ #ifdef GRPC_FD_REF_COUNT_DEBUG void grpc_fd_ref(grpc_fd *fd, const char *reason, const char *file, int line); void grpc_fd_unref(grpc_fd *fd, const char *reason, const char *file, int line); diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 1f1bf47e98..6e31efa013 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -123,26 +123,6 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_exec_ctx *exec_ctx, } } -static void multipoll_with_epoll_pollset_del_fd(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, - grpc_fd *fd, - int and_unlock_pollset) { - pollset_hdr *h = pollset->data.ptr; - int err; - - if (and_unlock_pollset) { - gpr_mu_unlock(&pollset->mu); - } - - /* Note that this can race with concurrent poll, but that should be fine since - * at worst it creates a spurious read event on a reused grpc_fd object. */ - err = epoll_ctl(h->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL); - if (err < 0) { - gpr_log(GPR_ERROR, "epoll_ctl del for %d failed: %s", fd->fd, - strerror(errno)); - } -} - /* TODO(klempner): We probably want to turn this down a bit */ #define GRPC_EPOLL_MAX_EVENTS 1000 @@ -235,7 +215,7 @@ static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) { } static const grpc_pollset_vtable multipoll_with_epoll_pollset = { - multipoll_with_epoll_pollset_add_fd, multipoll_with_epoll_pollset_del_fd, + multipoll_with_epoll_pollset_add_fd, multipoll_with_epoll_pollset_maybe_work_and_unlock, multipoll_with_epoll_pollset_finish_shutdown, multipoll_with_epoll_pollset_destroy}; diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index 09f04b64b9..b619b8c3db 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -82,23 +82,6 @@ exit: } } -static void multipoll_with_poll_pollset_del_fd(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, - grpc_fd *fd, - int and_unlock_pollset) { - /* will get removed next poll cycle */ - pollset_hdr *h = pollset->data.ptr; - if (h->del_count == h->del_capacity) { - h->del_capacity = GPR_MAX(h->del_capacity + 8, h->del_count * 3 / 2); - h->dels = gpr_realloc(h->dels, sizeof(grpc_fd *) * h->del_capacity); - } - h->dels[h->del_count++] = fd; - GRPC_FD_REF(fd, "multipoller_del"); - if (and_unlock_pollset) { - gpr_mu_unlock(&pollset->mu); - } -} - static void multipoll_with_poll_pollset_maybe_work_and_unlock( grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline, gpr_timespec now) { @@ -212,7 +195,7 @@ static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) { } static const grpc_pollset_vtable multipoll_with_poll_pollset = { - multipoll_with_poll_pollset_add_fd, multipoll_with_poll_pollset_del_fd, + multipoll_with_poll_pollset_add_fd, multipoll_with_poll_pollset_maybe_work_and_unlock, multipoll_with_poll_pollset_finish_shutdown, multipoll_with_poll_pollset_destroy}; diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 0a5577baea..9195344758 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -232,21 +232,7 @@ void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_mu_lock(&pollset->mu); pollset->vtable->add_fd(exec_ctx, pollset, fd, 1); /* the following (enabled only in debug) will reacquire and then release - our lock - meaning that if the unlocking flag passed to del_fd above is - not respected, the code will deadlock (in a way that we have a chance of - debugging) */ -#ifndef NDEBUG - gpr_mu_lock(&pollset->mu); - gpr_mu_unlock(&pollset->mu); -#endif -} - -void grpc_pollset_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_fd *fd) { - gpr_mu_lock(&pollset->mu); - pollset->vtable->del_fd(exec_ctx, pollset, fd, 1); -/* the following (enabled only in debug) will reacquire and then release - our lock - meaning that if the unlocking flag passed to del_fd above is + our lock - meaning that if the unlocking flag passed to add_fd above is not respected, the code will deadlock (in a way that we have a chance of debugging) */ #ifndef NDEBUG @@ -547,19 +533,6 @@ exit: } } -static void basic_pollset_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_fd *fd, int and_unlock_pollset) { - GPR_ASSERT(fd); - if (fd == pollset->data.ptr) { - GRPC_FD_UNREF(pollset->data.ptr, "basicpoll"); - pollset->data.ptr = NULL; - } - - if (and_unlock_pollset) { - gpr_mu_unlock(&pollset->mu); - } -} - static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, @@ -651,9 +624,8 @@ static void basic_pollset_destroy(grpc_pollset *pollset) { } static const grpc_pollset_vtable basic_pollset = { - basic_pollset_add_fd, basic_pollset_del_fd, - basic_pollset_maybe_work_and_unlock, basic_pollset_destroy, - basic_pollset_destroy}; + basic_pollset_add_fd, basic_pollset_maybe_work_and_unlock, + basic_pollset_destroy, basic_pollset_destroy}; static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) { pollset->vtable = &basic_pollset; diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index e4593728bd..29de4a2026 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -86,8 +86,6 @@ typedef struct grpc_pollset { struct grpc_pollset_vtable { void (*add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, struct grpc_fd *fd, int and_unlock_pollset); - void (*del_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - struct grpc_fd *fd, int and_unlock_pollset); void (*maybe_work_and_unlock)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline, gpr_timespec now); @@ -100,10 +98,6 @@ struct grpc_pollset_vtable { /* Add an fd to a pollset */ void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, struct grpc_fd *fd); -/* Force remove an fd from a pollset (normally they are removed on the next - poll after an fd is orphaned) */ -void grpc_pollset_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - struct grpc_fd *fd); /* Returns the fd to listen on for kicks */ int grpc_kick_read_fd(grpc_pollset *p); diff --git a/src/core/iomgr/pollset_set.h b/src/core/iomgr/pollset_set.h index 0fdcba01a4..09c04438f7 100644 --- a/src/core/iomgr/pollset_set.h +++ b/src/core/iomgr/pollset_set.h @@ -49,13 +49,19 @@ #include "src/core/iomgr/pollset_set_windows.h" #endif -void grpc_pollset_set_init(grpc_pollset_set* pollset_set); -void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set); -void grpc_pollset_set_add_pollset(grpc_exec_ctx* exec_ctx, - grpc_pollset_set* pollset_set, - grpc_pollset* pollset); -void grpc_pollset_set_del_pollset(grpc_exec_ctx* exec_ctx, - grpc_pollset_set* pollset_set, - grpc_pollset* pollset); +void grpc_pollset_set_init(grpc_pollset_set *pollset_set); +void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set); +void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pollset_set, + grpc_pollset *pollset); +void grpc_pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pollset_set, + grpc_pollset *pollset); +void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item); +void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item); #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_H */ diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c index c86ed3d5da..4ec92202e3 100644 --- a/src/core/iomgr/pollset_set_posix.c +++ b/src/core/iomgr/pollset_set_posix.c @@ -52,9 +52,10 @@ void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) { size_t i; gpr_mu_destroy(&pollset_set->mu); for (i = 0; i < pollset_set->fd_count; i++) { - GRPC_FD_UNREF(pollset_set->fds[i], "pollset"); + GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set"); } gpr_free(pollset_set->pollsets); + gpr_free(pollset_set->pollset_sets); gpr_free(pollset_set->fds); } @@ -73,7 +74,7 @@ void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, pollset_set->pollsets[pollset_set->pollset_count++] = pollset; for (i = 0, j = 0; i < pollset_set->fd_count; i++) { if (grpc_fd_is_orphaned(pollset_set->fds[i])) { - GRPC_FD_UNREF(pollset_set->fds[i], "pollset"); + GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set"); } else { grpc_pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]); pollset_set->fds[j++] = pollset_set->fds[i]; @@ -99,6 +100,46 @@ void grpc_pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&pollset_set->mu); } +void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item) { + size_t i, j; + gpr_mu_lock(&bag->mu); + if (bag->pollset_set_count == bag->pollset_set_capacity) { + bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity); + bag->pollset_sets = + gpr_realloc(bag->pollset_sets, + bag->pollset_set_capacity * sizeof(*bag->pollset_sets)); + } + bag->pollset_sets[bag->pollset_set_count++] = item; + for (i = 0, j = 0; i < bag->fd_count; i++) { + if (grpc_fd_is_orphaned(bag->fds[i])) { + GRPC_FD_UNREF(bag->fds[i], "pollset_set"); + } else { + grpc_pollset_set_add_fd(exec_ctx, item, bag->fds[i]); + bag->fds[j++] = bag->fds[i]; + } + } + bag->fd_count = j; + gpr_mu_unlock(&bag->mu); +} + +void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item) { + size_t i; + gpr_mu_lock(&bag->mu); + for (i = 0; i < bag->pollset_set_count; i++) { + if (bag->pollset_sets[i] == item) { + bag->pollset_set_count--; + GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i], + bag->pollset_sets[bag->pollset_set_count]); + break; + } + } + gpr_mu_unlock(&bag->mu); +} + void grpc_pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set, grpc_fd *fd) { size_t i; @@ -113,6 +154,9 @@ void grpc_pollset_set_add_fd(grpc_exec_ctx *exec_ctx, for (i = 0; i < pollset_set->pollset_count; i++) { grpc_pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd); } + for (i = 0; i < pollset_set->pollset_set_count; i++) { + grpc_pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd); + } gpr_mu_unlock(&pollset_set->mu); } @@ -129,6 +173,9 @@ void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx, break; } } + for (i = 0; i < pollset_set->pollset_set_count; i++) { + grpc_pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd); + } gpr_mu_unlock(&pollset_set->mu); } diff --git a/src/core/iomgr/pollset_set_posix.h b/src/core/iomgr/pollset_set_posix.h index 05234fb642..4820a61e4b 100644 --- a/src/core/iomgr/pollset_set_posix.h +++ b/src/core/iomgr/pollset_set_posix.h @@ -44,6 +44,10 @@ typedef struct grpc_pollset_set { size_t pollset_capacity; grpc_pollset **pollsets; + size_t pollset_set_count; + size_t pollset_set_capacity; + struct grpc_pollset_set **pollset_sets; + size_t fd_count; size_t fd_capacity; grpc_fd **fds; diff --git a/src/core/iomgr/pollset_set_windows.c b/src/core/iomgr/pollset_set_windows.c index 53d5d3dcd4..157b46ec32 100644 --- a/src/core/iomgr/pollset_set_windows.c +++ b/src/core/iomgr/pollset_set_windows.c @@ -49,4 +49,12 @@ void grpc_pollset_set_del_pollset(grpc_exec_ctx* exec_ctx, grpc_pollset_set* pollset_set, grpc_pollset* pollset) {} +void grpc_pollset_set_add_pollset_set(grpc_exec_ctx* exec_ctx, + grpc_pollset_set* bag, + grpc_pollset_set* item) {} + +void grpc_pollset_set_del_pollset_set(grpc_exec_ctx* exec_ctx, + grpc_pollset_set* bag, + grpc_pollset_set* item) {} + #endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index a89ee02d34..835675c390 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -411,7 +411,6 @@ static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, int fd, grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, size_t addr_len) { - int allocated_port = -1; grpc_tcp_listener *sp; grpc_tcp_listener *sp2 = NULL; int fd; @@ -464,14 +463,13 @@ grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s, addr_len = sizeof(wild6); fd = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode); sp = add_socket_to_server(s, fd, addr, addr_len); - allocated_port = sp->port; if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { goto done; } /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */ - if (port == 0 && allocated_port > 0) { - grpc_sockaddr_set_port((struct sockaddr *)&wild4, allocated_port); + if (port == 0 && sp != NULL) { + grpc_sockaddr_set_port((struct sockaddr *)&wild4, sp->port); sp2 = sp; } addr = (struct sockaddr *)&wild4; @@ -488,8 +486,8 @@ grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s, addr_len = sizeof(addr4_copy); } sp = add_socket_to_server(s, fd, addr, addr_len); - sp->sibling = sp2; - if (sp2) sp2->is_sibling = 1; + if (sp != NULL) sp->sibling = sp2; + if (sp2 != NULL) sp2->is_sibling = 1; done: gpr_free(allocated_addr); @@ -534,8 +532,12 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, } int grpc_tcp_listener_get_port(grpc_tcp_listener *listener) { - grpc_tcp_listener *sp = listener; - return sp->port; + if (listener != NULL) { + grpc_tcp_listener *sp = listener; + return sp->port; + } else { + return 0; + } } void grpc_tcp_listener_ref(grpc_tcp_listener *listener) { diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index a2425cd4d2..583cab4890 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -486,8 +486,12 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, } int grpc_tcp_listener_get_port(grpc_tcp_listener *listener) { - grpc_tcp_listener *sp = listener; - return sp->port; + if (listener != NULL) { + grpc_tcp_listener *sp = listener; + return sp->port; + } else { + return 0; + } } void grpc_tcp_listener_ref(grpc_tcp_listener *listener) { diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c index 5ff78231bd..cc7f7ff8d2 100644 --- a/src/core/iomgr/tcp_windows.c +++ b/src/core/iomgr/tcp_windows.c @@ -197,7 +197,8 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, tcp->read_slice = gpr_slice_malloc(8192); - buffer.len = GPR_SLICE_LENGTH(tcp->read_slice); + buffer.len = (ULONG)GPR_SLICE_LENGTH( + tcp->read_slice); // we know slice size fits in 32bit. buffer.buf = (char *)GPR_SLICE_START_PTR(tcp->read_slice); TCP_REF(tcp, "read"); @@ -273,6 +274,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, WSABUF local_buffers[16]; WSABUF *allocated = NULL; WSABUF *buffers = local_buffers; + size_t len; if (tcp->shutting_down) { grpc_exec_ctx_enqueue(exec_ctx, cb, 0); @@ -281,19 +283,21 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, tcp->write_cb = cb; tcp->write_slices = slices; - + GPR_ASSERT(tcp->write_slices->count <= UINT_MAX); if (tcp->write_slices->count > GPR_ARRAY_SIZE(local_buffers)) { buffers = (WSABUF *)gpr_malloc(sizeof(WSABUF) * tcp->write_slices->count); allocated = buffers; } for (i = 0; i < tcp->write_slices->count; i++) { - buffers[i].len = GPR_SLICE_LENGTH(tcp->write_slices->slices[i]); + len = GPR_SLICE_LENGTH(tcp->write_slices->slices[i]); + GPR_ASSERT(len <= ULONG_MAX); + buffers[i].len = (ULONG)len; buffers[i].buf = (char *)GPR_SLICE_START_PTR(tcp->write_slices->slices[i]); } /* First, let's try a synchronous, non-blocking write. */ - status = WSASend(socket->socket, buffers, tcp->write_slices->count, + status = WSASend(socket->socket, buffers, (DWORD)tcp->write_slices->count, &bytes_sent, 0, NULL, NULL); info->wsa_error = status == 0 ? 0 : WSAGetLastError(); @@ -322,7 +326,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, /* If we got a WSAEWOULDBLOCK earlier, then we need to re-do the same operation, this time asynchronously. */ memset(&socket->write_info.overlapped, 0, sizeof(OVERLAPPED)); - status = WSASend(socket->socket, buffers, tcp->write_slices->count, + status = WSASend(socket->socket, buffers, (DWORD)tcp->write_slices->count, &bytes_sent, 0, &socket->write_info.overlapped, NULL); if (allocated) gpr_free(allocated); diff --git a/src/core/iomgr/timer.c b/src/core/iomgr/timer.c index 66fafe75ad..bbf9800049 100644 --- a/src/core/iomgr/timer.c +++ b/src/core/iomgr/timer.c @@ -126,8 +126,8 @@ static double ts_to_dbl(gpr_timespec ts) { static gpr_timespec dbl_to_ts(double d) { gpr_timespec ts; - ts.tv_sec = (time_t)d; - ts.tv_nsec = (int)(1e9 * (d - (double)ts.tv_sec)); + ts.tv_sec = (gpr_int64)d; + ts.tv_nsec = (gpr_int32)(1e9 * (d - (double)ts.tv_sec)); ts.clock_type = GPR_TIMESPAN; return ts; } @@ -343,11 +343,3 @@ int grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, exec_ctx, now, next, gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0); } - -gpr_timespec grpc_timer_list_next_timeout(void) { - gpr_timespec out; - gpr_mu_lock(&g_mu); - out = g_shard_queue[0]->min_deadline; - gpr_mu_unlock(&g_mu); - return out; -} diff --git a/src/core/iomgr/timer_internal.h b/src/core/iomgr/timer_internal.h index f180eca36e..f182e73764 100644 --- a/src/core/iomgr/timer_internal.h +++ b/src/core/iomgr/timer_internal.h @@ -54,8 +54,6 @@ int grpc_timer_check(grpc_exec_ctx* exec_ctx, gpr_timespec now, void grpc_timer_list_init(gpr_timespec now); void grpc_timer_list_shutdown(grpc_exec_ctx* exec_ctx); -gpr_timespec grpc_timer_list_next_timeout(void); - /* the following must be implemented by each iomgr implementation */ void grpc_kick_poller(void); diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c index 782fbd9f46..28f1bfae26 100644 --- a/src/core/iomgr/udp_server.c +++ b/src/core/iomgr/udp_server.c @@ -38,6 +38,7 @@ #include <grpc/support/port_platform.h> +#ifdef GRPC_NEED_UDP #ifdef GPR_POSIX_SOCKET #include "src/core/iomgr/udp_server.h" @@ -435,3 +436,4 @@ void grpc_udp_server_write(server_port *sp, const char *buffer, size_t buf_len, } #endif +#endif diff --git a/src/core/iomgr/wakeup_fd_posix.c b/src/core/iomgr/wakeup_fd_posix.c index d09fb78d12..f40be081b0 100644 --- a/src/core/iomgr/wakeup_fd_posix.c +++ b/src/core/iomgr/wakeup_fd_posix.c @@ -40,19 +40,17 @@ #include <stddef.h> static const grpc_wakeup_fd_vtable *wakeup_fd_vtable = NULL; +int grpc_allow_specialized_wakeup_fd = 1; void grpc_wakeup_fd_global_init(void) { - if (grpc_specialized_wakeup_fd_vtable.check_availability()) { + if (grpc_allow_specialized_wakeup_fd && + grpc_specialized_wakeup_fd_vtable.check_availability()) { wakeup_fd_vtable = &grpc_specialized_wakeup_fd_vtable; } else { wakeup_fd_vtable = &grpc_pipe_wakeup_fd_vtable; } } -void grpc_wakeup_fd_global_init_force_fallback(void) { - wakeup_fd_vtable = &grpc_pipe_wakeup_fd_vtable; -} - void grpc_wakeup_fd_global_destroy(void) { wakeup_fd_vtable = NULL; } void grpc_wakeup_fd_init(grpc_wakeup_fd *fd_info) { diff --git a/src/core/iomgr/wakeup_fd_posix.h b/src/core/iomgr/wakeup_fd_posix.h index fe71b5abe9..ffd60d1d4e 100644 --- a/src/core/iomgr/wakeup_fd_posix.h +++ b/src/core/iomgr/wakeup_fd_posix.h @@ -85,6 +85,8 @@ struct grpc_wakeup_fd { int write_fd; }; +extern int grpc_allow_specialized_wakeup_fd; + #define GRPC_WAKEUP_FD_GET_READ_FD(fd_info) ((fd_info)->read_fd) void grpc_wakeup_fd_init(grpc_wakeup_fd* fd_info); diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c index 2e30178131..d2a1c34612 100644 --- a/src/core/iomgr/workqueue_posix.c +++ b/src/core/iomgr/workqueue_posix.c @@ -103,6 +103,9 @@ void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx, void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { gpr_mu_lock(&workqueue->mu); + if (grpc_closure_list_empty(workqueue->closure_list)) { + grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); + } grpc_closure_list_move(&exec_ctx->closure_list, &workqueue->closure_list); gpr_mu_unlock(&workqueue->mu); } |