diff options
author | Dan Zhang <danzh@google.com> | 2018-02-27 18:27:10 -0500 |
---|---|---|
committer | Dan Zhang <danzh@google.com> | 2018-02-27 18:27:10 -0500 |
commit | b293e9e8224d1ebe03cc7ef28782c4148542bff0 (patch) | |
tree | c0ee27b95b43b14d883ce0210a72a7a51f4c6573 /test | |
parent | 0fc97adc9ee41d517ee49ec8e3a8338b793fba7e (diff) |
Refactors grpc udp_server_listener to be object oriented. Also adds a mutex to each listener. Instead of sharing the mutex in udp_server for all listeners, this per-listener mutex can make most of the call to different listeners in parallel.
Diffstat (limited to 'test')
-rw-r--r-- | test/core/iomgr/udp_server_test.cc | 108 |
1 files changed, 67 insertions, 41 deletions
diff --git a/test/core/iomgr/udp_server_test.cc b/test/core/iomgr/udp_server_test.cc index 13cbf2f6df..50e79a08b0 100644 --- a/test/core/iomgr/udp_server_test.cc +++ b/test/core/iomgr/udp_server_test.cc @@ -36,6 +36,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/socket_factory_posix.h" @@ -54,42 +55,71 @@ static int g_number_of_starts = 0; int rcv_buf_size = 1024; int snd_buf_size = 1024; -static void on_start(grpc_fd* emfd, void* user_data) { g_number_of_starts++; } +class TestGrpcUdpHandler : public GrpcUdpHandler { + public: + TestGrpcUdpHandler(grpc_fd* emfd, void* user_data) + : GrpcUdpHandler(emfd, user_data), emfd_(emfd) { + g_number_of_starts++; + } + ~TestGrpcUdpHandler() override {} -static bool on_read(grpc_fd* emfd) { - char read_buffer[512]; - ssize_t byte_count; + protected: + bool Read() override { + char read_buffer[512]; + ssize_t byte_count; - gpr_mu_lock(g_mu); - byte_count = - recv(grpc_fd_wrapped_fd(emfd), read_buffer, sizeof(read_buffer), 0); + gpr_mu_lock(g_mu); + byte_count = + recv(grpc_fd_wrapped_fd(emfd()), read_buffer, sizeof(read_buffer), 0); - g_number_of_reads++; - g_number_of_bytes_read += static_cast<int>(byte_count); + g_number_of_reads++; + g_number_of_bytes_read += static_cast<int>(byte_count); - GPR_ASSERT( - GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr))); - gpr_mu_unlock(g_mu); - return false; -} + GPR_ASSERT(GRPC_LOG_IF_ERROR("pollset_kick", + grpc_pollset_kick(g_pollset, nullptr))); + gpr_mu_unlock(g_mu); + return false; + } -static void on_write(grpc_fd* emfd, void* user_data, - grpc_closure* notify_on_write_closure) { - gpr_mu_lock(g_mu); - g_number_of_writes++; + void OnCanWrite(void* user_data, + grpc_closure* notify_on_write_closure) override { + gpr_mu_lock(g_mu); + g_number_of_writes++; - GPR_ASSERT( - GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr))); - gpr_mu_unlock(g_mu); -} + GPR_ASSERT(GRPC_LOG_IF_ERROR("pollset_kick", + grpc_pollset_kick(g_pollset, nullptr))); + gpr_mu_unlock(g_mu); + } -static void on_fd_orphaned(grpc_fd* emfd, grpc_closure* closure, - void* user_data) { - gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d", - grpc_fd_wrapped_fd(emfd)); - GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE); - g_number_of_orphan_calls++; -} + void OnFdAboutToOrphan(grpc_closure* orphan_fd_closure, + void* user_data) override { + gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d", + grpc_fd_wrapped_fd(emfd())); + GRPC_CLOSURE_SCHED(orphan_fd_closure, GRPC_ERROR_NONE); + g_number_of_orphan_calls++; + } + + grpc_fd* emfd() { return emfd_; } + + private: + grpc_fd* emfd_; +}; + +class TestGrpcUdpHandlerFactory : public GrpcUdpHandlerFactory { + public: + GrpcUdpHandler* CreateUdpHandler(grpc_fd* emfd, void* user_data) override { + gpr_log(GPR_INFO, "create udp handler for fd %d", grpc_fd_wrapped_fd(emfd)); + return grpc_core::New<TestGrpcUdpHandler>(emfd, user_data); + } + + void DestroyUdpHandler(GrpcUdpHandler* handler) override { + gpr_log(GPR_INFO, "Destroy handler"); + grpc_core::Delete(reinterpret_cast<TestGrpcUdpHandler*>(handler)); + + } +}; + +TestGrpcUdpHandlerFactory handler_factory; struct test_socket_factory { grpc_socket_factory base; @@ -184,13 +214,12 @@ static void test_no_op_with_port(void) { resolved_addr.len = sizeof(struct sockaddr_in); addr->sin_family = AF_INET; GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size, - snd_buf_size, on_start, on_read, on_write, - on_fd_orphaned)); + snd_buf_size, &handler_factory)); grpc_udp_server_destroy(s, nullptr); - /* The server had a single FD, which should have been orphaned. */ - GPR_ASSERT(g_number_of_orphan_calls == 1); + /* The server haven't start listening, so no udp handler to be notified. */ + GPR_ASSERT(g_number_of_orphan_calls == 0); shutdown_and_destroy_pollset(); } @@ -216,8 +245,7 @@ static void test_no_op_with_port_and_socket_factory(void) { resolved_addr.len = sizeof(struct sockaddr_in); addr->sin_family = AF_INET; GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size, - snd_buf_size, on_start, on_read, on_write, - on_fd_orphaned)); + snd_buf_size, &handler_factory)); GPR_ASSERT(socket_factory->number_of_socket_calls == 1); GPR_ASSERT(socket_factory->number_of_bind_calls == 1); @@ -225,8 +253,8 @@ static void test_no_op_with_port_and_socket_factory(void) { grpc_socket_factory_unref(&socket_factory->base); - /* The server had a single FD, which should have been orphaned. */ - GPR_ASSERT(g_number_of_orphan_calls == 1); + /* The server haven't start listening, so no udp handler to be notified. */ + GPR_ASSERT(g_number_of_orphan_calls == 0); shutdown_and_destroy_pollset(); } @@ -244,8 +272,7 @@ static void test_no_op_with_port_and_start(void) { resolved_addr.len = sizeof(struct sockaddr_in); addr->sin_family = AF_INET; GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size, - snd_buf_size, on_start, on_read, on_write, - on_fd_orphaned)); + snd_buf_size, &handler_factory)); grpc_udp_server_start(s, nullptr, 0, nullptr); GPR_ASSERT(g_number_of_starts == 1); @@ -278,8 +305,7 @@ static void test_receive(int number_of_clients) { resolved_addr.len = sizeof(struct sockaddr_storage); addr->ss_family = AF_INET; GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size, - snd_buf_size, on_start, on_read, on_write, - on_fd_orphaned)); + snd_buf_size, &handler_factory)); svrfd = grpc_udp_server_get_fd(s, 0); GPR_ASSERT(svrfd >= 0); |