diff options
author | Dan Zhang <danzh@google.com> | 2017-12-20 15:07:38 -0500 |
---|---|---|
committer | Dan Zhang <danzh@google.com> | 2017-12-20 15:07:38 -0500 |
commit | 0d18814106f19197e80366ee147b4c1565fadf96 (patch) | |
tree | 01d301d2d43417ee07c6d2668bea42b1a6ffc347 /src/core | |
parent | 9d77be167a1c2bc8ccbe73b7aa0fda207b32b999 (diff) |
Add a start_cb to grpc_udp_listener to be called when listener is
created.
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/lib/iomgr/udp_server.cc | 20 | ||||
-rw-r--r-- | src/core/lib/iomgr/udp_server.h | 6 |
2 files changed, 18 insertions, 8 deletions
diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index 55e0b165ec..4a97f3353d 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -72,6 +72,7 @@ struct grpc_udp_listener { grpc_udp_server_read_cb read_cb; grpc_udp_server_write_cb write_cb; grpc_udp_server_orphan_cb orphan_cb; + grpc_udp_server_start_cb start_cb; // To be scheduled on another thread to actually read/write. grpc_closure do_read_closure; grpc_closure do_write_closure; @@ -353,7 +354,7 @@ static void do_read(void* arg, grpc_error* error) { * read lock if available. */ gpr_mu_lock(&sp->server->mu); /* Tell the registered callback that data is available to read. */ - if (!sp->already_shutdown && sp->read_cb(sp->emfd, sp->server->user_data)) { + if (!sp->already_shutdown && sp->read_cb(sp->emfd)) { /* There maybe more packets to read. Schedule read_more_cb_ closure to run * after finishing this event loop. */ GRPC_CLOSURE_SCHED(&sp->do_read_closure, GRPC_ERROR_NONE); @@ -383,7 +384,7 @@ static void on_read(void* arg, grpc_error* error) { /* Read once. If there is more data to read, off load the work to another * thread to finish. */ GPR_ASSERT(sp->read_cb); - if (sp->read_cb(sp->emfd, sp->server->user_data)) { + if (sp->read_cb(sp->emfd)) { /* There maybe more packets to read. Schedule read_more_cb_ closure to run * after finishing this event loop. */ GRPC_CLOSURE_INIT(&sp->do_read_closure, do_read, arg, @@ -411,7 +412,7 @@ void fd_notify_on_write_wrapper(void* arg, grpc_error* error) { static void do_write(void* arg, grpc_error* error) { grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg); - gpr_mu_lock(&(sp->server->mu)); + gpr_mu_lock(&sp->server->mu); if (sp->already_shutdown) { // If fd has been shutdown, don't write any more and re-arm notification. grpc_fd_notify_on_write(sp->emfd, &sp->write_closure); @@ -429,7 +430,7 @@ static void do_write(void* arg, grpc_error* error) { static void on_write(void* arg, grpc_error* error) { grpc_udp_listener* sp = (grpc_udp_listener*)arg; - gpr_mu_lock(&(sp->server->mu)); + gpr_mu_lock(&sp->server->mu); if (error != GRPC_ERROR_NONE) { if (0 == --sp->server->active_ports && sp->server->shutdown) { gpr_mu_unlock(&sp->server->mu); @@ -450,6 +451,7 @@ static void on_write(void* arg, grpc_error* error) { static int add_socket_to_server(grpc_udp_server* s, int fd, const grpc_resolved_address* addr, + grpc_udp_server_start_cb start_cb, grpc_udp_server_read_cb read_cb, grpc_udp_server_write_cb write_cb, grpc_udp_server_orphan_cb orphan_cb) { @@ -480,6 +482,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd, sp->read_cb = read_cb; sp->write_cb = write_cb; sp->orphan_cb = orphan_cb; + sp->start_cb = start_cb; sp->orphan_notified = false; sp->already_shutdown = false; GPR_ASSERT(sp->emfd); @@ -492,6 +495,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd, int grpc_udp_server_add_port(grpc_udp_server* s, const grpc_resolved_address* addr, + grpc_udp_server_start_cb start_cb, grpc_udp_server_read_cb read_cb, grpc_udp_server_write_cb write_cb, grpc_udp_server_orphan_cb orphan_cb) { @@ -541,8 +545,8 @@ int grpc_udp_server_add_port(grpc_udp_server* s, // TODO(rjshade): Test and propagate the returned grpc_error*: GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory( s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd)); - allocated_port1 = - add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb); + allocated_port1 = add_socket_to_server(s, fd, addr, start_cb, read_cb, + write_cb, orphan_cb); if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { goto done; } @@ -565,7 +569,7 @@ int grpc_udp_server_add_port(grpc_udp_server* s, addr = &addr4_copy; } allocated_port2 = - add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb); + add_socket_to_server(s, fd, addr, start_cb, read_cb, write_cb, orphan_cb); done: gpr_free(allocated_addr); @@ -587,6 +591,7 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index) { void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets, size_t pollset_count, void* user_data) { + gpr_log(GPR_DEBUG, "grpc_udp_server_start"); size_t i; gpr_mu_lock(&s->mu); grpc_udp_listener* sp; @@ -596,6 +601,7 @@ void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets, sp = s->head; while (sp != nullptr) { + sp->start_cb(sp->emfd, sp->server->user_data); for (i = 0; i < pollset_count; i++) { grpc_pollset_add_fd(pollsets[i], sp->emfd); } diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h index 02e3acb7f5..a469ab9be5 100644 --- a/src/core/lib/iomgr/udp_server.h +++ b/src/core/lib/iomgr/udp_server.h @@ -30,9 +30,12 @@ struct grpc_server; /* Forward decl of grpc_udp_server */ typedef struct grpc_udp_server grpc_udp_server; +/* Called when grpc server starts to listening on the grpc_fd. */ +typedef void (*grpc_udp_server_start_cb)(grpc_fd* emfd, void* user_data); + /* Called when data is available to read from the socket. * Return true if there is more data to read from fd. */ -typedef bool (*grpc_udp_server_read_cb)(grpc_fd* emfd, void* user_data); +typedef bool (*grpc_udp_server_read_cb)(grpc_fd* emfd); /* Called when the socket is writeable. The given closure should be scheduled * when the socket becomes blocked next time. */ @@ -65,6 +68,7 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index); all of the multiple socket port matching logic in one place */ int grpc_udp_server_add_port(grpc_udp_server* s, const grpc_resolved_address* addr, + grpc_udp_server_start_cb start_cb, grpc_udp_server_read_cb read_cb, grpc_udp_server_write_cb write_cb, grpc_udp_server_orphan_cb orphan_cb); |