aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r--src/core/lib/surface/call.c37
-rw-r--r--src/core/lib/surface/channel.c32
-rw-r--r--src/core/lib/surface/channel.h3
-rw-r--r--src/core/lib/surface/lame_client.c4
-rw-r--r--src/core/lib/surface/server.c2
5 files changed, 67 insertions, 11 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index c2547c5147..00719be9ab 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -51,6 +51,7 @@
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
+#include "src/core/lib/support/arena.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/call.h"
@@ -138,6 +139,7 @@ typedef struct batch_control {
} batch_control;
struct grpc_call {
+ gpr_arena *arena;
grpc_completion_queue *cq;
grpc_polling_entity pollent;
grpc_channel *channel;
@@ -212,6 +214,8 @@ struct grpc_call {
grpc_closure receiving_initial_metadata_ready;
uint32_t test_only_last_message_flags;
+ grpc_closure release_call;
+
union {
struct {
grpc_status_code *status;
@@ -273,7 +277,11 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
grpc_channel_get_channel_stack(args->channel);
grpc_call *call;
GPR_TIMER_BEGIN("grpc_call_create", 0);
- call = gpr_zalloc(sizeof(grpc_call) + channel_stack->call_stack_size);
+ gpr_arena *arena =
+ gpr_arena_create(grpc_channel_get_call_size_estimate(args->channel));
+ call = gpr_arena_alloc(arena,
+ sizeof(grpc_call) + channel_stack->call_stack_size);
+ call->arena = arena;
*out_call = call;
gpr_mu_init(&call->mu);
call->channel = args->channel;
@@ -360,11 +368,16 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");
/* initial refcount dropped by grpc_call_destroy */
+ grpc_call_element_args call_args = {
+ .call_stack = CALL_STACK_FROM_CALL(call),
+ .server_transport_data = args->server_transport_data,
+ .context = call->context,
+ .path = path,
+ .start_time = call->start_time,
+ .deadline = send_deadline,
+ .arena = call->arena};
add_init_error(&error, grpc_call_stack_init(exec_ctx, channel_stack, 1,
- destroy_call, call, call->context,
- args->server_transport_data, path,
- call->start_time, send_deadline,
- CALL_STACK_FROM_CALL(call)));
+ destroy_call, call, &call_args));
if (error != GRPC_ERROR_NONE) {
cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE,
GRPC_ERROR_REF(error));
@@ -421,6 +434,14 @@ void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) {
GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON);
}
+static void release_call(grpc_exec_ctx *exec_ctx, void *call,
+ grpc_error *error) {
+ grpc_call *c = call;
+ grpc_channel_update_call_size_estimate(c->channel,
+ gpr_arena_destroy(c->arena));
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call");
+}
+
static void set_status_value_directly(grpc_status_code status, void *dest);
static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
grpc_error *error) {
@@ -447,7 +468,6 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
if (c->cq) {
GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
}
- grpc_channel *channel = c->channel;
get_final_status(call, set_status_value_directly, &c->final_info.final_status,
NULL);
@@ -459,8 +479,9 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
unpack_received_status(gpr_atm_no_barrier_load(&c->status[i])).error);
}
- grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, c);
- GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call");
+ grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info,
+ grpc_closure_init(&c->release_call, release_call, c,
+ grpc_schedule_on_exec_ctx));
GPR_TIMER_END("destroy_call", 0);
}
diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c
index d6acd392c1..56beb7d96e 100644
--- a/src/core/lib/surface/channel.c
+++ b/src/core/lib/surface/channel.c
@@ -68,6 +68,8 @@ struct grpc_channel {
grpc_compression_options compression_options;
grpc_mdelem default_authority;
+ gpr_atm call_size_estimate;
+
gpr_mu registered_call_mu;
registered_call *registered_calls;
@@ -115,6 +117,10 @@ grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target,
gpr_mu_init(&channel->registered_call_mu);
channel->registered_calls = NULL;
+ gpr_atm_no_barrier_store(
+ &channel->call_size_estimate,
+ CHANNEL_STACK_FROM_CHANNEL(channel)->call_stack_size);
+
grpc_compression_options_init(&channel->compression_options);
for (size_t i = 0; i < args->num_args; i++) {
if (0 == strcmp(args->args[i].key, GRPC_ARG_DEFAULT_AUTHORITY)) {
@@ -177,6 +183,32 @@ done:
return channel;
}
+size_t grpc_channel_get_call_size_estimate(grpc_channel *channel) {
+#define ROUND_UP_SIZE 256
+ return ((size_t)gpr_atm_no_barrier_load(&channel->call_size_estimate) +
+ ROUND_UP_SIZE) &
+ ~(size_t)(ROUND_UP_SIZE - 1);
+}
+
+void grpc_channel_update_call_size_estimate(grpc_channel *channel,
+ size_t size) {
+ size_t cur = (size_t)gpr_atm_no_barrier_load(&channel->call_size_estimate);
+ if (cur < size) {
+ /* size grew: update estimate */
+ gpr_atm_no_barrier_cas(&channel->call_size_estimate, (gpr_atm)cur,
+ (gpr_atm)size);
+ /* if we lose: never mind, something else will likely update soon enough */
+ } else if (cur == size) {
+ /* no change: holding pattern */
+ } else if (cur > 0) {
+ /* size shrank: decrease estimate */
+ gpr_atm_no_barrier_cas(
+ &channel->call_size_estimate, (gpr_atm)cur,
+ (gpr_atm)(GPR_MIN(cur - 1, (255 * cur + size) / 256)));
+ /* if we lose: never mind, something else will likely update soon enough */
+ }
+}
+
char *grpc_channel_get_target(grpc_channel *channel) {
GRPC_API_TRACE("grpc_channel_get_target(channel=%p)", 1, (channel));
return gpr_strdup(channel->target);
diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h
index 3a441d7add..c4aebd8b9b 100644
--- a/src/core/lib/surface/channel.h
+++ b/src/core/lib/surface/channel.h
@@ -66,6 +66,9 @@ grpc_mdelem grpc_channel_get_reffed_status_elem(grpc_exec_ctx *exec_ctx,
grpc_channel *channel,
int status_code);
+size_t grpc_channel_get_call_size_estimate(grpc_channel *channel);
+void grpc_channel_update_call_size_estimate(grpc_channel *channel, size_t size);
+
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
void grpc_channel_internal_ref(grpc_channel *channel, const char *reason);
void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx, grpc_channel *channel,
diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c
index 49bc4c114b..9ddb88bd11 100644
--- a/src/core/lib/surface/lame_client.c
+++ b/src/core/lib/surface/lame_client.c
@@ -130,8 +130,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info,
- void *and_free_memory) {
- gpr_free(and_free_memory);
+ grpc_closure *then_schedule_closure) {
+ grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE);
}
static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index b360579553..1186a4af63 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -898,7 +898,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info,
- void *ignored) {
+ grpc_closure *ignored) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;