diff options
author | Craig Tiller <ctiller@google.com> | 2015-06-01 17:04:17 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-06-01 17:04:17 -0700 |
commit | 8e0b08a33d819d0a2523ec439d545296e1ad2086 (patch) | |
tree | 5613a61a51db9c5a5f7ecc5fec8cfd0eb0b2c436 | |
parent | 3c1331f920569ba3a182e13321db26796d6e920e (diff) | |
parent | fa275a97b968060383fe27c26b1d85f08d9582f9 (diff) |
Merge branch 'count-the-things' into we-dont-need-no-backup
-rw-r--r-- | src/core/channel/client_setup.c | 2 | ||||
-rw-r--r-- | src/core/iomgr/endpoint_pair.h | 3 | ||||
-rw-r--r-- | src/core/iomgr/endpoint_pair_posix.c | 17 | ||||
-rw-r--r-- | src/core/iomgr/fd_posix.c | 13 | ||||
-rw-r--r-- | src/core/iomgr/fd_posix.h | 6 | ||||
-rw-r--r-- | src/core/iomgr/iomgr.c | 78 | ||||
-rw-r--r-- | src/core/iomgr/iomgr_internal.h | 10 | ||||
-rw-r--r-- | src/core/iomgr/resolve_address_posix.c | 14 | ||||
-rw-r--r-- | src/core/iomgr/tcp_client_posix.c | 20 | ||||
-rw-r--r-- | src/core/iomgr/tcp_server_posix.c | 19 | ||||
-rw-r--r-- | test/core/bad_client/bad_client.c | 2 | ||||
-rw-r--r-- | test/core/end2end/fixtures/chttp2_socket_pair.c | 2 | ||||
-rw-r--r-- | test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c | 2 | ||||
-rw-r--r-- | test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c | 2 | ||||
-rw-r--r-- | test/core/iomgr/fd_posix_test.c | 8 | ||||
-rw-r--r-- | test/core/iomgr/tcp_posix_test.c | 18 | ||||
-rw-r--r-- | test/core/security/secure_endpoint_test.c | 2 | ||||
-rwxr-xr-x | tools/run_tests/jobset.py | 16 | ||||
-rwxr-xr-x | tools/run_tests/run_tests.py | 30 |
19 files changed, 194 insertions, 70 deletions
diff --git a/src/core/channel/client_setup.c b/src/core/channel/client_setup.c index 6782ef515a..f305d8ba9e 100644 --- a/src/core/channel/client_setup.c +++ b/src/core/channel/client_setup.c @@ -96,7 +96,7 @@ static void setup_initiate(grpc_transport_setup *sp) { r->setup = s; grpc_pollset_set_init(&r->interested_parties); /* TODO(klempner): Actually set a deadline */ - r->deadline = gpr_inf_future; + r->deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(60)); gpr_mu_lock(&s->mu); GPR_ASSERT(s->refs > 0); diff --git a/src/core/iomgr/endpoint_pair.h b/src/core/iomgr/endpoint_pair.h index dffbd36d4c..25087be0c7 100644 --- a/src/core/iomgr/endpoint_pair.h +++ b/src/core/iomgr/endpoint_pair.h @@ -41,6 +41,7 @@ typedef struct { grpc_endpoint *server; } grpc_endpoint_pair; -grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(size_t read_slice_size); +grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, + size_t read_slice_size); #endif /* GRPC_INTERNAL_CORE_IOMGR_ENDPOINT_PAIR_H */ diff --git a/src/core/iomgr/endpoint_pair_posix.c b/src/core/iomgr/endpoint_pair_posix.c index ac511b97b2..9b3b63f1e7 100644 --- a/src/core/iomgr/endpoint_pair_posix.c +++ b/src/core/iomgr/endpoint_pair_posix.c @@ -44,6 +44,8 @@ #include <sys/socket.h> #include "src/core/iomgr/tcp_posix.h" +#include "src/core/support/string.h" +#include <grpc/support/alloc.h> #include <grpc/support/log.h> static void create_sockets(int sv[2]) { @@ -55,12 +57,21 @@ static void create_sockets(int sv[2]) { GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); } -grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(size_t read_slice_size) { +grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, + size_t read_slice_size) { int sv[2]; grpc_endpoint_pair p; + char *final_name; create_sockets(sv); - p.client = grpc_tcp_create(grpc_fd_create(sv[1]), read_slice_size); - p.server = grpc_tcp_create(grpc_fd_create(sv[0]), read_slice_size); + + gpr_asprintf(&final_name, "%s:client", name); + p.client = + grpc_tcp_create(grpc_fd_create(sv[1], final_name), read_slice_size); + gpr_free(final_name); + gpr_asprintf(&final_name, "%s:server", name); + p.server = + grpc_tcp_create(grpc_fd_create(sv[0], final_name), read_slice_size); + gpr_free(final_name); return p; } diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 65dff84a68..4a55049b0b 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -41,7 +41,6 @@ #include <sys/socket.h> #include <unistd.h> -#include "src/core/iomgr/iomgr_internal.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/useful.h> @@ -113,7 +112,8 @@ static void destroy(grpc_fd *fd) { #define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__) #define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__) static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file, int line) { - gpr_log(GPR_DEBUG, "FD %d ref %d %d -> %d [%s; %s:%d]", fd->fd, n, fd->refst, fd->refst + n, reason, file, line); + gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n, + fd->refst, fd->refst + n, reason, file, line); #else #define REF_BY(fd, n, reason) ref_by(fd, n) #define UNREF_BY(fd, n, reason) unref_by(fd, n) @@ -125,7 +125,8 @@ static void ref_by(grpc_fd *fd, int n) { #ifdef GRPC_FD_REF_COUNT_DEBUG static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file, int line) { gpr_atm old; - gpr_log(GPR_DEBUG, "FD %d unref %d %d -> %d [%s; %s:%d]", fd->fd, n, fd->refst, fd->refst - n, reason, file, line); + gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n, + fd->refst, fd->refst - n, reason, file, line); #else static void unref_by(grpc_fd *fd, int n) { gpr_atm old; @@ -135,7 +136,7 @@ static void unref_by(grpc_fd *fd, int n) { close(fd->fd); grpc_iomgr_add_callback(fd->on_done, fd->on_done_user_data); freelist_fd(fd); - grpc_iomgr_unref(); + grpc_iomgr_unregister_object(&fd->iomgr_object); } else { GPR_ASSERT(old > n); } @@ -154,9 +155,9 @@ void grpc_fd_global_shutdown(void) { static void do_nothing(void *ignored, int success) {} -grpc_fd *grpc_fd_create(int fd) { +grpc_fd *grpc_fd_create(int fd, const char *name) { grpc_fd *r = alloc_fd(fd); - grpc_iomgr_ref(); + grpc_iomgr_register_object(&r->iomgr_object, name); return r; } diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index 7873ce6128..d1d6c8fe1f 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -34,7 +34,7 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_FD_POSIX_H #define GRPC_INTERNAL_CORE_IOMGR_FD_POSIX_H -#include "src/core/iomgr/iomgr.h" +#include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/pollset.h" #include <grpc/support/atm.h> #include <grpc/support/sync.h> @@ -99,12 +99,14 @@ struct grpc_fd { grpc_iomgr_cb_func on_done; void *on_done_user_data; struct grpc_fd *freelist_next; + + grpc_iomgr_object iomgr_object; }; /* Create a wrapped file descriptor. Requires fd is a non-blocking file descriptor. This takes ownership of closing fd. */ -grpc_fd *grpc_fd_create(int fd); +grpc_fd *grpc_fd_create(int fd, const char *name); /* Releases fd to be asynchronously destroyed. on_done is called when the underlying file descriptor is definitely close()d. diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index ec31de24e0..8266b9247e 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -37,6 +37,7 @@ #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/alarm_internal.h" +#include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/thd.h> @@ -54,8 +55,8 @@ static gpr_cv g_rcv; static delayed_callback *g_cbs_head = NULL; static delayed_callback *g_cbs_tail = NULL; static int g_shutdown; -static int g_refs; static gpr_event g_background_callback_executor_done; +static grpc_iomgr_object g_root_object; /* Execute followup callbacks continuously. Other threads may check in and help during pollset_work() */ @@ -96,40 +97,60 @@ void grpc_iomgr_init(void) { gpr_mu_init(&g_mu); gpr_cv_init(&g_rcv); grpc_alarm_list_init(gpr_now()); - g_refs = 0; + g_root_object.next = g_root_object.prev = &g_root_object; + g_root_object.name = "root"; grpc_iomgr_platform_init(); gpr_event_init(&g_background_callback_executor_done); gpr_thd_new(&id, background_callback_executor, NULL, NULL); } +static size_t count_objects(void) { + grpc_iomgr_object *obj; + size_t n = 0; + for (obj = g_root_object.next; obj != &g_root_object; obj = obj->next) { + n++; + } + return n; +} + void grpc_iomgr_shutdown(void) { delayed_callback *cb; + grpc_iomgr_object *obj; gpr_timespec shutdown_deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(10)); - grpc_alarm_list_shutdown(); - gpr_mu_lock(&g_mu); g_shutdown = 1; - while (g_cbs_head != NULL || g_refs > 0) { - if (g_cbs_head != NULL && g_refs > 0) { - gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed and executing final callbacks", g_refs); + while (g_cbs_head != NULL || g_root_object.next != &g_root_object) { + if (g_cbs_head != NULL && g_root_object.next != &g_root_object) { + gpr_log(GPR_DEBUG, + "Waiting for %d iomgr objects to be destroyed and executing " + "final callbacks", + count_objects()); } else if (g_cbs_head != NULL) { gpr_log(GPR_DEBUG, "Executing final iomgr callbacks"); } else { - gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed", g_refs); + gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed", + count_objects()); } - while (g_cbs_head) { - cb = g_cbs_head; - g_cbs_head = cb->next; - if (!g_cbs_head) g_cbs_tail = NULL; - gpr_mu_unlock(&g_mu); + if (g_cbs_head) { + do { + cb = g_cbs_head; + g_cbs_head = cb->next; + if (!g_cbs_head) g_cbs_tail = NULL; + gpr_mu_unlock(&g_mu); - cb->cb(cb->cb_arg, 0); - gpr_free(cb); - gpr_mu_lock(&g_mu); + cb->cb(cb->cb_arg, 0); + gpr_free(cb); + gpr_mu_lock(&g_mu); + } while (g_cbs_head); + continue; } - if (g_refs) { + if (grpc_alarm_check(&g_mu, gpr_inf_future, NULL)) { + gpr_log(GPR_DEBUG, "got late alarm"); + continue; + } + if (g_root_object.next != &g_root_object) { int timeout = 0; gpr_timespec short_deadline = gpr_time_add(gpr_now(), gpr_time_from_millis(100)); @@ -143,7 +164,10 @@ void grpc_iomgr_shutdown(void) { gpr_log(GPR_DEBUG, "Failed to free %d iomgr objects before shutdown deadline: " "memory leaks are likely", - g_refs); + count_objects()); + for (obj = g_root_object.next; obj != &g_root_object; obj = obj->next) { + gpr_log(GPR_DEBUG, "LEAKED OBJECT: %s", obj->name); + } break; } } @@ -153,22 +177,28 @@ void grpc_iomgr_shutdown(void) { grpc_kick_poller(); gpr_event_wait(&g_background_callback_executor_done, gpr_inf_future); + grpc_alarm_list_shutdown(); + grpc_iomgr_platform_shutdown(); gpr_mu_destroy(&g_mu); gpr_cv_destroy(&g_rcv); } -void grpc_iomgr_ref(void) { +void grpc_iomgr_register_object(grpc_iomgr_object *obj, const char *name) { gpr_mu_lock(&g_mu); - ++g_refs; + obj->name = gpr_strdup(name); + obj->next = &g_root_object; + obj->prev = obj->next->prev; + obj->next->prev = obj->prev->next = obj; gpr_mu_unlock(&g_mu); } -void grpc_iomgr_unref(void) { +void grpc_iomgr_unregister_object(grpc_iomgr_object *obj) { gpr_mu_lock(&g_mu); - if (0 == --g_refs) { - gpr_cv_signal(&g_rcv); - } + obj->next->prev = obj->prev; + obj->prev->next = obj->next; + gpr_free(obj->name); + gpr_cv_signal(&g_rcv); gpr_mu_unlock(&g_mu); } diff --git a/src/core/iomgr/iomgr_internal.h b/src/core/iomgr/iomgr_internal.h index 07923258b9..54eadf1edc 100644 --- a/src/core/iomgr/iomgr_internal.h +++ b/src/core/iomgr/iomgr_internal.h @@ -38,12 +38,18 @@ #include "src/core/iomgr/iomgr_internal.h" #include <grpc/support/sync.h> +typedef struct grpc_iomgr_object { + char *name; + struct grpc_iomgr_object *next; + struct grpc_iomgr_object *prev; +} grpc_iomgr_object; + int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success); void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg, int success); -void grpc_iomgr_ref(void); -void grpc_iomgr_unref(void); +void grpc_iomgr_register_object(grpc_iomgr_object *obj, const char *name); +void grpc_iomgr_unregister_object(grpc_iomgr_object *obj); void grpc_iomgr_platform_init(void); void grpc_iomgr_platform_shutdown(void); diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c index 43fd704a6d..fcf48fe0d7 100644 --- a/src/core/iomgr/resolve_address_posix.c +++ b/src/core/iomgr/resolve_address_posix.c @@ -55,6 +55,7 @@ typedef struct { char *default_port; grpc_resolve_cb cb; void *arg; + grpc_iomgr_object iomgr_object; } request; grpc_resolved_addresses *grpc_blocking_resolve_address( @@ -153,9 +154,9 @@ static void do_request(void *rp) { grpc_resolve_cb cb = r->cb; gpr_free(r->name); gpr_free(r->default_port); + grpc_iomgr_unregister_object(&r->iomgr_object); gpr_free(r); cb(arg, resolved); - grpc_iomgr_unref(); } void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) { @@ -166,14 +167,17 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) { void grpc_resolve_address(const char *name, const char *default_port, grpc_resolve_cb cb, void *arg) { request *r = gpr_malloc(sizeof(request)); - /*gpr_thd_id id;*/ - grpc_iomgr_ref(); + gpr_thd_id id; + char *tmp; + gpr_asprintf(&tmp, "resolve_address:name='%s':default_port='%s'", name, + default_port); + grpc_iomgr_register_object(&r->iomgr_object, tmp); + gpr_free(tmp); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); r->cb = cb; r->arg = arg; - /*gpr_thd_new(&id, do_request, r, NULL);*/ - do_request(r); + gpr_thd_new(&id, do_request, r, NULL); } #endif diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index f88ce19448..981c326511 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -48,6 +48,7 @@ #include "src/core/iomgr/sockaddr_utils.h" #include "src/core/iomgr/socket_utils_posix.h" #include "src/core/iomgr/tcp_posix.h" +#include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/time.h> @@ -187,6 +188,8 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), struct sockaddr_in6 addr6_v4mapped; struct sockaddr_in addr4_copy; grpc_fd *fdobj; + char *name; + char *addr_str; /* Use dualstack sockets where available. */ if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { @@ -213,20 +216,23 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), err = connect(fd, addr, addr_len); } while (err < 0 && errno == EINTR); - fdobj = grpc_fd_create(fd); + grpc_sockaddr_to_string(&addr_str, addr, 1); + gpr_asprintf(&name, "tcp-client:%s", addr_str); + + fdobj = grpc_fd_create(fd, name); grpc_pollset_set_add_fd(interested_parties, fdobj); if (err >= 0) { cb(arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); - return; + goto done; } if (errno != EWOULDBLOCK && errno != EINPROGRESS) { - gpr_log(GPR_ERROR, "connect error: %s", strerror(errno)); + gpr_log(GPR_ERROR, "connect error to '%s': %s", strerror(errno)); grpc_fd_orphan(fdobj, NULL, NULL); cb(arg, NULL); - return; + goto done; } ac = gpr_malloc(sizeof(async_connect)); @@ -238,8 +244,12 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), ac->write_closure.cb = on_writable; ac->write_closure.cb_arg = ac; - grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now()); grpc_fd_notify_on_write(ac->fd, &ac->write_closure); + grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now()); + +done: + gpr_free(name); + gpr_free(addr_str); } #endif diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 002756f115..5c0203c3e3 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -60,6 +60,7 @@ #include "src/core/iomgr/sockaddr_utils.h" #include "src/core/iomgr/socket_utils_posix.h" #include "src/core/iomgr/tcp_posix.h" +#include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/sync.h> @@ -301,6 +302,8 @@ static void on_read(void *arg, int success) { for (;;) { struct sockaddr_storage addr; socklen_t addrlen = sizeof(addr); + char *addr_str; + char *name; /* Note: If we ever decide to return this address to the user, remember to strip off the ::ffff:0.0.0.0/96 prefix first. */ int fd = grpc_accept4(sp->fd, (struct sockaddr *)&addr, &addrlen, 1, 1); @@ -319,7 +322,10 @@ static void on_read(void *arg, int success) { grpc_set_socket_no_sigpipe_if_possible(fd); - fdobj = grpc_fd_create(fd); + grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1); + gpr_asprintf(&name, "tcp-server-connection:%s", addr_str); + + fdobj = grpc_fd_create(fd, name); /* TODO(ctiller): revise this when we have server-side sharding of channels -- we certainly should not be automatically adding every incoming channel to every pollset owned by the server */ @@ -329,6 +335,9 @@ static void on_read(void *arg, int success) { sp->server->cb( sp->server->cb_arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); + + gpr_free(name); + gpr_free(addr_str); } abort(); @@ -347,9 +356,13 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd, const struct sockaddr *addr, int addr_len) { server_port *sp; int port; + char *addr_str; + char *name; port = prepare_socket(fd, addr, addr_len); if (port >= 0) { + grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1); + gpr_asprintf(&name, "tcp-server-listener:%s", addr_str); gpr_mu_lock(&s->mu); GPR_ASSERT(!s->cb && "must add ports before starting server"); /* append it to the list under a lock */ @@ -360,11 +373,13 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd, sp = &s->ports[s->nports++]; sp->server = s; sp->fd = fd; - sp->emfd = grpc_fd_create(fd); + sp->emfd = grpc_fd_create(fd, name); memcpy(sp->addr.untyped, addr, addr_len); sp->addr_len = addr_len; GPR_ASSERT(sp->emfd); gpr_mu_unlock(&s->mu); + gpr_free(addr_str); + gpr_free(name); } return port; diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c index 862bbb7364..e81e0eb850 100644 --- a/test/core/bad_client/bad_client.c +++ b/test/core/bad_client/bad_client.c @@ -88,7 +88,7 @@ void grpc_run_bad_client_test(const char *name, const char *client_payload, grpc_init(); /* Create endpoints */ - sfd = grpc_iomgr_create_endpoint_pair(65536); + sfd = grpc_iomgr_create_endpoint_pair("fixture", 65536); /* Create server, completion events */ a.server = grpc_server_create_from_filters(NULL, 0, NULL); diff --git a/test/core/end2end/fixtures/chttp2_socket_pair.c b/test/core/end2end/fixtures/chttp2_socket_pair.c index a40f139442..d84405224b 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair.c @@ -97,7 +97,7 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair( f.fixture_data = sfd; f.cq = grpc_completion_queue_create(); - *sfd = grpc_iomgr_create_endpoint_pair(65536); + *sfd = grpc_iomgr_create_endpoint_pair("fixture", 65536); return f; } diff --git a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c index 3d48b91af0..ac8b5eb86d 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c @@ -97,7 +97,7 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair( f.fixture_data = sfd; f.cq = grpc_completion_queue_create(); - *sfd = grpc_iomgr_create_endpoint_pair(1); + *sfd = grpc_iomgr_create_endpoint_pair("fixture", 1); return f; } diff --git a/test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c b/test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c index 907c61b4ae..fd4f533f6e 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c @@ -98,7 +98,7 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair( f.fixture_data = sfd; f.cq = grpc_completion_queue_create(); - *sfd = grpc_iomgr_create_endpoint_pair(65536); + *sfd = grpc_iomgr_create_endpoint_pair("fixture", 65536); return f; } diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c index 207862bd8d..4faa888ca5 100644 --- a/test/core/iomgr/fd_posix_test.c +++ b/test/core/iomgr/fd_posix_test.c @@ -206,7 +206,7 @@ static void listen_cb(void *arg, /*=sv_arg*/ fcntl(fd, F_SETFL, flags | O_NONBLOCK); se = gpr_malloc(sizeof(*se)); se->sv = sv; - se->em_fd = grpc_fd_create(fd); + se->em_fd = grpc_fd_create(fd, "listener"); grpc_pollset_add_fd(&g_pollset, se->em_fd); se->session_read_closure.cb = session_read_cb; se->session_read_closure.cb_arg = se; @@ -235,7 +235,7 @@ static int server_start(server *sv) { port = ntohs(sin.sin_port); GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0); - sv->em_fd = grpc_fd_create(fd); + sv->em_fd = grpc_fd_create(fd, "server"); grpc_pollset_add_fd(&g_pollset, sv->em_fd); /* Register to be interested in reading from listen_fd. */ sv->listen_closure.cb = listen_cb; @@ -346,7 +346,7 @@ static void client_start(client *cl, int port) { } } - cl->em_fd = grpc_fd_create(fd); + cl->em_fd = grpc_fd_create(fd, "client"); grpc_pollset_add_fd(&g_pollset, cl->em_fd); client_session_write(cl, 1); @@ -436,7 +436,7 @@ static void test_grpc_fd_change(void) { flags = fcntl(sv[1], F_GETFL, 0); GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); - em_fd = grpc_fd_create(sv[0]); + em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change"); grpc_pollset_add_fd(&g_pollset, em_fd); /* Register the first callback, then make its FD readable */ diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c index 04aed132f4..23bcd19fef 100644 --- a/test/core/iomgr/tcp_posix_test.c +++ b/test/core/iomgr/tcp_posix_test.c @@ -172,7 +172,7 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) { create_sockets(sv); - ep = grpc_tcp_create(grpc_fd_create(sv[1]), slice_size); + ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), slice_size); grpc_endpoint_add_to_pollset(ep, &g_pollset); written_bytes = fill_socket_partial(sv[0], num_bytes); @@ -207,8 +207,9 @@ static void large_read_test(ssize_t slice_size) { create_sockets(sv); - ep = grpc_tcp_create(grpc_fd_create(sv[1]), slice_size); + ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test"), slice_size); grpc_endpoint_add_to_pollset(ep, &g_pollset); + written_bytes = fill_socket(sv[0]); gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes); @@ -338,7 +339,8 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) { create_sockets(sv); - ep = grpc_tcp_create(grpc_fd_create(sv[1]), GRPC_TCP_DEFAULT_READ_SLICE_SIZE); + ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test"), + GRPC_TCP_DEFAULT_READ_SLICE_SIZE); grpc_endpoint_add_to_pollset(ep, &g_pollset); state.ep = ep; @@ -391,8 +393,10 @@ static void write_error_test(ssize_t num_bytes, ssize_t slice_size) { create_sockets(sv); - ep = grpc_tcp_create(grpc_fd_create(sv[1]), GRPC_TCP_DEFAULT_READ_SLICE_SIZE); + ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_error_test"), + GRPC_TCP_DEFAULT_READ_SLICE_SIZE); grpc_endpoint_add_to_pollset(ep, &g_pollset); + close(sv[0]); state.ep = ep; @@ -455,8 +459,10 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair( grpc_endpoint_test_fixture f; create_sockets(sv); - f.client_ep = grpc_tcp_create(grpc_fd_create(sv[0]), slice_size); - f.server_ep = grpc_tcp_create(grpc_fd_create(sv[1]), slice_size); + f.client_ep = + grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client"), slice_size); + f.server_ep = + grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server"), slice_size); grpc_endpoint_add_to_pollset(f.client_ep, &g_pollset); grpc_endpoint_add_to_pollset(f.server_ep, &g_pollset); diff --git a/test/core/security/secure_endpoint_test.c b/test/core/security/secure_endpoint_test.c index 28c909f34d..9081e14a9b 100644 --- a/test/core/security/secure_endpoint_test.c +++ b/test/core/security/secure_endpoint_test.c @@ -53,7 +53,7 @@ static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair( grpc_endpoint_test_fixture f; grpc_endpoint_pair tcp; - tcp = grpc_iomgr_create_endpoint_pair(slice_size); + tcp = grpc_iomgr_create_endpoint_pair("fixture", slice_size); grpc_endpoint_add_to_pollset(tcp.client, &g_pollset); grpc_endpoint_add_to_pollset(tcp.server, &g_pollset); diff --git a/tools/run_tests/jobset.py b/tools/run_tests/jobset.py index e2b03bd0ab..51d61db7f6 100755 --- a/tools/run_tests/jobset.py +++ b/tools/run_tests/jobset.py @@ -66,6 +66,7 @@ def shuffle_iteratable(it): # p as we take elements - this gives us a somewhat random set of values before # we've seen all the values, but starts producing values without having to # compute ALL of them at once, allowing tests to start a little earlier + LARGE_THRESHOLD = 1000 nextit = [] p = 1 for val in it: @@ -74,6 +75,17 @@ def shuffle_iteratable(it): yield val else: nextit.append(val) + # if the input iterates over a large number of values (potentially + # infinite, we'd be in the loop for a while (again, potentially forever). + # We need to reset "nextit" every so often to, in the case of an infinite + # iterator, avoid growing "nextit" without ever freeing it. + if len(nextit) > LARGE_THRESHOLD: + random.shuffle(nextit) + for val in nextit: + yield val + nextit = [] + p = 1 + # after taking a random sampling, we shuffle the rest of the elements and # yield them random.shuffle(nextit) @@ -339,13 +351,15 @@ def run(cmdlines, maxjobs=None, newline_on_success=False, travis=False, + infinite_runs=False, stop_on_failure=False, cache=None): js = Jobset(check_cancelled, maxjobs if maxjobs is not None else _DEFAULT_MAX_JOBS, newline_on_success, travis, stop_on_failure, cache if cache is not None else NoCache()) - if not travis: + # We can't sort an infinite sequence of runs. + if not travis or infinite_runs: cmdlines = shuffle_iteratable(cmdlines) else: cmdlines = sorted(cmdlines, key=lambda x: x.shortname) diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index f34a6c9c07..cb50e38ca1 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -330,7 +330,28 @@ argp.add_argument('-c', '--config', choices=['all'] + sorted(_CONFIGS.keys()), nargs='+', default=_DEFAULT) -argp.add_argument('-n', '--runs_per_test', default=1, type=int) + +def runs_per_test_type(arg_str): + """Auxilary function to parse the "runs_per_test" flag. + + Returns: + A positive integer or 0, the latter indicating an infinite number of + runs. + + Raises: + argparse.ArgumentTypeError: Upon invalid input. + """ + if arg_str == 'inf': + return 0 + try: + n = int(arg_str) + if n <= 0: raise ValueError + except: + msg = "'{}' isn't a positive integer or 'inf'".format(arg_str) + raise argparse.ArgumentTypeError(msg) +argp.add_argument('-n', '--runs_per_test', default=1, type=runs_per_test_type, + help='A positive integer or "inf". If "inf", all tests will run in an ' + 'infinite loop. Especially useful in combination with "-f"') argp.add_argument('-r', '--regex', default='.*', type=str) argp.add_argument('-j', '--jobs', default=2 * multiprocessing.cpu_count(), type=int) argp.add_argument('-s', '--slowdown', default=1.0, type=float) @@ -456,11 +477,14 @@ def _build_and_run(check_cancelled, newline_on_success, travis, cache): antagonists = [subprocess.Popen(['tools/run_tests/antagonist.py']) for _ in range(0, args.antagonists)] try: + infinite_runs = runs_per_test == 0 # run all the tests - all_runs = itertools.chain.from_iterable( - itertools.repeat(one_run, runs_per_test)) + runs_sequence = (itertools.repeat(one_run) if infinite_runs + else itertools.repeat(one_run, runs_per_test)) + all_runs = itertools.chain.from_iterable(runs_sequence) if not jobset.run(all_runs, check_cancelled, newline_on_success=newline_on_success, travis=travis, + infinite_runs=infinite_runs, maxjobs=args.jobs, stop_on_failure=args.stop_on_failure, cache=cache): |