diff options
Diffstat (limited to 'src/core/lib/iomgr/udp_server.cc')
-rw-r--r-- | src/core/lib/iomgr/udp_server.cc | 110 |
1 files changed, 96 insertions, 14 deletions
diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index 68ab9355ca..7b7d6946b1 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -47,6 +47,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -71,14 +72,22 @@ struct grpc_udp_listener { grpc_udp_server_read_cb read_cb; grpc_udp_server_write_cb write_cb; grpc_udp_server_orphan_cb orphan_cb; + // To be scheduled on another thread to actually read/write. + grpc_closure do_read_closure; + grpc_closure do_write_closure; + grpc_closure notify_on_write_closure; // True if orphan_cb is trigered. bool orphan_notified; + // True if grpc_fd_notify_on_write() is called after on_write() call. + bool notify_on_write_armed; + // True if fd has been shutdown. + bool already_shutdown; struct grpc_udp_listener* next; }; struct shutdown_fd_args { - grpc_fd* fd; + grpc_udp_listener* sp; gpr_mu* server_mu; }; @@ -144,8 +153,17 @@ grpc_udp_server* grpc_udp_server_create(const grpc_channel_args* args) { static void shutdown_fd(grpc_exec_ctx* exec_ctx, void* args, grpc_error* error) { struct shutdown_fd_args* shutdown_args = (struct shutdown_fd_args*)args; + grpc_udp_listener* sp = shutdown_args->sp; + gpr_log(GPR_DEBUG, "shutdown fd %d", sp->fd); gpr_mu_lock(shutdown_args->server_mu); - grpc_fd_shutdown(exec_ctx, shutdown_args->fd, GRPC_ERROR_REF(error)); + grpc_fd_shutdown(exec_ctx, sp->emfd, GRPC_ERROR_REF(error)); + sp->already_shutdown = true; + if (!sp->notify_on_write_armed) { + // Re-arm write notification to notify listener with error. This is + // necessary to decrement active_ports. + sp->notify_on_write_armed = true; + grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure); + } gpr_mu_unlock(shutdown_args->server_mu); gpr_free(shutdown_args); } @@ -161,6 +179,7 @@ static void finish_shutdown(grpc_exec_ctx* exec_ctx, grpc_udp_server* s) { gpr_mu_destroy(&s->mu); + gpr_log(GPR_DEBUG, "Destroy all listeners."); while (s->head) { grpc_udp_listener* sp = s->head; s->head = sp->next; @@ -207,9 +226,10 @@ static void deactivated_all_ports(grpc_exec_ctx* exec_ctx, grpc_udp_server* s) { /* Call the orphan_cb to signal that the FD is about to be closed and * should no longer be used. Because at this point, all listening ports * have been shutdown already, no need to shutdown again.*/ - GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, dummy_cb, sp->emfd, + GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, dummy_cb, sp, grpc_schedule_on_exec_ctx); GPR_ASSERT(sp->orphan_cb); + gpr_log(GPR_DEBUG, "Orphan fd %d", sp->fd); sp->orphan_cb(exec_ctx, sp->emfd, &sp->orphan_fd_closure, sp->server->user_data); } @@ -233,13 +253,14 @@ void grpc_udp_server_destroy(grpc_exec_ctx* exec_ctx, grpc_udp_server* s, s->shutdown_complete = on_done; + gpr_log(GPR_DEBUG, "start to destroy udp_server"); /* shutdown all fd's */ if (s->active_ports) { for (sp = s->head; sp; sp = sp->next) { GPR_ASSERT(sp->orphan_cb); struct shutdown_fd_args* args = (struct shutdown_fd_args*)gpr_malloc(sizeof(*args)); - args->fd = sp->emfd; + args->sp = sp; args->server_mu = &s->mu; GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, shutdown_fd, args, grpc_schedule_on_exec_ctx); @@ -329,6 +350,28 @@ error: return -1; } +static void do_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { + grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg); + GPR_ASSERT(sp->read_cb && error == GRPC_ERROR_NONE); + /* TODO: the reason we hold server->mu here is merely to prevent fd + * shutdown while we are reading. However, it blocks do_write(). Switch to + * read lock if available. */ + gpr_mu_lock(&sp->server->mu); + /* Tell the registered callback that data is available to read. */ + if (!sp->already_shutdown && + sp->read_cb(exec_ctx, sp->emfd, sp->server->user_data)) { + /* There maybe more packets to read. Schedule read_more_cb_ closure to run + * after finishing this event loop. */ + GRPC_CLOSURE_SCHED(exec_ctx, &sp->do_read_closure, GRPC_ERROR_NONE); + } else { + /* Finish reading all the packets, re-arm the notification event so we can + * get another chance to read. Or fd already shutdown, re-arm to get a + * notification with shutdown error. */ + grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); + } + gpr_mu_unlock(&sp->server->mu); +} + /* event manager callback when reads are ready */ static void on_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { grpc_udp_listener* sp = (grpc_udp_listener*)arg; @@ -343,13 +386,51 @@ static void on_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { } return; } - - /* Tell the registered callback that data is available to read. */ + /* Read once. If there is more data to read, off load the work to another + * thread to finish. */ GPR_ASSERT(sp->read_cb); - sp->read_cb(exec_ctx, sp->emfd, sp->server->user_data); + if (sp->read_cb(exec_ctx, sp->emfd, sp->server->user_data)) { + /* There maybe more packets to read. Schedule read_more_cb_ closure to run + * after finishing this event loop. */ + GRPC_CLOSURE_INIT(&sp->do_read_closure, do_read, arg, + grpc_executor_scheduler(GRPC_EXECUTOR_LONG)); + GRPC_CLOSURE_SCHED(exec_ctx, &sp->do_read_closure, GRPC_ERROR_NONE); + } else { + /* Finish reading all the packets, re-arm the notification event so we can + * get another chance to read. Or fd already shutdown, re-arm to get a + * notification with shutdown error. */ + grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); + } + gpr_mu_unlock(&sp->server->mu); +} + +// Wrapper of grpc_fd_notify_on_write() with a grpc_closure callback interface. +void fd_notify_on_write_wrapper(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg); + gpr_mu_lock(&sp->server->mu); + if (!sp->notify_on_write_armed) { + grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure); + sp->notify_on_write_armed = true; + } + gpr_mu_unlock(&sp->server->mu); +} - /* Re-arm the notification event so we get another chance to read. */ - grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); +static void do_write(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { + grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg); + gpr_mu_lock(&(sp->server->mu)); + if (sp->already_shutdown) { + // If fd has been shutdown, don't write any more and re-arm notification. + grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure); + } else { + sp->notify_on_write_armed = false; + /* Tell the registered callback that the socket is writeable. */ + GPR_ASSERT(sp->write_cb && error == GRPC_ERROR_NONE); + GRPC_CLOSURE_INIT(&sp->notify_on_write_closure, fd_notify_on_write_wrapper, + arg, grpc_schedule_on_exec_ctx); + sp->write_cb(exec_ctx, sp->emfd, sp->server->user_data, + &sp->notify_on_write_closure); + } gpr_mu_unlock(&sp->server->mu); } @@ -367,12 +448,11 @@ static void on_write(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { return; } - /* Tell the registered callback that the socket is writeable. */ - GPR_ASSERT(sp->write_cb); - sp->write_cb(exec_ctx, sp->emfd, sp->server->user_data); + /* Schedule actual write in another thread. */ + GRPC_CLOSURE_INIT(&sp->do_write_closure, do_write, arg, + grpc_executor_scheduler(GRPC_EXECUTOR_LONG)); - /* Re-arm the notification event so we get another chance to write. */ - grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure); + GRPC_CLOSURE_SCHED(exec_ctx, &sp->do_write_closure, GRPC_ERROR_NONE); gpr_mu_unlock(&sp->server->mu); } @@ -409,6 +489,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd, sp->write_cb = write_cb; sp->orphan_cb = orphan_cb; sp->orphan_notified = false; + sp->already_shutdown = false; GPR_ASSERT(sp->emfd); gpr_mu_unlock(&s->mu); gpr_free(name); @@ -533,6 +614,7 @@ void grpc_udp_server_start(grpc_exec_ctx* exec_ctx, grpc_udp_server* s, GRPC_CLOSURE_INIT(&sp->write_closure, on_write, sp, grpc_schedule_on_exec_ctx); + sp->notify_on_write_armed = true; grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure); /* Registered for both read and write callbacks: increment active_ports |