aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar murgatroid99 <mlumish@google.com>2016-10-07 09:55:35 -0700
committerGravatar murgatroid99 <mlumish@google.com>2016-10-07 09:55:35 -0700
commit2c287ca750c114c7230e57a1231d7e22863ab53d (patch)
tree2c74c603ba071ccb3c0ab64e2494d28f641d2cac /src
parenteebb129fd39c050a9d3b325fcd89df8aadb09218 (diff)
UV tests pass on linux
Diffstat (limited to 'src')
-rw-r--r--src/core/lib/iomgr/endpoint_pair_uv.c4
-rw-r--r--src/core/lib/iomgr/pollset_uv.c77
-rw-r--r--src/core/lib/iomgr/pollset_uv.h2
-rw-r--r--src/core/lib/iomgr/resolve_address_uv.c2
-rw-r--r--src/core/lib/iomgr/tcp_client_uv.c4
-rw-r--r--src/core/lib/iomgr/tcp_server_uv.c47
-rw-r--r--src/core/lib/iomgr/tcp_uv.c12
-rw-r--r--src/core/lib/iomgr/timer_uv.c4
-rw-r--r--src/node/ext/node_grpc.cc7
9 files changed, 121 insertions, 38 deletions
diff --git a/src/core/lib/iomgr/endpoint_pair_uv.c b/src/core/lib/iomgr/endpoint_pair_uv.c
index eeca8070b5..4f769901ae 100644
--- a/src/core/lib/iomgr/endpoint_pair_uv.c
+++ b/src/core/lib/iomgr/endpoint_pair_uv.c
@@ -37,13 +37,15 @@
#include <stdlib.h>
+#include <grpc/support/log.h>
+
#include "src/core/lib/iomgr/endpoint_pair.h"
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
size_t read_slice_size) {
grpc_endpoint_pair endpoint_pair;
// TODO(mlumish): implement this properly under libuv
- abort();
+ GPR_ASSERT(false && "grpc_iomgr_create_endpoint_pair is not suppoted with libuv");
return endpoint_pair;
}
diff --git a/src/core/lib/iomgr/pollset_uv.c b/src/core/lib/iomgr/pollset_uv.c
index b304eb64de..8f4e20f296 100644
--- a/src/core/lib/iomgr/pollset_uv.c
+++ b/src/core/lib/iomgr/pollset_uv.c
@@ -35,40 +35,105 @@
#ifdef GRPC_UV
+#include <uv.h>
+
+#include <string.h>
+
+#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/pollset_uv.h"
+struct grpc_pollset {
+ uv_timer_t timer;
+ int shutting_down;
+};
+
+/* Indicates that grpc_pollset_work should run an iteration of the UV loop
+ before running callbacks. This defaults to 1, and should be disabled if
+ grpc_pollset_work will be called within the callstack of uv_run */
+int grpc_pollset_work_run_loop;
+
gpr_mu grpc_polling_mu;
-size_t grpc_pollset_size() { return 1; }
+size_t grpc_pollset_size() { return sizeof(grpc_pollset); }
-void grpc_pollset_global_init(void) { gpr_mu_init(&grpc_polling_mu); }
+void grpc_pollset_global_init(void) {
+ gpr_mu_init(&grpc_polling_mu);
+ grpc_pollset_work_run_loop = 1;
+}
void grpc_pollset_global_shutdown(void) { gpr_mu_destroy(&grpc_polling_mu); }
void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
*mu = &grpc_polling_mu;
+ memset(pollset, 0, sizeof(grpc_pollset));
+ uv_timer_init(uv_default_loop(), &pollset->timer);
+ pollset->shutting_down = 0;
+}
+
+static void timer_close_cb(uv_handle_t *handle) {
+ handle->data = (void *)1;
}
void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure) {
+ GPR_ASSERT(!pollset->shutting_down);
+ pollset->shutting_down = 1;
+ if (grpc_pollset_work_run_loop) {
+ // Drain any pending UV callbacks without blocking
+ uv_run(uv_default_loop(), UV_RUN_NOWAIT);
+ }
grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL);
}
-void grpc_pollset_destroy(grpc_pollset *pollset) {}
+void grpc_pollset_destroy(grpc_pollset *pollset) {
+ uv_close((uv_handle_t*)&pollset->timer, timer_close_cb);
+ // timer.data is a boolean indicating that the timer has finished closing
+ pollset->timer.data = (void *)0;
+ if (grpc_pollset_work_run_loop) {
+ while (!pollset->timer.data) {
+ uv_run(uv_default_loop(), UV_RUN_NOWAIT);
+ }
+ }
+}
-void grpc_pollset_reset(grpc_pollset *pollset) {}
+void grpc_pollset_reset(grpc_pollset *pollset) {
+ GPR_ASSERT(pollset->shutting_down);
+ pollset->shutting_down = 0;
+}
+
+static void timer_run_cb(uv_timer_t *timer) {
+}
grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker_hdl,
gpr_timespec now, gpr_timespec deadline) {
+ uint64_t timeout;
+ gpr_mu_unlock(&grpc_polling_mu);
+ if (grpc_pollset_work_run_loop) {
+ if (gpr_time_cmp(deadline, now) >= 0) {
+ timeout = (uint64_t)gpr_time_to_millis(gpr_time_sub(deadline, now));
+ } else {
+ timeout = 0;
+ }
+ /* We special-case timeout=0 so that we don't bother with the timer when
+ the loop won't block anyway */
+ if (timeout > 0) {
+ uv_timer_start(&pollset->timer, timer_run_cb, timeout, 0);
+ /* Run until there is some I/O activity or the timer triggers. It doesn't
+ matter which happens */
+ uv_run(uv_default_loop(), UV_RUN_ONCE);
+ uv_timer_stop(&pollset->timer);
+ } else {
+ uv_run(uv_default_loop(), UV_RUN_NOWAIT);
+ }
+ }
if (!grpc_closure_list_empty(exec_ctx->closure_list)) {
- gpr_mu_unlock(&grpc_polling_mu);
grpc_exec_ctx_flush(exec_ctx);
- gpr_mu_lock(&grpc_polling_mu);
}
+ gpr_mu_lock(&grpc_polling_mu);
return GRPC_ERROR_NONE;
}
diff --git a/src/core/lib/iomgr/pollset_uv.h b/src/core/lib/iomgr/pollset_uv.h
index 5cbc83e991..9f1d1442b2 100644
--- a/src/core/lib/iomgr/pollset_uv.h
+++ b/src/core/lib/iomgr/pollset_uv.h
@@ -31,5 +31,7 @@
*
*/
+extern int grpc_pollset_work_run_loop;
+
void grpc_pollset_global_init(void);
void grpc_pollset_global_shutdown(void);
diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c
index 76170722f2..b8295acfa1 100644
--- a/src/core/lib/iomgr/resolve_address_uv.c
+++ b/src/core/lib/iomgr/resolve_address_uv.c
@@ -143,6 +143,8 @@ static grpc_error *blocking_resolve_address_impl(
int s;
grpc_error *err;
+ req.addrinfo = NULL;
+
err = try_split_host_port(name, default_port, &host, &port);
if (err != GRPC_ERROR_NONE) {
goto done;
diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c
index 50e3615aad..d48147ce6e 100644
--- a/src/core/lib/iomgr/tcp_client_uv.c
+++ b/src/core/lib/iomgr/tcp_client_uv.c
@@ -61,7 +61,6 @@ static void uv_tcp_connect_cleanup(grpc_uv_tcp_connect *connect) {
}
static void tcp_close_callback(uv_handle_t *handle) {
- gpr_log(GPR_DEBUG, "Freeing uv_tcp_t handle %p", handle);
gpr_free(handle);
}
@@ -73,7 +72,6 @@ static void uv_tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp,
/* error == NONE implies that the timer ran out, and wasn't cancelled. If
it was cancelled, then the handler that cancelled it also should close
the handle, if applicable */
- gpr_log(GPR_DEBUG, "Closing uv_tcp_t handle %p", connect->tcp_handle);
uv_close((uv_handle_t *)connect->tcp_handle, tcp_close_callback);
}
done = (--connect->refs == 0);
@@ -104,7 +102,6 @@ static void uv_tc_on_connect(uv_connect_t *req, int status) {
} else {
error = grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
uv_strerror(status));
- gpr_log(GPR_DEBUG, "Closing uv_tcp_t handle %p", connect->tcp_handle);
uv_close((uv_handle_t *)connect->tcp_handle, tcp_close_callback);
}
}
@@ -128,7 +125,6 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
connect->closure = closure;
connect->endpoint = ep;
connect->tcp_handle = gpr_malloc(sizeof(uv_tcp_t));
- gpr_log(GPR_DEBUG, "Allocated uv_tcp_t handle %p", connect->tcp_handle);
connect->addr_name = grpc_sockaddr_to_uri(resolved_addr);
uv_tcp_init(uv_default_loop(), connect->tcp_handle);
connect->connect_req.data = connect;
diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c
index a9eaf206d0..e1eee2d460 100644
--- a/src/core/lib/iomgr/tcp_server_uv.c
+++ b/src/core/lib/iomgr/tcp_server_uv.c
@@ -116,7 +116,6 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
grpc_tcp_listener *sp = s->head;
s->head = sp->next;
sp->next = NULL;
- gpr_log(GPR_DEBUG, "Freeing uv_tcp_t handle %p", sp->handle);
gpr_free(sp->handle);
gpr_free(sp);
}
@@ -141,7 +140,6 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
immediately_done = 1;
}
for (sp = s->head; sp; sp = sp->next) {
- gpr_log(GPR_DEBUG, "Closing uv_tcp_t handle %p", sp->handle);
uv_close((uv_handle_t *)sp->handle, handle_close_callback);
}
@@ -166,6 +164,10 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
}
}
+static void accepted_connection_close_cb(uv_handle_t *handle) {
+ gpr_free(handle);
+}
+
static void on_connect(uv_stream_t *server, int status) {
grpc_tcp_listener *sp = (grpc_tcp_listener *)server->data;
grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, 0};
@@ -176,7 +178,6 @@ static void on_connect(uv_stream_t *server, int status) {
char *peer_name_string;
int err;
- gpr_log(GPR_DEBUG, "Server %p received a connection", sp->server);
if (status < 0) {
gpr_log(GPR_INFO, "Skipping on_accept due to error: %s",
@@ -184,25 +185,28 @@ static void on_connect(uv_stream_t *server, int status) {
return;
}
client = gpr_malloc(sizeof(uv_tcp_t));
- gpr_log(GPR_DEBUG, "Allocated uv_tcp_t handle %p", client);
uv_tcp_init(uv_default_loop(), client);
// UV documentation says this is guaranteed to succeed
uv_accept((uv_stream_t *)server, (uv_stream_t *)client);
- peer_name_string = NULL;
- memset(&peer_name, 0, sizeof(grpc_resolved_address));
- peer_name.len = sizeof(struct sockaddr_storage);
- err = uv_tcp_getpeername(client, (struct sockaddr *)&peer_name.addr,
- (int *)&peer_name.len);
- if (err == 0) {
- peer_name_string = grpc_sockaddr_to_uri(&peer_name);
+ // If the server has not been started, we discard incoming connections
+ if (sp->server->on_accept_cb == NULL) {
+ uv_close((uv_handle_t *)client, accepted_connection_close_cb);
} else {
- gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(status));
+ peer_name_string = NULL;
+ memset(&peer_name, 0, sizeof(grpc_resolved_address));
+ peer_name.len = sizeof(struct sockaddr_storage);
+ err = uv_tcp_getpeername(client, (struct sockaddr *)&peer_name.addr,
+ (int *)&peer_name.len);
+ if (err == 0) {
+ peer_name_string = grpc_sockaddr_to_uri(&peer_name);
+ } else {
+ gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(status));
+ }
+ ep = grpc_tcp_create(client, peer_name_string);
+ sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL,
+ &acceptor);
+ grpc_exec_ctx_finish(&exec_ctx);
}
- ep = grpc_tcp_create(client, peer_name_string);
- gpr_log(GPR_DEBUG, "Calling on_accept_cb for server %p", sp->server);
- sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL,
- &acceptor);
- grpc_exec_ctx_finish(&exec_ctx);
}
static grpc_error *add_socket_to_server(grpc_tcp_server *s, uv_tcp_t *handle,
@@ -224,6 +228,14 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, uv_tcp_t *handle,
return error;
}
+ status = uv_listen((uv_stream_t *)handle, SOMAXCONN, on_connect);
+ if (status != 0) {
+ error = GRPC_ERROR_CREATE("Failed to listen to port");
+ error =
+ grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status));
+ return error;
+ }
+
sockname_temp.len = (int)sizeof(struct sockaddr_storage);
status = uv_tcp_getsockname(handle, (struct sockaddr *)&sockname_temp.addr,
(int *)&sockname_temp.len);
@@ -308,7 +320,6 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s,
}
handle = gpr_malloc(sizeof(uv_tcp_t));
- gpr_log(GPR_DEBUG, "Allocating uv_tcp_t handle %p", handle);
status = uv_tcp_init(uv_default_loop(), handle);
if (status == 0) {
error = add_socket_to_server(s, handle, addr, port_index, &sp);
diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c
index 88c4195c2b..a78a40d261 100644
--- a/src/core/lib/iomgr/tcp_uv.c
+++ b/src/core/lib/iomgr/tcp_uv.c
@@ -64,13 +64,12 @@ typedef struct {
gpr_slice_buffer *write_slices;
uv_buf_t *write_buffers;
- int shutting_down;
+ bool shutting_down;
char *peer_string;
grpc_pollset *pollset;
} grpc_tcp;
static void uv_close_callback(uv_handle_t *handle) {
- gpr_log(GPR_DEBUG, "Freeing uv_tcp_t handle %p", handle);
gpr_free(handle);
}
@@ -281,14 +280,16 @@ static void shutdown_callback(uv_shutdown_t *req, int status) { gpr_free(req); }
static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
- uv_shutdown_t *req = gpr_malloc(sizeof(uv_shutdown_t));
- uv_shutdown(req, (uv_stream_t *)tcp->handle, shutdown_callback);
+ if (!tcp->shutting_down) {
+ tcp->shutting_down = true;
+ uv_shutdown_t *req = gpr_malloc(sizeof(uv_shutdown_t));
+ uv_shutdown(req, (uv_stream_t *)tcp->handle, shutdown_callback);
+ }
}
static void uv_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_network_status_unregister_endpoint(ep);
grpc_tcp *tcp = (grpc_tcp *)ep;
- gpr_log(GPR_DEBUG, "Closing uv_tcp_t handle %p", tcp->handle);
uv_close((uv_handle_t *)tcp->handle, uv_close_callback);
TCP_UNREF(tcp, "destroy");
}
@@ -322,6 +323,7 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, char *peer_string) {
handle->data = tcp;
gpr_ref_init(&tcp->refcount, 1);
tcp->peer_string = gpr_strdup(peer_string);
+ tcp->shutting_down = false;
/* Tell network status tracking code about the new endpoint */
grpc_network_status_register_endpoint(&tcp->base);
diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c
index 222f1554a3..cfcb89268b 100644
--- a/src/core/lib/iomgr/timer_uv.c
+++ b/src/core/lib/iomgr/timer_uv.c
@@ -47,14 +47,12 @@ static void timer_close_callback(uv_handle_t *handle) { gpr_free(handle); }
static void stop_uv_timer(uv_timer_t *handle) {
uv_timer_stop(handle);
uv_unref((uv_handle_t *)handle);
- gpr_log(GPR_DEBUG, "Closing uv_timer_t handle %p", handle);
uv_close((uv_handle_t *)handle, timer_close_callback);
}
void run_expired_timer(uv_timer_t *handle) {
grpc_timer *timer = (grpc_timer *)handle->data;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- gpr_log(GPR_DEBUG, "Timer callback: %p", timer);
GPR_ASSERT(!timer->triggered);
timer->triggered = 1;
grpc_exec_ctx_sched(&exec_ctx, &timer->closure, GRPC_ERROR_NONE, NULL);
@@ -75,7 +73,6 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
}
timer->triggered = 0;
timeout = (uint64_t)gpr_time_to_millis(gpr_time_sub(deadline, now));
- gpr_log(GPR_DEBUG, "Setting timer %p: %lu", timer, timeout);
uv_timer = gpr_malloc(sizeof(uv_timer_t));
uv_timer_init(uv_default_loop(), uv_timer);
uv_timer->data = timer;
@@ -85,7 +82,6 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
if (!timer->triggered) {
- gpr_log(GPR_DEBUG, "Running cancelled timer callback");
timer->triggered = 1;
grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED, NULL);
stop_uv_timer((uv_timer_t *)timer->uv_timer);
diff --git a/src/node/ext/node_grpc.cc b/src/node/ext/node_grpc.cc
index a246a8c678..b8013c4193 100644
--- a/src/node/ext/node_grpc.cc
+++ b/src/node/ext/node_grpc.cc
@@ -42,6 +42,10 @@
#include "grpc/support/log.h"
#include "grpc/support/time.h"
+#ifdef GRPC_UV
+#include "src/core/lib/iomgr/pollset_uv.h"
+#endif
+
#include "call.h"
#include "call_credentials.h"
#include "channel.h"
@@ -439,6 +443,9 @@ void init(Local<Object> exports) {
uv_signal_start(&signal_handle, signal_callback, SIGUSR2);
uv_unref((uv_handle_t *)&signal_handle);
+#ifdef GRPC_UV
+ grpc_pollset_work_run_loop = 0;
+#endif
grpc::node::Call::Init(exports);
grpc::node::CallCredentials::Init(exports);