aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/endpoint_pair_posix.c4
-rw-r--r--src/core/lib/iomgr/endpoint_pair_uv.c53
-rw-r--r--src/core/lib/iomgr/endpoint_pair_windows.c4
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c20
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.h5
-rw-r--r--src/core/lib/iomgr/ev_poll_and_epoll_posix.c16
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c4
-rw-r--r--src/core/lib/iomgr/ev_posix.c6
-rw-r--r--src/core/lib/iomgr/iocp_windows.c6
-rw-r--r--src/core/lib/iomgr/iomgr.h2
-rw-r--r--src/core/lib/iomgr/iomgr_posix.c4
-rw-r--r--src/core/lib/iomgr/iomgr_uv.c49
-rw-r--r--src/core/lib/iomgr/iomgr_windows.c4
-rw-r--r--src/core/lib/iomgr/pollset_set_uv.c62
-rw-r--r--src/core/lib/iomgr/pollset_set_windows.c6
-rw-r--r--src/core/lib/iomgr/pollset_uv.c142
-rw-r--r--src/core/lib/iomgr/pollset_uv.h42
-rw-r--r--src/core/lib/iomgr/pollset_windows.c6
-rw-r--r--src/core/lib/iomgr/port.h131
-rw-r--r--src/core/lib/iomgr/resolve_address.h1
-rw-r--r--src/core/lib/iomgr/resolve_address_posix.c8
-rw-r--r--src/core/lib/iomgr/resolve_address_uv.c231
-rw-r--r--src/core/lib/iomgr/resolve_address_windows.c10
-rw-r--r--src/core/lib/iomgr/resource_quota.c115
-rw-r--r--src/core/lib/iomgr/resource_quota.h47
-rw-r--r--src/core/lib/iomgr/sockaddr.h12
-rw-r--r--src/core/lib/iomgr/sockaddr_utils.c99
-rw-r--r--src/core/lib/iomgr/sockaddr_utils.h28
-rw-r--r--src/core/lib/iomgr/socket_utils.h42
-rw-r--r--src/core/lib/iomgr/socket_utils_common_posix.c32
-rw-r--r--src/core/lib/iomgr/socket_utils_linux.c16
-rw-r--r--src/core/lib/iomgr/socket_utils_posix.c17
-rw-r--r--src/core/lib/iomgr/socket_utils_posix.h10
-rw-r--r--src/core/lib/iomgr/socket_utils_uv.c49
-rw-r--r--src/core/lib/iomgr/socket_utils_windows.c48
-rw-r--r--src/core/lib/iomgr/socket_windows.c6
-rw-r--r--src/core/lib/iomgr/tcp_client.h4
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.c31
-rw-r--r--src/core/lib/iomgr/tcp_client_uv.c153
-rw-r--r--src/core/lib/iomgr/tcp_client_windows.c22
-rw-r--r--src/core/lib/iomgr/tcp_posix.c10
-rw-r--r--src/core/lib/iomgr/tcp_server.h6
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c119
-rw-r--r--src/core/lib/iomgr/tcp_server_uv.c365
-rw-r--r--src/core/lib/iomgr/tcp_server_windows.c80
-rw-r--r--src/core/lib/iomgr/tcp_uv.c338
-rw-r--r--src/core/lib/iomgr/tcp_uv.h57
-rw-r--r--src/core/lib/iomgr/tcp_windows.c6
-rw-r--r--src/core/lib/iomgr/timer.h17
-rw-r--r--src/core/lib/iomgr/timer_generic.c (renamed from src/core/lib/iomgr/timer.c)6
-rw-r--r--src/core/lib/iomgr/timer_generic.h49
-rw-r--r--src/core/lib/iomgr/timer_heap.c6
-rw-r--r--src/core/lib/iomgr/timer_uv.c99
-rw-r--r--src/core/lib/iomgr/timer_uv.h47
-rw-r--r--src/core/lib/iomgr/udp_server.c202
-rw-r--r--src/core/lib/iomgr/udp_server.h8
-rw-r--r--src/core/lib/iomgr/unix_sockets_posix.c20
-rw-r--r--src/core/lib/iomgr/unix_sockets_posix.h11
-rw-r--r--src/core/lib/iomgr/unix_sockets_posix_noop.c8
-rw-r--r--src/core/lib/iomgr/wakeup_fd_cv.c6
-rw-r--r--src/core/lib/iomgr/wakeup_fd_eventfd.c6
-rw-r--r--src/core/lib/iomgr/wakeup_fd_nospecial.c6
-rw-r--r--src/core/lib/iomgr/wakeup_fd_pipe.c4
-rw-r--r--src/core/lib/iomgr/wakeup_fd_posix.c6
-rw-r--r--src/core/lib/iomgr/workqueue.h1
-rw-r--r--src/core/lib/iomgr/workqueue_uv.c66
-rw-r--r--src/core/lib/iomgr/workqueue_uv.h37
67 files changed, 2645 insertions, 488 deletions
diff --git a/src/core/lib/iomgr/endpoint_pair_posix.c b/src/core/lib/iomgr/endpoint_pair_posix.c
index fc80064a60..b9ff969e81 100644
--- a/src/core/lib/iomgr/endpoint_pair_posix.c
+++ b/src/core/lib/iomgr/endpoint_pair_posix.c
@@ -31,9 +31,9 @@
*
*/
-#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
-#ifdef GPR_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET
#include "src/core/lib/iomgr/endpoint_pair.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
diff --git a/src/core/lib/iomgr/endpoint_pair_uv.c b/src/core/lib/iomgr/endpoint_pair_uv.c
new file mode 100644
index 0000000000..7941e20388
--- /dev/null
+++ b/src/core/lib/iomgr/endpoint_pair_uv.c
@@ -0,0 +1,53 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_UV
+
+#include <stdlib.h>
+
+#include <grpc/support/log.h>
+
+#include "src/core/lib/iomgr/endpoint_pair.h"
+
+grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
+ size_t read_slice_size) {
+ grpc_endpoint_pair endpoint_pair;
+ // TODO(mlumish): implement this properly under libuv
+ GPR_ASSERT(false &&
+ "grpc_iomgr_create_endpoint_pair is not suppoted with libuv");
+ return endpoint_pair;
+}
+
+#endif /* GRPC_UV */
diff --git a/src/core/lib/iomgr/endpoint_pair_windows.c b/src/core/lib/iomgr/endpoint_pair_windows.c
index 582704e267..5c78c95492 100644
--- a/src/core/lib/iomgr/endpoint_pair_windows.c
+++ b/src/core/lib/iomgr/endpoint_pair_windows.c
@@ -31,9 +31,9 @@
*
*/
-#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
-#ifdef GPR_WINSOCK_SOCKET
+#ifdef GRPC_WINSOCK_SOCKET
#include "src/core/lib/iomgr/endpoint_pair.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index e5909d9380..db51ec4939 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -32,10 +32,10 @@
*/
#include <grpc/grpc_posix.h>
-#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
/* This polling engine is only relevant on linux kernels supporting epoll() */
-#ifdef GPR_LINUX_EPOLL
+#ifdef GRPC_LINUX_EPOLL
#include "src/core/lib/iomgr/ev_epoll_linux.h"
@@ -1711,6 +1711,12 @@ retry:
"pollset_add_fd: Raced creating new polling island. pi_new: %p "
"(fd: %d, pollset: %p)",
(void *)pi_new, fd->fd, (void *)pollset);
+
+ /* No need to lock 'pi_new' here since this is a new polling island and
+ * no one has a reference to it yet */
+ polling_island_remove_all_fds_locked(pi_new, true, &error);
+
+ /* Ref and unref so that the polling island gets deleted during unref */
PI_ADD_REF(pi_new, "dance_of_destruction");
PI_UNREF(exec_ctx, pi_new, "dance_of_destruction");
goto retry;
@@ -2049,13 +2055,13 @@ const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
return &vtable;
}
-#else /* defined(GPR_LINUX_EPOLL) */
-#if defined(GPR_POSIX_SOCKET)
+#else /* defined(GRPC_LINUX_EPOLL) */
+#if defined(GRPC_POSIX_SOCKET)
#include "src/core/lib/iomgr/ev_posix.h"
-/* If GPR_LINUX_EPOLL is not defined, it means epoll is not available. Return
+/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
* NULL */
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }
-#endif /* defined(GPR_POSIX_SOCKET) */
+#endif /* defined(GRPC_POSIX_SOCKET) */
void grpc_use_signal(int signum) {}
-#endif /* !defined(GPR_LINUX_EPOLL) */
+#endif /* !defined(GRPC_LINUX_EPOLL) */
diff --git a/src/core/lib/iomgr/ev_epoll_linux.h b/src/core/lib/iomgr/ev_epoll_linux.h
index 7a494aba19..8fc3ff59a3 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.h
+++ b/src/core/lib/iomgr/ev_epoll_linux.h
@@ -35,13 +35,14 @@
#define GRPC_CORE_LIB_IOMGR_EV_EPOLL_LINUX_H
#include "src/core/lib/iomgr/ev_posix.h"
+#include "src/core/lib/iomgr/port.h"
const grpc_event_engine_vtable *grpc_init_epoll_linux(void);
-#ifdef GPR_LINUX_EPOLL
+#ifdef GRPC_LINUX_EPOLL
void *grpc_fd_get_polling_island(grpc_fd *fd);
void *grpc_pollset_get_polling_island(grpc_pollset *ps);
bool grpc_are_polling_islands_equal(void *p, void *q);
-#endif /* defined(GPR_LINUX_EPOLL) */
+#endif /* defined(GRPC_LINUX_EPOLL) */
#endif /* GRPC_CORE_LIB_IOMGR_EV_EPOLL_LINUX_H */
diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
index 1829440a6e..bf51404203 100644
--- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
@@ -42,9 +42,9 @@
* - ev_epoll_posix.{h,c}
*/
-#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
-#ifdef GPR_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET
#include "src/core/lib/iomgr/ev_poll_and_epoll_posix.h"
@@ -1338,7 +1338,7 @@ static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) {
* pollset_multipoller_with_poll_posix.c
*/
-#ifndef GPR_LINUX_MULTIPOLL_WITH_EPOLL
+#ifndef GRPC_LINUX_MULTIPOLL_WITH_EPOLL
typedef struct {
/* all polled fds */
@@ -1520,13 +1520,13 @@ static void poll_become_multipoller(grpc_exec_ctx *exec_ctx,
}
}
-#endif /* !GPR_LINUX_MULTIPOLL_WITH_EPOLL */
+#endif /* !GRPC_LINUX_MULTIPOLL_WITH_EPOLL */
/*******************************************************************************
* pollset_multipoller_with_epoll_posix.c
*/
-#ifdef GPR_LINUX_MULTIPOLL_WITH_EPOLL
+#ifdef GRPC_LINUX_MULTIPOLL_WITH_EPOLL
#include <errno.h>
#include <poll.h>
@@ -1839,11 +1839,11 @@ static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx,
}
}
-#else /* GPR_LINUX_MULTIPOLL_WITH_EPOLL */
+#else /* GRPC_LINUX_MULTIPOLL_WITH_EPOLL */
static void remove_fd_from_all_epoll_sets(int fd) {}
-#endif /* GPR_LINUX_MULTIPOLL_WITH_EPOLL */
+#endif /* GRPC_LINUX_MULTIPOLL_WITH_EPOLL */
/*******************************************************************************
* pollset_set_posix.c
@@ -2063,7 +2063,7 @@ static const grpc_event_engine_vtable vtable = {
};
const grpc_event_engine_vtable *grpc_init_poll_and_epoll_posix(void) {
-#ifdef GPR_LINUX_MULTIPOLL_WITH_EPOLL
+#ifdef GRPC_LINUX_MULTIPOLL_WITH_EPOLL
platform_become_multipoller = epoll_become_multipoller;
#else
platform_become_multipoller = poll_become_multipoller;
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index 27e966c18c..e1d620cfff 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -31,9 +31,9 @@
*
*/
-#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
-#ifdef GPR_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET
#include "src/core/lib/iomgr/ev_poll_posix.h"
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index 9857b0bce9..ef36ba89b2 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -31,9 +31,9 @@
*
*/
-#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
-#ifdef GPR_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET
#include "src/core/lib/iomgr/ev_posix.h"
@@ -282,4 +282,4 @@ void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
g_event_engine->workqueue_enqueue(exec_ctx, workqueue, closure, error);
}
-#endif // GPR_POSIX_SOCKET
+#endif // GRPC_POSIX_SOCKET
diff --git a/src/core/lib/iomgr/iocp_windows.c b/src/core/lib/iomgr/iocp_windows.c
index 2532e52e48..60ebe43676 100644
--- a/src/core/lib/iomgr/iocp_windows.c
+++ b/src/core/lib/iomgr/iocp_windows.c
@@ -31,9 +31,9 @@
*
*/
-#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
-#ifdef GPR_WINSOCK_SOCKET
+#ifdef GRPC_WINSOCK_SOCKET
#include <winsock2.h>
@@ -166,4 +166,4 @@ void grpc_iocp_add_socket(grpc_winsocket *socket) {
GPR_ASSERT(ret == g_iocp);
}
-#endif /* GPR_WINSOCK_SOCKET */
+#endif /* GRPC_WINSOCK_SOCKET */
diff --git a/src/core/lib/iomgr/iomgr.h b/src/core/lib/iomgr/iomgr.h
index 6c82de78ac..c1cfaf302e 100644
--- a/src/core/lib/iomgr/iomgr.h
+++ b/src/core/lib/iomgr/iomgr.h
@@ -34,6 +34,8 @@
#ifndef GRPC_CORE_LIB_IOMGR_IOMGR_H
#define GRPC_CORE_LIB_IOMGR_IOMGR_H
+#include "src/core/lib/iomgr/port.h"
+
/** Initializes the iomgr. */
void grpc_iomgr_init(void);
diff --git a/src/core/lib/iomgr/iomgr_posix.c b/src/core/lib/iomgr/iomgr_posix.c
index cede97f4c6..f5ee0c9ee4 100644
--- a/src/core/lib/iomgr/iomgr_posix.c
+++ b/src/core/lib/iomgr/iomgr_posix.c
@@ -31,9 +31,9 @@
*
*/
-#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
-#ifdef GPR_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/ev_posix.h"
diff --git a/src/core/lib/iomgr/iomgr_uv.c b/src/core/lib/iomgr/iomgr_uv.c
new file mode 100644
index 0000000000..96516ff167
--- /dev/null
+++ b/src/core/lib/iomgr/iomgr_uv.c
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_UV
+
+#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/iomgr/pollset_uv.h"
+#include "src/core/lib/iomgr/tcp_uv.h"
+
+void grpc_iomgr_platform_init(void) {
+ grpc_pollset_global_init();
+ grpc_register_tracer("tcp", &grpc_tcp_trace);
+}
+void grpc_iomgr_platform_flush(void) {}
+void grpc_iomgr_platform_shutdown(void) { grpc_pollset_global_shutdown(); }
+
+#endif /* GRPC_UV */
diff --git a/src/core/lib/iomgr/iomgr_windows.c b/src/core/lib/iomgr/iomgr_windows.c
index 7653f6e635..b659264ede 100644
--- a/src/core/lib/iomgr/iomgr_windows.c
+++ b/src/core/lib/iomgr/iomgr_windows.c
@@ -31,9 +31,9 @@
*
*/
-#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
-#ifdef GPR_WINSOCK_SOCKET
+#ifdef GRPC_WINSOCK_SOCKET
#include "src/core/lib/iomgr/sockaddr_windows.h"
diff --git a/src/core/lib/iomgr/pollset_set_uv.c b/src/core/lib/iomgr/pollset_set_uv.c
new file mode 100644
index 0000000000..e5ef8b29e0
--- /dev/null
+++ b/src/core/lib/iomgr/pollset_set_uv.c
@@ -0,0 +1,62 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_UV
+
+#include "src/core/lib/iomgr/pollset_set.h"
+
+grpc_pollset_set* grpc_pollset_set_create(void) {
+ return (grpc_pollset_set*)((intptr_t)0xdeafbeef);
+}
+
+void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set) {}
+
+void grpc_pollset_set_add_pollset(grpc_exec_ctx* exec_ctx,
+ grpc_pollset_set* pollset_set,
+ grpc_pollset* pollset) {}
+
+void grpc_pollset_set_del_pollset(grpc_exec_ctx* exec_ctx,
+ grpc_pollset_set* pollset_set,
+ grpc_pollset* pollset) {}
+
+void grpc_pollset_set_add_pollset_set(grpc_exec_ctx* exec_ctx,
+ grpc_pollset_set* bag,
+ grpc_pollset_set* item) {}
+
+void grpc_pollset_set_del_pollset_set(grpc_exec_ctx* exec_ctx,
+ grpc_pollset_set* bag,
+ grpc_pollset_set* item) {}
+
+#endif /* GRPC_UV */
diff --git a/src/core/lib/iomgr/pollset_set_windows.c b/src/core/lib/iomgr/pollset_set_windows.c
index a35a9766fc..645650db9b 100644
--- a/src/core/lib/iomgr/pollset_set_windows.c
+++ b/src/core/lib/iomgr/pollset_set_windows.c
@@ -31,10 +31,10 @@
*
*/
-#include <grpc/support/port_platform.h>
#include <stdint.h>
+#include "src/core/lib/iomgr/port.h"
-#ifdef GPR_WINSOCK_SOCKET
+#ifdef GRPC_WINSOCK_SOCKET
#include "src/core/lib/iomgr/pollset_set_windows.h"
@@ -60,4 +60,4 @@ void grpc_pollset_set_del_pollset_set(grpc_exec_ctx* exec_ctx,
grpc_pollset_set* bag,
grpc_pollset_set* item) {}
-#endif /* GPR_WINSOCK_SOCKET */
+#endif /* GRPC_WINSOCK_SOCKET */
diff --git a/src/core/lib/iomgr/pollset_uv.c b/src/core/lib/iomgr/pollset_uv.c
new file mode 100644
index 0000000000..3a74b842b6
--- /dev/null
+++ b/src/core/lib/iomgr/pollset_uv.c
@@ -0,0 +1,142 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_UV
+
+#include <uv.h>
+
+#include <string.h>
+
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+
+#include "src/core/lib/iomgr/pollset.h"
+#include "src/core/lib/iomgr/pollset_uv.h"
+
+struct grpc_pollset {
+ uv_timer_t timer;
+ int shutting_down;
+};
+
+/* Indicates that grpc_pollset_work should run an iteration of the UV loop
+ before running callbacks. This defaults to 1, and should be disabled if
+ grpc_pollset_work will be called within the callstack of uv_run */
+int grpc_pollset_work_run_loop;
+
+gpr_mu grpc_polling_mu;
+
+size_t grpc_pollset_size() { return sizeof(grpc_pollset); }
+
+void grpc_pollset_global_init(void) {
+ gpr_mu_init(&grpc_polling_mu);
+ grpc_pollset_work_run_loop = 1;
+}
+
+void grpc_pollset_global_shutdown(void) { gpr_mu_destroy(&grpc_polling_mu); }
+
+void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
+ *mu = &grpc_polling_mu;
+ memset(pollset, 0, sizeof(grpc_pollset));
+ uv_timer_init(uv_default_loop(), &pollset->timer);
+ pollset->shutting_down = 0;
+}
+
+static void timer_close_cb(uv_handle_t *handle) { handle->data = (void *)1; }
+
+void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_closure *closure) {
+ GPR_ASSERT(!pollset->shutting_down);
+ pollset->shutting_down = 1;
+ if (grpc_pollset_work_run_loop) {
+ // Drain any pending UV callbacks without blocking
+ uv_run(uv_default_loop(), UV_RUN_NOWAIT);
+ }
+ grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL);
+}
+
+void grpc_pollset_destroy(grpc_pollset *pollset) {
+ uv_close((uv_handle_t *)&pollset->timer, timer_close_cb);
+ // timer.data is a boolean indicating that the timer has finished closing
+ pollset->timer.data = (void *)0;
+ if (grpc_pollset_work_run_loop) {
+ while (!pollset->timer.data) {
+ uv_run(uv_default_loop(), UV_RUN_NOWAIT);
+ }
+ }
+}
+
+void grpc_pollset_reset(grpc_pollset *pollset) {
+ GPR_ASSERT(pollset->shutting_down);
+ pollset->shutting_down = 0;
+}
+
+static void timer_run_cb(uv_timer_t *timer) {}
+
+grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_pollset_worker **worker_hdl,
+ gpr_timespec now, gpr_timespec deadline) {
+ uint64_t timeout;
+ gpr_mu_unlock(&grpc_polling_mu);
+ if (grpc_pollset_work_run_loop) {
+ if (gpr_time_cmp(deadline, now) >= 0) {
+ timeout = (uint64_t)gpr_time_to_millis(gpr_time_sub(deadline, now));
+ } else {
+ timeout = 0;
+ }
+ /* We special-case timeout=0 so that we don't bother with the timer when
+ the loop won't block anyway */
+ if (timeout > 0) {
+ uv_timer_start(&pollset->timer, timer_run_cb, timeout, 0);
+ /* Run until there is some I/O activity or the timer triggers. It doesn't
+ matter which happens */
+ uv_run(uv_default_loop(), UV_RUN_ONCE);
+ uv_timer_stop(&pollset->timer);
+ } else {
+ uv_run(uv_default_loop(), UV_RUN_NOWAIT);
+ }
+ }
+ if (!grpc_closure_list_empty(exec_ctx->closure_list)) {
+ grpc_exec_ctx_flush(exec_ctx);
+ }
+ gpr_mu_lock(&grpc_polling_mu);
+ return GRPC_ERROR_NONE;
+}
+
+grpc_error *grpc_pollset_kick(grpc_pollset *pollset,
+ grpc_pollset_worker *specific_worker) {
+ return GRPC_ERROR_NONE;
+}
+
+#endif /* GRPC_UV */
diff --git a/src/core/lib/iomgr/pollset_uv.h b/src/core/lib/iomgr/pollset_uv.h
new file mode 100644
index 0000000000..0715eb4295
--- /dev/null
+++ b/src/core/lib/iomgr/pollset_uv.h
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_POLLSET_UV_H
+#define GRPC_CORE_LIB_IOMGR_POLLSET_UV_H
+
+extern int grpc_pollset_work_run_loop;
+
+void grpc_pollset_global_init(void);
+void grpc_pollset_global_shutdown(void);
+
+#endif /* GRPC_CORE_LIB_IOMGR_POLLSET_UV_H */
diff --git a/src/core/lib/iomgr/pollset_windows.c b/src/core/lib/iomgr/pollset_windows.c
index 626dd784b3..5540303e49 100644
--- a/src/core/lib/iomgr/pollset_windows.c
+++ b/src/core/lib/iomgr/pollset_windows.c
@@ -31,9 +31,9 @@
*
*/
-#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
-#ifdef GPR_WINSOCK_SOCKET
+#ifdef GRPC_WINSOCK_SOCKET
#include <grpc/support/log.h>
#include <grpc/support/thd.h>
@@ -241,4 +241,4 @@ grpc_error *grpc_pollset_kick(grpc_pollset *p,
void grpc_kick_poller(void) { grpc_iocp_kick(); }
-#endif /* GPR_WINSOCK_SOCKET */
+#endif /* GRPC_WINSOCK_SOCKET */
diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h
new file mode 100644
index 0000000000..c0bb3b5a23
--- /dev/null
+++ b/src/core/lib/iomgr/port.h
@@ -0,0 +1,131 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#ifndef GRPC_CORE_LIB_IOMGR_PORT_H
+#define GRPC_CORE_LIB_IOMGR_PORT_H
+
+#if defined(GRPC_UV)
+// Do nothing
+#elif defined(GPR_MANYLINUX1)
+#define GRPC_HAVE_IPV6_RECVPKTINFO 1
+#define GRPC_HAVE_IP_PKTINFO 1
+#define GRPC_HAVE_MSG_NOSIGNAL 1
+#define GRPC_HAVE_UNIX_SOCKET 1
+#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
+#define GRPC_POSIX_SOCKET 1
+#define GRPC_POSIX_SOCKETADDR 1
+#define GRPC_POSIX_SOCKETUTILS 1
+#define GRPC_POSIX_WAKEUP_FD 1
+#define GRPC_TIMER_USE_GENERIC 1
+#elif defined(GPR_WINDOWS)
+#define GRPC_TIMER_USE_GENERIC 1
+#define GRPC_WINSOCK_SOCKET 1
+#define GRPC_WINDOWS_SOCKETUTILS 1
+#elif defined(GPR_ANDROID)
+#define GRPC_HAVE_IPV6_RECVPKTINFO 1
+#define GRPC_HAVE_IP_PKTINFO 1
+#define GRPC_HAVE_MSG_NOSIGNAL 1
+#define GRPC_HAVE_UNIX_SOCKET 1
+#define GRPC_LINUX_EVENTFD 1
+#define GRPC_POSIX_SOCKET 1
+#define GRPC_POSIX_SOCKETADDR 1
+#define GRPC_POSIX_SOCKETUTILS 1
+#define GRPC_POSIX_WAKEUP_FD 1
+#define GRPC_TIMER_USE_GENERIC 1
+#elif defined(GPR_LINUX)
+#define GRPC_HAVE_IPV6_RECVPKTINFO 1
+#define GRPC_HAVE_IP_PKTINFO 1
+#define GRPC_HAVE_MSG_NOSIGNAL 1
+#define GRPC_HAVE_UNIX_SOCKET 1
+#define GRPC_LINUX_MULTIPOLL_WITH_EPOLL 1
+#define GRPC_POSIX_SOCKET 1
+#define GRPC_POSIX_SOCKETADDR 1
+#define GRPC_POSIX_WAKEUP_FD 1
+#define GRPC_TIMER_USE_GENERIC 1
+#ifdef __GLIBC_PREREQ
+#if __GLIBC_PREREQ(2, 9)
+#define GRPC_LINUX_EPOLL 1
+#define GRPC_LINUX_EVENTFD 1
+#endif
+#if __GLIBC_PREREQ(2, 10)
+#define GRPC_LINUX_SOCKETUTILS 1
+#endif
+#endif
+#ifndef GRPC_LINUX_EVENTFD
+#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
+#endif
+#ifndef GRPC_LINUX_SOCKETUTILS
+#define GRPC_POSIX_SOCKETUTILS
+#endif
+#elif defined(GPR_APPLE)
+#define GRPC_HAVE_IP_PKTINFO 1
+#define GRPC_HAVE_SO_NOSIGPIPE 1
+#define GRPC_HAVE_UNIX_SOCKET 1
+#define GRPC_MSG_IOVLEN_TYPE int
+#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
+#define GRPC_POSIX_SOCKET 1
+#define GRPC_POSIX_SOCKETADDR 1
+#define GRPC_POSIX_SOCKETUTILS 1
+#define GRPC_POSIX_WAKEUP_FD 1
+#define GRPC_TIMER_USE_GENERIC 1
+#elif defined(GPR_FREEBSD)
+#define GRPC_HAVE_IPV6_RECVPKTINFO 1
+#define GRPC_HAVE_IP_PKTINFO 1
+#define GRPC_HAVE_SO_NOSIGPIPE 1
+#define GRPC_HAVE_UNIX_SOCKET 1
+#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
+#define GRPC_POSIX_SOCKET 1
+#define GRPC_POSIX_SOCKETADDR 1
+#define GRPC_POSIX_SOCKETUTILS 1
+#define GRPC_POSIX_WAKEUP_FD 1
+#define GRPC_TIMER_USE_GENERIC 1
+#elif defined(GPR_NACL)
+#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
+#define GRPC_POSIX_SOCKET 1
+#define GRPC_POSIX_SOCKETADDR 1
+#define GRPC_POSIX_SOCKETUTILS 1
+#define GRPC_POSIX_WAKEUP_FD 1
+#define GRPC_TIMER_USE_GENERIC 1
+#elif !defined(GPR_NO_AUTODETECT_PLATFORM)
+#error "Platform not recognized"
+#endif
+
+#if defined(GRPC_POSIX_SOCKET) + defined(GRPC_WINSOCK_SOCKET) + \
+ defined(GRPC_CUSTOM_SOCKET) + defined(GRPC_UV) != \
+ 1
+#error Must define exactly one of GRPC_POSIX_SOCKET, GRPC_WINSOCK_SOCKET, GPR_CUSTOM_SOCKET
+#endif
+
+#endif /* GRPC_CORE_LIB_IOMGR_PORT_H */
diff --git a/src/core/lib/iomgr/resolve_address.h b/src/core/lib/iomgr/resolve_address.h
index ddbe375755..275924448a 100644
--- a/src/core/lib/iomgr/resolve_address.h
+++ b/src/core/lib/iomgr/resolve_address.h
@@ -36,7 +36,6 @@
#include <stddef.h>
#include "src/core/lib/iomgr/exec_ctx.h"
-#include "src/core/lib/iomgr/iomgr.h"
#define GRPC_MAX_SOCKADDR_SIZE 128
diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c
index 4e9f978584..de791b2b67 100644
--- a/src/core/lib/iomgr/resolve_address_posix.c
+++ b/src/core/lib/iomgr/resolve_address_posix.c
@@ -31,12 +31,13 @@
*
*/
-#include <grpc/support/port_platform.h>
-#ifdef GPR_POSIX_SOCKET
+#include "src/core/lib/iomgr/port.h"
+#ifdef GRPC_POSIX_SOCKET
-#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/sockaddr.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+
#include <string.h>
#include <sys/types.h>
@@ -49,7 +50,6 @@
#include <grpc/support/useful.h>
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
-#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/unix_sockets_posix.h"
#include "src/core/lib/support/block_annotate.h"
#include "src/core/lib/support/string.h"
diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c
new file mode 100644
index 0000000000..b8295acfa1
--- /dev/null
+++ b/src/core/lib/iomgr/resolve_address_uv.c
@@ -0,0 +1,231 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+#ifdef GRPC_UV
+
+#include <uv.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/host_port.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/lib/iomgr/sockaddr.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
+
+#include <string.h>
+
+typedef struct request {
+ grpc_closure *on_done;
+ grpc_resolved_addresses **addresses;
+ struct addrinfo *hints;
+} request;
+
+static grpc_error *handle_addrinfo_result(int status, struct addrinfo *result,
+ grpc_resolved_addresses **addresses) {
+ struct addrinfo *resp;
+ size_t i;
+ if (status != 0) {
+ grpc_error *error;
+ *addresses = NULL;
+ error = GRPC_ERROR_CREATE("getaddrinfo failed");
+ error =
+ grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status));
+ return error;
+ }
+ (*addresses) = gpr_malloc(sizeof(grpc_resolved_addresses));
+ (*addresses)->naddrs = 0;
+ for (resp = result; resp != NULL; resp = resp->ai_next) {
+ (*addresses)->naddrs++;
+ }
+ (*addresses)->addrs =
+ gpr_malloc(sizeof(grpc_resolved_address) * (*addresses)->naddrs);
+ i = 0;
+ for (resp = result; resp != NULL; resp = resp->ai_next) {
+ memcpy(&(*addresses)->addrs[i].addr, resp->ai_addr, resp->ai_addrlen);
+ (*addresses)->addrs[i].len = resp->ai_addrlen;
+ i++;
+ }
+
+ {
+ for (i = 0; i < (*addresses)->naddrs; i++) {
+ char *buf;
+ grpc_sockaddr_to_string(&buf, &(*addresses)->addrs[i], 0);
+ gpr_free(buf);
+ }
+ }
+ return GRPC_ERROR_NONE;
+}
+
+static void getaddrinfo_callback(uv_getaddrinfo_t *req, int status,
+ struct addrinfo *res) {
+ request *r = (request *)req->data;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_error *error;
+ error = handle_addrinfo_result(status, res, r->addresses);
+ grpc_exec_ctx_sched(&exec_ctx, r->on_done, error, NULL);
+ grpc_exec_ctx_finish(&exec_ctx);
+
+ gpr_free(r->hints);
+ gpr_free(r);
+ gpr_free(req);
+ uv_freeaddrinfo(res);
+}
+
+static grpc_error *try_split_host_port(const char *name,
+ const char *default_port, char **host,
+ char **port) {
+ /* parse name, splitting it into host and port parts */
+ grpc_error *error;
+ gpr_split_host_port(name, host, port);
+ if (host == NULL) {
+ char *msg;
+ gpr_asprintf(&msg, "unparseable host:port: '%s'", name);
+ error = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
+ return error;
+ }
+ if (port == NULL) {
+ if (default_port == NULL) {
+ char *msg;
+ gpr_asprintf(&msg, "no port in name '%s'", name);
+ error = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
+ return error;
+ }
+ *port = gpr_strdup(default_port);
+ }
+ return GRPC_ERROR_NONE;
+}
+
+static grpc_error *blocking_resolve_address_impl(
+ const char *name, const char *default_port,
+ grpc_resolved_addresses **addresses) {
+ char *host;
+ char *port;
+ struct addrinfo hints;
+ uv_getaddrinfo_t req;
+ int s;
+ grpc_error *err;
+
+ req.addrinfo = NULL;
+
+ err = try_split_host_port(name, default_port, &host, &port);
+ if (err != GRPC_ERROR_NONE) {
+ goto done;
+ }
+
+ /* Call getaddrinfo */
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC; /* ipv4 or ipv6 */
+ hints.ai_socktype = SOCK_STREAM; /* stream socket */
+ hints.ai_flags = AI_PASSIVE; /* for wildcard IP address */
+
+ s = uv_getaddrinfo(uv_default_loop(), &req, NULL, host, port, &hints);
+ err = handle_addrinfo_result(s, req.addrinfo, addresses);
+
+done:
+ gpr_free(host);
+ gpr_free(port);
+ if (req.addrinfo) {
+ uv_freeaddrinfo(req.addrinfo);
+ }
+ return err;
+}
+
+grpc_error *(*grpc_blocking_resolve_address)(
+ const char *name, const char *default_port,
+ grpc_resolved_addresses **addresses) = blocking_resolve_address_impl;
+
+void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
+ if (addrs != NULL) {
+ gpr_free(addrs->addrs);
+ }
+ gpr_free(addrs);
+}
+
+static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
+ const char *default_port,
+ grpc_closure *on_done,
+ grpc_resolved_addresses **addrs) {
+ uv_getaddrinfo_t *req;
+ request *r;
+ struct addrinfo *hints;
+ char *host;
+ char *port;
+ grpc_error *err;
+ int s;
+ err = try_split_host_port(name, default_port, &host, &port);
+ if (err != GRPC_ERROR_NONE) {
+ grpc_exec_ctx_sched(exec_ctx, on_done, err, NULL);
+ return;
+ }
+ r = gpr_malloc(sizeof(request));
+ r->on_done = on_done;
+ r->addresses = addrs;
+ req = gpr_malloc(sizeof(uv_getaddrinfo_t));
+ req->data = r;
+
+ /* Call getaddrinfo */
+ hints = gpr_malloc(sizeof(struct addrinfo));
+ memset(hints, 0, sizeof(struct addrinfo));
+ hints->ai_family = AF_UNSPEC; /* ipv4 or ipv6 */
+ hints->ai_socktype = SOCK_STREAM; /* stream socket */
+ hints->ai_flags = AI_PASSIVE; /* for wildcard IP address */
+ r->hints = hints;
+
+ s = uv_getaddrinfo(uv_default_loop(), req, getaddrinfo_callback, host, port,
+ hints);
+
+ if (s != 0) {
+ *addrs = NULL;
+ err = GRPC_ERROR_CREATE("getaddrinfo failed");
+ err = grpc_error_set_str(err, GRPC_ERROR_STR_OS_ERROR, uv_strerror(s));
+ grpc_exec_ctx_sched(exec_ctx, on_done, err, NULL);
+ gpr_free(r);
+ gpr_free(req);
+ gpr_free(hints);
+ }
+}
+
+void (*grpc_resolve_address)(grpc_exec_ctx *exec_ctx, const char *name,
+ const char *default_port, grpc_closure *on_done,
+ grpc_resolved_addresses **addrs) =
+ resolve_address_impl;
+
+#endif /* GRPC_UV */
diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c
index 2af8af82dc..e139293c03 100644
--- a/src/core/lib/iomgr/resolve_address_windows.c
+++ b/src/core/lib/iomgr/resolve_address_windows.c
@@ -31,12 +31,13 @@
*
*/
-#include <grpc/support/port_platform.h>
-#ifdef GPR_WINSOCK_SOCKET
+#include "src/core/lib/iomgr/port.h"
+#ifdef GRPC_WINSOCK_SOCKET
-#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/sockaddr.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+
#include <string.h>
#include <sys/types.h>
@@ -124,8 +125,7 @@ static grpc_error *blocking_resolve_address_impl(
{
for (i = 0; i < (*addresses)->naddrs; i++) {
char *buf;
- grpc_sockaddr_to_string(
- &buf, (struct sockaddr *)&(*addresses)->addrs[i].addr, 0);
+ grpc_sockaddr_to_string(&buf, &(*addresses)->addrs[i], 0);
gpr_free(buf);
}
}
diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c
index 5466973408..bfc905845d 100644
--- a/src/core/lib/iomgr/resource_quota.c
+++ b/src/core/lib/iomgr/resource_quota.c
@@ -49,6 +49,7 @@ struct grpc_resource_quota {
gpr_refcount refs;
/* Master combiner lock: all activity on a quota executes under this combiner
+ * (so no mutex is needed for this data structure)
*/
grpc_combiner *combiner;
/* Size of the resource quota */
@@ -75,7 +76,7 @@ struct grpc_resource_quota {
* list management
*/
-static void rulist_add_tail(grpc_resource_user *resource_user,
+static void rulist_add_head(grpc_resource_user *resource_user,
grpc_rulist list) {
grpc_resource_quota *resource_quota = resource_user->resource_quota;
grpc_resource_user **root = &resource_quota->roots[list];
@@ -88,10 +89,11 @@ static void rulist_add_tail(grpc_resource_user *resource_user,
resource_user->links[list].prev = (*root)->links[list].prev;
resource_user->links[list].next->links[list].prev =
resource_user->links[list].prev->links[list].next = resource_user;
+ *root = resource_user;
}
}
-static void rulist_add_head(grpc_resource_user *resource_user,
+static void rulist_add_tail(grpc_resource_user *resource_user,
grpc_rulist list) {
grpc_resource_quota *resource_quota = resource_user->resource_quota;
grpc_resource_user **root = &resource_quota->roots[list];
@@ -104,7 +106,6 @@ static void rulist_add_head(grpc_resource_user *resource_user,
resource_user->links[list].prev = *root;
resource_user->links[list].next->links[list].prev =
resource_user->links[list].prev->links[list].next = resource_user;
- *root = resource_user;
}
}
@@ -113,8 +114,8 @@ static bool rulist_empty(grpc_resource_quota *resource_quota,
return resource_quota->roots[list] == NULL;
}
-static grpc_resource_user *rulist_pop(grpc_resource_quota *resource_quota,
- grpc_rulist list) {
+static grpc_resource_user *rulist_pop_head(grpc_resource_quota *resource_quota,
+ grpc_rulist list) {
grpc_resource_user **root = &resource_quota->roots[list];
grpc_resource_user *resource_user = *root;
if (resource_user == NULL) {
@@ -149,22 +150,22 @@ static void rulist_remove(grpc_resource_user *resource_user, grpc_rulist list) {
}
/*******************************************************************************
- * buffer pool state machine
+ * resource quota state machine
*/
static bool rq_alloc(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota);
-static bool rq_scavenge(grpc_exec_ctx *exec_ctx,
- grpc_resource_quota *resource_quota);
+static bool rq_reclaim_from_per_user_free_pool(
+ grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota);
static bool rq_reclaim(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota, bool destructive);
-static void rq_step(grpc_exec_ctx *exec_ctx, void *bp, grpc_error *error) {
- grpc_resource_quota *resource_quota = bp;
+static void rq_step(grpc_exec_ctx *exec_ctx, void *rq, grpc_error *error) {
+ grpc_resource_quota *resource_quota = rq;
resource_quota->step_scheduled = false;
do {
if (rq_alloc(exec_ctx, resource_quota)) goto done;
- } while (rq_scavenge(exec_ctx, resource_quota));
+ } while (rq_reclaim_from_per_user_free_pool(exec_ctx, resource_quota));
rq_reclaim(exec_ctx, resource_quota, false) ||
rq_reclaim(exec_ctx, resource_quota, true);
done:
@@ -185,8 +186,8 @@ static void rq_step_sched(grpc_exec_ctx *exec_ctx,
static bool rq_alloc(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota) {
grpc_resource_user *resource_user;
- while ((resource_user =
- rulist_pop(resource_quota, GRPC_RULIST_AWAITING_ALLOCATION))) {
+ while ((resource_user = rulist_pop_head(resource_quota,
+ GRPC_RULIST_AWAITING_ALLOCATION))) {
gpr_mu_lock(&resource_user->mu);
if (resource_user->free_pool < 0 &&
-resource_user->free_pool <= resource_quota->free_pool) {
@@ -194,13 +195,13 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx,
resource_user->free_pool = 0;
resource_quota->free_pool -= amt;
if (grpc_resource_quota_trace) {
- gpr_log(GPR_DEBUG, "BP %s %s: grant alloc %" PRId64
+ gpr_log(GPR_DEBUG, "RQ %s %s: grant alloc %" PRId64
" bytes; rq_free_pool -> %" PRId64,
resource_quota->name, resource_user->name, amt,
resource_quota->free_pool);
}
} else if (grpc_resource_quota_trace && resource_user->free_pool >= 0) {
- gpr_log(GPR_DEBUG, "BP %s %s: discard already satisfied alloc request",
+ gpr_log(GPR_DEBUG, "RQ %s %s: discard already satisfied alloc request",
resource_quota->name, resource_user->name);
}
if (resource_user->free_pool >= 0) {
@@ -217,18 +218,18 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx,
}
/* returns true if any memory could be reclaimed from buffers */
-static bool rq_scavenge(grpc_exec_ctx *exec_ctx,
- grpc_resource_quota *resource_quota) {
+static bool rq_reclaim_from_per_user_free_pool(
+ grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota) {
grpc_resource_user *resource_user;
- while ((resource_user =
- rulist_pop(resource_quota, GRPC_RULIST_NON_EMPTY_FREE_POOL))) {
+ while ((resource_user = rulist_pop_head(resource_quota,
+ GRPC_RULIST_NON_EMPTY_FREE_POOL))) {
gpr_mu_lock(&resource_user->mu);
if (resource_user->free_pool > 0) {
int64_t amt = resource_user->free_pool;
resource_user->free_pool = 0;
resource_quota->free_pool += amt;
if (grpc_resource_quota_trace) {
- gpr_log(GPR_DEBUG, "BP %s %s: scavenge %" PRId64
+ gpr_log(GPR_DEBUG, "RQ %s %s: reclaim_from_per_user_free_pool %" PRId64
" bytes; rq_free_pool -> %" PRId64,
resource_quota->name, resource_user->name, amt,
resource_quota->free_pool);
@@ -248,10 +249,10 @@ static bool rq_reclaim(grpc_exec_ctx *exec_ctx,
if (resource_quota->reclaiming) return true;
grpc_rulist list = destructive ? GRPC_RULIST_RECLAIMER_DESTRUCTIVE
: GRPC_RULIST_RECLAIMER_BENIGN;
- grpc_resource_user *resource_user = rulist_pop(resource_quota, list);
+ grpc_resource_user *resource_user = rulist_pop_head(resource_quota, list);
if (resource_user == NULL) return false;
if (grpc_resource_quota_trace) {
- gpr_log(GPR_DEBUG, "BP %s %s: initiate %s reclamation",
+ gpr_log(GPR_DEBUG, "RQ %s %s: initiate %s reclamation",
resource_quota->name, resource_user->name,
destructive ? "destructive" : "benign");
}
@@ -314,11 +315,12 @@ static gpr_slice ru_slice_create(grpc_resource_user *resource_user,
}
/*******************************************************************************
- * grpc_resource_quota internal implementation
+ * grpc_resource_quota internal implementation: resource user manipulation under
+ * the combiner
*/
-static void ru_allocate(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) {
- grpc_resource_user *resource_user = bu;
+static void ru_allocate(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
+ grpc_resource_user *resource_user = ru;
if (rulist_empty(resource_user->resource_quota,
GRPC_RULIST_AWAITING_ALLOCATION)) {
rq_step_sched(exec_ctx, resource_user->resource_quota);
@@ -326,9 +328,9 @@ static void ru_allocate(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) {
rulist_add_tail(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
}
-static void ru_add_to_free_pool(grpc_exec_ctx *exec_ctx, void *bu,
+static void ru_add_to_free_pool(grpc_exec_ctx *exec_ctx, void *ru,
grpc_error *error) {
- grpc_resource_user *resource_user = bu;
+ grpc_resource_user *resource_user = ru;
if (!rulist_empty(resource_user->resource_quota,
GRPC_RULIST_AWAITING_ALLOCATION) &&
rulist_empty(resource_user->resource_quota,
@@ -338,9 +340,9 @@ static void ru_add_to_free_pool(grpc_exec_ctx *exec_ctx, void *bu,
rulist_add_tail(resource_user, GRPC_RULIST_NON_EMPTY_FREE_POOL);
}
-static void ru_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *bu,
+static void ru_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *ru,
grpc_error *error) {
- grpc_resource_user *resource_user = bu;
+ grpc_resource_user *resource_user = ru;
if (!rulist_empty(resource_user->resource_quota,
GRPC_RULIST_AWAITING_ALLOCATION) &&
rulist_empty(resource_user->resource_quota,
@@ -352,9 +354,9 @@ static void ru_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *bu,
rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
}
-static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *bu,
+static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *ru,
grpc_error *error) {
- grpc_resource_user *resource_user = bu;
+ grpc_resource_user *resource_user = ru;
if (!rulist_empty(resource_user->resource_quota,
GRPC_RULIST_AWAITING_ALLOCATION) &&
rulist_empty(resource_user->resource_quota,
@@ -368,8 +370,8 @@ static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *bu,
rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
}
-static void ru_destroy(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) {
- grpc_resource_user *resource_user = bu;
+static void ru_destroy(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
+ grpc_resource_user *resource_user = ru;
GPR_ASSERT(resource_user->allocated == 0);
for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
rulist_remove(resource_user, (grpc_rulist)i);
@@ -387,9 +389,9 @@ static void ru_destroy(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) {
}
}
-static void ru_allocated_slices(grpc_exec_ctx *exec_ctx, void *ts,
+static void ru_allocated_slices(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- grpc_resource_user_slice_allocator *slice_allocator = ts;
+ grpc_resource_user_slice_allocator *slice_allocator = arg;
if (error == GRPC_ERROR_NONE) {
for (size_t i = 0; i < slice_allocator->count; i++) {
gpr_slice_buffer_add_indexed(
@@ -400,6 +402,11 @@ static void ru_allocated_slices(grpc_exec_ctx *exec_ctx, void *ts,
grpc_closure_run(exec_ctx, &slice_allocator->on_done, GRPC_ERROR_REF(error));
}
+/*******************************************************************************
+ * grpc_resource_quota internal implementation: quota manipulation under the
+ * combiner
+ */
+
typedef struct {
int64_t size;
grpc_resource_quota *resource_quota;
@@ -411,20 +418,14 @@ static void rq_resize(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) {
int64_t delta = a->size - a->resource_quota->size;
a->resource_quota->size += delta;
a->resource_quota->free_pool += delta;
- if (delta < 0 && a->resource_quota->free_pool < 0) {
- rq_step_sched(exec_ctx, a->resource_quota);
- } else if (delta > 0 &&
- !rulist_empty(a->resource_quota,
- GRPC_RULIST_AWAITING_ALLOCATION)) {
- rq_step_sched(exec_ctx, a->resource_quota);
- }
+ rq_step_sched(exec_ctx, a->resource_quota);
grpc_resource_quota_internal_unref(exec_ctx, a->resource_quota);
gpr_free(a);
}
-static void rq_reclamation_done(grpc_exec_ctx *exec_ctx, void *bp,
+static void rq_reclamation_done(grpc_exec_ctx *exec_ctx, void *rq,
grpc_error *error) {
- grpc_resource_quota *resource_quota = bp;
+ grpc_resource_quota *resource_quota = rq;
resource_quota->reclaiming = false;
rq_step_sched(exec_ctx, resource_quota);
grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
@@ -434,6 +435,7 @@ static void rq_reclamation_done(grpc_exec_ctx *exec_ctx, void *bp,
* grpc_resource_quota api
*/
+/* Public API */
grpc_resource_quota *grpc_resource_quota_create(const char *name) {
grpc_resource_quota *resource_quota = gpr_malloc(sizeof(*resource_quota));
gpr_ref_init(&resource_quota->refs, 1);
@@ -466,6 +468,7 @@ void grpc_resource_quota_internal_unref(grpc_exec_ctx *exec_ctx,
}
}
+/* Public API */
void grpc_resource_quota_unref(grpc_resource_quota *resource_quota) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_quota_internal_unref(&exec_ctx, resource_quota);
@@ -478,10 +481,12 @@ grpc_resource_quota *grpc_resource_quota_internal_ref(
return resource_quota;
}
+/* Public API */
void grpc_resource_quota_ref(grpc_resource_quota *resource_quota) {
grpc_resource_quota_internal_ref(resource_quota);
}
+/* Public API */
void grpc_resource_quota_resize(grpc_resource_quota *resource_quota,
size_t size) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@@ -513,12 +518,12 @@ grpc_resource_quota *grpc_resource_quota_from_channel_args(
return grpc_resource_quota_create(NULL);
}
-static void *rq_copy(void *bp) {
- grpc_resource_quota_ref(bp);
- return bp;
+static void *rq_copy(void *rq) {
+ grpc_resource_quota_ref(rq);
+ return rq;
}
-static void rq_destroy(void *bp) { grpc_resource_quota_unref(bp); }
+static void rq_destroy(void *rq) { grpc_resource_quota_unref(rq); }
static int rq_cmp(void *a, void *b) { return GPR_ICMP(a, b); }
@@ -558,9 +563,6 @@ void grpc_resource_user_init(grpc_resource_user *resource_user,
for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
resource_user->links[i].next = resource_user->links[i].prev = NULL;
}
-#ifndef NDEBUG
- resource_user->asan_canary = gpr_malloc(1);
-#endif
if (name != NULL) {
resource_user->name = gpr_strdup(name);
} else {
@@ -587,9 +589,6 @@ void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx,
void grpc_resource_user_destroy(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user) {
-#ifndef NDEBUG
- gpr_free(resource_user->asan_canary);
-#endif
grpc_resource_quota_internal_unref(exec_ctx, resource_user->resource_quota);
gpr_mu_destroy(&resource_user->mu);
gpr_free(resource_user->name);
@@ -604,7 +603,7 @@ void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx,
if (on_done_destroy != NULL) {
/* already shutdown */
if (grpc_resource_quota_trace) {
- gpr_log(GPR_DEBUG, "BP %s %s: alloc %" PRIdPTR " after shutdown",
+ gpr_log(GPR_DEBUG, "RQ %s %s: alloc %" PRIdPTR " after shutdown",
resource_user->resource_quota->name, resource_user->name, size);
}
grpc_exec_ctx_sched(
@@ -616,7 +615,7 @@ void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx,
resource_user->allocated += (int64_t)size;
resource_user->free_pool -= (int64_t)size;
if (grpc_resource_quota_trace) {
- gpr_log(GPR_DEBUG, "BP %s %s: alloc %" PRIdPTR "; allocated -> %" PRId64
+ gpr_log(GPR_DEBUG, "RQ %s %s: alloc %" PRIdPTR "; allocated -> %" PRId64
", free_pool -> %" PRId64,
resource_user->resource_quota->name, resource_user->name, size,
resource_user->allocated, resource_user->free_pool);
@@ -644,7 +643,7 @@ void grpc_resource_user_free(grpc_exec_ctx *exec_ctx,
resource_user->free_pool += (int64_t)size;
resource_user->allocated -= (int64_t)size;
if (grpc_resource_quota_trace) {
- gpr_log(GPR_DEBUG, "BP %s %s: free %" PRIdPTR "; allocated -> %" PRId64
+ gpr_log(GPR_DEBUG, "RQ %s %s: free %" PRIdPTR "; allocated -> %" PRId64
", free_pool -> %" PRId64,
resource_user->resource_quota->name, resource_user->name, size,
resource_user->allocated, resource_user->free_pool);
@@ -685,7 +684,7 @@ void grpc_resource_user_post_reclaimer(grpc_exec_ctx *exec_ctx,
void grpc_resource_user_finish_reclamation(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user) {
if (grpc_resource_quota_trace) {
- gpr_log(GPR_DEBUG, "BP %s %s: reclamation complete",
+ gpr_log(GPR_DEBUG, "RQ %s %s: reclamation complete",
resource_user->resource_quota->name, resource_user->name);
}
grpc_combiner_execute(
diff --git a/src/core/lib/iomgr/resource_quota.h b/src/core/lib/iomgr/resource_quota.h
index af94a19911..6dfac55f88 100644
--- a/src/core/lib/iomgr/resource_quota.h
+++ b/src/core/lib/iomgr/resource_quota.h
@@ -49,7 +49,8 @@
resource constrained, grpc_resource_user instances are asked (in turn) to
free up whatever they can so that the system as a whole can make progress.
- There are three kinds of reclamation that take place:
+ There are three kinds of reclamation that take place, in order of increasing
+ invasiveness:
- an internal reclamation, where cached resource at the resource user level
is returned to the quota
- a benign reclamation phase, whereby resources that are in use but are not
@@ -58,9 +59,14 @@
make progress may be enacted so that at least one part of the system can
complete.
- These reclamations are tried in priority order, and only one reclamation
- is outstanding for a quota at any given time (meaning that if a destructive
- reclamation makes progress, we may follow up with a benign reclamation).
+ Only one reclamation will be outstanding for a given quota at a given time.
+ On each reclamation attempt, the kinds of reclamation are tried in order of
+ increasing invasiveness, stopping at the first one that succeeds. Thus, on a
+ given reclamation attempt, if internal and benign reclamation both fail, it
+ will wind up doing a destructive reclamation. However, the next reclamation
+ attempt may then be able to get what it needs via internal or benign
+ reclamation, due to resources that may have been freed up by the destructive
+ reclamation in the previous attempt.
Future work will be to expose the current resource pressure so that back
pressure can be applied to avoid reclamation phases starting.
@@ -106,24 +112,20 @@ struct grpc_resource_user {
/* The quota this resource user consumes from */
grpc_resource_quota *resource_quota;
- /* Closure to schedule an allocation onder the resource quota combiner lock */
+ /* Closure to schedule an allocation under the resource quota combiner lock */
grpc_closure allocate_closure;
/* Closure to publish a non empty free pool under the resource quota combiner
lock */
grpc_closure add_to_free_pool_closure;
-#ifndef NDEBUG
- /* Canary object to detect leaked resource users with ASAN */
- void *asan_canary;
-#endif
-
gpr_mu mu;
- /* Total allocated memory outstanding by this resource user;
+ /* Total allocated memory outstanding by this resource user in bytes;
always positive */
int64_t allocated;
- /* The amount of memory this user has cached for its own use: to avoid quota
- contention, each resource user can keep some memory aside from the quota,
- and the quota can pull it back under memory pressure.
+ /* The amount of memory (in bytes) this user has cached for its own use: to
+ avoid quota contention, each resource user can keep some memory in
+ addition to what it is immediately using (e.g., for caching), and the quota
+ can pull it back under memory pressure.
This value can become negative if more memory has been requested than
existed in the free pool, at which point the quota is consulted to bring
this value non-negative (asynchronously). */
@@ -148,7 +150,8 @@ struct grpc_resource_user {
resource user */
grpc_closure destroy_closure;
/* User supplied closure to call once the user has finished shutting down AND
- all outstanding allocations have been freed */
+ all outstanding allocations have been freed. Real type is grpc_closure*,
+ but it's stored as an atomic to avoid a mutex on some fast paths. */
gpr_atm on_done_destroy_closure;
/* Links in the various grpc_rulist lists */
@@ -167,7 +170,7 @@ void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx,
void grpc_resource_user_destroy(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user);
-/* Allocate from the resource user (and it's quota).
+/* Allocate from the resource user (and its quota).
If optional_on_done is NULL, then allocate immediately. This may push the
quota over-limit, at which point reclamation will kick in.
If optional_on_done is non-NULL, it will be scheduled when the allocation has
@@ -191,20 +194,28 @@ void grpc_resource_user_finish_reclamation(grpc_exec_ctx *exec_ctx,
/* Helper to allocate slices from a resource user */
typedef struct grpc_resource_user_slice_allocator {
+ /* Closure for when a resource user allocation completes */
grpc_closure on_allocated;
+ /* Closure to call when slices have been allocated */
grpc_closure on_done;
+ /* Length of slices to allocate on the current request */
size_t length;
+ /* Number of slices to allocate on the current request */
size_t count;
+ /* Destination for slices to allocate on the current request */
gpr_slice_buffer *dest;
+ /* Parent resource user */
grpc_resource_user *resource_user;
} grpc_resource_user_slice_allocator;
-/* Initialize a slice allocator */
+/* Initialize a slice allocator.
+ When an allocation is completed, calls \a cb with arg \p. */
void grpc_resource_user_slice_allocator_init(
grpc_resource_user_slice_allocator *slice_allocator,
grpc_resource_user *resource_user, grpc_iomgr_cb_func cb, void *p);
-/* Allocate \a count slices of length \a length into \a dest. */
+/* Allocate \a count slices of length \a length into \a dest. Only one request
+ can be outstanding at a time. */
void grpc_resource_user_alloc_slices(
grpc_exec_ctx *exec_ctx,
grpc_resource_user_slice_allocator *slice_allocator, size_t length,
diff --git a/src/core/lib/iomgr/sockaddr.h b/src/core/lib/iomgr/sockaddr.h
index 5563d0b8a6..52b504390d 100644
--- a/src/core/lib/iomgr/sockaddr.h
+++ b/src/core/lib/iomgr/sockaddr.h
@@ -31,16 +31,24 @@
*
*/
+/* This header transitively includes other headers that care about include
+ * order, so it should be included first. As a consequence, it should not be
+ * included in any other header. */
+
#ifndef GRPC_CORE_LIB_IOMGR_SOCKADDR_H
#define GRPC_CORE_LIB_IOMGR_SOCKADDR_H
-#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_UV
+#include <uv.h>
+#endif
#ifdef GPR_WINDOWS
#include "src/core/lib/iomgr/sockaddr_windows.h"
#endif
-#ifdef GPR_POSIX_SOCKETADDR
+#ifdef GRPC_POSIX_SOCKETADDR
#include "src/core/lib/iomgr/sockaddr_posix.h"
#endif
diff --git a/src/core/lib/iomgr/sockaddr_utils.c b/src/core/lib/iomgr/sockaddr_utils.c
index 127d95c618..44bc2f968b 100644
--- a/src/core/lib/iomgr/sockaddr_utils.c
+++ b/src/core/lib/iomgr/sockaddr_utils.c
@@ -42,26 +42,32 @@
#include <grpc/support/port_platform.h>
#include <grpc/support/string_util.h>
+#include "src/core/lib/iomgr/sockaddr.h"
+#include "src/core/lib/iomgr/socket_utils.h"
#include "src/core/lib/iomgr/unix_sockets_posix.h"
#include "src/core/lib/support/string.h"
static const uint8_t kV4MappedPrefix[] = {0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0xff, 0xff};
-int grpc_sockaddr_is_v4mapped(const struct sockaddr *addr,
- struct sockaddr_in *addr4_out) {
- GPR_ASSERT(addr != (struct sockaddr *)addr4_out);
+int grpc_sockaddr_is_v4mapped(const grpc_resolved_address *resolved_addr,
+ grpc_resolved_address *resolved_addr4_out) {
+ GPR_ASSERT(resolved_addr != resolved_addr4_out);
+ const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr;
+ struct sockaddr_in *addr4_out =
+ (struct sockaddr_in *)resolved_addr4_out->addr;
if (addr->sa_family == AF_INET6) {
const struct sockaddr_in6 *addr6 = (const struct sockaddr_in6 *)addr;
if (memcmp(addr6->sin6_addr.s6_addr, kV4MappedPrefix,
sizeof(kV4MappedPrefix)) == 0) {
- if (addr4_out != NULL) {
+ if (resolved_addr4_out != NULL) {
/* Normalize ::ffff:0.0.0.0/96 to IPv4. */
- memset(addr4_out, 0, sizeof(*addr4_out));
+ memset(resolved_addr4_out, 0, sizeof(*resolved_addr4_out));
addr4_out->sin_family = AF_INET;
/* s6_addr32 would be nice, but it's non-standard. */
memcpy(&addr4_out->sin_addr, &addr6->sin6_addr.s6_addr[12], 4);
addr4_out->sin_port = addr6->sin6_port;
+ resolved_addr4_out->len = sizeof(struct sockaddr_in);
}
return 1;
}
@@ -69,26 +75,33 @@ int grpc_sockaddr_is_v4mapped(const struct sockaddr *addr,
return 0;
}
-int grpc_sockaddr_to_v4mapped(const struct sockaddr *addr,
- struct sockaddr_in6 *addr6_out) {
- GPR_ASSERT(addr != (struct sockaddr *)addr6_out);
+int grpc_sockaddr_to_v4mapped(const grpc_resolved_address *resolved_addr,
+ grpc_resolved_address *resolved_addr6_out) {
+ GPR_ASSERT(resolved_addr != resolved_addr6_out);
+ const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr;
+ struct sockaddr_in6 *addr6_out =
+ (struct sockaddr_in6 *)resolved_addr6_out->addr;
if (addr->sa_family == AF_INET) {
const struct sockaddr_in *addr4 = (const struct sockaddr_in *)addr;
- memset(addr6_out, 0, sizeof(*addr6_out));
+ memset(resolved_addr6_out, 0, sizeof(*resolved_addr6_out));
addr6_out->sin6_family = AF_INET6;
memcpy(&addr6_out->sin6_addr.s6_addr[0], kV4MappedPrefix, 12);
memcpy(&addr6_out->sin6_addr.s6_addr[12], &addr4->sin_addr, 4);
addr6_out->sin6_port = addr4->sin_port;
+ resolved_addr6_out->len = sizeof(struct sockaddr_in6);
return 1;
}
return 0;
}
-int grpc_sockaddr_is_wildcard(const struct sockaddr *addr, int *port_out) {
- struct sockaddr_in addr4_normalized;
- if (grpc_sockaddr_is_v4mapped(addr, &addr4_normalized)) {
- addr = (struct sockaddr *)&addr4_normalized;
+int grpc_sockaddr_is_wildcard(const grpc_resolved_address *resolved_addr,
+ int *port_out) {
+ const struct sockaddr *addr;
+ grpc_resolved_address addr4_normalized;
+ if (grpc_sockaddr_is_v4mapped(resolved_addr, &addr4_normalized)) {
+ resolved_addr = &addr4_normalized;
}
+ addr = (const struct sockaddr *)resolved_addr->addr;
if (addr->sa_family == AF_INET) {
/* Check for 0.0.0.0 */
const struct sockaddr_in *addr4 = (const struct sockaddr_in *)addr;
@@ -113,39 +126,49 @@ int grpc_sockaddr_is_wildcard(const struct sockaddr *addr, int *port_out) {
}
}
-void grpc_sockaddr_make_wildcards(int port, struct sockaddr_in *wild4_out,
- struct sockaddr_in6 *wild6_out) {
+void grpc_sockaddr_make_wildcards(int port, grpc_resolved_address *wild4_out,
+ grpc_resolved_address *wild6_out) {
grpc_sockaddr_make_wildcard4(port, wild4_out);
grpc_sockaddr_make_wildcard6(port, wild6_out);
}
-void grpc_sockaddr_make_wildcard4(int port, struct sockaddr_in *wild_out) {
+void grpc_sockaddr_make_wildcard4(int port,
+ grpc_resolved_address *resolved_wild_out) {
+ struct sockaddr_in *wild_out = (struct sockaddr_in *)resolved_wild_out->addr;
GPR_ASSERT(port >= 0 && port < 65536);
- memset(wild_out, 0, sizeof(*wild_out));
+ memset(resolved_wild_out, 0, sizeof(*resolved_wild_out));
wild_out->sin_family = AF_INET;
wild_out->sin_port = htons((uint16_t)port);
+ resolved_wild_out->len = sizeof(struct sockaddr_in);
}
-void grpc_sockaddr_make_wildcard6(int port, struct sockaddr_in6 *wild_out) {
+void grpc_sockaddr_make_wildcard6(int port,
+ grpc_resolved_address *resolved_wild_out) {
+ struct sockaddr_in6 *wild_out =
+ (struct sockaddr_in6 *)resolved_wild_out->addr;
GPR_ASSERT(port >= 0 && port < 65536);
- memset(wild_out, 0, sizeof(*wild_out));
+ memset(resolved_wild_out, 0, sizeof(*resolved_wild_out));
wild_out->sin6_family = AF_INET6;
wild_out->sin6_port = htons((uint16_t)port);
+ resolved_wild_out->len = sizeof(struct sockaddr_in6);
}
-int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr,
+int grpc_sockaddr_to_string(char **out,
+ const grpc_resolved_address *resolved_addr,
int normalize) {
+ const struct sockaddr *addr;
const int save_errno = errno;
- struct sockaddr_in addr_normalized;
+ grpc_resolved_address addr_normalized;
char ntop_buf[INET6_ADDRSTRLEN];
const void *ip = NULL;
int port;
int ret;
*out = NULL;
- if (normalize && grpc_sockaddr_is_v4mapped(addr, &addr_normalized)) {
- addr = (const struct sockaddr *)&addr_normalized;
+ if (normalize && grpc_sockaddr_is_v4mapped(resolved_addr, &addr_normalized)) {
+ resolved_addr = &addr_normalized;
}
+ addr = (const struct sockaddr *)resolved_addr->addr;
if (addr->sa_family == AF_INET) {
const struct sockaddr_in *addr4 = (const struct sockaddr_in *)addr;
ip = &addr4->sin_addr;
@@ -155,10 +178,8 @@ int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr,
ip = &addr6->sin6_addr;
port = ntohs(addr6->sin6_port);
}
- /* Windows inet_ntop wants a mutable ip pointer */
if (ip != NULL &&
- inet_ntop(addr->sa_family, (void *)ip, ntop_buf, sizeof(ntop_buf)) !=
- NULL) {
+ grpc_inet_ntop(addr->sa_family, ip, ntop_buf, sizeof(ntop_buf)) != NULL) {
ret = gpr_join_host_port(out, ntop_buf, port);
} else {
ret = gpr_asprintf(out, "(sockaddr family=%d)", addr->sa_family);
@@ -168,39 +189,43 @@ int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr,
return ret;
}
-char *grpc_sockaddr_to_uri(const struct sockaddr *addr) {
+char *grpc_sockaddr_to_uri(const grpc_resolved_address *resolved_addr) {
char *temp;
char *result;
- struct sockaddr_in addr_normalized;
+ grpc_resolved_address addr_normalized;
+ const struct sockaddr *addr;
- if (grpc_sockaddr_is_v4mapped(addr, &addr_normalized)) {
- addr = (const struct sockaddr *)&addr_normalized;
+ if (grpc_sockaddr_is_v4mapped(resolved_addr, &addr_normalized)) {
+ resolved_addr = &addr_normalized;
}
+ addr = (const struct sockaddr *)resolved_addr->addr;
+
switch (addr->sa_family) {
case AF_INET:
- grpc_sockaddr_to_string(&temp, addr, 0);
+ grpc_sockaddr_to_string(&temp, resolved_addr, 0);
gpr_asprintf(&result, "ipv4:%s", temp);
gpr_free(temp);
return result;
case AF_INET6:
- grpc_sockaddr_to_string(&temp, addr, 0);
+ grpc_sockaddr_to_string(&temp, resolved_addr, 0);
gpr_asprintf(&result, "ipv6:%s", temp);
gpr_free(temp);
return result;
default:
- return grpc_sockaddr_to_uri_unix_if_possible(addr);
+ return grpc_sockaddr_to_uri_unix_if_possible(resolved_addr);
}
}
-int grpc_sockaddr_get_port(const struct sockaddr *addr) {
+int grpc_sockaddr_get_port(const grpc_resolved_address *resolved_addr) {
+ const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr;
switch (addr->sa_family) {
case AF_INET:
return ntohs(((struct sockaddr_in *)addr)->sin_port);
case AF_INET6:
return ntohs(((struct sockaddr_in6 *)addr)->sin6_port);
default:
- if (grpc_is_unix_socket(addr)) {
+ if (grpc_is_unix_socket(resolved_addr)) {
return 1;
}
gpr_log(GPR_ERROR, "Unknown socket family %d in grpc_sockaddr_get_port",
@@ -209,7 +234,9 @@ int grpc_sockaddr_get_port(const struct sockaddr *addr) {
}
}
-int grpc_sockaddr_set_port(const struct sockaddr *addr, int port) {
+int grpc_sockaddr_set_port(const grpc_resolved_address *resolved_addr,
+ int port) {
+ const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr;
switch (addr->sa_family) {
case AF_INET:
GPR_ASSERT(port >= 0 && port < 65536);
diff --git a/src/core/lib/iomgr/sockaddr_utils.h b/src/core/lib/iomgr/sockaddr_utils.h
index 9f81992e6b..5371e360c5 100644
--- a/src/core/lib/iomgr/sockaddr_utils.h
+++ b/src/core/lib/iomgr/sockaddr_utils.h
@@ -34,40 +34,40 @@
#ifndef GRPC_CORE_LIB_IOMGR_SOCKADDR_UTILS_H
#define GRPC_CORE_LIB_IOMGR_SOCKADDR_UTILS_H
-#include "src/core/lib/iomgr/sockaddr.h"
+#include "src/core/lib/iomgr/resolve_address.h"
/* Returns true if addr is an IPv4-mapped IPv6 address within the
::ffff:0.0.0.0/96 range, or false otherwise.
If addr4_out is non-NULL, the inner IPv4 address will be copied here when
returning true. */
-int grpc_sockaddr_is_v4mapped(const struct sockaddr *addr,
- struct sockaddr_in *addr4_out);
+int grpc_sockaddr_is_v4mapped(const grpc_resolved_address *addr,
+ grpc_resolved_address *addr4_out);
/* If addr is an AF_INET address, writes the corresponding ::ffff:0.0.0.0/96
address to addr6_out and returns true. Otherwise returns false. */
-int grpc_sockaddr_to_v4mapped(const struct sockaddr *addr,
- struct sockaddr_in6 *addr6_out);
+int grpc_sockaddr_to_v4mapped(const grpc_resolved_address *addr,
+ grpc_resolved_address *addr6_out);
/* If addr is ::, 0.0.0.0, or ::ffff:0.0.0.0, writes the port number to
*port_out (if not NULL) and returns true, otherwise returns false. */
-int grpc_sockaddr_is_wildcard(const struct sockaddr *addr, int *port_out);
+int grpc_sockaddr_is_wildcard(const grpc_resolved_address *addr, int *port_out);
/* Writes 0.0.0.0:port and [::]:port to separate sockaddrs. */
-void grpc_sockaddr_make_wildcards(int port, struct sockaddr_in *wild4_out,
- struct sockaddr_in6 *wild6_out);
+void grpc_sockaddr_make_wildcards(int port, grpc_resolved_address *wild4_out,
+ grpc_resolved_address *wild6_out);
/* Writes 0.0.0.0:port. */
-void grpc_sockaddr_make_wildcard4(int port, struct sockaddr_in *wild_out);
+void grpc_sockaddr_make_wildcard4(int port, grpc_resolved_address *wild_out);
/* Writes [::]:port. */
-void grpc_sockaddr_make_wildcard6(int port, struct sockaddr_in6 *wild_out);
+void grpc_sockaddr_make_wildcard6(int port, grpc_resolved_address *wild_out);
/* Return the IP port number of a sockaddr */
-int grpc_sockaddr_get_port(const struct sockaddr *addr);
+int grpc_sockaddr_get_port(const grpc_resolved_address *addr);
/* Set IP port number of a sockaddr */
-int grpc_sockaddr_set_port(const struct sockaddr *addr, int port);
+int grpc_sockaddr_set_port(const grpc_resolved_address *addr, int port);
/* Converts a sockaddr into a newly-allocated human-readable string.
@@ -81,9 +81,9 @@ int grpc_sockaddr_set_port(const struct sockaddr *addr, int port);
In the unlikely event of an error, returns -1 and sets *out to NULL.
The existing value of errno is always preserved. */
-int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr,
+int grpc_sockaddr_to_string(char **out, const grpc_resolved_address *addr,
int normalize);
-char *grpc_sockaddr_to_uri(const struct sockaddr *addr);
+char *grpc_sockaddr_to_uri(const grpc_resolved_address *addr);
#endif /* GRPC_CORE_LIB_IOMGR_SOCKADDR_UTILS_H */
diff --git a/src/core/lib/iomgr/socket_utils.h b/src/core/lib/iomgr/socket_utils.h
new file mode 100644
index 0000000000..cc3ee2e30c
--- /dev/null
+++ b/src/core/lib/iomgr/socket_utils.h
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_H
+#define GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_H
+
+#include <stddef.h>
+
+/* A wrapper for inet_ntop on POSIX systems and InetNtop on Windows systems */
+const char *grpc_inet_ntop(int af, const void *src, char *dst, size_t size);
+
+#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_H */
diff --git a/src/core/lib/iomgr/socket_utils_common_posix.c b/src/core/lib/iomgr/socket_utils_common_posix.c
index d2f6261e2a..bc28bbe316 100644
--- a/src/core/lib/iomgr/socket_utils_common_posix.c
+++ b/src/core/lib/iomgr/socket_utils_common_posix.c
@@ -31,10 +31,11 @@
*
*/
-#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
-#ifdef GPR_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET
+#include "src/core/lib/iomgr/socket_utils.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include <arpa/inet.h>
@@ -78,7 +79,7 @@ grpc_error *grpc_set_socket_nonblocking(int fd, int non_blocking) {
}
grpc_error *grpc_set_socket_no_sigpipe_if_possible(int fd) {
-#ifdef GPR_HAVE_SO_NOSIGPIPE
+#ifdef GRPC_HAVE_SO_NOSIGPIPE
int val = 1;
int newval;
socklen_t intlen = sizeof(newval);
@@ -96,7 +97,7 @@ grpc_error *grpc_set_socket_no_sigpipe_if_possible(int fd) {
}
grpc_error *grpc_set_socket_ip_pktinfo_if_possible(int fd) {
-#ifdef GPR_HAVE_IP_PKTINFO
+#ifdef GRPC_HAVE_IP_PKTINFO
int get_local_ip = 1;
if (0 != setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &get_local_ip,
sizeof(get_local_ip))) {
@@ -107,7 +108,7 @@ grpc_error *grpc_set_socket_ip_pktinfo_if_possible(int fd) {
}
grpc_error *grpc_set_socket_ipv6_recvpktinfo_if_possible(int fd) {
-#ifdef GPR_HAVE_IPV6_RECVPKTINFO
+#ifdef GRPC_HAVE_IPV6_RECVPKTINFO
int get_local_ip = 1;
if (0 != setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &get_local_ip,
sizeof(get_local_ip))) {
@@ -253,7 +254,7 @@ static int set_socket_dualstack(int fd) {
}
}
-static grpc_error *error_for_fd(int fd, const struct sockaddr *addr) {
+static grpc_error *error_for_fd(int fd, const grpc_resolved_address *addr) {
if (fd >= 0) return GRPC_ERROR_NONE;
char *addr_str;
grpc_sockaddr_to_string(&addr_str, addr, 0);
@@ -263,10 +264,10 @@ static grpc_error *error_for_fd(int fd, const struct sockaddr *addr) {
return err;
}
-grpc_error *grpc_create_dualstack_socket(const struct sockaddr *addr, int type,
- int protocol,
- grpc_dualstack_mode *dsmode,
- int *newfd) {
+grpc_error *grpc_create_dualstack_socket(
+ const grpc_resolved_address *resolved_addr, int type, int protocol,
+ grpc_dualstack_mode *dsmode, int *newfd) {
+ const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr;
int family = addr->sa_family;
if (family == AF_INET6) {
if (grpc_ipv6_loopback_available()) {
@@ -281,9 +282,9 @@ grpc_error *grpc_create_dualstack_socket(const struct sockaddr *addr, int type,
return GRPC_ERROR_NONE;
}
/* If this isn't an IPv4 address, then return whatever we've got. */
- if (!grpc_sockaddr_is_v4mapped(addr, NULL)) {
+ if (!grpc_sockaddr_is_v4mapped(resolved_addr, NULL)) {
*dsmode = GRPC_DSMODE_IPV6;
- return error_for_fd(*newfd, addr);
+ return error_for_fd(*newfd, resolved_addr);
}
/* Fall back to AF_INET. */
if (*newfd >= 0) {
@@ -293,7 +294,12 @@ grpc_error *grpc_create_dualstack_socket(const struct sockaddr *addr, int type,
}
*dsmode = family == AF_INET ? GRPC_DSMODE_IPV4 : GRPC_DSMODE_NONE;
*newfd = socket(family, type, protocol);
- return error_for_fd(*newfd, addr);
+ return error_for_fd(*newfd, resolved_addr);
+}
+
+const char *grpc_inet_ntop(int af, const void *src, char *dst, size_t size) {
+ GPR_ASSERT(size <= (socklen_t)-1);
+ return inet_ntop(af, src, dst, (socklen_t)size);
}
#endif
diff --git a/src/core/lib/iomgr/socket_utils_linux.c b/src/core/lib/iomgr/socket_utils_linux.c
index 144e3110c8..bf6e9e4f55 100644
--- a/src/core/lib/iomgr/socket_utils_linux.c
+++ b/src/core/lib/iomgr/socket_utils_linux.c
@@ -31,21 +31,27 @@
*
*/
-#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
-#ifdef GPR_LINUX_SOCKETUTILS
+#ifdef GRPC_LINUX_SOCKETUTILS
+#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
+#include <grpc/support/log.h>
+
#include <sys/socket.h>
#include <sys/types.h>
-int grpc_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen,
- int nonblock, int cloexec) {
+int grpc_accept4(int sockfd, grpc_resolved_address *resolved_addr, int nonblock,
+ int cloexec) {
int flags = 0;
+ GPR_ASSERT(sizeof(socklen_t) <= sizeof(size_t));
+ GPR_ASSERT(resolved_addr->len <= (socklen_t)-1);
flags |= nonblock ? SOCK_NONBLOCK : 0;
flags |= cloexec ? SOCK_CLOEXEC : 0;
- return accept4(sockfd, addr, addrlen, flags);
+ return accept4(sockfd, (struct sockaddr *)resolved_addr->addr,
+ (socklen_t *)&resolved_addr->len, flags);
}
#endif
diff --git a/src/core/lib/iomgr/socket_utils_posix.c b/src/core/lib/iomgr/socket_utils_posix.c
index 57ae64c103..9dea0c0cd8 100644
--- a/src/core/lib/iomgr/socket_utils_posix.c
+++ b/src/core/lib/iomgr/socket_utils_posix.c
@@ -31,9 +31,9 @@
*
*/
-#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
-#ifdef GPR_POSIX_SOCKETUTILS
+#ifdef GRPC_POSIX_SOCKETUTILS
#include "src/core/lib/iomgr/socket_utils_posix.h"
@@ -42,12 +42,15 @@
#include <unistd.h>
#include <grpc/support/log.h>
+#include "src/core/lib/iomgr/sockaddr.h"
-int grpc_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen,
- int nonblock, int cloexec) {
+int grpc_accept4(int sockfd, grpc_resolved_address *resolved_addr, int nonblock,
+ int cloexec) {
int fd, flags;
-
- fd = accept(sockfd, addr, addrlen);
+ GPR_ASSERT(sizeof(socklen_t) <= sizeof(size_t));
+ GPR_ASSERT(resolved_addr->len <= (socklen_t)-1);
+ fd = accept(sockfd, (struct sockaddr *)resolved_addr->addr,
+ (socklen_t *)&resolved_addr->len);
if (fd >= 0) {
if (nonblock) {
flags = fcntl(fd, F_GETFL, 0);
@@ -67,4 +70,4 @@ close_and_error:
return -1;
}
-#endif /* GPR_POSIX_SOCKETUTILS */
+#endif /* GRPC_POSIX_SOCKETUTILS */
diff --git a/src/core/lib/iomgr/socket_utils_posix.h b/src/core/lib/iomgr/socket_utils_posix.h
index 7bcc2219ae..175fb2b717 100644
--- a/src/core/lib/iomgr/socket_utils_posix.h
+++ b/src/core/lib/iomgr/socket_utils_posix.h
@@ -34,14 +34,16 @@
#ifndef GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_POSIX_H
#define GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_POSIX_H
+#include "src/core/lib/iomgr/resolve_address.h"
+
#include <sys/socket.h>
#include <unistd.h>
#include "src/core/lib/iomgr/error.h"
/* a wrapper for accept or accept4 */
-int grpc_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen,
- int nonblock, int cloexec);
+int grpc_accept4(int sockfd, grpc_resolved_address *resolved_addr, int nonblock,
+ int cloexec);
/* set a socket to non blocking mode */
grpc_error *grpc_set_socket_nonblocking(int fd, int non_blocking);
@@ -125,8 +127,8 @@ extern int grpc_forbid_dualstack_sockets_for_testing;
IPv4, so that bind() or connect() see the correct family.
Also, it's important to distinguish between DUALSTACK and IPV6 when
listening on the [::] wildcard address. */
-grpc_error *grpc_create_dualstack_socket(const struct sockaddr *addr, int type,
- int protocol,
+grpc_error *grpc_create_dualstack_socket(const grpc_resolved_address *addr,
+ int type, int protocol,
grpc_dualstack_mode *dsmode,
int *newfd);
diff --git a/src/core/lib/iomgr/socket_utils_uv.c b/src/core/lib/iomgr/socket_utils_uv.c
new file mode 100644
index 0000000000..741bf28969
--- /dev/null
+++ b/src/core/lib/iomgr/socket_utils_uv.c
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_UV
+
+#include <uv.h>
+
+#include "src/core/lib/iomgr/socket_utils.h"
+
+#include <grpc/support/log.h>
+
+const char *grpc_inet_ntop(int af, const void *src, char *dst, size_t size) {
+ uv_inet_ntop(af, src, dst, size);
+ return dst;
+}
+
+#endif /* GRPC_UV */
diff --git a/src/core/lib/iomgr/socket_utils_windows.c b/src/core/lib/iomgr/socket_utils_windows.c
new file mode 100644
index 0000000000..628ad4a45b
--- /dev/null
+++ b/src/core/lib/iomgr/socket_utils_windows.c
@@ -0,0 +1,48 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_WINDOWS_SOCKETUTILS
+
+#include "src/core/lib/iomgr/sockaddr.h"
+#include "src/core/lib/iomgr/socket_utils.h"
+
+#include <grpc/support/log.h>
+
+const char *grpc_inet_ntop(int af, const void *src, char *dst, size_t size) {
+ /* Windows InetNtopA wants a mutable ip pointer */
+ return InetNtopA(af, (void *)src, dst, size);
+}
+
+#endif /* GRPC_WINDOWS_SOCKETUTILS */
diff --git a/src/core/lib/iomgr/socket_windows.c b/src/core/lib/iomgr/socket_windows.c
index 78ef46d042..35f23300dc 100644
--- a/src/core/lib/iomgr/socket_windows.c
+++ b/src/core/lib/iomgr/socket_windows.c
@@ -31,9 +31,9 @@
*
*/
-#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
-#ifdef GPR_WINSOCK_SOCKET
+#ifdef GRPC_WINSOCK_SOCKET
#include <winsock2.h>
@@ -156,4 +156,4 @@ void grpc_socket_become_ready(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket,
if (should_destroy) destroy(socket);
}
-#endif /* GPR_WINSOCK_SOCKET */
+#endif /* GRPC_WINSOCK_SOCKET */
diff --git a/src/core/lib/iomgr/tcp_client.h b/src/core/lib/iomgr/tcp_client.h
index d1b0e9cd19..18e6e60ebc 100644
--- a/src/core/lib/iomgr/tcp_client.h
+++ b/src/core/lib/iomgr/tcp_client.h
@@ -37,7 +37,7 @@
#include <grpc/support/time.h>
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/pollset_set.h"
-#include "src/core/lib/iomgr/sockaddr.h"
+#include "src/core/lib/iomgr/resolve_address.h"
/* Channel arg (integer) setting how large a slice to try and read from the wire
each time recvmsg (or equivalent) is called */
@@ -52,7 +52,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_connect,
grpc_endpoint **endpoint,
grpc_pollset_set *interested_parties,
const grpc_channel_args *channel_args,
- const struct sockaddr *addr, size_t addr_len,
+ const grpc_resolved_address *addr,
gpr_timespec deadline);
#endif /* GRPC_CORE_LIB_IOMGR_TCP_CLIENT_H */
diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c
index 500c988146..bc08c94ee0 100644
--- a/src/core/lib/iomgr/tcp_client_posix.c
+++ b/src/core/lib/iomgr/tcp_client_posix.c
@@ -31,9 +31,9 @@
*
*/
-#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
-#ifdef GPR_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET
#include "src/core/lib/iomgr/tcp_client_posix.h"
@@ -73,7 +73,7 @@ typedef struct {
grpc_channel_args *channel_args;
} async_connect;
-static grpc_error *prepare_socket(const struct sockaddr *addr, int fd) {
+static grpc_error *prepare_socket(const grpc_resolved_address *addr, int fd) {
grpc_error *err = GRPC_ERROR_NONE;
GPR_ASSERT(fd >= 0);
@@ -257,14 +257,14 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
grpc_closure *closure, grpc_endpoint **ep,
grpc_pollset_set *interested_parties,
const grpc_channel_args *channel_args,
- const struct sockaddr *addr,
- size_t addr_len, gpr_timespec deadline) {
+ const grpc_resolved_address *addr,
+ gpr_timespec deadline) {
int fd;
grpc_dualstack_mode dsmode;
int err;
async_connect *ac;
- struct sockaddr_in6 addr6_v4mapped;
- struct sockaddr_in addr4_copy;
+ grpc_resolved_address addr6_v4mapped;
+ grpc_resolved_address addr4_copy;
grpc_fd *fdobj;
char *name;
char *addr_str;
@@ -274,8 +274,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
/* Use dualstack sockets where available. */
if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
- addr = (const struct sockaddr *)&addr6_v4mapped;
- addr_len = sizeof(addr6_v4mapped);
+ addr = &addr6_v4mapped;
}
error = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd);
@@ -286,8 +285,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
if (dsmode == GRPC_DSMODE_IPV4) {
/* If we got an AF_INET socket, map the address back to IPv4. */
GPR_ASSERT(grpc_sockaddr_is_v4mapped(addr, &addr4_copy));
- addr = (struct sockaddr *)&addr4_copy;
- addr_len = sizeof(addr4_copy);
+ addr = &addr4_copy;
}
if ((error = prepare_socket(addr, fd)) != GRPC_ERROR_NONE) {
grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
@@ -295,8 +293,9 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
}
do {
- GPR_ASSERT(addr_len < ~(socklen_t)0);
- err = connect(fd, addr, (socklen_t)addr_len);
+ GPR_ASSERT(addr->len < ~(socklen_t)0);
+ err =
+ connect(fd, (const struct sockaddr *)addr->addr, (socklen_t)addr->len);
} while (err < 0 && errno == EINTR);
addr_str = grpc_sockaddr_to_uri(addr);
@@ -354,17 +353,17 @@ done:
void (*grpc_tcp_client_connect_impl)(
grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep,
grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args,
- const struct sockaddr *addr, size_t addr_len,
+ const grpc_resolved_address *addr,
gpr_timespec deadline) = tcp_client_connect_impl;
void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_endpoint **ep,
grpc_pollset_set *interested_parties,
const grpc_channel_args *channel_args,
- const struct sockaddr *addr, size_t addr_len,
+ const grpc_resolved_address *addr,
gpr_timespec deadline) {
grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties,
- channel_args, addr, addr_len, deadline);
+ channel_args, addr, deadline);
}
#endif
diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c
new file mode 100644
index 0000000000..6274667042
--- /dev/null
+++ b/src/core/lib/iomgr/tcp_client_uv.c
@@ -0,0 +1,153 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_UV
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "src/core/lib/iomgr/tcp_client.h"
+#include "src/core/lib/iomgr/tcp_uv.h"
+#include "src/core/lib/iomgr/timer.h"
+
+typedef struct grpc_uv_tcp_connect {
+ uv_connect_t connect_req;
+ grpc_timer alarm;
+ uv_tcp_t *tcp_handle;
+ grpc_closure *closure;
+ grpc_endpoint **endpoint;
+ int refs;
+ char *addr_name;
+} grpc_uv_tcp_connect;
+
+static void uv_tcp_connect_cleanup(grpc_uv_tcp_connect *connect) {
+ gpr_free(connect);
+}
+
+static void tcp_close_callback(uv_handle_t *handle) { gpr_free(handle); }
+
+static void uv_tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp,
+ grpc_error *error) {
+ int done;
+ grpc_uv_tcp_connect *connect = acp;
+ if (error == GRPC_ERROR_NONE) {
+ /* error == NONE implies that the timer ran out, and wasn't cancelled. If
+ it was cancelled, then the handler that cancelled it also should close
+ the handle, if applicable */
+ uv_close((uv_handle_t *)connect->tcp_handle, tcp_close_callback);
+ }
+ done = (--connect->refs == 0);
+ if (done) {
+ uv_tcp_connect_cleanup(connect);
+ }
+}
+
+static void uv_tc_on_connect(uv_connect_t *req, int status) {
+ grpc_uv_tcp_connect *connect = req->data;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_error *error = GRPC_ERROR_NONE;
+ int done;
+ grpc_closure *closure = connect->closure;
+ grpc_timer_cancel(&exec_ctx, &connect->alarm);
+ if (status == 0) {
+ *connect->endpoint =
+ grpc_tcp_create(connect->tcp_handle, connect->addr_name);
+ } else {
+ error = GRPC_ERROR_CREATE("Failed to connect to remote host");
+ error = grpc_error_set_int(error, GRPC_ERROR_INT_ERRNO, -status);
+ error =
+ grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status));
+ if (status == UV_ECANCELED) {
+ error = grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
+ "Timeout occurred");
+ // This should only happen if the handle is already closed
+ } else {
+ error = grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
+ uv_strerror(status));
+ uv_close((uv_handle_t *)connect->tcp_handle, tcp_close_callback);
+ }
+ }
+ done = (--connect->refs == 0);
+ if (done) {
+ uv_tcp_connect_cleanup(connect);
+ }
+ grpc_exec_ctx_sched(&exec_ctx, closure, error, NULL);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
+ grpc_closure *closure, grpc_endpoint **ep,
+ grpc_pollset_set *interested_parties,
+ const grpc_resolved_address *resolved_addr,
+ gpr_timespec deadline) {
+ grpc_uv_tcp_connect *connect;
+ (void)interested_parties;
+ connect = gpr_malloc(sizeof(grpc_uv_tcp_connect));
+ memset(connect, 0, sizeof(grpc_uv_tcp_connect));
+ connect->closure = closure;
+ connect->endpoint = ep;
+ connect->tcp_handle = gpr_malloc(sizeof(uv_tcp_t));
+ connect->addr_name = grpc_sockaddr_to_uri(resolved_addr);
+ uv_tcp_init(uv_default_loop(), connect->tcp_handle);
+ connect->connect_req.data = connect;
+ // TODO(murgatroid99): figure out what the return value here means
+ uv_tcp_connect(&connect->connect_req, connect->tcp_handle,
+ (const struct sockaddr *)resolved_addr->addr,
+ uv_tc_on_connect);
+ grpc_timer_init(exec_ctx, &connect->alarm,
+ gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
+ uv_tc_on_alarm, connect, gpr_now(GPR_CLOCK_MONOTONIC));
+}
+
+// overridden by api_fuzzer.c
+void (*grpc_tcp_client_connect_impl)(
+ grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep,
+ grpc_pollset_set *interested_parties, const grpc_resolved_address *addr,
+ gpr_timespec deadline) = tcp_client_connect_impl;
+
+void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_endpoint **ep,
+ grpc_pollset_set *interested_parties,
+ const grpc_resolved_address *addr,
+ gpr_timespec deadline) {
+ grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, addr,
+ deadline);
+}
+
+#endif /* GRPC_UV */
diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c
index 562cb9c6bf..fdd8c1a1f8 100644
--- a/src/core/lib/iomgr/tcp_client_windows.c
+++ b/src/core/lib/iomgr/tcp_client_windows.c
@@ -31,9 +31,9 @@
*
*/
-#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
-#ifdef GPR_WINSOCK_SOCKET
+#ifdef GRPC_WINSOCK_SOCKET
#include "src/core/lib/iomgr/sockaddr_windows.h"
@@ -129,13 +129,13 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
grpc_endpoint **endpoint,
grpc_pollset_set *interested_parties,
- const struct sockaddr *addr, size_t addr_len,
+ const grpc_resolved_address *addr,
gpr_timespec deadline) {
SOCKET sock = INVALID_SOCKET;
BOOL success;
int status;
- struct sockaddr_in6 addr6_v4mapped;
- struct sockaddr_in6 local_address;
+ grpc_resolved_address addr6_v4mapped;
+ grpc_resolved_address local_address;
async_connect *ac;
grpc_winsocket *socket = NULL;
LPFN_CONNECTEX ConnectEx;
@@ -148,8 +148,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
/* Use dualstack sockets where available. */
if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
- addr = (const struct sockaddr *)&addr6_v4mapped;
- addr_len = sizeof(addr6_v4mapped);
+ addr = &addr6_v4mapped;
}
sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
@@ -178,7 +177,8 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
grpc_sockaddr_make_wildcard6(0, &local_address);
- status = bind(sock, (struct sockaddr *)&local_address, sizeof(local_address));
+ status =
+ bind(sock, (struct sockaddr *)&local_address.addr, local_address.len);
if (status != 0) {
error = GRPC_WSA_ERROR(WSAGetLastError(), "bind");
goto failure;
@@ -186,8 +186,8 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
socket = grpc_winsocket_create(sock, "client");
info = &socket->write_info;
- success =
- ConnectEx(sock, addr, (int)addr_len, NULL, 0, NULL, &info->overlapped);
+ success = ConnectEx(sock, (struct sockaddr *)&addr->addr, (int)addr->len,
+ NULL, 0, NULL, &info->overlapped);
/* It wouldn't be unusual to get a success immediately. But we'll still get
an IOCP notification, so let's ignore it. */
@@ -228,4 +228,4 @@ failure:
grpc_exec_ctx_sched(exec_ctx, on_done, final_error, NULL);
}
-#endif /* GPR_WINSOCK_SOCKET */
+#endif /* GRPC_WINSOCK_SOCKET */
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 27b6677545..880af93ee1 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -31,9 +31,9 @@
*
*/
-#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
-#ifdef GPR_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET
#include "src/core/lib/iomgr/network_status_tracker.h"
#include "src/core/lib/iomgr/tcp_posix.h"
@@ -58,14 +58,14 @@
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/string.h"
-#ifdef GPR_HAVE_MSG_NOSIGNAL
+#ifdef GRPC_HAVE_MSG_NOSIGNAL
#define SENDMSG_FLAGS MSG_NOSIGNAL
#else
#define SENDMSG_FLAGS 0
#endif
-#ifdef GPR_MSG_IOVLEN_TYPE
-typedef GPR_MSG_IOVLEN_TYPE msg_iovlen_type;
+#ifdef GRPC_MSG_IOVLEN_TYPE
+typedef GRPC_MSG_IOVLEN_TYPE msg_iovlen_type;
#else
typedef size_t msg_iovlen_type;
#endif
diff --git a/src/core/lib/iomgr/tcp_server.h b/src/core/lib/iomgr/tcp_server.h
index 2568d7d836..6eba8c4057 100644
--- a/src/core/lib/iomgr/tcp_server.h
+++ b/src/core/lib/iomgr/tcp_server.h
@@ -38,6 +38,7 @@
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/endpoint.h"
+#include "src/core/lib/iomgr/resolve_address.h"
/* Forward decl of grpc_tcp_server */
typedef struct grpc_tcp_server grpc_tcp_server;
@@ -79,8 +80,9 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server,
but not dualstack sockets. */
/* TODO(ctiller): deprecate this, and make grpc_tcp_server_add_ports to handle
all of the multiple socket port matching logic in one place */
-grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
- size_t addr_len, int *out_port);
+grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s,
+ const grpc_resolved_address *addr,
+ int *out_port);
/* Number of fds at the given port_index, or 0 if port_index is out of
bounds. */
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index 648736caa9..b6fc1e4ca2 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -36,9 +36,9 @@
#define _GNU_SOURCE
#endif
-#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
-#ifdef GPR_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET
#include "src/core/lib/iomgr/tcp_server.h"
@@ -62,6 +62,7 @@
#include <grpc/support/useful.h>
#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include "src/core/lib/iomgr/tcp_posix.h"
@@ -79,11 +80,7 @@ struct grpc_tcp_listener {
int fd;
grpc_fd *emfd;
grpc_tcp_server *server;
- union {
- uint8_t untyped[GRPC_MAX_SOCKADDR_SIZE];
- struct sockaddr sockaddr;
- } addr;
- size_t addr_len;
+ grpc_resolved_address addr;
int port;
unsigned port_index;
unsigned fd_index;
@@ -256,7 +253,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (s->head) {
grpc_tcp_listener *sp;
for (sp = s->head; sp; sp = sp->next) {
- grpc_unlink_if_unix_domain_socket(&sp->addr.sockaddr);
+ grpc_unlink_if_unix_domain_socket(&sp->addr);
sp->destroyed_closure.cb = destroyed_port;
sp->destroyed_closure.cb_arg = s;
grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL,
@@ -322,11 +319,9 @@ static int get_max_accept_queue_size(void) {
}
/* Prepare a recently-created socket for listening. */
-static grpc_error *prepare_socket(int fd, const struct sockaddr *addr,
- size_t addr_len, bool so_reuseport,
- int *port) {
- struct sockaddr_storage sockname_temp;
- socklen_t sockname_len;
+static grpc_error *prepare_socket(int fd, const grpc_resolved_address *addr,
+ bool so_reuseport, int *port) {
+ grpc_resolved_address sockname_temp;
grpc_error *err = GRPC_ERROR_NONE;
GPR_ASSERT(fd >= 0);
@@ -349,8 +344,8 @@ static grpc_error *prepare_socket(int fd, const struct sockaddr *addr,
err = grpc_set_socket_no_sigpipe_if_possible(fd);
if (err != GRPC_ERROR_NONE) goto error;
- GPR_ASSERT(addr_len < ~(socklen_t)0);
- if (bind(fd, addr, (socklen_t)addr_len) < 0) {
+ GPR_ASSERT(addr->len < ~(socklen_t)0);
+ if (bind(fd, (struct sockaddr *)addr->addr, (socklen_t)addr->len) < 0) {
err = GRPC_OS_ERROR(errno, "bind");
goto error;
}
@@ -360,13 +355,15 @@ static grpc_error *prepare_socket(int fd, const struct sockaddr *addr,
goto error;
}
- sockname_len = sizeof(sockname_temp);
- if (getsockname(fd, (struct sockaddr *)&sockname_temp, &sockname_len) < 0) {
+ sockname_temp.len = sizeof(struct sockaddr_storage);
+
+ if (getsockname(fd, (struct sockaddr *)sockname_temp.addr,
+ (socklen_t *)&sockname_temp.len) < 0) {
err = GRPC_OS_ERROR(errno, "getsockname");
goto error;
}
- *port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
+ *port = grpc_sockaddr_get_port(&sockname_temp);
return GRPC_ERROR_NONE;
error:
@@ -400,13 +397,13 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
/* loop until accept4 returns EAGAIN, and then re-arm notification */
for (;;) {
- struct sockaddr_storage addr;
- socklen_t addrlen = sizeof(addr);
+ grpc_resolved_address addr;
char *addr_str;
char *name;
+ addr.len = sizeof(struct sockaddr_storage);
/* Note: If we ever decide to return this address to the user, remember to
strip off the ::ffff:0.0.0.0/96 prefix first. */
- int fd = grpc_accept4(sp->fd, (struct sockaddr *)&addr, &addrlen, 1, 1);
+ int fd = grpc_accept4(sp->fd, &addr, 1, 1);
if (fd < 0) {
switch (errno) {
case EINTR:
@@ -422,7 +419,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
grpc_set_socket_no_sigpipe_if_possible(fd);
- addr_str = grpc_sockaddr_to_uri((struct sockaddr *)&addr);
+ addr_str = grpc_sockaddr_to_uri(&addr);
gpr_asprintf(&name, "tcp-server-connection:%s", addr_str);
if (grpc_tcp_trace) {
@@ -461,19 +458,18 @@ error:
}
static grpc_error *add_socket_to_server(grpc_tcp_server *s, int fd,
- const struct sockaddr *addr,
- size_t addr_len, unsigned port_index,
- unsigned fd_index,
+ const grpc_resolved_address *addr,
+ unsigned port_index, unsigned fd_index,
grpc_tcp_listener **listener) {
grpc_tcp_listener *sp = NULL;
int port = -1;
char *addr_str;
char *name;
- grpc_error *err = prepare_socket(fd, addr, addr_len, s->so_reuseport, &port);
+ grpc_error *err = prepare_socket(fd, addr, s->so_reuseport, &port);
if (err == GRPC_ERROR_NONE) {
GPR_ASSERT(port > 0);
- grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1);
+ grpc_sockaddr_to_string(&addr_str, addr, 1);
gpr_asprintf(&name, "tcp-server-listener:%s", addr_str);
gpr_mu_lock(&s->mu);
s->nports++;
@@ -489,8 +485,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, int fd,
sp->server = s;
sp->fd = fd;
sp->emfd = grpc_fd_create(fd, name);
- memcpy(sp->addr.untyped, addr, addr_len);
- sp->addr_len = addr_len;
+ memcpy(&sp->addr, addr, sizeof(grpc_resolved_address));
sp->port = port;
sp->port_index = port_index;
sp->fd_index = fd_index;
@@ -523,14 +518,13 @@ static grpc_error *clone_port(grpc_tcp_listener *listener, unsigned count) {
int fd = -1;
int port = -1;
grpc_dualstack_mode dsmode;
- err = grpc_create_dualstack_socket(&listener->addr.sockaddr, SOCK_STREAM, 0,
- &dsmode, &fd);
+ err = grpc_create_dualstack_socket(&listener->addr, SOCK_STREAM, 0, &dsmode,
+ &fd);
if (err != GRPC_ERROR_NONE) return err;
- err = prepare_socket(fd, &listener->addr.sockaddr, listener->addr_len, true,
- &port);
+ err = prepare_socket(fd, &listener->addr, true, &port);
if (err != GRPC_ERROR_NONE) return err;
listener->server->nports++;
- grpc_sockaddr_to_string(&addr_str, &listener->addr.sockaddr, 1);
+ grpc_sockaddr_to_string(&addr_str, &listener->addr, 1);
gpr_asprintf(&name, "tcp-server-listener:%s/clone-%d", addr_str, i);
sp = gpr_malloc(sizeof(grpc_tcp_listener));
sp->next = listener->next;
@@ -543,8 +537,7 @@ static grpc_error *clone_port(grpc_tcp_listener *listener, unsigned count) {
sp->server = listener->server;
sp->fd = fd;
sp->emfd = grpc_fd_create(fd, name);
- memcpy(sp->addr.untyped, listener->addr.untyped, listener->addr_len);
- sp->addr_len = listener->addr_len;
+ memcpy(&sp->addr, &listener->addr, sizeof(grpc_resolved_address));
sp->port = port;
sp->port_index = listener->port_index;
sp->fd_index = listener->fd_index + count - i;
@@ -559,19 +552,19 @@ static grpc_error *clone_port(grpc_tcp_listener *listener, unsigned count) {
return GRPC_ERROR_NONE;
}
-grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
- size_t addr_len, int *out_port) {
+grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s,
+ const grpc_resolved_address *addr,
+ int *out_port) {
grpc_tcp_listener *sp;
grpc_tcp_listener *sp2 = NULL;
int fd;
grpc_dualstack_mode dsmode;
- struct sockaddr_in6 addr6_v4mapped;
- struct sockaddr_in wild4;
- struct sockaddr_in6 wild6;
- struct sockaddr_in addr4_copy;
- struct sockaddr *allocated_addr = NULL;
- struct sockaddr_storage sockname_temp;
- socklen_t sockname_len;
+ grpc_resolved_address addr6_v4mapped;
+ grpc_resolved_address wild4;
+ grpc_resolved_address wild6;
+ grpc_resolved_address addr4_copy;
+ grpc_resolved_address *allocated_addr = NULL;
+ grpc_resolved_address sockname_temp;
int port;
unsigned port_index = 0;
unsigned fd_index = 0;
@@ -579,19 +572,19 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
if (s->tail != NULL) {
port_index = s->tail->port_index + 1;
}
- grpc_unlink_if_unix_domain_socket((struct sockaddr *)addr);
+ grpc_unlink_if_unix_domain_socket(addr);
/* Check if this is a wildcard port, and if so, try to keep the port the same
as some previously created listener. */
if (grpc_sockaddr_get_port(addr) == 0) {
for (sp = s->head; sp; sp = sp->next) {
- sockname_len = sizeof(sockname_temp);
- if (0 == getsockname(sp->fd, (struct sockaddr *)&sockname_temp,
- &sockname_len)) {
- port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
+ sockname_temp.len = sizeof(struct sockaddr_storage);
+ if (0 == getsockname(sp->fd, (struct sockaddr *)sockname_temp.addr,
+ (socklen_t *)&sockname_temp.len)) {
+ port = grpc_sockaddr_get_port(&sockname_temp);
if (port > 0) {
- allocated_addr = gpr_malloc(addr_len);
- memcpy(allocated_addr, addr, addr_len);
+ allocated_addr = gpr_malloc(sizeof(grpc_resolved_address));
+ memcpy(allocated_addr, addr, addr->len);
grpc_sockaddr_set_port(allocated_addr, port);
addr = allocated_addr;
break;
@@ -603,8 +596,7 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
sp = NULL;
if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
- addr = (const struct sockaddr *)&addr6_v4mapped;
- addr_len = sizeof(addr6_v4mapped);
+ addr = &addr6_v4mapped;
}
/* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
@@ -612,12 +604,10 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
grpc_sockaddr_make_wildcards(port, &wild4, &wild6);
/* Try listening on IPv6 first. */
- addr = (struct sockaddr *)&wild6;
- addr_len = sizeof(wild6);
+ addr = &wild6;
errs[0] = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd);
if (errs[0] == GRPC_ERROR_NONE) {
- errs[0] = add_socket_to_server(s, fd, addr, addr_len, port_index,
- fd_index, &sp);
+ errs[0] = add_socket_to_server(s, fd, addr, port_index, fd_index, &sp);
if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
goto done;
}
@@ -626,23 +616,20 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
}
/* If we didn't get a dualstack socket, also listen on 0.0.0.0. */
if (port == 0 && sp != NULL) {
- grpc_sockaddr_set_port((struct sockaddr *)&wild4, sp->port);
+ grpc_sockaddr_set_port(&wild4, sp->port);
}
}
- addr = (struct sockaddr *)&wild4;
- addr_len = sizeof(wild4);
+ addr = &wild4;
}
errs[1] = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd);
if (errs[1] == GRPC_ERROR_NONE) {
if (dsmode == GRPC_DSMODE_IPV4 &&
grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) {
- addr = (struct sockaddr *)&addr4_copy;
- addr_len = sizeof(addr4_copy);
+ addr = &addr4_copy;
}
sp2 = sp;
- errs[1] =
- add_socket_to_server(s, fd, addr, addr_len, port_index, fd_index, &sp);
+ errs[1] = add_socket_to_server(s, fd, addr, port_index, fd_index, &sp);
if (sp2 != NULL && sp != NULL) {
sp2->sibling = sp;
sp->is_sibling = 1;
@@ -723,7 +710,7 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
s->pollset_count = pollset_count;
sp = s->head;
while (sp != NULL) {
- if (s->so_reuseport && !grpc_is_unix_socket(&sp->addr.sockaddr) &&
+ if (s->so_reuseport && !grpc_is_unix_socket(&sp->addr) &&
pollset_count > 1) {
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"clone_port", clone_port(sp, (unsigned)(pollset_count - 1))));
diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c
new file mode 100644
index 0000000000..73e4db3d65
--- /dev/null
+++ b/src/core/lib/iomgr/tcp_server_uv.c
@@ -0,0 +1,365 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_UV
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/sockaddr.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "src/core/lib/iomgr/tcp_server.h"
+#include "src/core/lib/iomgr/tcp_uv.h"
+
+/* one listening port */
+typedef struct grpc_tcp_listener grpc_tcp_listener;
+struct grpc_tcp_listener {
+ uv_tcp_t *handle;
+ grpc_tcp_server *server;
+ unsigned port_index;
+ int port;
+ /* linked list */
+ struct grpc_tcp_listener *next;
+};
+
+struct grpc_tcp_server {
+ gpr_refcount refs;
+
+ /* Called whenever accept() succeeds on a server port. */
+ grpc_tcp_server_cb on_accept_cb;
+ void *on_accept_cb_arg;
+
+ int open_ports;
+
+ /* linked list of server ports */
+ grpc_tcp_listener *head;
+ grpc_tcp_listener *tail;
+
+ /* List of closures passed to shutdown_starting_add(). */
+ grpc_closure_list shutdown_starting;
+
+ /* shutdown callback */
+ grpc_closure *shutdown_complete;
+};
+
+grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete,
+ const grpc_channel_args *args,
+ grpc_tcp_server **server) {
+ grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
+ (void)args;
+ gpr_ref_init(&s->refs, 1);
+ s->on_accept_cb = NULL;
+ s->on_accept_cb_arg = NULL;
+ s->open_ports = 0;
+ s->head = NULL;
+ s->tail = NULL;
+ s->shutdown_starting.head = NULL;
+ s->shutdown_starting.tail = NULL;
+ s->shutdown_complete = shutdown_complete;
+ *server = s;
+ return GRPC_ERROR_NONE;
+}
+
+grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) {
+ gpr_ref(&s->refs);
+ return s;
+}
+
+void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s,
+ grpc_closure *shutdown_starting) {
+ grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
+ GRPC_ERROR_NONE);
+}
+
+static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
+ if (s->shutdown_complete != NULL) {
+ grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
+ }
+
+ while (s->head) {
+ grpc_tcp_listener *sp = s->head;
+ s->head = sp->next;
+ sp->next = NULL;
+ gpr_free(sp->handle);
+ gpr_free(sp);
+ }
+ gpr_free(s);
+}
+
+static void handle_close_callback(uv_handle_t *handle) {
+ grpc_tcp_listener *sp = (grpc_tcp_listener *)handle->data;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ sp->server->open_ports--;
+ if (sp->server->open_ports == 0) {
+ finish_shutdown(&exec_ctx, sp->server);
+ }
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
+ int immediately_done = 0;
+ grpc_tcp_listener *sp;
+
+ if (s->open_ports == 0) {
+ immediately_done = 1;
+ }
+ for (sp = s->head; sp; sp = sp->next) {
+ uv_close((uv_handle_t *)sp->handle, handle_close_callback);
+ }
+
+ if (immediately_done) {
+ finish_shutdown(exec_ctx, s);
+ }
+}
+
+void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
+ if (gpr_unref(&s->refs)) {
+ /* Complete shutdown_starting work before destroying. */
+ grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_exec_ctx_enqueue_list(&local_exec_ctx, &s->shutdown_starting, NULL);
+ if (exec_ctx == NULL) {
+ grpc_exec_ctx_flush(&local_exec_ctx);
+ tcp_server_destroy(&local_exec_ctx, s);
+ grpc_exec_ctx_finish(&local_exec_ctx);
+ } else {
+ grpc_exec_ctx_finish(&local_exec_ctx);
+ tcp_server_destroy(exec_ctx, s);
+ }
+ }
+}
+
+static void accepted_connection_close_cb(uv_handle_t *handle) {
+ gpr_free(handle);
+}
+
+static void on_connect(uv_stream_t *server, int status) {
+ grpc_tcp_listener *sp = (grpc_tcp_listener *)server->data;
+ grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, 0};
+ uv_tcp_t *client;
+ grpc_endpoint *ep = NULL;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resolved_address peer_name;
+ char *peer_name_string;
+ int err;
+
+ if (status < 0) {
+ gpr_log(GPR_INFO, "Skipping on_accept due to error: %s",
+ uv_strerror(status));
+ return;
+ }
+ client = gpr_malloc(sizeof(uv_tcp_t));
+ uv_tcp_init(uv_default_loop(), client);
+ // UV documentation says this is guaranteed to succeed
+ uv_accept((uv_stream_t *)server, (uv_stream_t *)client);
+ // If the server has not been started, we discard incoming connections
+ if (sp->server->on_accept_cb == NULL) {
+ uv_close((uv_handle_t *)client, accepted_connection_close_cb);
+ } else {
+ peer_name_string = NULL;
+ memset(&peer_name, 0, sizeof(grpc_resolved_address));
+ peer_name.len = sizeof(struct sockaddr_storage);
+ err = uv_tcp_getpeername(client, (struct sockaddr *)&peer_name.addr,
+ (int *)&peer_name.len);
+ if (err == 0) {
+ peer_name_string = grpc_sockaddr_to_uri(&peer_name);
+ } else {
+ gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(status));
+ }
+ ep = grpc_tcp_create(client, peer_name_string);
+ sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL,
+ &acceptor);
+ grpc_exec_ctx_finish(&exec_ctx);
+ }
+}
+
+static grpc_error *add_socket_to_server(grpc_tcp_server *s, uv_tcp_t *handle,
+ const grpc_resolved_address *addr,
+ unsigned port_index,
+ grpc_tcp_listener **listener) {
+ grpc_tcp_listener *sp = NULL;
+ int port = -1;
+ int status;
+ grpc_error *error;
+ grpc_resolved_address sockname_temp;
+
+ // The last argument to uv_tcp_bind is flags
+ status = uv_tcp_bind(handle, (struct sockaddr *)addr->addr, 0);
+ if (status != 0) {
+ error = GRPC_ERROR_CREATE("Failed to bind to port");
+ error =
+ grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status));
+ return error;
+ }
+
+ status = uv_listen((uv_stream_t *)handle, SOMAXCONN, on_connect);
+ if (status != 0) {
+ error = GRPC_ERROR_CREATE("Failed to listen to port");
+ error =
+ grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status));
+ return error;
+ }
+
+ sockname_temp.len = (int)sizeof(struct sockaddr_storage);
+ status = uv_tcp_getsockname(handle, (struct sockaddr *)&sockname_temp.addr,
+ (int *)&sockname_temp.len);
+ if (status != 0) {
+ error = GRPC_ERROR_CREATE("getsockname failed");
+ error =
+ grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status));
+ return error;
+ }
+
+ port = grpc_sockaddr_get_port(&sockname_temp);
+
+ GPR_ASSERT(port >= 0);
+ GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
+ sp = gpr_malloc(sizeof(grpc_tcp_listener));
+ sp->next = NULL;
+ if (s->head == NULL) {
+ s->head = sp;
+ } else {
+ s->tail->next = sp;
+ }
+ s->tail = sp;
+ sp->server = s;
+ sp->handle = handle;
+ sp->port = port;
+ sp->port_index = port_index;
+ handle->data = sp;
+ s->open_ports++;
+ GPR_ASSERT(sp->handle);
+ *listener = sp;
+
+ return GRPC_ERROR_NONE;
+}
+
+grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s,
+ const grpc_resolved_address *addr,
+ int *port) {
+ // This function is mostly copied from tcp_server_windows.c
+ grpc_tcp_listener *sp = NULL;
+ uv_tcp_t *handle;
+ grpc_resolved_address addr6_v4mapped;
+ grpc_resolved_address wildcard;
+ grpc_resolved_address *allocated_addr = NULL;
+ grpc_resolved_address sockname_temp;
+ unsigned port_index = 0;
+ int status;
+ grpc_error *error = GRPC_ERROR_NONE;
+
+ if (s->tail != NULL) {
+ port_index = s->tail->port_index + 1;
+ }
+
+ /* Check if this is a wildcard port, and if so, try to keep the port the same
+ as some previously created listener. */
+ if (grpc_sockaddr_get_port(addr) == 0) {
+ for (sp = s->head; sp; sp = sp->next) {
+ sockname_temp.len = sizeof(struct sockaddr_storage);
+ if (0 == uv_tcp_getsockname(sp->handle,
+ (struct sockaddr *)&sockname_temp.addr,
+ (int *)&sockname_temp.len)) {
+ *port = grpc_sockaddr_get_port(&sockname_temp);
+ if (*port > 0) {
+ allocated_addr = gpr_malloc(sizeof(grpc_resolved_address));
+ memcpy(allocated_addr, addr, sizeof(grpc_resolved_address));
+ grpc_sockaddr_set_port(allocated_addr, *port);
+ addr = allocated_addr;
+ break;
+ }
+ }
+ }
+ }
+
+ if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
+ addr = &addr6_v4mapped;
+ }
+
+ /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
+ if (grpc_sockaddr_is_wildcard(addr, port)) {
+ grpc_sockaddr_make_wildcard6(*port, &wildcard);
+
+ addr = &wildcard;
+ }
+
+ handle = gpr_malloc(sizeof(uv_tcp_t));
+ status = uv_tcp_init(uv_default_loop(), handle);
+ if (status == 0) {
+ error = add_socket_to_server(s, handle, addr, port_index, &sp);
+ } else {
+ error = GRPC_ERROR_CREATE("Failed to initialize UV tcp handle");
+ error =
+ grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status));
+ }
+
+ gpr_free(allocated_addr);
+
+ if (error != GRPC_ERROR_NONE) {
+ grpc_error *error_out = GRPC_ERROR_CREATE_REFERENCING(
+ "Failed to add port to server", &error, 1);
+ GRPC_ERROR_UNREF(error);
+ error = error_out;
+ *port = -1;
+ } else {
+ GPR_ASSERT(sp != NULL);
+ *port = sp->port;
+ }
+ return error;
+}
+
+void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server,
+ grpc_pollset **pollsets, size_t pollset_count,
+ grpc_tcp_server_cb on_accept_cb, void *cb_arg) {
+ grpc_tcp_listener *sp;
+ (void)pollsets;
+ (void)pollset_count;
+ GPR_ASSERT(on_accept_cb);
+ GPR_ASSERT(!server->on_accept_cb);
+ server->on_accept_cb = on_accept_cb;
+ server->on_accept_cb_arg = cb_arg;
+ for (sp = server->head; sp; sp = sp->next) {
+ GPR_ASSERT(uv_listen((uv_stream_t *)sp->handle, SOMAXCONN, on_connect) ==
+ 0);
+ }
+}
+
+void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx,
+ grpc_tcp_server *s) {}
+
+#endif /* GRPC_UV */
diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c
index 4ff05601fa..ad6769a6ba 100644
--- a/src/core/lib/iomgr/tcp_server_windows.c
+++ b/src/core/lib/iomgr/tcp_server_windows.c
@@ -31,13 +31,13 @@
*
*/
-#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
-#ifdef GPR_WINSOCK_SOCKET
+#ifdef GRPC_WINSOCK_SOCKET
-#include <io.h>
+#include "src/core/lib/iomgr/sockaddr.h"
-#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include <io.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -48,6 +48,8 @@
#include "src/core/lib/iomgr/iocp_windows.h"
#include "src/core/lib/iomgr/pollset_windows.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/socket_windows.h"
#include "src/core/lib/iomgr/tcp_server.h"
#include "src/core/lib/iomgr/tcp_windows.h"
@@ -183,10 +185,10 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
}
/* Prepare (bind) a recently-created socket for listening. */
-static grpc_error *prepare_socket(SOCKET sock, const struct sockaddr *addr,
- size_t addr_len, int *port) {
- struct sockaddr_storage sockname_temp;
- socklen_t sockname_len;
+static grpc_error *prepare_socket(SOCKET sock,
+ const grpc_resolved_address *addr,
+ int *port) {
+ grpc_resolved_address sockname_temp;
grpc_error *error = GRPC_ERROR_NONE;
error = grpc_tcp_prepare_socket(sock);
@@ -194,7 +196,8 @@ static grpc_error *prepare_socket(SOCKET sock, const struct sockaddr *addr,
goto failure;
}
- if (bind(sock, addr, (int)addr_len) == SOCKET_ERROR) {
+ if (bind(sock, (const struct sockaddr *)addr->addr, (int)addr->len) ==
+ SOCKET_ERROR) {
error = GRPC_WSA_ERROR(WSAGetLastError(), "bind");
goto failure;
}
@@ -204,14 +207,14 @@ static grpc_error *prepare_socket(SOCKET sock, const struct sockaddr *addr,
goto failure;
}
- sockname_len = sizeof(sockname_temp);
- if (getsockname(sock, (struct sockaddr *)&sockname_temp, &sockname_len) ==
- SOCKET_ERROR) {
+ sockname_temp.len = sizeof(struct sockaddr_storage);
+ if (getsockname(sock, (struct sockaddr *)sockname_temp.addr,
+ &sockname_temp.len) == SOCKET_ERROR) {
error = GRPC_WSA_ERROR(WSAGetLastError(), "getsockname");
goto failure;
}
- *port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
+ *port = grpc_sockaddr_get_port(&sockname_temp);
return GRPC_ERROR_NONE;
failure:
@@ -307,15 +310,16 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
SOCKET sock = sp->new_socket;
grpc_winsocket_callback_info *info = &sp->socket->read_info;
grpc_endpoint *ep = NULL;
- struct sockaddr_storage peer_name;
+ grpc_resolved_address peer_name;
char *peer_name_string;
char *fd_name;
- int peer_name_len = sizeof(peer_name);
DWORD transfered_bytes;
DWORD flags;
BOOL wsa_success;
int err;
+ peer_name.len = sizeof(struct sockaddr_storage);
+
/* The general mechanism for shutting down is to queue abortion calls. While
this is necessary in the read/write case, it's useless for the accept
case. We only need to adjust the pending callback count */
@@ -353,9 +357,10 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
gpr_log(GPR_ERROR, "setsockopt error: %s", utf8_message);
gpr_free(utf8_message);
}
- err = getpeername(sock, (struct sockaddr *)&peer_name, &peer_name_len);
+ err =
+ getpeername(sock, (struct sockaddr *)peer_name.addr, &peer_name.len);
if (!err) {
- peer_name_string = grpc_sockaddr_to_uri((struct sockaddr *)&peer_name);
+ peer_name_string = grpc_sockaddr_to_uri(&peer_name);
} else {
char *utf8_message = gpr_format_message(WSAGetLastError());
gpr_log(GPR_ERROR, "getpeername error: %s", utf8_message);
@@ -385,8 +390,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
}
static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
- const struct sockaddr *addr,
- size_t addr_len, unsigned port_index,
+ const grpc_resolved_address *addr,
+ unsigned port_index,
grpc_tcp_listener **listener) {
grpc_tcp_listener *sp = NULL;
int port = -1;
@@ -410,7 +415,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
return NULL;
}
- error = prepare_socket(sock, addr, addr_len, &port);
+ error = prepare_socket(sock, addr, &port);
if (error != GRPC_ERROR_NONE) {
return error;
}
@@ -441,15 +446,15 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
return GRPC_ERROR_NONE;
}
-grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
- size_t addr_len, int *port) {
+grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s,
+ const grpc_resolved_address *addr,
+ int *port) {
grpc_tcp_listener *sp = NULL;
SOCKET sock;
- struct sockaddr_in6 addr6_v4mapped;
- struct sockaddr_in6 wildcard;
- struct sockaddr *allocated_addr = NULL;
- struct sockaddr_storage sockname_temp;
- socklen_t sockname_len;
+ grpc_resolved_address addr6_v4mapped;
+ grpc_resolved_address wildcard;
+ grpc_resolved_address *allocated_addr = NULL;
+ grpc_resolved_address sockname_temp;
unsigned port_index = 0;
grpc_error *error = GRPC_ERROR_NONE;
@@ -461,13 +466,14 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
as some previously created listener. */
if (grpc_sockaddr_get_port(addr) == 0) {
for (sp = s->head; sp; sp = sp->next) {
- sockname_len = sizeof(sockname_temp);
+ sockname_temp.len = sizeof(struct sockaddr_storage);
if (0 == getsockname(sp->socket->socket,
- (struct sockaddr *)&sockname_temp, &sockname_len)) {
- *port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
+ (struct sockaddr *)sockname_temp.addr,
+ &sockname_temp.len)) {
+ *port = grpc_sockaddr_get_port(&sockname_temp);
if (*port > 0) {
- allocated_addr = gpr_malloc(addr_len);
- memcpy(allocated_addr, addr, addr_len);
+ allocated_addr = gpr_malloc(sizeof(grpc_resolved_address));
+ memcpy(allocated_addr, addr, sizeof(grpc_resolved_address));
grpc_sockaddr_set_port(allocated_addr, *port);
addr = allocated_addr;
break;
@@ -477,16 +483,14 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
}
if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
- addr = (const struct sockaddr *)&addr6_v4mapped;
- addr_len = sizeof(addr6_v4mapped);
+ addr = &addr6_v4mapped;
}
/* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
if (grpc_sockaddr_is_wildcard(addr, port)) {
grpc_sockaddr_make_wildcard6(*port, &wildcard);
- addr = (struct sockaddr *)&wildcard;
- addr_len = sizeof(wildcard);
+ addr = &wildcard;
}
sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
@@ -496,7 +500,7 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
goto done;
}
- error = add_socket_to_server(s, sock, addr, addr_len, port_index, &sp);
+ error = add_socket_to_server(s, sock, addr, port_index, &sp);
done:
gpr_free(allocated_addr);
@@ -535,4 +539,4 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx,
grpc_tcp_server *s) {}
-#endif /* GPR_WINSOCK_SOCKET */
+#endif /* GRPC_WINSOCK_SOCKET */
diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c
new file mode 100644
index 0000000000..3860fe3e9b
--- /dev/null
+++ b/src/core/lib/iomgr/tcp_uv.c
@@ -0,0 +1,338 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_UV
+
+#include <limits.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/slice_buffer.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/network_status_tracker.h"
+#include "src/core/lib/iomgr/tcp_uv.h"
+#include "src/core/lib/support/string.h"
+
+int grpc_tcp_trace = 0;
+
+typedef struct {
+ grpc_endpoint base;
+ gpr_refcount refcount;
+
+ uv_tcp_t *handle;
+
+ grpc_closure *read_cb;
+ grpc_closure *write_cb;
+
+ gpr_slice read_slice;
+ gpr_slice_buffer *read_slices;
+ gpr_slice_buffer *write_slices;
+ uv_buf_t *write_buffers;
+
+ bool shutting_down;
+ char *peer_string;
+ grpc_pollset *pollset;
+} grpc_tcp;
+
+static void uv_close_callback(uv_handle_t *handle) { gpr_free(handle); }
+
+static void tcp_free(grpc_tcp *tcp) { gpr_free(tcp); }
+
+/*#define GRPC_TCP_REFCOUNT_DEBUG*/
+#ifdef GRPC_TCP_REFCOUNT_DEBUG
+#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
+#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
+static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file,
+ int line) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp,
+ reason, tcp->refcount.count, tcp->refcount.count - 1);
+ if (gpr_unref(&tcp->refcount)) {
+ tcp_free(tcp);
+ }
+}
+
+static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file,
+ int line) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %d -> %d", tcp,
+ reason, tcp->refcount.count, tcp->refcount.count + 1);
+ gpr_ref(&tcp->refcount);
+}
+#else
+#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
+#define TCP_REF(tcp, reason) tcp_ref((tcp))
+static void tcp_unref(grpc_tcp *tcp) {
+ if (gpr_unref(&tcp->refcount)) {
+ tcp_free(tcp);
+ }
+}
+
+static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
+#endif
+
+static void alloc_uv_buf(uv_handle_t *handle, size_t suggested_size,
+ uv_buf_t *buf) {
+ grpc_tcp *tcp = handle->data;
+ (void)suggested_size;
+ tcp->read_slice = gpr_slice_malloc(GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
+ buf->base = (char *)GPR_SLICE_START_PTR(tcp->read_slice);
+ buf->len = GPR_SLICE_LENGTH(tcp->read_slice);
+}
+
+static void read_callback(uv_stream_t *stream, ssize_t nread,
+ const uv_buf_t *buf) {
+ gpr_slice sub;
+ grpc_error *error;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_tcp *tcp = stream->data;
+ grpc_closure *cb = tcp->read_cb;
+ if (nread == 0) {
+ // Nothing happened. Wait for the next callback
+ return;
+ }
+ TCP_UNREF(tcp, "read");
+ tcp->read_cb = NULL;
+ // TODO(murgatroid99): figure out what the return value here means
+ uv_read_stop(stream);
+ if (nread == UV_EOF) {
+ error = GRPC_ERROR_CREATE("EOF");
+ } else if (nread > 0) {
+ // Successful read
+ sub = gpr_slice_sub_no_ref(tcp->read_slice, 0, (size_t)nread);
+ gpr_slice_buffer_add(tcp->read_slices, sub);
+ error = GRPC_ERROR_NONE;
+ if (grpc_tcp_trace) {
+ size_t i;
+ const char *str = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "read: error=%s", str);
+ grpc_error_free_string(str);
+ for (i = 0; i < tcp->read_slices->count; i++) {
+ char *dump = gpr_dump_slice(tcp->read_slices->slices[i],
+ GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string,
+ dump);
+ gpr_free(dump);
+ }
+ }
+ } else {
+ // nread < 0: Error
+ error = GRPC_ERROR_CREATE("TCP Read failed");
+ }
+ grpc_exec_ctx_sched(&exec_ctx, cb, error, NULL);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
+ gpr_slice_buffer *read_slices, grpc_closure *cb) {
+ grpc_tcp *tcp = (grpc_tcp *)ep;
+ int status;
+ grpc_error *error = GRPC_ERROR_NONE;
+ GPR_ASSERT(tcp->read_cb == NULL);
+ tcp->read_cb = cb;
+ tcp->read_slices = read_slices;
+ gpr_slice_buffer_reset_and_unref(read_slices);
+ TCP_REF(tcp, "read");
+ // TODO(murgatroid99): figure out what the return value here means
+ status =
+ uv_read_start((uv_stream_t *)tcp->handle, alloc_uv_buf, read_callback);
+ if (status != 0) {
+ error = GRPC_ERROR_CREATE("TCP Read failed at start");
+ error =
+ grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status));
+ grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
+ }
+ if (grpc_tcp_trace) {
+ const char *str = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "Initiating read on %p: error=%s", tcp, str);
+ }
+}
+
+static void write_callback(uv_write_t *req, int status) {
+ grpc_tcp *tcp = req->data;
+ grpc_error *error;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_closure *cb = tcp->write_cb;
+ tcp->write_cb = NULL;
+ TCP_UNREF(tcp, "write");
+ if (status == 0) {
+ error = GRPC_ERROR_NONE;
+ } else {
+ error = GRPC_ERROR_CREATE("TCP Write failed");
+ }
+ if (grpc_tcp_trace) {
+ const char *str = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "write complete on %p: error=%s", tcp, str);
+ }
+ gpr_free(tcp->write_buffers);
+ gpr_free(req);
+ grpc_exec_ctx_sched(&exec_ctx, cb, error, NULL);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
+ gpr_slice_buffer *write_slices,
+ grpc_closure *cb) {
+ grpc_tcp *tcp = (grpc_tcp *)ep;
+ uv_buf_t *buffers;
+ unsigned int buffer_count;
+ unsigned int i;
+ gpr_slice *slice;
+ uv_write_t *write_req;
+
+ if (grpc_tcp_trace) {
+ size_t j;
+
+ for (j = 0; j < write_slices->count; j++) {
+ char *data = gpr_dump_slice(write_slices->slices[j],
+ GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, data);
+ gpr_free(data);
+ }
+ }
+
+ if (tcp->shutting_down) {
+ grpc_exec_ctx_sched(exec_ctx, cb,
+ GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL);
+ return;
+ }
+
+ GPR_ASSERT(tcp->write_cb == NULL);
+ tcp->write_slices = write_slices;
+ GPR_ASSERT(tcp->write_slices->count <= UINT_MAX);
+ if (tcp->write_slices->count == 0) {
+ // No slices means we don't have to do anything,
+ // and libuv doesn't like empty writes
+ grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_NONE, NULL);
+ return;
+ }
+
+ tcp->write_cb = cb;
+ buffer_count = (unsigned int)tcp->write_slices->count;
+ buffers = gpr_malloc(sizeof(uv_buf_t) * buffer_count);
+ for (i = 0; i < buffer_count; i++) {
+ slice = &tcp->write_slices->slices[i];
+ buffers[i].base = (char *)GPR_SLICE_START_PTR(*slice);
+ buffers[i].len = GPR_SLICE_LENGTH(*slice);
+ }
+ write_req = gpr_malloc(sizeof(uv_write_t));
+ write_req->data = tcp;
+ TCP_REF(tcp, "write");
+ // TODO(murgatroid99): figure out what the return value here means
+ uv_write(write_req, (uv_stream_t *)tcp->handle, buffers, buffer_count,
+ write_callback);
+}
+
+static void uv_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
+ grpc_pollset *pollset) {
+ // No-op. We're ignoring pollsets currently
+ (void)exec_ctx;
+ (void)ep;
+ (void)pollset;
+ grpc_tcp *tcp = (grpc_tcp *)ep;
+ tcp->pollset = pollset;
+}
+
+static void uv_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
+ grpc_pollset_set *pollset) {
+ // No-op. We're ignoring pollsets currently
+ (void)exec_ctx;
+ (void)ep;
+ (void)pollset;
+}
+
+static void shutdown_callback(uv_shutdown_t *req, int status) { gpr_free(req); }
+
+static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
+ grpc_tcp *tcp = (grpc_tcp *)ep;
+ if (!tcp->shutting_down) {
+ tcp->shutting_down = true;
+ uv_shutdown_t *req = gpr_malloc(sizeof(uv_shutdown_t));
+ uv_shutdown(req, (uv_stream_t *)tcp->handle, shutdown_callback);
+ }
+}
+
+static void uv_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
+ grpc_network_status_unregister_endpoint(ep);
+ grpc_tcp *tcp = (grpc_tcp *)ep;
+ uv_close((uv_handle_t *)tcp->handle, uv_close_callback);
+ TCP_UNREF(tcp, "destroy");
+}
+
+static char *uv_get_peer(grpc_endpoint *ep) {
+ grpc_tcp *tcp = (grpc_tcp *)ep;
+ return gpr_strdup(tcp->peer_string);
+}
+
+static grpc_workqueue *uv_get_workqueue(grpc_endpoint *ep) { return NULL; }
+
+static grpc_endpoint_vtable vtable = {uv_endpoint_read,
+ uv_endpoint_write,
+ uv_get_workqueue,
+ uv_add_to_pollset,
+ uv_add_to_pollset_set,
+ uv_endpoint_shutdown,
+ uv_destroy,
+ uv_get_peer};
+
+grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, char *peer_string) {
+ grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
+
+ if (grpc_tcp_trace) {
+ gpr_log(GPR_DEBUG, "Creating TCP endpoint %p", tcp);
+ }
+
+ /* Disable Nagle's Algorithm */
+ uv_tcp_nodelay(handle, 1);
+
+ memset(tcp, 0, sizeof(grpc_tcp));
+ tcp->base.vtable = &vtable;
+ tcp->handle = handle;
+ handle->data = tcp;
+ gpr_ref_init(&tcp->refcount, 1);
+ tcp->peer_string = gpr_strdup(peer_string);
+ tcp->shutting_down = false;
+ /* Tell network status tracking code about the new endpoint */
+ grpc_network_status_register_endpoint(&tcp->base);
+
+#ifndef GRPC_UV_TCP_HOLD_LOOP
+ uv_unref((uv_handle_t *)handle);
+#endif
+
+ return &tcp->base;
+}
+
+#endif /* GRPC_UV */
diff --git a/src/core/lib/iomgr/tcp_uv.h b/src/core/lib/iomgr/tcp_uv.h
new file mode 100644
index 0000000000..eed41151ea
--- /dev/null
+++ b/src/core/lib/iomgr/tcp_uv.h
@@ -0,0 +1,57 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_TCP_UV_H
+#define GRPC_CORE_LIB_IOMGR_TCP_UV_H
+/*
+ Low level TCP "bottom half" implementation, for use by transports built on
+ top of a TCP connection.
+
+ Note that this file does not (yet) include APIs for creating the socket in
+ the first place.
+
+ All calls passing slice transfer ownership of a slice refcount unless
+ otherwise specified.
+*/
+
+#include "src/core/lib/iomgr/endpoint.h"
+
+#include <uv.h>
+
+extern int grpc_tcp_trace;
+
+#define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192
+
+grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, char *peer_string);
+
+#endif /* GRPC_CORE_LIB_IOMGR_TCP_UV_H */
diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c
index 448a72671c..a5f508a2c3 100644
--- a/src/core/lib/iomgr/tcp_windows.c
+++ b/src/core/lib/iomgr/tcp_windows.c
@@ -31,9 +31,9 @@
*
*/
-#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
-#ifdef GPR_WINSOCK_SOCKET
+#ifdef GRPC_WINSOCK_SOCKET
#include <limits.h>
@@ -417,4 +417,4 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) {
return &tcp->base;
}
-#endif /* GPR_WINSOCK_SOCKET */
+#endif /* GRPC_WINSOCK_SOCKET */
diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h
index 5a9a177963..20fe98c4a7 100644
--- a/src/core/lib/iomgr/timer.h
+++ b/src/core/lib/iomgr/timer.h
@@ -34,19 +34,20 @@
#ifndef GRPC_CORE_LIB_IOMGR_TIMER_H
#define GRPC_CORE_LIB_IOMGR_TIMER_H
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_UV
+#include "src/core/lib/iomgr/timer_uv.h"
+#else
+#include "src/core/lib/iomgr/timer_generic.h"
+#endif /* GRPC_UV */
+
#include <grpc/support/port_platform.h>
#include <grpc/support/time.h>
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr.h"
-typedef struct grpc_timer {
- gpr_timespec deadline;
- uint32_t heap_index; /* INVALID_HEAP_INDEX if not in heap */
- int triggered;
- struct grpc_timer *next;
- struct grpc_timer *prev;
- grpc_closure closure;
-} grpc_timer;
+typedef struct grpc_timer grpc_timer;
/* Initialize *timer. When expired or canceled, timer_cb will be called with
*timer_cb_arg and error set to indicate if it expired (GRPC_ERROR_NONE) or
diff --git a/src/core/lib/iomgr/timer.c b/src/core/lib/iomgr/timer_generic.c
index 9975fa1671..00058f9d86 100644
--- a/src/core/lib/iomgr/timer.c
+++ b/src/core/lib/iomgr/timer_generic.c
@@ -31,6 +31,10 @@
*
*/
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_TIMER_USE_GENERIC
+
#include "src/core/lib/iomgr/timer.h"
#include <grpc/support/log.h>
@@ -382,3 +386,5 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
? GRPC_ERROR_NONE
: GRPC_ERROR_CREATE("Shutting down timer system"));
}
+
+#endif /* GRPC_TIMER_USE_GENERIC */
diff --git a/src/core/lib/iomgr/timer_generic.h b/src/core/lib/iomgr/timer_generic.h
new file mode 100644
index 0000000000..e4494adb5f
--- /dev/null
+++ b/src/core/lib/iomgr/timer_generic.h
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_TIMER_GENERIC_H
+#define GRPC_CORE_LIB_IOMGR_TIMER_GENERIC_H
+
+#include <grpc/support/time.h>
+#include "src/core/lib/iomgr/exec_ctx.h"
+
+struct grpc_timer {
+ gpr_timespec deadline;
+ uint32_t heap_index; /* INVALID_HEAP_INDEX if not in heap */
+ int triggered;
+ struct grpc_timer *next;
+ struct grpc_timer *prev;
+ grpc_closure closure;
+};
+
+#endif /* GRPC_CORE_LIB_IOMGR_TIMER_GENERIC_H */
diff --git a/src/core/lib/iomgr/timer_heap.c b/src/core/lib/iomgr/timer_heap.c
index 2ad9bb9cd2..f736d335e6 100644
--- a/src/core/lib/iomgr/timer_heap.c
+++ b/src/core/lib/iomgr/timer_heap.c
@@ -31,6 +31,10 @@
*
*/
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_TIMER_USE_GENERIC
+
#include "src/core/lib/iomgr/timer_heap.h"
#include <string.h>
@@ -144,3 +148,5 @@ grpc_timer *grpc_timer_heap_top(grpc_timer_heap *heap) {
void grpc_timer_heap_pop(grpc_timer_heap *heap) {
grpc_timer_heap_remove(heap, grpc_timer_heap_top(heap));
}
+
+#endif /* GRPC_TIMER_USE_GENERIC */
diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c
new file mode 100644
index 0000000000..cfcb89268b
--- /dev/null
+++ b/src/core/lib/iomgr/timer_uv.c
@@ -0,0 +1,99 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#if GRPC_UV
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+#include "src/core/lib/iomgr/timer.h"
+
+#include <uv.h>
+
+static void timer_close_callback(uv_handle_t *handle) { gpr_free(handle); }
+
+static void stop_uv_timer(uv_timer_t *handle) {
+ uv_timer_stop(handle);
+ uv_unref((uv_handle_t *)handle);
+ uv_close((uv_handle_t *)handle, timer_close_callback);
+}
+
+void run_expired_timer(uv_timer_t *handle) {
+ grpc_timer *timer = (grpc_timer *)handle->data;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ GPR_ASSERT(!timer->triggered);
+ timer->triggered = 1;
+ grpc_exec_ctx_sched(&exec_ctx, &timer->closure, GRPC_ERROR_NONE, NULL);
+ stop_uv_timer(handle);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
+ gpr_timespec deadline, grpc_iomgr_cb_func timer_cb,
+ void *timer_cb_arg, gpr_timespec now) {
+ uint64_t timeout;
+ uv_timer_t *uv_timer;
+ grpc_closure_init(&timer->closure, timer_cb, timer_cb_arg);
+ if (gpr_time_cmp(deadline, now) <= 0) {
+ timer->triggered = 1;
+ grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_NONE, NULL);
+ return;
+ }
+ timer->triggered = 0;
+ timeout = (uint64_t)gpr_time_to_millis(gpr_time_sub(deadline, now));
+ uv_timer = gpr_malloc(sizeof(uv_timer_t));
+ uv_timer_init(uv_default_loop(), uv_timer);
+ uv_timer->data = timer;
+ timer->uv_timer = uv_timer;
+ uv_timer_start(uv_timer, run_expired_timer, timeout, 0);
+}
+
+void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
+ if (!timer->triggered) {
+ timer->triggered = 1;
+ grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED, NULL);
+ stop_uv_timer((uv_timer_t *)timer->uv_timer);
+ }
+}
+
+bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
+ gpr_timespec *next) {
+ return false;
+}
+
+void grpc_timer_list_init(gpr_timespec now) {}
+void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) {}
+
+#endif /* GRPC_UV */
diff --git a/src/core/lib/iomgr/timer_uv.h b/src/core/lib/iomgr/timer_uv.h
new file mode 100644
index 0000000000..3de383ebd5
--- /dev/null
+++ b/src/core/lib/iomgr/timer_uv.h
@@ -0,0 +1,47 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_TIMER_UV_H
+#define GRPC_CORE_LIB_IOMGR_TIMER_UV_H
+
+#include "src/core/lib/iomgr/exec_ctx.h"
+
+struct grpc_timer {
+ grpc_closure closure;
+ /* This is actually a uv_timer_t*, but we want to keep platform-specific
+ types out of headers */
+ void *uv_timer;
+ int triggered;
+};
+
+#endif /* GRPC_CORE_LIB_IOMGR_TIMER_UV_H */
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index edf7b133e9..fd0c7a0f9d 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -36,9 +36,9 @@
#define _GNU_SOURCE
#endif
-#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
-#ifdef GPR_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET
#include "src/core/lib/iomgr/udp_server.h"
@@ -62,32 +62,30 @@
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
+#include "src/core/lib/iomgr/unix_sockets_posix.h"
#include "src/core/lib/support/string.h"
-#define INIT_PORT_CAP 2
-
/* one listening port */
-typedef struct {
+typedef struct grpc_udp_listener grpc_udp_listener;
+struct grpc_udp_listener {
int fd;
grpc_fd *emfd;
grpc_udp_server *server;
- union {
- uint8_t untyped[GRPC_MAX_SOCKADDR_SIZE];
- struct sockaddr sockaddr;
- } addr;
- size_t addr_len;
+ grpc_resolved_address addr;
grpc_closure read_closure;
grpc_closure destroyed_closure;
grpc_udp_server_read_cb read_cb;
grpc_udp_server_orphan_cb orphan_cb;
-} server_port;
+
+ struct grpc_udp_listener *next;
+};
/* the overall server */
struct grpc_udp_server {
gpr_mu mu;
- gpr_cv cv;
/* active port count: how many ports are actually still listening */
size_t active_ports;
@@ -97,10 +95,10 @@ struct grpc_udp_server {
/* is this server shutting down? (boolean) */
int shutdown;
- /* all listening ports */
- server_port *ports;
- size_t nports;
- size_t port_capacity;
+ /* linked list of server ports */
+ grpc_udp_listener *head;
+ grpc_udp_listener *tail;
+ unsigned nports;
/* shutdown callback */
grpc_closure *shutdown_complete;
@@ -116,24 +114,29 @@ struct grpc_udp_server {
grpc_udp_server *grpc_udp_server_create(void) {
grpc_udp_server *s = gpr_malloc(sizeof(grpc_udp_server));
gpr_mu_init(&s->mu);
- gpr_cv_init(&s->cv);
s->active_ports = 0;
s->destroyed_ports = 0;
s->shutdown = 0;
- s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP);
+ s->head = NULL;
+ s->tail = NULL;
s->nports = 0;
- s->port_capacity = INIT_PORT_CAP;
return s;
}
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
- grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
+ if (s->shutdown_complete != NULL) {
+ grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
+ }
gpr_mu_destroy(&s->mu);
- gpr_cv_destroy(&s->cv);
- gpr_free(s->ports);
+ while (s->head) {
+ grpc_udp_listener *sp = s->head;
+ s->head = sp->next;
+ gpr_free(sp);
+ }
+
gpr_free(s);
}
@@ -154,8 +157,6 @@ static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server,
events will be received on them - at this point it's safe to destroy
things */
static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
- size_t i;
-
/* delete ALL the things */
gpr_mu_lock(&s->mu);
@@ -164,9 +165,11 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
return;
}
- if (s->nports) {
- for (i = 0; i < s->nports; i++) {
- server_port *sp = &s->ports[i];
+ if (s->head) {
+ grpc_udp_listener *sp;
+ for (sp = s->head; sp; sp = sp->next) {
+ grpc_unlink_if_unix_domain_socket(&sp->addr);
+
sp->destroyed_closure.cb = destroyed_port;
sp->destroyed_closure.cb_arg = s;
@@ -187,7 +190,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
grpc_closure *on_done) {
- size_t i;
+ grpc_udp_listener *sp;
gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->shutdown);
@@ -197,14 +200,10 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
/* shutdown all fd's */
if (s->active_ports) {
- for (i = 0; i < s->nports; i++) {
- server_port *sp = &s->ports[i];
- /* Call the orphan_cb to signal that the FD is about to be closed and
- * should no longer be used. */
+ for (sp = s->head; sp; sp = sp->next) {
GPR_ASSERT(sp->orphan_cb);
sp->orphan_cb(sp->emfd);
-
- grpc_fd_shutdown(exec_ctx, s->ports[i].emfd);
+ grpc_fd_shutdown(exec_ctx, sp->emfd);
}
gpr_mu_unlock(&s->mu);
} else {
@@ -214,10 +213,9 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
}
/* Prepare a recently-created socket for listening. */
-static int prepare_socket(int fd, const struct sockaddr *addr,
- size_t addr_len) {
- struct sockaddr_storage sockname_temp;
- socklen_t sockname_len;
+static int prepare_socket(int fd, const grpc_resolved_address *addr) {
+ grpc_resolved_address sockname_temp;
+ struct sockaddr *addr_ptr = (struct sockaddr *)addr->addr;
/* Set send/receive socket buffers to 1 MB */
int buffer_size_bytes = 1024 * 1024;
@@ -237,15 +235,15 @@ static int prepare_socket(int fd, const struct sockaddr *addr,
if (grpc_set_socket_ip_pktinfo_if_possible(fd) != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Unable to set ip_pktinfo.");
goto error;
- } else if (addr->sa_family == AF_INET6) {
+ } else if (addr_ptr->sa_family == AF_INET6) {
if (grpc_set_socket_ipv6_recvpktinfo_if_possible(fd) != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Unable to set ipv6_recvpktinfo.");
goto error;
}
}
- GPR_ASSERT(addr_len < ~(socklen_t)0);
- if (bind(fd, addr, (socklen_t)addr_len) < 0) {
+ GPR_ASSERT(addr->len < ~(socklen_t)0);
+ if (bind(fd, (struct sockaddr *)addr, (socklen_t)addr->len) < 0) {
char *addr_str;
grpc_sockaddr_to_string(&addr_str, addr, 0);
gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, strerror(errno));
@@ -253,8 +251,10 @@ static int prepare_socket(int fd, const struct sockaddr *addr,
goto error;
}
- sockname_len = sizeof(sockname_temp);
- if (getsockname(fd, (struct sockaddr *)&sockname_temp, &sockname_len) < 0) {
+ sockname_temp.len = sizeof(struct sockaddr_storage);
+
+ if (getsockname(fd, (struct sockaddr *)sockname_temp.addr,
+ (socklen_t *)&sockname_temp.len) < 0) {
goto error;
}
@@ -270,7 +270,7 @@ static int prepare_socket(int fd, const struct sockaddr *addr,
goto error;
}
- return grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
+ return grpc_sockaddr_get_port(&sockname_temp);
error:
if (fd >= 0) {
@@ -281,10 +281,10 @@ error:
/* event manager callback when reads are ready */
static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- server_port *sp = arg;
+ grpc_udp_listener *sp = arg;
+ gpr_mu_lock(&sp->server->mu);
if (error != GRPC_ERROR_NONE) {
- gpr_mu_lock(&sp->server->mu);
if (0 == --sp->server->active_ports) {
gpr_mu_unlock(&sp->server->mu);
deactivated_all_ports(exec_ctx, sp->server);
@@ -300,34 +300,37 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
/* Re-arm the notification event so we get another chance to read. */
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
+ gpr_mu_unlock(&sp->server->mu);
}
static int add_socket_to_server(grpc_udp_server *s, int fd,
- const struct sockaddr *addr, size_t addr_len,
+ const grpc_resolved_address *addr,
grpc_udp_server_read_cb read_cb,
grpc_udp_server_orphan_cb orphan_cb) {
- server_port *sp;
+ grpc_udp_listener *sp;
int port;
char *addr_str;
char *name;
- port = prepare_socket(fd, addr, addr_len);
+ port = prepare_socket(fd, addr);
if (port >= 0) {
- grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1);
+ grpc_sockaddr_to_string(&addr_str, addr, 1);
gpr_asprintf(&name, "udp-server-listener:%s", addr_str);
gpr_free(addr_str);
gpr_mu_lock(&s->mu);
- /* append it to the list under a lock */
- if (s->nports == s->port_capacity) {
- s->port_capacity *= 2;
- s->ports = gpr_realloc(s->ports, sizeof(server_port) * s->port_capacity);
+ s->nports++;
+ sp = gpr_malloc(sizeof(grpc_udp_listener));
+ sp->next = NULL;
+ if (s->head == NULL) {
+ s->head = sp;
+ } else {
+ s->tail->next = sp;
}
- sp = &s->ports[s->nports++];
+ s->tail = sp;
sp->server = s;
sp->fd = fd;
sp->emfd = grpc_fd_create(fd, name);
- memcpy(sp->addr.untyped, addr, addr_len);
- sp->addr_len = addr_len;
+ memcpy(&sp->addr, addr, sizeof(grpc_resolved_address));
sp->read_cb = read_cb;
sp->orphan_cb = orphan_cb;
GPR_ASSERT(sp->emfd);
@@ -338,34 +341,34 @@ static int add_socket_to_server(grpc_udp_server *s, int fd,
return port;
}
-int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
- size_t addr_len, grpc_udp_server_read_cb read_cb,
+int grpc_udp_server_add_port(grpc_udp_server *s,
+ const grpc_resolved_address *addr,
+ grpc_udp_server_read_cb read_cb,
grpc_udp_server_orphan_cb orphan_cb) {
+ grpc_udp_listener *sp;
int allocated_port1 = -1;
int allocated_port2 = -1;
- unsigned i;
int fd;
grpc_dualstack_mode dsmode;
- struct sockaddr_in6 addr6_v4mapped;
- struct sockaddr_in wild4;
- struct sockaddr_in6 wild6;
- struct sockaddr_in addr4_copy;
- struct sockaddr *allocated_addr = NULL;
- struct sockaddr_storage sockname_temp;
- socklen_t sockname_len;
+ grpc_resolved_address addr6_v4mapped;
+ grpc_resolved_address wild4;
+ grpc_resolved_address wild6;
+ grpc_resolved_address addr4_copy;
+ grpc_resolved_address *allocated_addr = NULL;
+ grpc_resolved_address sockname_temp;
int port;
/* Check if this is a wildcard port, and if so, try to keep the port the same
as some previously created listener. */
if (grpc_sockaddr_get_port(addr) == 0) {
- for (i = 0; i < s->nports; i++) {
- sockname_len = sizeof(sockname_temp);
- if (0 == getsockname(s->ports[i].fd, (struct sockaddr *)&sockname_temp,
- &sockname_len)) {
- port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
+ for (sp = s->head; sp; sp = sp->next) {
+ sockname_temp.len = sizeof(struct sockaddr_storage);
+ if (0 == getsockname(sp->fd, (struct sockaddr *)sockname_temp.addr,
+ (socklen_t *)&sockname_temp.len)) {
+ port = grpc_sockaddr_get_port(&sockname_temp);
if (port > 0) {
- allocated_addr = gpr_malloc(addr_len);
- memcpy(allocated_addr, addr, addr_len);
+ allocated_addr = gpr_malloc(sizeof(grpc_resolved_address));
+ memcpy(allocated_addr, addr, sizeof(grpc_resolved_address));
grpc_sockaddr_set_port(allocated_addr, port);
addr = allocated_addr;
break;
@@ -375,8 +378,7 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
}
if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
- addr = (const struct sockaddr *)&addr6_v4mapped;
- addr_len = sizeof(addr6_v4mapped);
+ addr = &addr6_v4mapped;
}
/* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
@@ -384,22 +386,19 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
grpc_sockaddr_make_wildcards(port, &wild4, &wild6);
/* Try listening on IPv6 first. */
- addr = (struct sockaddr *)&wild6;
- addr_len = sizeof(wild6);
+ addr = &wild6;
// TODO(rjshade): Test and propagate the returned grpc_error*:
grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd);
- allocated_port1 =
- add_socket_to_server(s, fd, addr, addr_len, read_cb, orphan_cb);
+ allocated_port1 = add_socket_to_server(s, fd, addr, read_cb, orphan_cb);
if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
goto done;
}
/* If we didn't get a dualstack socket, also listen on 0.0.0.0. */
if (port == 0 && allocated_port1 > 0) {
- grpc_sockaddr_set_port((struct sockaddr *)&wild4, allocated_port1);
+ grpc_sockaddr_set_port(&wild4, allocated_port1);
}
- addr = (struct sockaddr *)&wild4;
- addr_len = sizeof(wild4);
+ addr = &wild4;
}
// TODO(rjshade): Test and propagate the returned grpc_error*:
@@ -409,11 +408,9 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
}
if (dsmode == GRPC_DSMODE_IPV4 &&
grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) {
- addr = (struct sockaddr *)&addr4_copy;
- addr_len = sizeof(addr4_copy);
+ addr = &addr4_copy;
}
- allocated_port2 =
- add_socket_to_server(s, fd, addr, addr_len, read_cb, orphan_cb);
+ allocated_port2 = add_socket_to_server(s, fd, addr, read_cb, orphan_cb);
done:
gpr_free(allocated_addr);
@@ -421,27 +418,40 @@ done:
}
int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index) {
- return (port_index < s->nports) ? s->ports[port_index].fd : -1;
+ grpc_udp_listener *sp;
+ if (port_index >= s->nports) {
+ return -1;
+ }
+
+ for (sp = s->head; sp && port_index != 0; sp = sp->next) {
+ --port_index;
+ }
+ return sp->fd;
}
void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
grpc_pollset **pollsets, size_t pollset_count,
grpc_server *server) {
- size_t i, j;
+ size_t i;
gpr_mu_lock(&s->mu);
+ grpc_udp_listener *sp;
GPR_ASSERT(s->active_ports == 0);
s->pollsets = pollsets;
s->grpc_server = server;
- for (i = 0; i < s->nports; i++) {
- for (j = 0; j < pollset_count; j++) {
- grpc_pollset_add_fd(exec_ctx, pollsets[j], s->ports[i].emfd);
+
+ sp = s->head;
+ while (sp != NULL) {
+ for (i = 0; i < pollset_count; i++) {
+ grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
}
- s->ports[i].read_closure.cb = on_read;
- s->ports[i].read_closure.cb_arg = &s->ports[i];
- grpc_fd_notify_on_read(exec_ctx, s->ports[i].emfd,
- &s->ports[i].read_closure);
+ sp->read_closure.cb = on_read;
+ sp->read_closure.cb_arg = sp;
+ grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
+
s->active_ports++;
+ sp = sp->next;
}
+
gpr_mu_unlock(&s->mu);
}
diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h
index 33c5ce11cd..f3c466a031 100644
--- a/src/core/lib/iomgr/udp_server.h
+++ b/src/core/lib/iomgr/udp_server.h
@@ -36,6 +36,7 @@
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/ev_posix.h"
+#include "src/core/lib/iomgr/resolve_address.h"
/* Forward decl of struct grpc_server */
/* This is not typedef'ed to avoid a typedef-redefinition error */
@@ -59,7 +60,7 @@ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *udp_server,
grpc_pollset **pollsets, size_t pollset_count,
struct grpc_server *server);
-int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index);
+int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index);
/* Add a port to the server, returning port number on success, or negative
on failure.
@@ -71,8 +72,9 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index);
/* TODO(ctiller): deprecate this, and make grpc_udp_server_add_ports to handle
all of the multiple socket port matching logic in one place */
-int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
- size_t addr_len, grpc_udp_server_read_cb read_cb,
+int grpc_udp_server_add_port(grpc_udp_server *s,
+ const grpc_resolved_address *addr,
+ grpc_udp_server_read_cb read_cb,
grpc_udp_server_orphan_cb orphan_cb);
void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *server,
diff --git a/src/core/lib/iomgr/unix_sockets_posix.c b/src/core/lib/iomgr/unix_sockets_posix.c
index 0e7670e5a5..030acd9811 100644
--- a/src/core/lib/iomgr/unix_sockets_posix.c
+++ b/src/core/lib/iomgr/unix_sockets_posix.c
@@ -30,16 +30,19 @@
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
+#include "src/core/lib/iomgr/port.h"
-#include "src/core/lib/iomgr/unix_sockets_posix.h"
+#ifdef GRPC_HAVE_UNIX_SOCKET
-#ifdef GPR_HAVE_UNIX_SOCKET
+#include "src/core/lib/iomgr/sockaddr.h"
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/un.h>
+#include "src/core/lib/iomgr/unix_sockets_posix.h"
+
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -61,15 +64,18 @@ grpc_error *grpc_resolve_unix_domain_address(const char *name,
return GRPC_ERROR_NONE;
}
-int grpc_is_unix_socket(const struct sockaddr *addr) {
+int grpc_is_unix_socket(const grpc_resolved_address *resolved_addr) {
+ const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr;
return addr->sa_family == AF_UNIX;
}
-void grpc_unlink_if_unix_domain_socket(const struct sockaddr *addr) {
+void grpc_unlink_if_unix_domain_socket(
+ const grpc_resolved_address *resolved_addr) {
+ const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr;
if (addr->sa_family != AF_UNIX) {
return;
}
- struct sockaddr_un *un = (struct sockaddr_un *)addr;
+ struct sockaddr_un *un = (struct sockaddr_un *)resolved_addr->addr;
struct stat st;
if (stat(un->sun_path, &st) == 0 && (st.st_mode & S_IFMT) == S_IFSOCK) {
@@ -77,7 +83,9 @@ void grpc_unlink_if_unix_domain_socket(const struct sockaddr *addr) {
}
}
-char *grpc_sockaddr_to_uri_unix_if_possible(const struct sockaddr *addr) {
+char *grpc_sockaddr_to_uri_unix_if_possible(
+ const grpc_resolved_address *resolved_addr) {
+ const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr;
if (addr->sa_family != AF_UNIX) {
return NULL;
}
diff --git a/src/core/lib/iomgr/unix_sockets_posix.h b/src/core/lib/iomgr/unix_sockets_posix.h
index db0516d945..21afd3aa15 100644
--- a/src/core/lib/iomgr/unix_sockets_posix.h
+++ b/src/core/lib/iomgr/unix_sockets_posix.h
@@ -34,22 +34,23 @@
#ifndef GRPC_CORE_LIB_IOMGR_UNIX_SOCKETS_POSIX_H
#define GRPC_CORE_LIB_IOMGR_UNIX_SOCKETS_POSIX_H
-#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
#include <grpc/support/string_util.h>
#include "src/core/lib/iomgr/resolve_address.h"
-#include "src/core/lib/iomgr/sockaddr.h"
void grpc_create_socketpair_if_unix(int sv[2]);
grpc_error *grpc_resolve_unix_domain_address(
const char *name, grpc_resolved_addresses **addresses);
-int grpc_is_unix_socket(const struct sockaddr *addr);
+int grpc_is_unix_socket(const grpc_resolved_address *resolved_addr);
-void grpc_unlink_if_unix_domain_socket(const struct sockaddr *addr);
+void grpc_unlink_if_unix_domain_socket(
+ const grpc_resolved_address *resolved_addr);
-char *grpc_sockaddr_to_uri_unix_if_possible(const struct sockaddr *addr);
+char *grpc_sockaddr_to_uri_unix_if_possible(
+ const grpc_resolved_address *resolved_addr);
#endif /* GRPC_CORE_LIB_IOMGR_UNIX_SOCKETS_POSIX_H */
diff --git a/src/core/lib/iomgr/unix_sockets_posix_noop.c b/src/core/lib/iomgr/unix_sockets_posix_noop.c
index 56b47c3daf..1daf5152c1 100644
--- a/src/core/lib/iomgr/unix_sockets_posix_noop.c
+++ b/src/core/lib/iomgr/unix_sockets_posix_noop.c
@@ -33,7 +33,7 @@
#include "src/core/lib/iomgr/unix_sockets_posix.h"
-#ifndef GPR_HAVE_UNIX_SOCKET
+#ifndef GRPC_HAVE_UNIX_SOCKET
#include <grpc/support/log.h>
@@ -50,11 +50,11 @@ grpc_error *grpc_resolve_unix_domain_address(
return GRPC_ERROR_CREATE("Unix domain sockets are not supported on Windows");
}
-int grpc_is_unix_socket(const struct sockaddr *addr) { return false; }
+int grpc_is_unix_socket(const grpc_resolved_address *addr) { return false; }
-void grpc_unlink_if_unix_domain_socket(const struct sockaddr *addr) {}
+void grpc_unlink_if_unix_domain_socket(const grpc_resolved_address *addr) {}
-char *grpc_sockaddr_to_uri_unix_if_possible(const struct sockaddr *addr) {
+char *grpc_sockaddr_to_uri_unix_if_possible(const grpc_resolved_address *addr) {
return NULL;
}
diff --git a/src/core/lib/iomgr/wakeup_fd_cv.c b/src/core/lib/iomgr/wakeup_fd_cv.c
index b4165208ed..da4c2870cd 100644
--- a/src/core/lib/iomgr/wakeup_fd_cv.c
+++ b/src/core/lib/iomgr/wakeup_fd_cv.c
@@ -31,9 +31,9 @@
*
*/
-#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
-#ifdef GPR_POSIX_WAKEUP_FD
+#ifdef GRPC_POSIX_WAKEUP_FD
#include "src/core/lib/iomgr/wakeup_fd_cv.h"
@@ -115,4 +115,4 @@ const grpc_wakeup_fd_vtable grpc_cv_wakeup_fd_vtable = {
cv_fd_init, cv_fd_consume, cv_fd_wakeup, cv_fd_destroy,
cv_check_availability};
-#endif /* GPR_POSIX_WAKUP_FD */
+#endif /* GRPC_POSIX_WAKUP_FD */
diff --git a/src/core/lib/iomgr/wakeup_fd_eventfd.c b/src/core/lib/iomgr/wakeup_fd_eventfd.c
index 95f6102330..373e21d3e1 100644
--- a/src/core/lib/iomgr/wakeup_fd_eventfd.c
+++ b/src/core/lib/iomgr/wakeup_fd_eventfd.c
@@ -31,9 +31,9 @@
*
*/
-#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
-#ifdef GPR_LINUX_EVENTFD
+#ifdef GRPC_LINUX_EVENTFD
#include <errno.h>
#include <sys/eventfd.h>
@@ -94,4 +94,4 @@ const grpc_wakeup_fd_vtable grpc_specialized_wakeup_fd_vtable = {
eventfd_create, eventfd_consume, eventfd_wakeup, eventfd_destroy,
eventfd_check_availability};
-#endif /* GPR_LINUX_EVENTFD */
+#endif /* GRPC_LINUX_EVENTFD */
diff --git a/src/core/lib/iomgr/wakeup_fd_nospecial.c b/src/core/lib/iomgr/wakeup_fd_nospecial.c
index cb2f707dc5..611bced029 100644
--- a/src/core/lib/iomgr/wakeup_fd_nospecial.c
+++ b/src/core/lib/iomgr/wakeup_fd_nospecial.c
@@ -36,9 +36,9 @@
* systems without anything better than pipe.
*/
-#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
-#ifdef GPR_POSIX_NO_SPECIAL_WAKEUP_FD
+#ifdef GRPC_POSIX_NO_SPECIAL_WAKEUP_FD
#include <stddef.h>
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
@@ -48,4 +48,4 @@ static int check_availability_invalid(void) { return 0; }
const grpc_wakeup_fd_vtable grpc_specialized_wakeup_fd_vtable = {
NULL, NULL, NULL, NULL, check_availability_invalid};
-#endif /* GPR_POSIX_NO_SPECIAL_WAKEUP_FD */
+#endif /* GRPC_POSIX_NO_SPECIAL_WAKEUP_FD */
diff --git a/src/core/lib/iomgr/wakeup_fd_pipe.c b/src/core/lib/iomgr/wakeup_fd_pipe.c
index d0ea216aa0..183f0eb930 100644
--- a/src/core/lib/iomgr/wakeup_fd_pipe.c
+++ b/src/core/lib/iomgr/wakeup_fd_pipe.c
@@ -31,9 +31,9 @@
*
*/
-#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
-#ifdef GPR_POSIX_WAKEUP_FD
+#ifdef GRPC_POSIX_WAKEUP_FD
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
diff --git a/src/core/lib/iomgr/wakeup_fd_posix.c b/src/core/lib/iomgr/wakeup_fd_posix.c
index 5c894bef37..85526402bd 100644
--- a/src/core/lib/iomgr/wakeup_fd_posix.c
+++ b/src/core/lib/iomgr/wakeup_fd_posix.c
@@ -31,9 +31,9 @@
*
*/
-#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
-#ifdef GPR_POSIX_WAKEUP_FD
+#ifdef GRPC_POSIX_WAKEUP_FD
#include <stddef.h>
#include "src/core/lib/iomgr/wakeup_fd_cv.h"
@@ -98,4 +98,4 @@ void grpc_wakeup_fd_destroy(grpc_wakeup_fd *fd_info) {
}
}
-#endif /* GPR_POSIX_WAKEUP_FD */
+#endif /* GRPC_POSIX_WAKEUP_FD */
diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h
index 5b96d1d851..73d9849843 100644
--- a/src/core/lib/iomgr/workqueue.h
+++ b/src/core/lib/iomgr/workqueue.h
@@ -39,6 +39,7 @@
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/pollset_set.h"
+#include "src/core/lib/iomgr/port.h"
#ifdef GPR_WINDOWS
#include "src/core/lib/iomgr/workqueue_windows.h"
diff --git a/src/core/lib/iomgr/workqueue_uv.c b/src/core/lib/iomgr/workqueue_uv.c
new file mode 100644
index 0000000000..e58ca476cc
--- /dev/null
+++ b/src/core/lib/iomgr/workqueue_uv.c
@@ -0,0 +1,66 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_UV
+
+#include "src/core/lib/iomgr/workqueue.h"
+
+// Minimal implementation of grpc_workqueue for libuv
+// Works by directly enqueuing workqueue items onto the current execution
+// context, which is at least correct, if not performant or in the spirit of
+// workqueues.
+
+void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {}
+
+#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file,
+ int line, const char *reason) {
+ return workqueue;
+}
+void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+ const char *file, int line, const char *reason) {}
+#else
+grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue) {
+ return workqueue;
+}
+void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {}
+#endif
+
+void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+ grpc_closure *closure, grpc_error *error) {
+ grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
+}
+
+#endif /* GPR_UV */
diff --git a/src/core/lib/iomgr/workqueue_uv.h b/src/core/lib/iomgr/workqueue_uv.h
new file mode 100644
index 0000000000..be3f8e4d93
--- /dev/null
+++ b/src/core/lib/iomgr/workqueue_uv.h
@@ -0,0 +1,37 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_WORKQUEUE_UV_H
+#define GRPC_CORE_LIB_IOMGR_WORKQUEUE_UV_H
+
+#endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_UV_H */