diff options
author | Craig Tiller <ctiller@google.com> | 2017-07-05 08:36:04 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-07-05 08:36:04 -0700 |
commit | 39a7327c5f9ad210232b66f755c988760fac91f9 (patch) | |
tree | 80b0d8fd8f35431b846c5ee6cf8fb8b443864e9b /src/core/lib/surface | |
parent | 97e3c5829e0b35068653a0890f0dd0636d364a24 (diff) | |
parent | 7405347cd848f27067b9ce3c029325799ebaa888 (diff) |
Merge github.com:grpc/grpc into cq-drop
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r-- | src/core/lib/surface/call.c | 55 | ||||
-rw-r--r-- | src/core/lib/surface/call.h | 2 | ||||
-rw-r--r-- | src/core/lib/surface/channel.c | 2 | ||||
-rw-r--r-- | src/core/lib/surface/channel.h | 2 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.c | 46 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.h | 6 | ||||
-rw-r--r-- | src/core/lib/surface/init.c | 13 | ||||
-rw-r--r-- | src/core/lib/surface/init_secure.c | 10 | ||||
-rw-r--r-- | src/core/lib/surface/server.c | 2 |
9 files changed, 79 insertions, 59 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index b499219e17..c769866ceb 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -457,7 +457,7 @@ void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call, exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent); } -#ifdef GRPC_STREAM_REFCOUNT_DEBUG +#ifndef NDEBUG #define REF_REASON reason #define REF_ARG , const char *reason #else @@ -929,33 +929,6 @@ static grpc_compression_algorithm decode_compression(grpc_mdelem md) { return algorithm; } -static void recv_common_filter(grpc_exec_ctx *exec_ctx, grpc_call *call, - grpc_metadata_batch *b) { - if (b->idx.named.grpc_status != NULL) { - uint32_t status_code = decode_status(b->idx.named.grpc_status->md); - grpc_error *error = - status_code == GRPC_STATUS_OK - ? GRPC_ERROR_NONE - : grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Error received from peer"), - GRPC_ERROR_INT_GRPC_STATUS, - (intptr_t)status_code); - - if (b->idx.named.grpc_message != NULL) { - error = grpc_error_set_str( - error, GRPC_ERROR_STR_GRPC_MESSAGE, - grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_message->md))); - grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_message); - } else if (error != GRPC_ERROR_NONE) { - error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE, - grpc_empty_slice()); - } - - set_status_from_error(exec_ctx, call, STATUS_FROM_WIRE, error); - grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_status); - } -} - static void publish_app_metadata(grpc_call *call, grpc_metadata_batch *b, int is_trailing) { if (b->list.count == 0) return; @@ -980,8 +953,6 @@ static void publish_app_metadata(grpc_call *call, grpc_metadata_batch *b, static void recv_initial_filter(grpc_exec_ctx *exec_ctx, grpc_call *call, grpc_metadata_batch *b) { - recv_common_filter(exec_ctx, call, b); - if (b->idx.named.grpc_encoding != NULL) { GPR_TIMER_BEGIN("incoming_compression_algorithm", 0); set_incoming_compression_algorithm( @@ -989,7 +960,6 @@ static void recv_initial_filter(grpc_exec_ctx *exec_ctx, grpc_call *call, GPR_TIMER_END("incoming_compression_algorithm", 0); grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_encoding); } - if (b->idx.named.grpc_accept_encoding != NULL) { GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0); set_encodings_accepted_by_peer(exec_ctx, call, @@ -997,14 +967,33 @@ static void recv_initial_filter(grpc_exec_ctx *exec_ctx, grpc_call *call, grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_accept_encoding); GPR_TIMER_END("encodings_accepted_by_peer", 0); } - publish_app_metadata(call, b, false); } static void recv_trailing_filter(grpc_exec_ctx *exec_ctx, void *args, grpc_metadata_batch *b) { grpc_call *call = args; - recv_common_filter(exec_ctx, call, b); + if (b->idx.named.grpc_status != NULL) { + uint32_t status_code = decode_status(b->idx.named.grpc_status->md); + grpc_error *error = + status_code == GRPC_STATUS_OK + ? GRPC_ERROR_NONE + : grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Error received from peer"), + GRPC_ERROR_INT_GRPC_STATUS, + (intptr_t)status_code); + if (b->idx.named.grpc_message != NULL) { + error = grpc_error_set_str( + error, GRPC_ERROR_STR_GRPC_MESSAGE, + grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_message->md))); + grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_message); + } else if (error != GRPC_ERROR_NONE) { + error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE, + grpc_empty_slice()); + } + set_status_from_error(exec_ctx, call, STATUS_FROM_WIRE, error); + grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_status); + } publish_app_metadata(call, b, true); } diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h index 60b661cf8c..185bfccb77 100644 --- a/src/core/lib/surface/call.h +++ b/src/core/lib/surface/call.h @@ -62,7 +62,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call, grpc_completion_queue *cq); -#ifdef GRPC_STREAM_REFCOUNT_DEBUG +#ifndef NDEBUG void grpc_call_internal_ref(grpc_call *call, const char *reason); void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *call, const char *reason); diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c index 5647dff28b..5780a18ce8 100644 --- a/src/core/lib/surface/channel.c +++ b/src/core/lib/surface/channel.c @@ -345,7 +345,7 @@ grpc_call *grpc_channel_create_registered_call( return call; } -#ifdef GRPC_STREAM_REFCOUNT_DEBUG +#ifndef NDEBUG #define REF_REASON reason #define REF_ARG , const char *reason #else diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h index 848debc7c5..528bb868e2 100644 --- a/src/core/lib/surface/channel.h +++ b/src/core/lib/surface/channel.h @@ -59,7 +59,7 @@ grpc_mdelem grpc_channel_get_reffed_status_elem(grpc_exec_ctx *exec_ctx, size_t grpc_channel_get_call_size_estimate(grpc_channel *channel); void grpc_channel_update_call_size_estimate(grpc_channel *channel, size_t size); -#ifdef GRPC_STREAM_REFCOUNT_DEBUG +#ifndef NDEBUG void grpc_channel_internal_ref(grpc_channel *channel, const char *reason); void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx, grpc_channel *channel, const char *reason); diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 5f83bb5a6b..e5e67a1cfe 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -38,6 +38,7 @@ grpc_tracer_flag grpc_trace_operation_failures = GRPC_TRACER_INITIALIZER(false); #ifndef NDEBUG grpc_tracer_flag grpc_trace_pending_tags = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_trace_cq_refcount = GRPC_TRACER_INITIALIZER(false); #endif typedef struct { @@ -472,11 +473,15 @@ int grpc_get_cq_poll_num(grpc_completion_queue *cq) { return cur_num_polls; } -#ifdef GRPC_CQ_REF_COUNT_DEBUG -void grpc_cq_internal_ref(grpc_completion_queue *cq, const char *reason, +#ifndef NDEBUG +void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason, const char *file, int line) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p ref %d -> %d %s", cq, - (int)cq->owning_refs.count, (int)cq->owning_refs.count + 1, reason); + if (GRPC_TRACER_ON(grpc_trace_cq_refcount)) { + gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "CQ:%p ref %" PRIdPTR " -> %" PRIdPTR " %s", cq, val, val + 1, + reason); + } #else void grpc_cq_internal_ref(grpc_completion_queue *cq) { #endif @@ -489,12 +494,15 @@ static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg, GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "pollset_destroy"); } -#ifdef GRPC_CQ_REF_COUNT_DEBUG -void grpc_cq_internal_unref(grpc_completion_queue *cq, const char *reason, - const char *file, int line) { - cq_data *cqd = &cq->data; - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s", cq, - (int)cqd->owning_refs.count, (int)cqd->owning_refs.count - 1, reason); +#ifndef NDEBUG +void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, + const char *reason, const char *file, int line) { + if (GRPC_TRACER_ON(grpc_trace_cq_refcount)) { + gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "CQ:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", cq, val, val - 1, + reason); + } #else void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq) { @@ -563,7 +571,8 @@ static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) { static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {} #endif -/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion +/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a + * completion * type of GRPC_CQ_NEXT) */ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag, @@ -639,7 +648,8 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(error); } -/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion +/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a + * completion * type of GRPC_CQ_PLUCK) */ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag, @@ -743,7 +753,8 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { gpr_atm_no_barrier_load(&cqd->things_queued_ever); /* Pop a cq_completion from the queue. Returns NULL if the queue is empty - * might return NULL in some cases even if the queue is not empty; but that + * might return NULL in some cases even if the queue is not empty; but + * that * is ok and doesn't affect correctness. Might effect the tail latencies a * bit) */ a->stolen_completion = cq_event_queue_pop(&cqd->queue); @@ -838,7 +849,8 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, /* If c == NULL it means either the queue is empty OR in an transient inconsistent state. If it is the latter, we shold do a 0-timeout poll so that the thread comes back quickly from poll to make a second - attempt at popping. Not doing this can potentially deadlock this thread + attempt at popping. Not doing this can potentially deadlock this + thread forever (if the deadline is infinity) */ if (cq_event_queue_num_items(&cqd->queue) > 0) { iteration_deadline = gpr_time_0(GPR_CLOCK_MONOTONIC); @@ -851,8 +863,10 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, empty. If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */ if (cq_event_queue_num_items(&cqd->queue) > 0) { /* Go to the beginning of the loop. No point doing a poll because - (cq->shutdown == true) is only possible when there is no pending work - (i.e cq->pending_events == 0) and any outstanding grpc_cq_completion + (cq->shutdown == true) is only possible when there is no pending + work + (i.e cq->pending_events == 0) and any outstanding + grpc_cq_completion events are already queued on this cq */ continue; } diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index e947233307..af44482513 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -30,8 +30,10 @@ extern grpc_tracer_flag grpc_cq_pluck_trace; extern grpc_tracer_flag grpc_cq_event_timeout_trace; extern grpc_tracer_flag grpc_trace_operation_failures; + #ifndef NDEBUG extern grpc_tracer_flag grpc_trace_pending_tags; +extern grpc_tracer_flag grpc_trace_cq_refcount; #endif #ifdef __cplusplus @@ -52,9 +54,7 @@ typedef struct grpc_cq_completion { uintptr_t next; } grpc_cq_completion; -//#define GRPC_CQ_REF_COUNT_DEBUG - -#ifdef GRPC_CQ_REF_COUNT_DEBUG +#ifndef NDEBUG void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason, const char *file, int line); void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index 01a1f33db2..14a86bfa0a 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -126,18 +126,23 @@ void grpc_init(void) { grpc_register_tracer("channel_stack_builder", &grpc_trace_channel_stack_builder); grpc_register_tracer("http1", &grpc_http1_trace); - grpc_register_tracer("queue_pluck", &grpc_cq_pluck_trace); + grpc_register_tracer("queue_pluck", &grpc_cq_pluck_trace); // default on grpc_register_tracer("combiner", &grpc_combiner_trace); grpc_register_tracer("server_channel", &grpc_server_channel_trace); grpc_register_tracer("bdp_estimator", &grpc_bdp_estimator_trace); - // Default pluck trace to 1 - grpc_register_tracer("queue_timeout", &grpc_cq_event_timeout_trace); - // Default timeout trace to 1 + grpc_register_tracer("queue_timeout", + &grpc_cq_event_timeout_trace); // default on grpc_register_tracer("op_failure", &grpc_trace_operation_failures); grpc_register_tracer("resource_quota", &grpc_resource_quota_trace); grpc_register_tracer("call_error", &grpc_call_error_trace); #ifndef NDEBUG grpc_register_tracer("pending_tags", &grpc_trace_pending_tags); + grpc_register_tracer("queue_refcount", &grpc_trace_cq_refcount); + grpc_register_tracer("closure", &grpc_trace_closure); + grpc_register_tracer("error_refcount", &grpc_trace_error_refcount); + grpc_register_tracer("stream_refcount", &grpc_trace_stream_refcount); + grpc_register_tracer("fd_refcount", &grpc_trace_fd_refcount); + grpc_register_tracer("metadata", &grpc_trace_metadata); #endif grpc_security_pre_init(); grpc_iomgr_init(&exec_ctx); diff --git a/src/core/lib/surface/init_secure.c b/src/core/lib/surface/init_secure.c index fb6635716d..7dbea581d0 100644 --- a/src/core/lib/surface/init_secure.c +++ b/src/core/lib/surface/init_secure.c @@ -32,9 +32,19 @@ #include "src/core/lib/surface/channel_init.h" #include "src/core/tsi/transport_security_interface.h" +#ifndef NDEBUG +#include "src/core/lib/security/context/security_context.h" +#endif + void grpc_security_pre_init(void) { grpc_register_tracer("secure_endpoint", &grpc_trace_secure_endpoint); grpc_register_tracer("transport_security", &tsi_tracing_enabled); +#ifndef NDEBUG + grpc_register_tracer("auth_context_refcount", + &grpc_trace_auth_context_refcount); + grpc_register_tracer("security_connector_refcount", + &grpc_trace_security_connector_refcount); +#endif } static bool maybe_prepend_client_auth_filter( diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 592ee1ff4d..19177a46c9 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -475,6 +475,7 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, *rc->data.registered.deadline = calld->deadline; if (rc->data.registered.optional_payload) { *rc->data.registered.optional_payload = calld->payload; + calld->payload = NULL; } break; default: @@ -878,6 +879,7 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_slice_unref_internal(exec_ctx, calld->path); } grpc_metadata_array_destroy(&calld->initial_metadata); + grpc_byte_buffer_destroy(calld->payload); gpr_mu_destroy(&calld->mu_state); |