aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/iomgr')
-rw-r--r--src/core/iomgr/alarm.c10
-rw-r--r--src/core/iomgr/endpoint.c4
-rw-r--r--src/core/iomgr/endpoint.h3
-rw-r--r--src/core/iomgr/endpoint_pair_posix.c8
-rw-r--r--src/core/iomgr/endpoint_pair_windows.c4
-rw-r--r--src/core/iomgr/iocp_windows.c2
-rw-r--r--src/core/iomgr/iomgr.c26
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c17
-rw-r--r--src/core/iomgr/pollset_multipoller_with_poll_posix.c15
-rw-r--r--src/core/iomgr/pollset_posix.c53
-rw-r--r--src/core/iomgr/pollset_posix.h6
-rw-r--r--src/core/iomgr/pollset_windows.c2
-rw-r--r--src/core/iomgr/sockaddr_utils.c33
-rw-r--r--src/core/iomgr/sockaddr_utils.h2
-rw-r--r--src/core/iomgr/tcp_client_posix.c20
-rw-r--r--src/core/iomgr/tcp_client_windows.c7
-rw-r--r--src/core/iomgr/tcp_posix.c28
-rw-r--r--src/core/iomgr/tcp_posix.h3
-rw-r--r--src/core/iomgr/tcp_server_posix.c7
-rw-r--r--src/core/iomgr/tcp_server_windows.c31
-rw-r--r--src/core/iomgr/tcp_windows.c13
-rw-r--r--src/core/iomgr/tcp_windows.h2
22 files changed, 225 insertions, 71 deletions
diff --git a/src/core/iomgr/alarm.c b/src/core/iomgr/alarm.c
index 5860834de3..931f746f75 100644
--- a/src/core/iomgr/alarm.c
+++ b/src/core/iomgr/alarm.c
@@ -36,6 +36,7 @@
#include "src/core/iomgr/alarm_heap.h"
#include "src/core/iomgr/alarm_internal.h"
#include "src/core/iomgr/time_averaged_stats.h"
+#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
@@ -67,6 +68,7 @@ typedef struct {
static gpr_mu g_mu;
/* Allow only one run_some_expired_alarms at once */
static gpr_mu g_checker_mu;
+static gpr_clock_type g_clock_type;
static shard_type g_shards[NUM_SHARDS];
/* Protected by g_mu */
static shard_type *g_shard_queue[NUM_SHARDS];
@@ -85,6 +87,7 @@ void grpc_alarm_list_init(gpr_timespec now) {
gpr_mu_init(&g_mu);
gpr_mu_init(&g_checker_mu);
+ g_clock_type = now.clock_type;
for (i = 0; i < NUM_SHARDS; i++) {
shard_type *shard = &g_shards[i];
@@ -102,7 +105,8 @@ void grpc_alarm_list_init(gpr_timespec now) {
void grpc_alarm_list_shutdown(void) {
int i;
- while (run_some_expired_alarms(NULL, gpr_inf_future, NULL, 0))
+ while (run_some_expired_alarms(NULL, gpr_inf_future(g_clock_type), NULL,
+ 0))
;
for (i = 0; i < NUM_SHARDS; i++) {
shard_type *shard = &g_shards[i];
@@ -127,6 +131,7 @@ static gpr_timespec dbl_to_ts(double d) {
gpr_timespec ts;
ts.tv_sec = d;
ts.tv_nsec = 1e9 * (d - ts.tv_sec);
+ ts.clock_type = GPR_TIMESPAN;
return ts;
}
@@ -173,6 +178,8 @@ void grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline,
gpr_timespec now) {
int is_first_alarm = 0;
shard_type *shard = &g_shards[shard_idx(alarm)];
+ GPR_ASSERT(deadline.clock_type == g_clock_type);
+ GPR_ASSERT(now.clock_type == g_clock_type);
alarm->cb = alarm_cb;
alarm->cb_arg = alarm_cb_arg;
alarm->deadline = deadline;
@@ -353,6 +360,7 @@ static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now,
}
int grpc_alarm_check(gpr_mu *drop_mu, gpr_timespec now, gpr_timespec *next) {
+ GPR_ASSERT(now.clock_type == g_clock_type);
return run_some_expired_alarms(drop_mu, now, next, 1);
}
diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c
index 96487958a7..f2b44aad03 100644
--- a/src/core/iomgr/endpoint.c
+++ b/src/core/iomgr/endpoint.c
@@ -53,3 +53,7 @@ void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
void grpc_endpoint_shutdown(grpc_endpoint *ep) { ep->vtable->shutdown(ep); }
void grpc_endpoint_destroy(grpc_endpoint *ep) { ep->vtable->destroy(ep); }
+
+char *grpc_endpoint_get_peer(grpc_endpoint *ep) {
+ return ep->vtable->get_peer(ep);
+}
diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h
index 881e851800..ee0becf3d6 100644
--- a/src/core/iomgr/endpoint.h
+++ b/src/core/iomgr/endpoint.h
@@ -72,12 +72,15 @@ struct grpc_endpoint_vtable {
void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset);
void (*shutdown)(grpc_endpoint *ep);
void (*destroy)(grpc_endpoint *ep);
+ char *(*get_peer)(grpc_endpoint *ep);
};
/* When data is available on the connection, calls the callback with slices. */
void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
void *user_data);
+char *grpc_endpoint_get_peer(grpc_endpoint *ep);
+
/* Write slices out to the socket.
If the connection is ready for more data after the end of the call, it
diff --git a/src/core/iomgr/endpoint_pair_posix.c b/src/core/iomgr/endpoint_pair_posix.c
index fa2d2555d6..deae9c6875 100644
--- a/src/core/iomgr/endpoint_pair_posix.c
+++ b/src/core/iomgr/endpoint_pair_posix.c
@@ -66,12 +66,12 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
create_sockets(sv);
gpr_asprintf(&final_name, "%s:client", name);
- p.client =
- grpc_tcp_create(grpc_fd_create(sv[1], final_name), read_slice_size);
+ p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), read_slice_size,
+ "socketpair-server");
gpr_free(final_name);
gpr_asprintf(&final_name, "%s:server", name);
- p.server =
- grpc_tcp_create(grpc_fd_create(sv[0], final_name), read_slice_size);
+ p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), read_slice_size,
+ "socketpair-client");
gpr_free(final_name);
return p;
}
diff --git a/src/core/iomgr/endpoint_pair_windows.c b/src/core/iomgr/endpoint_pair_windows.c
index c6790b2937..7d81470a78 100644
--- a/src/core/iomgr/endpoint_pair_windows.c
+++ b/src/core/iomgr/endpoint_pair_windows.c
@@ -81,8 +81,8 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, size_t read
SOCKET sv[2];
grpc_endpoint_pair p;
create_sockets(sv);
- p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client"));
- p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server"));
+ p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client"), "endpoint:server");
+ p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server"), "endpoint:client");
return p;
}
diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c
index 3d3a193d00..8741241fb8 100644
--- a/src/core/iomgr/iocp_windows.c
+++ b/src/core/iomgr/iocp_windows.c
@@ -157,7 +157,7 @@ void grpc_iocp_shutdown(void) {
BOOL success;
gpr_event_set(&g_shutdown_iocp, (void *)1);
grpc_iocp_kick();
- gpr_event_wait(&g_iocp_done, gpr_inf_future);
+ gpr_event_wait(&g_iocp_done, gpr_inf_future(GPR_CLOCK_REALTIME));
success = CloseHandle(g_iocp);
GPR_ASSERT(success);
}
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c
index cca92d3b32..aa4bc6e20d 100644
--- a/src/core/iomgr/iomgr.c
+++ b/src/core/iomgr/iomgr.c
@@ -57,9 +57,9 @@ static grpc_iomgr_object g_root_object;
static void background_callback_executor(void *ignored) {
gpr_mu_lock(&g_mu);
while (!g_shutdown) {
- gpr_timespec deadline = gpr_inf_future;
- gpr_timespec short_deadline =
- gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100));
+ gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+ gpr_timespec short_deadline = gpr_time_add(
+ gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_millis(100, GPR_TIMESPAN));
if (g_cbs_head) {
grpc_iomgr_closure *closure = g_cbs_head;
g_cbs_head = closure->next;
@@ -67,7 +67,7 @@ static void background_callback_executor(void *ignored) {
gpr_mu_unlock(&g_mu);
closure->cb(closure->cb_arg, closure->success);
gpr_mu_lock(&g_mu);
- } else if (grpc_alarm_check(&g_mu, gpr_now(GPR_CLOCK_REALTIME),
+ } else if (grpc_alarm_check(&g_mu, gpr_now(GPR_CLOCK_MONOTONIC),
&deadline)) {
} else {
gpr_mu_unlock(&g_mu);
@@ -88,9 +88,10 @@ void grpc_kick_poller(void) {
void grpc_iomgr_init(void) {
gpr_thd_id id;
+ g_shutdown = 0;
gpr_mu_init(&g_mu);
gpr_cv_init(&g_rcv);
- grpc_alarm_list_init(gpr_now(GPR_CLOCK_REALTIME));
+ grpc_alarm_list_init(gpr_now(GPR_CLOCK_MONOTONIC));
g_root_object.next = g_root_object.prev = &g_root_object;
g_root_object.name = "root";
grpc_iomgr_platform_init();
@@ -110,8 +111,8 @@ static size_t count_objects(void) {
void grpc_iomgr_shutdown(void) {
grpc_iomgr_object *obj;
grpc_iomgr_closure *closure;
- gpr_timespec shutdown_deadline =
- gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(10));
+ gpr_timespec shutdown_deadline = gpr_time_add(
+ gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(10, GPR_TIMESPAN));
gpr_timespec last_warning_time = gpr_now(GPR_CLOCK_REALTIME);
gpr_mu_lock(&g_mu);
@@ -119,7 +120,7 @@ void grpc_iomgr_shutdown(void) {
while (g_cbs_head != NULL || g_root_object.next != &g_root_object) {
if (gpr_time_cmp(
gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), last_warning_time),
- gpr_time_from_seconds(1)) >= 0) {
+ gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {
if (g_cbs_head != NULL && g_root_object.next != &g_root_object) {
gpr_log(GPR_DEBUG,
"Waiting for %d iomgr objects to be destroyed and executing "
@@ -145,14 +146,14 @@ void grpc_iomgr_shutdown(void) {
} while (g_cbs_head);
continue;
}
- if (grpc_alarm_check(&g_mu, gpr_inf_future, NULL)) {
+ if (grpc_alarm_check(&g_mu, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL)) {
gpr_log(GPR_DEBUG, "got late alarm");
continue;
}
if (g_root_object.next != &g_root_object) {
int timeout = 0;
- gpr_timespec short_deadline =
- gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100));
+ gpr_timespec short_deadline = gpr_time_add(
+ gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100, GPR_TIMESPAN));
while (gpr_cv_wait(&g_rcv, &g_mu, short_deadline) && g_cbs_head == NULL) {
if (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), shutdown_deadline) > 0) {
timeout = 1;
@@ -174,7 +175,8 @@ void grpc_iomgr_shutdown(void) {
gpr_mu_unlock(&g_mu);
grpc_kick_poller();
- gpr_event_wait(&g_background_callback_executor_done, gpr_inf_future);
+ gpr_event_wait(&g_background_callback_executor_done,
+ gpr_inf_future(GPR_CLOCK_REALTIME));
grpc_alarm_list_shutdown();
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index 3746c8edaf..d697b59e4c 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -50,12 +50,17 @@ typedef struct {
} pollset_hdr;
static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
- grpc_fd *fd) {
+ grpc_fd *fd,
+ int and_unlock_pollset) {
pollset_hdr *h = pollset->data.ptr;
struct epoll_event ev;
int err;
grpc_fd_watcher watcher;
+ if (and_unlock_pollset) {
+ gpr_mu_unlock(&pollset->mu);
+ }
+
/* We pretend to be polling whilst adding an fd to keep the fd from being
closed during the add. This may result in a spurious wakeup being assigned
to this pollset whilst adding, but that should be benign. */
@@ -76,9 +81,15 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
}
static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset,
- grpc_fd *fd) {
+ grpc_fd *fd,
+ int and_unlock_pollset) {
pollset_hdr *h = pollset->data.ptr;
int err;
+
+ if (and_unlock_pollset) {
+ gpr_mu_unlock(&pollset->mu);
+ }
+
/* Note that this can race with concurrent poll, but that should be fine since
* at worst it creates a spurious read event on a reused grpc_fd object. */
err = epoll_ctl(h->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
@@ -183,7 +194,7 @@ static void epoll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
abort();
}
for (i = 0; i < nfds; i++) {
- multipoll_with_epoll_pollset_add_fd(pollset, fds[i]);
+ multipoll_with_epoll_pollset_add_fd(pollset, fds[i], 0);
}
grpc_wakeup_fd_create(&h->wakeup_fd);
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index 7b717bd159..0084e83953 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -66,12 +66,13 @@ typedef struct {
} pollset_hdr;
static void multipoll_with_poll_pollset_add_fd(grpc_pollset *pollset,
- grpc_fd *fd) {
+ grpc_fd *fd,
+ int and_unlock_pollset) {
size_t i;
pollset_hdr *h = pollset->data.ptr;
/* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */
for (i = 0; i < h->fd_count; i++) {
- if (h->fds[i] == fd) return;
+ if (h->fds[i] == fd) goto exit;
}
if (h->fd_count == h->fd_capacity) {
h->fd_capacity = GPR_MAX(h->fd_capacity + 8, h->fd_count * 3 / 2);
@@ -79,10 +80,15 @@ static void multipoll_with_poll_pollset_add_fd(grpc_pollset *pollset,
}
h->fds[h->fd_count++] = fd;
GRPC_FD_REF(fd, "multipoller");
+exit:
+ if (and_unlock_pollset) {
+ gpr_mu_unlock(&pollset->mu);
+ }
}
static void multipoll_with_poll_pollset_del_fd(grpc_pollset *pollset,
- grpc_fd *fd) {
+ grpc_fd *fd,
+ int and_unlock_pollset) {
/* will get removed next poll cycle */
pollset_hdr *h = pollset->data.ptr;
if (h->del_count == h->del_capacity) {
@@ -91,6 +97,9 @@ static void multipoll_with_poll_pollset_del_fd(grpc_pollset *pollset,
}
h->dels[h->del_count++] = fd;
GRPC_FD_REF(fd, "multipoller_del");
+ if (and_unlock_pollset) {
+ gpr_mu_unlock(&pollset->mu);
+ }
}
static void end_polling(grpc_pollset *pollset) {
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index 85101764d2..c8646af615 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -105,14 +105,28 @@ void grpc_pollset_init(grpc_pollset *pollset) {
void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
gpr_mu_lock(&pollset->mu);
- pollset->vtable->add_fd(pollset, fd);
+ pollset->vtable->add_fd(pollset, fd, 1);
+ /* the following (enabled only in debug) will reacquire and then release
+ our lock - meaning that if the unlocking flag passed to del_fd above is
+ not respected, the code will deadlock (in a way that we have a chance of
+ debugging) */
+#ifndef NDEBUG
+ gpr_mu_lock(&pollset->mu);
gpr_mu_unlock(&pollset->mu);
+#endif
}
void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {
gpr_mu_lock(&pollset->mu);
- pollset->vtable->del_fd(pollset, fd);
+ pollset->vtable->del_fd(pollset, fd, 1);
+ /* the following (enabled only in debug) will reacquire and then release
+ our lock - meaning that if the unlocking flag passed to del_fd above is
+ not respected, the code will deadlock (in a way that we have a chance of
+ debugging) */
+#ifndef NDEBUG
+ gpr_mu_lock(&pollset->mu);
gpr_mu_unlock(&pollset->mu);
+#endif
}
static void finish_shutdown(grpc_pollset *pollset) {
@@ -122,7 +136,7 @@ static void finish_shutdown(grpc_pollset *pollset) {
int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
/* pollset->mu already held */
- gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
+ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
if (gpr_time_cmp(now, deadline) > 0) {
return 0;
}
@@ -191,17 +205,17 @@ int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline,
gpr_timespec now) {
gpr_timespec timeout;
static const int max_spin_polling_us = 10;
- if (gpr_time_cmp(deadline, gpr_inf_future) == 0) {
+ if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
return -1;
}
- if (gpr_time_cmp(
- deadline,
- gpr_time_add(now, gpr_time_from_micros(max_spin_polling_us))) <= 0) {
+ if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
+ max_spin_polling_us,
+ GPR_TIMESPAN))) <= 0) {
return 0;
}
timeout = gpr_time_sub(deadline, now);
- return gpr_time_to_millis(
- gpr_time_add(timeout, gpr_time_from_nanos(GPR_NS_PER_SEC - 1)));
+ return gpr_time_to_millis(gpr_time_add(
+ timeout, gpr_time_from_nanos(GPR_NS_PER_SEC - 1, GPR_TIMESPAN)));
}
/*
@@ -257,7 +271,7 @@ static void basic_do_promote(void *args, int success) {
} else if (grpc_fd_is_orphaned(fd)) {
/* Don't try to add it to anything, we'll drop our ref on it below */
} else if (pollset->vtable != original_vtable) {
- pollset->vtable->add_fd(pollset, fd);
+ pollset->vtable->add_fd(pollset, fd, 0);
} else if (fd != pollset->data.ptr) {
grpc_fd *fds[2];
fds[0] = pollset->data.ptr;
@@ -287,10 +301,11 @@ static void basic_do_promote(void *args, int success) {
GRPC_FD_UNREF(fd, "basicpoll_add");
}
-static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
+static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd,
+ int and_unlock_pollset) {
grpc_unary_promote_args *up_args;
GPR_ASSERT(fd);
- if (fd == pollset->data.ptr) return;
+ if (fd == pollset->data.ptr) goto exit;
if (!pollset->counter) {
/* Fast path -- no in flight cbs */
@@ -313,7 +328,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
pollset->data.ptr = fd;
GRPC_FD_REF(fd, "basicpoll");
}
- return;
+ goto exit;
}
/* Now we need to promote. This needs to happen when we're not polling. Since
@@ -329,14 +344,24 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
grpc_iomgr_add_callback(&up_args->promotion_closure);
grpc_pollset_kick(pollset);
+
+exit:
+ if (and_unlock_pollset) {
+ gpr_mu_unlock(&pollset->mu);
+ }
}
-static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {
+static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd,
+ int and_unlock_pollset) {
GPR_ASSERT(fd);
if (fd == pollset->data.ptr) {
GRPC_FD_UNREF(pollset->data.ptr, "basicpoll");
pollset->data.ptr = NULL;
}
+
+ if (and_unlock_pollset) {
+ gpr_mu_unlock(&pollset->mu);
+ }
}
static void basic_pollset_maybe_work(grpc_pollset *pollset,
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
index 53585a2886..37de1276d1 100644
--- a/src/core/iomgr/pollset_posix.h
+++ b/src/core/iomgr/pollset_posix.h
@@ -66,8 +66,10 @@ typedef struct grpc_pollset {
} grpc_pollset;
struct grpc_pollset_vtable {
- void (*add_fd)(grpc_pollset *pollset, struct grpc_fd *fd);
- void (*del_fd)(grpc_pollset *pollset, struct grpc_fd *fd);
+ void (*add_fd)(grpc_pollset *pollset, struct grpc_fd *fd,
+ int and_unlock_pollset);
+ void (*del_fd)(grpc_pollset *pollset, struct grpc_fd *fd,
+ int and_unlock_pollset);
void (*maybe_work)(grpc_pollset *pollset, gpr_timespec deadline,
gpr_timespec now, int allow_synchronous_callback);
void (*kick)(grpc_pollset *pollset);
diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c
index 24226cc980..a9c4739c7c 100644
--- a/src/core/iomgr/pollset_windows.c
+++ b/src/core/iomgr/pollset_windows.c
@@ -70,7 +70,7 @@ void grpc_pollset_destroy(grpc_pollset *pollset) {
int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
gpr_timespec now;
- now = gpr_now(GPR_CLOCK_REALTIME);
+ now = gpr_now(GPR_CLOCK_MONOTONIC);
if (gpr_time_cmp(now, deadline) > 0) {
return 0 /* GPR_FALSE */;
}
diff --git a/src/core/iomgr/sockaddr_utils.c b/src/core/iomgr/sockaddr_utils.c
index e91b94f8c8..71ac12e87b 100644
--- a/src/core/iomgr/sockaddr_utils.c
+++ b/src/core/iomgr/sockaddr_utils.c
@@ -36,12 +36,18 @@
#include <errno.h>
#include <string.h>
-#include "src/core/support/string.h"
+#ifdef GPR_POSIX_SOCKET
+#include <sys/un.h>
+#endif
+
+#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/string_util.h>
+#include "src/core/support/string.h"
+
static const gpr_uint8 kV4MappedPrefix[] = {0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0xff, 0xff};
@@ -161,6 +167,31 @@ int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr,
return ret;
}
+char *grpc_sockaddr_to_uri(const struct sockaddr *addr) {
+ char *temp;
+ char *result;
+
+ switch (addr->sa_family) {
+ case AF_INET:
+ grpc_sockaddr_to_string(&temp, addr, 0);
+ gpr_asprintf(&result, "ipv4:%s", temp);
+ gpr_free(temp);
+ return result;
+ case AF_INET6:
+ grpc_sockaddr_to_string(&temp, addr, 0);
+ gpr_asprintf(&result, "ipv6:%s", temp);
+ gpr_free(temp);
+ return result;
+#ifdef GPR_POSIX_SOCKET
+ case AF_UNIX:
+ gpr_asprintf(&result, "unix:%s", ((struct sockaddr_un *)addr)->sun_path);
+ return result;
+#endif
+ }
+
+ return NULL;
+}
+
int grpc_sockaddr_get_port(const struct sockaddr *addr) {
switch (addr->sa_family) {
case AF_INET:
diff --git a/src/core/iomgr/sockaddr_utils.h b/src/core/iomgr/sockaddr_utils.h
index bdfb83479b..99f1ed54da 100644
--- a/src/core/iomgr/sockaddr_utils.h
+++ b/src/core/iomgr/sockaddr_utils.h
@@ -84,4 +84,6 @@ int grpc_sockaddr_set_port(const struct sockaddr *addr, int port);
int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr,
int normalize);
+char *grpc_sockaddr_to_uri(const struct sockaddr *addr);
+
#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKADDR_UTILS_H */
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c
index dc0489e64f..392eda999e 100644
--- a/src/core/iomgr/tcp_client_posix.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -64,6 +64,7 @@ typedef struct {
int refs;
grpc_iomgr_closure write_closure;
grpc_pollset_set *interested_parties;
+ char *addr_str;
} async_connect;
static int prepare_socket(const struct sockaddr *addr, int fd) {
@@ -99,6 +100,7 @@ static void on_alarm(void *acp, int success) {
gpr_mu_unlock(&ac->mu);
if (done) {
gpr_mu_destroy(&ac->mu);
+ gpr_free(ac->addr_str);
gpr_free(ac);
}
}
@@ -114,6 +116,8 @@ static void on_writable(void *acp, int success) {
void (*cb)(void *arg, grpc_endpoint *tcp) = ac->cb;
void *cb_arg = ac->cb_arg;
+ grpc_alarm_cancel(&ac->alarm);
+
gpr_mu_lock(&ac->mu);
if (success) {
do {
@@ -156,7 +160,8 @@ static void on_writable(void *acp, int success) {
}
} else {
grpc_pollset_set_del_fd(ac->interested_parties, ac->fd);
- ep = grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
+ ep = grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE,
+ ac->addr_str);
goto finish;
}
} else {
@@ -177,9 +182,8 @@ finish:
gpr_mu_unlock(&ac->mu);
if (done) {
gpr_mu_destroy(&ac->mu);
+ gpr_free(ac->addr_str);
gpr_free(ac);
- } else {
- grpc_alarm_cancel(&ac->alarm);
}
cb(cb_arg, ep);
}
@@ -223,13 +227,13 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
err = connect(fd, addr, addr_len);
} while (err < 0 && errno == EINTR);
- grpc_sockaddr_to_string(&addr_str, addr, 1);
+ addr_str = grpc_sockaddr_to_uri(addr);
gpr_asprintf(&name, "tcp-client:%s", addr_str);
fdobj = grpc_fd_create(fd, name);
if (err >= 0) {
- cb(arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
+ cb(arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str));
goto done;
}
@@ -247,14 +251,16 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
ac->cb_arg = arg;
ac->fd = fdobj;
ac->interested_parties = interested_parties;
+ ac->addr_str = addr_str;
+ addr_str = NULL;
gpr_mu_init(&ac->mu);
ac->refs = 2;
ac->write_closure.cb = on_writable;
ac->write_closure.cb_arg = ac;
gpr_mu_lock(&ac->mu);
- grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac,
- gpr_now(GPR_CLOCK_REALTIME));
+ grpc_alarm_init(&ac->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
+ on_alarm, ac, gpr_now(GPR_CLOCK_MONOTONIC));
grpc_fd_notify_on_write(ac->fd, &ac->write_closure);
gpr_mu_unlock(&ac->mu);
diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c
index 16741452b9..79a58fe2af 100644
--- a/src/core/iomgr/tcp_client_windows.c
+++ b/src/core/iomgr/tcp_client_windows.c
@@ -58,6 +58,7 @@ typedef struct {
grpc_winsocket *socket;
gpr_timespec deadline;
grpc_alarm alarm;
+ char *addr_name;
int refs;
int aborted;
} async_connect;
@@ -67,6 +68,7 @@ static void async_connect_cleanup(async_connect *ac) {
gpr_mu_unlock(&ac->mu);
if (done) {
gpr_mu_destroy(&ac->mu);
+ gpr_free(ac->addr_name);
gpr_free(ac);
}
}
@@ -107,7 +109,7 @@ static void on_connect(void *acp, int from_iocp) {
gpr_log(GPR_ERROR, "on_connect error: %s", utf8_message);
gpr_free(utf8_message);
} else if (!aborted) {
- ep = grpc_tcp_create(ac->socket);
+ ep = grpc_tcp_create(ac->socket, ac->addr_name);
}
} else {
gpr_log(GPR_ERROR, "on_connect is shutting down");
@@ -213,10 +215,11 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp),
ac->socket = socket;
gpr_mu_init(&ac->mu);
ac->refs = 2;
+ ac->addr_name = grpc_sockaddr_to_uri(addr);
ac->aborted = 0;
grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac,
- gpr_now(GPR_CLOCK_REALTIME));
+ gpr_now(GPR_CLOCK_MONOTONIC));
socket->write_info.outstanding = 1;
grpc_socket_notify_on_write(socket, on_connect, ac);
return;
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index b6d6efc9fb..63a8a2720e 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -44,15 +44,17 @@
#include <sys/socket.h>
#include <unistd.h>
-#include "src/core/support/string.h"
-#include "src/core/debug/trace.h"
-#include "src/core/profiling/timers.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/slice.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
+#include "src/core/support/string.h"
+#include "src/core/debug/trace.h"
+#include "src/core/profiling/timers.h"
+
#ifdef GPR_HAVE_MSG_NOSIGNAL
#define SENDMSG_FLAGS MSG_NOSIGNAL
#else
@@ -282,6 +284,8 @@ typedef struct {
grpc_iomgr_closure write_closure;
grpc_iomgr_closure handle_read_closure;
+
+ char *peer_string;
} grpc_tcp;
static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success);
@@ -296,6 +300,7 @@ static void grpc_tcp_unref(grpc_tcp *tcp) {
int refcount_zero = gpr_unref(&tcp->refcount);
if (refcount_zero) {
grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan");
+ gpr_free(tcp->peer_string);
gpr_free(tcp);
}
}
@@ -314,7 +319,7 @@ static void call_read_cb(grpc_tcp *tcp, gpr_slice *slices, size_t nslices,
gpr_log(GPR_DEBUG, "read: status=%d", status);
for (i = 0; i < nslices; i++) {
char *dump = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
- gpr_log(GPR_DEBUG, "READ: %s", dump);
+ gpr_log(GPR_DEBUG, "READ %p: %s", tcp, dump);
gpr_free(dump);
}
}
@@ -443,7 +448,7 @@ static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
} else {
tcp->handle_read_closure.cb_arg = tcp;
- grpc_iomgr_add_callback(&tcp->handle_read_closure);
+ grpc_iomgr_add_delayed_callback(&tcp->handle_read_closure, 1);
}
}
@@ -567,13 +572,20 @@ static void grpc_tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
grpc_pollset_add_fd(pollset, tcp->em_fd);
}
+static char *grpc_tcp_get_peer(grpc_endpoint *ep) {
+ grpc_tcp *tcp = (grpc_tcp *)ep;
+ return gpr_strdup(tcp->peer_string);
+}
+
static const grpc_endpoint_vtable vtable = {
- grpc_tcp_notify_on_read, grpc_tcp_write, grpc_tcp_add_to_pollset,
- grpc_tcp_shutdown, grpc_tcp_destroy};
+ grpc_tcp_notify_on_read, grpc_tcp_write, grpc_tcp_add_to_pollset,
+ grpc_tcp_shutdown, grpc_tcp_destroy, grpc_tcp_get_peer};
-grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) {
+grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
+ const char *peer_string) {
grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
tcp->base.vtable = &vtable;
+ tcp->peer_string = gpr_strdup(peer_string);
tcp->fd = em_fd->fd;
tcp->read_cb = NULL;
tcp->write_cb = NULL;
diff --git a/src/core/iomgr/tcp_posix.h b/src/core/iomgr/tcp_posix.h
index 44279d5a26..d752feaeea 100644
--- a/src/core/iomgr/tcp_posix.h
+++ b/src/core/iomgr/tcp_posix.h
@@ -53,6 +53,7 @@ extern int grpc_tcp_trace;
/* Create a tcp endpoint given a file desciptor and a read slice size.
Takes ownership of fd. */
-grpc_endpoint *grpc_tcp_create(grpc_fd *fd, size_t read_slice_size);
+grpc_endpoint *grpc_tcp_create(grpc_fd *fd, size_t read_slice_size,
+ const char *peer_string);
#endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_POSIX_H */
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c
index 5854031c9b..8538600112 100644
--- a/src/core/iomgr/tcp_server_posix.c
+++ b/src/core/iomgr/tcp_server_posix.c
@@ -332,7 +332,7 @@ static void on_read(void *arg, int success) {
grpc_set_socket_no_sigpipe_if_possible(fd);
- grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1);
+ addr_str = grpc_sockaddr_to_uri((struct sockaddr *)&addr);
gpr_asprintf(&name, "tcp-server-connection:%s", addr_str);
fdobj = grpc_fd_create(fd, name);
@@ -342,8 +342,9 @@ static void on_read(void *arg, int success) {
for (i = 0; i < sp->server->pollset_count; i++) {
grpc_pollset_add_fd(sp->server->pollsets[i], fdobj);
}
- sp->server->cb(sp->server->cb_arg,
- grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
+ sp->server->cb(
+ sp->server->cb_arg,
+ grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str));
gpr_free(name);
gpr_free(addr_str);
diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c
index e6e1d1499e..cc680507ff 100644
--- a/src/core/iomgr/tcp_server_windows.c
+++ b/src/core/iomgr/tcp_server_windows.c
@@ -116,7 +116,7 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s,
}
/* This happens asynchronously. Wait while that happens. */
while (s->active_ports) {
- gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future);
+ gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future(GPR_CLOCK_REALTIME));
}
gpr_mu_unlock(&s->mu);
@@ -243,9 +243,14 @@ static void on_accept(void *arg, int from_iocp) {
SOCKET sock = sp->new_socket;
grpc_winsocket_callback_info *info = &sp->socket->read_info;
grpc_endpoint *ep = NULL;
+ struct sockaddr_storage peer_name;
+ char *peer_name_string;
+ char *fd_name;
+ int peer_name_len = sizeof(peer_name);
DWORD transfered_bytes;
DWORD flags;
BOOL wsa_success;
+ int err;
/* The general mechanism for shutting down is to queue abortion calls. While
this is necessary in the read/write case, it's useless for the accept
@@ -277,8 +282,28 @@ static void on_accept(void *arg, int from_iocp) {
}
} else {
if (!sp->shutting_down) {
- /* TODO(ctiller): add sockaddr address to label */
- ep = grpc_tcp_create(grpc_winsocket_create(sock, "server"));
+ peer_name_string = NULL;
+ err = setsockopt(sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
+ (char *)&sp->socket->socket,
+ sizeof(sp->socket->socket));
+ if (err) {
+ char *utf8_message = gpr_format_message(WSAGetLastError());
+ gpr_log(GPR_ERROR, "setsockopt error: %s", utf8_message);
+ gpr_free(utf8_message);
+ }
+ err = getpeername(sock, (struct sockaddr*)&peer_name, &peer_name_len);
+ if (!err) {
+ peer_name_string = grpc_sockaddr_to_uri((struct sockaddr*)&peer_name);
+ } else {
+ char *utf8_message = gpr_format_message(WSAGetLastError());
+ gpr_log(GPR_ERROR, "getpeername error: %s", utf8_message);
+ gpr_free(utf8_message);
+ }
+ gpr_asprintf(&fd_name, "tcp_server:%s", peer_name_string);
+ ep = grpc_tcp_create(grpc_winsocket_create(sock, fd_name),
+ peer_name_string);
+ gpr_free(fd_name);
+ gpr_free(peer_name_string);
}
}
diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c
index 1bf81a73e0..d68e6aee79 100644
--- a/src/core/iomgr/tcp_windows.c
+++ b/src/core/iomgr/tcp_windows.c
@@ -96,6 +96,8 @@ typedef struct grpc_tcp {
to protect ourselves when requesting a shutdown. */
gpr_mu mu;
int shutting_down;
+
+ char *peer_string;
} grpc_tcp;
static void tcp_ref(grpc_tcp *tcp) {
@@ -107,6 +109,7 @@ static void tcp_unref(grpc_tcp *tcp) {
gpr_slice_buffer_destroy(&tcp->write_slices);
grpc_winsocket_orphan(tcp->socket);
gpr_mu_destroy(&tcp->mu);
+ gpr_free(tcp->peer_string);
gpr_free(tcp);
}
}
@@ -393,11 +396,16 @@ static void win_destroy(grpc_endpoint *ep) {
tcp_unref(tcp);
}
+static char *win_get_peer(grpc_endpoint *ep) {
+ grpc_tcp *tcp = (grpc_tcp *)ep;
+ return gpr_strdup(tcp->peer_string);
+}
+
static grpc_endpoint_vtable vtable = {
- win_notify_on_read, win_write, win_add_to_pollset, win_shutdown, win_destroy
+ win_notify_on_read, win_write, win_add_to_pollset, win_shutdown, win_destroy, win_get_peer
};
-grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket) {
+grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) {
grpc_tcp *tcp = (grpc_tcp *) gpr_malloc(sizeof(grpc_tcp));
memset(tcp, 0, sizeof(grpc_tcp));
tcp->base.vtable = &vtable;
@@ -405,6 +413,7 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket) {
gpr_mu_init(&tcp->mu);
gpr_slice_buffer_init(&tcp->write_slices);
gpr_ref_init(&tcp->refcount, 1);
+ tcp->peer_string = gpr_strdup(peer_string);
return &tcp->base;
}
diff --git a/src/core/iomgr/tcp_windows.h b/src/core/iomgr/tcp_windows.h
index 4cbc12c53a..7e301db250 100644
--- a/src/core/iomgr/tcp_windows.h
+++ b/src/core/iomgr/tcp_windows.h
@@ -50,7 +50,7 @@
/* Create a tcp endpoint given a winsock handle.
* Takes ownership of the handle.
*/
-grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket);
+grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string);
int grpc_tcp_prepare_socket(SOCKET sock);