author    Yash Tibrewal <yashkt@google.com>    2017-12-06 09:47:54 -0800
committer GitHub <noreply@github.com>          2017-12-06 09:47:54 -0800
commit    8cf1470a51ea276ca84825e7495d4ee24743540d (patch)
tree      72385cc865094115bc08cb813201d48cb09840bb /src/core/lib/iomgr/udp_server.cc
parent    1d4e99508409be052bd129ba507bae1fbe7eb7fa (diff)
Revert "Revert "All instances of exec_ctx being passed around in src/core removed""
Diffstat (limited to 'src/core/lib/iomgr/udp_server.cc')
-rw-r--r--  src/core/lib/iomgr/udp_server.cc | 83
1 file changed, 37 insertions(+), 46 deletions(-)
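The change this commit restores removes the explicit grpc_exec_ctx* parameter that used to be threaded through every iomgr function and closure callback; the execution context instead lives in thread-local storage (in the wider change, as a grpc_core::ExecCtx on the caller's stack, which is not shown in this file). A minimal stand-in sketch of the signature change, using hypothetical ExecContext/Error types rather than real gRPC headers:

    #include <cstdio>

    // Hypothetical stand-ins for grpc_exec_ctx and grpc_error (not gRPC types).
    struct ExecContext { int pending_work = 0; };
    struct Error { const char* msg = nullptr; };

    // Old style: the context is plumbed through as the first parameter.
    static void on_event_old(ExecContext* ctx, void* arg, Error* error) {
      ctx->pending_work++;
      (void)arg; (void)error;
    }

    // New style after this commit: no context parameter; code that still needs
    // the context reads it from thread-local storage instead.
    thread_local ExecContext* g_exec_ctx = nullptr;

    static void on_event_new(void* arg, Error* error) {
      g_exec_ctx->pending_work++;  // same work, without parameter plumbing
      (void)arg; (void)error;
    }

    int main() {
      ExecContext ctx;
      g_exec_ctx = &ctx;  // published once per thread at an API boundary
      on_event_old(&ctx, nullptr, nullptr);
      on_event_new(nullptr, nullptr);
      std::printf("pending_work = %d\n", ctx.pending_work);  // prints 2
    }

Every hunk below is an instance of this transformation: drop the exec_ctx argument from the callback or helper and from the iomgr calls it makes.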
diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc
index 7b7d6946b1..55e0b165ec 100644
--- a/src/core/lib/iomgr/udp_server.cc
+++ b/src/core/lib/iomgr/udp_server.cc
@@ -150,31 +150,30 @@ grpc_udp_server* grpc_udp_server_create(const grpc_channel_args* args) {
return s;
}
-static void shutdown_fd(grpc_exec_ctx* exec_ctx, void* args,
- grpc_error* error) {
+static void shutdown_fd(void* args, grpc_error* error) {
struct shutdown_fd_args* shutdown_args = (struct shutdown_fd_args*)args;
grpc_udp_listener* sp = shutdown_args->sp;
gpr_log(GPR_DEBUG, "shutdown fd %d", sp->fd);
gpr_mu_lock(shutdown_args->server_mu);
- grpc_fd_shutdown(exec_ctx, sp->emfd, GRPC_ERROR_REF(error));
+ grpc_fd_shutdown(sp->emfd, GRPC_ERROR_REF(error));
sp->already_shutdown = true;
if (!sp->notify_on_write_armed) {
// Re-arm write notification to notify listener with error. This is
// necessary to decrement active_ports.
sp->notify_on_write_armed = true;
- grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
+ grpc_fd_notify_on_write(sp->emfd, &sp->write_closure);
}
gpr_mu_unlock(shutdown_args->server_mu);
gpr_free(shutdown_args);
}
-static void dummy_cb(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+static void dummy_cb(void* arg, grpc_error* error) {
// No-op.
}
-static void finish_shutdown(grpc_exec_ctx* exec_ctx, grpc_udp_server* s) {
+static void finish_shutdown(grpc_udp_server* s) {
if (s->shutdown_complete != nullptr) {
- GRPC_CLOSURE_SCHED(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(s->shutdown_complete, GRPC_ERROR_NONE);
}
gpr_mu_destroy(&s->mu);
@@ -193,14 +192,13 @@ static void finish_shutdown(grpc_exec_ctx* exec_ctx, grpc_udp_server* s) {
gpr_free(s);
}
-static void destroyed_port(grpc_exec_ctx* exec_ctx, void* server,
- grpc_error* error) {
+static void destroyed_port(void* server, grpc_error* error) {
grpc_udp_server* s = (grpc_udp_server*)server;
gpr_mu_lock(&s->mu);
s->destroyed_ports++;
if (s->destroyed_ports == s->nports) {
gpr_mu_unlock(&s->mu);
- finish_shutdown(exec_ctx, s);
+ finish_shutdown(s);
} else {
gpr_mu_unlock(&s->mu);
}
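destroyed_port() above shows the shutdown accounting: each listener that finishes tearing down increments destroyed_ports under the server mutex, and only the last one (destroyed_ports == nports) calls finish_shutdown(), which schedules the caller's shutdown_complete closure. A minimal stand-in sketch of that last-one-out pattern; Server, port_destroyed and on_all_destroyed are hypothetical names, not gRPC API:

    #include <cstdio>
    #include <mutex>

    struct Server {
      std::mutex mu;
      int nports = 0;
      int destroyed_ports = 0;
    };

    // Hypothetical completion hook standing in for scheduling shutdown_complete.
    static void on_all_destroyed(Server* s) {
      std::printf("all %d ports destroyed\n", s->nports);
    }

    // Called once per listener as its fd finishes being destroyed.
    static void port_destroyed(Server* s) {
      std::unique_lock<std::mutex> lock(s->mu);
      if (++s->destroyed_ports == s->nports) {
        lock.unlock();  // drop the lock before running completion work
        on_all_destroyed(s);
      }
    }

    int main() {
      Server s;
      s.nports = 3;
      for (int i = 0; i < 3; i++) port_destroyed(&s);  // third call fires the hook
    }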
@@ -209,7 +207,7 @@ static void destroyed_port(grpc_exec_ctx* exec_ctx, void* server,
/* called when all listening endpoints have been shutdown, so no further
events will be received on them - at this point it's safe to destroy
things */
-static void deactivated_all_ports(grpc_exec_ctx* exec_ctx, grpc_udp_server* s) {
+static void deactivated_all_ports(grpc_udp_server* s) {
/* delete ALL the things */
gpr_mu_lock(&s->mu);
@@ -230,21 +228,19 @@ static void deactivated_all_ports(grpc_exec_ctx* exec_ctx, grpc_udp_server* s) {
grpc_schedule_on_exec_ctx);
GPR_ASSERT(sp->orphan_cb);
gpr_log(GPR_DEBUG, "Orphan fd %d", sp->fd);
- sp->orphan_cb(exec_ctx, sp->emfd, &sp->orphan_fd_closure,
- sp->server->user_data);
+ sp->orphan_cb(sp->emfd, &sp->orphan_fd_closure, sp->server->user_data);
}
- grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, nullptr,
+ grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, nullptr,
false /* already_closed */, "udp_listener_shutdown");
}
gpr_mu_unlock(&s->mu);
} else {
gpr_mu_unlock(&s->mu);
- finish_shutdown(exec_ctx, s);
+ finish_shutdown(s);
}
}
-void grpc_udp_server_destroy(grpc_exec_ctx* exec_ctx, grpc_udp_server* s,
- grpc_closure* on_done) {
+void grpc_udp_server_destroy(grpc_udp_server* s, grpc_closure* on_done) {
grpc_udp_listener* sp;
gpr_mu_lock(&s->mu);
@@ -264,14 +260,13 @@ void grpc_udp_server_destroy(grpc_exec_ctx* exec_ctx, grpc_udp_server* s,
args->server_mu = &s->mu;
GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, shutdown_fd, args,
grpc_schedule_on_exec_ctx);
- sp->orphan_cb(exec_ctx, sp->emfd, &sp->orphan_fd_closure,
- sp->server->user_data);
+ sp->orphan_cb(sp->emfd, &sp->orphan_fd_closure, sp->server->user_data);
sp->orphan_notified = true;
}
gpr_mu_unlock(&s->mu);
} else {
gpr_mu_unlock(&s->mu);
- deactivated_all_ports(exec_ctx, s);
+ deactivated_all_ports(s);
}
}
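With the parameter gone, grpc_udp_server_destroy() and the other entry points in this file assume an execution context has already been established for the calling thread; in the wider change that is an on-stack grpc_core::ExecCtx at the API boundary, which this file does not show, so the sketch below uses a hypothetical ExecCtxScope purely to illustrate the RAII idea:

    #include <cstdio>

    // Hypothetical stand-in for the thread-local execution context.
    struct ExecCtx { int closures_flushed = 0; };

    thread_local ExecCtx* tls_exec_ctx = nullptr;

    // RAII scope: constructing it publishes a context for this thread,
    // destroying it "flushes" and clears the slot, mirroring how an on-stack
    // context is meant to bracket a top-level call such as server destroy.
    class ExecCtxScope {
     public:
      ExecCtxScope() { tls_exec_ctx = &ctx_; }
      ~ExecCtxScope() {
        std::printf("flushed %d closures\n", ctx_.closures_flushed);
        tls_exec_ctx = nullptr;
      }
     private:
      ExecCtx ctx_;
    };

    // A callee in the new style: no context parameter, just the thread-local.
    static void destroy_something() { tls_exec_ctx->closures_flushed++; }

    int main() {
      ExecCtxScope scope;   // established once at the API boundary
      destroy_something();  // deep call chains no longer plumb a pointer through
    }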
@@ -350,7 +345,7 @@ error:
return -1;
}
-static void do_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+static void do_read(void* arg, grpc_error* error) {
grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg);
GPR_ASSERT(sp->read_cb && error == GRPC_ERROR_NONE);
/* TODO: the reason we hold server->mu here is merely to prevent fd
@@ -358,29 +353,28 @@ static void do_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
* read lock if available. */
gpr_mu_lock(&sp->server->mu);
/* Tell the registered callback that data is available to read. */
- if (!sp->already_shutdown &&
- sp->read_cb(exec_ctx, sp->emfd, sp->server->user_data)) {
+ if (!sp->already_shutdown && sp->read_cb(sp->emfd, sp->server->user_data)) {
/* There maybe more packets to read. Schedule read_more_cb_ closure to run
* after finishing this event loop. */
- GRPC_CLOSURE_SCHED(exec_ctx, &sp->do_read_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&sp->do_read_closure, GRPC_ERROR_NONE);
} else {
/* Finish reading all the packets, re-arm the notification event so we can
* get another chance to read. Or fd already shutdown, re-arm to get a
* notification with shutdown error. */
- grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
+ grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
}
gpr_mu_unlock(&sp->server->mu);
}
/* event manager callback when reads are ready */
-static void on_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+static void on_read(void* arg, grpc_error* error) {
grpc_udp_listener* sp = (grpc_udp_listener*)arg;
gpr_mu_lock(&sp->server->mu);
if (error != GRPC_ERROR_NONE) {
if (0 == --sp->server->active_ports && sp->server->shutdown) {
gpr_mu_unlock(&sp->server->mu);
- deactivated_all_ports(exec_ctx, sp->server);
+ deactivated_all_ports(sp->server);
} else {
gpr_mu_unlock(&sp->server->mu);
}
@@ -389,59 +383,57 @@ static void on_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
/* Read once. If there is more data to read, off load the work to another
* thread to finish. */
GPR_ASSERT(sp->read_cb);
- if (sp->read_cb(exec_ctx, sp->emfd, sp->server->user_data)) {
+ if (sp->read_cb(sp->emfd, sp->server->user_data)) {
/* There maybe more packets to read. Schedule read_more_cb_ closure to run
* after finishing this event loop. */
GRPC_CLOSURE_INIT(&sp->do_read_closure, do_read, arg,
grpc_executor_scheduler(GRPC_EXECUTOR_LONG));
- GRPC_CLOSURE_SCHED(exec_ctx, &sp->do_read_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&sp->do_read_closure, GRPC_ERROR_NONE);
} else {
/* Finish reading all the packets, re-arm the notification event so we can
* get another chance to read. Or fd already shutdown, re-arm to get a
* notification with shutdown error. */
- grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
+ grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
}
gpr_mu_unlock(&sp->server->mu);
}
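on_read() above reads once on the polling thread and then either hands further draining to a long-running executor (the do_read closure initialized with grpc_executor_scheduler(GRPC_EXECUTOR_LONG)) or re-arms grpc_fd_notify_on_read() to wait for the next event. A stand-in sketch of that decision; read_once, offload_to_executor and rearm_notification are hypothetical helpers, not the gRPC API:

    #include <cstdio>

    // Hypothetical helpers standing in for sp->read_cb, the executor hand-off,
    // and grpc_fd_notify_on_read() respectively.
    static bool read_once(int* packets_left) {
      if (*packets_left > 0) --*packets_left;
      return *packets_left > 0;  // true: more data still queued on the socket
    }
    static void offload_to_executor() {
      std::printf("offload remaining drain to executor\n");
    }
    static void rearm_notification() {
      std::printf("re-arm notify_on_read\n");
    }

    static void on_read_sketch(int* packets_left) {
      if (read_once(packets_left)) {
        // More packets pending: keep the poller responsive and let a worker
        // thread continue draining, as on_read() does via do_read_closure.
        offload_to_executor();
      } else {
        // Drained (or fd shut down): wait for the next readable event.
        rearm_notification();
      }
    }

    int main() {
      int packets_left = 2;
      on_read_sketch(&packets_left);  // offloads
      packets_left = 1;
      on_read_sketch(&packets_left);  // drains and re-arms
    }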
// Wrapper of grpc_fd_notify_on_write() with a grpc_closure callback interface.
-void fd_notify_on_write_wrapper(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
+void fd_notify_on_write_wrapper(void* arg, grpc_error* error) {
grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg);
gpr_mu_lock(&sp->server->mu);
if (!sp->notify_on_write_armed) {
- grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
+ grpc_fd_notify_on_write(sp->emfd, &sp->write_closure);
sp->notify_on_write_armed = true;
}
gpr_mu_unlock(&sp->server->mu);
}
-static void do_write(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+static void do_write(void* arg, grpc_error* error) {
grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg);
gpr_mu_lock(&(sp->server->mu));
if (sp->already_shutdown) {
// If fd has been shutdown, don't write any more and re-arm notification.
- grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
+ grpc_fd_notify_on_write(sp->emfd, &sp->write_closure);
} else {
sp->notify_on_write_armed = false;
/* Tell the registered callback that the socket is writeable. */
GPR_ASSERT(sp->write_cb && error == GRPC_ERROR_NONE);
GRPC_CLOSURE_INIT(&sp->notify_on_write_closure, fd_notify_on_write_wrapper,
arg, grpc_schedule_on_exec_ctx);
- sp->write_cb(exec_ctx, sp->emfd, sp->server->user_data,
- &sp->notify_on_write_closure);
+ sp->write_cb(sp->emfd, sp->server->user_data, &sp->notify_on_write_closure);
}
gpr_mu_unlock(&sp->server->mu);
}
-static void on_write(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+static void on_write(void* arg, grpc_error* error) {
grpc_udp_listener* sp = (grpc_udp_listener*)arg;
gpr_mu_lock(&(sp->server->mu));
if (error != GRPC_ERROR_NONE) {
if (0 == --sp->server->active_ports && sp->server->shutdown) {
gpr_mu_unlock(&sp->server->mu);
- deactivated_all_ports(exec_ctx, sp->server);
+ deactivated_all_ports(sp->server);
} else {
gpr_mu_unlock(&sp->server->mu);
}
@@ -452,7 +444,7 @@ static void on_write(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
GRPC_CLOSURE_INIT(&sp->do_write_closure, do_write, arg,
grpc_executor_scheduler(GRPC_EXECUTOR_LONG));
- GRPC_CLOSURE_SCHED(exec_ctx, &sp->do_write_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&sp->do_write_closure, GRPC_ERROR_NONE);
gpr_mu_unlock(&sp->server->mu);
}
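The write path mirrors the read path with one twist: do_write() hands the user's write_cb a wrapper closure (fd_notify_on_write_wrapper) and write notification is only re-armed when that closure runs, so the callback decides when it wants the next writable event. A stand-in sketch of that handshake; Listener, user_write_cb and the lambda wrapper are hypothetical, not the gRPC API:

    #include <cstdio>
    #include <functional>

    // Hypothetical stand-in for the listener state around write readiness.
    struct Listener {
      bool notify_on_write_armed = true;
    };

    static void notify_on_write(Listener* l) {
      l->notify_on_write_armed = true;
      std::printf("re-armed write notification\n");
    }

    // Stand-in for sp->write_cb: it receives a "call me when you want more
    // write readiness" closure, mirroring notify_on_write_closure in do_write().
    static void user_write_cb(const std::function<void()>& want_more_writes) {
      std::printf("wrote what we could\n");
      want_more_writes();  // the callback, not the iomgr, picks the re-arm point
    }

    static void do_write_sketch(Listener* l) {
      l->notify_on_write_armed = false;  // current writable event is consumed
      user_write_cb([l] {
        if (!l->notify_on_write_armed) notify_on_write(l);
      });
    }

    int main() {
      Listener l;
      do_write_sketch(&l);
    }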
@@ -593,9 +585,8 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index) {
return sp->fd;
}
-void grpc_udp_server_start(grpc_exec_ctx* exec_ctx, grpc_udp_server* s,
- grpc_pollset** pollsets, size_t pollset_count,
- void* user_data) {
+void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets,
+ size_t pollset_count, void* user_data) {
size_t i;
gpr_mu_lock(&s->mu);
grpc_udp_listener* sp;
@@ -606,16 +597,16 @@ void grpc_udp_server_start(grpc_exec_ctx* exec_ctx, grpc_udp_server* s,
sp = s->head;
while (sp != nullptr) {
for (i = 0; i < pollset_count; i++) {
- grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
+ grpc_pollset_add_fd(pollsets[i], sp->emfd);
}
GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
grpc_schedule_on_exec_ctx);
- grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
+ grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
GRPC_CLOSURE_INIT(&sp->write_closure, on_write, sp,
grpc_schedule_on_exec_ctx);
sp->notify_on_write_armed = true;
- grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
+ grpc_fd_notify_on_write(sp->emfd, &sp->write_closure);
/* Registered for both read and write callbacks: increment active_ports
* twice to account for this, and delay free-ing of memory until both