diff options
author | murgatroid99 <mlumish@google.com> | 2017-07-12 14:22:46 -0700 |
---|---|---|
committer | murgatroid99 <mlumish@google.com> | 2017-07-18 13:03:42 -0700 |
commit | 87116f449985cd49728326ecbdb7293cc522b8ff (patch) | |
tree | b3a075b8e366cfc3faef33884131802734a9c991 | |
parent | e908821d242c4435ab5973a824b48ad544763ce3 (diff) |
Fix failures in libuv portability tests
-rw-r--r-- | src/core/lib/iomgr/iomgr_uv.c | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/resolve_address_uv.c | 17 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_server_uv.c | 109 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_uv.c | 4 |
4 files changed, 91 insertions, 43 deletions
diff --git a/src/core/lib/iomgr/iomgr_uv.c b/src/core/lib/iomgr/iomgr_uv.c index 49d1a03c32..7a99a6efdd 100644 --- a/src/core/lib/iomgr/iomgr_uv.c +++ b/src/core/lib/iomgr/iomgr_uv.c @@ -21,12 +21,16 @@ #ifdef GRPC_UV #include "src/core/lib/debug/trace.h" +#include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/pollset_uv.h" #include "src/core/lib/iomgr/tcp_uv.h" void grpc_iomgr_platform_init(void) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_pollset_global_init(); grpc_register_tracer("tcp", &grpc_tcp_trace); + grpc_executor_set_threading(&exec_ctx, false); + grpc_exec_ctx_finish(&exec_ctx); } void grpc_iomgr_platform_flush(void) {} void grpc_iomgr_platform_shutdown(void) { grpc_pollset_global_shutdown(); } diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c index a98b8e62db..5180357cb1 100644 --- a/src/core/lib/iomgr/resolve_address_uv.c +++ b/src/core/lib/iomgr/resolve_address_uv.c @@ -114,11 +114,14 @@ static void getaddrinfo_callback(uv_getaddrinfo_t *req, int status, grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_error *error; int retry_status; + char *port = r->port; gpr_free(req); retry_status = retry_named_port_failure(status, r, getaddrinfo_callback); if (retry_status == 0) { - // The request is being retried. Nothing should be done here + /* The request is being retried. It is using its own port string, so we free + * the original one */ + gpr_free(port); return; } /* Either no retry was attempted, or the retry failed. Either way, the @@ -218,16 +221,18 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, grpc_pollset_set *interested_parties, grpc_closure *on_done, grpc_resolved_addresses **addrs) { - uv_getaddrinfo_t *req; - request *r; - struct addrinfo *hints; - char *host; - char *port; + uv_getaddrinfo_t *req = NULL; + request *r = NULL; + struct addrinfo *hints = NULL; + char *host = NULL; + char *port = NULL; grpc_error *err; int s; err = try_split_host_port(name, default_port, &host, &port); if (err != GRPC_ERROR_NONE) { GRPC_CLOSURE_SCHED(exec_ctx, on_done, err); + gpr_free(host); + gpr_free(port); return; } r = gpr_malloc(sizeof(request)); diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c index 2ab836cc34..a63a36bf58 100644 --- a/src/core/lib/iomgr/tcp_server_uv.c +++ b/src/core/lib/iomgr/tcp_server_uv.c @@ -20,6 +20,7 @@ #ifdef GRPC_UV +#include <assert.h> #include <string.h> #include <grpc/support/alloc.h> @@ -43,6 +44,8 @@ struct grpc_tcp_listener { struct grpc_tcp_listener *next; bool closed; + + bool has_pending_connection; }; struct grpc_tcp_server { @@ -142,10 +145,12 @@ static void handle_close_callback(uv_handle_t *handle) { } static void close_listener(grpc_tcp_listener *sp) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; if (!sp->closed) { sp->closed = true; uv_close((uv_handle_t *)sp->handle, handle_close_callback); } + grpc_exec_ctx_finish(&exec_ctx); } static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { @@ -183,18 +188,49 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { } } -static void accepted_connection_close_cb(uv_handle_t *handle) { - gpr_free(handle); -} - -static void on_connect(uv_stream_t *server, int status) { - grpc_tcp_listener *sp = (grpc_tcp_listener *)server->data; +static void finish_accept(grpc_exec_ctx *exec_ctx, grpc_tcp_listener *sp) { + grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor)); uv_tcp_t *client; grpc_endpoint *ep = NULL; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_resolved_address peer_name; char *peer_name_string; int err; + uv_tcp_t *server = sp->handle; + + client = gpr_malloc(sizeof(uv_tcp_t)); + uv_tcp_init(uv_default_loop(), client); + // UV documentation says this is guaranteed to succeed + uv_accept((uv_stream_t *)server, (uv_stream_t *)client); + peer_name_string = NULL; + memset(&peer_name, 0, sizeof(grpc_resolved_address)); + peer_name.len = sizeof(struct sockaddr_storage); + err = uv_tcp_getpeername(client, (struct sockaddr *)&peer_name.addr, + (int *)&peer_name.len); + if (err == 0) { + peer_name_string = grpc_sockaddr_to_uri(&peer_name); + } else { + gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(err)); + } + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (peer_name_string) { + gpr_log(GPR_DEBUG, "SERVER_CONNECT: %p accepted connection: %s", + sp->server, peer_name_string); + } else { + gpr_log(GPR_DEBUG, "SERVER_CONNECT: %p accepted connection", sp->server); + } + } + ep = grpc_tcp_create(client, sp->server->resource_quota, peer_name_string); + acceptor->from_server = sp->server; + acceptor->port_index = sp->port_index; + acceptor->fd_index = 0; + sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep, NULL, + acceptor); + gpr_free(peer_name_string); +} + +static void on_connect(uv_stream_t *server, int status) { + grpc_tcp_listener *sp = (grpc_tcp_listener *)server->data; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; if (status < 0) { switch (status) { @@ -207,35 +243,17 @@ static void on_connect(uv_stream_t *server, int status) { } } - client = gpr_malloc(sizeof(uv_tcp_t)); - uv_tcp_init(uv_default_loop(), client); - // UV documentation says this is guaranteed to succeed - uv_accept((uv_stream_t *)server, (uv_stream_t *)client); - // If the server has not been started, we discard incoming connections - if (sp->server->on_accept_cb == NULL) { - uv_close((uv_handle_t *)client, accepted_connection_close_cb); + GPR_ASSERT(!sp->has_pending_connection); + + gpr_log(GPR_DEBUG, "SERVER_CONNECT: %p incoming connection", sp->server); + + // Create acceptor. + if (sp->server->on_accept_cb) { + finish_accept(&exec_ctx, sp); } else { - peer_name_string = NULL; - memset(&peer_name, 0, sizeof(grpc_resolved_address)); - peer_name.len = sizeof(struct sockaddr_storage); - err = uv_tcp_getpeername(client, (struct sockaddr *)&peer_name.addr, - (int *)&peer_name.len); - if (err == 0) { - peer_name_string = grpc_sockaddr_to_uri(&peer_name); - } else { - gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(status)); - } - ep = grpc_tcp_create(client, sp->server->resource_quota, peer_name_string); - // Create acceptor. - grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor)); - acceptor->from_server = sp->server; - acceptor->port_index = sp->port_index; - acceptor->fd_index = 0; - sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL, - acceptor); - grpc_exec_ctx_finish(&exec_ctx); - gpr_free(peer_name_string); + sp->has_pending_connection = true; } + grpc_exec_ctx_finish(&exec_ctx); } static grpc_error *add_socket_to_server(grpc_tcp_server *s, uv_tcp_t *handle, @@ -282,7 +300,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, uv_tcp_t *handle, GPR_ASSERT(port >= 0); GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); - sp = gpr_malloc(sizeof(grpc_tcp_listener)); + sp = gpr_zalloc(sizeof(grpc_tcp_listener)); sp->next = NULL; if (s->head == NULL) { s->head = sp; @@ -366,6 +384,18 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, gpr_free(allocated_addr); + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + char *port_string; + grpc_sockaddr_to_string(&port_string, addr, 0); + const char *str = grpc_error_string(error); + if (port_string) { + gpr_log(GPR_DEBUG, "SERVER %p add_port %s error=%s", s, port_string, str); + gpr_free(port_string); + } else { + gpr_log(GPR_DEBUG, "SERVER %p add_port error=%s", s, str); + } + } + if (error != GRPC_ERROR_NONE) { grpc_error *error_out = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Failed to add port to server", &error, 1); @@ -385,13 +415,18 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server, grpc_tcp_listener *sp; (void)pollsets; (void)pollset_count; + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "SERVER_START %p", server); + } GPR_ASSERT(on_accept_cb); GPR_ASSERT(!server->on_accept_cb); server->on_accept_cb = on_accept_cb; server->on_accept_cb_arg = cb_arg; for (sp = server->head; sp; sp = sp->next) { - GPR_ASSERT(uv_listen((uv_stream_t *)sp->handle, SOMAXCONN, on_connect) == - 0); + if (sp->has_pending_connection) { + finish_accept(exec_ctx, sp); + sp->has_pending_connection = false; + } } } diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c index ff5fd3edc8..0302af2a06 100644 --- a/src/core/lib/iomgr/tcp_uv.c +++ b/src/core/lib/iomgr/tcp_uv.c @@ -307,6 +307,10 @@ static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_error *why) { grpc_tcp *tcp = (grpc_tcp *)ep; if (!tcp->shutting_down) { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + const char *str = grpc_error_string(why); + gpr_log(GPR_DEBUG, "TCP %p shutdown why=%s", tcp->handle, str); + } tcp->shutting_down = true; uv_shutdown_t *req = &tcp->shutdown_req; uv_shutdown(req, (uv_stream_t *)tcp->handle, shutdown_callback); |