diff options
author | Michael Lumish <mlumish@google.com> | 2017-07-21 09:57:36 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-07-21 09:57:36 -0700 |
commit | 8321cadeb031cfa7f142479b40fc13e52216790c (patch) | |
tree | da0512dee9f0d56084cb6881286b07df77cc14f6 /src | |
parent | 50791829380e47a04ee8dda9ef23c8288d8646c0 (diff) | |
parent | 9d7def71fe04ec25b7cace53adb5775dbcccc353 (diff) |
Merge pull request #11871 from murgatroid99/uv_portability_fix
Fix libuv core portability tests
Diffstat (limited to 'src')
-rw-r--r-- | src/core/lib/iomgr/iomgr_uv.c | 8 | ||||
-rw-r--r-- | src/core/lib/iomgr/iomgr_uv.h | 37 | ||||
-rw-r--r-- | src/core/lib/iomgr/pollset_uv.c | 7 | ||||
-rw-r--r-- | src/core/lib/iomgr/resolve_address_uv.c | 21 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_client_uv.c | 3 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_server_uv.c | 115 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_uv.c | 7 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_uv.c | 4 |
8 files changed, 159 insertions, 43 deletions
diff --git a/src/core/lib/iomgr/iomgr_uv.c b/src/core/lib/iomgr/iomgr_uv.c index 8b1245c640..df5d23af3b 100644 --- a/src/core/lib/iomgr/iomgr_uv.c +++ b/src/core/lib/iomgr/iomgr_uv.c @@ -21,12 +21,20 @@ #ifdef GRPC_UV #include "src/core/lib/debug/trace.h" +#include "src/core/lib/iomgr/executor.h" +#include "src/core/lib/iomgr/iomgr_uv.h" #include "src/core/lib/iomgr/pollset_uv.h" #include "src/core/lib/iomgr/tcp_uv.h" +gpr_thd_id g_init_thread; + void grpc_iomgr_platform_init(void) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_pollset_global_init(); grpc_register_tracer(&grpc_tcp_trace); + grpc_executor_set_threading(&exec_ctx, false); + g_init_thread = gpr_thd_currentid(); + grpc_exec_ctx_finish(&exec_ctx); } void grpc_iomgr_platform_flush(void) {} void grpc_iomgr_platform_shutdown(void) { grpc_pollset_global_shutdown(); } diff --git a/src/core/lib/iomgr/iomgr_uv.h b/src/core/lib/iomgr/iomgr_uv.h new file mode 100644 index 0000000000..3b4daaa73b --- /dev/null +++ b/src/core/lib/iomgr/iomgr_uv.h @@ -0,0 +1,37 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_IOMGR_UV_H +#define GRPC_CORE_LIB_IOMGR_IOMGR_UV_H + +#include "src/core/lib/iomgr/iomgr_internal.h" + +#include <grpc/support/thd.h> + +/* The thread ID of the thread on which grpc was initialized. Used to verify + * that all calls into libuv are made on that same thread */ +extern gpr_thd_id g_init_thread; + +#ifdef GRPC_UV_THREAD_CHECK +#define GRPC_UV_ASSERT_SAME_THREAD() \ + GPR_ASSERT(gpr_thd_currentid() == g_init_thread) +#else +#define GRPC_UV_ASSERT_SAME_THREAD() +#endif /* GRPC_UV_THREAD_CHECK */ + +#endif /* GRPC_CORE_LIB_IOMGR_IOMGR_UV_H */ diff --git a/src/core/lib/iomgr/pollset_uv.c b/src/core/lib/iomgr/pollset_uv.c index 946f0c8c60..a79fe89d3e 100644 --- a/src/core/lib/iomgr/pollset_uv.c +++ b/src/core/lib/iomgr/pollset_uv.c @@ -28,6 +28,7 @@ #include <grpc/support/log.h> #include <grpc/support/sync.h> +#include "src/core/lib/iomgr/iomgr_uv.h" #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/pollset_uv.h" @@ -70,6 +71,7 @@ void grpc_pollset_global_init(void) { } void grpc_pollset_global_shutdown(void) { + GRPC_UV_ASSERT_SAME_THREAD(); gpr_mu_destroy(&grpc_polling_mu); uv_close((uv_handle_t *)dummy_uv_handle, dummy_handle_close_cb); } @@ -79,6 +81,7 @@ static void timer_run_cb(uv_timer_t *timer) {} static void timer_close_cb(uv_handle_t *handle) { handle->data = (void *)1; } void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) { + GRPC_UV_ASSERT_SAME_THREAD(); *mu = &grpc_polling_mu; uv_timer_init(uv_default_loop(), &pollset->timer); pollset->shutting_down = 0; @@ -87,6 +90,7 @@ void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) { void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure) { GPR_ASSERT(!pollset->shutting_down); + GRPC_UV_ASSERT_SAME_THREAD(); pollset->shutting_down = 1; if (grpc_pollset_work_run_loop) { // Drain any pending UV callbacks without blocking @@ -99,6 +103,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { + GRPC_UV_ASSERT_SAME_THREAD(); uv_close((uv_handle_t *)&pollset->timer, timer_close_cb); // timer.data is a boolean indicating that the timer has finished closing pollset->timer.data = (void *)0; @@ -113,6 +118,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, gpr_timespec now, gpr_timespec deadline) { uint64_t timeout; + GRPC_UV_ASSERT_SAME_THREAD(); gpr_mu_unlock(&grpc_polling_mu); if (grpc_pollset_work_run_loop) { if (gpr_time_cmp(deadline, now) >= 0) { @@ -141,6 +147,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_error *grpc_pollset_kick(grpc_pollset *pollset, grpc_pollset_worker *specific_worker) { + GRPC_UV_ASSERT_SAME_THREAD(); uv_timer_start(dummy_uv_handle, dummy_timer_cb, 0, 0); return GRPC_ERROR_NONE; } diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c index a98b8e62db..2d438e8b48 100644 --- a/src/core/lib/iomgr/resolve_address_uv.c +++ b/src/core/lib/iomgr/resolve_address_uv.c @@ -30,6 +30,7 @@ #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/iomgr_uv.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -114,11 +115,14 @@ static void getaddrinfo_callback(uv_getaddrinfo_t *req, int status, grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_error *error; int retry_status; + char *port = r->port; gpr_free(req); retry_status = retry_named_port_failure(status, r, getaddrinfo_callback); if (retry_status == 0) { - // The request is being retried. Nothing should be done here + /* The request is being retried. It is using its own port string, so we free + * the original one */ + gpr_free(port); return; } /* Either no retry was attempted, or the retry failed. Either way, the @@ -171,6 +175,8 @@ static grpc_error *blocking_resolve_address_impl( grpc_error *err; int retry_status; + GRPC_UV_ASSERT_SAME_THREAD(); + req.addrinfo = NULL; err = try_split_host_port(name, default_port, &host, &port); @@ -218,16 +224,19 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, grpc_pollset_set *interested_parties, grpc_closure *on_done, grpc_resolved_addresses **addrs) { - uv_getaddrinfo_t *req; - request *r; - struct addrinfo *hints; - char *host; - char *port; + uv_getaddrinfo_t *req = NULL; + request *r = NULL; + struct addrinfo *hints = NULL; + char *host = NULL; + char *port = NULL; grpc_error *err; int s; + GRPC_UV_ASSERT_SAME_THREAD(); err = try_split_host_port(name, default_port, &host, &port); if (err != GRPC_ERROR_NONE) { GRPC_CLOSURE_SCHED(exec_ctx, on_done, err); + gpr_free(host); + gpr_free(port); return; } r = gpr_malloc(sizeof(request)); diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c index 2f1d237c07..786c456b73 100644 --- a/src/core/lib/iomgr/tcp_client_uv.c +++ b/src/core/lib/iomgr/tcp_client_uv.c @@ -26,6 +26,7 @@ #include <grpc/support/log.h> #include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/iomgr_uv.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/tcp_client.h" #include "src/core/lib/iomgr/tcp_uv.h" @@ -124,6 +125,8 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, (void)channel_args; (void)interested_parties; + GRPC_UV_ASSERT_SAME_THREAD(); + if (channel_args != NULL) { for (size_t i = 0; i < channel_args->num_args; i++) { if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c index 079c913579..3b9332321f 100644 --- a/src/core/lib/iomgr/tcp_server_uv.c +++ b/src/core/lib/iomgr/tcp_server_uv.c @@ -20,6 +20,7 @@ #ifdef GRPC_UV +#include <assert.h> #include <string.h> #include <grpc/support/alloc.h> @@ -27,6 +28,7 @@ #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/iomgr_uv.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/tcp_server.h" @@ -43,6 +45,8 @@ struct grpc_tcp_listener { struct grpc_tcp_listener *next; bool closed; + + bool has_pending_connection; }; struct grpc_tcp_server { @@ -104,6 +108,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, } grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) { + GRPC_UV_ASSERT_SAME_THREAD(); gpr_ref(&s->refs); return s; } @@ -168,6 +173,7 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { } void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { + GRPC_UV_ASSERT_SAME_THREAD(); if (gpr_unref(&s->refs)) { /* Complete shutdown_starting work before destroying. */ grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT; @@ -183,18 +189,49 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { } } -static void accepted_connection_close_cb(uv_handle_t *handle) { - gpr_free(handle); -} - -static void on_connect(uv_stream_t *server, int status) { - grpc_tcp_listener *sp = (grpc_tcp_listener *)server->data; +static void finish_accept(grpc_exec_ctx *exec_ctx, grpc_tcp_listener *sp) { + grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor)); uv_tcp_t *client; grpc_endpoint *ep = NULL; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_resolved_address peer_name; char *peer_name_string; int err; + uv_tcp_t *server = sp->handle; + + client = gpr_malloc(sizeof(uv_tcp_t)); + uv_tcp_init(uv_default_loop(), client); + // UV documentation says this is guaranteed to succeed + uv_accept((uv_stream_t *)server, (uv_stream_t *)client); + peer_name_string = NULL; + memset(&peer_name, 0, sizeof(grpc_resolved_address)); + peer_name.len = sizeof(struct sockaddr_storage); + err = uv_tcp_getpeername(client, (struct sockaddr *)&peer_name.addr, + (int *)&peer_name.len); + if (err == 0) { + peer_name_string = grpc_sockaddr_to_uri(&peer_name); + } else { + gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(err)); + } + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (peer_name_string) { + gpr_log(GPR_DEBUG, "SERVER_CONNECT: %p accepted connection: %s", + sp->server, peer_name_string); + } else { + gpr_log(GPR_DEBUG, "SERVER_CONNECT: %p accepted connection", sp->server); + } + } + ep = grpc_tcp_create(client, sp->server->resource_quota, peer_name_string); + acceptor->from_server = sp->server; + acceptor->port_index = sp->port_index; + acceptor->fd_index = 0; + sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep, NULL, + acceptor); + gpr_free(peer_name_string); +} + +static void on_connect(uv_stream_t *server, int status) { + grpc_tcp_listener *sp = (grpc_tcp_listener *)server->data; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; if (status < 0) { switch (status) { @@ -207,35 +244,19 @@ static void on_connect(uv_stream_t *server, int status) { } } - client = gpr_malloc(sizeof(uv_tcp_t)); - uv_tcp_init(uv_default_loop(), client); - // UV documentation says this is guaranteed to succeed - uv_accept((uv_stream_t *)server, (uv_stream_t *)client); - // If the server has not been started, we discard incoming connections - if (sp->server->on_accept_cb == NULL) { - uv_close((uv_handle_t *)client, accepted_connection_close_cb); + GPR_ASSERT(!sp->has_pending_connection); + + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "SERVER_CONNECT: %p incoming connection", sp->server); + } + + // Create acceptor. + if (sp->server->on_accept_cb) { + finish_accept(&exec_ctx, sp); } else { - peer_name_string = NULL; - memset(&peer_name, 0, sizeof(grpc_resolved_address)); - peer_name.len = sizeof(struct sockaddr_storage); - err = uv_tcp_getpeername(client, (struct sockaddr *)&peer_name.addr, - (int *)&peer_name.len); - if (err == 0) { - peer_name_string = grpc_sockaddr_to_uri(&peer_name); - } else { - gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(status)); - } - ep = grpc_tcp_create(client, sp->server->resource_quota, peer_name_string); - // Create acceptor. - grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor)); - acceptor->from_server = sp->server; - acceptor->port_index = sp->port_index; - acceptor->fd_index = 0; - sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL, - acceptor); - grpc_exec_ctx_finish(&exec_ctx); - gpr_free(peer_name_string); + sp->has_pending_connection = true; } + grpc_exec_ctx_finish(&exec_ctx); } static grpc_error *add_socket_to_server(grpc_tcp_server *s, uv_tcp_t *handle, @@ -282,7 +303,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, uv_tcp_t *handle, GPR_ASSERT(port >= 0); GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); - sp = gpr_malloc(sizeof(grpc_tcp_listener)); + sp = gpr_zalloc(sizeof(grpc_tcp_listener)); sp->next = NULL; if (s->head == NULL) { s->head = sp; @@ -318,6 +339,8 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, grpc_error *error = GRPC_ERROR_NONE; int family; + GRPC_UV_ASSERT_SAME_THREAD(); + if (s->tail != NULL) { port_index = s->tail->port_index + 1; } @@ -378,6 +401,18 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, gpr_free(allocated_addr); + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + char *port_string; + grpc_sockaddr_to_string(&port_string, addr, 0); + const char *str = grpc_error_string(error); + if (port_string) { + gpr_log(GPR_DEBUG, "SERVER %p add_port %s error=%s", s, port_string, str); + gpr_free(port_string); + } else { + gpr_log(GPR_DEBUG, "SERVER %p add_port error=%s", s, str); + } + } + if (error != GRPC_ERROR_NONE) { grpc_error *error_out = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Failed to add port to server", &error, 1); @@ -397,13 +432,19 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server, grpc_tcp_listener *sp; (void)pollsets; (void)pollset_count; + GRPC_UV_ASSERT_SAME_THREAD(); + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "SERVER_START %p", server); + } GPR_ASSERT(on_accept_cb); GPR_ASSERT(!server->on_accept_cb); server->on_accept_cb = on_accept_cb; server->on_accept_cb_arg = cb_arg; for (sp = server->head; sp; sp = sp->next) { - GPR_ASSERT(uv_listen((uv_stream_t *)sp->handle, SOMAXCONN, on_connect) == - 0); + if (sp->has_pending_connection) { + finish_accept(exec_ctx, sp); + sp->has_pending_connection = false; + } } } diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c index 7c6a9b85f5..a05c19b4ac 100644 --- a/src/core/lib/iomgr/tcp_uv.c +++ b/src/core/lib/iomgr/tcp_uv.c @@ -30,6 +30,7 @@ #include <grpc/support/string_util.h> #include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/iomgr_uv.h" #include "src/core/lib/iomgr/network_status_tracker.h" #include "src/core/lib/iomgr/resource_quota.h" #include "src/core/lib/iomgr/tcp_uv.h" @@ -183,6 +184,7 @@ static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_tcp *tcp = (grpc_tcp *)ep; int status; grpc_error *error = GRPC_ERROR_NONE; + GRPC_UV_ASSERT_SAME_THREAD(); GPR_ASSERT(tcp->read_cb == NULL); tcp->read_cb = cb; tcp->read_slices = read_slices; @@ -236,6 +238,7 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, unsigned int i; grpc_slice *slice; uv_write_t *write_req; + GRPC_UV_ASSERT_SAME_THREAD(); if (GRPC_TRACER_ON(grpc_tcp_trace)) { size_t j; @@ -307,6 +310,10 @@ static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_error *why) { grpc_tcp *tcp = (grpc_tcp *)ep; if (!tcp->shutting_down) { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + const char *str = grpc_error_string(why); + gpr_log(GPR_DEBUG, "TCP %p shutdown why=%s", tcp->handle, str); + } tcp->shutting_down = true; uv_shutdown_t *req = &tcp->shutdown_req; uv_shutdown(req, (uv_stream_t *)tcp->handle, shutdown_callback); diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c index 1ab82ef1d5..70f49bcbe8 100644 --- a/src/core/lib/iomgr/timer_uv.c +++ b/src/core/lib/iomgr/timer_uv.c @@ -24,6 +24,7 @@ #include <grpc/support/log.h> #include "src/core/lib/debug/trace.h" +#include "src/core/lib/iomgr/iomgr_uv.h" #include "src/core/lib/iomgr/timer.h" #include <uv.h> @@ -43,6 +44,7 @@ static void stop_uv_timer(uv_timer_t *handle) { void run_expired_timer(uv_timer_t *handle) { grpc_timer *timer = (grpc_timer *)handle->data; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_UV_ASSERT_SAME_THREAD(); GPR_ASSERT(timer->pending); timer->pending = 0; GRPC_CLOSURE_SCHED(&exec_ctx, timer->closure, GRPC_ERROR_NONE); @@ -55,6 +57,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, gpr_timespec now) { uint64_t timeout; uv_timer_t *uv_timer; + GRPC_UV_ASSERT_SAME_THREAD(); timer->closure = closure; if (gpr_time_cmp(deadline, now) <= 0) { timer->pending = 0; @@ -75,6 +78,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, } void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { + GRPC_UV_ASSERT_SAME_THREAD(); if (timer->pending) { timer->pending = 0; GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED); |