aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/transport/chttp2/server/chttp2_server.cc29
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.cc29
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.h3
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_rst_stream.cc2
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h2
-rw-r--r--src/core/lib/channel/channel_stack_builder.cc16
-rw-r--r--src/core/lib/channel/channel_stack_builder.h8
-rw-r--r--src/core/lib/iomgr/resource_quota.cc74
-rw-r--r--src/core/lib/iomgr/resource_quota.h27
-rw-r--r--src/core/lib/surface/channel.cc20
-rw-r--r--src/core/lib/surface/channel.h3
-rw-r--r--src/core/lib/surface/server.cc28
-rw-r--r--src/core/lib/surface/server.h5
13 files changed, 213 insertions, 33 deletions
diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.cc b/src/core/ext/transport/chttp2/server/chttp2_server.cc
index 07b304b320..bdb2339e40 100644
--- a/src/core/ext/transport/chttp2/server/chttp2_server.cc
+++ b/src/core/ext/transport/chttp2/server/chttp2_server.cc
@@ -39,6 +39,7 @@
#include "src/core/lib/channel/handshaker_registry.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/lib/iomgr/resource_quota.h"
#include "src/core/lib/iomgr/tcp_server.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/api_trace.h"
@@ -114,9 +115,16 @@ static void on_handshake_done(void* arg, grpc_error* error) {
server_connection_state* connection_state =
static_cast<server_connection_state*>(args->user_data);
gpr_mu_lock(&connection_state->svr_state->mu);
+ grpc_resource_user* resource_user = grpc_server_get_default_resource_user(
+ connection_state->svr_state->server);
if (error != GRPC_ERROR_NONE || connection_state->svr_state->shutdown) {
const char* error_str = grpc_error_string(error);
gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str);
+ grpc_resource_user* resource_user = grpc_server_get_default_resource_user(
+ connection_state->svr_state->server);
+ if (resource_user != nullptr) {
+ grpc_resource_user_free(resource_user, GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
+ }
if (error == GRPC_ERROR_NONE && args->endpoint != nullptr) {
// We were shut down after handshaking completed successfully, so
// destroy the endpoint here.
@@ -134,13 +142,14 @@ static void on_handshake_done(void* arg, grpc_error* error) {
// If the handshaking succeeded but there is no endpoint, then the
// handshaker may have handed off the connection to some external
// code, so we can just clean up here without creating a transport.
+ // TODO(juanlishen): Do we need to free the memory to resource user?
if (args->endpoint != nullptr) {
- grpc_transport* transport =
- grpc_create_chttp2_transport(args->args, args->endpoint, false);
+ grpc_transport* transport = grpc_create_chttp2_transport(
+ args->args, args->endpoint, false, resource_user);
grpc_server_setup_transport(
connection_state->svr_state->server, transport,
connection_state->accepting_pollset, args->args,
- grpc_chttp2_transport_get_socket_uuid(transport));
+ grpc_chttp2_transport_get_socket_uuid(transport), resource_user);
// Use notify_on_receive_settings callback to enforce the
// handshake deadline.
connection_state->transport =
@@ -183,6 +192,20 @@ static void on_accept(void* arg, grpc_endpoint* tcp,
gpr_free(acceptor);
return;
}
+ grpc_resource_user* resource_user =
+ grpc_server_get_default_resource_user(state->server);
+ if (resource_user != nullptr &&
+ !grpc_resource_user_safe_alloc(resource_user,
+ GRPC_RESOURCE_QUOTA_CHANNEL_SIZE)) {
+ gpr_log(
+ GPR_ERROR,
+ "Memory quota exhausted, rejecting the connection, no handshaking.");
+ gpr_mu_unlock(&state->mu);
+ grpc_endpoint_shutdown(tcp, GRPC_ERROR_NONE);
+ grpc_endpoint_destroy(tcp);
+ gpr_free(acceptor);
+ return;
+ }
grpc_handshake_manager* handshake_mgr = grpc_handshake_manager_create();
grpc_handshake_manager_pending_list_add(&state->pending_handshake_mgrs,
handshake_mgr);
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index 8a481bb7d5..04874a9494 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -478,7 +478,8 @@ static void init_keepalive_pings_if_enabled(grpc_chttp2_transport* t) {
static void init_transport(grpc_chttp2_transport* t,
const grpc_channel_args* channel_args,
- grpc_endpoint* ep, bool is_client) {
+ grpc_endpoint* ep, bool is_client,
+ grpc_resource_user* resource_user) {
GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
@@ -491,6 +492,7 @@ static void init_transport(grpc_chttp2_transport* t,
t->endpoint_reading = 1;
t->next_stream_id = is_client ? 1 : 2;
t->is_client = is_client;
+ t->resource_user = resource_user;
t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
t->is_first_frame = true;
grpc_connectivity_state_init(
@@ -778,6 +780,10 @@ static void destroy_stream_locked(void* sp, grpc_error* error) {
s->flow_control.Destroy();
+ if (t->resource_user != nullptr) {
+ grpc_resource_user_free(t->resource_user, GRPC_RESOURCE_QUOTA_CALL_SIZE);
+ }
+
GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream");
GRPC_CLOSURE_SCHED(s->destroy_stream_arg, GRPC_ERROR_NONE);
@@ -816,7 +822,21 @@ grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
if (t->channel_callback.accept_stream == nullptr) {
return nullptr;
}
- grpc_chttp2_stream* accepting;
+ // Don't accept the stream if memory quota doesn't allow. Note that we should
+ // simply refuse the stream here instead of canceling the stream after it's
+ // accepted since the latter will create the call which costs much memory.
+ if (t->resource_user != nullptr &&
+ !grpc_resource_user_safe_alloc(t->resource_user,
+ GRPC_RESOURCE_QUOTA_CALL_SIZE)) {
+ gpr_log(GPR_ERROR, "Memory exhausted, rejecting the stream.");
+ grpc_slice_buffer_add(
+ &t->qbuf,
+ grpc_chttp2_rst_stream_create(
+ id, static_cast<uint32_t>(GRPC_HTTP2_REFUSED_STREAM), nullptr));
+ grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
+ return nullptr;
+ }
+ grpc_chttp2_stream* accepting = nullptr;
GPR_ASSERT(t->accepting_stream == nullptr);
t->accepting_stream = &accepting;
t->channel_callback.accept_stream(t->channel_callback.accept_stream_user_data,
@@ -3185,10 +3205,11 @@ intptr_t grpc_chttp2_transport_get_socket_uuid(grpc_transport* transport) {
}
grpc_transport* grpc_create_chttp2_transport(
- const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client) {
+ const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client,
+ grpc_resource_user* resource_user) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(
gpr_zalloc(sizeof(grpc_chttp2_transport)));
- init_transport(t, channel_args, ep, is_client);
+ init_transport(t, channel_args, ep, is_client, resource_user);
return &t->base;
}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.h b/src/core/ext/transport/chttp2/transport/chttp2_transport.h
index e5872fee43..b3fe1c082e 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.h
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.h
@@ -32,7 +32,8 @@ extern grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount;
extern bool g_flow_control_enabled;
grpc_transport* grpc_create_chttp2_transport(
- const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client);
+ const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client,
+ grpc_resource_user* resource_user = nullptr);
intptr_t grpc_chttp2_transport_get_socket_uuid(grpc_transport* transport);
diff --git a/src/core/ext/transport/chttp2/transport/frame_rst_stream.cc b/src/core/ext/transport/chttp2/transport/frame_rst_stream.cc
index 4bdd4309a4..a0a7534594 100644
--- a/src/core/ext/transport/chttp2/transport/frame_rst_stream.cc
+++ b/src/core/ext/transport/chttp2/transport/frame_rst_stream.cc
@@ -32,7 +32,7 @@ grpc_slice grpc_chttp2_rst_stream_create(uint32_t id, uint32_t code,
grpc_transport_one_way_stats* stats) {
static const size_t frame_size = 13;
grpc_slice slice = GRPC_SLICE_MALLOC(frame_size);
- stats->framing_bytes += frame_size;
+ if (stats != nullptr) stats->framing_bytes += frame_size;
uint8_t* p = GRPC_SLICE_START_PTR(slice);
// Frame size.
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index ff26dd9255..202017641b 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -285,6 +285,8 @@ struct grpc_chttp2_transport {
grpc_endpoint* ep;
char* peer_string;
+ grpc_resource_user* resource_user;
+
grpc_combiner* combiner;
grpc_closure* notify_on_receive_settings;
diff --git a/src/core/lib/channel/channel_stack_builder.cc b/src/core/lib/channel/channel_stack_builder.cc
index df5a783631..8b3008f221 100644
--- a/src/core/lib/channel/channel_stack_builder.cc
+++ b/src/core/lib/channel/channel_stack_builder.cc
@@ -40,6 +40,7 @@ struct grpc_channel_stack_builder {
// various set/get-able parameters
grpc_channel_args* args;
grpc_transport* transport;
+ grpc_resource_user* resource_user;
char* target;
const char* name;
};
@@ -157,6 +158,11 @@ void grpc_channel_stack_builder_set_channel_arguments(
builder->args = grpc_channel_args_copy(args);
}
+const grpc_channel_args* grpc_channel_stack_builder_get_channel_arguments(
+ grpc_channel_stack_builder* builder) {
+ return builder->args;
+}
+
void grpc_channel_stack_builder_set_transport(
grpc_channel_stack_builder* builder, grpc_transport* transport) {
GPR_ASSERT(builder->transport == nullptr);
@@ -168,9 +174,15 @@ grpc_transport* grpc_channel_stack_builder_get_transport(
return builder->transport;
}
-const grpc_channel_args* grpc_channel_stack_builder_get_channel_arguments(
+void grpc_channel_stack_builder_set_resource_user(
+ grpc_channel_stack_builder* builder, grpc_resource_user* resource_user) {
+ GPR_ASSERT(builder->resource_user == nullptr);
+ builder->resource_user = resource_user;
+}
+
+grpc_resource_user* grpc_channel_stack_builder_get_resource_user(
grpc_channel_stack_builder* builder) {
- return builder->args;
+ return builder->resource_user;
}
bool grpc_channel_stack_builder_append_filter(
diff --git a/src/core/lib/channel/channel_stack_builder.h b/src/core/lib/channel/channel_stack_builder.h
index 9196de9378..89c30e0c5e 100644
--- a/src/core/lib/channel/channel_stack_builder.h
+++ b/src/core/lib/channel/channel_stack_builder.h
@@ -54,6 +54,14 @@ void grpc_channel_stack_builder_set_transport(
grpc_transport* grpc_channel_stack_builder_get_transport(
grpc_channel_stack_builder* builder);
+/// Attach \a resource_user to the builder (does not take ownership)
+void grpc_channel_stack_builder_set_resource_user(
+ grpc_channel_stack_builder* builder, grpc_resource_user* resource_user);
+
+/// Fetch attached resource user
+grpc_resource_user* grpc_channel_stack_builder_get_resource_user(
+ grpc_channel_stack_builder* builder);
+
/// Set channel arguments: copies args
void grpc_channel_stack_builder_set_channel_arguments(
grpc_channel_stack_builder* builder, const grpc_channel_args* args);
diff --git a/src/core/lib/iomgr/resource_quota.cc b/src/core/lib/iomgr/resource_quota.cc
index b6fc7579f7..7e4b3c9b2f 100644
--- a/src/core/lib/iomgr/resource_quota.cc
+++ b/src/core/lib/iomgr/resource_quota.cc
@@ -90,7 +90,8 @@ struct grpc_resource_user {
grpc_closure_list on_allocated;
/* True if we are currently trying to allocate from the quota, false if not */
bool allocating;
- /* How many bytes of allocations are outstanding */
+ /* The amount of memory (in bytes) that has been requested from this user
+ * asynchronously but hasn't been granted yet. */
int64_t outstanding_allocations;
/* True if we are currently trying to add ourselves to the non-free quota
list, false otherwise */
@@ -135,6 +136,9 @@ struct grpc_resource_quota {
int64_t size;
/* Amount of free memory in the resource quota */
int64_t free_pool;
+ /* Used size of memory in the resource quota. Updated as soon as the resource
+ * users start to allocate or free the memory. */
+ gpr_atm used;
gpr_atm last_size;
@@ -371,6 +375,7 @@ static bool rq_reclaim_from_per_user_free_pool(
while ((resource_user = rulist_pop_head(resource_quota,
GRPC_RULIST_NON_EMPTY_FREE_POOL))) {
gpr_mu_lock(&resource_user->mu);
+ resource_user->added_to_free_pool = false;
if (resource_user->free_pool > 0) {
int64_t amt = resource_user->free_pool;
resource_user->free_pool = 0;
@@ -386,6 +391,13 @@ static bool rq_reclaim_from_per_user_free_pool(
gpr_mu_unlock(&resource_user->mu);
return true;
} else {
+ if (grpc_resource_quota_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "RQ %s %s: failed to reclaim_from_per_user_free_pool; "
+ "free_pool = %" PRId64 "; rq_free_pool = %" PRId64,
+ resource_quota->name, resource_user->name,
+ resource_user->free_pool, resource_quota->free_pool);
+ }
gpr_mu_unlock(&resource_user->mu);
}
}
@@ -622,6 +634,7 @@ grpc_resource_quota* grpc_resource_quota_create(const char* name) {
resource_quota->combiner = grpc_combiner_create();
resource_quota->free_pool = INT64_MAX;
resource_quota->size = INT64_MAX;
+ resource_quota->used = 0;
gpr_atm_no_barrier_store(&resource_quota->last_size, GPR_ATM_MAX);
gpr_mu_init(&resource_quota->thread_count_mu);
resource_quota->max_threads = INT_MAX;
@@ -712,7 +725,7 @@ size_t grpc_resource_quota_peek_size(grpc_resource_quota* resource_quota) {
*/
grpc_resource_quota* grpc_resource_quota_from_channel_args(
- const grpc_channel_args* channel_args) {
+ const grpc_channel_args* channel_args, bool create) {
for (size_t i = 0; i < channel_args->num_args; i++) {
if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
if (channel_args->args[i].type == GRPC_ARG_POINTER) {
@@ -724,7 +737,7 @@ grpc_resource_quota* grpc_resource_quota_from_channel_args(
}
}
}
- return grpc_resource_quota_create(nullptr);
+ return create ? grpc_resource_quota_create(nullptr) : nullptr;
}
static void* rq_copy(void* rq) {
@@ -863,33 +876,68 @@ void grpc_resource_user_free_threads(grpc_resource_user* resource_user,
gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
}
-void grpc_resource_user_alloc(grpc_resource_user* resource_user, size_t size,
- grpc_closure* optional_on_done) {
- gpr_mu_lock(&resource_user->mu);
+static void resource_user_alloc_locked(grpc_resource_user* resource_user,
+ size_t size,
+ grpc_closure* optional_on_done) {
ru_ref_by(resource_user, static_cast<gpr_atm>(size));
resource_user->free_pool -= static_cast<int64_t>(size);
- resource_user->outstanding_allocations += static_cast<int64_t>(size);
if (grpc_resource_quota_trace.enabled()) {
gpr_log(GPR_INFO, "RQ %s %s: alloc %" PRIdPTR "; free_pool -> %" PRId64,
resource_user->resource_quota->name, resource_user->name, size,
resource_user->free_pool);
}
if (resource_user->free_pool < 0) {
- grpc_closure_list_append(&resource_user->on_allocated, optional_on_done,
- GRPC_ERROR_NONE);
+ if (optional_on_done != nullptr) {
+ resource_user->outstanding_allocations += static_cast<int64_t>(size);
+ grpc_closure_list_append(&resource_user->on_allocated, optional_on_done,
+ GRPC_ERROR_NONE);
+ }
if (!resource_user->allocating) {
resource_user->allocating = true;
GRPC_CLOSURE_SCHED(&resource_user->allocate_closure, GRPC_ERROR_NONE);
}
} else {
- resource_user->outstanding_allocations -= static_cast<int64_t>(size);
GRPC_CLOSURE_SCHED(optional_on_done, GRPC_ERROR_NONE);
}
+}
+
+bool grpc_resource_user_safe_alloc(grpc_resource_user* resource_user,
+ size_t size) {
+ if (gpr_atm_no_barrier_load(&resource_user->shutdown)) return false;
+ gpr_mu_lock(&resource_user->mu);
+ grpc_resource_quota* resource_quota = resource_user->resource_quota;
+ bool cas_success;
+ do {
+ gpr_atm used = gpr_atm_no_barrier_load(&resource_quota->used);
+ gpr_atm new_used = used + size;
+ if (static_cast<size_t>(new_used) >
+ grpc_resource_quota_peek_size(resource_quota)) {
+ gpr_mu_unlock(&resource_user->mu);
+ return false;
+ }
+ cas_success = gpr_atm_full_cas(&resource_quota->used, used, new_used);
+ } while (!cas_success);
+ resource_user_alloc_locked(resource_user, size, nullptr);
+ gpr_mu_unlock(&resource_user->mu);
+ return true;
+}
+
+void grpc_resource_user_alloc(grpc_resource_user* resource_user, size_t size,
+ grpc_closure* optional_on_done) {
+ // TODO(juanlishen): Maybe return immediately if shutting down. Deferring this
+ // because some tests become flaky after the change.
+ gpr_mu_lock(&resource_user->mu);
+ grpc_resource_quota* resource_quota = resource_user->resource_quota;
+ gpr_atm_no_barrier_fetch_add(&resource_quota->used, size);
+ resource_user_alloc_locked(resource_user, size, optional_on_done);
gpr_mu_unlock(&resource_user->mu);
}
void grpc_resource_user_free(grpc_resource_user* resource_user, size_t size) {
gpr_mu_lock(&resource_user->mu);
+ grpc_resource_quota* resource_quota = resource_user->resource_quota;
+ gpr_atm prior = gpr_atm_no_barrier_fetch_add(&resource_quota->used, -size);
+ GPR_ASSERT(prior >= static_cast<long>(size));
bool was_zero_or_negative = resource_user->free_pool <= 0;
resource_user->free_pool += static_cast<int64_t>(size);
if (grpc_resource_quota_trace.enabled()) {
@@ -940,6 +988,12 @@ void grpc_resource_user_slice_allocator_init(
void grpc_resource_user_alloc_slices(
grpc_resource_user_slice_allocator* slice_allocator, size_t length,
size_t count, grpc_slice_buffer* dest) {
+ if (gpr_atm_no_barrier_load(&slice_allocator->resource_user->shutdown)) {
+ GRPC_CLOSURE_SCHED(
+ &slice_allocator->on_allocated,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown"));
+ return;
+ }
slice_allocator->length = length;
slice_allocator->count = count;
slice_allocator->dest = dest;
diff --git a/src/core/lib/iomgr/resource_quota.h b/src/core/lib/iomgr/resource_quota.h
index 7b0ed7417a..1c79b52e3f 100644
--- a/src/core/lib/iomgr/resource_quota.h
+++ b/src/core/lib/iomgr/resource_quota.h
@@ -65,11 +65,16 @@
extern grpc_core::TraceFlag grpc_resource_quota_trace;
+// TODO(juanlishen): This is a hack. We need to do real accounting instead of
+// hard coding.
+constexpr size_t GRPC_RESOURCE_QUOTA_CALL_SIZE = 15 * 1024;
+constexpr size_t GRPC_RESOURCE_QUOTA_CHANNEL_SIZE = 50 * 1024;
+
grpc_resource_quota* grpc_resource_quota_ref_internal(
grpc_resource_quota* resource_quota);
void grpc_resource_quota_unref_internal(grpc_resource_quota* resource_quota);
grpc_resource_quota* grpc_resource_quota_from_channel_args(
- const grpc_channel_args* channel_args);
+ const grpc_channel_args* channel_args, bool create = true);
/* Return a number indicating current memory pressure:
0.0 ==> no memory usage
@@ -109,11 +114,21 @@ bool grpc_resource_user_allocate_threads(grpc_resource_user* resource_user,
void grpc_resource_user_free_threads(grpc_resource_user* resource_user,
int thread_count);
-/* Allocate from the resource user (and its quota).
- If optional_on_done is NULL, then allocate immediately. This may push the
- quota over-limit, at which point reclamation will kick in.
- If optional_on_done is non-NULL, it will be scheduled when the allocation has
- been granted by the quota. */
+/* Allocates from the resource user 'size' worth of memory if this won't exceed
+ * the resource quota's total size. Returns whether the allocation is done
+ * successfully. If allocated successfully, the memory should be freed by the
+ * caller eventually. */
+bool grpc_resource_user_safe_alloc(grpc_resource_user* resource_user,
+ size_t size);
+/* Allocates from the resource user 'size' worth of memory.
+ * If optional_on_done is NULL, then allocate immediately. This may push the
+ * quota over-limit, at which point reclamation will kick in. The caller is
+ * always responsible to free the memory eventually.
+ * If optional_on_done is non-NULL, it will be scheduled without error when the
+ * allocation has been granted by the quota, and the caller is responsible to
+ * free the memory eventually. Or it may be scheduled with an error, in which
+ * case the caller fails to allocate the memory and shouldn't free the memory.
+ */
void grpc_resource_user_alloc(grpc_resource_user* resource_user, size_t size,
grpc_closure* optional_on_done);
/* Release memory back to the quota */
diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc
index d7095c24d4..123be79c88 100644
--- a/src/core/lib/surface/channel.cc
+++ b/src/core/lib/surface/channel.cc
@@ -39,6 +39,7 @@
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/iomgr.h"
+#include "src/core/lib/iomgr/resource_quota.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/call.h"
@@ -63,6 +64,7 @@ struct grpc_channel {
grpc_compression_options compression_options;
gpr_atm call_size_estimate;
+ grpc_resource_user* resource_user;
gpr_mu registered_call_mu;
registered_call* registered_calls;
@@ -82,6 +84,8 @@ grpc_channel* grpc_channel_create_with_builder(
char* target = gpr_strdup(grpc_channel_stack_builder_get_target(builder));
grpc_channel_args* args = grpc_channel_args_copy(
grpc_channel_stack_builder_get_channel_arguments(builder));
+ grpc_resource_user* resource_user =
+ grpc_channel_stack_builder_get_resource_user(builder);
grpc_channel* channel;
if (channel_stack_type == GRPC_SERVER_CHANNEL) {
GRPC_STATS_INC_SERVER_CHANNELS_CREATED();
@@ -101,6 +105,7 @@ grpc_channel* grpc_channel_create_with_builder(
}
channel->target = target;
+ channel->resource_user = resource_user;
channel->is_client = grpc_channel_stack_type_is_client(channel_stack_type);
bool channelz_enabled = GRPC_ENABLE_CHANNELZ_DEFAULT;
size_t channel_tracer_max_memory = 0; // default to off
@@ -217,7 +222,8 @@ grpc_core::channelz::ChannelNode* grpc_channel_get_channelz_node(
grpc_channel* grpc_channel_create(const char* target,
const grpc_channel_args* input_args,
grpc_channel_stack_type channel_stack_type,
- grpc_transport* optional_transport) {
+ grpc_transport* optional_transport,
+ grpc_resource_user* resource_user) {
grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create();
const grpc_core::UniquePtr<char> default_authority =
get_default_authority(input_args);
@@ -227,11 +233,17 @@ grpc_channel* grpc_channel_create(const char* target,
grpc_channel_args_destroy(args);
grpc_channel_stack_builder_set_target(builder, target);
grpc_channel_stack_builder_set_transport(builder, optional_transport);
+ grpc_channel_stack_builder_set_resource_user(builder, resource_user);
if (!grpc_channel_init_create_stack(builder, channel_stack_type)) {
grpc_channel_stack_builder_destroy(builder);
+ if (resource_user != nullptr) {
+ grpc_resource_user_free(resource_user, GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
+ }
return nullptr;
}
- return grpc_channel_create_with_builder(builder, channel_stack_type);
+ grpc_channel* channel =
+ grpc_channel_create_with_builder(builder, channel_stack_type);
+ return channel;
}
size_t grpc_channel_get_call_size_estimate(grpc_channel* channel) {
@@ -441,6 +453,10 @@ static void destroy_channel(void* arg, grpc_error* error) {
GRPC_MDELEM_UNREF(rc->authority);
gpr_free(rc);
}
+ if (channel->resource_user != nullptr) {
+ grpc_resource_user_free(channel->resource_user,
+ GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
+ }
gpr_mu_destroy(&channel->registered_call_mu);
gpr_free(channel->target);
gpr_free(channel);
diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h
index 4ac76b8a29..ab00b8e94f 100644
--- a/src/core/lib/surface/channel.h
+++ b/src/core/lib/surface/channel.h
@@ -29,7 +29,8 @@
grpc_channel* grpc_channel_create(const char* target,
const grpc_channel_args* args,
grpc_channel_stack_type channel_stack_type,
- grpc_transport* optional_transport);
+ grpc_transport* optional_transport,
+ grpc_resource_user* resource_user = nullptr);
grpc_channel* grpc_channel_create_with_builder(
grpc_channel_stack_builder* builder,
diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc
index 35ab2c3bce..72391ca697 100644
--- a/src/core/lib/surface/server.cc
+++ b/src/core/lib/surface/server.cc
@@ -189,6 +189,8 @@ typedef struct {
struct grpc_server {
grpc_channel_args* channel_args;
+ grpc_resource_user* default_resource_user;
+
grpc_completion_queue** cqs;
grpc_pollset** pollsets;
size_t cq_count;
@@ -1024,6 +1026,15 @@ grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) {
grpc_slice_from_static_string("Server created"));
}
+ if (args != nullptr) {
+ grpc_resource_quota* resource_quota =
+ grpc_resource_quota_from_channel_args(args, false /* create */);
+ if (resource_quota != nullptr) {
+ server->default_resource_user =
+ grpc_resource_user_create(resource_quota, "default");
+ }
+ }
+
return server;
}
@@ -1122,7 +1133,8 @@ void grpc_server_get_pollsets(grpc_server* server, grpc_pollset*** pollsets,
void grpc_server_setup_transport(grpc_server* s, grpc_transport* transport,
grpc_pollset* accepting_pollset,
const grpc_channel_args* args,
- intptr_t socket_uuid) {
+ intptr_t socket_uuid,
+ grpc_resource_user* resource_user) {
size_t num_registered_methods;
size_t alloc;
registered_method* rm;
@@ -1135,7 +1147,8 @@ void grpc_server_setup_transport(grpc_server* s, grpc_transport* transport,
uint32_t max_probes = 0;
grpc_transport_op* op = nullptr;
- channel = grpc_channel_create(nullptr, args, GRPC_SERVER_CHANNEL, transport);
+ channel = grpc_channel_create(nullptr, args, GRPC_SERVER_CHANNEL, transport,
+ resource_user);
chand = static_cast<channel_data*>(
grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0)
->channel_data);
@@ -1330,6 +1343,13 @@ void grpc_server_shutdown_and_notify(grpc_server* server,
channel_broadcaster_shutdown(&broadcaster, true /* send_goaway */,
GRPC_ERROR_NONE);
+
+ if (server->default_resource_user != nullptr) {
+ grpc_resource_quota_unref(
+ grpc_resource_user_quota(server->default_resource_user));
+ grpc_resource_user_shutdown(server->default_resource_user);
+ grpc_resource_user_unref(server->default_resource_user);
+ }
}
void grpc_server_cancel_all_calls(grpc_server* server) {
@@ -1546,6 +1566,10 @@ const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server) {
return server->channel_args;
}
+grpc_resource_user* grpc_server_get_default_resource_user(grpc_server* server) {
+ return server->default_resource_user;
+}
+
int grpc_server_has_open_connections(grpc_server* server) {
int r;
gpr_mu_lock(&server->mu_global);
diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h
index 33c205417e..27038fdb7a 100644
--- a/src/core/lib/surface/server.h
+++ b/src/core/lib/surface/server.h
@@ -47,7 +47,8 @@ void grpc_server_add_listener(grpc_server* server, void* listener,
void grpc_server_setup_transport(grpc_server* server, grpc_transport* transport,
grpc_pollset* accepting_pollset,
const grpc_channel_args* args,
- intptr_t socket_uuid);
+ intptr_t socket_uuid,
+ grpc_resource_user* resource_user = nullptr);
/* fills in the uuids of all sockets used for connections on this server */
void grpc_server_populate_server_sockets(
@@ -63,6 +64,8 @@ grpc_core::channelz::ServerNode* grpc_server_get_channelz_node(
const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server);
+grpc_resource_user* grpc_server_get_default_resource_user(grpc_server* server);
+
int grpc_server_has_open_connections(grpc_server* server);
/* Do not call this before grpc_server_start. Returns the pollsets and the