From 87116f449985cd49728326ecbdb7293cc522b8ff Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Wed, 12 Jul 2017 14:22:46 -0700 Subject: Fix failures in libuv portability tests --- src/core/lib/iomgr/tcp_server_uv.c | 109 ++++++++++++++++++++++++------------- 1 file changed, 72 insertions(+), 37 deletions(-) (limited to 'src/core/lib/iomgr/tcp_server_uv.c') diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c index 2ab836cc34..a63a36bf58 100644 --- a/src/core/lib/iomgr/tcp_server_uv.c +++ b/src/core/lib/iomgr/tcp_server_uv.c @@ -20,6 +20,7 @@ #ifdef GRPC_UV +#include #include #include @@ -43,6 +44,8 @@ struct grpc_tcp_listener { struct grpc_tcp_listener *next; bool closed; + + bool has_pending_connection; }; struct grpc_tcp_server { @@ -142,10 +145,12 @@ static void handle_close_callback(uv_handle_t *handle) { } static void close_listener(grpc_tcp_listener *sp) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; if (!sp->closed) { sp->closed = true; uv_close((uv_handle_t *)sp->handle, handle_close_callback); } + grpc_exec_ctx_finish(&exec_ctx); } static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { @@ -183,18 +188,49 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { } } -static void accepted_connection_close_cb(uv_handle_t *handle) { - gpr_free(handle); -} - -static void on_connect(uv_stream_t *server, int status) { - grpc_tcp_listener *sp = (grpc_tcp_listener *)server->data; +static void finish_accept(grpc_exec_ctx *exec_ctx, grpc_tcp_listener *sp) { + grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor)); uv_tcp_t *client; grpc_endpoint *ep = NULL; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_resolved_address peer_name; char *peer_name_string; int err; + uv_tcp_t *server = sp->handle; + + client = gpr_malloc(sizeof(uv_tcp_t)); + uv_tcp_init(uv_default_loop(), client); + // UV documentation says this is guaranteed to succeed + uv_accept((uv_stream_t *)server, (uv_stream_t *)client); + peer_name_string = NULL; + memset(&peer_name, 0, sizeof(grpc_resolved_address)); + peer_name.len = sizeof(struct sockaddr_storage); + err = uv_tcp_getpeername(client, (struct sockaddr *)&peer_name.addr, + (int *)&peer_name.len); + if (err == 0) { + peer_name_string = grpc_sockaddr_to_uri(&peer_name); + } else { + gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(err)); + } + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (peer_name_string) { + gpr_log(GPR_DEBUG, "SERVER_CONNECT: %p accepted connection: %s", + sp->server, peer_name_string); + } else { + gpr_log(GPR_DEBUG, "SERVER_CONNECT: %p accepted connection", sp->server); + } + } + ep = grpc_tcp_create(client, sp->server->resource_quota, peer_name_string); + acceptor->from_server = sp->server; + acceptor->port_index = sp->port_index; + acceptor->fd_index = 0; + sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep, NULL, + acceptor); + gpr_free(peer_name_string); +} + +static void on_connect(uv_stream_t *server, int status) { + grpc_tcp_listener *sp = (grpc_tcp_listener *)server->data; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; if (status < 0) { switch (status) { @@ -207,35 +243,17 @@ static void on_connect(uv_stream_t *server, int status) { } } - client = gpr_malloc(sizeof(uv_tcp_t)); - uv_tcp_init(uv_default_loop(), client); - // UV documentation says this is guaranteed to succeed - uv_accept((uv_stream_t *)server, (uv_stream_t *)client); - // If the server has not been started, we discard incoming connections - if (sp->server->on_accept_cb == NULL) { - uv_close((uv_handle_t *)client, accepted_connection_close_cb); + GPR_ASSERT(!sp->has_pending_connection); + + gpr_log(GPR_DEBUG, "SERVER_CONNECT: %p incoming connection", sp->server); + + // Create acceptor. + if (sp->server->on_accept_cb) { + finish_accept(&exec_ctx, sp); } else { - peer_name_string = NULL; - memset(&peer_name, 0, sizeof(grpc_resolved_address)); - peer_name.len = sizeof(struct sockaddr_storage); - err = uv_tcp_getpeername(client, (struct sockaddr *)&peer_name.addr, - (int *)&peer_name.len); - if (err == 0) { - peer_name_string = grpc_sockaddr_to_uri(&peer_name); - } else { - gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(status)); - } - ep = grpc_tcp_create(client, sp->server->resource_quota, peer_name_string); - // Create acceptor. - grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor)); - acceptor->from_server = sp->server; - acceptor->port_index = sp->port_index; - acceptor->fd_index = 0; - sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL, - acceptor); - grpc_exec_ctx_finish(&exec_ctx); - gpr_free(peer_name_string); + sp->has_pending_connection = true; } + grpc_exec_ctx_finish(&exec_ctx); } static grpc_error *add_socket_to_server(grpc_tcp_server *s, uv_tcp_t *handle, @@ -282,7 +300,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, uv_tcp_t *handle, GPR_ASSERT(port >= 0); GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); - sp = gpr_malloc(sizeof(grpc_tcp_listener)); + sp = gpr_zalloc(sizeof(grpc_tcp_listener)); sp->next = NULL; if (s->head == NULL) { s->head = sp; @@ -366,6 +384,18 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, gpr_free(allocated_addr); + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + char *port_string; + grpc_sockaddr_to_string(&port_string, addr, 0); + const char *str = grpc_error_string(error); + if (port_string) { + gpr_log(GPR_DEBUG, "SERVER %p add_port %s error=%s", s, port_string, str); + gpr_free(port_string); + } else { + gpr_log(GPR_DEBUG, "SERVER %p add_port error=%s", s, str); + } + } + if (error != GRPC_ERROR_NONE) { grpc_error *error_out = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Failed to add port to server", &error, 1); @@ -385,13 +415,18 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server, grpc_tcp_listener *sp; (void)pollsets; (void)pollset_count; + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "SERVER_START %p", server); + } GPR_ASSERT(on_accept_cb); GPR_ASSERT(!server->on_accept_cb); server->on_accept_cb = on_accept_cb; server->on_accept_cb_arg = cb_arg; for (sp = server->head; sp; sp = sp->next) { - GPR_ASSERT(uv_listen((uv_stream_t *)sp->handle, SOMAXCONN, on_connect) == - 0); + if (sp->has_pending_connection) { + finish_accept(exec_ctx, sp); + sp->has_pending_connection = false; + } } } -- cgit v1.2.3 From b8e07ad780ceb0bae7c431d0de1c6343bf275ceb Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Tue, 18 Jul 2017 13:20:55 -0700 Subject: Add asserts that uv calls all run on the same thread --- BUILD | 1 + build.yaml | 1 + gRPC-Core.podspec | 2 ++ grpc.gemspec | 1 + package.xml | 1 + src/core/lib/iomgr/iomgr_uv.c | 4 +++ src/core/lib/iomgr/iomgr_uv.h | 37 ++++++++++++++++++++++ src/core/lib/iomgr/pollset_uv.c | 7 ++++ src/core/lib/iomgr/resolve_address_uv.c | 4 +++ src/core/lib/iomgr/tcp_client_uv.c | 3 ++ src/core/lib/iomgr/tcp_server_uv.c | 6 ++++ src/core/lib/iomgr/tcp_uv.c | 3 ++ src/core/lib/iomgr/timer_uv.c | 4 +++ tools/doxygen/Doxyfile.core.internal | 1 + tools/run_tests/generated/sources_and_headers.json | 2 ++ tools/run_tests/run_tests.py | 2 +- vsprojects/vcxproj/grpc/grpc.vcxproj | 1 + vsprojects/vcxproj/grpc/grpc.vcxproj.filters | 3 ++ .../vcxproj/grpc_test_util/grpc_test_util.vcxproj | 1 + .../grpc_test_util/grpc_test_util.vcxproj.filters | 3 ++ .../vcxproj/grpc_unsecure/grpc_unsecure.vcxproj | 1 + .../grpc_unsecure/grpc_unsecure.vcxproj.filters | 3 ++ 22 files changed, 90 insertions(+), 1 deletion(-) create mode 100644 src/core/lib/iomgr/iomgr_uv.h (limited to 'src/core/lib/iomgr/tcp_server_uv.c') diff --git a/BUILD b/BUILD index 495bb668eb..189a8276e5 100644 --- a/BUILD +++ b/BUILD @@ -597,6 +597,7 @@ grpc_cc_library( "src/core/lib/iomgr/iomgr.h", "src/core/lib/iomgr/iomgr_internal.h", "src/core/lib/iomgr/iomgr_posix.h", + "src/core/lib/iomgr/iomgr_uv.h", "src/core/lib/iomgr/is_epollexclusive_available.h", "src/core/lib/iomgr/load_file.h", "src/core/lib/iomgr/lockfree_event.h", diff --git a/build.yaml b/build.yaml index e55c4ca301..f43a13e9bc 100644 --- a/build.yaml +++ b/build.yaml @@ -213,6 +213,7 @@ filegroups: - src/core/lib/iomgr/iomgr.h - src/core/lib/iomgr/iomgr_internal.h - src/core/lib/iomgr/iomgr_posix.h + - src/core/lib/iomgr/iomgr_uv.h - src/core/lib/iomgr/is_epollexclusive_available.h - src/core/lib/iomgr/load_file.h - src/core/lib/iomgr/lockfree_event.h diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 3f841d51d8..7a2c5a40f4 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -276,6 +276,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/iomgr.h', 'src/core/lib/iomgr/iomgr_internal.h', 'src/core/lib/iomgr/iomgr_posix.h', + 'src/core/lib/iomgr/iomgr_uv.h', 'src/core/lib/iomgr/is_epollexclusive_available.h', 'src/core/lib/iomgr/load_file.h', 'src/core/lib/iomgr/lockfree_event.h', @@ -753,6 +754,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/iomgr.h', 'src/core/lib/iomgr/iomgr_internal.h', 'src/core/lib/iomgr/iomgr_posix.h', + 'src/core/lib/iomgr/iomgr_uv.h', 'src/core/lib/iomgr/is_epollexclusive_available.h', 'src/core/lib/iomgr/load_file.h', 'src/core/lib/iomgr/lockfree_event.h', diff --git a/grpc.gemspec b/grpc.gemspec index 663915b75e..692977c740 100755 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -208,6 +208,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/iomgr/iomgr.h ) s.files += %w( src/core/lib/iomgr/iomgr_internal.h ) s.files += %w( src/core/lib/iomgr/iomgr_posix.h ) + s.files += %w( src/core/lib/iomgr/iomgr_uv.h ) s.files += %w( src/core/lib/iomgr/is_epollexclusive_available.h ) s.files += %w( src/core/lib/iomgr/load_file.h ) s.files += %w( src/core/lib/iomgr/lockfree_event.h ) diff --git a/package.xml b/package.xml index cfa45f06d7..0714e61415 100644 --- a/package.xml +++ b/package.xml @@ -222,6 +222,7 @@ + diff --git a/src/core/lib/iomgr/iomgr_uv.c b/src/core/lib/iomgr/iomgr_uv.c index 7a99a6efdd..c484a39c54 100644 --- a/src/core/lib/iomgr/iomgr_uv.c +++ b/src/core/lib/iomgr/iomgr_uv.c @@ -22,14 +22,18 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/executor.h" +#include "src/core/lib/iomgr/iomgr_uv.h" #include "src/core/lib/iomgr/pollset_uv.h" #include "src/core/lib/iomgr/tcp_uv.h" +gpr_thd_id grpc_init_thread; + void grpc_iomgr_platform_init(void) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_pollset_global_init(); grpc_register_tracer("tcp", &grpc_tcp_trace); grpc_executor_set_threading(&exec_ctx, false); + grpc_init_thread = gpr_thd_currentid(); grpc_exec_ctx_finish(&exec_ctx); } void grpc_iomgr_platform_flush(void) {} diff --git a/src/core/lib/iomgr/iomgr_uv.h b/src/core/lib/iomgr/iomgr_uv.h new file mode 100644 index 0000000000..35c073a033 --- /dev/null +++ b/src/core/lib/iomgr/iomgr_uv.h @@ -0,0 +1,37 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_IOMGR_UV_H +#define GRPC_CORE_LIB_IOMGR_IOMGR_UV_H + +#include "src/core/lib/iomgr/iomgr_internal.h" + +#include + +/* The thread ID of the thread on which grpc was initialized. Used to verify + * that all calls into libuv are made on that same thread */ +extern gpr_thd_id grpc_init_thread; + +#ifdef GRPC_UV_THREAD_CHECK +#define GRPC_ASSERT_SAME_THREAD() \ + GPR_ASSERT(gpr_thd_currentid() == grpc_init_thread) +#else +#define GRPC_ASSERT_SAME_THREAD() +#endif /* GRPC_UV_THREAD_CHECK */ + +#endif /* GRPC_CORE_LIB_IOMGR_IOMGR_UV_H */ diff --git a/src/core/lib/iomgr/pollset_uv.c b/src/core/lib/iomgr/pollset_uv.c index 1a54065a91..47c8c79cc2 100644 --- a/src/core/lib/iomgr/pollset_uv.c +++ b/src/core/lib/iomgr/pollset_uv.c @@ -28,6 +28,7 @@ #include #include +#include "src/core/lib/iomgr/iomgr_uv.h" #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/pollset_uv.h" @@ -69,6 +70,7 @@ void grpc_pollset_global_init(void) { } void grpc_pollset_global_shutdown(void) { + GRPC_ASSERT_SAME_THREAD(); gpr_mu_destroy(&grpc_polling_mu); uv_close((uv_handle_t *)dummy_uv_handle, dummy_handle_close_cb); } @@ -78,6 +80,7 @@ static void timer_run_cb(uv_timer_t *timer) {} static void timer_close_cb(uv_handle_t *handle) { handle->data = (void *)1; } void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) { + GRPC_ASSERT_SAME_THREAD(); *mu = &grpc_polling_mu; uv_timer_init(uv_default_loop(), &pollset->timer); pollset->shutting_down = 0; @@ -86,6 +89,7 @@ void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) { void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure) { GPR_ASSERT(!pollset->shutting_down); + GRPC_ASSERT_SAME_THREAD(); pollset->shutting_down = 1; if (grpc_pollset_work_run_loop) { // Drain any pending UV callbacks without blocking @@ -98,6 +102,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { + GRPC_ASSERT_SAME_THREAD(); uv_close((uv_handle_t *)&pollset->timer, timer_close_cb); // timer.data is a boolean indicating that the timer has finished closing pollset->timer.data = (void *)0; @@ -112,6 +117,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, gpr_timespec now, gpr_timespec deadline) { uint64_t timeout; + GRPC_ASSERT_SAME_THREAD(); gpr_mu_unlock(&grpc_polling_mu); if (grpc_pollset_work_run_loop) { if (gpr_time_cmp(deadline, now) >= 0) { @@ -140,6 +146,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_error *grpc_pollset_kick(grpc_pollset *pollset, grpc_pollset_worker *specific_worker) { + GRPC_ASSERT_SAME_THREAD(); uv_timer_start(dummy_uv_handle, dummy_timer_cb, 0, 0); return GRPC_ERROR_NONE; } diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c index 5180357cb1..f910ec26b2 100644 --- a/src/core/lib/iomgr/resolve_address_uv.c +++ b/src/core/lib/iomgr/resolve_address_uv.c @@ -30,6 +30,7 @@ #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/iomgr_uv.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -174,6 +175,8 @@ static grpc_error *blocking_resolve_address_impl( grpc_error *err; int retry_status; + GRPC_ASSERT_SAME_THREAD(); + req.addrinfo = NULL; err = try_split_host_port(name, default_port, &host, &port); @@ -228,6 +231,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, char *port = NULL; grpc_error *err; int s; + GRPC_ASSERT_SAME_THREAD(); err = try_split_host_port(name, default_port, &host, &port); if (err != GRPC_ERROR_NONE) { GRPC_CLOSURE_SCHED(exec_ctx, on_done, err); diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c index 2f1d237c07..098ff7648d 100644 --- a/src/core/lib/iomgr/tcp_client_uv.c +++ b/src/core/lib/iomgr/tcp_client_uv.c @@ -26,6 +26,7 @@ #include #include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/iomgr_uv.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/tcp_client.h" #include "src/core/lib/iomgr/tcp_uv.h" @@ -124,6 +125,8 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, (void)channel_args; (void)interested_parties; + GRPC_ASSERT_SAME_THREAD(); + if (channel_args != NULL) { for (size_t i = 0; i < channel_args->num_args; i++) { if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c index a63a36bf58..7b45e08c9b 100644 --- a/src/core/lib/iomgr/tcp_server_uv.c +++ b/src/core/lib/iomgr/tcp_server_uv.c @@ -28,6 +28,7 @@ #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/iomgr_uv.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/tcp_server.h" @@ -107,6 +108,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, } grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) { + GRPC_ASSERT_SAME_THREAD(); gpr_ref(&s->refs); return s; } @@ -173,6 +175,7 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { } void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { + GRPC_ASSERT_SAME_THREAD(); if (gpr_unref(&s->refs)) { /* Complete shutdown_starting work before destroying. */ grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT; @@ -335,6 +338,8 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, int status; grpc_error *error = GRPC_ERROR_NONE; + GRPC_ASSERT_SAME_THREAD(); + if (s->tail != NULL) { port_index = s->tail->port_index + 1; } @@ -415,6 +420,7 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server, grpc_tcp_listener *sp; (void)pollsets; (void)pollset_count; + GRPC_ASSERT_SAME_THREAD(); if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "SERVER_START %p", server); } diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c index 0302af2a06..115297634b 100644 --- a/src/core/lib/iomgr/tcp_uv.c +++ b/src/core/lib/iomgr/tcp_uv.c @@ -30,6 +30,7 @@ #include #include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/iomgr_uv.h" #include "src/core/lib/iomgr/network_status_tracker.h" #include "src/core/lib/iomgr/resource_quota.h" #include "src/core/lib/iomgr/tcp_uv.h" @@ -183,6 +184,7 @@ static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_tcp *tcp = (grpc_tcp *)ep; int status; grpc_error *error = GRPC_ERROR_NONE; + GRPC_ASSERT_SAME_THREAD(); GPR_ASSERT(tcp->read_cb == NULL); tcp->read_cb = cb; tcp->read_slices = read_slices; @@ -236,6 +238,7 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, unsigned int i; grpc_slice *slice; uv_write_t *write_req; + GRPC_ASSERT_SAME_THREAD(); if (GRPC_TRACER_ON(grpc_tcp_trace)) { size_t j; diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c index 4f204cfbf8..e004ff5a3a 100644 --- a/src/core/lib/iomgr/timer_uv.c +++ b/src/core/lib/iomgr/timer_uv.c @@ -24,6 +24,7 @@ #include #include "src/core/lib/debug/trace.h" +#include "src/core/lib/iomgr/iomgr_uv.h" #include "src/core/lib/iomgr/timer.h" #include @@ -42,6 +43,7 @@ static void stop_uv_timer(uv_timer_t *handle) { void run_expired_timer(uv_timer_t *handle) { grpc_timer *timer = (grpc_timer *)handle->data; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_ASSERT_SAME_THREAD(); GPR_ASSERT(timer->pending); timer->pending = 0; GRPC_CLOSURE_SCHED(&exec_ctx, timer->closure, GRPC_ERROR_NONE); @@ -54,6 +56,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, gpr_timespec now) { uint64_t timeout; uv_timer_t *uv_timer; + GRPC_ASSERT_SAME_THREAD(); timer->closure = closure; if (gpr_time_cmp(deadline, now) <= 0) { timer->pending = 0; @@ -74,6 +77,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, } void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { + GRPC_ASSERT_SAME_THREAD(); if (timer->pending) { timer->pending = 0; GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED); diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 766c20f59b..1263bdbb74 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1118,6 +1118,7 @@ src/core/lib/iomgr/iomgr_internal.h \ src/core/lib/iomgr/iomgr_posix.c \ src/core/lib/iomgr/iomgr_posix.h \ src/core/lib/iomgr/iomgr_uv.c \ +src/core/lib/iomgr/iomgr_uv.h \ src/core/lib/iomgr/iomgr_windows.c \ src/core/lib/iomgr/is_epollexclusive_available.c \ src/core/lib/iomgr/is_epollexclusive_available.h \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 477a258daa..93411a8c61 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -7654,6 +7654,7 @@ "src/core/lib/iomgr/iomgr.h", "src/core/lib/iomgr/iomgr_internal.h", "src/core/lib/iomgr/iomgr_posix.h", + "src/core/lib/iomgr/iomgr_uv.h", "src/core/lib/iomgr/is_epollexclusive_available.h", "src/core/lib/iomgr/load_file.h", "src/core/lib/iomgr/lockfree_event.h", @@ -7812,6 +7813,7 @@ "src/core/lib/iomgr/iomgr_posix.c", "src/core/lib/iomgr/iomgr_posix.h", "src/core/lib/iomgr/iomgr_uv.c", + "src/core/lib/iomgr/iomgr_uv.h", "src/core/lib/iomgr/iomgr_windows.c", "src/core/lib/iomgr/is_epollexclusive_available.c", "src/core/lib/iomgr/is_epollexclusive_available.h", diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index 611868ce5a..50eed6256c 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -245,7 +245,7 @@ class CLanguage(object): self._docker_distro, self._make_options = self._compiler_options(self.args.use_docker, self.args.compiler) if args.iomgr_platform == "uv": - cflags = '-DGRPC_UV ' + cflags = '-DGRPC_UV -DGRPC_UV_THREAD_CHECK' try: cflags += subprocess.check_output(['pkg-config', '--cflags', 'libuv']).strip() + ' ' except (subprocess.CalledProcessError, OSError): diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj b/vsprojects/vcxproj/grpc/grpc.vcxproj index f9badbbd35..12cb39a3df 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj @@ -333,6 +333,7 @@ + diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters index f3e997e4e0..c762200a00 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters @@ -944,6 +944,9 @@ src\core\lib\iomgr + + src\core\lib\iomgr + src\core\lib\iomgr diff --git a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj index 1899be91db..98bf5aeb69 100644 --- a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj +++ b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj @@ -228,6 +228,7 @@ + diff --git a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters index 9c81fc5bf5..145b51c206 100644 --- a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters @@ -677,6 +677,9 @@ src\core\lib\iomgr + + src\core\lib\iomgr + src\core\lib\iomgr diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj index 3a865f3e81..12c352a3f8 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj @@ -323,6 +323,7 @@ + diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters index 0310ebc012..0b598363b8 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters @@ -854,6 +854,9 @@ src\core\lib\iomgr + + src\core\lib\iomgr + src\core\lib\iomgr -- cgit v1.2.3 From 6617790c18e4048671d64d03212cfe84b960f376 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Tue, 18 Jul 2017 18:00:38 -0700 Subject: Add SO_REUSEPORT support to uv iomgr code --- src/core/lib/iomgr/sockaddr_utils.c | 5 +++++ src/core/lib/iomgr/sockaddr_utils.h | 2 ++ src/core/lib/iomgr/tcp_server_uv.c | 14 +++++++++++++- 3 files changed, 20 insertions(+), 1 deletion(-) (limited to 'src/core/lib/iomgr/tcp_server_uv.c') diff --git a/src/core/lib/iomgr/sockaddr_utils.c b/src/core/lib/iomgr/sockaddr_utils.c index 99dc2f1c78..3f4145d104 100644 --- a/src/core/lib/iomgr/sockaddr_utils.c +++ b/src/core/lib/iomgr/sockaddr_utils.c @@ -220,6 +220,11 @@ const char *grpc_sockaddr_get_uri_scheme( return NULL; } +int grpc_sockaddr_get_family(const grpc_resolved_address *resolved_addr) { + const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr; + return addr->sa_family; +} + int grpc_sockaddr_get_port(const grpc_resolved_address *resolved_addr) { const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr; switch (addr->sa_family) { diff --git a/src/core/lib/iomgr/sockaddr_utils.h b/src/core/lib/iomgr/sockaddr_utils.h index 7692b969f2..a589a19705 100644 --- a/src/core/lib/iomgr/sockaddr_utils.h +++ b/src/core/lib/iomgr/sockaddr_utils.h @@ -75,4 +75,6 @@ char *grpc_sockaddr_to_uri(const grpc_resolved_address *addr); /* Returns the URI scheme corresponding to \a addr */ const char *grpc_sockaddr_get_uri_scheme(const grpc_resolved_address *addr); +int grpc_sockaddr_get_family(const grpc_resolved_address *resolved_addr); + #endif /* GRPC_CORE_LIB_IOMGR_SOCKADDR_UTILS_H */ diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c index 2ab836cc34..079c913579 100644 --- a/src/core/lib/iomgr/tcp_server_uv.c +++ b/src/core/lib/iomgr/tcp_server_uv.c @@ -316,6 +316,7 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, unsigned port_index = 0; int status; grpc_error *error = GRPC_ERROR_NONE; + int family; if (s->tail != NULL) { port_index = s->tail->port_index + 1; @@ -353,7 +354,18 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, } handle = gpr_malloc(sizeof(uv_tcp_t)); - status = uv_tcp_init(uv_default_loop(), handle); + + family = grpc_sockaddr_get_family(addr); + status = uv_tcp_init_ex(uv_default_loop(), handle, (unsigned int)family); +#if defined(GPR_LINUX) && defined(SO_REUSEPORT) + if (family == AF_INET || family == AF_INET6) { + int fd; + uv_fileno((uv_handle_t *)handle, &fd); + int enable = 1; + setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable)); + } +#endif /* GPR_LINUX && SO_REUSEPORT */ + if (status == 0) { error = add_socket_to_server(s, handle, addr, port_index, &sp); } else { -- cgit v1.2.3 From 9d7def71fe04ec25b7cace53adb5775dbcccc353 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Thu, 20 Jul 2017 16:41:27 -0700 Subject: Address comments: change names and remove unnecessary lines --- src/core/lib/iomgr/iomgr_uv.c | 4 ++-- src/core/lib/iomgr/iomgr_uv.h | 8 ++++---- src/core/lib/iomgr/pollset_uv.c | 12 ++++++------ src/core/lib/iomgr/resolve_address_uv.c | 4 ++-- src/core/lib/iomgr/tcp_client_uv.c | 2 +- src/core/lib/iomgr/tcp_server_uv.c | 14 +++++++------- src/core/lib/iomgr/tcp_uv.c | 4 ++-- src/core/lib/iomgr/timer_uv.c | 6 +++--- 8 files changed, 27 insertions(+), 27 deletions(-) (limited to 'src/core/lib/iomgr/tcp_server_uv.c') diff --git a/src/core/lib/iomgr/iomgr_uv.c b/src/core/lib/iomgr/iomgr_uv.c index ffec6bcf76..df5d23af3b 100644 --- a/src/core/lib/iomgr/iomgr_uv.c +++ b/src/core/lib/iomgr/iomgr_uv.c @@ -26,14 +26,14 @@ #include "src/core/lib/iomgr/pollset_uv.h" #include "src/core/lib/iomgr/tcp_uv.h" -gpr_thd_id grpc_init_thread; +gpr_thd_id g_init_thread; void grpc_iomgr_platform_init(void) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_pollset_global_init(); grpc_register_tracer(&grpc_tcp_trace); grpc_executor_set_threading(&exec_ctx, false); - grpc_init_thread = gpr_thd_currentid(); + g_init_thread = gpr_thd_currentid(); grpc_exec_ctx_finish(&exec_ctx); } void grpc_iomgr_platform_flush(void) {} diff --git a/src/core/lib/iomgr/iomgr_uv.h b/src/core/lib/iomgr/iomgr_uv.h index bd406c34f7..3b4daaa73b 100644 --- a/src/core/lib/iomgr/iomgr_uv.h +++ b/src/core/lib/iomgr/iomgr_uv.h @@ -25,13 +25,13 @@ /* The thread ID of the thread on which grpc was initialized. Used to verify * that all calls into libuv are made on that same thread */ -extern gpr_thd_id grpc_init_thread; +extern gpr_thd_id g_init_thread; #ifdef GRPC_UV_THREAD_CHECK -#define GRPC_ASSERT_SAME_THREAD() \ - GPR_ASSERT(gpr_thd_currentid() == grpc_init_thread) +#define GRPC_UV_ASSERT_SAME_THREAD() \ + GPR_ASSERT(gpr_thd_currentid() == g_init_thread) #else -#define GRPC_ASSERT_SAME_THREAD() +#define GRPC_UV_ASSERT_SAME_THREAD() #endif /* GRPC_UV_THREAD_CHECK */ #endif /* GRPC_CORE_LIB_IOMGR_IOMGR_UV_H */ diff --git a/src/core/lib/iomgr/pollset_uv.c b/src/core/lib/iomgr/pollset_uv.c index e59801834d..a79fe89d3e 100644 --- a/src/core/lib/iomgr/pollset_uv.c +++ b/src/core/lib/iomgr/pollset_uv.c @@ -71,7 +71,7 @@ void grpc_pollset_global_init(void) { } void grpc_pollset_global_shutdown(void) { - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); gpr_mu_destroy(&grpc_polling_mu); uv_close((uv_handle_t *)dummy_uv_handle, dummy_handle_close_cb); } @@ -81,7 +81,7 @@ static void timer_run_cb(uv_timer_t *timer) {} static void timer_close_cb(uv_handle_t *handle) { handle->data = (void *)1; } void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) { - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); *mu = &grpc_polling_mu; uv_timer_init(uv_default_loop(), &pollset->timer); pollset->shutting_down = 0; @@ -90,7 +90,7 @@ void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) { void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure) { GPR_ASSERT(!pollset->shutting_down); - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); pollset->shutting_down = 1; if (grpc_pollset_work_run_loop) { // Drain any pending UV callbacks without blocking @@ -103,7 +103,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); uv_close((uv_handle_t *)&pollset->timer, timer_close_cb); // timer.data is a boolean indicating that the timer has finished closing pollset->timer.data = (void *)0; @@ -118,7 +118,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, gpr_timespec now, gpr_timespec deadline) { uint64_t timeout; - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); gpr_mu_unlock(&grpc_polling_mu); if (grpc_pollset_work_run_loop) { if (gpr_time_cmp(deadline, now) >= 0) { @@ -147,7 +147,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_error *grpc_pollset_kick(grpc_pollset *pollset, grpc_pollset_worker *specific_worker) { - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); uv_timer_start(dummy_uv_handle, dummy_timer_cb, 0, 0); return GRPC_ERROR_NONE; } diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c index f910ec26b2..2d438e8b48 100644 --- a/src/core/lib/iomgr/resolve_address_uv.c +++ b/src/core/lib/iomgr/resolve_address_uv.c @@ -175,7 +175,7 @@ static grpc_error *blocking_resolve_address_impl( grpc_error *err; int retry_status; - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); req.addrinfo = NULL; @@ -231,7 +231,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, char *port = NULL; grpc_error *err; int s; - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); err = try_split_host_port(name, default_port, &host, &port); if (err != GRPC_ERROR_NONE) { GRPC_CLOSURE_SCHED(exec_ctx, on_done, err); diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c index 098ff7648d..786c456b73 100644 --- a/src/core/lib/iomgr/tcp_client_uv.c +++ b/src/core/lib/iomgr/tcp_client_uv.c @@ -125,7 +125,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, (void)channel_args; (void)interested_parties; - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); if (channel_args != NULL) { for (size_t i = 0; i < channel_args->num_args; i++) { diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c index 7b45e08c9b..7edf22a032 100644 --- a/src/core/lib/iomgr/tcp_server_uv.c +++ b/src/core/lib/iomgr/tcp_server_uv.c @@ -108,7 +108,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, } grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) { - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); gpr_ref(&s->refs); return s; } @@ -147,12 +147,10 @@ static void handle_close_callback(uv_handle_t *handle) { } static void close_listener(grpc_tcp_listener *sp) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; if (!sp->closed) { sp->closed = true; uv_close((uv_handle_t *)sp->handle, handle_close_callback); } - grpc_exec_ctx_finish(&exec_ctx); } static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { @@ -175,7 +173,7 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { } void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); if (gpr_unref(&s->refs)) { /* Complete shutdown_starting work before destroying. */ grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT; @@ -248,7 +246,9 @@ static void on_connect(uv_stream_t *server, int status) { GPR_ASSERT(!sp->has_pending_connection); - gpr_log(GPR_DEBUG, "SERVER_CONNECT: %p incoming connection", sp->server); + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "SERVER_CONNECT: %p incoming connection", sp->server); + } // Create acceptor. if (sp->server->on_accept_cb) { @@ -338,7 +338,7 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, int status; grpc_error *error = GRPC_ERROR_NONE; - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); if (s->tail != NULL) { port_index = s->tail->port_index + 1; @@ -420,7 +420,7 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server, grpc_tcp_listener *sp; (void)pollsets; (void)pollset_count; - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "SERVER_START %p", server); } diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c index 9b1a8db723..a05c19b4ac 100644 --- a/src/core/lib/iomgr/tcp_uv.c +++ b/src/core/lib/iomgr/tcp_uv.c @@ -184,7 +184,7 @@ static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_tcp *tcp = (grpc_tcp *)ep; int status; grpc_error *error = GRPC_ERROR_NONE; - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); GPR_ASSERT(tcp->read_cb == NULL); tcp->read_cb = cb; tcp->read_slices = read_slices; @@ -238,7 +238,7 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, unsigned int i; grpc_slice *slice; uv_write_t *write_req; - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); if (GRPC_TRACER_ON(grpc_tcp_trace)) { size_t j; diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c index ff2570c60d..70f49bcbe8 100644 --- a/src/core/lib/iomgr/timer_uv.c +++ b/src/core/lib/iomgr/timer_uv.c @@ -44,7 +44,7 @@ static void stop_uv_timer(uv_timer_t *handle) { void run_expired_timer(uv_timer_t *handle) { grpc_timer *timer = (grpc_timer *)handle->data; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); GPR_ASSERT(timer->pending); timer->pending = 0; GRPC_CLOSURE_SCHED(&exec_ctx, timer->closure, GRPC_ERROR_NONE); @@ -57,7 +57,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, gpr_timespec now) { uint64_t timeout; uv_timer_t *uv_timer; - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); timer->closure = closure; if (gpr_time_cmp(deadline, now) <= 0) { timer->pending = 0; @@ -78,7 +78,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, } void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); if (timer->pending) { timer->pending = 0; GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED); -- cgit v1.2.3