aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/core/iomgr/udp_server_test.cc
diff options
context:
space:
mode:
authorGravatar Dan Zhang <danzh@google.com>2018-02-27 18:27:10 -0500
committerGravatar Dan Zhang <danzh@google.com>2018-02-27 18:27:10 -0500
commitb293e9e8224d1ebe03cc7ef28782c4148542bff0 (patch)
treec0ee27b95b43b14d883ce0210a72a7a51f4c6573 /test/core/iomgr/udp_server_test.cc
parent0fc97adc9ee41d517ee49ec8e3a8338b793fba7e (diff)
Refactors grpc udp_server_listener to be object oriented. Also adds a mutex to each listener. Instead of sharing the mutex in udp_server for all listeners, this per-listener mutex can make most of the call to different listeners in parallel.
Diffstat (limited to 'test/core/iomgr/udp_server_test.cc')
-rw-r--r--test/core/iomgr/udp_server_test.cc108
1 files changed, 67 insertions, 41 deletions
diff --git a/test/core/iomgr/udp_server_test.cc b/test/core/iomgr/udp_server_test.cc
index 13cbf2f6df..50e79a08b0 100644
--- a/test/core/iomgr/udp_server_test.cc
+++ b/test/core/iomgr/udp_server_test.cc
@@ -36,6 +36,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/socket_factory_posix.h"
@@ -54,42 +55,71 @@ static int g_number_of_starts = 0;
int rcv_buf_size = 1024;
int snd_buf_size = 1024;
-static void on_start(grpc_fd* emfd, void* user_data) { g_number_of_starts++; }
+class TestGrpcUdpHandler : public GrpcUdpHandler {
+ public:
+ TestGrpcUdpHandler(grpc_fd* emfd, void* user_data)
+ : GrpcUdpHandler(emfd, user_data), emfd_(emfd) {
+ g_number_of_starts++;
+ }
+ ~TestGrpcUdpHandler() override {}
-static bool on_read(grpc_fd* emfd) {
- char read_buffer[512];
- ssize_t byte_count;
+ protected:
+ bool Read() override {
+ char read_buffer[512];
+ ssize_t byte_count;
- gpr_mu_lock(g_mu);
- byte_count =
- recv(grpc_fd_wrapped_fd(emfd), read_buffer, sizeof(read_buffer), 0);
+ gpr_mu_lock(g_mu);
+ byte_count =
+ recv(grpc_fd_wrapped_fd(emfd()), read_buffer, sizeof(read_buffer), 0);
- g_number_of_reads++;
- g_number_of_bytes_read += static_cast<int>(byte_count);
+ g_number_of_reads++;
+ g_number_of_bytes_read += static_cast<int>(byte_count);
- GPR_ASSERT(
- GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
- gpr_mu_unlock(g_mu);
- return false;
-}
+ GPR_ASSERT(GRPC_LOG_IF_ERROR("pollset_kick",
+ grpc_pollset_kick(g_pollset, nullptr)));
+ gpr_mu_unlock(g_mu);
+ return false;
+ }
-static void on_write(grpc_fd* emfd, void* user_data,
- grpc_closure* notify_on_write_closure) {
- gpr_mu_lock(g_mu);
- g_number_of_writes++;
+ void OnCanWrite(void* user_data,
+ grpc_closure* notify_on_write_closure) override {
+ gpr_mu_lock(g_mu);
+ g_number_of_writes++;
- GPR_ASSERT(
- GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
- gpr_mu_unlock(g_mu);
-}
+ GPR_ASSERT(GRPC_LOG_IF_ERROR("pollset_kick",
+ grpc_pollset_kick(g_pollset, nullptr)));
+ gpr_mu_unlock(g_mu);
+ }
-static void on_fd_orphaned(grpc_fd* emfd, grpc_closure* closure,
- void* user_data) {
- gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d",
- grpc_fd_wrapped_fd(emfd));
- GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
- g_number_of_orphan_calls++;
-}
+ void OnFdAboutToOrphan(grpc_closure* orphan_fd_closure,
+ void* user_data) override {
+ gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d",
+ grpc_fd_wrapped_fd(emfd()));
+ GRPC_CLOSURE_SCHED(orphan_fd_closure, GRPC_ERROR_NONE);
+ g_number_of_orphan_calls++;
+ }
+
+ grpc_fd* emfd() { return emfd_; }
+
+ private:
+ grpc_fd* emfd_;
+};
+
+class TestGrpcUdpHandlerFactory : public GrpcUdpHandlerFactory {
+ public:
+ GrpcUdpHandler* CreateUdpHandler(grpc_fd* emfd, void* user_data) override {
+ gpr_log(GPR_INFO, "create udp handler for fd %d", grpc_fd_wrapped_fd(emfd));
+ return grpc_core::New<TestGrpcUdpHandler>(emfd, user_data);
+ }
+
+ void DestroyUdpHandler(GrpcUdpHandler* handler) override {
+ gpr_log(GPR_INFO, "Destroy handler");
+ grpc_core::Delete(reinterpret_cast<TestGrpcUdpHandler*>(handler));
+
+ }
+};
+
+TestGrpcUdpHandlerFactory handler_factory;
struct test_socket_factory {
grpc_socket_factory base;
@@ -184,13 +214,12 @@ static void test_no_op_with_port(void) {
resolved_addr.len = sizeof(struct sockaddr_in);
addr->sin_family = AF_INET;
GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size,
- snd_buf_size, on_start, on_read, on_write,
- on_fd_orphaned));
+ snd_buf_size, &handler_factory));
grpc_udp_server_destroy(s, nullptr);
- /* The server had a single FD, which should have been orphaned. */
- GPR_ASSERT(g_number_of_orphan_calls == 1);
+ /* The server haven't start listening, so no udp handler to be notified. */
+ GPR_ASSERT(g_number_of_orphan_calls == 0);
shutdown_and_destroy_pollset();
}
@@ -216,8 +245,7 @@ static void test_no_op_with_port_and_socket_factory(void) {
resolved_addr.len = sizeof(struct sockaddr_in);
addr->sin_family = AF_INET;
GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size,
- snd_buf_size, on_start, on_read, on_write,
- on_fd_orphaned));
+ snd_buf_size, &handler_factory));
GPR_ASSERT(socket_factory->number_of_socket_calls == 1);
GPR_ASSERT(socket_factory->number_of_bind_calls == 1);
@@ -225,8 +253,8 @@ static void test_no_op_with_port_and_socket_factory(void) {
grpc_socket_factory_unref(&socket_factory->base);
- /* The server had a single FD, which should have been orphaned. */
- GPR_ASSERT(g_number_of_orphan_calls == 1);
+ /* The server haven't start listening, so no udp handler to be notified. */
+ GPR_ASSERT(g_number_of_orphan_calls == 0);
shutdown_and_destroy_pollset();
}
@@ -244,8 +272,7 @@ static void test_no_op_with_port_and_start(void) {
resolved_addr.len = sizeof(struct sockaddr_in);
addr->sin_family = AF_INET;
GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size,
- snd_buf_size, on_start, on_read, on_write,
- on_fd_orphaned));
+ snd_buf_size, &handler_factory));
grpc_udp_server_start(s, nullptr, 0, nullptr);
GPR_ASSERT(g_number_of_starts == 1);
@@ -278,8 +305,7 @@ static void test_receive(int number_of_clients) {
resolved_addr.len = sizeof(struct sockaddr_storage);
addr->ss_family = AF_INET;
GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size,
- snd_buf_size, on_start, on_read, on_write,
- on_fd_orphaned));
+ snd_buf_size, &handler_factory));
svrfd = grpc_udp_server_get_fd(s, 0);
GPR_ASSERT(svrfd >= 0);