diff options
author | murgatroid99 <mlumish@google.com> | 2016-10-07 09:55:35 -0700 |
---|---|---|
committer | murgatroid99 <mlumish@google.com> | 2016-10-07 09:55:35 -0700 |
commit | 2c287ca750c114c7230e57a1231d7e22863ab53d (patch) | |
tree | 2c74c603ba071ccb3c0ab64e2494d28f641d2cac /src/core/lib | |
parent | eebb129fd39c050a9d3b325fcd89df8aadb09218 (diff) |
UV tests pass on linux
Diffstat (limited to 'src/core/lib')
-rw-r--r-- | src/core/lib/iomgr/endpoint_pair_uv.c | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/pollset_uv.c | 77 | ||||
-rw-r--r-- | src/core/lib/iomgr/pollset_uv.h | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/resolve_address_uv.c | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_client_uv.c | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_server_uv.c | 47 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_uv.c | 12 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_uv.c | 4 |
8 files changed, 114 insertions, 38 deletions
diff --git a/src/core/lib/iomgr/endpoint_pair_uv.c b/src/core/lib/iomgr/endpoint_pair_uv.c index eeca8070b5..4f769901ae 100644 --- a/src/core/lib/iomgr/endpoint_pair_uv.c +++ b/src/core/lib/iomgr/endpoint_pair_uv.c @@ -37,13 +37,15 @@ #include <stdlib.h> +#include <grpc/support/log.h> + #include "src/core/lib/iomgr/endpoint_pair.h" grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, size_t read_slice_size) { grpc_endpoint_pair endpoint_pair; // TODO(mlumish): implement this properly under libuv - abort(); + GPR_ASSERT(false && "grpc_iomgr_create_endpoint_pair is not suppoted with libuv"); return endpoint_pair; } diff --git a/src/core/lib/iomgr/pollset_uv.c b/src/core/lib/iomgr/pollset_uv.c index b304eb64de..8f4e20f296 100644 --- a/src/core/lib/iomgr/pollset_uv.c +++ b/src/core/lib/iomgr/pollset_uv.c @@ -35,40 +35,105 @@ #ifdef GRPC_UV +#include <uv.h> + +#include <string.h> + +#include <grpc/support/log.h> #include <grpc/support/sync.h> #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/pollset_uv.h" +struct grpc_pollset { + uv_timer_t timer; + int shutting_down; +}; + +/* Indicates that grpc_pollset_work should run an iteration of the UV loop + before running callbacks. This defaults to 1, and should be disabled if + grpc_pollset_work will be called within the callstack of uv_run */ +int grpc_pollset_work_run_loop; + gpr_mu grpc_polling_mu; -size_t grpc_pollset_size() { return 1; } +size_t grpc_pollset_size() { return sizeof(grpc_pollset); } -void grpc_pollset_global_init(void) { gpr_mu_init(&grpc_polling_mu); } +void grpc_pollset_global_init(void) { + gpr_mu_init(&grpc_polling_mu); + grpc_pollset_work_run_loop = 1; +} void grpc_pollset_global_shutdown(void) { gpr_mu_destroy(&grpc_polling_mu); } void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) { *mu = &grpc_polling_mu; + memset(pollset, 0, sizeof(grpc_pollset)); + uv_timer_init(uv_default_loop(), &pollset->timer); + pollset->shutting_down = 0; +} + +static void timer_close_cb(uv_handle_t *handle) { + handle->data = (void *)1; } void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure) { + GPR_ASSERT(!pollset->shutting_down); + pollset->shutting_down = 1; + if (grpc_pollset_work_run_loop) { + // Drain any pending UV callbacks without blocking + uv_run(uv_default_loop(), UV_RUN_NOWAIT); + } grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL); } -void grpc_pollset_destroy(grpc_pollset *pollset) {} +void grpc_pollset_destroy(grpc_pollset *pollset) { + uv_close((uv_handle_t*)&pollset->timer, timer_close_cb); + // timer.data is a boolean indicating that the timer has finished closing + pollset->timer.data = (void *)0; + if (grpc_pollset_work_run_loop) { + while (!pollset->timer.data) { + uv_run(uv_default_loop(), UV_RUN_NOWAIT); + } + } +} -void grpc_pollset_reset(grpc_pollset *pollset) {} +void grpc_pollset_reset(grpc_pollset *pollset) { + GPR_ASSERT(pollset->shutting_down); + pollset->shutting_down = 0; +} + +static void timer_run_cb(uv_timer_t *timer) { +} grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, gpr_timespec now, gpr_timespec deadline) { + uint64_t timeout; + gpr_mu_unlock(&grpc_polling_mu); + if (grpc_pollset_work_run_loop) { + if (gpr_time_cmp(deadline, now) >= 0) { + timeout = (uint64_t)gpr_time_to_millis(gpr_time_sub(deadline, now)); + } else { + timeout = 0; + } + /* We special-case timeout=0 so that we don't bother with the timer when + the loop won't block anyway */ + if (timeout > 0) { + uv_timer_start(&pollset->timer, timer_run_cb, timeout, 0); + /* Run until there is some I/O activity or the timer triggers. It doesn't + matter which happens */ + uv_run(uv_default_loop(), UV_RUN_ONCE); + uv_timer_stop(&pollset->timer); + } else { + uv_run(uv_default_loop(), UV_RUN_NOWAIT); + } + } if (!grpc_closure_list_empty(exec_ctx->closure_list)) { - gpr_mu_unlock(&grpc_polling_mu); grpc_exec_ctx_flush(exec_ctx); - gpr_mu_lock(&grpc_polling_mu); } + gpr_mu_lock(&grpc_polling_mu); return GRPC_ERROR_NONE; } diff --git a/src/core/lib/iomgr/pollset_uv.h b/src/core/lib/iomgr/pollset_uv.h index 5cbc83e991..9f1d1442b2 100644 --- a/src/core/lib/iomgr/pollset_uv.h +++ b/src/core/lib/iomgr/pollset_uv.h @@ -31,5 +31,7 @@ * */ +extern int grpc_pollset_work_run_loop; + void grpc_pollset_global_init(void); void grpc_pollset_global_shutdown(void); diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c index 76170722f2..b8295acfa1 100644 --- a/src/core/lib/iomgr/resolve_address_uv.c +++ b/src/core/lib/iomgr/resolve_address_uv.c @@ -143,6 +143,8 @@ static grpc_error *blocking_resolve_address_impl( int s; grpc_error *err; + req.addrinfo = NULL; + err = try_split_host_port(name, default_port, &host, &port); if (err != GRPC_ERROR_NONE) { goto done; diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c index 50e3615aad..d48147ce6e 100644 --- a/src/core/lib/iomgr/tcp_client_uv.c +++ b/src/core/lib/iomgr/tcp_client_uv.c @@ -61,7 +61,6 @@ static void uv_tcp_connect_cleanup(grpc_uv_tcp_connect *connect) { } static void tcp_close_callback(uv_handle_t *handle) { - gpr_log(GPR_DEBUG, "Freeing uv_tcp_t handle %p", handle); gpr_free(handle); } @@ -73,7 +72,6 @@ static void uv_tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, /* error == NONE implies that the timer ran out, and wasn't cancelled. If it was cancelled, then the handler that cancelled it also should close the handle, if applicable */ - gpr_log(GPR_DEBUG, "Closing uv_tcp_t handle %p", connect->tcp_handle); uv_close((uv_handle_t *)connect->tcp_handle, tcp_close_callback); } done = (--connect->refs == 0); @@ -104,7 +102,6 @@ static void uv_tc_on_connect(uv_connect_t *req, int status) { } else { error = grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status)); - gpr_log(GPR_DEBUG, "Closing uv_tcp_t handle %p", connect->tcp_handle); uv_close((uv_handle_t *)connect->tcp_handle, tcp_close_callback); } } @@ -128,7 +125,6 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, connect->closure = closure; connect->endpoint = ep; connect->tcp_handle = gpr_malloc(sizeof(uv_tcp_t)); - gpr_log(GPR_DEBUG, "Allocated uv_tcp_t handle %p", connect->tcp_handle); connect->addr_name = grpc_sockaddr_to_uri(resolved_addr); uv_tcp_init(uv_default_loop(), connect->tcp_handle); connect->connect_req.data = connect; diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c index a9eaf206d0..e1eee2d460 100644 --- a/src/core/lib/iomgr/tcp_server_uv.c +++ b/src/core/lib/iomgr/tcp_server_uv.c @@ -116,7 +116,6 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { grpc_tcp_listener *sp = s->head; s->head = sp->next; sp->next = NULL; - gpr_log(GPR_DEBUG, "Freeing uv_tcp_t handle %p", sp->handle); gpr_free(sp->handle); gpr_free(sp); } @@ -141,7 +140,6 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { immediately_done = 1; } for (sp = s->head; sp; sp = sp->next) { - gpr_log(GPR_DEBUG, "Closing uv_tcp_t handle %p", sp->handle); uv_close((uv_handle_t *)sp->handle, handle_close_callback); } @@ -166,6 +164,10 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { } } +static void accepted_connection_close_cb(uv_handle_t *handle) { + gpr_free(handle); +} + static void on_connect(uv_stream_t *server, int status) { grpc_tcp_listener *sp = (grpc_tcp_listener *)server->data; grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, 0}; @@ -176,7 +178,6 @@ static void on_connect(uv_stream_t *server, int status) { char *peer_name_string; int err; - gpr_log(GPR_DEBUG, "Server %p received a connection", sp->server); if (status < 0) { gpr_log(GPR_INFO, "Skipping on_accept due to error: %s", @@ -184,25 +185,28 @@ static void on_connect(uv_stream_t *server, int status) { return; } client = gpr_malloc(sizeof(uv_tcp_t)); - gpr_log(GPR_DEBUG, "Allocated uv_tcp_t handle %p", client); uv_tcp_init(uv_default_loop(), client); // UV documentation says this is guaranteed to succeed uv_accept((uv_stream_t *)server, (uv_stream_t *)client); - peer_name_string = NULL; - memset(&peer_name, 0, sizeof(grpc_resolved_address)); - peer_name.len = sizeof(struct sockaddr_storage); - err = uv_tcp_getpeername(client, (struct sockaddr *)&peer_name.addr, - (int *)&peer_name.len); - if (err == 0) { - peer_name_string = grpc_sockaddr_to_uri(&peer_name); + // If the server has not been started, we discard incoming connections + if (sp->server->on_accept_cb == NULL) { + uv_close((uv_handle_t *)client, accepted_connection_close_cb); } else { - gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(status)); + peer_name_string = NULL; + memset(&peer_name, 0, sizeof(grpc_resolved_address)); + peer_name.len = sizeof(struct sockaddr_storage); + err = uv_tcp_getpeername(client, (struct sockaddr *)&peer_name.addr, + (int *)&peer_name.len); + if (err == 0) { + peer_name_string = grpc_sockaddr_to_uri(&peer_name); + } else { + gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(status)); + } + ep = grpc_tcp_create(client, peer_name_string); + sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL, + &acceptor); + grpc_exec_ctx_finish(&exec_ctx); } - ep = grpc_tcp_create(client, peer_name_string); - gpr_log(GPR_DEBUG, "Calling on_accept_cb for server %p", sp->server); - sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL, - &acceptor); - grpc_exec_ctx_finish(&exec_ctx); } static grpc_error *add_socket_to_server(grpc_tcp_server *s, uv_tcp_t *handle, @@ -224,6 +228,14 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, uv_tcp_t *handle, return error; } + status = uv_listen((uv_stream_t *)handle, SOMAXCONN, on_connect); + if (status != 0) { + error = GRPC_ERROR_CREATE("Failed to listen to port"); + error = + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status)); + return error; + } + sockname_temp.len = (int)sizeof(struct sockaddr_storage); status = uv_tcp_getsockname(handle, (struct sockaddr *)&sockname_temp.addr, (int *)&sockname_temp.len); @@ -308,7 +320,6 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, } handle = gpr_malloc(sizeof(uv_tcp_t)); - gpr_log(GPR_DEBUG, "Allocating uv_tcp_t handle %p", handle); status = uv_tcp_init(uv_default_loop(), handle); if (status == 0) { error = add_socket_to_server(s, handle, addr, port_index, &sp); diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c index 88c4195c2b..a78a40d261 100644 --- a/src/core/lib/iomgr/tcp_uv.c +++ b/src/core/lib/iomgr/tcp_uv.c @@ -64,13 +64,12 @@ typedef struct { gpr_slice_buffer *write_slices; uv_buf_t *write_buffers; - int shutting_down; + bool shutting_down; char *peer_string; grpc_pollset *pollset; } grpc_tcp; static void uv_close_callback(uv_handle_t *handle) { - gpr_log(GPR_DEBUG, "Freeing uv_tcp_t handle %p", handle); gpr_free(handle); } @@ -281,14 +280,16 @@ static void shutdown_callback(uv_shutdown_t *req, int status) { gpr_free(req); } static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; - uv_shutdown_t *req = gpr_malloc(sizeof(uv_shutdown_t)); - uv_shutdown(req, (uv_stream_t *)tcp->handle, shutdown_callback); + if (!tcp->shutting_down) { + tcp->shutting_down = true; + uv_shutdown_t *req = gpr_malloc(sizeof(uv_shutdown_t)); + uv_shutdown(req, (uv_stream_t *)tcp->handle, shutdown_callback); + } } static void uv_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_network_status_unregister_endpoint(ep); grpc_tcp *tcp = (grpc_tcp *)ep; - gpr_log(GPR_DEBUG, "Closing uv_tcp_t handle %p", tcp->handle); uv_close((uv_handle_t *)tcp->handle, uv_close_callback); TCP_UNREF(tcp, "destroy"); } @@ -322,6 +323,7 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, char *peer_string) { handle->data = tcp; gpr_ref_init(&tcp->refcount, 1); tcp->peer_string = gpr_strdup(peer_string); + tcp->shutting_down = false; /* Tell network status tracking code about the new endpoint */ grpc_network_status_register_endpoint(&tcp->base); diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c index 222f1554a3..cfcb89268b 100644 --- a/src/core/lib/iomgr/timer_uv.c +++ b/src/core/lib/iomgr/timer_uv.c @@ -47,14 +47,12 @@ static void timer_close_callback(uv_handle_t *handle) { gpr_free(handle); } static void stop_uv_timer(uv_timer_t *handle) { uv_timer_stop(handle); uv_unref((uv_handle_t *)handle); - gpr_log(GPR_DEBUG, "Closing uv_timer_t handle %p", handle); uv_close((uv_handle_t *)handle, timer_close_callback); } void run_expired_timer(uv_timer_t *handle) { grpc_timer *timer = (grpc_timer *)handle->data; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - gpr_log(GPR_DEBUG, "Timer callback: %p", timer); GPR_ASSERT(!timer->triggered); timer->triggered = 1; grpc_exec_ctx_sched(&exec_ctx, &timer->closure, GRPC_ERROR_NONE, NULL); @@ -75,7 +73,6 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, } timer->triggered = 0; timeout = (uint64_t)gpr_time_to_millis(gpr_time_sub(deadline, now)); - gpr_log(GPR_DEBUG, "Setting timer %p: %lu", timer, timeout); uv_timer = gpr_malloc(sizeof(uv_timer_t)); uv_timer_init(uv_default_loop(), uv_timer); uv_timer->data = timer; @@ -85,7 +82,6 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { if (!timer->triggered) { - gpr_log(GPR_DEBUG, "Running cancelled timer callback"); timer->triggered = 1; grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED, NULL); stop_uv_timer((uv_timer_t *)timer->uv_timer); |