From ef6b97659edb575a002d574db89d90f7ebf4b979 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 28 Sep 2016 08:37:51 -0700 Subject: Add tracing, fix some transport bugs wrt buffer_pools --- .../transport/chttp2/transport/chttp2_transport.c | 24 ++++++-- src/core/lib/iomgr/buffer_pool.c | 66 ++++++++++++++++++++-- src/core/lib/iomgr/buffer_pool.h | 6 +- src/core/lib/iomgr/tcp_client_posix.c | 2 +- src/core/lib/iomgr/tcp_posix.c | 2 +- src/core/lib/iomgr/tcp_server_posix.c | 2 +- .../google_default/google_default_credentials.c | 3 +- .../lib/security/credentials/jwt/jwt_verifier.c | 4 +- .../credentials/oauth2/oauth2_credentials.c | 5 +- src/core/lib/surface/init.c | 2 + 10 files changed, 97 insertions(+), 19 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 3ebb467332..13241f6abe 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -257,7 +257,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_closure_init(&t->benign_reclaimer, benign_reclaimer, t); grpc_closure_init(&t->destructive_reclaimer, destructive_reclaimer, t); grpc_closure_init(&t->benign_reclaimer_locked, benign_reclaimer_locked, t); - grpc_closure_init(&t->destructive_reclaimer_locked, destructive_reclaimer_locked, t); + grpc_closure_init(&t->destructive_reclaimer_locked, + destructive_reclaimer_locked, t); grpc_chttp2_goaway_parser_init(&t->goaway_parser); grpc_chttp2_hpack_parser_init(&t->hpack_parser); @@ -2124,10 +2125,21 @@ static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_chttp2_transport *t = arg; if (error == GRPC_ERROR_NONE && grpc_chttp2_stream_map_size(&t->stream_map) == 0) { + if (grpc_buffer_pool_trace) { + gpr_log(GPR_DEBUG, "HTTP2: %s - send goaway to free memory", + t->peer_string); + } send_goaway(exec_ctx, t, GRPC_CHTTP2_ENHANCE_YOUR_CALM, gpr_slice_from_static_string("Buffers full")); + } else if (grpc_buffer_pool_trace) { + gpr_log(GPR_DEBUG, + "HTTP2: %s - skip benign reclaimation, there are still %" PRIdPTR + " streams", + t->peer_string, grpc_chttp2_stream_map_size(&t->stream_map)); } t->benign_reclaimer_registered = false; + grpc_buffer_user_finish_reclaimation(exec_ctx, + grpc_endpoint_get_buffer_user(t->ep)); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "benign_reclaimer"); } @@ -2138,18 +2150,20 @@ static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, t->destructive_reclaimer_registered = false; if (error == GRPC_ERROR_NONE && n > 0) { grpc_chttp2_stream *s = grpc_chttp2_stream_map_rand(&t->stream_map); + if (grpc_buffer_pool_trace) { + gpr_log(GPR_DEBUG, "HTTP2: %s - abandon stream id %d", t->peer_string, + s->id); + } grpc_chttp2_cancel_stream( exec_ctx, t, s, grpc_error_set_int(GRPC_ERROR_CREATE("Buffers full"), GRPC_ERROR_INT_HTTP2_ERROR, GRPC_CHTTP2_ENHANCE_YOUR_CALM)); if (n > 1) { post_destructive_reclaimer(exec_ctx, t); - t->destructive_reclaimer_registered = true; - grpc_buffer_user_post_reclaimer(exec_ctx, - grpc_endpoint_get_buffer_user(t->ep), - true, &t->destructive_reclaimer); } } + grpc_buffer_user_finish_reclaimation(exec_ctx, + grpc_endpoint_get_buffer_user(t->ep)); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "destructive_reclaimer"); } diff --git a/src/core/lib/iomgr/buffer_pool.c b/src/core/lib/iomgr/buffer_pool.c index 88f37c9fc1..cfa171684b 100644 --- a/src/core/lib/iomgr/buffer_pool.c +++ b/src/core/lib/iomgr/buffer_pool.c @@ -37,6 +37,7 @@ #include #include +#include #include #include "src/core/lib/iomgr/combiner.h" @@ -62,6 +63,8 @@ struct grpc_buffer_pool { grpc_closure bpreclaimation_done_closure; grpc_buffer_user *roots[GRPC_BULIST_COUNT]; + + char *name; }; /******************************************************************************* @@ -175,8 +178,18 @@ static bool bpalloc(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool) { gpr_mu_lock(&buffer_user->mu); if (buffer_user->free_pool < 0 && -buffer_user->free_pool <= buffer_pool->free_pool) { - buffer_pool->free_pool += buffer_user->free_pool; + int64_t amt = -buffer_user->free_pool; buffer_user->free_pool = 0; + buffer_pool->free_pool -= amt; + if (grpc_buffer_pool_trace) { + gpr_log(GPR_DEBUG, "BP %s %s: grant alloc %" PRId64 + " bytes; bp_free_pool -> %" PRId64, + buffer_pool->name, buffer_user->name, amt, + buffer_pool->free_pool); + } + } else if (grpc_buffer_pool_trace && buffer_user->free_pool >= 0) { + gpr_log(GPR_DEBUG, "BP %s %s: discard already satisfied alloc request", + buffer_pool->name, buffer_user->name); } if (buffer_user->free_pool >= 0) { buffer_user->allocating = false; @@ -198,8 +211,15 @@ static bool bpscavenge(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool) { bulist_pop(buffer_pool, GRPC_BULIST_NON_EMPTY_FREE_POOL))) { gpr_mu_lock(&buffer_user->mu); if (buffer_user->free_pool > 0) { - buffer_pool->free_pool += buffer_user->free_pool; + int64_t amt = buffer_user->free_pool; buffer_user->free_pool = 0; + buffer_pool->free_pool += amt; + if (grpc_buffer_pool_trace) { + gpr_log(GPR_DEBUG, "BP %s %s: scavenge %" PRId64 + " bytes; bp_free_pool -> %" PRId64, + buffer_pool->name, buffer_user->name, amt, + buffer_pool->free_pool); + } gpr_mu_unlock(&buffer_user->mu); return true; } else { @@ -217,6 +237,10 @@ static bool bpreclaim(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool, : GRPC_BULIST_RECLAIMER_BENIGN; grpc_buffer_user *buffer_user = bulist_pop(buffer_pool, list); if (buffer_user == NULL) return false; + if (grpc_buffer_pool_trace) { + gpr_log(GPR_DEBUG, "BP %s %s: initiate %s reclaimation", buffer_pool->name, + buffer_user->name, destructive ? "destructive" : "benign"); + } buffer_pool->reclaiming = true; grpc_closure *c = buffer_user->reclaimers[destructive]; buffer_user->reclaimers[destructive] = NULL; @@ -384,7 +408,7 @@ static void bp_reclaimation_done(grpc_exec_ctx *exec_ctx, void *bp, * grpc_buffer_pool api */ -grpc_buffer_pool *grpc_buffer_pool_create(void) { +grpc_buffer_pool *grpc_buffer_pool_create(const char *name) { grpc_buffer_pool *buffer_pool = gpr_malloc(sizeof(*buffer_pool)); gpr_ref_init(&buffer_pool->refs, 1); buffer_pool->combiner = grpc_combiner_create(NULL); @@ -392,6 +416,12 @@ grpc_buffer_pool *grpc_buffer_pool_create(void) { buffer_pool->size = INT64_MAX; buffer_pool->step_scheduled = false; buffer_pool->reclaiming = false; + if (name != NULL) { + buffer_pool->name = gpr_strdup(name); + } else { + gpr_asprintf(&buffer_pool->name, "anonymous_pool_%" PRIxPTR, + (intptr_t)buffer_pool); + } grpc_closure_init(&buffer_pool->bpstep_closure, bpstep, buffer_pool); grpc_closure_init(&buffer_pool->bpreclaimation_done_closure, bp_reclaimation_done, buffer_pool); @@ -451,7 +481,7 @@ grpc_buffer_pool *grpc_buffer_pool_from_channel_args( } } } - return grpc_buffer_pool_create(); + return grpc_buffer_pool_create(NULL); } static void *bp_copy(void *bp) { @@ -473,7 +503,7 @@ const grpc_arg_pointer_vtable *grpc_buffer_pool_arg_vtable(void) { */ void grpc_buffer_user_init(grpc_buffer_user *buffer_user, - grpc_buffer_pool *buffer_pool) { + grpc_buffer_pool *buffer_pool, const char *name) { buffer_user->buffer_pool = grpc_buffer_pool_internal_ref(buffer_pool); grpc_closure_init(&buffer_user->allocate_closure, &bu_allocate, buffer_user); grpc_closure_init(&buffer_user->add_to_free_pool_closure, @@ -498,6 +528,12 @@ void grpc_buffer_user_init(grpc_buffer_user *buffer_user, #ifndef NDEBUG buffer_user->asan_canary = gpr_malloc(1); #endif + if (name != NULL) { + buffer_user->name = gpr_strdup(name); + } else { + gpr_asprintf(&buffer_user->name, "anonymous_buffer_user_%" PRIxPTR, + (intptr_t)buffer_user); + } } void grpc_buffer_user_shutdown(grpc_exec_ctx *exec_ctx, @@ -533,6 +569,10 @@ void grpc_buffer_user_alloc(grpc_exec_ctx *exec_ctx, &buffer_user->on_done_destroy_closure); if (on_done_destroy != NULL) { /* already shutdown */ + if (grpc_buffer_pool_trace) { + gpr_log(GPR_DEBUG, "BP %s %s: alloc %" PRIdPTR " after shutdown", + buffer_user->buffer_pool->name, buffer_user->name, size); + } grpc_exec_ctx_sched( exec_ctx, optional_on_done, GRPC_ERROR_CREATE("Buffer pool user is already shutdown"), NULL); @@ -541,6 +581,12 @@ void grpc_buffer_user_alloc(grpc_exec_ctx *exec_ctx, } buffer_user->allocated += (int64_t)size; buffer_user->free_pool -= (int64_t)size; + if (grpc_buffer_pool_trace) { + gpr_log(GPR_DEBUG, "BP %s %s: alloc %" PRIdPTR "; allocated -> %" PRId64 + ", free_pool -> %" PRId64, + buffer_user->buffer_pool->name, buffer_user->name, size, + buffer_user->allocated, buffer_user->free_pool); + } if (buffer_user->free_pool < 0) { grpc_closure_list_append(&buffer_user->on_allocated, optional_on_done, GRPC_ERROR_NONE); @@ -563,6 +609,12 @@ void grpc_buffer_user_free(grpc_exec_ctx *exec_ctx, bool was_zero_or_negative = buffer_user->free_pool <= 0; buffer_user->free_pool += (int64_t)size; buffer_user->allocated -= (int64_t)size; + if (grpc_buffer_pool_trace) { + gpr_log(GPR_DEBUG, "BP %s %s: free %" PRIdPTR "; allocated -> %" PRId64 + ", free_pool -> %" PRId64, + buffer_user->buffer_pool->name, buffer_user->name, size, + buffer_user->allocated, buffer_user->free_pool); + } bool is_bigger_than_zero = buffer_user->free_pool > 0; if (is_bigger_than_zero && was_zero_or_negative && !buffer_user->added_to_free_pool) { @@ -597,6 +649,10 @@ void grpc_buffer_user_post_reclaimer(grpc_exec_ctx *exec_ctx, void grpc_buffer_user_finish_reclaimation(grpc_exec_ctx *exec_ctx, grpc_buffer_user *buffer_user) { + if (grpc_buffer_pool_trace) { + gpr_log(GPR_DEBUG, "BP %s %s: reclaimation complete", + buffer_user->buffer_pool->name, buffer_user->name); + } grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner, &buffer_user->buffer_pool->bpreclaimation_done_closure, GRPC_ERROR_NONE, false); diff --git a/src/core/lib/iomgr/buffer_pool.h b/src/core/lib/iomgr/buffer_pool.h index 301e059c18..1564872b5d 100644 --- a/src/core/lib/iomgr/buffer_pool.h +++ b/src/core/lib/iomgr/buffer_pool.h @@ -38,6 +38,8 @@ #include "src/core/lib/iomgr/exec_ctx.h" +extern int grpc_buffer_pool_trace; + grpc_buffer_pool *grpc_buffer_pool_internal_ref(grpc_buffer_pool *buffer_pool); void grpc_buffer_pool_internal_unref(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool); @@ -83,10 +85,12 @@ struct grpc_buffer_user { gpr_atm on_done_destroy_closure; grpc_buffer_user_link links[GRPC_BULIST_COUNT]; + + char *name; }; void grpc_buffer_user_init(grpc_buffer_user *buffer_user, - grpc_buffer_pool *buffer_pool); + grpc_buffer_pool *buffer_pool, const char *name); void grpc_buffer_user_shutdown(grpc_exec_ctx *exec_ctx, grpc_buffer_user *buffer_user, grpc_closure *on_done); diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index cf01623f09..4e8f9e53bb 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -125,7 +125,7 @@ grpc_endpoint *grpc_tcp_client_create_from_fd( grpc_exec_ctx *exec_ctx, grpc_fd *fd, const grpc_channel_args *channel_args, const char *addr_str) { size_t tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE; - grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(); + grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(NULL); if (channel_args != NULL) { for (size_t i = 0; i < channel_args->num_args; i++) { if (0 == diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 812c39235b..120622e817 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -544,7 +544,7 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, grpc_buffer_pool *buffer_pool, tcp->write_closure.cb = tcp_handle_write; tcp->write_closure.cb_arg = tcp; gpr_slice_buffer_init(&tcp->last_read_buffer); - grpc_buffer_user_init(&tcp->buffer_user, buffer_pool); + grpc_buffer_user_init(&tcp->buffer_user, buffer_pool, peer_string); grpc_buffer_user_slice_allocator_init( &tcp->slice_allocator, &tcp->buffer_user, tcp_read_allocation_done, tcp); /* Tell network status tracker about new endpoint */ diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index 3304152385..4d36fd4caf 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -163,7 +163,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); s->so_reuseport = has_so_reuseport; - s->buffer_pool = grpc_buffer_pool_create(); + s->buffer_pool = grpc_buffer_pool_create(NULL); for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) { if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) { if (args->args[i].type == GRPC_ARG_INTEGER) { diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.c b/src/core/lib/security/credentials/google_default/google_default_credentials.c index 3bcde3da8b..4e703aa9f4 100644 --- a/src/core/lib/security/credentials/google_default/google_default_credentials.c +++ b/src/core/lib/security/credentials/google_default/google_default_credentials.c @@ -124,7 +124,8 @@ static int is_stack_running_on_compute_engine(void) { grpc_httpcli_context_init(&context); - grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(); + grpc_buffer_pool *buffer_pool = + grpc_buffer_pool_create("google_default_credentials"); grpc_httpcli_get( &exec_ctx, &context, &detector.pollent, buffer_pool, &request, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), max_detection_delay), diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.c b/src/core/lib/security/credentials/jwt/jwt_verifier.c index c1a3eb7eab..ffcd0b3910 100644 --- a/src/core/lib/security/credentials/jwt/jwt_verifier.c +++ b/src/core/lib/security/credentials/jwt/jwt_verifier.c @@ -660,7 +660,7 @@ static void on_openid_config_retrieved(grpc_exec_ctx *exec_ctx, void *user_data, /* TODO(ctiller): Carry the buffer_pool in ctx and share it with the host channel. This would allow us to cancel an authentication query when under extreme memory pressure. */ - grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(); + grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create("jwt_verifier"); grpc_httpcli_get( exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent, buffer_pool, &req, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay), @@ -772,7 +772,7 @@ static void retrieve_key_and_verify(grpc_exec_ctx *exec_ctx, /* TODO(ctiller): Carry the buffer_pool in ctx and share it with the host channel. This would allow us to cancel an authentication query when under extreme memory pressure. */ - grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(); + grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create("jwt_verifier"); grpc_httpcli_get( exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent, buffer_pool, &req, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay), diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c index e9e83d1468..61c0815b2a 100644 --- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c +++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c @@ -310,7 +310,7 @@ static void compute_engine_fetch_oauth2( /* TODO(ctiller): Carry the buffer_pool in ctx and share it with the host channel. This would allow us to cancel an authentication query when under extreme memory pressure. */ - grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(); + grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create("oauth2_credentials"); grpc_httpcli_get(exec_ctx, httpcli_context, pollent, buffer_pool, &request, deadline, grpc_closure_create(response_cb, metadata_req), &metadata_req->response); @@ -365,7 +365,8 @@ static void refresh_token_fetch_oauth2( /* TODO(ctiller): Carry the buffer_pool in ctx and share it with the host channel. This would allow us to cancel an authentication query when under extreme memory pressure. */ - grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(); + grpc_buffer_pool *buffer_pool = + grpc_buffer_pool_create("oauth2_credentials_refresh"); grpc_httpcli_post(exec_ctx, httpcli_context, pollent, buffer_pool, &request, body, strlen(body), deadline, grpc_closure_create(response_cb, metadata_req), diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index 3cbbaa7b0c..de913af4ee 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -48,6 +48,7 @@ #include "src/core/lib/channel/message_size_filter.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/http/parser.h" +#include "src/core/lib/iomgr/buffer_pool.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr.h" @@ -184,6 +185,7 @@ void grpc_init(void) { // Default timeout trace to 1 grpc_cq_event_timeout_trace = 1; grpc_register_tracer("op_failure", &grpc_trace_operation_failures); + grpc_register_tracer("buffer_pool", &grpc_buffer_pool_trace); #ifndef NDEBUG grpc_register_tracer("pending_tags", &grpc_trace_pending_tags); #endif -- cgit v1.2.3