diff options
author | Yash Tibrewal <yashkt@google.com> | 2017-12-06 09:05:05 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-12-06 09:05:05 -0800 |
commit | ad4d2dde0052efbbf49d64b0843c45f0381cfeb3 (patch) | |
tree | 6a657f8c6179d873b34505cdc24bce9462ca68eb /src/core/lib/iomgr/tcp_server_posix.cc | |
parent | a3df36cc2505a89c2f481eea4a66a87b3002844a (diff) |
Revert "All instances of exec_ctx being passed around in src/core removed"
Diffstat (limited to 'src/core/lib/iomgr/tcp_server_posix.cc')
-rw-r--r-- | src/core/lib/iomgr/tcp_server_posix.cc | 64 |
1 files changed, 34 insertions, 30 deletions
diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc index 99e1c6cd06..6fed13c6c7 100644 --- a/src/core/lib/iomgr/tcp_server_posix.cc +++ b/src/core/lib/iomgr/tcp_server_posix.cc @@ -68,7 +68,8 @@ static void init(void) { #endif } -grpc_error* grpc_tcp_server_create(grpc_closure* shutdown_complete, +grpc_error* grpc_tcp_server_create(grpc_exec_ctx* exec_ctx, + grpc_closure* shutdown_complete, const grpc_channel_args* args, grpc_tcp_server** server) { gpr_once_init(&check_init, init); @@ -115,12 +116,12 @@ grpc_error* grpc_tcp_server_create(grpc_closure* shutdown_complete, return GRPC_ERROR_NONE; } -static void finish_shutdown(grpc_tcp_server* s) { +static void finish_shutdown(grpc_exec_ctx* exec_ctx, grpc_tcp_server* s) { gpr_mu_lock(&s->mu); GPR_ASSERT(s->shutdown); gpr_mu_unlock(&s->mu); if (s->shutdown_complete != nullptr) { - GRPC_CLOSURE_SCHED(s->shutdown_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE); } gpr_mu_destroy(&s->mu); @@ -130,18 +131,19 @@ static void finish_shutdown(grpc_tcp_server* s) { s->head = sp->next; gpr_free(sp); } - grpc_channel_args_destroy(s->channel_args); + grpc_channel_args_destroy(exec_ctx, s->channel_args); gpr_free(s); } -static void destroyed_port(void* server, grpc_error* error) { +static void destroyed_port(grpc_exec_ctx* exec_ctx, void* server, + grpc_error* error) { grpc_tcp_server* s = (grpc_tcp_server*)server; gpr_mu_lock(&s->mu); s->destroyed_ports++; if (s->destroyed_ports == s->nports) { gpr_mu_unlock(&s->mu); - finish_shutdown(s); + finish_shutdown(exec_ctx, s); } else { GPR_ASSERT(s->destroyed_ports < s->nports); gpr_mu_unlock(&s->mu); @@ -151,7 +153,7 @@ static void destroyed_port(void* server, grpc_error* error) { /* called when all listening endpoints have been shutdown, so no further events will be received on them - at this point it's safe to destroy things */ -static void deactivated_all_ports(grpc_tcp_server* s) { +static void deactivated_all_ports(grpc_exec_ctx* exec_ctx, grpc_tcp_server* s) { /* delete ALL the things */ gpr_mu_lock(&s->mu); @@ -163,17 +165,17 @@ static void deactivated_all_ports(grpc_tcp_server* s) { grpc_unlink_if_unix_domain_socket(&sp->addr); GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s, grpc_schedule_on_exec_ctx); - grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, nullptr, + grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, nullptr, false /* already_closed */, "tcp_listener_shutdown"); } gpr_mu_unlock(&s->mu); } else { gpr_mu_unlock(&s->mu); - finish_shutdown(s); + finish_shutdown(exec_ctx, s); } } -static void tcp_server_destroy(grpc_tcp_server* s) { +static void tcp_server_destroy(grpc_exec_ctx* exec_ctx, grpc_tcp_server* s) { gpr_mu_lock(&s->mu); GPR_ASSERT(!s->shutdown); @@ -184,17 +186,18 @@ static void tcp_server_destroy(grpc_tcp_server* s) { grpc_tcp_listener* sp; for (sp = s->head; sp; sp = sp->next) { grpc_fd_shutdown( - sp->emfd, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server destroyed")); + exec_ctx, sp->emfd, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server destroyed")); } gpr_mu_unlock(&s->mu); } else { gpr_mu_unlock(&s->mu); - deactivated_all_ports(s); + deactivated_all_ports(exec_ctx, s); } } /* event manager callback when reads are ready */ -static void on_read(void* arg, grpc_error* err) { +static void on_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* err) { grpc_tcp_listener* sp = (grpc_tcp_listener*)arg; grpc_pollset* read_notifier_pollset; if (err != GRPC_ERROR_NONE) { @@ -220,7 +223,7 @@ static void on_read(void* arg, grpc_error* err) { case EINTR: continue; case EAGAIN: - grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); + grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); return; default: gpr_mu_lock(&sp->server->mu); @@ -246,7 +249,7 @@ static void on_read(void* arg, grpc_error* err) { grpc_fd* fdobj = grpc_fd_create(fd, name); - grpc_pollset_add_fd(read_notifier_pollset, fdobj); + grpc_pollset_add_fd(exec_ctx, read_notifier_pollset, fdobj); // Create acceptor. grpc_tcp_server_acceptor* acceptor = @@ -256,8 +259,8 @@ static void on_read(void* arg, grpc_error* err) { acceptor->fd_index = sp->fd_index; sp->server->on_accept_cb( - sp->server->on_accept_cb_arg, - grpc_tcp_create(fdobj, sp->server->channel_args, addr_str), + exec_ctx, sp->server->on_accept_cb_arg, + grpc_tcp_create(exec_ctx, fdobj, sp->server->channel_args, addr_str), read_notifier_pollset, acceptor); gpr_free(name); @@ -270,7 +273,7 @@ error: gpr_mu_lock(&sp->server->mu); if (0 == --sp->server->active_ports && sp->server->shutdown) { gpr_mu_unlock(&sp->server->mu); - deactivated_all_ports(sp->server); + deactivated_all_ports(exec_ctx, sp->server); } else { gpr_mu_unlock(&sp->server->mu); } @@ -480,8 +483,8 @@ int grpc_tcp_server_port_fd(grpc_tcp_server* s, unsigned port_index, return -1; } -void grpc_tcp_server_start(grpc_tcp_server* s, grpc_pollset** pollsets, - size_t pollset_count, +void grpc_tcp_server_start(grpc_exec_ctx* exec_ctx, grpc_tcp_server* s, + grpc_pollset** pollsets, size_t pollset_count, grpc_tcp_server_cb on_accept_cb, void* on_accept_cb_arg) { size_t i; @@ -501,20 +504,20 @@ void grpc_tcp_server_start(grpc_tcp_server* s, grpc_pollset** pollsets, GPR_ASSERT(GRPC_LOG_IF_ERROR( "clone_port", clone_port(sp, (unsigned)(pollset_count - 1)))); for (i = 0; i < pollset_count; i++) { - grpc_pollset_add_fd(pollsets[i], sp->emfd); + grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd); GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp, grpc_schedule_on_exec_ctx); - grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); + grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); s->active_ports++; sp = sp->next; } } else { for (i = 0; i < pollset_count; i++) { - grpc_pollset_add_fd(pollsets[i], sp->emfd); + grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd); } GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp, grpc_schedule_on_exec_ctx); - grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); + grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); s->active_ports++; sp = sp->next; } @@ -535,24 +538,25 @@ void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server* s, gpr_mu_unlock(&s->mu); } -void grpc_tcp_server_unref(grpc_tcp_server* s) { +void grpc_tcp_server_unref(grpc_exec_ctx* exec_ctx, grpc_tcp_server* s) { if (gpr_unref(&s->refs)) { - grpc_tcp_server_shutdown_listeners(s); + grpc_tcp_server_shutdown_listeners(exec_ctx, s); gpr_mu_lock(&s->mu); - GRPC_CLOSURE_LIST_SCHED(&s->shutdown_starting); + GRPC_CLOSURE_LIST_SCHED(exec_ctx, &s->shutdown_starting); gpr_mu_unlock(&s->mu); - tcp_server_destroy(s); + tcp_server_destroy(exec_ctx, s); } } -void grpc_tcp_server_shutdown_listeners(grpc_tcp_server* s) { +void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx* exec_ctx, + grpc_tcp_server* s) { gpr_mu_lock(&s->mu); s->shutdown_listeners = true; /* shutdown all fd's */ if (s->active_ports) { grpc_tcp_listener* sp; for (sp = s->head; sp; sp = sp->next) { - grpc_fd_shutdown(sp->emfd, + grpc_fd_shutdown(exec_ctx, sp->emfd, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown")); } } |