aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/iomgr/pollset_kick_posix.c30
-rw-r--r--test/core/iomgr/poll_kick_test.c30
2 files changed, 49 insertions, 11 deletions
diff --git a/src/core/iomgr/pollset_kick_posix.c b/src/core/iomgr/pollset_kick_posix.c
index d16e49e459..9f85b6137a 100644
--- a/src/core/iomgr/pollset_kick_posix.c
+++ b/src/core/iomgr/pollset_kick_posix.c
@@ -43,6 +43,9 @@
/* This implementation is based on a freelist of pipes. */
+#define GRPC_MAX_CACHED_PIPES 50
+#define GRPC_PIPE_LOW_WATERMARK 25
+
typedef struct grpc_kick_pipe_info {
int pipe_read_fd;
int pipe_write_fd;
@@ -50,14 +53,16 @@ typedef struct grpc_kick_pipe_info {
} grpc_kick_pipe_info;
static grpc_kick_pipe_info *pipe_freelist = NULL;
+static int pipe_freelist_count = 0;
static gpr_mu pipe_freelist_mu;
-static grpc_kick_pipe_info *allocate_pipe() {
+static grpc_kick_pipe_info *allocate_pipe(void) {
grpc_kick_pipe_info *info;
gpr_mu_lock(&pipe_freelist_mu);
if (pipe_freelist != NULL) {
info = pipe_freelist;
pipe_freelist = pipe_freelist->next;
+ --pipe_freelist_count;
} else {
int pipefd[2];
/* TODO(klempner): Make this nonfatal */
@@ -73,11 +78,26 @@ static grpc_kick_pipe_info *allocate_pipe() {
return info;
}
+static void destroy_pipe(void) {
+ /* assumes pipe_freelist_mu is held */
+ grpc_kick_pipe_info *current = pipe_freelist;
+ pipe_freelist = pipe_freelist->next;
+ pipe_freelist_count--;
+ close(current->pipe_read_fd);
+ close(current->pipe_write_fd);
+ gpr_free(current);
+}
+
static void free_pipe(grpc_kick_pipe_info *pipe_info) {
- /* TODO(klempner): Start closing pipes if the free list gets too large */
gpr_mu_lock(&pipe_freelist_mu);
pipe_info->next = pipe_freelist;
pipe_freelist = pipe_info;
+ pipe_freelist_count++;
+ if (pipe_freelist_count > GRPC_MAX_CACHED_PIPES) {
+ while (pipe_freelist_count > GRPC_PIPE_LOW_WATERMARK) {
+ destroy_pipe();
+ }
+ }
gpr_mu_unlock(&pipe_freelist_mu);
}
@@ -88,11 +108,7 @@ void grpc_pollset_kick_global_init() {
void grpc_pollset_kick_global_destroy() {
while (pipe_freelist != NULL) {
- grpc_kick_pipe_info *current = pipe_freelist;
- pipe_freelist = pipe_freelist->next;
- close(current->pipe_read_fd);
- close(current->pipe_write_fd);
- gpr_free(current);
+ destroy_pipe();
}
gpr_mu_destroy(&pipe_freelist_mu);
}
diff --git a/test/core/iomgr/poll_kick_test.c b/test/core/iomgr/poll_kick_test.c
index 9f0d0f38b3..c30a7b9ee0 100644
--- a/test/core/iomgr/poll_kick_test.c
+++ b/test/core/iomgr/poll_kick_test.c
@@ -33,16 +33,17 @@
#include "src/core/iomgr/pollset_kick.h"
+#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "test/core/util/test_config.h"
-static void test_allocation() {
+static void test_allocation(void) {
grpc_pollset_kick_state state;
grpc_pollset_kick_init(&state);
grpc_pollset_kick_destroy(&state);
}
-static void test_non_kick() {
+static void test_non_kick(void) {
grpc_pollset_kick_state state;
int fd;
@@ -54,7 +55,7 @@ static void test_non_kick() {
grpc_pollset_kick_destroy(&state);
}
-static void test_basic_kick() {
+static void test_basic_kick(void) {
/* Kicked during poll */
grpc_pollset_kick_state state;
int fd;
@@ -73,7 +74,7 @@ static void test_basic_kick() {
grpc_pollset_kick_destroy(&state);
}
-static void test_non_poll_kick() {
+static void test_non_poll_kick(void) {
/* Kick before entering poll */
grpc_pollset_kick_state state;
int fd;
@@ -86,6 +87,26 @@ static void test_non_poll_kick() {
grpc_pollset_kick_destroy(&state);
}
+#define GRPC_MAX_CACHED_PIPES 50
+
+static void test_over_free(void) {
+ /* Check high watermark pipe free logic */
+ int i;
+ struct grpc_pollset_kick_state *kick_state =
+ gpr_malloc(sizeof(grpc_pollset_kick_state) * GRPC_MAX_CACHED_PIPES);
+ for (i = 0; i < GRPC_MAX_CACHED_PIPES; ++i) {
+ int fd;
+ grpc_pollset_kick_init(&kick_state[i]);
+ fd = grpc_pollset_kick_pre_poll(&kick_state[i]);
+ GPR_ASSERT(fd >= 0);
+ }
+
+ for (i = 0; i < GRPC_MAX_CACHED_PIPES; ++i) {
+ grpc_pollset_kick_post_poll(&kick_state[i]);
+ grpc_pollset_kick_destroy(&kick_state[i]);
+ }
+}
+
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
@@ -95,6 +116,7 @@ int main(int argc, char **argv) {
test_basic_kick();
test_non_poll_kick();
test_non_kick();
+ test_over_free();
grpc_pollset_kick_global_destroy();
return 0;