aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/tcp_server_posix.cc
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2017-12-06 09:05:05 -0800
committerGravatar GitHub <noreply@github.com>2017-12-06 09:05:05 -0800
commitad4d2dde0052efbbf49d64b0843c45f0381cfeb3 (patch)
tree6a657f8c6179d873b34505cdc24bce9462ca68eb /src/core/lib/iomgr/tcp_server_posix.cc
parenta3df36cc2505a89c2f481eea4a66a87b3002844a (diff)
Revert "All instances of exec_ctx being passed around in src/core removed"
Diffstat (limited to 'src/core/lib/iomgr/tcp_server_posix.cc')
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.cc64
1 files changed, 34 insertions, 30 deletions
diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc
index 99e1c6cd06..6fed13c6c7 100644
--- a/src/core/lib/iomgr/tcp_server_posix.cc
+++ b/src/core/lib/iomgr/tcp_server_posix.cc
@@ -68,7 +68,8 @@ static void init(void) {
#endif
}
-grpc_error* grpc_tcp_server_create(grpc_closure* shutdown_complete,
+grpc_error* grpc_tcp_server_create(grpc_exec_ctx* exec_ctx,
+ grpc_closure* shutdown_complete,
const grpc_channel_args* args,
grpc_tcp_server** server) {
gpr_once_init(&check_init, init);
@@ -115,12 +116,12 @@ grpc_error* grpc_tcp_server_create(grpc_closure* shutdown_complete,
return GRPC_ERROR_NONE;
}
-static void finish_shutdown(grpc_tcp_server* s) {
+static void finish_shutdown(grpc_exec_ctx* exec_ctx, grpc_tcp_server* s) {
gpr_mu_lock(&s->mu);
GPR_ASSERT(s->shutdown);
gpr_mu_unlock(&s->mu);
if (s->shutdown_complete != nullptr) {
- GRPC_CLOSURE_SCHED(s->shutdown_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
}
gpr_mu_destroy(&s->mu);
@@ -130,18 +131,19 @@ static void finish_shutdown(grpc_tcp_server* s) {
s->head = sp->next;
gpr_free(sp);
}
- grpc_channel_args_destroy(s->channel_args);
+ grpc_channel_args_destroy(exec_ctx, s->channel_args);
gpr_free(s);
}
-static void destroyed_port(void* server, grpc_error* error) {
+static void destroyed_port(grpc_exec_ctx* exec_ctx, void* server,
+ grpc_error* error) {
grpc_tcp_server* s = (grpc_tcp_server*)server;
gpr_mu_lock(&s->mu);
s->destroyed_ports++;
if (s->destroyed_ports == s->nports) {
gpr_mu_unlock(&s->mu);
- finish_shutdown(s);
+ finish_shutdown(exec_ctx, s);
} else {
GPR_ASSERT(s->destroyed_ports < s->nports);
gpr_mu_unlock(&s->mu);
@@ -151,7 +153,7 @@ static void destroyed_port(void* server, grpc_error* error) {
/* called when all listening endpoints have been shutdown, so no further
events will be received on them - at this point it's safe to destroy
things */
-static void deactivated_all_ports(grpc_tcp_server* s) {
+static void deactivated_all_ports(grpc_exec_ctx* exec_ctx, grpc_tcp_server* s) {
/* delete ALL the things */
gpr_mu_lock(&s->mu);
@@ -163,17 +165,17 @@ static void deactivated_all_ports(grpc_tcp_server* s) {
grpc_unlink_if_unix_domain_socket(&sp->addr);
GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s,
grpc_schedule_on_exec_ctx);
- grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, nullptr,
+ grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, nullptr,
false /* already_closed */, "tcp_listener_shutdown");
}
gpr_mu_unlock(&s->mu);
} else {
gpr_mu_unlock(&s->mu);
- finish_shutdown(s);
+ finish_shutdown(exec_ctx, s);
}
}
-static void tcp_server_destroy(grpc_tcp_server* s) {
+static void tcp_server_destroy(grpc_exec_ctx* exec_ctx, grpc_tcp_server* s) {
gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->shutdown);
@@ -184,17 +186,18 @@ static void tcp_server_destroy(grpc_tcp_server* s) {
grpc_tcp_listener* sp;
for (sp = s->head; sp; sp = sp->next) {
grpc_fd_shutdown(
- sp->emfd, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server destroyed"));
+ exec_ctx, sp->emfd,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server destroyed"));
}
gpr_mu_unlock(&s->mu);
} else {
gpr_mu_unlock(&s->mu);
- deactivated_all_ports(s);
+ deactivated_all_ports(exec_ctx, s);
}
}
/* event manager callback when reads are ready */
-static void on_read(void* arg, grpc_error* err) {
+static void on_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* err) {
grpc_tcp_listener* sp = (grpc_tcp_listener*)arg;
grpc_pollset* read_notifier_pollset;
if (err != GRPC_ERROR_NONE) {
@@ -220,7 +223,7 @@ static void on_read(void* arg, grpc_error* err) {
case EINTR:
continue;
case EAGAIN:
- grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
+ grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
return;
default:
gpr_mu_lock(&sp->server->mu);
@@ -246,7 +249,7 @@ static void on_read(void* arg, grpc_error* err) {
grpc_fd* fdobj = grpc_fd_create(fd, name);
- grpc_pollset_add_fd(read_notifier_pollset, fdobj);
+ grpc_pollset_add_fd(exec_ctx, read_notifier_pollset, fdobj);
// Create acceptor.
grpc_tcp_server_acceptor* acceptor =
@@ -256,8 +259,8 @@ static void on_read(void* arg, grpc_error* err) {
acceptor->fd_index = sp->fd_index;
sp->server->on_accept_cb(
- sp->server->on_accept_cb_arg,
- grpc_tcp_create(fdobj, sp->server->channel_args, addr_str),
+ exec_ctx, sp->server->on_accept_cb_arg,
+ grpc_tcp_create(exec_ctx, fdobj, sp->server->channel_args, addr_str),
read_notifier_pollset, acceptor);
gpr_free(name);
@@ -270,7 +273,7 @@ error:
gpr_mu_lock(&sp->server->mu);
if (0 == --sp->server->active_ports && sp->server->shutdown) {
gpr_mu_unlock(&sp->server->mu);
- deactivated_all_ports(sp->server);
+ deactivated_all_ports(exec_ctx, sp->server);
} else {
gpr_mu_unlock(&sp->server->mu);
}
@@ -480,8 +483,8 @@ int grpc_tcp_server_port_fd(grpc_tcp_server* s, unsigned port_index,
return -1;
}
-void grpc_tcp_server_start(grpc_tcp_server* s, grpc_pollset** pollsets,
- size_t pollset_count,
+void grpc_tcp_server_start(grpc_exec_ctx* exec_ctx, grpc_tcp_server* s,
+ grpc_pollset** pollsets, size_t pollset_count,
grpc_tcp_server_cb on_accept_cb,
void* on_accept_cb_arg) {
size_t i;
@@ -501,20 +504,20 @@ void grpc_tcp_server_start(grpc_tcp_server* s, grpc_pollset** pollsets,
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"clone_port", clone_port(sp, (unsigned)(pollset_count - 1))));
for (i = 0; i < pollset_count; i++) {
- grpc_pollset_add_fd(pollsets[i], sp->emfd);
+ grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
grpc_schedule_on_exec_ctx);
- grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
+ grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
s->active_ports++;
sp = sp->next;
}
} else {
for (i = 0; i < pollset_count; i++) {
- grpc_pollset_add_fd(pollsets[i], sp->emfd);
+ grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
}
GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
grpc_schedule_on_exec_ctx);
- grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
+ grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
s->active_ports++;
sp = sp->next;
}
@@ -535,24 +538,25 @@ void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server* s,
gpr_mu_unlock(&s->mu);
}
-void grpc_tcp_server_unref(grpc_tcp_server* s) {
+void grpc_tcp_server_unref(grpc_exec_ctx* exec_ctx, grpc_tcp_server* s) {
if (gpr_unref(&s->refs)) {
- grpc_tcp_server_shutdown_listeners(s);
+ grpc_tcp_server_shutdown_listeners(exec_ctx, s);
gpr_mu_lock(&s->mu);
- GRPC_CLOSURE_LIST_SCHED(&s->shutdown_starting);
+ GRPC_CLOSURE_LIST_SCHED(exec_ctx, &s->shutdown_starting);
gpr_mu_unlock(&s->mu);
- tcp_server_destroy(s);
+ tcp_server_destroy(exec_ctx, s);
}
}
-void grpc_tcp_server_shutdown_listeners(grpc_tcp_server* s) {
+void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx* exec_ctx,
+ grpc_tcp_server* s) {
gpr_mu_lock(&s->mu);
s->shutdown_listeners = true;
/* shutdown all fd's */
if (s->active_ports) {
grpc_tcp_listener* sp;
for (sp = s->head; sp; sp = sp->next) {
- grpc_fd_shutdown(sp->emfd,
+ grpc_fd_shutdown(exec_ctx, sp->emfd,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown"));
}
}