diff options
author | 2017-12-06 09:47:54 -0800 | |
---|---|---|
committer | 2017-12-06 09:47:54 -0800 | |
commit | 8cf1470a51ea276ca84825e7495d4ee24743540d (patch) | |
tree | 72385cc865094115bc08cb813201d48cb09840bb /src/core/lib/iomgr/udp_server.cc | |
parent | 1d4e99508409be052bd129ba507bae1fbe7eb7fa (diff) |
Revert "Revert "All instances of exec_ctx being passed around in src/core removed""
Diffstat (limited to 'src/core/lib/iomgr/udp_server.cc')
-rw-r--r-- | src/core/lib/iomgr/udp_server.cc | 83 |
1 files changed, 37 insertions, 46 deletions
diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index 7b7d6946b1..55e0b165ec 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -150,31 +150,30 @@ grpc_udp_server* grpc_udp_server_create(const grpc_channel_args* args) { return s; } -static void shutdown_fd(grpc_exec_ctx* exec_ctx, void* args, - grpc_error* error) { +static void shutdown_fd(void* args, grpc_error* error) { struct shutdown_fd_args* shutdown_args = (struct shutdown_fd_args*)args; grpc_udp_listener* sp = shutdown_args->sp; gpr_log(GPR_DEBUG, "shutdown fd %d", sp->fd); gpr_mu_lock(shutdown_args->server_mu); - grpc_fd_shutdown(exec_ctx, sp->emfd, GRPC_ERROR_REF(error)); + grpc_fd_shutdown(sp->emfd, GRPC_ERROR_REF(error)); sp->already_shutdown = true; if (!sp->notify_on_write_armed) { // Re-arm write notification to notify listener with error. This is // necessary to decrement active_ports. sp->notify_on_write_armed = true; - grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure); + grpc_fd_notify_on_write(sp->emfd, &sp->write_closure); } gpr_mu_unlock(shutdown_args->server_mu); gpr_free(shutdown_args); } -static void dummy_cb(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { +static void dummy_cb(void* arg, grpc_error* error) { // No-op. } -static void finish_shutdown(grpc_exec_ctx* exec_ctx, grpc_udp_server* s) { +static void finish_shutdown(grpc_udp_server* s) { if (s->shutdown_complete != nullptr) { - GRPC_CLOSURE_SCHED(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(s->shutdown_complete, GRPC_ERROR_NONE); } gpr_mu_destroy(&s->mu); @@ -193,14 +192,13 @@ static void finish_shutdown(grpc_exec_ctx* exec_ctx, grpc_udp_server* s) { gpr_free(s); } -static void destroyed_port(grpc_exec_ctx* exec_ctx, void* server, - grpc_error* error) { +static void destroyed_port(void* server, grpc_error* error) { grpc_udp_server* s = (grpc_udp_server*)server; gpr_mu_lock(&s->mu); s->destroyed_ports++; if (s->destroyed_ports == s->nports) { gpr_mu_unlock(&s->mu); - finish_shutdown(exec_ctx, s); + finish_shutdown(s); } else { gpr_mu_unlock(&s->mu); } @@ -209,7 +207,7 @@ static void destroyed_port(grpc_exec_ctx* exec_ctx, void* server, /* called when all listening endpoints have been shutdown, so no further events will be received on them - at this point it's safe to destroy things */ -static void deactivated_all_ports(grpc_exec_ctx* exec_ctx, grpc_udp_server* s) { +static void deactivated_all_ports(grpc_udp_server* s) { /* delete ALL the things */ gpr_mu_lock(&s->mu); @@ -230,21 +228,19 @@ static void deactivated_all_ports(grpc_exec_ctx* exec_ctx, grpc_udp_server* s) { grpc_schedule_on_exec_ctx); GPR_ASSERT(sp->orphan_cb); gpr_log(GPR_DEBUG, "Orphan fd %d", sp->fd); - sp->orphan_cb(exec_ctx, sp->emfd, &sp->orphan_fd_closure, - sp->server->user_data); + sp->orphan_cb(sp->emfd, &sp->orphan_fd_closure, sp->server->user_data); } - grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, nullptr, + grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, nullptr, false /* already_closed */, "udp_listener_shutdown"); } gpr_mu_unlock(&s->mu); } else { gpr_mu_unlock(&s->mu); - finish_shutdown(exec_ctx, s); + finish_shutdown(s); } } -void grpc_udp_server_destroy(grpc_exec_ctx* exec_ctx, grpc_udp_server* s, - grpc_closure* on_done) { +void grpc_udp_server_destroy(grpc_udp_server* s, grpc_closure* on_done) { grpc_udp_listener* sp; gpr_mu_lock(&s->mu); @@ -264,14 +260,13 @@ void grpc_udp_server_destroy(grpc_exec_ctx* exec_ctx, grpc_udp_server* s, args->server_mu = &s->mu; GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, shutdown_fd, args, grpc_schedule_on_exec_ctx); - sp->orphan_cb(exec_ctx, sp->emfd, &sp->orphan_fd_closure, - sp->server->user_data); + sp->orphan_cb(sp->emfd, &sp->orphan_fd_closure, sp->server->user_data); sp->orphan_notified = true; } gpr_mu_unlock(&s->mu); } else { gpr_mu_unlock(&s->mu); - deactivated_all_ports(exec_ctx, s); + deactivated_all_ports(s); } } @@ -350,7 +345,7 @@ error: return -1; } -static void do_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { +static void do_read(void* arg, grpc_error* error) { grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg); GPR_ASSERT(sp->read_cb && error == GRPC_ERROR_NONE); /* TODO: the reason we hold server->mu here is merely to prevent fd @@ -358,29 +353,28 @@ static void do_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { * read lock if available. */ gpr_mu_lock(&sp->server->mu); /* Tell the registered callback that data is available to read. */ - if (!sp->already_shutdown && - sp->read_cb(exec_ctx, sp->emfd, sp->server->user_data)) { + if (!sp->already_shutdown && sp->read_cb(sp->emfd, sp->server->user_data)) { /* There maybe more packets to read. Schedule read_more_cb_ closure to run * after finishing this event loop. */ - GRPC_CLOSURE_SCHED(exec_ctx, &sp->do_read_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&sp->do_read_closure, GRPC_ERROR_NONE); } else { /* Finish reading all the packets, re-arm the notification event so we can * get another chance to read. Or fd already shutdown, re-arm to get a * notification with shutdown error. */ - grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); + grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); } gpr_mu_unlock(&sp->server->mu); } /* event manager callback when reads are ready */ -static void on_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { +static void on_read(void* arg, grpc_error* error) { grpc_udp_listener* sp = (grpc_udp_listener*)arg; gpr_mu_lock(&sp->server->mu); if (error != GRPC_ERROR_NONE) { if (0 == --sp->server->active_ports && sp->server->shutdown) { gpr_mu_unlock(&sp->server->mu); - deactivated_all_ports(exec_ctx, sp->server); + deactivated_all_ports(sp->server); } else { gpr_mu_unlock(&sp->server->mu); } @@ -389,59 +383,57 @@ static void on_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { /* Read once. If there is more data to read, off load the work to another * thread to finish. */ GPR_ASSERT(sp->read_cb); - if (sp->read_cb(exec_ctx, sp->emfd, sp->server->user_data)) { + if (sp->read_cb(sp->emfd, sp->server->user_data)) { /* There maybe more packets to read. Schedule read_more_cb_ closure to run * after finishing this event loop. */ GRPC_CLOSURE_INIT(&sp->do_read_closure, do_read, arg, grpc_executor_scheduler(GRPC_EXECUTOR_LONG)); - GRPC_CLOSURE_SCHED(exec_ctx, &sp->do_read_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&sp->do_read_closure, GRPC_ERROR_NONE); } else { /* Finish reading all the packets, re-arm the notification event so we can * get another chance to read. Or fd already shutdown, re-arm to get a * notification with shutdown error. */ - grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); + grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); } gpr_mu_unlock(&sp->server->mu); } // Wrapper of grpc_fd_notify_on_write() with a grpc_closure callback interface. -void fd_notify_on_write_wrapper(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +void fd_notify_on_write_wrapper(void* arg, grpc_error* error) { grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg); gpr_mu_lock(&sp->server->mu); if (!sp->notify_on_write_armed) { - grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure); + grpc_fd_notify_on_write(sp->emfd, &sp->write_closure); sp->notify_on_write_armed = true; } gpr_mu_unlock(&sp->server->mu); } -static void do_write(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { +static void do_write(void* arg, grpc_error* error) { grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg); gpr_mu_lock(&(sp->server->mu)); if (sp->already_shutdown) { // If fd has been shutdown, don't write any more and re-arm notification. - grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure); + grpc_fd_notify_on_write(sp->emfd, &sp->write_closure); } else { sp->notify_on_write_armed = false; /* Tell the registered callback that the socket is writeable. */ GPR_ASSERT(sp->write_cb && error == GRPC_ERROR_NONE); GRPC_CLOSURE_INIT(&sp->notify_on_write_closure, fd_notify_on_write_wrapper, arg, grpc_schedule_on_exec_ctx); - sp->write_cb(exec_ctx, sp->emfd, sp->server->user_data, - &sp->notify_on_write_closure); + sp->write_cb(sp->emfd, sp->server->user_data, &sp->notify_on_write_closure); } gpr_mu_unlock(&sp->server->mu); } -static void on_write(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { +static void on_write(void* arg, grpc_error* error) { grpc_udp_listener* sp = (grpc_udp_listener*)arg; gpr_mu_lock(&(sp->server->mu)); if (error != GRPC_ERROR_NONE) { if (0 == --sp->server->active_ports && sp->server->shutdown) { gpr_mu_unlock(&sp->server->mu); - deactivated_all_ports(exec_ctx, sp->server); + deactivated_all_ports(sp->server); } else { gpr_mu_unlock(&sp->server->mu); } @@ -452,7 +444,7 @@ static void on_write(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { GRPC_CLOSURE_INIT(&sp->do_write_closure, do_write, arg, grpc_executor_scheduler(GRPC_EXECUTOR_LONG)); - GRPC_CLOSURE_SCHED(exec_ctx, &sp->do_write_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&sp->do_write_closure, GRPC_ERROR_NONE); gpr_mu_unlock(&sp->server->mu); } @@ -593,9 +585,8 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index) { return sp->fd; } -void grpc_udp_server_start(grpc_exec_ctx* exec_ctx, grpc_udp_server* s, - grpc_pollset** pollsets, size_t pollset_count, - void* user_data) { +void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets, + size_t pollset_count, void* user_data) { size_t i; gpr_mu_lock(&s->mu); grpc_udp_listener* sp; @@ -606,16 +597,16 @@ void grpc_udp_server_start(grpc_exec_ctx* exec_ctx, grpc_udp_server* s, sp = s->head; while (sp != nullptr) { for (i = 0; i < pollset_count; i++) { - grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd); + grpc_pollset_add_fd(pollsets[i], sp->emfd); } GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp, grpc_schedule_on_exec_ctx); - grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); + grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); GRPC_CLOSURE_INIT(&sp->write_closure, on_write, sp, grpc_schedule_on_exec_ctx); sp->notify_on_write_armed = true; - grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure); + grpc_fd_notify_on_write(sp->emfd, &sp->write_closure); /* Registered for both read and write callbacks: increment active_ports * twice to account for this, and delay free-ing of memory until both |