aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Yang Gao <yangg@google.com>2017-12-22 14:16:41 -0800
committerGravatar GitHub <noreply@github.com>2017-12-22 14:16:41 -0800
commit9aaa284fcb067fb5c07742b6f34de0878f079ecf (patch)
treea33c605253c3c9d6c2c50be25f7dd8a16172a0ad
parente33d0272fd5b16521848535a98b6dbc8bd83233a (diff)
parent0c2fc922a62eedf02e0e3ad070685b8265347ec7 (diff)
Merge pull request #13844 from danzh2010/startcb
Change grpc_udp_server interface
-rw-r--r--src/core/lib/iomgr/udp_server.cc20
-rw-r--r--src/core/lib/iomgr/udp_server.h6
-rw-r--r--test/core/iomgr/udp_server_test.cc60
3 files changed, 58 insertions, 28 deletions
diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc
index 55e0b165ec..4a97f3353d 100644
--- a/src/core/lib/iomgr/udp_server.cc
+++ b/src/core/lib/iomgr/udp_server.cc
@@ -72,6 +72,7 @@ struct grpc_udp_listener {
grpc_udp_server_read_cb read_cb;
grpc_udp_server_write_cb write_cb;
grpc_udp_server_orphan_cb orphan_cb;
+ grpc_udp_server_start_cb start_cb;
// To be scheduled on another thread to actually read/write.
grpc_closure do_read_closure;
grpc_closure do_write_closure;
@@ -353,7 +354,7 @@ static void do_read(void* arg, grpc_error* error) {
* read lock if available. */
gpr_mu_lock(&sp->server->mu);
/* Tell the registered callback that data is available to read. */
- if (!sp->already_shutdown && sp->read_cb(sp->emfd, sp->server->user_data)) {
+ if (!sp->already_shutdown && sp->read_cb(sp->emfd)) {
/* There maybe more packets to read. Schedule read_more_cb_ closure to run
* after finishing this event loop. */
GRPC_CLOSURE_SCHED(&sp->do_read_closure, GRPC_ERROR_NONE);
@@ -383,7 +384,7 @@ static void on_read(void* arg, grpc_error* error) {
/* Read once. If there is more data to read, off load the work to another
* thread to finish. */
GPR_ASSERT(sp->read_cb);
- if (sp->read_cb(sp->emfd, sp->server->user_data)) {
+ if (sp->read_cb(sp->emfd)) {
/* There maybe more packets to read. Schedule read_more_cb_ closure to run
* after finishing this event loop. */
GRPC_CLOSURE_INIT(&sp->do_read_closure, do_read, arg,
@@ -411,7 +412,7 @@ void fd_notify_on_write_wrapper(void* arg, grpc_error* error) {
static void do_write(void* arg, grpc_error* error) {
grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg);
- gpr_mu_lock(&(sp->server->mu));
+ gpr_mu_lock(&sp->server->mu);
if (sp->already_shutdown) {
// If fd has been shutdown, don't write any more and re-arm notification.
grpc_fd_notify_on_write(sp->emfd, &sp->write_closure);
@@ -429,7 +430,7 @@ static void do_write(void* arg, grpc_error* error) {
static void on_write(void* arg, grpc_error* error) {
grpc_udp_listener* sp = (grpc_udp_listener*)arg;
- gpr_mu_lock(&(sp->server->mu));
+ gpr_mu_lock(&sp->server->mu);
if (error != GRPC_ERROR_NONE) {
if (0 == --sp->server->active_ports && sp->server->shutdown) {
gpr_mu_unlock(&sp->server->mu);
@@ -450,6 +451,7 @@ static void on_write(void* arg, grpc_error* error) {
static int add_socket_to_server(grpc_udp_server* s, int fd,
const grpc_resolved_address* addr,
+ grpc_udp_server_start_cb start_cb,
grpc_udp_server_read_cb read_cb,
grpc_udp_server_write_cb write_cb,
grpc_udp_server_orphan_cb orphan_cb) {
@@ -480,6 +482,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd,
sp->read_cb = read_cb;
sp->write_cb = write_cb;
sp->orphan_cb = orphan_cb;
+ sp->start_cb = start_cb;
sp->orphan_notified = false;
sp->already_shutdown = false;
GPR_ASSERT(sp->emfd);
@@ -492,6 +495,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd,
int grpc_udp_server_add_port(grpc_udp_server* s,
const grpc_resolved_address* addr,
+ grpc_udp_server_start_cb start_cb,
grpc_udp_server_read_cb read_cb,
grpc_udp_server_write_cb write_cb,
grpc_udp_server_orphan_cb orphan_cb) {
@@ -541,8 +545,8 @@ int grpc_udp_server_add_port(grpc_udp_server* s,
// TODO(rjshade): Test and propagate the returned grpc_error*:
GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory(
s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd));
- allocated_port1 =
- add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb);
+ allocated_port1 = add_socket_to_server(s, fd, addr, start_cb, read_cb,
+ write_cb, orphan_cb);
if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
goto done;
}
@@ -565,7 +569,7 @@ int grpc_udp_server_add_port(grpc_udp_server* s,
addr = &addr4_copy;
}
allocated_port2 =
- add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb);
+ add_socket_to_server(s, fd, addr, start_cb, read_cb, write_cb, orphan_cb);
done:
gpr_free(allocated_addr);
@@ -587,6 +591,7 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index) {
void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets,
size_t pollset_count, void* user_data) {
+ gpr_log(GPR_DEBUG, "grpc_udp_server_start");
size_t i;
gpr_mu_lock(&s->mu);
grpc_udp_listener* sp;
@@ -596,6 +601,7 @@ void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets,
sp = s->head;
while (sp != nullptr) {
+ sp->start_cb(sp->emfd, sp->server->user_data);
for (i = 0; i < pollset_count; i++) {
grpc_pollset_add_fd(pollsets[i], sp->emfd);
}
diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h
index 02e3acb7f5..a469ab9be5 100644
--- a/src/core/lib/iomgr/udp_server.h
+++ b/src/core/lib/iomgr/udp_server.h
@@ -30,9 +30,12 @@ struct grpc_server;
/* Forward decl of grpc_udp_server */
typedef struct grpc_udp_server grpc_udp_server;
+/* Called when grpc server starts to listening on the grpc_fd. */
+typedef void (*grpc_udp_server_start_cb)(grpc_fd* emfd, void* user_data);
+
/* Called when data is available to read from the socket.
* Return true if there is more data to read from fd. */
-typedef bool (*grpc_udp_server_read_cb)(grpc_fd* emfd, void* user_data);
+typedef bool (*grpc_udp_server_read_cb)(grpc_fd* emfd);
/* Called when the socket is writeable. The given closure should be scheduled
* when the socket becomes blocked next time. */
@@ -65,6 +68,7 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index);
all of the multiple socket port matching logic in one place */
int grpc_udp_server_add_port(grpc_udp_server* s,
const grpc_resolved_address* addr,
+ grpc_udp_server_start_cb start_cb,
grpc_udp_server_read_cb read_cb,
grpc_udp_server_write_cb write_cb,
grpc_udp_server_orphan_cb orphan_cb);
diff --git a/test/core/iomgr/udp_server_test.cc b/test/core/iomgr/udp_server_test.cc
index 0deb534abd..dc1248bc1c 100644
--- a/test/core/iomgr/udp_server_test.cc
+++ b/test/core/iomgr/udp_server_test.cc
@@ -49,8 +49,11 @@ static int g_number_of_reads = 0;
static int g_number_of_writes = 0;
static int g_number_of_bytes_read = 0;
static int g_number_of_orphan_calls = 0;
+static int g_number_of_starts = 0;
-static bool on_read(grpc_fd* emfd, void* user_data) {
+static void on_start(grpc_fd* emfd, void* user_data) { g_number_of_starts++; }
+
+static bool on_read(grpc_fd* emfd) {
char read_buffer[512];
ssize_t byte_count;
@@ -129,21 +132,41 @@ static test_socket_factory* test_socket_factory_create(void) {
return factory;
}
+static void destroy_pollset(void* p, grpc_error* error) {
+ grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
+}
+
+static void shutdown_and_destroy_pollset() {
+ gpr_mu_lock(g_mu);
+ auto closure = GRPC_CLOSURE_CREATE(destroy_pollset, g_pollset,
+ grpc_schedule_on_exec_ctx);
+ grpc_pollset_shutdown(g_pollset, closure);
+ gpr_mu_unlock(g_mu);
+ /* Flush exec_ctx to run |destroyed| */
+ grpc_core::ExecCtx::Get()->Flush();
+}
+
static void test_no_op(void) {
+ grpc_pollset_init(g_pollset, &g_mu);
grpc_core::ExecCtx exec_ctx;
grpc_udp_server* s = grpc_udp_server_create(nullptr);
+ LOG_TEST("test_no_op");
grpc_udp_server_destroy(s, nullptr);
+ shutdown_and_destroy_pollset();
}
static void test_no_op_with_start(void) {
+ grpc_pollset_init(g_pollset, &g_mu);
grpc_core::ExecCtx exec_ctx;
grpc_udp_server* s = grpc_udp_server_create(nullptr);
LOG_TEST("test_no_op_with_start");
grpc_udp_server_start(s, nullptr, 0, nullptr);
grpc_udp_server_destroy(s, nullptr);
+ shutdown_and_destroy_pollset();
}
static void test_no_op_with_port(void) {
+ grpc_pollset_init(g_pollset, &g_mu);
g_number_of_orphan_calls = 0;
grpc_core::ExecCtx exec_ctx;
grpc_resolved_address resolved_addr;
@@ -154,16 +177,18 @@ static void test_no_op_with_port(void) {
memset(&resolved_addr, 0, sizeof(resolved_addr));
resolved_addr.len = sizeof(struct sockaddr_in);
addr->sin_family = AF_INET;
- GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_read, on_write,
- on_fd_orphaned));
+ GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_start, on_read,
+ on_write, on_fd_orphaned));
grpc_udp_server_destroy(s, nullptr);
/* The server had a single FD, which should have been orphaned. */
GPR_ASSERT(g_number_of_orphan_calls == 1);
+ shutdown_and_destroy_pollset();
}
static void test_no_op_with_port_and_socket_factory(void) {
+ grpc_pollset_init(g_pollset, &g_mu);
g_number_of_orphan_calls = 0;
grpc_core::ExecCtx exec_ctx;
grpc_resolved_address resolved_addr;
@@ -182,8 +207,8 @@ static void test_no_op_with_port_and_socket_factory(void) {
memset(&resolved_addr, 0, sizeof(resolved_addr));
resolved_addr.len = sizeof(struct sockaddr_in);
addr->sin_family = AF_INET;
- GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_read, on_write,
- on_fd_orphaned));
+ GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_start, on_read,
+ on_write, on_fd_orphaned));
GPR_ASSERT(socket_factory->number_of_socket_calls == 1);
GPR_ASSERT(socket_factory->number_of_bind_calls == 1);
@@ -193,9 +218,11 @@ static void test_no_op_with_port_and_socket_factory(void) {
/* The server had a single FD, which should have been orphaned. */
GPR_ASSERT(g_number_of_orphan_calls == 1);
+ shutdown_and_destroy_pollset();
}
static void test_no_op_with_port_and_start(void) {
+ grpc_pollset_init(g_pollset, &g_mu);
g_number_of_orphan_calls = 0;
grpc_core::ExecCtx exec_ctx;
grpc_resolved_address resolved_addr;
@@ -206,19 +233,21 @@ static void test_no_op_with_port_and_start(void) {
memset(&resolved_addr, 0, sizeof(resolved_addr));
resolved_addr.len = sizeof(struct sockaddr_in);
addr->sin_family = AF_INET;
- GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_read, on_write,
- on_fd_orphaned));
+ GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_start, on_read,
+ on_write, on_fd_orphaned));
grpc_udp_server_start(s, nullptr, 0, nullptr);
-
+ GPR_ASSERT(g_number_of_starts == 1);
grpc_udp_server_destroy(s, nullptr);
/* The server had a single FD, which is orphaned exactly once in *
* grpc_udp_server_destroy. */
GPR_ASSERT(g_number_of_orphan_calls == 1);
+ shutdown_and_destroy_pollset();
}
static void test_receive(int number_of_clients) {
+ grpc_pollset_init(g_pollset, &g_mu);
grpc_core::ExecCtx exec_ctx;
grpc_resolved_address resolved_addr;
struct sockaddr_storage* addr = (struct sockaddr_storage*)resolved_addr.addr;
@@ -236,8 +265,8 @@ static void test_receive(int number_of_clients) {
memset(&resolved_addr, 0, sizeof(resolved_addr));
resolved_addr.len = sizeof(struct sockaddr_storage);
addr->ss_family = AF_INET;
- GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_read, on_write,
- on_fd_orphaned));
+ GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_start, on_read,
+ on_write, on_fd_orphaned));
svrfd = grpc_udp_server_get_fd(s, 0);
GPR_ASSERT(svrfd >= 0);
@@ -281,20 +310,15 @@ static void test_receive(int number_of_clients) {
/* The server had a single FD, which is orphaned exactly once in *
* grpc_udp_server_destroy. */
GPR_ASSERT(g_number_of_orphan_calls == 1);
-}
-
-static void destroy_pollset(void* p, grpc_error* error) {
- grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
+ shutdown_and_destroy_pollset();
}
int main(int argc, char** argv) {
- grpc_closure destroyed;
grpc_test_init(argc, argv);
grpc_init();
{
grpc_core::ExecCtx exec_ctx;
g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
- grpc_pollset_init(g_pollset, &g_mu);
test_no_op();
test_no_op_with_start();
@@ -304,10 +328,6 @@ int main(int argc, char** argv) {
test_receive(1);
test_receive(10);
- GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
- grpc_schedule_on_exec_ctx);
- grpc_pollset_shutdown(g_pollset, &destroyed);
- grpc_core::ExecCtx::Get()->Flush();
gpr_free(g_pollset);
}
grpc_shutdown();