diff options
Diffstat (limited to 'src/core/iomgr')
-rw-r--r-- | src/core/iomgr/alarm.c | 4 | ||||
-rw-r--r-- | src/core/iomgr/endpoint.c | 4 | ||||
-rw-r--r-- | src/core/iomgr/endpoint.h | 3 | ||||
-rw-r--r-- | src/core/iomgr/endpoint_pair_windows.c | 6 | ||||
-rw-r--r-- | src/core/iomgr/iomgr.c | 1 | ||||
-rw-r--r-- | src/core/iomgr/pollset_set_posix.c | 12 | ||||
-rw-r--r-- | src/core/iomgr/tcp_client_posix.c | 33 | ||||
-rw-r--r-- | src/core/iomgr/tcp_posix.c | 11 | ||||
-rw-r--r-- | src/core/iomgr/tcp_windows.c | 6 |
9 files changed, 54 insertions, 26 deletions
diff --git a/src/core/iomgr/alarm.c b/src/core/iomgr/alarm.c index 931f746f75..68d33b9cf6 100644 --- a/src/core/iomgr/alarm.c +++ b/src/core/iomgr/alarm.c @@ -361,7 +361,9 @@ static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now, int grpc_alarm_check(gpr_mu *drop_mu, gpr_timespec now, gpr_timespec *next) { GPR_ASSERT(now.clock_type == g_clock_type); - return run_some_expired_alarms(drop_mu, now, next, 1); + return run_some_expired_alarms( + drop_mu, now, next, + gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0); } gpr_timespec grpc_alarm_list_next_timeout(void) { diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c index f2b44aad03..744fe7656c 100644 --- a/src/core/iomgr/endpoint.c +++ b/src/core/iomgr/endpoint.c @@ -50,6 +50,10 @@ void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { ep->vtable->add_to_pollset(ep, pollset); } +void grpc_endpoint_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pollset_set) { + ep->vtable->add_to_pollset_set(ep, pollset_set); +} + void grpc_endpoint_shutdown(grpc_endpoint *ep) { ep->vtable->shutdown(ep); } void grpc_endpoint_destroy(grpc_endpoint *ep) { ep->vtable->destroy(ep); } diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h index ee0becf3d6..a2216925f9 100644 --- a/src/core/iomgr/endpoint.h +++ b/src/core/iomgr/endpoint.h @@ -35,6 +35,7 @@ #define GRPC_INTERNAL_CORE_IOMGR_ENDPOINT_H #include "src/core/iomgr/pollset.h" +#include "src/core/iomgr/pollset_set.h" #include <grpc/support/slice.h> #include <grpc/support/time.h> @@ -70,6 +71,7 @@ struct grpc_endpoint_vtable { size_t nslices, grpc_endpoint_write_cb cb, void *user_data); void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset); + void (*add_to_pollset_set)(grpc_endpoint *ep, grpc_pollset_set *pollset); void (*shutdown)(grpc_endpoint *ep); void (*destroy)(grpc_endpoint *ep); char *(*get_peer)(grpc_endpoint *ep); @@ -101,6 +103,7 @@ void grpc_endpoint_destroy(grpc_endpoint *ep); /* Add an endpoint to a pollset, so that when the pollset is polled, events from this endpoint are considered */ void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset); +void grpc_endpoint_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pollset_set); struct grpc_endpoint { const grpc_endpoint_vtable *vtable; diff --git a/src/core/iomgr/endpoint_pair_windows.c b/src/core/iomgr/endpoint_pair_windows.c index 7d81470a78..e8295df8b3 100644 --- a/src/core/iomgr/endpoint_pair_windows.c +++ b/src/core/iomgr/endpoint_pair_windows.c @@ -81,8 +81,10 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, size_t read SOCKET sv[2]; grpc_endpoint_pair p; create_sockets(sv); - p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client"), "endpoint:server"); - p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server"), "endpoint:client"); + p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client"), + "endpoint:server"); + p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server"), + "endpoint:client"); return p; } diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index aa4bc6e20d..fdc9adf4af 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -147,7 +147,6 @@ void grpc_iomgr_shutdown(void) { continue; } if (grpc_alarm_check(&g_mu, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL)) { - gpr_log(GPR_DEBUG, "got late alarm"); continue; } if (g_root_object.next != &g_root_object) { diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c index 5ff7df1dcd..2076ac70ef 100644 --- a/src/core/iomgr/pollset_set_posix.c +++ b/src/core/iomgr/pollset_set_posix.c @@ -60,7 +60,7 @@ void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) { void grpc_pollset_set_add_pollset(grpc_pollset_set *pollset_set, grpc_pollset *pollset) { - size_t i; + size_t i, j; gpr_mu_lock(&pollset_set->mu); if (pollset_set->pollset_count == pollset_set->pollset_capacity) { pollset_set->pollset_capacity = @@ -70,9 +70,15 @@ void grpc_pollset_set_add_pollset(grpc_pollset_set *pollset_set, sizeof(*pollset_set->pollsets)); } pollset_set->pollsets[pollset_set->pollset_count++] = pollset; - for (i = 0; i < pollset_set->fd_count; i++) { - grpc_pollset_add_fd(pollset, pollset_set->fds[i]); + for (i = 0, j = 0; i < pollset_set->fd_count; i++) { + if (grpc_fd_is_orphaned(pollset_set->fds[i])) { + GRPC_FD_UNREF(pollset_set->fds[i], "pollset"); + } else { + grpc_pollset_add_fd(pollset, pollset_set->fds[i]); + pollset_set->fds[j++] = pollset_set->fds[i]; + } } + pollset_set->fd_count = j; gpr_mu_unlock(&pollset_set->mu); } diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index 392eda999e..9572ce5980 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -89,11 +89,11 @@ error: return 0; } -static void on_alarm(void *acp, int success) { +static void tc_on_alarm(void *acp, int success) { int done; async_connect *ac = acp; gpr_mu_lock(&ac->mu); - if (ac->fd != NULL && success) { + if (ac->fd != NULL) { grpc_fd_shutdown(ac->fd); } done = (--ac->refs == 0); @@ -110,11 +110,17 @@ static void on_writable(void *acp, int success) { int so_error = 0; socklen_t so_error_size; int err; - int fd = ac->fd->fd; int done; grpc_endpoint *ep = NULL; void (*cb)(void *arg, grpc_endpoint *tcp) = ac->cb; void *cb_arg = ac->cb_arg; + grpc_fd *fd; + + gpr_mu_lock(&ac->mu); + GPR_ASSERT(ac->fd); + fd = ac->fd; + ac->fd = NULL; + gpr_mu_unlock(&ac->mu); grpc_alarm_cancel(&ac->alarm); @@ -122,7 +128,7 @@ static void on_writable(void *acp, int success) { if (success) { do { so_error_size = sizeof(so_error); - err = getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &so_error_size); + err = getsockopt(fd->fd, SOL_SOCKET, SO_ERROR, &so_error, &so_error_size); } while (err < 0 && errno == EINTR); if (err < 0) { gpr_log(GPR_ERROR, "getsockopt(ERROR): %s", strerror(errno)); @@ -145,7 +151,7 @@ static void on_writable(void *acp, int success) { don't do that! */ gpr_log(GPR_ERROR, "kernel out of buffers"); gpr_mu_unlock(&ac->mu); - grpc_fd_notify_on_write(ac->fd, &ac->write_closure); + grpc_fd_notify_on_write(fd, &ac->write_closure); return; } else { switch (so_error) { @@ -159,9 +165,9 @@ static void on_writable(void *acp, int success) { goto finish; } } else { - grpc_pollset_set_del_fd(ac->interested_parties, ac->fd); - ep = grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, - ac->addr_str); + grpc_pollset_set_del_fd(ac->interested_parties, fd); + ep = grpc_tcp_create(fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, ac->addr_str); + fd = NULL; goto finish; } } else { @@ -172,11 +178,10 @@ static void on_writable(void *acp, int success) { abort(); finish: - if (ep == NULL) { - grpc_pollset_set_del_fd(ac->interested_parties, ac->fd); - grpc_fd_orphan(ac->fd, NULL, "tcp_client_orphan"); - } else { - ac->fd = NULL; + if (fd != NULL) { + grpc_pollset_set_del_fd(ac->interested_parties, fd); + grpc_fd_orphan(fd, NULL, "tcp_client_orphan"); + fd = NULL; } done = (--ac->refs == 0); gpr_mu_unlock(&ac->mu); @@ -260,7 +265,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), gpr_mu_lock(&ac->mu); grpc_alarm_init(&ac->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - on_alarm, ac, gpr_now(GPR_CLOCK_MONOTONIC)); + tc_on_alarm, ac, gpr_now(GPR_CLOCK_MONOTONIC)); grpc_fd_notify_on_write(ac->fd, &ac->write_closure); gpr_mu_unlock(&ac->mu); diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 63a8a2720e..24fee0596f 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -572,14 +572,21 @@ static void grpc_tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { grpc_pollset_add_fd(pollset, tcp->em_fd); } +static void grpc_tcp_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pollset_set) { + grpc_tcp *tcp = (grpc_tcp *)ep; + grpc_pollset_set_add_fd(pollset_set, tcp->em_fd); +} + static char *grpc_tcp_get_peer(grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; return gpr_strdup(tcp->peer_string); } static const grpc_endpoint_vtable vtable = { - grpc_tcp_notify_on_read, grpc_tcp_write, grpc_tcp_add_to_pollset, - grpc_tcp_shutdown, grpc_tcp_destroy, grpc_tcp_get_peer}; + grpc_tcp_notify_on_read, grpc_tcp_write, + grpc_tcp_add_to_pollset, grpc_tcp_add_to_pollset_set, + grpc_tcp_shutdown, grpc_tcp_destroy, + grpc_tcp_get_peer}; grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size, const char *peer_string) { diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c index d68e6aee79..38ae74abec 100644 --- a/src/core/iomgr/tcp_windows.c +++ b/src/core/iomgr/tcp_windows.c @@ -401,9 +401,9 @@ static char *win_get_peer(grpc_endpoint *ep) { return gpr_strdup(tcp->peer_string); } -static grpc_endpoint_vtable vtable = { - win_notify_on_read, win_write, win_add_to_pollset, win_shutdown, win_destroy, win_get_peer -}; +static grpc_endpoint_vtable vtable = {win_notify_on_read, win_write, + win_add_to_pollset, win_shutdown, + win_destroy, win_get_peer}; grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) { grpc_tcp *tcp = (grpc_tcp *) gpr_malloc(sizeof(grpc_tcp)); |