diff options
author | 2017-10-13 16:07:13 -0700 | |
---|---|---|
committer | 2017-10-18 17:12:19 -0700 | |
commit | 0ee7574732a06e8cace4e099a678f4bd5dbff679 (patch) | |
tree | e43d5de442fdcc3d39cd5af687f319fa39612d3f /src/core/lib/iomgr/udp_server.cc | |
parent | 6bf5f833efe2cb9e2ecc14358dd9699cd5d05263 (diff) |
Removing instances of exec_ctx being passed around in functions in
src/core. exec_ctx is now a thread_local pointer of type ExecCtx instead of
grpc_exec_ctx which is initialized whenever ExecCtx is instantiated. ExecCtx
also keeps track of the previous exec_ctx so that nesting of exec_ctx is
allowed. This means that there is only one exec_ctx being used at any
time. Also, grpc_exec_ctx_finish is called in the destructor of the
object, and the previous exec_ctx is restored to avoid breaking current
functionality. The code still explicitly calls grpc_exec_ctx_finish
because removing all such instances causes the code to break.
Diffstat (limited to 'src/core/lib/iomgr/udp_server.cc')
-rw-r--r-- | src/core/lib/iomgr/udp_server.cc | 60 |
1 files changed, 27 insertions, 33 deletions
diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index 00b2e68bb5..c868e82d1d 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -141,22 +141,21 @@ grpc_udp_server *grpc_udp_server_create(const grpc_channel_args *args) { return s; } -static void shutdown_fd(grpc_exec_ctx *exec_ctx, void *args, - grpc_error *error) { +static void shutdown_fd(void *args, grpc_error *error) { struct shutdown_fd_args *shutdown_args = (struct shutdown_fd_args *)args; gpr_mu_lock(shutdown_args->server_mu); - grpc_fd_shutdown(exec_ctx, shutdown_args->fd, GRPC_ERROR_REF(error)); + grpc_fd_shutdown(shutdown_args->fd, GRPC_ERROR_REF(error)); gpr_mu_unlock(shutdown_args->server_mu); gpr_free(shutdown_args); } -static void dummy_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { +static void dummy_cb(void *arg, grpc_error *error) { // No-op. } -static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { +static void finish_shutdown(grpc_udp_server *s) { if (s->shutdown_complete != NULL) { - GRPC_CLOSURE_SCHED(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(s->shutdown_complete, GRPC_ERROR_NONE); } gpr_mu_destroy(&s->mu); @@ -174,14 +173,13 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { gpr_free(s); } -static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server, - grpc_error *error) { +static void destroyed_port(void *server, grpc_error *error) { grpc_udp_server *s = (grpc_udp_server *)server; gpr_mu_lock(&s->mu); s->destroyed_ports++; if (s->destroyed_ports == s->nports) { gpr_mu_unlock(&s->mu); - finish_shutdown(exec_ctx, s); + finish_shutdown(s); } else { gpr_mu_unlock(&s->mu); } @@ -190,7 +188,7 @@ static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server, /* called when all listening endpoints have been shutdown, so no further events will be received on them - at this point it's safe to destroy things */ -static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { +static void deactivated_all_ports(grpc_udp_server *s) { /* delete ALL the things */ gpr_mu_lock(&s->mu); @@ -210,21 +208,19 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, dummy_cb, sp->emfd, grpc_schedule_on_exec_ctx); GPR_ASSERT(sp->orphan_cb); - sp->orphan_cb(exec_ctx, sp->emfd, &sp->orphan_fd_closure, - sp->server->user_data); + sp->orphan_cb(sp->emfd, &sp->orphan_fd_closure, sp->server->user_data); } - grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL, + grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, NULL, false /* already_closed */, "udp_listener_shutdown"); } gpr_mu_unlock(&s->mu); } else { gpr_mu_unlock(&s->mu); - finish_shutdown(exec_ctx, s); + finish_shutdown(s); } } -void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, - grpc_closure *on_done) { +void grpc_udp_server_destroy(grpc_udp_server *s, grpc_closure *on_done) { grpc_udp_listener *sp; gpr_mu_lock(&s->mu); @@ -243,14 +239,13 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, args->server_mu = &s->mu; GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, shutdown_fd, args, grpc_schedule_on_exec_ctx); - sp->orphan_cb(exec_ctx, sp->emfd, &sp->orphan_fd_closure, - sp->server->user_data); + sp->orphan_cb(sp->emfd, &sp->orphan_fd_closure, sp->server->user_data); sp->orphan_notified = true; } gpr_mu_unlock(&s->mu); } else { gpr_mu_unlock(&s->mu); - deactivated_all_ports(exec_ctx, s); + deactivated_all_ports(s); } } @@ -331,14 +326,14 @@ error: } /* event manager callback when reads are ready */ -static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { +static void on_read(void *arg, grpc_error *error) { grpc_udp_listener *sp = (grpc_udp_listener *)arg; gpr_mu_lock(&sp->server->mu); if (error != GRPC_ERROR_NONE) { if (0 == --sp->server->active_ports && sp->server->shutdown) { gpr_mu_unlock(&sp->server->mu); - deactivated_all_ports(exec_ctx, sp->server); + deactivated_all_ports(sp->server); } else { gpr_mu_unlock(&sp->server->mu); } @@ -347,21 +342,21 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { /* Tell the registered callback that data is available to read. */ GPR_ASSERT(sp->read_cb); - sp->read_cb(exec_ctx, sp->emfd, sp->server->user_data); + sp->read_cb(sp->emfd, sp->server->user_data); /* Re-arm the notification event so we get another chance to read. */ - grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); + grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); gpr_mu_unlock(&sp->server->mu); } -static void on_write(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { +static void on_write(void *arg, grpc_error *error) { grpc_udp_listener *sp = (grpc_udp_listener *)arg; gpr_mu_lock(&(sp->server->mu)); if (error != GRPC_ERROR_NONE) { if (0 == --sp->server->active_ports && sp->server->shutdown) { gpr_mu_unlock(&sp->server->mu); - deactivated_all_ports(exec_ctx, sp->server); + deactivated_all_ports(sp->server); } else { gpr_mu_unlock(&sp->server->mu); } @@ -370,10 +365,10 @@ static void on_write(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { /* Tell the registered callback that the socket is writeable. */ GPR_ASSERT(sp->write_cb); - sp->write_cb(exec_ctx, sp->emfd, sp->server->user_data); + sp->write_cb(sp->emfd, sp->server->user_data); /* Re-arm the notification event so we get another chance to write. */ - grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure); + grpc_fd_notify_on_write(sp->emfd, &sp->write_closure); gpr_mu_unlock(&sp->server->mu); } @@ -512,9 +507,8 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index) { return sp->fd; } -void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, - grpc_pollset **pollsets, size_t pollset_count, - void *user_data) { +void grpc_udp_server_start(grpc_udp_server *s, grpc_pollset **pollsets, + size_t pollset_count, void *user_data) { size_t i; gpr_mu_lock(&s->mu); grpc_udp_listener *sp; @@ -525,15 +519,15 @@ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, sp = s->head; while (sp != NULL) { for (i = 0; i < pollset_count; i++) { - grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd); + grpc_pollset_add_fd(pollsets[i], sp->emfd); } GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp, grpc_schedule_on_exec_ctx); - grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); + grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); GRPC_CLOSURE_INIT(&sp->write_closure, on_write, sp, grpc_schedule_on_exec_ctx); - grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure); + grpc_fd_notify_on_write(sp->emfd, &sp->write_closure); /* Registered for both read and write callbacks: increment active_ports * twice to account for this, and delay free-ing of memory until both |