aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-09-28 08:37:51 -0700
committerGravatar Craig Tiller <ctiller@google.com>2016-09-28 08:37:51 -0700
commitef6b97659edb575a002d574db89d90f7ebf4b979 (patch)
tree249ed65682669483185f532e6b79da0f6922d995
parent82509936ae1de5620825e8a5b27c34b47d9f1af8 (diff)
Add tracing, fix some transport bugs wrt buffer_pools
-rw-r--r--include/grpc/grpc.h2
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c24
-rw-r--r--src/core/lib/iomgr/buffer_pool.c66
-rw-r--r--src/core/lib/iomgr/buffer_pool.h6
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.c2
-rw-r--r--src/core/lib/iomgr/tcp_posix.c2
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c2
-rw-r--r--src/core/lib/security/credentials/google_default/google_default_credentials.c3
-rw-r--r--src/core/lib/security/credentials/jwt/jwt_verifier.c4
-rw-r--r--src/core/lib/security/credentials/oauth2/oauth2_credentials.c5
-rw-r--r--src/core/lib/surface/init.c2
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.h2
-rw-r--r--test/core/end2end/tests/buffer_pool_server.c228
-rw-r--r--test/core/util/mock_endpoint.c12
-rw-r--r--test/core/util/passthru_endpoint.c16
-rw-r--r--test/core/util/port_server_client.c11
16 files changed, 354 insertions, 33 deletions
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index a6e02cd072..4bdf744d91 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -402,7 +402,7 @@ GRPCAPI int grpc_is_binary_header(const char *key, size_t length);
GRPCAPI const char *grpc_call_error_to_string(grpc_call_error error);
/** Create a buffer pool */
-GRPCAPI grpc_buffer_pool *grpc_buffer_pool_create(void);
+GRPCAPI grpc_buffer_pool *grpc_buffer_pool_create(const char *trace_name);
/** Add a reference to a buffer pool */
GRPCAPI void grpc_buffer_pool_ref(grpc_buffer_pool *buffer_pool);
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 3ebb467332..13241f6abe 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -257,7 +257,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_closure_init(&t->benign_reclaimer, benign_reclaimer, t);
grpc_closure_init(&t->destructive_reclaimer, destructive_reclaimer, t);
grpc_closure_init(&t->benign_reclaimer_locked, benign_reclaimer_locked, t);
- grpc_closure_init(&t->destructive_reclaimer_locked, destructive_reclaimer_locked, t);
+ grpc_closure_init(&t->destructive_reclaimer_locked,
+ destructive_reclaimer_locked, t);
grpc_chttp2_goaway_parser_init(&t->goaway_parser);
grpc_chttp2_hpack_parser_init(&t->hpack_parser);
@@ -2124,10 +2125,21 @@ static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_chttp2_transport *t = arg;
if (error == GRPC_ERROR_NONE &&
grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
+ if (grpc_buffer_pool_trace) {
+ gpr_log(GPR_DEBUG, "HTTP2: %s - send goaway to free memory",
+ t->peer_string);
+ }
send_goaway(exec_ctx, t, GRPC_CHTTP2_ENHANCE_YOUR_CALM,
gpr_slice_from_static_string("Buffers full"));
+ } else if (grpc_buffer_pool_trace) {
+ gpr_log(GPR_DEBUG,
+ "HTTP2: %s - skip benign reclaimation, there are still %" PRIdPTR
+ " streams",
+ t->peer_string, grpc_chttp2_stream_map_size(&t->stream_map));
}
t->benign_reclaimer_registered = false;
+ grpc_buffer_user_finish_reclaimation(exec_ctx,
+ grpc_endpoint_get_buffer_user(t->ep));
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "benign_reclaimer");
}
@@ -2138,18 +2150,20 @@ static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg,
t->destructive_reclaimer_registered = false;
if (error == GRPC_ERROR_NONE && n > 0) {
grpc_chttp2_stream *s = grpc_chttp2_stream_map_rand(&t->stream_map);
+ if (grpc_buffer_pool_trace) {
+ gpr_log(GPR_DEBUG, "HTTP2: %s - abandon stream id %d", t->peer_string,
+ s->id);
+ }
grpc_chttp2_cancel_stream(
exec_ctx, t, s, grpc_error_set_int(GRPC_ERROR_CREATE("Buffers full"),
GRPC_ERROR_INT_HTTP2_ERROR,
GRPC_CHTTP2_ENHANCE_YOUR_CALM));
if (n > 1) {
post_destructive_reclaimer(exec_ctx, t);
- t->destructive_reclaimer_registered = true;
- grpc_buffer_user_post_reclaimer(exec_ctx,
- grpc_endpoint_get_buffer_user(t->ep),
- true, &t->destructive_reclaimer);
}
}
+ grpc_buffer_user_finish_reclaimation(exec_ctx,
+ grpc_endpoint_get_buffer_user(t->ep));
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "destructive_reclaimer");
}
diff --git a/src/core/lib/iomgr/buffer_pool.c b/src/core/lib/iomgr/buffer_pool.c
index 88f37c9fc1..cfa171684b 100644
--- a/src/core/lib/iomgr/buffer_pool.c
+++ b/src/core/lib/iomgr/buffer_pool.c
@@ -37,6 +37,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
#include "src/core/lib/iomgr/combiner.h"
@@ -62,6 +63,8 @@ struct grpc_buffer_pool {
grpc_closure bpreclaimation_done_closure;
grpc_buffer_user *roots[GRPC_BULIST_COUNT];
+
+ char *name;
};
/*******************************************************************************
@@ -175,8 +178,18 @@ static bool bpalloc(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool) {
gpr_mu_lock(&buffer_user->mu);
if (buffer_user->free_pool < 0 &&
-buffer_user->free_pool <= buffer_pool->free_pool) {
- buffer_pool->free_pool += buffer_user->free_pool;
+ int64_t amt = -buffer_user->free_pool;
buffer_user->free_pool = 0;
+ buffer_pool->free_pool -= amt;
+ if (grpc_buffer_pool_trace) {
+ gpr_log(GPR_DEBUG, "BP %s %s: grant alloc %" PRId64
+ " bytes; bp_free_pool -> %" PRId64,
+ buffer_pool->name, buffer_user->name, amt,
+ buffer_pool->free_pool);
+ }
+ } else if (grpc_buffer_pool_trace && buffer_user->free_pool >= 0) {
+ gpr_log(GPR_DEBUG, "BP %s %s: discard already satisfied alloc request",
+ buffer_pool->name, buffer_user->name);
}
if (buffer_user->free_pool >= 0) {
buffer_user->allocating = false;
@@ -198,8 +211,15 @@ static bool bpscavenge(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool) {
bulist_pop(buffer_pool, GRPC_BULIST_NON_EMPTY_FREE_POOL))) {
gpr_mu_lock(&buffer_user->mu);
if (buffer_user->free_pool > 0) {
- buffer_pool->free_pool += buffer_user->free_pool;
+ int64_t amt = buffer_user->free_pool;
buffer_user->free_pool = 0;
+ buffer_pool->free_pool += amt;
+ if (grpc_buffer_pool_trace) {
+ gpr_log(GPR_DEBUG, "BP %s %s: scavenge %" PRId64
+ " bytes; bp_free_pool -> %" PRId64,
+ buffer_pool->name, buffer_user->name, amt,
+ buffer_pool->free_pool);
+ }
gpr_mu_unlock(&buffer_user->mu);
return true;
} else {
@@ -217,6 +237,10 @@ static bool bpreclaim(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool,
: GRPC_BULIST_RECLAIMER_BENIGN;
grpc_buffer_user *buffer_user = bulist_pop(buffer_pool, list);
if (buffer_user == NULL) return false;
+ if (grpc_buffer_pool_trace) {
+ gpr_log(GPR_DEBUG, "BP %s %s: initiate %s reclaimation", buffer_pool->name,
+ buffer_user->name, destructive ? "destructive" : "benign");
+ }
buffer_pool->reclaiming = true;
grpc_closure *c = buffer_user->reclaimers[destructive];
buffer_user->reclaimers[destructive] = NULL;
@@ -384,7 +408,7 @@ static void bp_reclaimation_done(grpc_exec_ctx *exec_ctx, void *bp,
* grpc_buffer_pool api
*/
-grpc_buffer_pool *grpc_buffer_pool_create(void) {
+grpc_buffer_pool *grpc_buffer_pool_create(const char *name) {
grpc_buffer_pool *buffer_pool = gpr_malloc(sizeof(*buffer_pool));
gpr_ref_init(&buffer_pool->refs, 1);
buffer_pool->combiner = grpc_combiner_create(NULL);
@@ -392,6 +416,12 @@ grpc_buffer_pool *grpc_buffer_pool_create(void) {
buffer_pool->size = INT64_MAX;
buffer_pool->step_scheduled = false;
buffer_pool->reclaiming = false;
+ if (name != NULL) {
+ buffer_pool->name = gpr_strdup(name);
+ } else {
+ gpr_asprintf(&buffer_pool->name, "anonymous_pool_%" PRIxPTR,
+ (intptr_t)buffer_pool);
+ }
grpc_closure_init(&buffer_pool->bpstep_closure, bpstep, buffer_pool);
grpc_closure_init(&buffer_pool->bpreclaimation_done_closure,
bp_reclaimation_done, buffer_pool);
@@ -451,7 +481,7 @@ grpc_buffer_pool *grpc_buffer_pool_from_channel_args(
}
}
}
- return grpc_buffer_pool_create();
+ return grpc_buffer_pool_create(NULL);
}
static void *bp_copy(void *bp) {
@@ -473,7 +503,7 @@ const grpc_arg_pointer_vtable *grpc_buffer_pool_arg_vtable(void) {
*/
void grpc_buffer_user_init(grpc_buffer_user *buffer_user,
- grpc_buffer_pool *buffer_pool) {
+ grpc_buffer_pool *buffer_pool, const char *name) {
buffer_user->buffer_pool = grpc_buffer_pool_internal_ref(buffer_pool);
grpc_closure_init(&buffer_user->allocate_closure, &bu_allocate, buffer_user);
grpc_closure_init(&buffer_user->add_to_free_pool_closure,
@@ -498,6 +528,12 @@ void grpc_buffer_user_init(grpc_buffer_user *buffer_user,
#ifndef NDEBUG
buffer_user->asan_canary = gpr_malloc(1);
#endif
+ if (name != NULL) {
+ buffer_user->name = gpr_strdup(name);
+ } else {
+ gpr_asprintf(&buffer_user->name, "anonymous_buffer_user_%" PRIxPTR,
+ (intptr_t)buffer_user);
+ }
}
void grpc_buffer_user_shutdown(grpc_exec_ctx *exec_ctx,
@@ -533,6 +569,10 @@ void grpc_buffer_user_alloc(grpc_exec_ctx *exec_ctx,
&buffer_user->on_done_destroy_closure);
if (on_done_destroy != NULL) {
/* already shutdown */
+ if (grpc_buffer_pool_trace) {
+ gpr_log(GPR_DEBUG, "BP %s %s: alloc %" PRIdPTR " after shutdown",
+ buffer_user->buffer_pool->name, buffer_user->name, size);
+ }
grpc_exec_ctx_sched(
exec_ctx, optional_on_done,
GRPC_ERROR_CREATE("Buffer pool user is already shutdown"), NULL);
@@ -541,6 +581,12 @@ void grpc_buffer_user_alloc(grpc_exec_ctx *exec_ctx,
}
buffer_user->allocated += (int64_t)size;
buffer_user->free_pool -= (int64_t)size;
+ if (grpc_buffer_pool_trace) {
+ gpr_log(GPR_DEBUG, "BP %s %s: alloc %" PRIdPTR "; allocated -> %" PRId64
+ ", free_pool -> %" PRId64,
+ buffer_user->buffer_pool->name, buffer_user->name, size,
+ buffer_user->allocated, buffer_user->free_pool);
+ }
if (buffer_user->free_pool < 0) {
grpc_closure_list_append(&buffer_user->on_allocated, optional_on_done,
GRPC_ERROR_NONE);
@@ -563,6 +609,12 @@ void grpc_buffer_user_free(grpc_exec_ctx *exec_ctx,
bool was_zero_or_negative = buffer_user->free_pool <= 0;
buffer_user->free_pool += (int64_t)size;
buffer_user->allocated -= (int64_t)size;
+ if (grpc_buffer_pool_trace) {
+ gpr_log(GPR_DEBUG, "BP %s %s: free %" PRIdPTR "; allocated -> %" PRId64
+ ", free_pool -> %" PRId64,
+ buffer_user->buffer_pool->name, buffer_user->name, size,
+ buffer_user->allocated, buffer_user->free_pool);
+ }
bool is_bigger_than_zero = buffer_user->free_pool > 0;
if (is_bigger_than_zero && was_zero_or_negative &&
!buffer_user->added_to_free_pool) {
@@ -597,6 +649,10 @@ void grpc_buffer_user_post_reclaimer(grpc_exec_ctx *exec_ctx,
void grpc_buffer_user_finish_reclaimation(grpc_exec_ctx *exec_ctx,
grpc_buffer_user *buffer_user) {
+ if (grpc_buffer_pool_trace) {
+ gpr_log(GPR_DEBUG, "BP %s %s: reclaimation complete",
+ buffer_user->buffer_pool->name, buffer_user->name);
+ }
grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner,
&buffer_user->buffer_pool->bpreclaimation_done_closure,
GRPC_ERROR_NONE, false);
diff --git a/src/core/lib/iomgr/buffer_pool.h b/src/core/lib/iomgr/buffer_pool.h
index 301e059c18..1564872b5d 100644
--- a/src/core/lib/iomgr/buffer_pool.h
+++ b/src/core/lib/iomgr/buffer_pool.h
@@ -38,6 +38,8 @@
#include "src/core/lib/iomgr/exec_ctx.h"
+extern int grpc_buffer_pool_trace;
+
grpc_buffer_pool *grpc_buffer_pool_internal_ref(grpc_buffer_pool *buffer_pool);
void grpc_buffer_pool_internal_unref(grpc_exec_ctx *exec_ctx,
grpc_buffer_pool *buffer_pool);
@@ -83,10 +85,12 @@ struct grpc_buffer_user {
gpr_atm on_done_destroy_closure;
grpc_buffer_user_link links[GRPC_BULIST_COUNT];
+
+ char *name;
};
void grpc_buffer_user_init(grpc_buffer_user *buffer_user,
- grpc_buffer_pool *buffer_pool);
+ grpc_buffer_pool *buffer_pool, const char *name);
void grpc_buffer_user_shutdown(grpc_exec_ctx *exec_ctx,
grpc_buffer_user *buffer_user,
grpc_closure *on_done);
diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c
index cf01623f09..4e8f9e53bb 100644
--- a/src/core/lib/iomgr/tcp_client_posix.c
+++ b/src/core/lib/iomgr/tcp_client_posix.c
@@ -125,7 +125,7 @@ grpc_endpoint *grpc_tcp_client_create_from_fd(
grpc_exec_ctx *exec_ctx, grpc_fd *fd, const grpc_channel_args *channel_args,
const char *addr_str) {
size_t tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE;
- grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create();
+ grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(NULL);
if (channel_args != NULL) {
for (size_t i = 0; i < channel_args->num_args; i++) {
if (0 ==
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 812c39235b..120622e817 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -544,7 +544,7 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, grpc_buffer_pool *buffer_pool,
tcp->write_closure.cb = tcp_handle_write;
tcp->write_closure.cb_arg = tcp;
gpr_slice_buffer_init(&tcp->last_read_buffer);
- grpc_buffer_user_init(&tcp->buffer_user, buffer_pool);
+ grpc_buffer_user_init(&tcp->buffer_user, buffer_pool, peer_string);
grpc_buffer_user_slice_allocator_init(
&tcp->slice_allocator, &tcp->buffer_user, tcp_read_allocation_done, tcp);
/* Tell network status tracker about new endpoint */
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index 3304152385..4d36fd4caf 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -163,7 +163,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
s->so_reuseport = has_so_reuseport;
- s->buffer_pool = grpc_buffer_pool_create();
+ s->buffer_pool = grpc_buffer_pool_create(NULL);
for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) {
if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) {
if (args->args[i].type == GRPC_ARG_INTEGER) {
diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.c b/src/core/lib/security/credentials/google_default/google_default_credentials.c
index 3bcde3da8b..4e703aa9f4 100644
--- a/src/core/lib/security/credentials/google_default/google_default_credentials.c
+++ b/src/core/lib/security/credentials/google_default/google_default_credentials.c
@@ -124,7 +124,8 @@ static int is_stack_running_on_compute_engine(void) {
grpc_httpcli_context_init(&context);
- grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create();
+ grpc_buffer_pool *buffer_pool =
+ grpc_buffer_pool_create("google_default_credentials");
grpc_httpcli_get(
&exec_ctx, &context, &detector.pollent, buffer_pool, &request,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), max_detection_delay),
diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.c b/src/core/lib/security/credentials/jwt/jwt_verifier.c
index c1a3eb7eab..ffcd0b3910 100644
--- a/src/core/lib/security/credentials/jwt/jwt_verifier.c
+++ b/src/core/lib/security/credentials/jwt/jwt_verifier.c
@@ -660,7 +660,7 @@ static void on_openid_config_retrieved(grpc_exec_ctx *exec_ctx, void *user_data,
/* TODO(ctiller): Carry the buffer_pool in ctx and share it with the host
channel. This would allow us to cancel an authentication query when under
extreme memory pressure. */
- grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create();
+ grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create("jwt_verifier");
grpc_httpcli_get(
exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent, buffer_pool, &req,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay),
@@ -772,7 +772,7 @@ static void retrieve_key_and_verify(grpc_exec_ctx *exec_ctx,
/* TODO(ctiller): Carry the buffer_pool in ctx and share it with the host
channel. This would allow us to cancel an authentication query when under
extreme memory pressure. */
- grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create();
+ grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create("jwt_verifier");
grpc_httpcli_get(
exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent, buffer_pool, &req,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay),
diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
index e9e83d1468..61c0815b2a 100644
--- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
+++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
@@ -310,7 +310,7 @@ static void compute_engine_fetch_oauth2(
/* TODO(ctiller): Carry the buffer_pool in ctx and share it with the host
channel. This would allow us to cancel an authentication query when under
extreme memory pressure. */
- grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create();
+ grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create("oauth2_credentials");
grpc_httpcli_get(exec_ctx, httpcli_context, pollent, buffer_pool, &request,
deadline, grpc_closure_create(response_cb, metadata_req),
&metadata_req->response);
@@ -365,7 +365,8 @@ static void refresh_token_fetch_oauth2(
/* TODO(ctiller): Carry the buffer_pool in ctx and share it with the host
channel. This would allow us to cancel an authentication query when under
extreme memory pressure. */
- grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create();
+ grpc_buffer_pool *buffer_pool =
+ grpc_buffer_pool_create("oauth2_credentials_refresh");
grpc_httpcli_post(exec_ctx, httpcli_context, pollent, buffer_pool, &request,
body, strlen(body), deadline,
grpc_closure_create(response_cb, metadata_req),
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index 3cbbaa7b0c..de913af4ee 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -48,6 +48,7 @@
#include "src/core/lib/channel/message_size_filter.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/http/parser.h"
+#include "src/core/lib/iomgr/buffer_pool.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
@@ -184,6 +185,7 @@ void grpc_init(void) {
// Default timeout trace to 1
grpc_cq_event_timeout_trace = 1;
grpc_register_tracer("op_failure", &grpc_trace_operation_failures);
+ grpc_register_tracer("buffer_pool", &grpc_buffer_pool_trace);
#ifndef NDEBUG
grpc_register_tracer("pending_tags", &grpc_trace_pending_tags);
#endif
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
index 4926275fa2..703ea59e6e 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
@@ -347,7 +347,7 @@ extern grpc_is_binary_header_type grpc_is_binary_header_import;
typedef const char *(*grpc_call_error_to_string_type)(grpc_call_error error);
extern grpc_call_error_to_string_type grpc_call_error_to_string_import;
#define grpc_call_error_to_string grpc_call_error_to_string_import
-typedef grpc_buffer_pool *(*grpc_buffer_pool_create_type)(void);
+typedef grpc_buffer_pool *(*grpc_buffer_pool_create_type)(const char *trace_name);
extern grpc_buffer_pool_create_type grpc_buffer_pool_create_import;
#define grpc_buffer_pool_create grpc_buffer_pool_create_import
typedef void(*grpc_buffer_pool_ref_type)(grpc_buffer_pool *buffer_pool);
diff --git a/test/core/end2end/tests/buffer_pool_server.c b/test/core/end2end/tests/buffer_pool_server.c
index 0b06efd02e..7f07ec79d5 100644
--- a/test/core/end2end/tests/buffer_pool_server.c
+++ b/test/core/end2end/tests/buffer_pool_server.c
@@ -95,9 +95,235 @@ static void end_test(grpc_end2end_test_fixture *f) {
grpc_completion_queue_destroy(f->cq);
}
+/* Creates and returns a gpr_slice containing random alphanumeric characters. */
+static gpr_slice generate_random_slice() {
+ size_t i;
+ static const char chars[] = "abcdefghijklmnopqrstuvwxyz1234567890";
+ char output[1024 * 1024];
+ for (i = 0; i < GPR_ARRAY_SIZE(output) - 1; ++i) {
+ output[i] = chars[rand() % (int)(sizeof(chars) - 1)];
+ }
+ output[GPR_ARRAY_SIZE(output) - 1] = '\0';
+ return gpr_slice_from_copied_string(output);
+}
+
void buffer_pool_server(grpc_end2end_test_config config) {
+ grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create("test_server");
+ grpc_buffer_pool_resize(buffer_pool, 5 * 1024 * 1024);
+
+#define NUM_CALLS 100
+#define CLIENT_BASE_TAG 1000
+#define SERVER_START_BASE_TAG 2000
+#define SERVER_RECV_BASE_TAG 3000
+#define SERVER_END_BASE_TAG 4000
+
+ grpc_arg arg;
+ arg.key = GRPC_ARG_BUFFER_POOL;
+ arg.type = GRPC_ARG_POINTER;
+ arg.value.pointer.p = buffer_pool;
+ arg.value.pointer.vtable = grpc_buffer_pool_arg_vtable();
+ grpc_channel_args args = {1, &arg};
+
grpc_end2end_test_fixture f =
- begin_test(config, "buffer_pool_server", NULL, NULL);
+ begin_test(config, "buffer_pool_server", NULL, &args);
+
+ /* Create large request and response bodies. These are big enough to require
+ * multiple round trips to deliver to the peer, and their exact contents of
+ * will be verified on completion. */
+ gpr_slice request_payload_slice = generate_random_slice();
+
+ grpc_call *client_calls[NUM_CALLS];
+ grpc_call *server_calls[NUM_CALLS];
+ grpc_metadata_array initial_metadata_recv[NUM_CALLS];
+ grpc_metadata_array trailing_metadata_recv[NUM_CALLS];
+ grpc_metadata_array request_metadata_recv[NUM_CALLS];
+ grpc_call_details call_details[NUM_CALLS];
+ grpc_status_code status[NUM_CALLS];
+ char *details[NUM_CALLS];
+ size_t details_capacity[NUM_CALLS];
+ grpc_byte_buffer *request_payload_recv[NUM_CALLS];
+ int was_cancelled[NUM_CALLS];
+ grpc_call_error error;
+ int pending_client_calls = 0;
+ int pending_server_start_calls = 0;
+ int pending_server_recv_calls = 0;
+ int pending_server_end_calls = 0;
+ int cancelled_calls_on_client = 0;
+ int cancelled_calls_on_server = 0;
+
+ grpc_byte_buffer *request_payload =
+ grpc_raw_byte_buffer_create(&request_payload_slice, 1);
+
+ grpc_op ops[6];
+ grpc_op *op;
+
+ for (int i = 0; i < NUM_CALLS; i++) {
+ grpc_metadata_array_init(&initial_metadata_recv[i]);
+ grpc_metadata_array_init(&trailing_metadata_recv[i]);
+ grpc_metadata_array_init(&request_metadata_recv[i]);
+ grpc_call_details_init(&call_details[i]);
+ details[i] = NULL;
+ details_capacity[i] = 0;
+ request_payload_recv[i] = NULL;
+ was_cancelled[i] = 0;
+ }
+
+ for (int i = 0; i < NUM_CALLS; i++) {
+ error = grpc_server_request_call(
+ f.server, &server_calls[i], &call_details[i], &request_metadata_recv[i],
+ f.cq, f.cq, tag(SERVER_START_BASE_TAG + i));
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ pending_server_start_calls++;
+ }
+
+ for (int i = 0; i < NUM_CALLS; i++) {
+ client_calls[i] = grpc_channel_create_call(
+ f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq, "/foo",
+ "foo.test.google.fr", n_seconds_time(60), NULL);
+
+ memset(ops, 0, sizeof(ops));
+ op = ops;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->data.send_initial_metadata.count = 0;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_SEND_MESSAGE;
+ op->data.send_message = request_payload;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_RECV_INITIAL_METADATA;
+ op->data.recv_initial_metadata = &initial_metadata_recv[i];
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ op->data.recv_status_on_client.trailing_metadata =
+ &trailing_metadata_recv[i];
+ op->data.recv_status_on_client.status = &status[i];
+ op->data.recv_status_on_client.status_details = &details[i];
+ op->data.recv_status_on_client.status_details_capacity =
+ &details_capacity[i];
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(client_calls[i], ops, (size_t)(op - ops),
+ tag(CLIENT_BASE_TAG + i), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ pending_client_calls++;
+ }
+
+ while (pending_client_calls + pending_server_recv_calls +
+ pending_server_end_calls >
+ 0) {
+ gpr_log(GPR_DEBUG,
+ "pending: client_calls=%d server_start_calls=%d "
+ "server_recv_calls=%d server_end_calls=%d",
+ pending_client_calls, pending_server_start_calls,
+ pending_server_recv_calls, pending_server_end_calls);
+
+ grpc_event ev = grpc_completion_queue_next(f.cq, n_seconds_time(10), NULL);
+ GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
+
+ int ev_tag = (int)(intptr_t)ev.tag;
+ if (ev_tag < CLIENT_BASE_TAG) {
+ abort(); /* illegal tag */
+ } else if (ev_tag < SERVER_START_BASE_TAG) {
+ /* client call finished */
+ int call_id = ev_tag - CLIENT_BASE_TAG;
+ GPR_ASSERT(call_id >= 0);
+ GPR_ASSERT(call_id < NUM_CALLS);
+ switch (status[call_id]) {
+ case GRPC_STATUS_RESOURCE_EXHAUSTED:
+ cancelled_calls_on_client++;
+ break;
+ case GRPC_STATUS_OK:
+ break;
+ default:
+ gpr_log(GPR_ERROR, "Unexpected status code: %d", status[call_id]);
+ abort();
+ }
+ GPR_ASSERT(pending_client_calls > 0);
+ pending_client_calls--;
+ } else if (ev_tag < SERVER_RECV_BASE_TAG) {
+ /* new incoming call to the server */
+ int call_id = ev_tag - SERVER_START_BASE_TAG;
+ GPR_ASSERT(call_id >= 0);
+ GPR_ASSERT(call_id < NUM_CALLS);
+
+ memset(ops, 0, sizeof(ops));
+ op = ops;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->data.send_initial_metadata.count = 0;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->data.recv_message = &request_payload_recv[call_id];
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error =
+ grpc_call_start_batch(server_calls[call_id], ops, (size_t)(op - ops),
+ tag(SERVER_RECV_BASE_TAG + call_id), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ GPR_ASSERT(pending_server_start_calls > 0);
+ pending_server_start_calls--;
+ pending_server_recv_calls++;
+ } else if (ev_tag < SERVER_END_BASE_TAG) {
+ /* finished read on the server */
+ int call_id = ev_tag - SERVER_RECV_BASE_TAG;
+ GPR_ASSERT(call_id >= 0);
+ GPR_ASSERT(call_id < NUM_CALLS);
+
+ memset(ops, 0, sizeof(ops));
+ op = ops;
+ op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+ op->data.recv_close_on_server.cancelled = &was_cancelled[call_id];
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
+ op->data.send_status_from_server.trailing_metadata_count = 0;
+ op->data.send_status_from_server.status = GRPC_STATUS_OK;
+ op->data.send_status_from_server.status_details = "xyz";
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error =
+ grpc_call_start_batch(server_calls[call_id], ops, (size_t)(op - ops),
+ tag(SERVER_END_BASE_TAG + call_id), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ GPR_ASSERT(pending_server_recv_calls > 0);
+ pending_server_recv_calls--;
+ pending_server_end_calls++;
+ } else {
+ int call_id = ev_tag - SERVER_END_BASE_TAG;
+ GPR_ASSERT(call_id >= 0);
+ GPR_ASSERT(call_id < NUM_CALLS);
+
+ if (was_cancelled[call_id]) {
+ cancelled_calls_on_server++;
+ }
+ GPR_ASSERT(pending_server_end_calls > 0);
+ pending_server_end_calls--;
+ }
+ }
+
+ gpr_log(
+ GPR_INFO,
+ "Done. %d total calls: %d cancelled at server, %d cancelled at client.",
+ NUM_CALLS, cancelled_calls_on_server, cancelled_calls_on_client);
+
end_test(&f);
config.tear_down_data(&f);
}
diff --git a/test/core/util/mock_endpoint.c b/test/core/util/mock_endpoint.c
index dcb4e5896f..a70de7678c 100644
--- a/test/core/util/mock_endpoint.c
+++ b/test/core/util/mock_endpoint.c
@@ -33,6 +33,8 @@
#include "test/core/util/mock_endpoint.h"
+#include <inttypes.h>
+
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
@@ -88,7 +90,8 @@ static void unref(grpc_exec_ctx *exec_ctx, grpc_mock_endpoint *m) {
}
}
-static void me_finish_shutdown(grpc_exec_ctx *exec_ctx, void *me, grpc_error *error) {
+static void me_finish_shutdown(grpc_exec_ctx *exec_ctx, void *me,
+ grpc_error *error) {
grpc_mock_endpoint *m = me;
unref(exec_ctx, m);
}
@@ -108,7 +111,7 @@ static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep;
- unref(exec_ctx,m);
+ unref(exec_ctx, m);
}
static char *me_get_peer(grpc_endpoint *ep) {
@@ -139,7 +142,10 @@ grpc_endpoint *grpc_mock_endpoint_create(void (*on_write)(gpr_slice slice),
grpc_mock_endpoint *m = gpr_malloc(sizeof(*m));
m->base.vtable = &vtable;
m->refs = 2;
- grpc_buffer_user_init(&m->buffer_user, buffer_pool);
+ char *name;
+ gpr_asprintf(&name, "mock_endpoint_%" PRIxPTR, (intptr_t)m);
+ grpc_buffer_user_init(&m->buffer_user, buffer_pool, name);
+ gpr_free(name);
gpr_slice_buffer_init(&m->read_buffer);
gpr_mu_init(&m->mu);
m->on_write = on_write;
diff --git a/test/core/util/passthru_endpoint.c b/test/core/util/passthru_endpoint.c
index bdf75ce587..a1aaeda916 100644
--- a/test/core/util/passthru_endpoint.c
+++ b/test/core/util/passthru_endpoint.c
@@ -33,6 +33,8 @@
#include "test/core/util/passthru_endpoint.h"
+#include <inttypes.h>
+
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
@@ -141,7 +143,7 @@ static void me_really_destroy(grpc_exec_ctx *exec_ctx, void *ep,
static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
half *m = (half *)ep;
grpc_buffer_user_shutdown(exec_ctx, &m->buffer_user,
- grpc_closure_create(me_really_destroy, m));
+ grpc_closure_create(me_really_destroy, m));
}
static char *me_get_peer(grpc_endpoint *ep) {
@@ -168,12 +170,16 @@ static const grpc_endpoint_vtable vtable = {
};
static void half_init(half *m, passthru_endpoint *parent,
- grpc_buffer_pool *buffer_pool) {
+ grpc_buffer_pool *buffer_pool, const char *half_name) {
m->base.vtable = &vtable;
m->parent = parent;
gpr_slice_buffer_init(&m->read_buffer);
m->on_read = NULL;
- grpc_buffer_user_init(&m->buffer_user, buffer_pool);
+ char *name;
+ gpr_asprintf(&name, "passthru_endpoint_%s_%" PRIxPTR, half_name,
+ (intptr_t)parent);
+ grpc_buffer_user_init(&m->buffer_user, buffer_pool, name);
+ gpr_free(name);
}
void grpc_passthru_endpoint_create(grpc_endpoint **client,
@@ -182,8 +188,8 @@ void grpc_passthru_endpoint_create(grpc_endpoint **client,
passthru_endpoint *m = gpr_malloc(sizeof(*m));
m->halves = 2;
m->shutdown = 0;
- half_init(&m->client, m, buffer_pool);
- half_init(&m->server, m, buffer_pool);
+ half_init(&m->client, m, buffer_pool, "client");
+ half_init(&m->server, m, buffer_pool, "server");
gpr_mu_init(&m->mu);
*client = &m->client.base;
*server = &m->server.base;
diff --git a/test/core/util/port_server_client.c b/test/core/util/port_server_client.c
index dd444236e9..9bd34677cb 100644
--- a/test/core/util/port_server_client.c
+++ b/test/core/util/port_server_client.c
@@ -49,6 +49,8 @@
#include "src/core/lib/http/httpcli.h"
+int grpc_buffer_pool_trace = 0;
+
typedef struct freereq {
gpr_mu *mu;
grpc_polling_entity pops;
@@ -99,7 +101,8 @@ void grpc_free_port_using_server(char *server, int port) {
req.http.path = path;
grpc_httpcli_context_init(&context);
- grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create();
+ grpc_buffer_pool *buffer_pool =
+ grpc_buffer_pool_create("port_server_client/free");
grpc_httpcli_get(&exec_ctx, &context, &pr.pops, buffer_pool, &req,
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10),
grpc_closure_create(freed_port_from_server, &pr), &rsp);
@@ -169,7 +172,8 @@ static void got_port_from_server(grpc_exec_ctx *exec_ctx, void *arg,
req.http.path = "/get";
grpc_http_response_destroy(&pr->response);
memset(&pr->response, 0, sizeof(pr->response));
- grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create();
+ grpc_buffer_pool *buffer_pool =
+ grpc_buffer_pool_create("port_server_client/pick_retry");
grpc_httpcli_get(exec_ctx, pr->ctx, &pr->pops, buffer_pool, &req,
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10),
grpc_closure_create(got_port_from_server, pr),
@@ -215,7 +219,8 @@ int grpc_pick_port_using_server(char *server) {
req.http.path = "/get";
grpc_httpcli_context_init(&context);
- grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create();
+ grpc_buffer_pool *buffer_pool =
+ grpc_buffer_pool_create("port_server_client/pick");
grpc_httpcli_get(&exec_ctx, &context, &pr.pops, buffer_pool, &req,
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10),
grpc_closure_create(got_port_from_server, &pr),