aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/channel/client_setup.c2
-rw-r--r--src/core/iomgr/endpoint_pair.h3
-rw-r--r--src/core/iomgr/endpoint_pair_posix.c17
-rw-r--r--src/core/iomgr/fd_posix.c13
-rw-r--r--src/core/iomgr/fd_posix.h6
-rw-r--r--src/core/iomgr/iomgr.c78
-rw-r--r--src/core/iomgr/iomgr_internal.h10
-rw-r--r--src/core/iomgr/resolve_address_posix.c14
-rw-r--r--src/core/iomgr/tcp_client_posix.c20
-rw-r--r--src/core/iomgr/tcp_server_posix.c19
-rw-r--r--test/core/bad_client/bad_client.c2
-rw-r--r--test/core/end2end/fixtures/chttp2_socket_pair.c2
-rw-r--r--test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c2
-rw-r--r--test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c2
-rw-r--r--test/core/iomgr/fd_posix_test.c8
-rw-r--r--test/core/iomgr/tcp_posix_test.c18
-rw-r--r--test/core/security/secure_endpoint_test.c2
-rwxr-xr-xtools/run_tests/jobset.py16
-rwxr-xr-xtools/run_tests/run_tests.py30
19 files changed, 194 insertions, 70 deletions
diff --git a/src/core/channel/client_setup.c b/src/core/channel/client_setup.c
index 6782ef515a..f305d8ba9e 100644
--- a/src/core/channel/client_setup.c
+++ b/src/core/channel/client_setup.c
@@ -96,7 +96,7 @@ static void setup_initiate(grpc_transport_setup *sp) {
r->setup = s;
grpc_pollset_set_init(&r->interested_parties);
/* TODO(klempner): Actually set a deadline */
- r->deadline = gpr_inf_future;
+ r->deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(60));
gpr_mu_lock(&s->mu);
GPR_ASSERT(s->refs > 0);
diff --git a/src/core/iomgr/endpoint_pair.h b/src/core/iomgr/endpoint_pair.h
index dffbd36d4c..25087be0c7 100644
--- a/src/core/iomgr/endpoint_pair.h
+++ b/src/core/iomgr/endpoint_pair.h
@@ -41,6 +41,7 @@ typedef struct {
grpc_endpoint *server;
} grpc_endpoint_pair;
-grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(size_t read_slice_size);
+grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
+ size_t read_slice_size);
#endif /* GRPC_INTERNAL_CORE_IOMGR_ENDPOINT_PAIR_H */
diff --git a/src/core/iomgr/endpoint_pair_posix.c b/src/core/iomgr/endpoint_pair_posix.c
index ac511b97b2..9b3b63f1e7 100644
--- a/src/core/iomgr/endpoint_pair_posix.c
+++ b/src/core/iomgr/endpoint_pair_posix.c
@@ -44,6 +44,8 @@
#include <sys/socket.h>
#include "src/core/iomgr/tcp_posix.h"
+#include "src/core/support/string.h"
+#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
static void create_sockets(int sv[2]) {
@@ -55,12 +57,21 @@ static void create_sockets(int sv[2]) {
GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
}
-grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(size_t read_slice_size) {
+grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
+ size_t read_slice_size) {
int sv[2];
grpc_endpoint_pair p;
+ char *final_name;
create_sockets(sv);
- p.client = grpc_tcp_create(grpc_fd_create(sv[1]), read_slice_size);
- p.server = grpc_tcp_create(grpc_fd_create(sv[0]), read_slice_size);
+
+ gpr_asprintf(&final_name, "%s:client", name);
+ p.client =
+ grpc_tcp_create(grpc_fd_create(sv[1], final_name), read_slice_size);
+ gpr_free(final_name);
+ gpr_asprintf(&final_name, "%s:server", name);
+ p.server =
+ grpc_tcp_create(grpc_fd_create(sv[0], final_name), read_slice_size);
+ gpr_free(final_name);
return p;
}
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index 65dff84a68..4a55049b0b 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -41,7 +41,6 @@
#include <sys/socket.h>
#include <unistd.h>
-#include "src/core/iomgr/iomgr_internal.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
@@ -113,7 +112,8 @@ static void destroy(grpc_fd *fd) {
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file, int line) {
- gpr_log(GPR_DEBUG, "FD %d ref %d %d -> %d [%s; %s:%d]", fd->fd, n, fd->refst, fd->refst + n, reason, file, line);
+ gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
+ fd->refst, fd->refst + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
@@ -125,7 +125,8 @@ static void ref_by(grpc_fd *fd, int n) {
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file, int line) {
gpr_atm old;
- gpr_log(GPR_DEBUG, "FD %d unref %d %d -> %d [%s; %s:%d]", fd->fd, n, fd->refst, fd->refst - n, reason, file, line);
+ gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
+ fd->refst, fd->refst - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
gpr_atm old;
@@ -135,7 +136,7 @@ static void unref_by(grpc_fd *fd, int n) {
close(fd->fd);
grpc_iomgr_add_callback(fd->on_done, fd->on_done_user_data);
freelist_fd(fd);
- grpc_iomgr_unref();
+ grpc_iomgr_unregister_object(&fd->iomgr_object);
} else {
GPR_ASSERT(old > n);
}
@@ -154,9 +155,9 @@ void grpc_fd_global_shutdown(void) {
static void do_nothing(void *ignored, int success) {}
-grpc_fd *grpc_fd_create(int fd) {
+grpc_fd *grpc_fd_create(int fd, const char *name) {
grpc_fd *r = alloc_fd(fd);
- grpc_iomgr_ref();
+ grpc_iomgr_register_object(&r->iomgr_object, name);
return r;
}
diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h
index 7873ce6128..d1d6c8fe1f 100644
--- a/src/core/iomgr/fd_posix.h
+++ b/src/core/iomgr/fd_posix.h
@@ -34,7 +34,7 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_FD_POSIX_H
#define GRPC_INTERNAL_CORE_IOMGR_FD_POSIX_H
-#include "src/core/iomgr/iomgr.h"
+#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/pollset.h"
#include <grpc/support/atm.h>
#include <grpc/support/sync.h>
@@ -99,12 +99,14 @@ struct grpc_fd {
grpc_iomgr_cb_func on_done;
void *on_done_user_data;
struct grpc_fd *freelist_next;
+
+ grpc_iomgr_object iomgr_object;
};
/* Create a wrapped file descriptor.
Requires fd is a non-blocking file descriptor.
This takes ownership of closing fd. */
-grpc_fd *grpc_fd_create(int fd);
+grpc_fd *grpc_fd_create(int fd, const char *name);
/* Releases fd to be asynchronously destroyed.
on_done is called when the underlying file descriptor is definitely close()d.
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c
index ec31de24e0..8266b9247e 100644
--- a/src/core/iomgr/iomgr.c
+++ b/src/core/iomgr/iomgr.c
@@ -37,6 +37,7 @@
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/alarm_internal.h"
+#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/thd.h>
@@ -54,8 +55,8 @@ static gpr_cv g_rcv;
static delayed_callback *g_cbs_head = NULL;
static delayed_callback *g_cbs_tail = NULL;
static int g_shutdown;
-static int g_refs;
static gpr_event g_background_callback_executor_done;
+static grpc_iomgr_object g_root_object;
/* Execute followup callbacks continuously.
Other threads may check in and help during pollset_work() */
@@ -96,40 +97,60 @@ void grpc_iomgr_init(void) {
gpr_mu_init(&g_mu);
gpr_cv_init(&g_rcv);
grpc_alarm_list_init(gpr_now());
- g_refs = 0;
+ g_root_object.next = g_root_object.prev = &g_root_object;
+ g_root_object.name = "root";
grpc_iomgr_platform_init();
gpr_event_init(&g_background_callback_executor_done);
gpr_thd_new(&id, background_callback_executor, NULL, NULL);
}
+static size_t count_objects(void) {
+ grpc_iomgr_object *obj;
+ size_t n = 0;
+ for (obj = g_root_object.next; obj != &g_root_object; obj = obj->next) {
+ n++;
+ }
+ return n;
+}
+
void grpc_iomgr_shutdown(void) {
delayed_callback *cb;
+ grpc_iomgr_object *obj;
gpr_timespec shutdown_deadline =
gpr_time_add(gpr_now(), gpr_time_from_seconds(10));
- grpc_alarm_list_shutdown();
-
gpr_mu_lock(&g_mu);
g_shutdown = 1;
- while (g_cbs_head != NULL || g_refs > 0) {
- if (g_cbs_head != NULL && g_refs > 0) {
- gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed and executing final callbacks", g_refs);
+ while (g_cbs_head != NULL || g_root_object.next != &g_root_object) {
+ if (g_cbs_head != NULL && g_root_object.next != &g_root_object) {
+ gpr_log(GPR_DEBUG,
+ "Waiting for %d iomgr objects to be destroyed and executing "
+ "final callbacks",
+ count_objects());
} else if (g_cbs_head != NULL) {
gpr_log(GPR_DEBUG, "Executing final iomgr callbacks");
} else {
- gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed", g_refs);
+ gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed",
+ count_objects());
}
- while (g_cbs_head) {
- cb = g_cbs_head;
- g_cbs_head = cb->next;
- if (!g_cbs_head) g_cbs_tail = NULL;
- gpr_mu_unlock(&g_mu);
+ if (g_cbs_head) {
+ do {
+ cb = g_cbs_head;
+ g_cbs_head = cb->next;
+ if (!g_cbs_head) g_cbs_tail = NULL;
+ gpr_mu_unlock(&g_mu);
- cb->cb(cb->cb_arg, 0);
- gpr_free(cb);
- gpr_mu_lock(&g_mu);
+ cb->cb(cb->cb_arg, 0);
+ gpr_free(cb);
+ gpr_mu_lock(&g_mu);
+ } while (g_cbs_head);
+ continue;
}
- if (g_refs) {
+ if (grpc_alarm_check(&g_mu, gpr_inf_future, NULL)) {
+ gpr_log(GPR_DEBUG, "got late alarm");
+ continue;
+ }
+ if (g_root_object.next != &g_root_object) {
int timeout = 0;
gpr_timespec short_deadline = gpr_time_add(gpr_now(),
gpr_time_from_millis(100));
@@ -143,7 +164,10 @@ void grpc_iomgr_shutdown(void) {
gpr_log(GPR_DEBUG,
"Failed to free %d iomgr objects before shutdown deadline: "
"memory leaks are likely",
- g_refs);
+ count_objects());
+ for (obj = g_root_object.next; obj != &g_root_object; obj = obj->next) {
+ gpr_log(GPR_DEBUG, "LEAKED OBJECT: %s", obj->name);
+ }
break;
}
}
@@ -153,22 +177,28 @@ void grpc_iomgr_shutdown(void) {
grpc_kick_poller();
gpr_event_wait(&g_background_callback_executor_done, gpr_inf_future);
+ grpc_alarm_list_shutdown();
+
grpc_iomgr_platform_shutdown();
gpr_mu_destroy(&g_mu);
gpr_cv_destroy(&g_rcv);
}
-void grpc_iomgr_ref(void) {
+void grpc_iomgr_register_object(grpc_iomgr_object *obj, const char *name) {
gpr_mu_lock(&g_mu);
- ++g_refs;
+ obj->name = gpr_strdup(name);
+ obj->next = &g_root_object;
+ obj->prev = obj->next->prev;
+ obj->next->prev = obj->prev->next = obj;
gpr_mu_unlock(&g_mu);
}
-void grpc_iomgr_unref(void) {
+void grpc_iomgr_unregister_object(grpc_iomgr_object *obj) {
gpr_mu_lock(&g_mu);
- if (0 == --g_refs) {
- gpr_cv_signal(&g_rcv);
- }
+ obj->next->prev = obj->prev;
+ obj->prev->next = obj->next;
+ gpr_free(obj->name);
+ gpr_cv_signal(&g_rcv);
gpr_mu_unlock(&g_mu);
}
diff --git a/src/core/iomgr/iomgr_internal.h b/src/core/iomgr/iomgr_internal.h
index 07923258b9..54eadf1edc 100644
--- a/src/core/iomgr/iomgr_internal.h
+++ b/src/core/iomgr/iomgr_internal.h
@@ -38,12 +38,18 @@
#include "src/core/iomgr/iomgr_internal.h"
#include <grpc/support/sync.h>
+typedef struct grpc_iomgr_object {
+ char *name;
+ struct grpc_iomgr_object *next;
+ struct grpc_iomgr_object *prev;
+} grpc_iomgr_object;
+
int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success);
void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg,
int success);
-void grpc_iomgr_ref(void);
-void grpc_iomgr_unref(void);
+void grpc_iomgr_register_object(grpc_iomgr_object *obj, const char *name);
+void grpc_iomgr_unregister_object(grpc_iomgr_object *obj);
void grpc_iomgr_platform_init(void);
void grpc_iomgr_platform_shutdown(void);
diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c
index 43fd704a6d..fcf48fe0d7 100644
--- a/src/core/iomgr/resolve_address_posix.c
+++ b/src/core/iomgr/resolve_address_posix.c
@@ -55,6 +55,7 @@ typedef struct {
char *default_port;
grpc_resolve_cb cb;
void *arg;
+ grpc_iomgr_object iomgr_object;
} request;
grpc_resolved_addresses *grpc_blocking_resolve_address(
@@ -153,9 +154,9 @@ static void do_request(void *rp) {
grpc_resolve_cb cb = r->cb;
gpr_free(r->name);
gpr_free(r->default_port);
+ grpc_iomgr_unregister_object(&r->iomgr_object);
gpr_free(r);
cb(arg, resolved);
- grpc_iomgr_unref();
}
void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
@@ -166,14 +167,17 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
void grpc_resolve_address(const char *name, const char *default_port,
grpc_resolve_cb cb, void *arg) {
request *r = gpr_malloc(sizeof(request));
- /*gpr_thd_id id;*/
- grpc_iomgr_ref();
+ gpr_thd_id id;
+ char *tmp;
+ gpr_asprintf(&tmp, "resolve_address:name='%s':default_port='%s'", name,
+ default_port);
+ grpc_iomgr_register_object(&r->iomgr_object, tmp);
+ gpr_free(tmp);
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->cb = cb;
r->arg = arg;
- /*gpr_thd_new(&id, do_request, r, NULL);*/
- do_request(r);
+ gpr_thd_new(&id, do_request, r, NULL);
}
#endif
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c
index f88ce19448..981c326511 100644
--- a/src/core/iomgr/tcp_client_posix.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -48,6 +48,7 @@
#include "src/core/iomgr/sockaddr_utils.h"
#include "src/core/iomgr/socket_utils_posix.h"
#include "src/core/iomgr/tcp_posix.h"
+#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
@@ -187,6 +188,8 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
struct sockaddr_in6 addr6_v4mapped;
struct sockaddr_in addr4_copy;
grpc_fd *fdobj;
+ char *name;
+ char *addr_str;
/* Use dualstack sockets where available. */
if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
@@ -213,20 +216,23 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
err = connect(fd, addr, addr_len);
} while (err < 0 && errno == EINTR);
- fdobj = grpc_fd_create(fd);
+ grpc_sockaddr_to_string(&addr_str, addr, 1);
+ gpr_asprintf(&name, "tcp-client:%s", addr_str);
+
+ fdobj = grpc_fd_create(fd, name);
grpc_pollset_set_add_fd(interested_parties, fdobj);
if (err >= 0) {
cb(arg,
grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
- return;
+ goto done;
}
if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
- gpr_log(GPR_ERROR, "connect error: %s", strerror(errno));
+ gpr_log(GPR_ERROR, "connect error to '%s': %s", strerror(errno));
grpc_fd_orphan(fdobj, NULL, NULL);
cb(arg, NULL);
- return;
+ goto done;
}
ac = gpr_malloc(sizeof(async_connect));
@@ -238,8 +244,12 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
ac->write_closure.cb = on_writable;
ac->write_closure.cb_arg = ac;
- grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now());
grpc_fd_notify_on_write(ac->fd, &ac->write_closure);
+ grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now());
+
+done:
+ gpr_free(name);
+ gpr_free(addr_str);
}
#endif
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c
index 002756f115..5c0203c3e3 100644
--- a/src/core/iomgr/tcp_server_posix.c
+++ b/src/core/iomgr/tcp_server_posix.c
@@ -60,6 +60,7 @@
#include "src/core/iomgr/sockaddr_utils.h"
#include "src/core/iomgr/socket_utils_posix.h"
#include "src/core/iomgr/tcp_posix.h"
+#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
@@ -301,6 +302,8 @@ static void on_read(void *arg, int success) {
for (;;) {
struct sockaddr_storage addr;
socklen_t addrlen = sizeof(addr);
+ char *addr_str;
+ char *name;
/* Note: If we ever decide to return this address to the user, remember to
strip off the ::ffff:0.0.0.0/96 prefix first. */
int fd = grpc_accept4(sp->fd, (struct sockaddr *)&addr, &addrlen, 1, 1);
@@ -319,7 +322,10 @@ static void on_read(void *arg, int success) {
grpc_set_socket_no_sigpipe_if_possible(fd);
- fdobj = grpc_fd_create(fd);
+ grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1);
+ gpr_asprintf(&name, "tcp-server-connection:%s", addr_str);
+
+ fdobj = grpc_fd_create(fd, name);
/* TODO(ctiller): revise this when we have server-side sharding
of channels -- we certainly should not be automatically adding every
incoming channel to every pollset owned by the server */
@@ -329,6 +335,9 @@ static void on_read(void *arg, int success) {
sp->server->cb(
sp->server->cb_arg,
grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
+
+ gpr_free(name);
+ gpr_free(addr_str);
}
abort();
@@ -347,9 +356,13 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd,
const struct sockaddr *addr, int addr_len) {
server_port *sp;
int port;
+ char *addr_str;
+ char *name;
port = prepare_socket(fd, addr, addr_len);
if (port >= 0) {
+ grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1);
+ gpr_asprintf(&name, "tcp-server-listener:%s", addr_str);
gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->cb && "must add ports before starting server");
/* append it to the list under a lock */
@@ -360,11 +373,13 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd,
sp = &s->ports[s->nports++];
sp->server = s;
sp->fd = fd;
- sp->emfd = grpc_fd_create(fd);
+ sp->emfd = grpc_fd_create(fd, name);
memcpy(sp->addr.untyped, addr, addr_len);
sp->addr_len = addr_len;
GPR_ASSERT(sp->emfd);
gpr_mu_unlock(&s->mu);
+ gpr_free(addr_str);
+ gpr_free(name);
}
return port;
diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c
index 862bbb7364..e81e0eb850 100644
--- a/test/core/bad_client/bad_client.c
+++ b/test/core/bad_client/bad_client.c
@@ -88,7 +88,7 @@ void grpc_run_bad_client_test(const char *name, const char *client_payload,
grpc_init();
/* Create endpoints */
- sfd = grpc_iomgr_create_endpoint_pair(65536);
+ sfd = grpc_iomgr_create_endpoint_pair("fixture", 65536);
/* Create server, completion events */
a.server = grpc_server_create_from_filters(NULL, 0, NULL);
diff --git a/test/core/end2end/fixtures/chttp2_socket_pair.c b/test/core/end2end/fixtures/chttp2_socket_pair.c
index a40f139442..d84405224b 100644
--- a/test/core/end2end/fixtures/chttp2_socket_pair.c
+++ b/test/core/end2end/fixtures/chttp2_socket_pair.c
@@ -97,7 +97,7 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair(
f.fixture_data = sfd;
f.cq = grpc_completion_queue_create();
- *sfd = grpc_iomgr_create_endpoint_pair(65536);
+ *sfd = grpc_iomgr_create_endpoint_pair("fixture", 65536);
return f;
}
diff --git a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c
index 3d48b91af0..ac8b5eb86d 100644
--- a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c
+++ b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c
@@ -97,7 +97,7 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair(
f.fixture_data = sfd;
f.cq = grpc_completion_queue_create();
- *sfd = grpc_iomgr_create_endpoint_pair(1);
+ *sfd = grpc_iomgr_create_endpoint_pair("fixture", 1);
return f;
}
diff --git a/test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c b/test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c
index 907c61b4ae..fd4f533f6e 100644
--- a/test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c
+++ b/test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c
@@ -98,7 +98,7 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair(
f.fixture_data = sfd;
f.cq = grpc_completion_queue_create();
- *sfd = grpc_iomgr_create_endpoint_pair(65536);
+ *sfd = grpc_iomgr_create_endpoint_pair("fixture", 65536);
return f;
}
diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c
index 207862bd8d..4faa888ca5 100644
--- a/test/core/iomgr/fd_posix_test.c
+++ b/test/core/iomgr/fd_posix_test.c
@@ -206,7 +206,7 @@ static void listen_cb(void *arg, /*=sv_arg*/
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
se = gpr_malloc(sizeof(*se));
se->sv = sv;
- se->em_fd = grpc_fd_create(fd);
+ se->em_fd = grpc_fd_create(fd, "listener");
grpc_pollset_add_fd(&g_pollset, se->em_fd);
se->session_read_closure.cb = session_read_cb;
se->session_read_closure.cb_arg = se;
@@ -235,7 +235,7 @@ static int server_start(server *sv) {
port = ntohs(sin.sin_port);
GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0);
- sv->em_fd = grpc_fd_create(fd);
+ sv->em_fd = grpc_fd_create(fd, "server");
grpc_pollset_add_fd(&g_pollset, sv->em_fd);
/* Register to be interested in reading from listen_fd. */
sv->listen_closure.cb = listen_cb;
@@ -346,7 +346,7 @@ static void client_start(client *cl, int port) {
}
}
- cl->em_fd = grpc_fd_create(fd);
+ cl->em_fd = grpc_fd_create(fd, "client");
grpc_pollset_add_fd(&g_pollset, cl->em_fd);
client_session_write(cl, 1);
@@ -436,7 +436,7 @@ static void test_grpc_fd_change(void) {
flags = fcntl(sv[1], F_GETFL, 0);
GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
- em_fd = grpc_fd_create(sv[0]);
+ em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change");
grpc_pollset_add_fd(&g_pollset, em_fd);
/* Register the first callback, then make its FD readable */
diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c
index 04aed132f4..23bcd19fef 100644
--- a/test/core/iomgr/tcp_posix_test.c
+++ b/test/core/iomgr/tcp_posix_test.c
@@ -172,7 +172,7 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) {
create_sockets(sv);
- ep = grpc_tcp_create(grpc_fd_create(sv[1]), slice_size);
+ ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), slice_size);
grpc_endpoint_add_to_pollset(ep, &g_pollset);
written_bytes = fill_socket_partial(sv[0], num_bytes);
@@ -207,8 +207,9 @@ static void large_read_test(ssize_t slice_size) {
create_sockets(sv);
- ep = grpc_tcp_create(grpc_fd_create(sv[1]), slice_size);
+ ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test"), slice_size);
grpc_endpoint_add_to_pollset(ep, &g_pollset);
+
written_bytes = fill_socket(sv[0]);
gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes);
@@ -338,7 +339,8 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) {
create_sockets(sv);
- ep = grpc_tcp_create(grpc_fd_create(sv[1]), GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
+ ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test"),
+ GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
grpc_endpoint_add_to_pollset(ep, &g_pollset);
state.ep = ep;
@@ -391,8 +393,10 @@ static void write_error_test(ssize_t num_bytes, ssize_t slice_size) {
create_sockets(sv);
- ep = grpc_tcp_create(grpc_fd_create(sv[1]), GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
+ ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_error_test"),
+ GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
grpc_endpoint_add_to_pollset(ep, &g_pollset);
+
close(sv[0]);
state.ep = ep;
@@ -455,8 +459,10 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair(
grpc_endpoint_test_fixture f;
create_sockets(sv);
- f.client_ep = grpc_tcp_create(grpc_fd_create(sv[0]), slice_size);
- f.server_ep = grpc_tcp_create(grpc_fd_create(sv[1]), slice_size);
+ f.client_ep =
+ grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client"), slice_size);
+ f.server_ep =
+ grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server"), slice_size);
grpc_endpoint_add_to_pollset(f.client_ep, &g_pollset);
grpc_endpoint_add_to_pollset(f.server_ep, &g_pollset);
diff --git a/test/core/security/secure_endpoint_test.c b/test/core/security/secure_endpoint_test.c
index 28c909f34d..9081e14a9b 100644
--- a/test/core/security/secure_endpoint_test.c
+++ b/test/core/security/secure_endpoint_test.c
@@ -53,7 +53,7 @@ static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair(
grpc_endpoint_test_fixture f;
grpc_endpoint_pair tcp;
- tcp = grpc_iomgr_create_endpoint_pair(slice_size);
+ tcp = grpc_iomgr_create_endpoint_pair("fixture", slice_size);
grpc_endpoint_add_to_pollset(tcp.client, &g_pollset);
grpc_endpoint_add_to_pollset(tcp.server, &g_pollset);
diff --git a/tools/run_tests/jobset.py b/tools/run_tests/jobset.py
index e2b03bd0ab..51d61db7f6 100755
--- a/tools/run_tests/jobset.py
+++ b/tools/run_tests/jobset.py
@@ -66,6 +66,7 @@ def shuffle_iteratable(it):
# p as we take elements - this gives us a somewhat random set of values before
# we've seen all the values, but starts producing values without having to
# compute ALL of them at once, allowing tests to start a little earlier
+ LARGE_THRESHOLD = 1000
nextit = []
p = 1
for val in it:
@@ -74,6 +75,17 @@ def shuffle_iteratable(it):
yield val
else:
nextit.append(val)
+ # if the input iterates over a large number of values (potentially
+ # infinite, we'd be in the loop for a while (again, potentially forever).
+ # We need to reset "nextit" every so often to, in the case of an infinite
+ # iterator, avoid growing "nextit" without ever freeing it.
+ if len(nextit) > LARGE_THRESHOLD:
+ random.shuffle(nextit)
+ for val in nextit:
+ yield val
+ nextit = []
+ p = 1
+
# after taking a random sampling, we shuffle the rest of the elements and
# yield them
random.shuffle(nextit)
@@ -339,13 +351,15 @@ def run(cmdlines,
maxjobs=None,
newline_on_success=False,
travis=False,
+ infinite_runs=False,
stop_on_failure=False,
cache=None):
js = Jobset(check_cancelled,
maxjobs if maxjobs is not None else _DEFAULT_MAX_JOBS,
newline_on_success, travis, stop_on_failure,
cache if cache is not None else NoCache())
- if not travis:
+ # We can't sort an infinite sequence of runs.
+ if not travis or infinite_runs:
cmdlines = shuffle_iteratable(cmdlines)
else:
cmdlines = sorted(cmdlines, key=lambda x: x.shortname)
diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py
index f34a6c9c07..cb50e38ca1 100755
--- a/tools/run_tests/run_tests.py
+++ b/tools/run_tests/run_tests.py
@@ -330,7 +330,28 @@ argp.add_argument('-c', '--config',
choices=['all'] + sorted(_CONFIGS.keys()),
nargs='+',
default=_DEFAULT)
-argp.add_argument('-n', '--runs_per_test', default=1, type=int)
+
+def runs_per_test_type(arg_str):
+ """Auxilary function to parse the "runs_per_test" flag.
+
+ Returns:
+ A positive integer or 0, the latter indicating an infinite number of
+ runs.
+
+ Raises:
+ argparse.ArgumentTypeError: Upon invalid input.
+ """
+ if arg_str == 'inf':
+ return 0
+ try:
+ n = int(arg_str)
+ if n <= 0: raise ValueError
+ except:
+ msg = "'{}' isn't a positive integer or 'inf'".format(arg_str)
+ raise argparse.ArgumentTypeError(msg)
+argp.add_argument('-n', '--runs_per_test', default=1, type=runs_per_test_type,
+ help='A positive integer or "inf". If "inf", all tests will run in an '
+ 'infinite loop. Especially useful in combination with "-f"')
argp.add_argument('-r', '--regex', default='.*', type=str)
argp.add_argument('-j', '--jobs', default=2 * multiprocessing.cpu_count(), type=int)
argp.add_argument('-s', '--slowdown', default=1.0, type=float)
@@ -456,11 +477,14 @@ def _build_and_run(check_cancelled, newline_on_success, travis, cache):
antagonists = [subprocess.Popen(['tools/run_tests/antagonist.py'])
for _ in range(0, args.antagonists)]
try:
+ infinite_runs = runs_per_test == 0
# run all the tests
- all_runs = itertools.chain.from_iterable(
- itertools.repeat(one_run, runs_per_test))
+ runs_sequence = (itertools.repeat(one_run) if infinite_runs
+ else itertools.repeat(one_run, runs_per_test))
+ all_runs = itertools.chain.from_iterable(runs_sequence)
if not jobset.run(all_runs, check_cancelled,
newline_on_success=newline_on_success, travis=travis,
+ infinite_runs=infinite_runs,
maxjobs=args.jobs,
stop_on_failure=args.stop_on_failure,
cache=cache):