aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/resource_quota.c45
-rw-r--r--src/core/lib/iomgr/resource_quota.h4
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.c10
-rw-r--r--src/core/lib/iomgr/tcp_client_windows.c8
-rw-r--r--src/core/lib/iomgr/tcp_posix.c24
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c10
-rw-r--r--src/core/lib/iomgr/tcp_server_windows.c8
-rw-r--r--src/core/lib/iomgr/tcp_windows.c6
8 files changed, 55 insertions, 60 deletions
diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c
index ddc7a88c5b..eac7c52c51 100644
--- a/src/core/lib/iomgr/resource_quota.c
+++ b/src/core/lib/iomgr/resource_quota.c
@@ -169,14 +169,14 @@ static void rq_step(grpc_exec_ctx *exec_ctx, void *rq, grpc_error *error) {
rq_reclaim(exec_ctx, resource_quota, false) ||
rq_reclaim(exec_ctx, resource_quota, true);
done:
- grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
}
static void rq_step_sched(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota) {
if (resource_quota->step_scheduled) return;
resource_quota->step_scheduled = true;
- grpc_resource_quota_internal_ref(resource_quota);
+ grpc_resource_quota_ref_internal(resource_quota);
grpc_combiner_execute_finally(exec_ctx, resource_quota->combiner,
&resource_quota->rq_step_closure,
GRPC_ERROR_NONE, false);
@@ -257,7 +257,7 @@ static bool rq_reclaim(grpc_exec_ctx *exec_ctx,
destructive ? "destructive" : "benign");
}
resource_quota->reclaiming = true;
- grpc_resource_quota_internal_ref(resource_quota);
+ grpc_resource_quota_ref_internal(resource_quota);
grpc_closure *c = resource_user->reclaimers[destructive];
resource_user->reclaimers[destructive] = NULL;
grpc_closure_run(exec_ctx, c, GRPC_ERROR_NONE);
@@ -280,21 +280,10 @@ static void ru_slice_ref(void *p) {
gpr_ref(&rc->refs);
}
-static void ru_slice_unref(void *p) {
+static void ru_slice_unref(grpc_exec_ctx *exec_ctx, void *p) {
ru_slice_refcount *rc = p;
if (gpr_unref(&rc->refs)) {
- /* TODO(ctiller): this is dangerous, but I think safe for now:
- we have no guarantee here that we're at a safe point for creating an
- execution context, but we have no way of writing this code otherwise.
- In the future: consider lifting grpc_slice to grpc, and offering an
- internal_{ref,unref} pair that is execution context aware.
- Alternatively,
- make exec_ctx be thread local and 'do the right thing' (whatever that
- is)
- if NULL */
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_resource_user_free(&exec_ctx, rc->resource_user, rc->size);
- grpc_exec_ctx_finish(&exec_ctx);
+ grpc_resource_user_free(exec_ctx, rc->resource_user, rc->size);
gpr_free(rc);
}
}
@@ -419,7 +408,7 @@ static void rq_resize(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) {
a->resource_quota->size += delta;
a->resource_quota->free_pool += delta;
rq_step_sched(exec_ctx, a->resource_quota);
- grpc_resource_quota_internal_unref(exec_ctx, a->resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, a->resource_quota);
gpr_free(a);
}
@@ -428,7 +417,7 @@ static void rq_reclamation_done(grpc_exec_ctx *exec_ctx, void *rq,
grpc_resource_quota *resource_quota = rq;
resource_quota->reclaiming = false;
rq_step_sched(exec_ctx, resource_quota);
- grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
}
/*******************************************************************************
@@ -459,7 +448,7 @@ grpc_resource_quota *grpc_resource_quota_create(const char *name) {
return resource_quota;
}
-void grpc_resource_quota_internal_unref(grpc_exec_ctx *exec_ctx,
+void grpc_resource_quota_unref_internal(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota) {
if (gpr_unref(&resource_quota->refs)) {
grpc_combiner_destroy(exec_ctx, resource_quota->combiner);
@@ -471,11 +460,11 @@ void grpc_resource_quota_internal_unref(grpc_exec_ctx *exec_ctx,
/* Public API */
void grpc_resource_quota_unref(grpc_resource_quota *resource_quota) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_resource_quota_internal_unref(&exec_ctx, resource_quota);
+ grpc_resource_quota_unref_internal(&exec_ctx, resource_quota);
grpc_exec_ctx_finish(&exec_ctx);
}
-grpc_resource_quota *grpc_resource_quota_internal_ref(
+grpc_resource_quota *grpc_resource_quota_ref_internal(
grpc_resource_quota *resource_quota) {
gpr_ref(&resource_quota->refs);
return resource_quota;
@@ -483,7 +472,7 @@ grpc_resource_quota *grpc_resource_quota_internal_ref(
/* Public API */
void grpc_resource_quota_ref(grpc_resource_quota *resource_quota) {
- grpc_resource_quota_internal_ref(resource_quota);
+ grpc_resource_quota_ref_internal(resource_quota);
}
/* Public API */
@@ -491,7 +480,7 @@ void grpc_resource_quota_resize(grpc_resource_quota *resource_quota,
size_t size) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
rq_resize_args *a = gpr_malloc(sizeof(*a));
- a->resource_quota = grpc_resource_quota_internal_ref(resource_quota);
+ a->resource_quota = grpc_resource_quota_ref_internal(resource_quota);
a->size = (int64_t)size;
grpc_closure_init(&a->closure, rq_resize, a);
grpc_combiner_execute(&exec_ctx, resource_quota->combiner, &a->closure,
@@ -508,7 +497,7 @@ grpc_resource_quota *grpc_resource_quota_from_channel_args(
for (size_t i = 0; i < channel_args->num_args; i++) {
if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
if (channel_args->args[i].type == GRPC_ARG_POINTER) {
- return grpc_resource_quota_internal_ref(
+ return grpc_resource_quota_ref_internal(
channel_args->args[i].value.pointer.p);
} else {
gpr_log(GPR_DEBUG, GRPC_ARG_RESOURCE_QUOTA " should be a pointer");
@@ -523,7 +512,9 @@ static void *rq_copy(void *rq) {
return rq;
}
-static void rq_destroy(void *rq) { grpc_resource_quota_unref(rq); }
+static void rq_destroy(grpc_exec_ctx *exec_ctx, void *rq) {
+ grpc_resource_quota_unref_internal(exec_ctx, rq);
+}
static int rq_cmp(void *a, void *b) { return GPR_ICMP(a, b); }
@@ -540,7 +531,7 @@ void grpc_resource_user_init(grpc_resource_user *resource_user,
grpc_resource_quota *resource_quota,
const char *name) {
resource_user->resource_quota =
- grpc_resource_quota_internal_ref(resource_quota);
+ grpc_resource_quota_ref_internal(resource_quota);
grpc_closure_init(&resource_user->allocate_closure, &ru_allocate,
resource_user);
grpc_closure_init(&resource_user->add_to_free_pool_closure,
@@ -589,7 +580,7 @@ void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx,
void grpc_resource_user_destroy(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user) {
- grpc_resource_quota_internal_unref(exec_ctx, resource_user->resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, resource_user->resource_quota);
gpr_mu_destroy(&resource_user->mu);
gpr_free(resource_user->name);
}
diff --git a/src/core/lib/iomgr/resource_quota.h b/src/core/lib/iomgr/resource_quota.h
index f7e5ca6494..f1da73933e 100644
--- a/src/core/lib/iomgr/resource_quota.h
+++ b/src/core/lib/iomgr/resource_quota.h
@@ -77,9 +77,9 @@
extern int grpc_resource_quota_trace;
-grpc_resource_quota *grpc_resource_quota_internal_ref(
+grpc_resource_quota *grpc_resource_quota_ref_internal(
grpc_resource_quota *resource_quota);
-void grpc_resource_quota_internal_unref(grpc_exec_ctx *exec_ctx,
+void grpc_resource_quota_unref_internal(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota);
grpc_resource_quota *grpc_resource_quota_from_channel_args(
const grpc_channel_args *channel_args);
diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c
index bc08c94ee0..3b0fe3bc15 100644
--- a/src/core/lib/iomgr/tcp_client_posix.c
+++ b/src/core/lib/iomgr/tcp_client_posix.c
@@ -116,7 +116,7 @@ static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
if (done) {
gpr_mu_destroy(&ac->mu);
gpr_free(ac->addr_str);
- grpc_channel_args_destroy(ac->channel_args);
+ grpc_channel_args_destroy(exec_ctx, ac->channel_args);
gpr_free(ac);
}
}
@@ -136,8 +136,8 @@ grpc_endpoint *grpc_tcp_client_create_from_fd(
&channel_args->args[i], options);
} else if (0 ==
strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
- grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
- resource_quota = grpc_resource_quota_internal_ref(
+ grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
+ resource_quota = grpc_resource_quota_ref_internal(
channel_args->args[i].value.pointer.p);
}
}
@@ -145,7 +145,7 @@ grpc_endpoint *grpc_tcp_client_create_from_fd(
grpc_endpoint *ep =
grpc_tcp_create(fd, resource_quota, tcp_read_chunk_size, addr_str);
- grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
return ep;
}
@@ -247,7 +247,7 @@ finish:
if (done) {
gpr_mu_destroy(&ac->mu);
gpr_free(ac->addr_str);
- grpc_channel_args_destroy(ac->channel_args);
+ grpc_channel_args_destroy(exec_ctx, ac->channel_args);
gpr_free(ac);
}
grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c
index 4d1e809872..4ad417f77d 100644
--- a/src/core/lib/iomgr/tcp_client_windows.c
+++ b/src/core/lib/iomgr/tcp_client_windows.c
@@ -71,7 +71,7 @@ static void async_connect_unlock_and_cleanup(grpc_exec_ctx *exec_ctx,
int done = (--ac->refs == 0);
gpr_mu_unlock(&ac->mu);
if (done) {
- grpc_resource_quota_internal_unref(exec_ctx, ac->resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, ac->resource_quota);
gpr_mu_destroy(&ac->mu);
gpr_free(ac->addr_name);
gpr_free(ac);
@@ -153,8 +153,8 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
if (channel_args != NULL) {
for (size_t i = 0; i < channel_args->num_args; i++) {
if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
- grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
- resource_quota = grpc_resource_quota_internal_ref(
+ grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
+ resource_quota = grpc_resource_quota_ref_internal(
channel_args->args[i].value.pointer.p);
}
}
@@ -242,7 +242,7 @@ failure:
} else if (sock != INVALID_SOCKET) {
closesocket(sock);
}
- grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
grpc_exec_ctx_sched(exec_ctx, on_done, final_error, NULL);
}
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 584fc2fe2e..4bf13bee27 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -56,6 +56,7 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
@@ -131,7 +132,7 @@ static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
grpc_fd_orphan(exec_ctx, tcp->em_fd, tcp->release_fd_cb, tcp->release_fd,
"tcp_unref_orphan");
- grpc_slice_buffer_destroy(&tcp->last_read_buffer);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &tcp->last_read_buffer);
grpc_resource_user_destroy(exec_ctx, &tcp->resource_user);
gpr_free(tcp->peer_string);
gpr_free(tcp);
@@ -178,7 +179,7 @@ static void tcp_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_network_status_unregister_endpoint(ep);
grpc_tcp *tcp = (grpc_tcp *)ep;
tcp_maybe_shutdown_resource_user(exec_ctx, tcp);
- grpc_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &tcp->last_read_buffer);
TCP_UNREF(exec_ctx, tcp, "destroy");
}
@@ -245,13 +246,14 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
/* We've consumed the edge, request a new one */
grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
} else {
- grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
+ tcp->incoming_buffer);
call_read_cb(exec_ctx, tcp, GRPC_OS_ERROR(errno, "recvmsg"));
TCP_UNREF(exec_ctx, tcp, "read");
}
} else if (read_bytes == 0) {
/* 0 read size ==> end of stream */
- grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer);
call_read_cb(exec_ctx, tcp, GRPC_ERROR_CREATE("Socket closed"));
TCP_UNREF(exec_ctx, tcp, "read");
} else {
@@ -276,8 +278,9 @@ static void tcp_read_allocation_done(grpc_exec_ctx *exec_ctx, void *tcpp,
grpc_error *error) {
grpc_tcp *tcp = tcpp;
if (error != GRPC_ERROR_NONE) {
- grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer);
- grpc_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
+ &tcp->last_read_buffer);
call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error));
TCP_UNREF(exec_ctx, tcp, "read");
} else {
@@ -302,8 +305,9 @@ static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
GPR_ASSERT(!tcp->finished_edge);
if (error != GRPC_ERROR_NONE) {
- grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer);
- grpc_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
+ &tcp->last_read_buffer);
call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error));
TCP_UNREF(exec_ctx, tcp, "read");
} else {
@@ -317,7 +321,7 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
GPR_ASSERT(tcp->read_cb == NULL);
tcp->read_cb = cb;
tcp->incoming_buffer = incoming_buffer;
- grpc_slice_buffer_reset_and_unref(incoming_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, incoming_buffer);
grpc_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
TCP_REF(tcp, "read");
if (tcp->finished_edge) {
@@ -578,7 +582,7 @@ void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
tcp->release_fd = fd;
tcp->release_fd_cb = done;
tcp_maybe_shutdown_resource_user(exec_ctx, tcp);
- grpc_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &tcp->last_read_buffer);
TCP_UNREF(exec_ctx, tcp, "destroy");
}
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index b6fc1e4ca2..1a753d1231 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -167,18 +167,18 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
s->so_reuseport =
has_so_reuseport && (args->args[i].value.integer != 0);
} else {
- grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
gpr_free(s);
return GRPC_ERROR_CREATE(GRPC_ARG_ALLOW_REUSEPORT
" must be an integer");
}
} else if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) {
if (args->args[i].type == GRPC_ARG_POINTER) {
- grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
s->resource_quota =
- grpc_resource_quota_internal_ref(args->args[i].value.pointer.p);
+ grpc_resource_quota_ref_internal(args->args[i].value.pointer.p);
} else {
- grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
gpr_free(s);
return GRPC_ERROR_CREATE(GRPC_ARG_RESOURCE_QUOTA
" must be a pointer to a buffer pool");
@@ -219,7 +219,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
gpr_free(sp);
}
- grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
gpr_free(s);
}
diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c
index ae54c70d2d..c2a6d1736e 100644
--- a/src/core/lib/iomgr/tcp_server_windows.c
+++ b/src/core/lib/iomgr/tcp_server_windows.c
@@ -115,11 +115,11 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) {
if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) {
if (args->args[i].type == GRPC_ARG_POINTER) {
- grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
s->resource_quota =
- grpc_resource_quota_internal_ref(args->args[i].value.pointer.p);
+ grpc_resource_quota_ref_internal(args->args[i].value.pointer.p);
} else {
- grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
gpr_free(s);
return GRPC_ERROR_CREATE(GRPC_ARG_RESOURCE_QUOTA
" must be a pointer to a buffer pool");
@@ -155,7 +155,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
grpc_winsocket_destroy(sp->socket);
gpr_free(sp);
}
- grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
gpr_free(s);
}
diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c
index f825057c0e..a97b1b21fe 100644
--- a/src/core/lib/iomgr/tcp_windows.c
+++ b/src/core/lib/iomgr/tcp_windows.c
@@ -190,13 +190,13 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) {
char *utf8_message = gpr_format_message(info->wsa_error);
error = GRPC_ERROR_CREATE(utf8_message);
gpr_free(utf8_message);
- grpc_slice_unref(tcp->read_slice);
+ grpc_slice_unref_internal(exec_ctx, tcp->read_slice);
} else {
if (info->bytes_transfered != 0 && !tcp->shutting_down) {
sub = grpc_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered);
grpc_slice_buffer_add(tcp->read_slices, sub);
} else {
- grpc_slice_unref(tcp->read_slice);
+ grpc_slice_unref_internal(exec_ctx, tcp->read_slice);
error = GRPC_ERROR_CREATE("End of TCP stream");
}
}
@@ -225,7 +225,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
tcp->read_cb = cb;
tcp->read_slices = read_slices;
- grpc_slice_buffer_reset_and_unref(read_slices);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, read_slices);
tcp->read_slice = grpc_slice_malloc(8192);