aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/combiner.c11
-rw-r--r--src/core/lib/iomgr/resource_quota.c12
-rw-r--r--src/core/lib/iomgr/tcp_client_uv.c6
-rw-r--r--src/core/lib/iomgr/tcp_server_uv.c8
-rw-r--r--src/core/lib/iomgr/tcp_uv.c2
-rw-r--r--src/core/lib/iomgr/tcp_windows.c1
-rw-r--r--src/core/lib/iomgr/udp_server.c6
7 files changed, 34 insertions, 12 deletions
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index 60ee14eb23..cfc67020ae 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -90,6 +90,12 @@ static bool is_covered_by_poller(grpc_combiner *lock) {
gpr_atm_acq_load(&lock->elements_covered_by_poller) > 0;
}
+#define IS_COVERED_BY_POLLER_FMT "(final=%d elems=%" PRIdPTR ")->%d"
+#define IS_COVERED_BY_POLLER_ARGS(lock) \
+ (lock)->final_list_covered_by_poller, \
+ gpr_atm_acq_load(&(lock)->elements_covered_by_poller), \
+ is_covered_by_poller((lock))
+
grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
grpc_combiner *lock = gpr_malloc(sizeof(*lock));
lock->next_combiner_on_this_exec_ctx = NULL;
@@ -197,9 +203,10 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
GRPC_COMBINER_TRACE(
gpr_log(GPR_DEBUG,
"C:%p grpc_combiner_continue_exec_ctx workqueue=%p "
- "is_covered_by_poller=%d exec_ctx_ready_to_finish=%d "
+ "is_covered_by_poller=" IS_COVERED_BY_POLLER_FMT
+ " exec_ctx_ready_to_finish=%d "
"time_to_execute_final_list=%d",
- lock, lock->optional_workqueue, is_covered_by_poller(lock),
+ lock, lock->optional_workqueue, IS_COVERED_BY_POLLER_ARGS(lock),
grpc_exec_ctx_ready_to_finish(exec_ctx),
lock->time_to_execute_final_list));
diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c
index 1862223b77..3dbc810102 100644
--- a/src/core/lib/iomgr/resource_quota.c
+++ b/src/core/lib/iomgr/resource_quota.c
@@ -144,6 +144,12 @@ struct grpc_resource_quota {
/* Closure around rq_reclamation_done */
grpc_closure rq_reclamation_done_closure;
+ /* This is only really usable for debugging: it's always a stale pointer, but
+ a stale pointer that might just be fresh enough to guide us to where the
+ reclamation system is stuck */
+ grpc_closure *debug_only_last_initiated_reclaimer;
+ grpc_resource_user *debug_only_last_reclaimer_resource_user;
+
/* Roots of all resource user lists */
grpc_resource_user *roots[GRPC_RULIST_COUNT];
@@ -225,6 +231,7 @@ static void rulist_remove(grpc_resource_user *resource_user, grpc_rulist list) {
resource_user->links[list].prev;
resource_user->links[list].prev->links[list].next =
resource_user->links[list].next;
+ resource_user->links[list].next = resource_user->links[list].prev = NULL;
}
/*******************************************************************************
@@ -340,6 +347,9 @@ static bool rq_reclaim(grpc_exec_ctx *exec_ctx,
resource_quota->reclaiming = true;
grpc_resource_quota_ref_internal(resource_quota);
grpc_closure *c = resource_user->reclaimers[destructive];
+ GPR_ASSERT(c);
+ resource_quota->debug_only_last_reclaimer_resource_user = resource_user;
+ resource_quota->debug_only_last_initiated_reclaimer = c;
resource_user->reclaimers[destructive] = NULL;
grpc_closure_run(exec_ctx, c, GRPC_ERROR_NONE);
return true;
@@ -469,6 +479,8 @@ static void ru_shutdown(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
GRPC_ERROR_CANCELLED, NULL);
resource_user->reclaimers[0] = NULL;
resource_user->reclaimers[1] = NULL;
+ rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
+ rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
}
static void ru_destroy(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c
index b07f9ceffa..5f2ccfee76 100644
--- a/src/core/lib/iomgr/tcp_client_uv.c
+++ b/src/core/lib/iomgr/tcp_client_uv.c
@@ -59,7 +59,7 @@ typedef struct grpc_uv_tcp_connect {
static void uv_tcp_connect_cleanup(grpc_exec_ctx *exec_ctx,
grpc_uv_tcp_connect *connect) {
- grpc_resource_quota_internal_unref(exec_ctx, connect->resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, connect->resource_quota);
gpr_free(connect);
}
@@ -128,8 +128,8 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
if (channel_args != NULL) {
for (size_t i = 0; i < channel_args->num_args; i++) {
if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
- grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
- resource_quota = grpc_resource_quota_internal_ref(
+ grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
+ resource_quota = grpc_resource_quota_ref_internal(
channel_args->args[i].value.pointer.p);
}
}
diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c
index b5b9b92a20..050bb9e141 100644
--- a/src/core/lib/iomgr/tcp_server_uv.c
+++ b/src/core/lib/iomgr/tcp_server_uv.c
@@ -89,11 +89,11 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) {
if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) {
if (args->args[i].type == GRPC_ARG_POINTER) {
- grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
s->resource_quota =
- grpc_resource_quota_internal_ref(args->args[i].value.pointer.p);
+ grpc_resource_quota_ref_internal(args->args[i].value.pointer.p);
} else {
- grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
gpr_free(s);
return GRPC_ERROR_CREATE(GRPC_ARG_RESOURCE_QUOTA
" must be a pointer to a buffer pool");
@@ -136,7 +136,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
gpr_free(sp->handle);
gpr_free(sp);
}
- grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
gpr_free(s);
}
diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c
index 6e2ad1dbe9..257a4778c8 100644
--- a/src/core/lib/iomgr/tcp_uv.c
+++ b/src/core/lib/iomgr/tcp_uv.c
@@ -181,7 +181,7 @@ static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
GPR_ASSERT(tcp->read_cb == NULL);
tcp->read_cb = cb;
tcp->read_slices = read_slices;
- grpc_slice_buffer_reset_and_unref(read_slices);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx,read_slices);
TCP_REF(tcp, "read");
// TODO(murgatroid99): figure out what the return value here means
status =
diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c
index 56fdaa5d82..8b3b44aaaa 100644
--- a/src/core/lib/iomgr/tcp_windows.c
+++ b/src/core/lib/iomgr/tcp_windows.c
@@ -53,6 +53,7 @@
#include "src/core/lib/iomgr/socket_windows.h"
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/slice/slice_internal.h"
#if defined(__MSYS__) && defined(GPR_ARCH_64)
/* Nasty workaround for nasty bug when using the 64 bits msys compiler
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index fd0c7a0f9d..3c24ea9afa 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -388,7 +388,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s,
/* Try listening on IPv6 first. */
addr = &wild6;
// TODO(rjshade): Test and propagate the returned grpc_error*:
- grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd);
+ GRPC_ERROR_UNREF(grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP,
+ &dsmode, &fd));
allocated_port1 = add_socket_to_server(s, fd, addr, read_cb, orphan_cb);
if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
goto done;
@@ -402,7 +403,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s,
}
// TODO(rjshade): Test and propagate the returned grpc_error*:
- grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd);
+ GRPC_ERROR_UNREF(grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP,
+ &dsmode, &fd));
if (fd < 0) {
gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno));
}