diff options
author | 2017-12-22 14:16:41 -0800 | |
---|---|---|
committer | 2017-12-22 14:16:41 -0800 | |
commit | 9aaa284fcb067fb5c07742b6f34de0878f079ecf (patch) | |
tree | a33c605253c3c9d6c2c50be25f7dd8a16172a0ad | |
parent | e33d0272fd5b16521848535a98b6dbc8bd83233a (diff) | |
parent | 0c2fc922a62eedf02e0e3ad070685b8265347ec7 (diff) |
Merge pull request #13844 from danzh2010/startcb
Change grpc_udp_server interface
-rw-r--r-- | src/core/lib/iomgr/udp_server.cc | 20 | ||||
-rw-r--r-- | src/core/lib/iomgr/udp_server.h | 6 | ||||
-rw-r--r-- | test/core/iomgr/udp_server_test.cc | 60 |
3 files changed, 58 insertions, 28 deletions
diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index 55e0b165ec..4a97f3353d 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -72,6 +72,7 @@ struct grpc_udp_listener { grpc_udp_server_read_cb read_cb; grpc_udp_server_write_cb write_cb; grpc_udp_server_orphan_cb orphan_cb; + grpc_udp_server_start_cb start_cb; // To be scheduled on another thread to actually read/write. grpc_closure do_read_closure; grpc_closure do_write_closure; @@ -353,7 +354,7 @@ static void do_read(void* arg, grpc_error* error) { * read lock if available. */ gpr_mu_lock(&sp->server->mu); /* Tell the registered callback that data is available to read. */ - if (!sp->already_shutdown && sp->read_cb(sp->emfd, sp->server->user_data)) { + if (!sp->already_shutdown && sp->read_cb(sp->emfd)) { /* There maybe more packets to read. Schedule read_more_cb_ closure to run * after finishing this event loop. */ GRPC_CLOSURE_SCHED(&sp->do_read_closure, GRPC_ERROR_NONE); @@ -383,7 +384,7 @@ static void on_read(void* arg, grpc_error* error) { /* Read once. If there is more data to read, off load the work to another * thread to finish. */ GPR_ASSERT(sp->read_cb); - if (sp->read_cb(sp->emfd, sp->server->user_data)) { + if (sp->read_cb(sp->emfd)) { /* There maybe more packets to read. Schedule read_more_cb_ closure to run * after finishing this event loop. */ GRPC_CLOSURE_INIT(&sp->do_read_closure, do_read, arg, @@ -411,7 +412,7 @@ void fd_notify_on_write_wrapper(void* arg, grpc_error* error) { static void do_write(void* arg, grpc_error* error) { grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg); - gpr_mu_lock(&(sp->server->mu)); + gpr_mu_lock(&sp->server->mu); if (sp->already_shutdown) { // If fd has been shutdown, don't write any more and re-arm notification. grpc_fd_notify_on_write(sp->emfd, &sp->write_closure); @@ -429,7 +430,7 @@ static void do_write(void* arg, grpc_error* error) { static void on_write(void* arg, grpc_error* error) { grpc_udp_listener* sp = (grpc_udp_listener*)arg; - gpr_mu_lock(&(sp->server->mu)); + gpr_mu_lock(&sp->server->mu); if (error != GRPC_ERROR_NONE) { if (0 == --sp->server->active_ports && sp->server->shutdown) { gpr_mu_unlock(&sp->server->mu); @@ -450,6 +451,7 @@ static void on_write(void* arg, grpc_error* error) { static int add_socket_to_server(grpc_udp_server* s, int fd, const grpc_resolved_address* addr, + grpc_udp_server_start_cb start_cb, grpc_udp_server_read_cb read_cb, grpc_udp_server_write_cb write_cb, grpc_udp_server_orphan_cb orphan_cb) { @@ -480,6 +482,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd, sp->read_cb = read_cb; sp->write_cb = write_cb; sp->orphan_cb = orphan_cb; + sp->start_cb = start_cb; sp->orphan_notified = false; sp->already_shutdown = false; GPR_ASSERT(sp->emfd); @@ -492,6 +495,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd, int grpc_udp_server_add_port(grpc_udp_server* s, const grpc_resolved_address* addr, + grpc_udp_server_start_cb start_cb, grpc_udp_server_read_cb read_cb, grpc_udp_server_write_cb write_cb, grpc_udp_server_orphan_cb orphan_cb) { @@ -541,8 +545,8 @@ int grpc_udp_server_add_port(grpc_udp_server* s, // TODO(rjshade): Test and propagate the returned grpc_error*: GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory( s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd)); - allocated_port1 = - add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb); + allocated_port1 = add_socket_to_server(s, fd, addr, start_cb, read_cb, + write_cb, orphan_cb); if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { goto done; } @@ -565,7 +569,7 @@ int grpc_udp_server_add_port(grpc_udp_server* s, addr = &addr4_copy; } allocated_port2 = - add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb); + add_socket_to_server(s, fd, addr, start_cb, read_cb, write_cb, orphan_cb); done: gpr_free(allocated_addr); @@ -587,6 +591,7 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index) { void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets, size_t pollset_count, void* user_data) { + gpr_log(GPR_DEBUG, "grpc_udp_server_start"); size_t i; gpr_mu_lock(&s->mu); grpc_udp_listener* sp; @@ -596,6 +601,7 @@ void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets, sp = s->head; while (sp != nullptr) { + sp->start_cb(sp->emfd, sp->server->user_data); for (i = 0; i < pollset_count; i++) { grpc_pollset_add_fd(pollsets[i], sp->emfd); } diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h index 02e3acb7f5..a469ab9be5 100644 --- a/src/core/lib/iomgr/udp_server.h +++ b/src/core/lib/iomgr/udp_server.h @@ -30,9 +30,12 @@ struct grpc_server; /* Forward decl of grpc_udp_server */ typedef struct grpc_udp_server grpc_udp_server; +/* Called when grpc server starts to listening on the grpc_fd. */ +typedef void (*grpc_udp_server_start_cb)(grpc_fd* emfd, void* user_data); + /* Called when data is available to read from the socket. * Return true if there is more data to read from fd. */ -typedef bool (*grpc_udp_server_read_cb)(grpc_fd* emfd, void* user_data); +typedef bool (*grpc_udp_server_read_cb)(grpc_fd* emfd); /* Called when the socket is writeable. The given closure should be scheduled * when the socket becomes blocked next time. */ @@ -65,6 +68,7 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index); all of the multiple socket port matching logic in one place */ int grpc_udp_server_add_port(grpc_udp_server* s, const grpc_resolved_address* addr, + grpc_udp_server_start_cb start_cb, grpc_udp_server_read_cb read_cb, grpc_udp_server_write_cb write_cb, grpc_udp_server_orphan_cb orphan_cb); diff --git a/test/core/iomgr/udp_server_test.cc b/test/core/iomgr/udp_server_test.cc index 0deb534abd..dc1248bc1c 100644 --- a/test/core/iomgr/udp_server_test.cc +++ b/test/core/iomgr/udp_server_test.cc @@ -49,8 +49,11 @@ static int g_number_of_reads = 0; static int g_number_of_writes = 0; static int g_number_of_bytes_read = 0; static int g_number_of_orphan_calls = 0; +static int g_number_of_starts = 0; -static bool on_read(grpc_fd* emfd, void* user_data) { +static void on_start(grpc_fd* emfd, void* user_data) { g_number_of_starts++; } + +static bool on_read(grpc_fd* emfd) { char read_buffer[512]; ssize_t byte_count; @@ -129,21 +132,41 @@ static test_socket_factory* test_socket_factory_create(void) { return factory; } +static void destroy_pollset(void* p, grpc_error* error) { + grpc_pollset_destroy(static_cast<grpc_pollset*>(p)); +} + +static void shutdown_and_destroy_pollset() { + gpr_mu_lock(g_mu); + auto closure = GRPC_CLOSURE_CREATE(destroy_pollset, g_pollset, + grpc_schedule_on_exec_ctx); + grpc_pollset_shutdown(g_pollset, closure); + gpr_mu_unlock(g_mu); + /* Flush exec_ctx to run |destroyed| */ + grpc_core::ExecCtx::Get()->Flush(); +} + static void test_no_op(void) { + grpc_pollset_init(g_pollset, &g_mu); grpc_core::ExecCtx exec_ctx; grpc_udp_server* s = grpc_udp_server_create(nullptr); + LOG_TEST("test_no_op"); grpc_udp_server_destroy(s, nullptr); + shutdown_and_destroy_pollset(); } static void test_no_op_with_start(void) { + grpc_pollset_init(g_pollset, &g_mu); grpc_core::ExecCtx exec_ctx; grpc_udp_server* s = grpc_udp_server_create(nullptr); LOG_TEST("test_no_op_with_start"); grpc_udp_server_start(s, nullptr, 0, nullptr); grpc_udp_server_destroy(s, nullptr); + shutdown_and_destroy_pollset(); } static void test_no_op_with_port(void) { + grpc_pollset_init(g_pollset, &g_mu); g_number_of_orphan_calls = 0; grpc_core::ExecCtx exec_ctx; grpc_resolved_address resolved_addr; @@ -154,16 +177,18 @@ static void test_no_op_with_port(void) { memset(&resolved_addr, 0, sizeof(resolved_addr)); resolved_addr.len = sizeof(struct sockaddr_in); addr->sin_family = AF_INET; - GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_read, on_write, - on_fd_orphaned)); + GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_start, on_read, + on_write, on_fd_orphaned)); grpc_udp_server_destroy(s, nullptr); /* The server had a single FD, which should have been orphaned. */ GPR_ASSERT(g_number_of_orphan_calls == 1); + shutdown_and_destroy_pollset(); } static void test_no_op_with_port_and_socket_factory(void) { + grpc_pollset_init(g_pollset, &g_mu); g_number_of_orphan_calls = 0; grpc_core::ExecCtx exec_ctx; grpc_resolved_address resolved_addr; @@ -182,8 +207,8 @@ static void test_no_op_with_port_and_socket_factory(void) { memset(&resolved_addr, 0, sizeof(resolved_addr)); resolved_addr.len = sizeof(struct sockaddr_in); addr->sin_family = AF_INET; - GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_read, on_write, - on_fd_orphaned)); + GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_start, on_read, + on_write, on_fd_orphaned)); GPR_ASSERT(socket_factory->number_of_socket_calls == 1); GPR_ASSERT(socket_factory->number_of_bind_calls == 1); @@ -193,9 +218,11 @@ static void test_no_op_with_port_and_socket_factory(void) { /* The server had a single FD, which should have been orphaned. */ GPR_ASSERT(g_number_of_orphan_calls == 1); + shutdown_and_destroy_pollset(); } static void test_no_op_with_port_and_start(void) { + grpc_pollset_init(g_pollset, &g_mu); g_number_of_orphan_calls = 0; grpc_core::ExecCtx exec_ctx; grpc_resolved_address resolved_addr; @@ -206,19 +233,21 @@ static void test_no_op_with_port_and_start(void) { memset(&resolved_addr, 0, sizeof(resolved_addr)); resolved_addr.len = sizeof(struct sockaddr_in); addr->sin_family = AF_INET; - GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_read, on_write, - on_fd_orphaned)); + GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_start, on_read, + on_write, on_fd_orphaned)); grpc_udp_server_start(s, nullptr, 0, nullptr); - + GPR_ASSERT(g_number_of_starts == 1); grpc_udp_server_destroy(s, nullptr); /* The server had a single FD, which is orphaned exactly once in * * grpc_udp_server_destroy. */ GPR_ASSERT(g_number_of_orphan_calls == 1); + shutdown_and_destroy_pollset(); } static void test_receive(int number_of_clients) { + grpc_pollset_init(g_pollset, &g_mu); grpc_core::ExecCtx exec_ctx; grpc_resolved_address resolved_addr; struct sockaddr_storage* addr = (struct sockaddr_storage*)resolved_addr.addr; @@ -236,8 +265,8 @@ static void test_receive(int number_of_clients) { memset(&resolved_addr, 0, sizeof(resolved_addr)); resolved_addr.len = sizeof(struct sockaddr_storage); addr->ss_family = AF_INET; - GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_read, on_write, - on_fd_orphaned)); + GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_start, on_read, + on_write, on_fd_orphaned)); svrfd = grpc_udp_server_get_fd(s, 0); GPR_ASSERT(svrfd >= 0); @@ -281,20 +310,15 @@ static void test_receive(int number_of_clients) { /* The server had a single FD, which is orphaned exactly once in * * grpc_udp_server_destroy. */ GPR_ASSERT(g_number_of_orphan_calls == 1); -} - -static void destroy_pollset(void* p, grpc_error* error) { - grpc_pollset_destroy(static_cast<grpc_pollset*>(p)); + shutdown_and_destroy_pollset(); } int main(int argc, char** argv) { - grpc_closure destroyed; grpc_test_init(argc, argv); grpc_init(); { grpc_core::ExecCtx exec_ctx; g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size())); - grpc_pollset_init(g_pollset, &g_mu); test_no_op(); test_no_op_with_start(); @@ -304,10 +328,6 @@ int main(int argc, char** argv) { test_receive(1); test_receive(10); - GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset, - grpc_schedule_on_exec_ctx); - grpc_pollset_shutdown(g_pollset, &destroyed); - grpc_core::ExecCtx::Get()->Flush(); gpr_free(g_pollset); } grpc_shutdown(); |