aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Dan Zhang <danzh@google.com>2017-12-20 15:07:38 -0500
committerGravatar Dan Zhang <danzh@google.com>2017-12-20 15:07:38 -0500
commit0d18814106f19197e80366ee147b4c1565fadf96 (patch)
tree01d301d2d43417ee07c6d2668bea42b1a6ffc347 /src/core
parent9d77be167a1c2bc8ccbe73b7aa0fda207b32b999 (diff)
Add a start_cb to grpc_udp_listener to be called when listener is
created.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/lib/iomgr/udp_server.cc20
-rw-r--r--src/core/lib/iomgr/udp_server.h6
2 files changed, 18 insertions, 8 deletions
diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc
index 55e0b165ec..4a97f3353d 100644
--- a/src/core/lib/iomgr/udp_server.cc
+++ b/src/core/lib/iomgr/udp_server.cc
@@ -72,6 +72,7 @@ struct grpc_udp_listener {
grpc_udp_server_read_cb read_cb;
grpc_udp_server_write_cb write_cb;
grpc_udp_server_orphan_cb orphan_cb;
+ grpc_udp_server_start_cb start_cb;
// To be scheduled on another thread to actually read/write.
grpc_closure do_read_closure;
grpc_closure do_write_closure;
@@ -353,7 +354,7 @@ static void do_read(void* arg, grpc_error* error) {
* read lock if available. */
gpr_mu_lock(&sp->server->mu);
/* Tell the registered callback that data is available to read. */
- if (!sp->already_shutdown && sp->read_cb(sp->emfd, sp->server->user_data)) {
+ if (!sp->already_shutdown && sp->read_cb(sp->emfd)) {
/* There maybe more packets to read. Schedule read_more_cb_ closure to run
* after finishing this event loop. */
GRPC_CLOSURE_SCHED(&sp->do_read_closure, GRPC_ERROR_NONE);
@@ -383,7 +384,7 @@ static void on_read(void* arg, grpc_error* error) {
/* Read once. If there is more data to read, off load the work to another
* thread to finish. */
GPR_ASSERT(sp->read_cb);
- if (sp->read_cb(sp->emfd, sp->server->user_data)) {
+ if (sp->read_cb(sp->emfd)) {
/* There maybe more packets to read. Schedule read_more_cb_ closure to run
* after finishing this event loop. */
GRPC_CLOSURE_INIT(&sp->do_read_closure, do_read, arg,
@@ -411,7 +412,7 @@ void fd_notify_on_write_wrapper(void* arg, grpc_error* error) {
static void do_write(void* arg, grpc_error* error) {
grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg);
- gpr_mu_lock(&(sp->server->mu));
+ gpr_mu_lock(&sp->server->mu);
if (sp->already_shutdown) {
// If fd has been shutdown, don't write any more and re-arm notification.
grpc_fd_notify_on_write(sp->emfd, &sp->write_closure);
@@ -429,7 +430,7 @@ static void do_write(void* arg, grpc_error* error) {
static void on_write(void* arg, grpc_error* error) {
grpc_udp_listener* sp = (grpc_udp_listener*)arg;
- gpr_mu_lock(&(sp->server->mu));
+ gpr_mu_lock(&sp->server->mu);
if (error != GRPC_ERROR_NONE) {
if (0 == --sp->server->active_ports && sp->server->shutdown) {
gpr_mu_unlock(&sp->server->mu);
@@ -450,6 +451,7 @@ static void on_write(void* arg, grpc_error* error) {
static int add_socket_to_server(grpc_udp_server* s, int fd,
const grpc_resolved_address* addr,
+ grpc_udp_server_start_cb start_cb,
grpc_udp_server_read_cb read_cb,
grpc_udp_server_write_cb write_cb,
grpc_udp_server_orphan_cb orphan_cb) {
@@ -480,6 +482,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd,
sp->read_cb = read_cb;
sp->write_cb = write_cb;
sp->orphan_cb = orphan_cb;
+ sp->start_cb = start_cb;
sp->orphan_notified = false;
sp->already_shutdown = false;
GPR_ASSERT(sp->emfd);
@@ -492,6 +495,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd,
int grpc_udp_server_add_port(grpc_udp_server* s,
const grpc_resolved_address* addr,
+ grpc_udp_server_start_cb start_cb,
grpc_udp_server_read_cb read_cb,
grpc_udp_server_write_cb write_cb,
grpc_udp_server_orphan_cb orphan_cb) {
@@ -541,8 +545,8 @@ int grpc_udp_server_add_port(grpc_udp_server* s,
// TODO(rjshade): Test and propagate the returned grpc_error*:
GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory(
s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd));
- allocated_port1 =
- add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb);
+ allocated_port1 = add_socket_to_server(s, fd, addr, start_cb, read_cb,
+ write_cb, orphan_cb);
if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
goto done;
}
@@ -565,7 +569,7 @@ int grpc_udp_server_add_port(grpc_udp_server* s,
addr = &addr4_copy;
}
allocated_port2 =
- add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb);
+ add_socket_to_server(s, fd, addr, start_cb, read_cb, write_cb, orphan_cb);
done:
gpr_free(allocated_addr);
@@ -587,6 +591,7 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index) {
void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets,
size_t pollset_count, void* user_data) {
+ gpr_log(GPR_DEBUG, "grpc_udp_server_start");
size_t i;
gpr_mu_lock(&s->mu);
grpc_udp_listener* sp;
@@ -596,6 +601,7 @@ void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets,
sp = s->head;
while (sp != nullptr) {
+ sp->start_cb(sp->emfd, sp->server->user_data);
for (i = 0; i < pollset_count; i++) {
grpc_pollset_add_fd(pollsets[i], sp->emfd);
}
diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h
index 02e3acb7f5..a469ab9be5 100644
--- a/src/core/lib/iomgr/udp_server.h
+++ b/src/core/lib/iomgr/udp_server.h
@@ -30,9 +30,12 @@ struct grpc_server;
/* Forward decl of grpc_udp_server */
typedef struct grpc_udp_server grpc_udp_server;
+/* Called when grpc server starts to listening on the grpc_fd. */
+typedef void (*grpc_udp_server_start_cb)(grpc_fd* emfd, void* user_data);
+
/* Called when data is available to read from the socket.
* Return true if there is more data to read from fd. */
-typedef bool (*grpc_udp_server_read_cb)(grpc_fd* emfd, void* user_data);
+typedef bool (*grpc_udp_server_read_cb)(grpc_fd* emfd);
/* Called when the socket is writeable. The given closure should be scheduled
* when the socket becomes blocked next time. */
@@ -65,6 +68,7 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index);
all of the multiple socket port matching logic in one place */
int grpc_udp_server_add_port(grpc_udp_server* s,
const grpc_resolved_address* addr,
+ grpc_udp_server_start_cb start_cb,
grpc_udp_server_read_cb read_cb,
grpc_udp_server_write_cb write_cb,
grpc_udp_server_orphan_cb orphan_cb);