diff options
Diffstat (limited to 'src/core/iomgr')
-rw-r--r-- | src/core/iomgr/alarm.c | 10 | ||||
-rw-r--r-- | src/core/iomgr/endpoint.c | 4 | ||||
-rw-r--r-- | src/core/iomgr/endpoint.h | 3 | ||||
-rw-r--r-- | src/core/iomgr/endpoint_pair_posix.c | 8 | ||||
-rw-r--r-- | src/core/iomgr/endpoint_pair_windows.c | 4 | ||||
-rw-r--r-- | src/core/iomgr/iocp_windows.c | 2 | ||||
-rw-r--r-- | src/core/iomgr/iomgr.c | 26 | ||||
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_epoll.c | 17 | ||||
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_poll_posix.c | 15 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 53 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.h | 6 | ||||
-rw-r--r-- | src/core/iomgr/pollset_windows.c | 2 | ||||
-rw-r--r-- | src/core/iomgr/sockaddr_utils.c | 33 | ||||
-rw-r--r-- | src/core/iomgr/sockaddr_utils.h | 2 | ||||
-rw-r--r-- | src/core/iomgr/tcp_client_posix.c | 20 | ||||
-rw-r--r-- | src/core/iomgr/tcp_client_windows.c | 7 | ||||
-rw-r--r-- | src/core/iomgr/tcp_posix.c | 28 | ||||
-rw-r--r-- | src/core/iomgr/tcp_posix.h | 3 | ||||
-rw-r--r-- | src/core/iomgr/tcp_server_posix.c | 7 | ||||
-rw-r--r-- | src/core/iomgr/tcp_server_windows.c | 31 | ||||
-rw-r--r-- | src/core/iomgr/tcp_windows.c | 13 | ||||
-rw-r--r-- | src/core/iomgr/tcp_windows.h | 2 |
22 files changed, 225 insertions, 71 deletions
diff --git a/src/core/iomgr/alarm.c b/src/core/iomgr/alarm.c index 5860834de3..931f746f75 100644 --- a/src/core/iomgr/alarm.c +++ b/src/core/iomgr/alarm.c @@ -36,6 +36,7 @@ #include "src/core/iomgr/alarm_heap.h" #include "src/core/iomgr/alarm_internal.h" #include "src/core/iomgr/time_averaged_stats.h" +#include <grpc/support/log.h> #include <grpc/support/sync.h> #include <grpc/support/useful.h> @@ -67,6 +68,7 @@ typedef struct { static gpr_mu g_mu; /* Allow only one run_some_expired_alarms at once */ static gpr_mu g_checker_mu; +static gpr_clock_type g_clock_type; static shard_type g_shards[NUM_SHARDS]; /* Protected by g_mu */ static shard_type *g_shard_queue[NUM_SHARDS]; @@ -85,6 +87,7 @@ void grpc_alarm_list_init(gpr_timespec now) { gpr_mu_init(&g_mu); gpr_mu_init(&g_checker_mu); + g_clock_type = now.clock_type; for (i = 0; i < NUM_SHARDS; i++) { shard_type *shard = &g_shards[i]; @@ -102,7 +105,8 @@ void grpc_alarm_list_init(gpr_timespec now) { void grpc_alarm_list_shutdown(void) { int i; - while (run_some_expired_alarms(NULL, gpr_inf_future, NULL, 0)) + while (run_some_expired_alarms(NULL, gpr_inf_future(g_clock_type), NULL, + 0)) ; for (i = 0; i < NUM_SHARDS; i++) { shard_type *shard = &g_shards[i]; @@ -127,6 +131,7 @@ static gpr_timespec dbl_to_ts(double d) { gpr_timespec ts; ts.tv_sec = d; ts.tv_nsec = 1e9 * (d - ts.tv_sec); + ts.clock_type = GPR_TIMESPAN; return ts; } @@ -173,6 +178,8 @@ void grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline, gpr_timespec now) { int is_first_alarm = 0; shard_type *shard = &g_shards[shard_idx(alarm)]; + GPR_ASSERT(deadline.clock_type == g_clock_type); + GPR_ASSERT(now.clock_type == g_clock_type); alarm->cb = alarm_cb; alarm->cb_arg = alarm_cb_arg; alarm->deadline = deadline; @@ -353,6 +360,7 @@ static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now, } int grpc_alarm_check(gpr_mu *drop_mu, gpr_timespec now, gpr_timespec *next) { + GPR_ASSERT(now.clock_type == g_clock_type); return run_some_expired_alarms(drop_mu, now, next, 1); } diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c index 96487958a7..f2b44aad03 100644 --- a/src/core/iomgr/endpoint.c +++ b/src/core/iomgr/endpoint.c @@ -53,3 +53,7 @@ void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { void grpc_endpoint_shutdown(grpc_endpoint *ep) { ep->vtable->shutdown(ep); } void grpc_endpoint_destroy(grpc_endpoint *ep) { ep->vtable->destroy(ep); } + +char *grpc_endpoint_get_peer(grpc_endpoint *ep) { + return ep->vtable->get_peer(ep); +} diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h index 881e851800..ee0becf3d6 100644 --- a/src/core/iomgr/endpoint.h +++ b/src/core/iomgr/endpoint.h @@ -72,12 +72,15 @@ struct grpc_endpoint_vtable { void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset); void (*shutdown)(grpc_endpoint *ep); void (*destroy)(grpc_endpoint *ep); + char *(*get_peer)(grpc_endpoint *ep); }; /* When data is available on the connection, calls the callback with slices. */ void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, void *user_data); +char *grpc_endpoint_get_peer(grpc_endpoint *ep); + /* Write slices out to the socket. If the connection is ready for more data after the end of the call, it diff --git a/src/core/iomgr/endpoint_pair_posix.c b/src/core/iomgr/endpoint_pair_posix.c index fa2d2555d6..deae9c6875 100644 --- a/src/core/iomgr/endpoint_pair_posix.c +++ b/src/core/iomgr/endpoint_pair_posix.c @@ -66,12 +66,12 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, create_sockets(sv); gpr_asprintf(&final_name, "%s:client", name); - p.client = - grpc_tcp_create(grpc_fd_create(sv[1], final_name), read_slice_size); + p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), read_slice_size, + "socketpair-server"); gpr_free(final_name); gpr_asprintf(&final_name, "%s:server", name); - p.server = - grpc_tcp_create(grpc_fd_create(sv[0], final_name), read_slice_size); + p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), read_slice_size, + "socketpair-client"); gpr_free(final_name); return p; } diff --git a/src/core/iomgr/endpoint_pair_windows.c b/src/core/iomgr/endpoint_pair_windows.c index c6790b2937..7d81470a78 100644 --- a/src/core/iomgr/endpoint_pair_windows.c +++ b/src/core/iomgr/endpoint_pair_windows.c @@ -81,8 +81,8 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, size_t read SOCKET sv[2]; grpc_endpoint_pair p; create_sockets(sv); - p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client")); - p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server")); + p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client"), "endpoint:server"); + p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server"), "endpoint:client"); return p; } diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c index 3d3a193d00..8741241fb8 100644 --- a/src/core/iomgr/iocp_windows.c +++ b/src/core/iomgr/iocp_windows.c @@ -157,7 +157,7 @@ void grpc_iocp_shutdown(void) { BOOL success; gpr_event_set(&g_shutdown_iocp, (void *)1); grpc_iocp_kick(); - gpr_event_wait(&g_iocp_done, gpr_inf_future); + gpr_event_wait(&g_iocp_done, gpr_inf_future(GPR_CLOCK_REALTIME)); success = CloseHandle(g_iocp); GPR_ASSERT(success); } diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index cca92d3b32..aa4bc6e20d 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -57,9 +57,9 @@ static grpc_iomgr_object g_root_object; static void background_callback_executor(void *ignored) { gpr_mu_lock(&g_mu); while (!g_shutdown) { - gpr_timespec deadline = gpr_inf_future; - gpr_timespec short_deadline = - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100)); + gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + gpr_timespec short_deadline = gpr_time_add( + gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_millis(100, GPR_TIMESPAN)); if (g_cbs_head) { grpc_iomgr_closure *closure = g_cbs_head; g_cbs_head = closure->next; @@ -67,7 +67,7 @@ static void background_callback_executor(void *ignored) { gpr_mu_unlock(&g_mu); closure->cb(closure->cb_arg, closure->success); gpr_mu_lock(&g_mu); - } else if (grpc_alarm_check(&g_mu, gpr_now(GPR_CLOCK_REALTIME), + } else if (grpc_alarm_check(&g_mu, gpr_now(GPR_CLOCK_MONOTONIC), &deadline)) { } else { gpr_mu_unlock(&g_mu); @@ -88,9 +88,10 @@ void grpc_kick_poller(void) { void grpc_iomgr_init(void) { gpr_thd_id id; + g_shutdown = 0; gpr_mu_init(&g_mu); gpr_cv_init(&g_rcv); - grpc_alarm_list_init(gpr_now(GPR_CLOCK_REALTIME)); + grpc_alarm_list_init(gpr_now(GPR_CLOCK_MONOTONIC)); g_root_object.next = g_root_object.prev = &g_root_object; g_root_object.name = "root"; grpc_iomgr_platform_init(); @@ -110,8 +111,8 @@ static size_t count_objects(void) { void grpc_iomgr_shutdown(void) { grpc_iomgr_object *obj; grpc_iomgr_closure *closure; - gpr_timespec shutdown_deadline = - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(10)); + gpr_timespec shutdown_deadline = gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(10, GPR_TIMESPAN)); gpr_timespec last_warning_time = gpr_now(GPR_CLOCK_REALTIME); gpr_mu_lock(&g_mu); @@ -119,7 +120,7 @@ void grpc_iomgr_shutdown(void) { while (g_cbs_head != NULL || g_root_object.next != &g_root_object) { if (gpr_time_cmp( gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), last_warning_time), - gpr_time_from_seconds(1)) >= 0) { + gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) { if (g_cbs_head != NULL && g_root_object.next != &g_root_object) { gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed and executing " @@ -145,14 +146,14 @@ void grpc_iomgr_shutdown(void) { } while (g_cbs_head); continue; } - if (grpc_alarm_check(&g_mu, gpr_inf_future, NULL)) { + if (grpc_alarm_check(&g_mu, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL)) { gpr_log(GPR_DEBUG, "got late alarm"); continue; } if (g_root_object.next != &g_root_object) { int timeout = 0; - gpr_timespec short_deadline = - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100)); + gpr_timespec short_deadline = gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100, GPR_TIMESPAN)); while (gpr_cv_wait(&g_rcv, &g_mu, short_deadline) && g_cbs_head == NULL) { if (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), shutdown_deadline) > 0) { timeout = 1; @@ -174,7 +175,8 @@ void grpc_iomgr_shutdown(void) { gpr_mu_unlock(&g_mu); grpc_kick_poller(); - gpr_event_wait(&g_background_callback_executor_done, gpr_inf_future); + gpr_event_wait(&g_background_callback_executor_done, + gpr_inf_future(GPR_CLOCK_REALTIME)); grpc_alarm_list_shutdown(); diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 3746c8edaf..d697b59e4c 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -50,12 +50,17 @@ typedef struct { } pollset_hdr; static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset, - grpc_fd *fd) { + grpc_fd *fd, + int and_unlock_pollset) { pollset_hdr *h = pollset->data.ptr; struct epoll_event ev; int err; grpc_fd_watcher watcher; + if (and_unlock_pollset) { + gpr_mu_unlock(&pollset->mu); + } + /* We pretend to be polling whilst adding an fd to keep the fd from being closed during the add. This may result in a spurious wakeup being assigned to this pollset whilst adding, but that should be benign. */ @@ -76,9 +81,15 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset, } static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset, - grpc_fd *fd) { + grpc_fd *fd, + int and_unlock_pollset) { pollset_hdr *h = pollset->data.ptr; int err; + + if (and_unlock_pollset) { + gpr_mu_unlock(&pollset->mu); + } + /* Note that this can race with concurrent poll, but that should be fine since * at worst it creates a spurious read event on a reused grpc_fd object. */ err = epoll_ctl(h->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL); @@ -183,7 +194,7 @@ static void epoll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds, abort(); } for (i = 0; i < nfds; i++) { - multipoll_with_epoll_pollset_add_fd(pollset, fds[i]); + multipoll_with_epoll_pollset_add_fd(pollset, fds[i], 0); } grpc_wakeup_fd_create(&h->wakeup_fd); diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index 7b717bd159..0084e83953 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -66,12 +66,13 @@ typedef struct { } pollset_hdr; static void multipoll_with_poll_pollset_add_fd(grpc_pollset *pollset, - grpc_fd *fd) { + grpc_fd *fd, + int and_unlock_pollset) { size_t i; pollset_hdr *h = pollset->data.ptr; /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */ for (i = 0; i < h->fd_count; i++) { - if (h->fds[i] == fd) return; + if (h->fds[i] == fd) goto exit; } if (h->fd_count == h->fd_capacity) { h->fd_capacity = GPR_MAX(h->fd_capacity + 8, h->fd_count * 3 / 2); @@ -79,10 +80,15 @@ static void multipoll_with_poll_pollset_add_fd(grpc_pollset *pollset, } h->fds[h->fd_count++] = fd; GRPC_FD_REF(fd, "multipoller"); +exit: + if (and_unlock_pollset) { + gpr_mu_unlock(&pollset->mu); + } } static void multipoll_with_poll_pollset_del_fd(grpc_pollset *pollset, - grpc_fd *fd) { + grpc_fd *fd, + int and_unlock_pollset) { /* will get removed next poll cycle */ pollset_hdr *h = pollset->data.ptr; if (h->del_count == h->del_capacity) { @@ -91,6 +97,9 @@ static void multipoll_with_poll_pollset_del_fd(grpc_pollset *pollset, } h->dels[h->del_count++] = fd; GRPC_FD_REF(fd, "multipoller_del"); + if (and_unlock_pollset) { + gpr_mu_unlock(&pollset->mu); + } } static void end_polling(grpc_pollset *pollset) { diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 85101764d2..c8646af615 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -105,14 +105,28 @@ void grpc_pollset_init(grpc_pollset *pollset) { void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { gpr_mu_lock(&pollset->mu); - pollset->vtable->add_fd(pollset, fd); + pollset->vtable->add_fd(pollset, fd, 1); + /* the following (enabled only in debug) will reacquire and then release + our lock - meaning that if the unlocking flag passed to del_fd above is + not respected, the code will deadlock (in a way that we have a chance of + debugging) */ +#ifndef NDEBUG + gpr_mu_lock(&pollset->mu); gpr_mu_unlock(&pollset->mu); +#endif } void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) { gpr_mu_lock(&pollset->mu); - pollset->vtable->del_fd(pollset, fd); + pollset->vtable->del_fd(pollset, fd, 1); + /* the following (enabled only in debug) will reacquire and then release + our lock - meaning that if the unlocking flag passed to del_fd above is + not respected, the code will deadlock (in a way that we have a chance of + debugging) */ +#ifndef NDEBUG + gpr_mu_lock(&pollset->mu); gpr_mu_unlock(&pollset->mu); +#endif } static void finish_shutdown(grpc_pollset *pollset) { @@ -122,7 +136,7 @@ static void finish_shutdown(grpc_pollset *pollset) { int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) { /* pollset->mu already held */ - gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME); + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); if (gpr_time_cmp(now, deadline) > 0) { return 0; } @@ -191,17 +205,17 @@ int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, gpr_timespec now) { gpr_timespec timeout; static const int max_spin_polling_us = 10; - if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { + if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { return -1; } - if (gpr_time_cmp( - deadline, - gpr_time_add(now, gpr_time_from_micros(max_spin_polling_us))) <= 0) { + if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros( + max_spin_polling_us, + GPR_TIMESPAN))) <= 0) { return 0; } timeout = gpr_time_sub(deadline, now); - return gpr_time_to_millis( - gpr_time_add(timeout, gpr_time_from_nanos(GPR_NS_PER_SEC - 1))); + return gpr_time_to_millis(gpr_time_add( + timeout, gpr_time_from_nanos(GPR_NS_PER_SEC - 1, GPR_TIMESPAN))); } /* @@ -257,7 +271,7 @@ static void basic_do_promote(void *args, int success) { } else if (grpc_fd_is_orphaned(fd)) { /* Don't try to add it to anything, we'll drop our ref on it below */ } else if (pollset->vtable != original_vtable) { - pollset->vtable->add_fd(pollset, fd); + pollset->vtable->add_fd(pollset, fd, 0); } else if (fd != pollset->data.ptr) { grpc_fd *fds[2]; fds[0] = pollset->data.ptr; @@ -287,10 +301,11 @@ static void basic_do_promote(void *args, int success) { GRPC_FD_UNREF(fd, "basicpoll_add"); } -static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { +static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd, + int and_unlock_pollset) { grpc_unary_promote_args *up_args; GPR_ASSERT(fd); - if (fd == pollset->data.ptr) return; + if (fd == pollset->data.ptr) goto exit; if (!pollset->counter) { /* Fast path -- no in flight cbs */ @@ -313,7 +328,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { pollset->data.ptr = fd; GRPC_FD_REF(fd, "basicpoll"); } - return; + goto exit; } /* Now we need to promote. This needs to happen when we're not polling. Since @@ -329,14 +344,24 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { grpc_iomgr_add_callback(&up_args->promotion_closure); grpc_pollset_kick(pollset); + +exit: + if (and_unlock_pollset) { + gpr_mu_unlock(&pollset->mu); + } } -static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) { +static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd, + int and_unlock_pollset) { GPR_ASSERT(fd); if (fd == pollset->data.ptr) { GRPC_FD_UNREF(pollset->data.ptr, "basicpoll"); pollset->data.ptr = NULL; } + + if (and_unlock_pollset) { + gpr_mu_unlock(&pollset->mu); + } } static void basic_pollset_maybe_work(grpc_pollset *pollset, diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index 53585a2886..37de1276d1 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -66,8 +66,10 @@ typedef struct grpc_pollset { } grpc_pollset; struct grpc_pollset_vtable { - void (*add_fd)(grpc_pollset *pollset, struct grpc_fd *fd); - void (*del_fd)(grpc_pollset *pollset, struct grpc_fd *fd); + void (*add_fd)(grpc_pollset *pollset, struct grpc_fd *fd, + int and_unlock_pollset); + void (*del_fd)(grpc_pollset *pollset, struct grpc_fd *fd, + int and_unlock_pollset); void (*maybe_work)(grpc_pollset *pollset, gpr_timespec deadline, gpr_timespec now, int allow_synchronous_callback); void (*kick)(grpc_pollset *pollset); diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index 24226cc980..a9c4739c7c 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -70,7 +70,7 @@ void grpc_pollset_destroy(grpc_pollset *pollset) { int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) { gpr_timespec now; - now = gpr_now(GPR_CLOCK_REALTIME); + now = gpr_now(GPR_CLOCK_MONOTONIC); if (gpr_time_cmp(now, deadline) > 0) { return 0 /* GPR_FALSE */; } diff --git a/src/core/iomgr/sockaddr_utils.c b/src/core/iomgr/sockaddr_utils.c index e91b94f8c8..71ac12e87b 100644 --- a/src/core/iomgr/sockaddr_utils.c +++ b/src/core/iomgr/sockaddr_utils.c @@ -36,12 +36,18 @@ #include <errno.h> #include <string.h> -#include "src/core/support/string.h" +#ifdef GPR_POSIX_SOCKET +#include <sys/un.h> +#endif + +#include <grpc/support/alloc.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> #include <grpc/support/port_platform.h> #include <grpc/support/string_util.h> +#include "src/core/support/string.h" + static const gpr_uint8 kV4MappedPrefix[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff}; @@ -161,6 +167,31 @@ int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr, return ret; } +char *grpc_sockaddr_to_uri(const struct sockaddr *addr) { + char *temp; + char *result; + + switch (addr->sa_family) { + case AF_INET: + grpc_sockaddr_to_string(&temp, addr, 0); + gpr_asprintf(&result, "ipv4:%s", temp); + gpr_free(temp); + return result; + case AF_INET6: + grpc_sockaddr_to_string(&temp, addr, 0); + gpr_asprintf(&result, "ipv6:%s", temp); + gpr_free(temp); + return result; +#ifdef GPR_POSIX_SOCKET + case AF_UNIX: + gpr_asprintf(&result, "unix:%s", ((struct sockaddr_un *)addr)->sun_path); + return result; +#endif + } + + return NULL; +} + int grpc_sockaddr_get_port(const struct sockaddr *addr) { switch (addr->sa_family) { case AF_INET: diff --git a/src/core/iomgr/sockaddr_utils.h b/src/core/iomgr/sockaddr_utils.h index bdfb83479b..99f1ed54da 100644 --- a/src/core/iomgr/sockaddr_utils.h +++ b/src/core/iomgr/sockaddr_utils.h @@ -84,4 +84,6 @@ int grpc_sockaddr_set_port(const struct sockaddr *addr, int port); int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr, int normalize); +char *grpc_sockaddr_to_uri(const struct sockaddr *addr); + #endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKADDR_UTILS_H */ diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index dc0489e64f..392eda999e 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -64,6 +64,7 @@ typedef struct { int refs; grpc_iomgr_closure write_closure; grpc_pollset_set *interested_parties; + char *addr_str; } async_connect; static int prepare_socket(const struct sockaddr *addr, int fd) { @@ -99,6 +100,7 @@ static void on_alarm(void *acp, int success) { gpr_mu_unlock(&ac->mu); if (done) { gpr_mu_destroy(&ac->mu); + gpr_free(ac->addr_str); gpr_free(ac); } } @@ -114,6 +116,8 @@ static void on_writable(void *acp, int success) { void (*cb)(void *arg, grpc_endpoint *tcp) = ac->cb; void *cb_arg = ac->cb_arg; + grpc_alarm_cancel(&ac->alarm); + gpr_mu_lock(&ac->mu); if (success) { do { @@ -156,7 +160,8 @@ static void on_writable(void *acp, int success) { } } else { grpc_pollset_set_del_fd(ac->interested_parties, ac->fd); - ep = grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE); + ep = grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, + ac->addr_str); goto finish; } } else { @@ -177,9 +182,8 @@ finish: gpr_mu_unlock(&ac->mu); if (done) { gpr_mu_destroy(&ac->mu); + gpr_free(ac->addr_str); gpr_free(ac); - } else { - grpc_alarm_cancel(&ac->alarm); } cb(cb_arg, ep); } @@ -223,13 +227,13 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), err = connect(fd, addr, addr_len); } while (err < 0 && errno == EINTR); - grpc_sockaddr_to_string(&addr_str, addr, 1); + addr_str = grpc_sockaddr_to_uri(addr); gpr_asprintf(&name, "tcp-client:%s", addr_str); fdobj = grpc_fd_create(fd, name); if (err >= 0) { - cb(arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); + cb(arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str)); goto done; } @@ -247,14 +251,16 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), ac->cb_arg = arg; ac->fd = fdobj; ac->interested_parties = interested_parties; + ac->addr_str = addr_str; + addr_str = NULL; gpr_mu_init(&ac->mu); ac->refs = 2; ac->write_closure.cb = on_writable; ac->write_closure.cb_arg = ac; gpr_mu_lock(&ac->mu); - grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, - gpr_now(GPR_CLOCK_REALTIME)); + grpc_alarm_init(&ac->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), + on_alarm, ac, gpr_now(GPR_CLOCK_MONOTONIC)); grpc_fd_notify_on_write(ac->fd, &ac->write_closure); gpr_mu_unlock(&ac->mu); diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c index 16741452b9..79a58fe2af 100644 --- a/src/core/iomgr/tcp_client_windows.c +++ b/src/core/iomgr/tcp_client_windows.c @@ -58,6 +58,7 @@ typedef struct { grpc_winsocket *socket; gpr_timespec deadline; grpc_alarm alarm; + char *addr_name; int refs; int aborted; } async_connect; @@ -67,6 +68,7 @@ static void async_connect_cleanup(async_connect *ac) { gpr_mu_unlock(&ac->mu); if (done) { gpr_mu_destroy(&ac->mu); + gpr_free(ac->addr_name); gpr_free(ac); } } @@ -107,7 +109,7 @@ static void on_connect(void *acp, int from_iocp) { gpr_log(GPR_ERROR, "on_connect error: %s", utf8_message); gpr_free(utf8_message); } else if (!aborted) { - ep = grpc_tcp_create(ac->socket); + ep = grpc_tcp_create(ac->socket, ac->addr_name); } } else { gpr_log(GPR_ERROR, "on_connect is shutting down"); @@ -213,10 +215,11 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp), ac->socket = socket; gpr_mu_init(&ac->mu); ac->refs = 2; + ac->addr_name = grpc_sockaddr_to_uri(addr); ac->aborted = 0; grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, - gpr_now(GPR_CLOCK_REALTIME)); + gpr_now(GPR_CLOCK_MONOTONIC)); socket->write_info.outstanding = 1; grpc_socket_notify_on_write(socket, on_connect, ac); return; diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index b6d6efc9fb..63a8a2720e 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -44,15 +44,17 @@ #include <sys/socket.h> #include <unistd.h> -#include "src/core/support/string.h" -#include "src/core/debug/trace.h" -#include "src/core/profiling/timers.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/slice.h> +#include <grpc/support/string_util.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> +#include "src/core/support/string.h" +#include "src/core/debug/trace.h" +#include "src/core/profiling/timers.h" + #ifdef GPR_HAVE_MSG_NOSIGNAL #define SENDMSG_FLAGS MSG_NOSIGNAL #else @@ -282,6 +284,8 @@ typedef struct { grpc_iomgr_closure write_closure; grpc_iomgr_closure handle_read_closure; + + char *peer_string; } grpc_tcp; static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success); @@ -296,6 +300,7 @@ static void grpc_tcp_unref(grpc_tcp *tcp) { int refcount_zero = gpr_unref(&tcp->refcount); if (refcount_zero) { grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan"); + gpr_free(tcp->peer_string); gpr_free(tcp); } } @@ -314,7 +319,7 @@ static void call_read_cb(grpc_tcp *tcp, gpr_slice *slices, size_t nslices, gpr_log(GPR_DEBUG, "read: status=%d", status); for (i = 0; i < nslices; i++) { char *dump = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); - gpr_log(GPR_DEBUG, "READ: %s", dump); + gpr_log(GPR_DEBUG, "READ %p: %s", tcp, dump); gpr_free(dump); } } @@ -443,7 +448,7 @@ static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); } else { tcp->handle_read_closure.cb_arg = tcp; - grpc_iomgr_add_callback(&tcp->handle_read_closure); + grpc_iomgr_add_delayed_callback(&tcp->handle_read_closure, 1); } } @@ -567,13 +572,20 @@ static void grpc_tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { grpc_pollset_add_fd(pollset, tcp->em_fd); } +static char *grpc_tcp_get_peer(grpc_endpoint *ep) { + grpc_tcp *tcp = (grpc_tcp *)ep; + return gpr_strdup(tcp->peer_string); +} + static const grpc_endpoint_vtable vtable = { - grpc_tcp_notify_on_read, grpc_tcp_write, grpc_tcp_add_to_pollset, - grpc_tcp_shutdown, grpc_tcp_destroy}; + grpc_tcp_notify_on_read, grpc_tcp_write, grpc_tcp_add_to_pollset, + grpc_tcp_shutdown, grpc_tcp_destroy, grpc_tcp_get_peer}; -grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) { +grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size, + const char *peer_string) { grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); tcp->base.vtable = &vtable; + tcp->peer_string = gpr_strdup(peer_string); tcp->fd = em_fd->fd; tcp->read_cb = NULL; tcp->write_cb = NULL; diff --git a/src/core/iomgr/tcp_posix.h b/src/core/iomgr/tcp_posix.h index 44279d5a26..d752feaeea 100644 --- a/src/core/iomgr/tcp_posix.h +++ b/src/core/iomgr/tcp_posix.h @@ -53,6 +53,7 @@ extern int grpc_tcp_trace; /* Create a tcp endpoint given a file desciptor and a read slice size. Takes ownership of fd. */ -grpc_endpoint *grpc_tcp_create(grpc_fd *fd, size_t read_slice_size); +grpc_endpoint *grpc_tcp_create(grpc_fd *fd, size_t read_slice_size, + const char *peer_string); #endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_POSIX_H */ diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 5854031c9b..8538600112 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -332,7 +332,7 @@ static void on_read(void *arg, int success) { grpc_set_socket_no_sigpipe_if_possible(fd); - grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1); + addr_str = grpc_sockaddr_to_uri((struct sockaddr *)&addr); gpr_asprintf(&name, "tcp-server-connection:%s", addr_str); fdobj = grpc_fd_create(fd, name); @@ -342,8 +342,9 @@ static void on_read(void *arg, int success) { for (i = 0; i < sp->server->pollset_count; i++) { grpc_pollset_add_fd(sp->server->pollsets[i], fdobj); } - sp->server->cb(sp->server->cb_arg, - grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); + sp->server->cb( + sp->server->cb_arg, + grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str)); gpr_free(name); gpr_free(addr_str); diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index e6e1d1499e..cc680507ff 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -116,7 +116,7 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s, } /* This happens asynchronously. Wait while that happens. */ while (s->active_ports) { - gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future); + gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future(GPR_CLOCK_REALTIME)); } gpr_mu_unlock(&s->mu); @@ -243,9 +243,14 @@ static void on_accept(void *arg, int from_iocp) { SOCKET sock = sp->new_socket; grpc_winsocket_callback_info *info = &sp->socket->read_info; grpc_endpoint *ep = NULL; + struct sockaddr_storage peer_name; + char *peer_name_string; + char *fd_name; + int peer_name_len = sizeof(peer_name); DWORD transfered_bytes; DWORD flags; BOOL wsa_success; + int err; /* The general mechanism for shutting down is to queue abortion calls. While this is necessary in the read/write case, it's useless for the accept @@ -277,8 +282,28 @@ static void on_accept(void *arg, int from_iocp) { } } else { if (!sp->shutting_down) { - /* TODO(ctiller): add sockaddr address to label */ - ep = grpc_tcp_create(grpc_winsocket_create(sock, "server")); + peer_name_string = NULL; + err = setsockopt(sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, + (char *)&sp->socket->socket, + sizeof(sp->socket->socket)); + if (err) { + char *utf8_message = gpr_format_message(WSAGetLastError()); + gpr_log(GPR_ERROR, "setsockopt error: %s", utf8_message); + gpr_free(utf8_message); + } + err = getpeername(sock, (struct sockaddr*)&peer_name, &peer_name_len); + if (!err) { + peer_name_string = grpc_sockaddr_to_uri((struct sockaddr*)&peer_name); + } else { + char *utf8_message = gpr_format_message(WSAGetLastError()); + gpr_log(GPR_ERROR, "getpeername error: %s", utf8_message); + gpr_free(utf8_message); + } + gpr_asprintf(&fd_name, "tcp_server:%s", peer_name_string); + ep = grpc_tcp_create(grpc_winsocket_create(sock, fd_name), + peer_name_string); + gpr_free(fd_name); + gpr_free(peer_name_string); } } diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c index 1bf81a73e0..d68e6aee79 100644 --- a/src/core/iomgr/tcp_windows.c +++ b/src/core/iomgr/tcp_windows.c @@ -96,6 +96,8 @@ typedef struct grpc_tcp { to protect ourselves when requesting a shutdown. */ gpr_mu mu; int shutting_down; + + char *peer_string; } grpc_tcp; static void tcp_ref(grpc_tcp *tcp) { @@ -107,6 +109,7 @@ static void tcp_unref(grpc_tcp *tcp) { gpr_slice_buffer_destroy(&tcp->write_slices); grpc_winsocket_orphan(tcp->socket); gpr_mu_destroy(&tcp->mu); + gpr_free(tcp->peer_string); gpr_free(tcp); } } @@ -393,11 +396,16 @@ static void win_destroy(grpc_endpoint *ep) { tcp_unref(tcp); } +static char *win_get_peer(grpc_endpoint *ep) { + grpc_tcp *tcp = (grpc_tcp *)ep; + return gpr_strdup(tcp->peer_string); +} + static grpc_endpoint_vtable vtable = { - win_notify_on_read, win_write, win_add_to_pollset, win_shutdown, win_destroy + win_notify_on_read, win_write, win_add_to_pollset, win_shutdown, win_destroy, win_get_peer }; -grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket) { +grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) { grpc_tcp *tcp = (grpc_tcp *) gpr_malloc(sizeof(grpc_tcp)); memset(tcp, 0, sizeof(grpc_tcp)); tcp->base.vtable = &vtable; @@ -405,6 +413,7 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket) { gpr_mu_init(&tcp->mu); gpr_slice_buffer_init(&tcp->write_slices); gpr_ref_init(&tcp->refcount, 1); + tcp->peer_string = gpr_strdup(peer_string); return &tcp->base; } diff --git a/src/core/iomgr/tcp_windows.h b/src/core/iomgr/tcp_windows.h index 4cbc12c53a..7e301db250 100644 --- a/src/core/iomgr/tcp_windows.h +++ b/src/core/iomgr/tcp_windows.h @@ -50,7 +50,7 @@ /* Create a tcp endpoint given a winsock handle. * Takes ownership of the handle. */ -grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket); +grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string); int grpc_tcp_prepare_socket(SOCKET sock); |