diff options
author | 2017-10-13 16:07:13 -0700 | |
---|---|---|
committer | 2017-10-18 17:12:19 -0700 | |
commit | 0ee7574732a06e8cace4e099a678f4bd5dbff679 (patch) | |
tree | e43d5de442fdcc3d39cd5af687f319fa39612d3f /src/core/lib/iomgr/tcp_server_posix.cc | |
parent | 6bf5f833efe2cb9e2ecc14358dd9699cd5d05263 (diff) |
Removing instances of exec_ctx being passed around in functions in
src/core. exec_ctx is now a thread_local pointer of type ExecCtx instead of
grpc_exec_ctx which is initialized whenever ExecCtx is instantiated. ExecCtx
also keeps track of the previous exec_ctx so that nesting of exec_ctx is
allowed. This means that there is only one exec_ctx being used at any
time. Also, grpc_exec_ctx_finish is called in the destructor of the
object, and the previous exec_ctx is restored to avoid breaking current
functionality. The code still explicitly calls grpc_exec_ctx_finish
because removing all such instances causes the code to break.
Diffstat (limited to 'src/core/lib/iomgr/tcp_server_posix.cc')
-rw-r--r-- | src/core/lib/iomgr/tcp_server_posix.cc | 65 |
1 files changed, 31 insertions, 34 deletions
diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc index 06612d639c..a9dcd68cfb 100644 --- a/src/core/lib/iomgr/tcp_server_posix.cc +++ b/src/core/lib/iomgr/tcp_server_posix.cc @@ -68,8 +68,7 @@ static void init(void) { #endif } -grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, - grpc_closure *shutdown_complete, +grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete, const grpc_channel_args *args, grpc_tcp_server **server) { gpr_once_init(&check_init, init); @@ -116,12 +115,12 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, return GRPC_ERROR_NONE; } -static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { +static void finish_shutdown(grpc_tcp_server *s) { gpr_mu_lock(&s->mu); GPR_ASSERT(s->shutdown); gpr_mu_unlock(&s->mu); if (s->shutdown_complete != NULL) { - GRPC_CLOSURE_SCHED(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(s->shutdown_complete, GRPC_ERROR_NONE); } gpr_mu_destroy(&s->mu); @@ -131,19 +130,18 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { s->head = sp->next; gpr_free(sp); } - grpc_channel_args_destroy(exec_ctx, s->channel_args); + grpc_channel_args_destroy(s->channel_args); gpr_free(s); } -static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server, - grpc_error *error) { +static void destroyed_port(void *server, grpc_error *error) { grpc_tcp_server *s = (grpc_tcp_server *)server; gpr_mu_lock(&s->mu); s->destroyed_ports++; if (s->destroyed_ports == s->nports) { gpr_mu_unlock(&s->mu); - finish_shutdown(exec_ctx, s); + finish_shutdown(s); } else { GPR_ASSERT(s->destroyed_ports < s->nports); gpr_mu_unlock(&s->mu); @@ -153,7 +151,7 @@ static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server, /* called when all listening endpoints have been shutdown, so no further events will be received on them - at this point it's safe to destroy things */ -static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { +static void deactivated_all_ports(grpc_tcp_server *s) { /* delete ALL the things */ gpr_mu_lock(&s->mu); @@ -165,17 +163,17 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { grpc_unlink_if_unix_domain_socket(&sp->addr); GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s, grpc_schedule_on_exec_ctx); - grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL, + grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, NULL, false /* already_closed */, "tcp_listener_shutdown"); } gpr_mu_unlock(&s->mu); } else { gpr_mu_unlock(&s->mu); - finish_shutdown(exec_ctx, s); + finish_shutdown(s); } } -static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { +static void tcp_server_destroy(grpc_tcp_server *s) { gpr_mu_lock(&s->mu); GPR_ASSERT(!s->shutdown); @@ -185,18 +183,18 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { if (s->active_ports) { grpc_tcp_listener *sp; for (sp = s->head; sp; sp = sp->next) { - grpc_fd_shutdown(exec_ctx, sp->emfd, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Server destroyed")); + grpc_fd_shutdown( + sp->emfd, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server destroyed")); } gpr_mu_unlock(&s->mu); } else { gpr_mu_unlock(&s->mu); - deactivated_all_ports(exec_ctx, s); + deactivated_all_ports(s); } } /* event manager callback when reads are ready */ -static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { +static void on_read(void *arg, grpc_error *err) { grpc_tcp_listener *sp = (grpc_tcp_listener *)arg; grpc_pollset *read_notifier_pollset; if (err != GRPC_ERROR_NONE) { @@ -222,7 +220,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { case EINTR: continue; case EAGAIN: - grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); + grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); return; default: gpr_mu_lock(&sp->server->mu); @@ -248,7 +246,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { grpc_fd *fdobj = grpc_fd_create(fd, name); - grpc_pollset_add_fd(exec_ctx, read_notifier_pollset, fdobj); + grpc_pollset_add_fd(read_notifier_pollset, fdobj); // Create acceptor. grpc_tcp_server_acceptor *acceptor = @@ -258,8 +256,8 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { acceptor->fd_index = sp->fd_index; sp->server->on_accept_cb( - exec_ctx, sp->server->on_accept_cb_arg, - grpc_tcp_create(exec_ctx, fdobj, sp->server->channel_args, addr_str), + sp->server->on_accept_cb_arg, + grpc_tcp_create(fdobj, sp->server->channel_args, addr_str), read_notifier_pollset, acceptor); gpr_free(name); @@ -272,7 +270,7 @@ error: gpr_mu_lock(&sp->server->mu); if (0 == --sp->server->active_ports && sp->server->shutdown) { gpr_mu_unlock(&sp->server->mu); - deactivated_all_ports(exec_ctx, sp->server); + deactivated_all_ports(sp->server); } else { gpr_mu_unlock(&sp->server->mu); } @@ -482,8 +480,8 @@ int grpc_tcp_server_port_fd(grpc_tcp_server *s, unsigned port_index, return -1; } -void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, - grpc_pollset **pollsets, size_t pollset_count, +void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollsets, + size_t pollset_count, grpc_tcp_server_cb on_accept_cb, void *on_accept_cb_arg) { size_t i; @@ -503,20 +501,20 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, GPR_ASSERT(GRPC_LOG_IF_ERROR( "clone_port", clone_port(sp, (unsigned)(pollset_count - 1)))); for (i = 0; i < pollset_count; i++) { - grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd); + grpc_pollset_add_fd(pollsets[i], sp->emfd); GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp, grpc_schedule_on_exec_ctx); - grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); + grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); s->active_ports++; sp = sp->next; } } else { for (i = 0; i < pollset_count; i++) { - grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd); + grpc_pollset_add_fd(pollsets[i], sp->emfd); } GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp, grpc_schedule_on_exec_ctx); - grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); + grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); s->active_ports++; sp = sp->next; } @@ -537,25 +535,24 @@ void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s, gpr_mu_unlock(&s->mu); } -void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { +void grpc_tcp_server_unref(grpc_tcp_server *s) { if (gpr_unref(&s->refs)) { - grpc_tcp_server_shutdown_listeners(exec_ctx, s); + grpc_tcp_server_shutdown_listeners(s); gpr_mu_lock(&s->mu); - GRPC_CLOSURE_LIST_SCHED(exec_ctx, &s->shutdown_starting); + GRPC_CLOSURE_LIST_SCHED(&s->shutdown_starting); gpr_mu_unlock(&s->mu); - tcp_server_destroy(exec_ctx, s); + tcp_server_destroy(s); } } -void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx, - grpc_tcp_server *s) { +void grpc_tcp_server_shutdown_listeners(grpc_tcp_server *s) { gpr_mu_lock(&s->mu); s->shutdown_listeners = true; /* shutdown all fd's */ if (s->active_ports) { grpc_tcp_listener *sp; for (sp = s->head; sp; sp = sp->next) { - grpc_fd_shutdown(exec_ctx, sp->emfd, + grpc_fd_shutdown(sp->emfd, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown")); } } |