aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar murgatroid99 <mlumish@google.com>2016-09-16 13:25:08 -0700
committerGravatar murgatroid99 <mlumish@google.com>2016-09-16 13:25:08 -0700
commit9030c81f20c3721e7b0a92564f6e4d772a78ffcb (patch)
tree3bbe7fbaa32805f15d2153e7c9b814846ebc23c9 /src
parent6257159a3ae19f5caa3b44bcd201fc9183963cfe (diff)
Add a libuv endpoint to the C core, for use in the Node library
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/client_config/client_config_plugin.c5
-rw-r--r--src/core/ext/client_config/connector.h5
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c5
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c5
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c5
-rw-r--r--src/core/ext/resolver/dns/native/dns_resolver.c5
-rw-r--r--src/core/ext/resolver/sockaddr/sockaddr_resolver.c5
-rw-r--r--src/core/lib/http/httpcli.h5
-rw-r--r--src/core/lib/iomgr/iomgr.h2
-rw-r--r--src/core/lib/iomgr/iomgr_uv.c51
-rw-r--r--src/core/lib/iomgr/pollset_set_uv.c62
-rw-r--r--src/core/lib/iomgr/pollset_set_windows.c2
-rw-r--r--src/core/lib/iomgr/pollset_uv.c84
-rw-r--r--src/core/lib/iomgr/pollset_uv.h35
-rw-r--r--src/core/lib/iomgr/port.h13
-rw-r--r--src/core/lib/iomgr/resolve_address_uv.c234
-rw-r--r--src/core/lib/iomgr/sockaddr.h4
-rw-r--r--src/core/lib/iomgr/socket_utils.h6
-rw-r--r--src/core/lib/iomgr/socket_utils_uv.c50
-rw-r--r--src/core/lib/iomgr/tcp_client_uv.c142
-rw-r--r--src/core/lib/iomgr/tcp_server_uv.c358
-rw-r--r--src/core/lib/iomgr/tcp_uv.c336
-rw-r--r--src/core/lib/iomgr/tcp_uv.h57
-rw-r--r--src/core/lib/iomgr/timer.h17
-rw-r--r--src/core/lib/iomgr/timer_generic.c (renamed from src/core/lib/iomgr/timer.c)6
-rw-r--r--src/core/lib/iomgr/timer_generic.h49
-rw-r--r--src/core/lib/iomgr/timer_heap.c6
-rw-r--r--src/core/lib/iomgr/timer_uv.c103
-rw-r--r--src/core/lib/iomgr/timer_uv.h47
-rw-r--r--src/core/lib/iomgr/workqueue_uv.c62
-rw-r--r--src/core/lib/iomgr/workqueue_uv.h37
-rw-r--r--src/core/lib/security/context/security_context.c5
-rw-r--r--src/core/lib/security/context/security_context.h5
-rw-r--r--src/core/lib/security/credentials/credentials.h5
-rw-r--r--src/core/lib/security/credentials/oauth2/oauth2_credentials.c5
-rw-r--r--src/core/lib/security/transport/server_auth_filter.c5
-rw-r--r--src/core/lib/surface/call.c6
-rw-r--r--src/core/lib/surface/init_secure.c5
-rw-r--r--src/core/lib/tsi/ssl_transport_security.c5
-rw-r--r--src/node/ext/call.cc73
-rw-r--r--src/node/ext/call.h68
-rw-r--r--src/node/ext/channel.cc8
-rw-r--r--src/node/ext/completion_queue.cc114
-rw-r--r--src/node/ext/completion_queue.h46
-rw-r--r--src/node/ext/completion_queue_async_worker.cc2
-rw-r--r--src/node/ext/node_grpc.cc19
-rw-r--r--src/node/ext/server.cc91
-rw-r--r--src/node/ext/server.h1
-rw-r--r--src/node/ext/server_credentials.cc8
-rw-r--r--src/node/index.js4
-rw-r--r--src/node/src/client.js1
-rw-r--r--src/node/src/grpc_extension.js2
-rw-r--r--src/node/test/async_test.js1
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py12
54 files changed, 2213 insertions, 81 deletions
diff --git a/src/core/ext/client_config/client_config_plugin.c b/src/core/ext/client_config/client_config_plugin.c
index 5e31613420..e065d06be9 100644
--- a/src/core/ext/client_config/client_config_plugin.c
+++ b/src/core/ext/client_config/client_config_plugin.c
@@ -31,6 +31,11 @@
*
*/
+/* We currently need this at the top of the file if we import some iomgr
+ headers because if we are building with libuv, those headers will include
+ uv.h, which needs to be included before other system headers */
+#include "src/core/lib/iomgr/port.h"
+
#include <limits.h>
#include <stdbool.h>
#include <string.h>
diff --git a/src/core/ext/client_config/connector.h b/src/core/ext/client_config/connector.h
index ea9d23706e..5cf8f14721 100644
--- a/src/core/ext/client_config/connector.h
+++ b/src/core/ext/client_config/connector.h
@@ -34,6 +34,11 @@
#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_CONNECTOR_H
#define GRPC_CORE_EXT_CLIENT_CONFIG_CONNECTOR_H
+/* We currently need this at the top of the file if we import some iomgr
+ headers because if we are building with libuv, those headers will include
+ uv.h, which needs to be included before other system headers */
+#include "src/core/lib/iomgr/port.h"
+
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/transport/transport.h"
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index af913d8a9d..a390daec01 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -96,6 +96,11 @@
* - Implement LB service forwarding (point 2c. in the doc's diagram).
*/
+/* We currently need this at the top of the file if we import some iomgr
+ headers because if we are building with libuv, those headers will include
+ uv.h, which needs to be included before other system headers */
+#include "src/core/lib/iomgr/port.h"
+
#include <string.h>
#include <grpc/byte_buffer_reader.h>
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index 9decf70692..61b637fe7d 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -31,6 +31,11 @@
*
*/
+/* We currently need this at the top of the file if we import some iomgr
+ headers because if we are building with libuv, those headers will include
+ uv.h, which needs to be included before other system headers */
+#include "src/core/lib/iomgr/port.h"
+
#include <string.h>
#include <grpc/support/alloc.h>
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index 7bcf608ab9..da53143948 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -59,6 +59,11 @@
* the subchannel by the caller.
*/
+/* We currently need this at the top of the file if we import some iomgr
+ headers because if we are building with libuv, those headers will include
+ uv.h, which needs to be included before other system headers */
+#include "src/core/lib/iomgr/port.h"
+
#include <string.h>
#include <grpc/support/alloc.h>
diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c
index 31ac968670..6a699804fe 100644
--- a/src/core/ext/resolver/dns/native/dns_resolver.c
+++ b/src/core/ext/resolver/dns/native/dns_resolver.c
@@ -31,6 +31,11 @@
*
*/
+/* We currently need this at the top of the file if we import some iomgr
+ headers because if we are building with libuv, those headers will include
+ uv.h, which needs to be included before other system headers */
+#include "src/core/lib/iomgr/port.h"
+
#include <string.h>
#include <grpc/support/alloc.h>
diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
index d613c5393e..46d185a984 100644
--- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
+++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
@@ -31,6 +31,11 @@
*
*/
+/* We currently need this at the top of the file if we import some iomgr
+ headers because if we are building with libuv, those headers will include
+ uv.h, which needs to be included before other system headers */
+#include "src/core/lib/iomgr/port.h"
+
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
diff --git a/src/core/lib/http/httpcli.h b/src/core/lib/http/httpcli.h
index 662e176f4c..eba6ecb9ab 100644
--- a/src/core/lib/http/httpcli.h
+++ b/src/core/lib/http/httpcli.h
@@ -34,6 +34,11 @@
#ifndef GRPC_CORE_LIB_HTTP_HTTPCLI_H
#define GRPC_CORE_LIB_HTTP_HTTPCLI_H
+/* We currently need this at the top of the file if we import some iomgr
+ headers because if we are building with libuv, those headers will include
+ uv.h, which needs to be included before other system headers */
+#include "src/core/lib/iomgr/port.h"
+
#include <stddef.h>
#include <grpc/support/time.h>
diff --git a/src/core/lib/iomgr/iomgr.h b/src/core/lib/iomgr/iomgr.h
index 6c82de78ac..c1cfaf302e 100644
--- a/src/core/lib/iomgr/iomgr.h
+++ b/src/core/lib/iomgr/iomgr.h
@@ -34,6 +34,8 @@
#ifndef GRPC_CORE_LIB_IOMGR_IOMGR_H
#define GRPC_CORE_LIB_IOMGR_IOMGR_H
+#include "src/core/lib/iomgr/port.h"
+
/** Initializes the iomgr. */
void grpc_iomgr_init(void);
diff --git a/src/core/lib/iomgr/iomgr_uv.c b/src/core/lib/iomgr/iomgr_uv.c
new file mode 100644
index 0000000000..4c8acfbd96
--- /dev/null
+++ b/src/core/lib/iomgr/iomgr_uv.c
@@ -0,0 +1,51 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_UV
+
+#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/iomgr/pollset_uv.h"
+#include "src/core/lib/iomgr/tcp_uv.h"
+
+void grpc_iomgr_platform_init(void) {
+ grpc_pollset_global_init();
+ grpc_register_tracer("tcp", &grpc_tcp_trace);
+}
+void grpc_iomgr_platform_flush(void) {}
+void grpc_iomgr_platform_shutdown(void) {
+ grpc_pollset_global_shutdown();
+}
+
+#endif /* GRPC_UV */
diff --git a/src/core/lib/iomgr/pollset_set_uv.c b/src/core/lib/iomgr/pollset_set_uv.c
new file mode 100644
index 0000000000..e5ef8b29e0
--- /dev/null
+++ b/src/core/lib/iomgr/pollset_set_uv.c
@@ -0,0 +1,62 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_UV
+
+#include "src/core/lib/iomgr/pollset_set.h"
+
+grpc_pollset_set* grpc_pollset_set_create(void) {
+ return (grpc_pollset_set*)((intptr_t)0xdeafbeef);
+}
+
+void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set) {}
+
+void grpc_pollset_set_add_pollset(grpc_exec_ctx* exec_ctx,
+ grpc_pollset_set* pollset_set,
+ grpc_pollset* pollset) {}
+
+void grpc_pollset_set_del_pollset(grpc_exec_ctx* exec_ctx,
+ grpc_pollset_set* pollset_set,
+ grpc_pollset* pollset) {}
+
+void grpc_pollset_set_add_pollset_set(grpc_exec_ctx* exec_ctx,
+ grpc_pollset_set* bag,
+ grpc_pollset_set* item) {}
+
+void grpc_pollset_set_del_pollset_set(grpc_exec_ctx* exec_ctx,
+ grpc_pollset_set* bag,
+ grpc_pollset_set* item) {}
+
+#endif /* GRPC_UV */
diff --git a/src/core/lib/iomgr/pollset_set_windows.c b/src/core/lib/iomgr/pollset_set_windows.c
index 645650db9b..293893f18e 100644
--- a/src/core/lib/iomgr/pollset_set_windows.c
+++ b/src/core/lib/iomgr/pollset_set_windows.c
@@ -31,8 +31,8 @@
*
*/
-#include <stdint.h>
#include "src/core/lib/iomgr/port.h"
+#include <stdint.h>
#ifdef GRPC_WINSOCK_SOCKET
diff --git a/src/core/lib/iomgr/pollset_uv.c b/src/core/lib/iomgr/pollset_uv.c
new file mode 100644
index 0000000000..2c41a58c30
--- /dev/null
+++ b/src/core/lib/iomgr/pollset_uv.c
@@ -0,0 +1,84 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_UV
+
+#include <grpc/support/sync.h>
+
+#include "src/core/lib/iomgr/pollset.h"
+#include "src/core/lib/iomgr/pollset_uv.h"
+
+gpr_mu grpc_polling_mu;
+
+size_t grpc_pollset_size() {
+ return 1;
+}
+
+void grpc_pollset_global_init(void) {
+ gpr_mu_init(&grpc_polling_mu);
+}
+
+void grpc_pollset_global_shutdown(void) { gpr_mu_destroy(&grpc_polling_mu); }
+
+void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
+ *mu = &grpc_polling_mu;
+}
+
+void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_closure *closure) {
+ grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL);
+}
+
+void grpc_pollset_destroy(grpc_pollset *pollset) {}
+
+void grpc_pollset_reset(grpc_pollset *pollset) {}
+
+grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_pollset_worker **worker_hdl,
+ gpr_timespec now, gpr_timespec deadline) {
+ if (!grpc_closure_list_empty(exec_ctx->closure_list)) {
+ gpr_mu_unlock(&grpc_polling_mu);
+ grpc_exec_ctx_flush(exec_ctx);
+ gpr_mu_lock(&grpc_polling_mu);
+ }
+ return GRPC_ERROR_NONE;
+}
+
+grpc_error *grpc_pollset_kick(grpc_pollset *pollset,
+ grpc_pollset_worker *specific_worker) {
+ return GRPC_ERROR_NONE;
+}
+
+#endif /* GRPC_UV */
diff --git a/src/core/lib/iomgr/pollset_uv.h b/src/core/lib/iomgr/pollset_uv.h
new file mode 100644
index 0000000000..5cbc83e991
--- /dev/null
+++ b/src/core/lib/iomgr/pollset_uv.h
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+void grpc_pollset_global_init(void);
+void grpc_pollset_global_shutdown(void);
diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h
index e6f01802ed..60bb16a423 100644
--- a/src/core/lib/iomgr/port.h
+++ b/src/core/lib/iomgr/port.h
@@ -36,7 +36,9 @@
#ifndef GRPC_CORE_LIB_IOMGR_PORT_H
#define GRPC_CORE_LIB_IOMGR_PORT_H
-#if defined(GPR_MANYLINUX1)
+#if defined(GRPC_UV)
+#include <uv.h>
+#elif defined(GPR_MANYLINUX1)
#define GRPC_HAVE_IPV6_RECVPKTINFO 1
#define GRPC_HAVE_IP_PKTINFO 1
#define GRPC_HAVE_MSG_NOSIGNAL 1
@@ -46,7 +48,9 @@
#define GRPC_POSIX_SOCKETADDR 1
#define GRPC_POSIX_SOCKETUTILS 1
#define GRPC_POSIX_WAKEUP_FD 1
+#define GRPC_TIMER_USE_GENERIC 1
#elif defined(GPR_WINDOWS)
+#define GRPC_TIMER_USE_GENERIC 1
#define GRPC_WINSOCK_SOCKET 1
#define GRPC_WINDOWS_SOCKETUTILS 1
#elif defined(GPR_ANDROID)
@@ -59,6 +63,7 @@
#define GRPC_POSIX_SOCKETADDR 1
#define GRPC_POSIX_SOCKETUTILS 1
#define GRPC_POSIX_WAKEUP_FD 1
+#define GRPC_TIMER_USE_GENERIC 1
#elif defined(GPR_LINUX)
#define GRPC_HAVE_IPV6_RECVPKTINFO 1
#define GRPC_HAVE_IP_PKTINFO 1
@@ -68,6 +73,7 @@
#define GRPC_POSIX_SOCKET 1
#define GRPC_POSIX_SOCKETADDR 1
#define GRPC_POSIX_WAKEUP_FD 1
+#define GRPC_TIMER_USE_GENERIC 1
#ifdef __GLIBC_PREREQ
#if __GLIBC_PREREQ(2, 9)
#define GRPC_LINUX_EPOLL 1
@@ -93,6 +99,7 @@
#define GRPC_POSIX_SOCKETADDR 1
#define GRPC_POSIX_SOCKETUTILS 1
#define GRPC_POSIX_WAKEUP_FD 1
+#define GRPC_TIMER_USE_GENERIC 1
#elif defined(GPR_FREEBSD)
#define GRPC_HAVE_IPV6_RECVPKTINFO 1
#define GRPC_HAVE_IP_PKTINFO 1
@@ -103,18 +110,20 @@
#define GRPC_POSIX_SOCKETADDR 1
#define GRPC_POSIX_SOCKETUTILS 1
#define GRPC_POSIX_WAKEUP_FD 1
+#define GRPC_TIMER_USE_GENERIC 1
#elif defined(GPR_NACL)
#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
#define GRPC_POSIX_SOCKET 1
#define GRPC_POSIX_SOCKETADDR 1
#define GRPC_POSIX_SOCKETUTILS 1
#define GRPC_POSIX_WAKEUP_FD 1
+#define GRPC_TIMER_USE_GENERIC 1
#elif !defined(GPR_NO_AUTODETECT_PLATFORM)
#error "Platform not recognized"
#endif
#if defined(GRPC_POSIX_SOCKET) + defined(GRPC_WINSOCK_SOCKET) + \
- defined(GRPC_CUSTOM_SOCKET) != \
+ defined(GRPC_CUSTOM_SOCKET) + defined(GRPC_UV) != \
1
#error Must define exactly one of GRPC_POSIX_SOCKET, GRPC_WINSOCK_SOCKET, GPR_CUSTOM_SOCKET
#endif
diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c
new file mode 100644
index 0000000000..550047b4d2
--- /dev/null
+++ b/src/core/lib/iomgr/resolve_address_uv.c
@@ -0,0 +1,234 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+#ifdef GRPC_UV
+
+#include <uv.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/host_port.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
+
+#include <string.h>
+
+typedef struct request {
+ grpc_closure *on_done;
+ grpc_resolved_addresses **addresses;
+ struct addrinfo *hints;
+} request;
+
+static grpc_error *handle_addrinfo_result(int status,
+ struct addrinfo *result,
+ grpc_resolved_addresses **addresses) {
+
+ struct addrinfo *resp;
+ size_t i;
+ if (status != 0) {
+ grpc_error *error;
+ *addresses = NULL;
+ error = GRPC_ERROR_CREATE("getaddrinfo failed");
+ error = grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
+ uv_strerror(status));
+ return error;
+ }
+ (*addresses) = gpr_malloc(sizeof(grpc_resolved_addresses));
+ (*addresses)->naddrs = 0;
+ for (resp = result; resp != NULL; resp = resp->ai_next) {
+ (*addresses)->naddrs++;
+ }
+ (*addresses)->addrs =
+ gpr_malloc(sizeof(grpc_resolved_address) * (*addresses)->naddrs);
+ i = 0;
+ for (resp = result; resp != NULL; resp = resp->ai_next) {
+ memcpy(&(*addresses)->addrs[i].addr, resp->ai_addr, resp->ai_addrlen);
+ (*addresses)->addrs[i].len = resp->ai_addrlen;
+ i++;
+ }
+
+ {
+ for (i = 0; i < (*addresses)->naddrs; i++) {
+ char *buf;
+ grpc_sockaddr_to_string(
+ &buf, (struct sockaddr *)&(*addresses)->addrs[i].addr, 0);
+ gpr_free(buf);
+ }
+ }
+ return GRPC_ERROR_NONE;
+}
+
+static void getaddrinfo_callback(uv_getaddrinfo_t *req, int status,
+ struct addrinfo *res) {
+ request *r = (request *)req->data;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_error *error;
+ error = handle_addrinfo_result(status, res, r->addresses);
+ grpc_exec_ctx_sched(&exec_ctx, r->on_done, error, NULL);
+ grpc_exec_ctx_finish(&exec_ctx);
+
+ gpr_free(r->hints);
+ gpr_free(r);
+ gpr_free(req);
+ uv_freeaddrinfo(res);
+}
+
+static grpc_error *try_split_host_port(const char *name,
+ const char *default_port, char **host,
+ char **port) {
+ /* parse name, splitting it into host and port parts */
+ grpc_error *error;
+ gpr_split_host_port(name, host, port);
+ if (host == NULL) {
+ char *msg;
+ gpr_asprintf(&msg, "unparseable host:port: '%s'", name);
+ error = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
+ return error;
+ }
+ if (port == NULL) {
+ if (default_port == NULL) {
+ char *msg;
+ gpr_asprintf(&msg, "no port in name '%s'", name);
+ error = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
+ return error;
+ }
+ *port = gpr_strdup(default_port);
+ }
+ return GRPC_ERROR_NONE;
+}
+
+static grpc_error *blocking_resolve_address_impl(
+ const char *name, const char *default_port,
+ grpc_resolved_addresses **addresses) {
+ char *host;
+ char *port;
+ struct addrinfo hints;
+ uv_getaddrinfo_t req;
+ int s;
+ grpc_error *err;
+
+ err = try_split_host_port(name, default_port, &host, &port);
+ if (err != GRPC_ERROR_NONE) {
+ goto done;
+ }
+
+ /* Call getaddrinfo */
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC; /* ipv4 or ipv6 */
+ hints.ai_socktype = SOCK_STREAM; /* stream socket */
+ hints.ai_flags = AI_PASSIVE; /* for wildcard IP address */
+
+ s = uv_getaddrinfo(uv_default_loop(), &req, NULL, host, port, &hints);
+ err = handle_addrinfo_result(s, req.addrinfo, addresses);
+
+done:
+ gpr_free(host);
+ gpr_free(port);
+ if (req.addrinfo) {
+ uv_freeaddrinfo(req.addrinfo);
+ }
+ return err;
+}
+
+
+
+grpc_error *(*grpc_blocking_resolve_address)(
+ const char *name, const char *default_port,
+ grpc_resolved_addresses **addresses) = blocking_resolve_address_impl;
+
+void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
+ if (addrs != NULL) {
+ gpr_free(addrs->addrs);
+ }
+ gpr_free(addrs);
+}
+
+static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
+ const char *default_port,
+ grpc_closure *on_done,
+ grpc_resolved_addresses **addrs) {
+ uv_getaddrinfo_t *req;
+ request *r;
+ struct addrinfo *hints;
+ char *host;
+ char *port;
+ grpc_error *err;
+ int s;
+ err = try_split_host_port(name, default_port, &host, &port);
+ if (err != GRPC_ERROR_NONE) {
+ grpc_exec_ctx_sched(exec_ctx, on_done, err, NULL);
+ return;
+ }
+ r = gpr_malloc(sizeof(request));
+ r->on_done = on_done;
+ r->addresses = addrs;
+ req = gpr_malloc(sizeof(uv_getaddrinfo_t));
+ req->data = r;
+
+ /* Call getaddrinfo */
+ hints = gpr_malloc(sizeof(struct addrinfo));
+ memset(hints, 0, sizeof(struct addrinfo));
+ hints->ai_family = AF_UNSPEC; /* ipv4 or ipv6 */
+ hints->ai_socktype = SOCK_STREAM; /* stream socket */
+ hints->ai_flags = AI_PASSIVE; /* for wildcard IP address */
+ r->hints = hints;
+
+ s = uv_getaddrinfo(uv_default_loop(), req, getaddrinfo_callback, host, port,
+ hints);
+
+ if (s != 0) {
+ *addrs = NULL;
+ err = GRPC_ERROR_CREATE("getaddrinfo failed");
+ err = grpc_error_set_str(err, GRPC_ERROR_STR_OS_ERROR,
+ uv_strerror(s));
+ grpc_exec_ctx_sched(exec_ctx, on_done, err, NULL);
+ gpr_free(r);
+ gpr_free(req);
+ gpr_free(hints);
+ }
+}
+
+void (*grpc_resolve_address)(grpc_exec_ctx *exec_ctx, const char *name,
+ const char *default_port, grpc_closure *on_done,
+ grpc_resolved_addresses **addrs) =
+ resolve_address_impl;
+
+#endif /* GRPC_UV */
diff --git a/src/core/lib/iomgr/sockaddr.h b/src/core/lib/iomgr/sockaddr.h
index 58cae6cc01..418488a0ea 100644
--- a/src/core/lib/iomgr/sockaddr.h
+++ b/src/core/lib/iomgr/sockaddr.h
@@ -36,6 +36,10 @@
#include "src/core/lib/iomgr/port.h"
+#ifdef GRPC_UV
+#include <uv.h>
+#endif
+
#ifdef GPR_WINDOWS
#include "src/core/lib/iomgr/sockaddr_windows.h"
#endif
diff --git a/src/core/lib/iomgr/socket_utils.h b/src/core/lib/iomgr/socket_utils.h
index 6ce9a6859c..b01197ad18 100644
--- a/src/core/lib/iomgr/socket_utils.h
+++ b/src/core/lib/iomgr/socket_utils.h
@@ -36,10 +36,12 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_WINSOCK_SOCKET
+#if defined(GRPC_WINSOCK_SOCKET)
#include "src/core/lib/iomgr/sockaddr_windows.h"
-#else
+#elif defined(GRPC_POSIX_SOCKET)
#include "src/core/lib/iomgr/sockaddr_posix.h"
+#elif defined(GRPC_UV)
+#include <uv.h>
#endif
/* A wrapper for inet_ntop on POSIX systems and InetNtop on Windows systems */
diff --git a/src/core/lib/iomgr/socket_utils_uv.c b/src/core/lib/iomgr/socket_utils_uv.c
new file mode 100644
index 0000000000..7f61384e74
--- /dev/null
+++ b/src/core/lib/iomgr/socket_utils_uv.c
@@ -0,0 +1,50 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_UV
+
+#include <uv.h>
+
+#include "src/core/lib/iomgr/socket_utils.h"
+
+#include <grpc/support/log.h>
+
+const char *grpc_inet_ntop(int af, const void *src, char *dst, socklen_t size) {
+ GPR_ASSERT(sizeof(socklen_t) <= sizeof(size_t));
+ uv_inet_ntop(af, src, dst, (size_t)size);
+ return dst;
+}
+
+#endif /* GRPC_UV */
diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c
new file mode 100644
index 0000000000..7823a1d873
--- /dev/null
+++ b/src/core/lib/iomgr/tcp_client_uv.c
@@ -0,0 +1,142 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_UV
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "src/core/lib/iomgr/tcp_client.h"
+#include "src/core/lib/iomgr/tcp_uv.h"
+#include "src/core/lib/iomgr/timer.h"
+
+typedef struct grpc_uv_tcp_connect {
+ uv_connect_t connect_req;
+ grpc_timer alarm;
+ uv_tcp_t *tcp_handle;
+ grpc_closure *closure;
+ grpc_endpoint **endpoint;
+ int refs;
+ char *addr_name;
+} grpc_uv_tcp_connect;
+
+static void uv_tcp_connect_cleanup(grpc_uv_tcp_connect *connect) {
+ gpr_free(connect);
+}
+
+static void tcp_close_callback(uv_handle_t *handle) {
+ gpr_log(GPR_DEBUG, "Freeing uv_tcp_t handle %p", handle);
+ gpr_free(handle);
+}
+
+static void uv_tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
+ int done;
+ grpc_uv_tcp_connect *connect = acp;
+ if (error == GRPC_ERROR_NONE) {
+ /* error == NONE implies that the timer ran out, and wasn't cancelled. If
+ it was cancelled, then the handler that cancelled it also should close
+ the handle, if applicable */
+ gpr_log(GPR_DEBUG, "Closing uv_tcp_t handle %p", connect->tcp_handle);
+ uv_close((uv_handle_t *)connect->tcp_handle, tcp_close_callback);
+ }
+ done = (--connect->refs == 0);
+ if (done) {
+ uv_tcp_connect_cleanup(connect);
+ }
+}
+
+static void uv_tc_on_connect(uv_connect_t *req, int status) {
+ grpc_uv_tcp_connect *connect = req->data;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_error *error = GRPC_ERROR_NONE;
+ int done;
+ grpc_closure *closure = connect->closure;
+ grpc_timer_cancel(&exec_ctx, &connect->alarm);
+ if (status == 0) {
+ *connect->endpoint = grpc_tcp_create(connect->tcp_handle,
+ connect->addr_name);
+ } else {
+ error = GRPC_ERROR_CREATE("Failed to connect to remote host");
+ error = grpc_error_set_int(error, GRPC_ERROR_INT_ERRNO, -status);
+ error = grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
+ uv_strerror(status));
+ if (status == UV_ECANCELED) {
+ error = grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
+ "Timeout occurred");
+ // This should only happen if the handle is already closed
+ } else {
+ error = grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
+ uv_strerror(status));
+ gpr_log(GPR_DEBUG, "Closing uv_tcp_t handle %p", connect->tcp_handle);
+ uv_close((uv_handle_t *)connect->tcp_handle, tcp_close_callback);
+ }
+ }
+ done = (--connect->refs == 0);
+ if (done) {
+ uv_tcp_connect_cleanup(connect);
+ }
+ grpc_exec_ctx_sched(&exec_ctx, closure, error, NULL);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx,
+ grpc_closure *closure, grpc_endpoint **ep,
+ grpc_pollset_set *interested_parties,
+ const struct sockaddr *addr,
+ size_t addr_len, gpr_timespec deadline) {
+ grpc_uv_tcp_connect *connect;
+ (void)interested_parties;
+ connect = gpr_malloc(sizeof(grpc_uv_tcp_connect));
+ memset(connect, 0, sizeof(grpc_uv_tcp_connect));
+ connect->closure = closure;
+ connect->endpoint = ep;
+ connect->tcp_handle = gpr_malloc(sizeof(uv_tcp_t));
+ gpr_log(GPR_DEBUG, "Allocated uv_tcp_t handle %p", connect->tcp_handle);
+ connect->addr_name = grpc_sockaddr_to_uri(addr);
+ uv_tcp_init(uv_default_loop(), connect->tcp_handle);
+ connect->connect_req.data = connect;
+ // TODO(murgatroid99): figure out what the return value here means
+ uv_tcp_connect(&connect->connect_req, connect->tcp_handle, addr,
+ uv_tc_on_connect);
+ grpc_timer_init(exec_ctx, &connect->alarm,
+ gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
+ uv_tc_on_alarm, connect, gpr_now(GPR_CLOCK_MONOTONIC));
+}
+
+#endif /* GRPC_UV */
diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c
new file mode 100644
index 0000000000..b69e70881f
--- /dev/null
+++ b/src/core/lib/iomgr/tcp_server_uv.c
@@ -0,0 +1,358 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_UV
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "src/core/lib/iomgr/tcp_server.h"
+#include "src/core/lib/iomgr/tcp_uv.h"
+
+/* one listening port */
+typedef struct grpc_tcp_listener grpc_tcp_listener;
+struct grpc_tcp_listener {
+ uv_tcp_t *handle;
+ grpc_tcp_server *server;
+ unsigned port_index;
+ int port;
+ /* linked list */
+ struct grpc_tcp_listener *next;
+};
+
+struct grpc_tcp_server {
+ gpr_refcount refs;
+
+ /* Called whenever accept() succeeds on a server port. */
+ grpc_tcp_server_cb on_accept_cb;
+ void *on_accept_cb_arg;
+
+ int open_ports;
+
+ /* linked list of server ports */
+ grpc_tcp_listener *head;
+ grpc_tcp_listener *tail;
+
+ /* List of closures passed to shutdown_starting_add(). */
+ grpc_closure_list shutdown_starting;
+
+ /* shutdown callback */
+ grpc_closure *shutdown_complete;
+};
+
+grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete,
+ const grpc_channel_args *args,
+ grpc_tcp_server **server) {
+ grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
+ (void)args;
+ gpr_ref_init(&s->refs, 1);
+ s->on_accept_cb = NULL;
+ s->on_accept_cb_arg = NULL;
+ s->open_ports = 0;
+ s->head = NULL;
+ s->tail = NULL;
+ s->shutdown_starting.head = NULL;
+ s->shutdown_starting.tail = NULL;
+ s->shutdown_complete = shutdown_complete;
+ *server = s;
+ return GRPC_ERROR_NONE;
+}
+
+grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) {
+ gpr_ref(&s->refs);
+ return s;
+}
+
+void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s,
+ grpc_closure *shutdown_starting) {
+ grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
+ GRPC_ERROR_NONE);
+}
+
+static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
+ if (s->shutdown_complete != NULL) {
+ grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
+ }
+
+ while (s->head) {
+ grpc_tcp_listener *sp = s->head;
+ s->head = sp->next;
+ sp->next = NULL;
+ gpr_log(GPR_DEBUG, "Freeing uv_tcp_t handle %p", sp->handle);
+ gpr_free(sp->handle);
+ gpr_free(sp);
+ }
+ gpr_free(s);
+}
+
+static void handle_close_callback(uv_handle_t *handle) {
+ grpc_tcp_listener *sp = (grpc_tcp_listener *)handle->data;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ sp->server->open_ports--;
+ if (sp->server->open_ports == 0) {
+ finish_shutdown(&exec_ctx, sp->server);
+ }
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
+ int immediately_done = 0;
+ grpc_tcp_listener *sp;
+
+ if (s->open_ports == 0) {
+ immediately_done = 1;
+ }
+ for (sp = s->head; sp; sp = sp->next){
+ gpr_log(GPR_DEBUG, "Closing uv_tcp_t handle %p", sp->handle);
+ uv_close((uv_handle_t *)sp->handle, handle_close_callback);
+ }
+
+ if (immediately_done) {
+ finish_shutdown(exec_ctx, s);
+ }
+}
+
+void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
+ if (gpr_unref(&s->refs)) {
+ /* Complete shutdown_starting work before destroying. */
+ grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_exec_ctx_enqueue_list(&local_exec_ctx, &s->shutdown_starting, NULL);
+ if (exec_ctx == NULL) {
+ grpc_exec_ctx_flush(&local_exec_ctx);
+ tcp_server_destroy(&local_exec_ctx, s);
+ grpc_exec_ctx_finish(&local_exec_ctx);
+ } else {
+ grpc_exec_ctx_finish(&local_exec_ctx);
+ tcp_server_destroy(exec_ctx, s);
+ }
+ }
+}
+
+static void on_connect(uv_stream_t *server, int status) {
+ grpc_tcp_listener *sp = (grpc_tcp_listener *)server->data;
+ grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, 0};
+ uv_tcp_t *client;
+ grpc_endpoint *ep = NULL;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ struct sockaddr_storage peer_name;
+ int peer_name_len = sizeof(peer_name);
+ char *peer_name_string;
+ int err;
+
+ gpr_log(GPR_DEBUG, "Server %p received a connection", sp->server);
+
+ if (status < 0) {
+ gpr_log(GPR_INFO, "Skipping on_accept due to error: %s",
+ uv_strerror(status));
+ return;
+ }
+ client = gpr_malloc(sizeof(uv_tcp_t));
+ gpr_log(GPR_DEBUG, "Allocated uv_tcp_t handle %p", client);
+ uv_tcp_init(uv_default_loop(), client);
+ // UV documentation says this is guaranteed to succeed
+ uv_accept((uv_stream_t *)server, (uv_stream_t *)client);
+ peer_name_string = NULL;
+ err = uv_tcp_getpeername(client, (struct sockaddr *)&peer_name,
+ &peer_name_len);
+ if (err == 0) {
+ peer_name_string = grpc_sockaddr_to_uri((struct sockaddr *)&peer_name);
+ } else {
+ gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s",
+ uv_strerror(status));
+ }
+ ep = grpc_tcp_create(client, peer_name_string);
+ gpr_log(GPR_DEBUG, "Calling on_accept_cb for server %p", sp->server);
+ sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL,
+ &acceptor);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static grpc_error *add_socket_to_server(grpc_tcp_server *s,
+ uv_tcp_t *handle,
+ struct sockaddr *addr,
+ size_t addr_len, unsigned port_index,
+ grpc_tcp_listener **listener) {
+ grpc_tcp_listener *sp = NULL;
+ int port = -1;
+ int status;
+ grpc_error *error;
+ struct sockaddr_storage sockname_temp;
+ int sockname_len;
+
+ // The last argument to uv_tcp_bind is flags
+ status = uv_tcp_bind(handle, addr, 0);
+ if (status != 0) {
+ error = GRPC_ERROR_CREATE("Failed to bind to port");
+ error = grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
+ uv_strerror(status));
+ return error;
+ }
+
+ sockname_len = (int)sizeof(sockname_temp);
+ status = uv_tcp_getsockname(handle, (struct sockaddr *)&sockname_temp,
+ &sockname_len);
+ if (status != 0) {
+ error = GRPC_ERROR_CREATE("getsockname failed");
+ error = grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
+ uv_strerror(status));
+ return error;
+ }
+
+ port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
+
+ GPR_ASSERT(port >= 0);
+ GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
+ sp = gpr_malloc(sizeof(grpc_tcp_listener));
+ sp->next = NULL;
+ if (s->head == NULL) {
+ s->head = sp;
+ } else {
+ s->tail->next = sp;
+ }
+ s->tail = sp;
+ sp->server = s;
+ sp->handle = handle;
+ sp->port = port;
+ sp->port_index = port_index;
+ handle->data = sp;
+ s->open_ports++;
+ GPR_ASSERT(sp->handle);
+ *listener = sp;
+
+ return GRPC_ERROR_NONE;
+}
+
+grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
+ size_t addr_len, int *port) {
+ // This function is mostly copied from tcp_server_windows.c
+ grpc_tcp_listener *sp = NULL;
+ uv_tcp_t *handle;
+ struct sockaddr_in6 addr6_v4mapped;
+ struct sockaddr_in6 wildcard;
+ struct sockaddr *allocated_addr = NULL;
+ struct sockaddr_storage sockname_temp;
+ int sockname_len;
+ unsigned port_index = 0;
+ int status;
+ grpc_error *error = GRPC_ERROR_NONE;
+
+ if (s->tail != NULL) {
+ port_index = s->tail->port_index + 1;
+ }
+
+ /* Check if this is a wildcard port, and if so, try to keep the port the same
+ as some previously created listener. */
+ if (grpc_sockaddr_get_port(addr) == 0) {
+ for (sp = s->head; sp; sp = sp->next) {
+ sockname_len = sizeof(sockname_temp);
+ if (0 == uv_tcp_getsockname(sp->handle, (struct sockaddr *)&sockname_temp,
+ &sockname_len)) {
+ *port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
+ if (*port > 0) {
+ allocated_addr = gpr_malloc(addr_len);
+ memcpy(allocated_addr, addr, addr_len);
+ grpc_sockaddr_set_port(allocated_addr, *port);
+ addr = allocated_addr;
+ break;
+ }
+ }
+ }
+ }
+
+ if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
+ addr = (const struct sockaddr *)&addr6_v4mapped;
+ addr_len = sizeof(addr6_v4mapped);
+ }
+
+ /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
+ if (grpc_sockaddr_is_wildcard(addr, port)) {
+ grpc_sockaddr_make_wildcard6(*port, &wildcard);
+
+ addr = (struct sockaddr *)&wildcard;
+ addr_len = sizeof(wildcard);
+ }
+
+ handle = gpr_malloc(sizeof(uv_tcp_t));
+ gpr_log(GPR_DEBUG, "Allocating uv_tcp_t handle %p", handle);
+ status = uv_tcp_init(uv_default_loop(), handle);
+ if (status == 0) {
+ error = add_socket_to_server(s, handle, (struct sockaddr *)addr, addr_len,
+ port_index, &sp);
+ } else {
+ error = GRPC_ERROR_CREATE("Failed to initialize UV tcp handle");
+ error = grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
+ uv_strerror(status));
+ }
+
+ gpr_free(allocated_addr);
+
+ if (error != GRPC_ERROR_NONE) {
+ grpc_error *error_out = GRPC_ERROR_CREATE_REFERENCING(
+ "Failed to add port to server", &error, 1);
+ GRPC_ERROR_UNREF(error);
+ error = error_out;
+ *port = -1;
+ } else {
+ GPR_ASSERT(sp != NULL);
+ *port = sp->port;
+ }
+ return error;
+}
+
+void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server,
+ grpc_pollset **pollsets, size_t pollset_count,
+ grpc_tcp_server_cb on_accept_cb, void *cb_arg) {
+ grpc_tcp_listener *sp;
+ (void)pollsets;
+ (void)pollset_count;
+ GPR_ASSERT(on_accept_cb);
+ GPR_ASSERT(!server->on_accept_cb);
+ server->on_accept_cb = on_accept_cb;
+ server->on_accept_cb_arg = cb_arg;
+ for(sp = server->head; sp; sp = sp->next) {
+ GPR_ASSERT(uv_listen((uv_stream_t *) sp->handle, SOMAXCONN, on_connect) == 0);
+ }
+}
+
+void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx,
+ grpc_tcp_server *s) {}
+
+
+#endif /* GRPC_UV */
diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c
new file mode 100644
index 0000000000..fa198fd8e1
--- /dev/null
+++ b/src/core/lib/iomgr/tcp_uv.c
@@ -0,0 +1,336 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_UV
+
+#include <limits.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/slice_buffer.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/network_status_tracker.h"
+#include "src/core/lib/iomgr/tcp_uv.h"
+#include "src/core/lib/support/string.h"
+
+int grpc_tcp_trace = 0;
+
+typedef struct {
+ grpc_endpoint base;
+ gpr_refcount refcount;
+
+ uv_tcp_t *handle;
+
+ grpc_closure *read_cb;
+ grpc_closure *write_cb;
+
+ gpr_slice read_slice;
+ gpr_slice_buffer *read_slices;
+ gpr_slice_buffer *write_slices;
+ uv_buf_t *write_buffers;
+
+ int shutting_down;
+ char *peer_string;
+ grpc_pollset *pollset;
+} grpc_tcp;
+
+static void uv_close_callback(uv_handle_t *handle) {
+ gpr_log(GPR_DEBUG, "Freeing uv_tcp_t handle %p", handle);
+ gpr_free(handle);
+}
+
+static void tcp_free(grpc_tcp *tcp) {
+ gpr_free(tcp);
+}
+
+/*#define GRPC_TCP_REFCOUNT_DEBUG*/
+#ifdef GRPC_TCP_REFCOUNT_DEBUG
+#define TCP_UNREF(tcp, reason) \
+ tcp_unref((tcp), (reason), __FILE__, __LINE__)
+#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
+static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file,
+ int line) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp,
+ reason, tcp->refcount.count, tcp->refcount.count - 1);
+ if (gpr_unref(&tcp->refcount)) {
+ tcp_free(tcp);
+ }
+}
+
+static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file,
+ int line) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %d -> %d", tcp,
+ reason, tcp->refcount.count, tcp->refcount.count + 1);
+ gpr_ref(&tcp->refcount);
+}
+#else
+#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
+#define TCP_REF(tcp, reason) tcp_ref((tcp))
+static void tcp_unref(grpc_tcp *tcp) {
+ if (gpr_unref(&tcp->refcount)) {
+ tcp_free(tcp);
+ }
+}
+
+static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
+#endif
+
+static void alloc_uv_buf(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
+ grpc_tcp *tcp = handle->data;
+ (void)suggested_size;
+ tcp->read_slice = gpr_slice_malloc(GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
+ buf->base = (char *)GPR_SLICE_START_PTR(tcp->read_slice);
+ buf->len = GPR_SLICE_LENGTH(tcp->read_slice);
+}
+
+static void read_callback(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
+ gpr_slice sub;
+ grpc_error *error;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_tcp *tcp = stream->data;
+ grpc_closure *cb = tcp->read_cb;
+ if (nread == 0) {
+ // Nothing happened. Wait for the next callback
+ return;
+ }
+ TCP_UNREF(tcp, "read");
+ tcp->read_cb = NULL;
+ // TODO(murgatroid99): figure out what the return value here means
+ uv_read_stop(stream);
+ if (nread == UV_EOF) {
+ error = GRPC_ERROR_CREATE("EOF");
+ } else if (nread > 0) {
+ // Successful read
+ sub = gpr_slice_sub_no_ref(tcp->read_slice, 0, nread);
+ gpr_slice_buffer_add(tcp->read_slices, sub);
+ error = GRPC_ERROR_NONE;
+ if (grpc_tcp_trace) {
+ size_t i;
+ const char *str = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "read: error=%s", str);
+ grpc_error_free_string(str);
+ for (i = 0; i < tcp->read_slices->count; i++) {
+ char *dump = gpr_dump_slice(tcp->read_slices->slices[i],
+ GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, dump);
+ gpr_free(dump);
+ }
+ }
+ } else {
+ // nread < 0: Error
+ error = GRPC_ERROR_CREATE("TCP Read failed");
+ }
+ grpc_exec_ctx_sched(&exec_ctx, cb, error, NULL);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
+ gpr_slice_buffer *read_slices, grpc_closure *cb) {
+ grpc_tcp *tcp = (grpc_tcp *)ep;
+ int status;
+ grpc_error *error = GRPC_ERROR_NONE;
+ GPR_ASSERT(tcp->read_cb == NULL);
+ tcp->read_cb = cb;
+ tcp->read_slices = read_slices;
+ gpr_slice_buffer_reset_and_unref(read_slices);
+ TCP_REF(tcp, "read");
+ // TODO(murgatroid99): figure out what the return value here means
+ status = uv_read_start((uv_stream_t *)tcp->handle, alloc_uv_buf, read_callback);
+ if (status != 0) {
+ error = GRPC_ERROR_CREATE("TCP Read failed at start");
+ error = grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
+ uv_strerror(status));
+ grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
+ }
+ if (grpc_tcp_trace) {
+ const char *str = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "Initiating read on %p: error=%s", tcp, str);
+ }
+}
+
+static void write_callback(uv_write_t *req, int status) {
+ grpc_tcp *tcp = req->data;
+ grpc_error *error;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_closure *cb = tcp->write_cb;
+ tcp->write_cb = NULL;
+ TCP_UNREF(tcp, "write");
+ if (status == 0) {
+ error = GRPC_ERROR_NONE;
+ } else {
+ error = GRPC_ERROR_CREATE("TCP Write failed");
+ }
+ if (grpc_tcp_trace) {
+ const char *str = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "write complete on %p: error=%s", tcp, str);
+ }
+ gpr_free(tcp->write_buffers);
+ gpr_free(req);
+ grpc_exec_ctx_sched(&exec_ctx, cb, error, NULL);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
+ gpr_slice_buffer *write_slices,
+ grpc_closure *cb) {
+ grpc_tcp *tcp = (grpc_tcp *)ep;
+ uv_buf_t *buffers;
+ unsigned int buffer_count;
+ unsigned int i;
+ gpr_slice *slice;
+ uv_write_t *write_req;
+
+ if (grpc_tcp_trace) {
+ size_t i;
+
+ for (i = 0; i < write_slices->count; i++) {
+ char *data =
+ gpr_dump_slice(write_slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, data);
+ gpr_free(data);
+ }
+ }
+
+ if (tcp->shutting_down) {
+ grpc_exec_ctx_sched(exec_ctx, cb,
+ GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL);
+ return;
+ }
+
+ GPR_ASSERT(tcp->write_cb == NULL);
+ tcp->write_slices = write_slices;
+ GPR_ASSERT(tcp->write_slices->count <= UINT_MAX);
+ if (tcp->write_slices->count == 0) {
+ // No slices means we don't have to do anything,
+ // and libuv doesn't like empty writes
+ grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_NONE, NULL);
+ return;
+ }
+
+ tcp->write_cb = cb;
+ buffer_count = (unsigned int)tcp->write_slices->count;
+ buffers = gpr_malloc(sizeof(uv_buf_t) * buffer_count);
+ for (i = 0; i < buffer_count; i++) {
+ slice = &tcp->write_slices->slices[i];
+ buffers[i].base = (char *)GPR_SLICE_START_PTR(*slice);
+ buffers[i].len = GPR_SLICE_LENGTH(*slice);
+ }
+ write_req = gpr_malloc(sizeof(uv_write_t));
+ write_req->data = tcp;
+ TCP_REF(tcp, "write");
+ // TODO(murgatroid99): figure out what the return value here means
+ uv_write(write_req, (uv_stream_t *)tcp->handle, buffers, buffer_count,
+ write_callback);
+}
+
+static void uv_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
+ grpc_pollset *pollset) {
+ // No-op. We're ignoring pollsets currently
+ (void) exec_ctx;
+ (void) ep;
+ (void) pollset;
+ grpc_tcp *tcp = (grpc_tcp *) ep;
+ tcp->pollset = pollset;
+}
+
+static void uv_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
+ grpc_pollset_set *pollset) {
+ // No-op. We're ignoring pollsets currently
+ (void) exec_ctx;
+ (void) ep;
+ (void) pollset;
+}
+
+static void shutdown_callback(uv_shutdown_t *req, int status) {
+ gpr_free(req);
+}
+
+static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
+ grpc_tcp *tcp = (grpc_tcp *)ep;
+ uv_shutdown_t *req = gpr_malloc(sizeof(uv_shutdown_t));
+ uv_shutdown(req, (uv_stream_t *)tcp->handle, shutdown_callback);
+}
+
+static void uv_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
+ grpc_network_status_unregister_endpoint(ep);
+ grpc_tcp *tcp = (grpc_tcp *)ep;
+ gpr_log(GPR_DEBUG, "Closing uv_tcp_t handle %p", tcp->handle);
+ uv_close((uv_handle_t *)tcp->handle, uv_close_callback);
+ TCP_UNREF(tcp, "destroy");
+}
+
+static char *uv_get_peer(grpc_endpoint *ep) {
+ grpc_tcp *tcp = (grpc_tcp *)ep;
+ return gpr_strdup(tcp->peer_string);
+}
+
+static grpc_workqueue *uv_get_workqueue(grpc_endpoint *ep) {return NULL; }
+
+static grpc_endpoint_vtable vtable = {uv_endpoint_read,
+ uv_endpoint_write,
+ uv_get_workqueue,
+ uv_add_to_pollset,
+ uv_add_to_pollset_set,
+ uv_endpoint_shutdown,
+ uv_destroy,
+ uv_get_peer};
+
+grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, char *peer_string) {
+ grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
+
+ if (grpc_tcp_trace) {
+ gpr_log(GPR_DEBUG, "Creating TCP endpoint %p", tcp);
+ }
+
+ memset(tcp, 0, sizeof(grpc_tcp));
+ tcp->base.vtable = &vtable;
+ tcp->handle = handle;
+ handle->data = tcp;
+ gpr_ref_init(&tcp->refcount, 1);
+ tcp->peer_string = gpr_strdup(peer_string);
+ /* Tell network status tracking code about the new endpoint */
+ grpc_network_status_register_endpoint(&tcp->base);
+
+#ifndef GRPC_UV_TCP_HOLD_LOOP
+ uv_unref((uv_handle_t *)handle);
+#endif
+
+ return &tcp->base;
+}
+
+#endif /* GRPC_UV */
diff --git a/src/core/lib/iomgr/tcp_uv.h b/src/core/lib/iomgr/tcp_uv.h
new file mode 100644
index 0000000000..eed41151ea
--- /dev/null
+++ b/src/core/lib/iomgr/tcp_uv.h
@@ -0,0 +1,57 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_TCP_UV_H
+#define GRPC_CORE_LIB_IOMGR_TCP_UV_H
+/*
+ Low level TCP "bottom half" implementation, for use by transports built on
+ top of a TCP connection.
+
+ Note that this file does not (yet) include APIs for creating the socket in
+ the first place.
+
+ All calls passing slice transfer ownership of a slice refcount unless
+ otherwise specified.
+*/
+
+#include "src/core/lib/iomgr/endpoint.h"
+
+#include <uv.h>
+
+extern int grpc_tcp_trace;
+
+#define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192
+
+grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, char *peer_string);
+
+#endif /* GRPC_CORE_LIB_IOMGR_TCP_UV_H */
diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h
index a825d2a28b..b76ba079c4 100644
--- a/src/core/lib/iomgr/timer.h
+++ b/src/core/lib/iomgr/timer.h
@@ -34,19 +34,20 @@
#ifndef GRPC_CORE_LIB_IOMGR_TIMER_H
#define GRPC_CORE_LIB_IOMGR_TIMER_H
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_UV
+#include "src/core/lib/iomgr/timer_uv.h"
+#else
+#include "src/core/lib/iomgr/timer_generic.h"
+#endif /* GRPC_UV */
+
#include <grpc/support/port_platform.h>
#include <grpc/support/time.h>
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr.h"
-typedef struct grpc_timer {
- gpr_timespec deadline;
- uint32_t heap_index; /* INVALID_HEAP_INDEX if not in heap */
- int triggered;
- struct grpc_timer *next;
- struct grpc_timer *prev;
- grpc_closure closure;
-} grpc_timer;
+typedef struct grpc_timer grpc_timer;
/* Initialize *timer. When expired or canceled, timer_cb will be called with
*timer_cb_arg and status to indicate if it expired (SUCCESS) or was
diff --git a/src/core/lib/iomgr/timer.c b/src/core/lib/iomgr/timer_generic.c
index 9975fa1671..00058f9d86 100644
--- a/src/core/lib/iomgr/timer.c
+++ b/src/core/lib/iomgr/timer_generic.c
@@ -31,6 +31,10 @@
*
*/
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_TIMER_USE_GENERIC
+
#include "src/core/lib/iomgr/timer.h"
#include <grpc/support/log.h>
@@ -382,3 +386,5 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
? GRPC_ERROR_NONE
: GRPC_ERROR_CREATE("Shutting down timer system"));
}
+
+#endif /* GRPC_TIMER_USE_GENERIC */
diff --git a/src/core/lib/iomgr/timer_generic.h b/src/core/lib/iomgr/timer_generic.h
new file mode 100644
index 0000000000..e4494adb5f
--- /dev/null
+++ b/src/core/lib/iomgr/timer_generic.h
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_TIMER_GENERIC_H
+#define GRPC_CORE_LIB_IOMGR_TIMER_GENERIC_H
+
+#include <grpc/support/time.h>
+#include "src/core/lib/iomgr/exec_ctx.h"
+
+struct grpc_timer {
+ gpr_timespec deadline;
+ uint32_t heap_index; /* INVALID_HEAP_INDEX if not in heap */
+ int triggered;
+ struct grpc_timer *next;
+ struct grpc_timer *prev;
+ grpc_closure closure;
+};
+
+#endif /* GRPC_CORE_LIB_IOMGR_TIMER_GENERIC_H */
diff --git a/src/core/lib/iomgr/timer_heap.c b/src/core/lib/iomgr/timer_heap.c
index 2ad9bb9cd2..f736d335e6 100644
--- a/src/core/lib/iomgr/timer_heap.c
+++ b/src/core/lib/iomgr/timer_heap.c
@@ -31,6 +31,10 @@
*
*/
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_TIMER_USE_GENERIC
+
#include "src/core/lib/iomgr/timer_heap.h"
#include <string.h>
@@ -144,3 +148,5 @@ grpc_timer *grpc_timer_heap_top(grpc_timer_heap *heap) {
void grpc_timer_heap_pop(grpc_timer_heap *heap) {
grpc_timer_heap_remove(heap, grpc_timer_heap_top(heap));
}
+
+#endif /* GRPC_TIMER_USE_GENERIC */
diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c
new file mode 100644
index 0000000000..ffeb08cb79
--- /dev/null
+++ b/src/core/lib/iomgr/timer_uv.c
@@ -0,0 +1,103 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#if GRPC_UV
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+#include "src/core/lib/iomgr/timer.h"
+
+#include <uv.h>
+
+static void timer_close_callback(uv_handle_t *handle) {
+ gpr_free(handle);
+}
+
+static void stop_uv_timer(uv_timer_t *handle) {
+ uv_timer_stop(handle);
+ uv_unref((uv_handle_t*) handle);
+ gpr_log(GPR_DEBUG, "Closing uv_timer_t handle %p", handle);
+ uv_close((uv_handle_t*) handle, timer_close_callback);
+}
+
+void run_expired_timer(uv_timer_t *handle) {
+ grpc_timer *timer = (grpc_timer*)handle->data;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ gpr_log(GPR_DEBUG, "Timer callback: %p", timer);
+ GPR_ASSERT(!timer->triggered);
+ timer->triggered = 1;
+ grpc_exec_ctx_sched(&exec_ctx, &timer->closure, GRPC_ERROR_NONE, NULL);
+ stop_uv_timer(handle);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
+ gpr_timespec deadline, grpc_iomgr_cb_func timer_cb,
+ void *timer_cb_arg, gpr_timespec now) {
+ uint64_t timeout;
+ grpc_closure_init(&timer->closure, timer_cb, timer_cb_arg);
+ if (gpr_time_cmp(deadline, now) <= 0) {
+ timer->triggered = 1;
+ grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_NONE, NULL);
+ return;
+ }
+ timer->triggered = 0;
+ timeout = (uint64_t)gpr_time_to_millis(gpr_time_sub(deadline, now));
+ gpr_log(GPR_DEBUG, "Setting timer %p: %lu", timer, timeout);
+ timer->uv_timer = gpr_malloc(sizeof(uv_timer_t));
+ uv_timer_init(uv_default_loop(), timer->uv_timer);
+ timer->uv_timer->data = timer;
+ uv_timer_start(timer->uv_timer, run_expired_timer, timeout, 0);
+}
+
+void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
+ if (!timer->triggered) {
+ gpr_log(GPR_DEBUG, "Running cancelled timer callback");
+ timer->triggered = 1;
+ grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED, NULL);
+ stop_uv_timer(timer->uv_timer);
+ }
+}
+
+bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
+ gpr_timespec *next) {
+ return false;
+}
+
+void grpc_timer_list_init(gpr_timespec now) {}
+void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) {}
+
+#endif /* GRPC_UV */
diff --git a/src/core/lib/iomgr/timer_uv.h b/src/core/lib/iomgr/timer_uv.h
new file mode 100644
index 0000000000..d78d6a6ca2
--- /dev/null
+++ b/src/core/lib/iomgr/timer_uv.h
@@ -0,0 +1,47 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_TIMER_GENERIC_H
+#define GRPC_CORE_LIB_IOMGR_TIMER_GENERIC_H
+
+#include <uv.h>
+
+#include "src/core/lib/iomgr/exec_ctx.h"
+
+struct grpc_timer {
+ grpc_closure closure;
+ uv_timer_t *uv_timer;
+ int triggered;
+};
+
+#endif /* GRPC_CORE_LIB_IOMGR_TIMER_GENERIC_H */
diff --git a/src/core/lib/iomgr/workqueue_uv.c b/src/core/lib/iomgr/workqueue_uv.c
new file mode 100644
index 0000000000..5e4eb504ac
--- /dev/null
+++ b/src/core/lib/iomgr/workqueue_uv.c
@@ -0,0 +1,62 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_UV
+
+#include "src/core/lib/iomgr/workqueue.h"
+
+// Minimal implementation of grpc_workqueue for libuv
+// Works by directly enqueuing workqueue items onto the current execution
+// context, which is at least correct, if not performant or in the spirit of
+// workqueues.
+
+void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {}
+
+#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line,
+ const char *reason) {}
+void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+ const char *file, int line, const char *reason) {}
+#else
+void grpc_workqueue_ref(grpc_workqueue *workqueue) {}
+void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {}
+#endif
+
+void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+ grpc_closure *closure, grpc_error *error) {
+ grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
+}
+
+#endif /* GPR_UV */
diff --git a/src/core/lib/iomgr/workqueue_uv.h b/src/core/lib/iomgr/workqueue_uv.h
new file mode 100644
index 0000000000..be3f8e4d93
--- /dev/null
+++ b/src/core/lib/iomgr/workqueue_uv.h
@@ -0,0 +1,37 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_WORKQUEUE_UV_H
+#define GRPC_CORE_LIB_IOMGR_WORKQUEUE_UV_H
+
+#endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_UV_H */
diff --git a/src/core/lib/security/context/security_context.c b/src/core/lib/security/context/security_context.c
index 127b13ee50..874cc05455 100644
--- a/src/core/lib/security/context/security_context.c
+++ b/src/core/lib/security/context/security_context.c
@@ -31,6 +31,11 @@
*
*/
+/* We currently need this at the top of the file if we import some iomgr
+ headers because if we are building with libuv, those headers will include
+ uv.h, which needs to be included before other system headers */
+#include "src/core/lib/iomgr/port.h"
+
#include <string.h>
#include "src/core/lib/security/context/security_context.h"
diff --git a/src/core/lib/security/context/security_context.h b/src/core/lib/security/context/security_context.h
index 4e7666dfe3..b48d1d0ab7 100644
--- a/src/core/lib/security/context/security_context.h
+++ b/src/core/lib/security/context/security_context.h
@@ -34,6 +34,11 @@
#ifndef GRPC_CORE_LIB_SECURITY_CONTEXT_SECURITY_CONTEXT_H
#define GRPC_CORE_LIB_SECURITY_CONTEXT_SECURITY_CONTEXT_H
+/* We currently need this at the top of the file if we import some iomgr
+ headers because if we are building with libuv, those headers will include
+ uv.h, which needs to be included before other system headers */
+#include "src/core/lib/iomgr/port.h"
+
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/security/credentials/credentials.h"
diff --git a/src/core/lib/security/credentials/credentials.h b/src/core/lib/security/credentials/credentials.h
index 8e9d842ead..bfdfcb6761 100644
--- a/src/core/lib/security/credentials/credentials.h
+++ b/src/core/lib/security/credentials/credentials.h
@@ -34,6 +34,11 @@
#ifndef GRPC_CORE_LIB_SECURITY_CREDENTIALS_CREDENTIALS_H
#define GRPC_CORE_LIB_SECURITY_CREDENTIALS_CREDENTIALS_H
+/* We currently need this at the top of the file if we import some iomgr
+ headers because if we are building with libuv, those headers will include
+ uv.h, which needs to be included before other system headers */
+#include "src/core/lib/iomgr/port.h"
+
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/support/sync.h>
diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
index c22ea5c468..e9638c5a22 100644
--- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
+++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
@@ -31,6 +31,11 @@
*
*/
+/* We currently need this at the top of the file if we import some iomgr
+ headers because if we are building with libuv, those headers will include
+ uv.h, which needs to be included before other system headers */
+#include "src/core/lib/iomgr/port.h"
+
#include "src/core/lib/security/credentials/oauth2/oauth2_credentials.h"
#include <string.h>
diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c
index def16c8229..3339c0a3be 100644
--- a/src/core/lib/security/transport/server_auth_filter.c
+++ b/src/core/lib/security/transport/server_auth_filter.c
@@ -31,6 +31,11 @@
*
*/
+/* We currently need this at the top of the file if we import some iomgr
+ headers because if we are building with libuv, those headers will include
+ uv.h, which needs to be included before other system headers */
+#include "src/core/lib/iomgr/port.h"
+
#include <string.h>
#include "src/core/lib/security/context/security_context.h"
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 772681109a..6a38b4b427 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -30,6 +30,12 @@
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
+
+/* We currently need this at the top of the file if we import some iomgr
+ headers because if we are building with libuv, those headers will include
+ uv.h, which needs to be included before other system headers */
+#include "src/core/lib/iomgr/port.h"
+
#include <assert.h>
#include <limits.h>
#include <stdio.h>
diff --git a/src/core/lib/surface/init_secure.c b/src/core/lib/surface/init_secure.c
index 7ee7b51568..9a9342ff2c 100644
--- a/src/core/lib/surface/init_secure.c
+++ b/src/core/lib/surface/init_secure.c
@@ -31,6 +31,11 @@
*
*/
+/* We currently need this at the top of the file if we import some iomgr
+ headers because if we are building with libuv, those headers will include
+ uv.h, which needs to be included before other system headers */
+#include "src/core/lib/iomgr/port.h"
+
#include "src/core/lib/surface/init.h"
#include <limits.h>
diff --git a/src/core/lib/tsi/ssl_transport_security.c b/src/core/lib/tsi/ssl_transport_security.c
index f6e8c518e3..116b66a179 100644
--- a/src/core/lib/tsi/ssl_transport_security.c
+++ b/src/core/lib/tsi/ssl_transport_security.c
@@ -31,6 +31,11 @@
*
*/
+/* We currently need this at the top of the file if we import some iomgr
+ headers because if we are building with libuv, those headers will include
+ uv.h, which needs to be included before other system headers */
+#include "src/core/lib/iomgr/port.h"
+
#include "src/core/lib/tsi/ssl_transport_security.h"
#include "src/core/lib/iomgr/socket_utils.h"
diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc
index 9f023b5883..b48a7bd698 100644
--- a/src/node/ext/call.cc
+++ b/src/node/ext/call.cc
@@ -45,6 +45,7 @@
#include "byte_buffer.h"
#include "call.h"
#include "channel.h"
+#include "completion_queue.h"
#include "completion_queue_async_worker.h"
#include "call_credentials.h"
#include "timeval.h"
@@ -222,6 +223,9 @@ class SendMetadataOp : public Op {
out->data.send_initial_metadata.metadata = array.metadata;
return true;
}
+ bool IsFinalOp() {
+ return false;
+ }
protected:
std::string GetTypeString() const {
return "send_metadata";
@@ -263,6 +267,9 @@ class SendMessageOp : public Op {
resources->handles.push_back(unique_ptr<PersistentValue>(handle));
return true;
}
+ bool IsFinalOp() {
+ return false;
+ }
protected:
std::string GetTypeString() const {
return "send_message";
@@ -281,6 +288,9 @@ class SendClientCloseOp : public Op {
shared_ptr<Resources> resources) {
return true;
}
+ bool IsFinalOp() {
+ return false;
+ }
protected:
std::string GetTypeString() const {
return "client_close";
@@ -341,6 +351,9 @@ class SendServerStatusOp : public Op {
out->data.send_status_from_server.status_details = **str;
return true;
}
+ bool IsFinalOp() {
+ return true;
+ }
protected:
std::string GetTypeString() const {
return "send_status";
@@ -367,6 +380,9 @@ class GetMetadataOp : public Op {
out->data.recv_initial_metadata = &recv_metadata;
return true;
}
+ bool IsFinalOp() {
+ return false;
+ }
protected:
std::string GetTypeString() const {
@@ -397,6 +413,9 @@ class ReadMessageOp : public Op {
out->data.recv_message = &recv_message;
return true;
}
+ bool IsFinalOp() {
+ return false;
+ }
protected:
std::string GetTypeString() const {
@@ -442,6 +461,9 @@ class ClientStatusOp : public Op {
ParseMetadata(&metadata_array));
return scope.Escape(status_obj);
}
+ bool IsFinalOp() {
+ return true;
+ }
protected:
std::string GetTypeString() const {
return "status";
@@ -465,6 +487,9 @@ class ServerCloseResponseOp : public Op {
out->data.recv_close_on_server.cancelled = &cancelled;
return true;
}
+ bool IsFinalOp() {
+ return false;
+ }
protected:
std::string GetTypeString() const {
@@ -476,8 +501,8 @@ class ServerCloseResponseOp : public Op {
};
tag::tag(Callback *callback, OpVec *ops,
- shared_ptr<Resources> resources) :
- callback(callback), ops(ops), resources(resources){
+ shared_ptr<Resources> resources, Call *call) :
+ callback(callback), ops(ops), resources(resources), call(call){
}
tag::~tag() {
@@ -502,16 +527,36 @@ Callback *GetTagCallback(void *tag) {
return tag_struct->callback;
}
+void CompleteTag(void *tag) {
+ struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
+ bool is_final_op = false;
+ if (tag_struct->call == NULL) {
+ return;
+ }
+ for (vector<unique_ptr<Op> >::iterator it = tag_struct->ops->begin();
+ it != tag_struct->ops->end(); ++it) {
+ Op *op_ptr = it->get();
+ if (op_ptr->IsFinalOp()) {
+ is_final_op = true;
+ }
+ }
+ tag_struct->call->CompleteBatch(is_final_op);
+}
+
void DestroyTag(void *tag) {
struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
delete tag_struct;
}
-Call::Call(grpc_call *call) : wrapped_call(call) {
+Call::Call(grpc_call *call) : wrapped_call(call),
+ pending_batches(0),
+ has_final_op_completed(false) {
}
Call::~Call() {
- grpc_call_destroy(wrapped_call);
+ if (wrapped_call != NULL) {
+ grpc_call_destroy(wrapped_call);
+ }
}
void Call::Init(Local<Object> exports) {
@@ -552,6 +597,17 @@ Local<Value> Call::WrapStruct(grpc_call *call) {
}
}
+void Call::CompleteBatch(bool is_final_op) {
+ if (is_final_op) {
+ this->has_final_op_completed = true;
+ }
+ this->pending_batches--;
+ if (this->has_final_op_completed && this->pending_batches == 0) {
+ grpc_call_destroy(this->wrapped_call);
+ this->wrapped_call = NULL;
+ }
+}
+
NAN_METHOD(Call::New) {
if (info.IsConstructCall()) {
Call *call;
@@ -602,12 +658,12 @@ NAN_METHOD(Call::New) {
Utf8String host_override(info[3]);
wrapped_call = grpc_channel_create_call(
wrapped_channel, parent_call, propagate_flags,
- CompletionQueueAsyncWorker::GetQueue(), *method,
+ GetCompletionQueue(), *method,
*host_override, MillisecondsToTimespec(deadline), NULL);
} else if (info[3]->IsUndefined() || info[3]->IsNull()) {
wrapped_call = grpc_channel_create_call(
wrapped_channel, parent_call, propagate_flags,
- CompletionQueueAsyncWorker::GetQueue(), *method,
+ GetCompletionQueue(), *method,
NULL, MillisecondsToTimespec(deadline), NULL);
} else {
return Nan::ThrowTypeError("Call's fourth argument must be a string");
@@ -697,11 +753,12 @@ NAN_METHOD(Call::StartBatch) {
Callback *callback = new Callback(callback_func);
grpc_call_error error = grpc_call_start_batch(
call->wrapped_call, &ops[0], nops, new struct tag(
- callback, op_vector.release(), resources), NULL);
+ callback, op_vector.release(), resources, call), NULL);
if (error != GRPC_CALL_OK) {
return Nan::ThrowError(nanErrorWithCode("startBatch failed", error));
}
- CompletionQueueAsyncWorker::Next();
+ call->pending_batches++;
+ CompletionQueueNext();
}
NAN_METHOD(Call::Cancel) {
diff --git a/src/node/ext/call.h b/src/node/ext/call.h
index 1e3c3ba18d..31c6566d14 100644
--- a/src/node/ext/call.h
+++ b/src/node/ext/call.h
@@ -66,34 +66,6 @@ bool CreateMetadataArray(v8::Local<v8::Object> metadata,
grpc_metadata_array *array,
shared_ptr<Resources> resources);
-class Op {
- public:
- virtual v8::Local<v8::Value> GetNodeValue() const = 0;
- virtual bool ParseOp(v8::Local<v8::Value> value, grpc_op *out,
- shared_ptr<Resources> resources) = 0;
- virtual ~Op();
- v8::Local<v8::Value> GetOpType() const;
-
- protected:
- virtual std::string GetTypeString() const = 0;
-};
-
-typedef std::vector<unique_ptr<Op>> OpVec;
-struct tag {
- tag(Nan::Callback *callback, OpVec *ops,
- shared_ptr<Resources> resources);
- ~tag();
- Nan::Callback *callback;
- OpVec *ops;
- shared_ptr<Resources> resources;
-};
-
-v8::Local<v8::Value> GetTagNodeValue(void *tag);
-
-Nan::Callback *GetTagCallback(void *tag);
-
-void DestroyTag(void *tag);
-
/* Wrapper class for grpc_call structs. */
class Call : public Nan::ObjectWrap {
public:
@@ -102,6 +74,8 @@ class Call : public Nan::ObjectWrap {
/* Wrap a grpc_call struct in a javascript object */
static v8::Local<v8::Value> WrapStruct(grpc_call *call);
+ void CompleteBatch(bool is_final_op);
+
private:
explicit Call(grpc_call *call);
~Call();
@@ -121,8 +95,46 @@ class Call : public Nan::ObjectWrap {
static Nan::Persistent<v8::FunctionTemplate> fun_tpl;
grpc_call *wrapped_call;
+ // The number of ops that were started but not completed on this call
+ int pending_batches;
+ /* Indicates whether the "final" op on a call has completed. For a client
+ call, this is GRPC_OP_RECV_STATUS_ON_CLIENT and for a server call, this
+ is GRPC_OP_SEND_STATUS_FROM_SERVER */
+ bool has_final_op_completed;
};
+class Op {
+ public:
+ virtual v8::Local<v8::Value> GetNodeValue() const = 0;
+ virtual bool ParseOp(v8::Local<v8::Value> value, grpc_op *out,
+ shared_ptr<Resources> resources) = 0;
+ virtual ~Op();
+ v8::Local<v8::Value> GetOpType() const;
+ virtual bool IsFinalOp() = 0;
+
+ protected:
+ virtual std::string GetTypeString() const = 0;
+};
+
+typedef std::vector<unique_ptr<Op>> OpVec;
+struct tag {
+ tag(Nan::Callback *callback, OpVec *ops,
+ shared_ptr<Resources> resources, Call *call);
+ ~tag();
+ Nan::Callback *callback;
+ OpVec *ops;
+ shared_ptr<Resources> resources;
+ Call *call;
+};
+
+v8::Local<v8::Value> GetTagNodeValue(void *tag);
+
+Nan::Callback *GetTagCallback(void *tag);
+
+void DestroyTag(void *tag);
+
+void CompleteTag(void *tag);
+
} // namespace node
} // namespace grpc
diff --git a/src/node/ext/channel.cc b/src/node/ext/channel.cc
index 00fcca6dc8..c4028170e7 100644
--- a/src/node/ext/channel.cc
+++ b/src/node/ext/channel.cc
@@ -41,6 +41,7 @@
#include "grpc/grpc_security.h"
#include "call.h"
#include "channel.h"
+#include "completion_queue.h"
#include "completion_queue_async_worker.h"
#include "channel_credentials.h"
#include "timeval.h"
@@ -140,6 +141,7 @@ void DeallocateChannelArgs(grpc_channel_args *channel_args) {
Channel::Channel(grpc_channel *channel) : wrapped_channel(channel) {}
Channel::~Channel() {
+ gpr_log(GPR_DEBUG, "Destroying channel");
if (wrapped_channel != NULL) {
grpc_channel_destroy(wrapped_channel);
}
@@ -276,11 +278,11 @@ NAN_METHOD(Channel::WatchConnectivityState) {
unique_ptr<OpVec> ops(new OpVec());
grpc_channel_watch_connectivity_state(
channel->wrapped_channel, last_state, MillisecondsToTimespec(deadline),
- CompletionQueueAsyncWorker::GetQueue(),
+ GetCompletionQueue(),
new struct tag(callback,
ops.release(),
- shared_ptr<Resources>(nullptr)));
- CompletionQueueAsyncWorker::Next();
+ shared_ptr<Resources>(nullptr), NULL));
+ CompletionQueueNext();
}
} // namespace node
diff --git a/src/node/ext/completion_queue.cc b/src/node/ext/completion_queue.cc
new file mode 100644
index 0000000000..fcfa77b39c
--- /dev/null
+++ b/src/node/ext/completion_queue.cc
@@ -0,0 +1,114 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <uv.h>
+#include <node.h>
+#include <v8.h>
+#include <grpc/grpc.h>
+
+#include "call.h"
+#include "completion_queue.h"
+#include "completion_queue_async_worker.h"
+
+namespace grpc {
+namespace node {
+
+using v8::Local;
+using v8::Object;
+using v8::Value;
+
+grpc_completion_queue *queue;
+uv_prepare_t prepare;
+int pending_batches;
+
+void drain_completion_queue(uv_prepare_t *handle) {
+ Nan::HandleScope scope;
+ grpc_event event;
+ (void)handle;
+ do {
+ event = grpc_completion_queue_next(
+ queue, gpr_inf_past(GPR_CLOCK_MONOTONIC), NULL);
+
+ if (event.type == GRPC_OP_COMPLETE) {
+ Nan::Callback *callback = grpc::node::GetTagCallback(event.tag);
+ if (event.success) {
+ Local<Value> argv[] = {Nan::Null(),
+ grpc::node::GetTagNodeValue(event.tag)};
+ callback->Call(2, argv);
+ } else {
+ Local<Value> argv[] = {Nan::Error(
+ "The async function encountered an error")};
+ callback->Call(1, argv);
+ }
+ grpc::node::CompleteTag(event.tag);
+ grpc::node::DestroyTag(event.tag);
+ pending_batches--;
+ if (pending_batches == 0) {
+ uv_prepare_stop(&prepare);
+ }
+ }
+ } while (event.type != GRPC_QUEUE_TIMEOUT);
+}
+
+grpc_completion_queue *GetCompletionQueue() {
+#ifdef GRPC_UV
+ return queue;
+#else
+ return CompletionQueueAsyncWorker::GetQueue();
+#endif
+}
+
+void CompletionQueueNext() {
+#ifdef GRPC_UV
+ if (pending_batches == 0) {
+ GPR_ASSERT(!uv_is_active((uv_handle_t *)&prepare));
+ uv_prepare_start(&prepare, drain_completion_queue);
+ }
+ pending_batches++;
+#else
+ CompletionQueueAsyncWorker::Next();
+#endif
+}
+
+void CompletionQueueInit(Local<Object> exports) {
+#ifdef GRPC_UV
+ queue = grpc_completion_queue_create(NULL);
+ uv_prepare_init(uv_default_loop(), &prepare);
+ pending_batches = 0;
+#else
+ CompletionQueueAsyncWorker::Init(exports);
+#endif
+}
+
+} // namespace node
+} // namespace grpc
diff --git a/src/node/ext/completion_queue.h b/src/node/ext/completion_queue.h
new file mode 100644
index 0000000000..bf280f768b
--- /dev/null
+++ b/src/node/ext/completion_queue.h
@@ -0,0 +1,46 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <v8.h>
+
+namespace grpc {
+namespace node {
+
+grpc_completion_queue *GetCompletionQueue();
+
+void CompletionQueueNext();
+
+void CompletionQueueInit(v8::Local<v8::Object> exports);
+
+} // namespace node
+} // namespace grpc
diff --git a/src/node/ext/completion_queue_async_worker.cc b/src/node/ext/completion_queue_async_worker.cc
index 619ea41515..f5e03b277b 100644
--- a/src/node/ext/completion_queue_async_worker.cc
+++ b/src/node/ext/completion_queue_async_worker.cc
@@ -74,6 +74,7 @@ void CompletionQueueAsyncWorker::Execute() {
grpc_completion_queue *CompletionQueueAsyncWorker::GetQueue() { return queue; }
void CompletionQueueAsyncWorker::Next() {
+#ifndef GRPC_UV
Nan::HandleScope scope;
if (current_threads < max_queue_threads) {
current_threads += 1;
@@ -85,6 +86,7 @@ void CompletionQueueAsyncWorker::Next() {
GPR_ASSERT(current_threads <= max_queue_threads);
GPR_ASSERT((current_threads == max_queue_threads) ||
(waiting_next_calls == 0));
+#endif
}
void CompletionQueueAsyncWorker::Init(Local<Object> exports) {
diff --git a/src/node/ext/node_grpc.cc b/src/node/ext/node_grpc.cc
index 745b5023d5..a246a8c678 100644
--- a/src/node/ext/node_grpc.cc
+++ b/src/node/ext/node_grpc.cc
@@ -50,6 +50,7 @@
#include "completion_queue_async_worker.h"
#include "server_credentials.h"
#include "timeval.h"
+#include "completion_queue.h"
using v8::FunctionTemplate;
using v8::Local;
@@ -261,8 +262,8 @@ void InitLogConstants(Local<Object> exports) {
Nan::HandleScope scope;
Local<Object> log_verbosity = Nan::New<Object>();
Nan::Set(exports, Nan::New("logVerbosity").ToLocalChecked(), log_verbosity);
- Local<Value> DEBUG(Nan::New<Uint32, uint32_t>(GPR_LOG_SEVERITY_DEBUG));
- Nan::Set(log_verbosity, Nan::New("DEBUG").ToLocalChecked(), DEBUG);
+ Local<Value> DEBUG_LOG(Nan::New<Uint32, uint32_t>(GPR_LOG_SEVERITY_DEBUG));
+ Nan::Set(log_verbosity, Nan::New("DEBUG").ToLocalChecked(), DEBUG_LOG);
Local<Value> INFO(Nan::New<Uint32, uint32_t>(GPR_LOG_SEVERITY_INFO));
Nan::Set(log_verbosity, Nan::New("INFO").ToLocalChecked(), INFO);
Local<Value> LOG_ERROR(Nan::New<Uint32, uint32_t>(GPR_LOG_SEVERITY_ERROR));
@@ -414,6 +415,12 @@ NAN_METHOD(SetLogVerbosity) {
gpr_set_log_verbosity(severity);
}
+uv_signal_t signal_handle;
+
+void signal_callback(uv_signal_t *handle, int signum) {
+ uv_print_all_handles(uv_default_loop(), stderr);
+}
+
void init(Local<Object> exports) {
Nan::HandleScope scope;
grpc_init();
@@ -428,14 +435,20 @@ void init(Local<Object> exports) {
InitWriteFlags(exports);
InitLogConstants(exports);
+ uv_signal_init(uv_default_loop(), &signal_handle);
+ uv_signal_start(&signal_handle, signal_callback, SIGUSR2);
+ uv_unref((uv_handle_t *)&signal_handle);
+
+
grpc::node::Call::Init(exports);
grpc::node::CallCredentials::Init(exports);
grpc::node::Channel::Init(exports);
grpc::node::ChannelCredentials::Init(exports);
grpc::node::Server::Init(exports);
- grpc::node::CompletionQueueAsyncWorker::Init(exports);
grpc::node::ServerCredentials::Init(exports);
+ grpc::node::CompletionQueueInit(exports);
+
// Attach a few utility functions directly to the module
Nan::Set(exports, Nan::New("metadataKeyIsLegal").ToLocalChecked(),
Nan::GetFunction(
diff --git a/src/node/ext/server.cc b/src/node/ext/server.cc
index dd1b777ac8..29f31ff15e 100644
--- a/src/node/ext/server.cc
+++ b/src/node/ext/server.cc
@@ -40,6 +40,7 @@
#include <vector>
#include "call.h"
+#include "completion_queue.h"
#include "completion_queue_async_worker.h"
#include "grpc/grpc.h"
#include "grpc/grpc_security.h"
@@ -64,6 +65,7 @@ using v8::Array;
using v8::Boolean;
using v8::Date;
using v8::Exception;
+using v8::External;
using v8::Function;
using v8::FunctionTemplate;
using v8::Local;
@@ -75,6 +77,8 @@ using v8::Value;
Nan::Callback *Server::constructor;
Persistent<FunctionTemplate> Server::fun_tpl;
+static Callback *shutdown_callback;
+
class NewCallOp : public Op {
public:
NewCallOp() {
@@ -111,6 +115,9 @@ class NewCallOp : public Op {
shared_ptr<Resources> resources) {
return true;
}
+ bool IsFinalOp() {
+ return false;
+ }
grpc_call *call;
grpc_call_details details;
@@ -120,17 +127,50 @@ class NewCallOp : public Op {
std::string GetTypeString() const { return "new_call"; }
};
+class ServerShutdownOp : public Op {
+ public:
+ ServerShutdownOp(grpc_server *server): server(server) {
+ }
+
+ ~ServerShutdownOp() {
+ }
+
+ Local<Value> GetNodeValue() const {
+ return Nan::New<External>(reinterpret_cast<void *>(server));
+ }
+
+ bool ParseOp(Local<Value> value, grpc_op *out,
+ shared_ptr<Resources> resources) {
+ return true;
+ }
+ bool IsFinalOp() {
+ return false;
+ }
+
+ grpc_server *server;
+
+ protected:
+ std::string GetTypeString() const { return "shutdown"; }
+};
+
+NAN_METHOD(ServerShutdownCallback) {
+ if (!info[0]->IsNull()) {
+ return Nan::ThrowError("forceShutdown failed somehow");
+ }
+ MaybeLocal<Object> maybe_result = Nan::To<Object>(info[1]);
+ Local<Object> result = maybe_result.ToLocalChecked();
+ Local<Value> server_val = Nan::Get(
+ result, Nan::New("shutdown").ToLocalChecked()).ToLocalChecked();
+ Local<External> server_extern = server_val.As<External>();
+ grpc_server *server = reinterpret_cast<grpc_server *>(server_extern->Value());
+ grpc_server_destroy(server);
+}
+
Server::Server(grpc_server *server) : wrapped_server(server) {
- shutdown_queue = grpc_completion_queue_create(NULL);
- grpc_server_register_non_listening_completion_queue(server, shutdown_queue,
- NULL);
}
Server::~Server() {
this->ShutdownServer();
- grpc_completion_queue_shutdown(this->shutdown_queue);
- grpc_server_destroy(this->wrapped_server);
- grpc_completion_queue_destroy(this->shutdown_queue);
}
void Server::Init(Local<Object> exports) {
@@ -147,6 +187,11 @@ void Server::Init(Local<Object> exports) {
Local<Function> ctr = Nan::GetFunction(tpl).ToLocalChecked();
Nan::Set(exports, Nan::New("Server").ToLocalChecked(), ctr);
constructor = new Callback(ctr);
+
+ Local<FunctionTemplate>callback_tpl =
+ Nan::New<FunctionTemplate>(ServerShutdownCallback);
+ shutdown_callback = new Callback(
+ Nan::GetFunction(callback_tpl).ToLocalChecked());
}
bool Server::HasInstance(Local<Value> val) {
@@ -155,11 +200,19 @@ bool Server::HasInstance(Local<Value> val) {
}
void Server::ShutdownServer() {
- grpc_server_shutdown_and_notify(this->wrapped_server, this->shutdown_queue,
- NULL);
- grpc_server_cancel_all_calls(this->wrapped_server);
- grpc_completion_queue_pluck(this->shutdown_queue, NULL,
- gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
+ if (this->wrapped_server != NULL) {
+ ServerShutdownOp *op = new ServerShutdownOp(this->wrapped_server);
+ unique_ptr<OpVec> ops(new OpVec());
+ ops->push_back(unique_ptr<Op>(op));
+
+ grpc_server_shutdown_and_notify(
+ this->wrapped_server, GetCompletionQueue(),
+ new struct tag(new Callback(**shutdown_callback), ops.release(),
+ shared_ptr<Resources>(nullptr), NULL));
+ grpc_server_cancel_all_calls(this->wrapped_server);
+ CompletionQueueNext();
+ this->wrapped_server = NULL;
+ }
}
NAN_METHOD(Server::New) {
@@ -179,7 +232,7 @@ NAN_METHOD(Server::New) {
}
}
grpc_server *wrapped_server;
- grpc_completion_queue *queue = CompletionQueueAsyncWorker::GetQueue();
+ grpc_completion_queue *queue = GetCompletionQueue();
grpc_channel_args *channel_args;
if (!ParseChannelArgs(info[0], &channel_args)) {
DeallocateChannelArgs(channel_args);
@@ -205,14 +258,14 @@ NAN_METHOD(Server::RequestCall) {
ops->push_back(unique_ptr<Op>(op));
grpc_call_error error = grpc_server_request_call(
server->wrapped_server, &op->call, &op->details, &op->request_metadata,
- CompletionQueueAsyncWorker::GetQueue(),
- CompletionQueueAsyncWorker::GetQueue(),
+ GetCompletionQueue(),
+ GetCompletionQueue(),
new struct tag(new Callback(info[0].As<Function>()), ops.release(),
- shared_ptr<Resources>(nullptr)));
+ shared_ptr<Resources>(nullptr), NULL));
if (error != GRPC_CALL_OK) {
return Nan::ThrowError(nanErrorWithCode("requestCall failed", error));
}
- CompletionQueueAsyncWorker::Next();
+ CompletionQueueNext();
}
NAN_METHOD(Server::AddHttp2Port) {
@@ -259,10 +312,10 @@ NAN_METHOD(Server::TryShutdown) {
Server *server = ObjectWrap::Unwrap<Server>(info.This());
unique_ptr<OpVec> ops(new OpVec());
grpc_server_shutdown_and_notify(
- server->wrapped_server, CompletionQueueAsyncWorker::GetQueue(),
+ server->wrapped_server, GetCompletionQueue(),
new struct tag(new Nan::Callback(info[0].As<Function>()), ops.release(),
- shared_ptr<Resources>(nullptr)));
- CompletionQueueAsyncWorker::Next();
+ shared_ptr<Resources>(nullptr), NULL));
+ CompletionQueueNext();
}
NAN_METHOD(Server::ForceShutdown) {
diff --git a/src/node/ext/server.h b/src/node/ext/server.h
index ab5fc210e8..9e6a7bd1e0 100644
--- a/src/node/ext/server.h
+++ b/src/node/ext/server.h
@@ -73,7 +73,6 @@ class Server : public Nan::ObjectWrap {
static Nan::Persistent<v8::FunctionTemplate> fun_tpl;
grpc_server *wrapped_server;
- grpc_completion_queue *shutdown_queue;
};
} // namespace node
diff --git a/src/node/ext/server_credentials.cc b/src/node/ext/server_credentials.cc
index a817ade518..0ff58bb209 100644
--- a/src/node/ext/server_credentials.cc
+++ b/src/node/ext/server_credentials.cc
@@ -169,18 +169,18 @@ NAN_METHOD(ServerCredentials::CreateSsl) {
for(uint32_t i = 0; i < key_cert_pair_count; i++) {
Local<Value> pair_val = Nan::Get(pair_list, i).ToLocalChecked();
if (!pair_val->IsObject()) {
- delete key_cert_pairs;
+ delete[] key_cert_pairs;
return Nan::ThrowTypeError("Key/cert pairs must be objects");
}
Local<Object> pair_obj = Nan::To<Object>(pair_val).ToLocalChecked();
Local<Value> maybe_key = Nan::Get(pair_obj, key_key).ToLocalChecked();
Local<Value> maybe_cert = Nan::Get(pair_obj, cert_key).ToLocalChecked();
if (!::node::Buffer::HasInstance(maybe_key)) {
- delete key_cert_pairs;
+ delete[] key_cert_pairs;
return Nan::ThrowTypeError("private_key must be a Buffer");
}
if (!::node::Buffer::HasInstance(maybe_cert)) {
- delete key_cert_pairs;
+ delete[] key_cert_pairs;
return Nan::ThrowTypeError("cert_chain must be a Buffer");
}
key_cert_pairs[i].private_key = ::node::Buffer::Data(maybe_key);
@@ -189,7 +189,7 @@ NAN_METHOD(ServerCredentials::CreateSsl) {
grpc_server_credentials *creds = grpc_ssl_server_credentials_create_ex(
root_certs, key_cert_pairs, key_cert_pair_count,
client_certificate_request, NULL);
- delete key_cert_pairs;
+ delete[] key_cert_pairs;
if (creds == NULL) {
info.GetReturnValue().SetNull();
} else {
diff --git a/src/node/index.js b/src/node/index.js
index 9fb6faa5d7..a294aad8ee 100644
--- a/src/node/index.js
+++ b/src/node/index.js
@@ -219,3 +219,7 @@ exports.getClientChannel = client.getClientChannel;
* @see module:src/client.waitForClientReady
*/
exports.waitForClientReady = client.waitForClientReady;
+
+exports.closeClient = function closeClient(client_obj) {
+ client.getClientChannel(client_obj).close();
+};
diff --git a/src/node/src/client.js b/src/node/src/client.js
index f75f951eb8..9c1562e8b8 100644
--- a/src/node/src/client.js
+++ b/src/node/src/client.js
@@ -382,6 +382,7 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
if (args.options) {
message.grpcWriteFlags = args.options.flags;
}
+
client_batch[grpc.opType.SEND_INITIAL_METADATA] =
metadata._getCoreRepresentation();
client_batch[grpc.opType.SEND_MESSAGE] = message;
diff --git a/src/node/src/grpc_extension.js b/src/node/src/grpc_extension.js
index 6a8fe2c03c..63a281ddbc 100644
--- a/src/node/src/grpc_extension.js
+++ b/src/node/src/grpc_extension.js
@@ -31,7 +31,7 @@
*
*/
-var binary = require('node-pre-gyp');
+var binary = require('node-pre-gyp/lib/pre-binding');
var path = require('path');
var binding_path =
binary.find(path.resolve(path.join(__dirname, '../../../package.json')));
diff --git a/src/node/test/async_test.js b/src/node/test/async_test.js
index c46e745116..7b467e5475 100644
--- a/src/node/test/async_test.js
+++ b/src/node/test/async_test.js
@@ -61,6 +61,7 @@ describe('Async functionality', function() {
done();
});
after(function() {
+ grpc.closeClient(math_client);
server.forceShutdown();
});
it('should not hang', function(done) {
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index ab7b461178..9fe0ef3c00 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -104,29 +104,38 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/iocp_windows.c',
'src/core/lib/iomgr/iomgr.c',
'src/core/lib/iomgr/iomgr_posix.c',
+ 'src/core/lib/iomgr/iomgr_uv.c',
'src/core/lib/iomgr/iomgr_windows.c',
'src/core/lib/iomgr/load_file.c',
'src/core/lib/iomgr/network_status_tracker.c',
'src/core/lib/iomgr/polling_entity.c',
+ 'src/core/lib/iomgr/pollset_set_uv.c',
'src/core/lib/iomgr/pollset_set_windows.c',
+ 'src/core/lib/iomgr/pollset_uv.c',
'src/core/lib/iomgr/pollset_windows.c',
'src/core/lib/iomgr/resolve_address_posix.c',
+ 'src/core/lib/iomgr/resolve_address_uv.c',
'src/core/lib/iomgr/resolve_address_windows.c',
'src/core/lib/iomgr/sockaddr_utils.c',
'src/core/lib/iomgr/socket_utils_common_posix.c',
'src/core/lib/iomgr/socket_utils_linux.c',
'src/core/lib/iomgr/socket_utils_posix.c',
+ 'src/core/lib/iomgr/socket_utils_uv.c',
'src/core/lib/iomgr/socket_utils_windows.c',
'src/core/lib/iomgr/socket_windows.c',
'src/core/lib/iomgr/tcp_client_posix.c',
+ 'src/core/lib/iomgr/tcp_client_uv.c',
'src/core/lib/iomgr/tcp_client_windows.c',
'src/core/lib/iomgr/tcp_posix.c',
'src/core/lib/iomgr/tcp_server_posix.c',
+ 'src/core/lib/iomgr/tcp_server_uv.c',
'src/core/lib/iomgr/tcp_server_windows.c',
+ 'src/core/lib/iomgr/tcp_uv.c',
'src/core/lib/iomgr/tcp_windows.c',
'src/core/lib/iomgr/time_averaged_stats.c',
- 'src/core/lib/iomgr/timer.c',
+ 'src/core/lib/iomgr/timer_generic.c',
'src/core/lib/iomgr/timer_heap.c',
+ 'src/core/lib/iomgr/timer_uv.c',
'src/core/lib/iomgr/udp_server.c',
'src/core/lib/iomgr/unix_sockets_posix.c',
'src/core/lib/iomgr/unix_sockets_posix_noop.c',
@@ -135,6 +144,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/wakeup_fd_pipe.c',
'src/core/lib/iomgr/wakeup_fd_posix.c',
'src/core/lib/iomgr/workqueue_posix.c',
+ 'src/core/lib/iomgr/workqueue_uv.c',
'src/core/lib/iomgr/workqueue_windows.c',
'src/core/lib/json/json.c',
'src/core/lib/json/json_reader.c',