diff options
author | 2017-08-31 10:54:11 -0700 | |
---|---|---|
committer | 2017-08-31 10:54:11 -0700 | |
commit | 8fd40d5ed9303c693865ff431b7565173a4b898e (patch) | |
tree | 5d3f6ce958de74e79a8bbaf71b7eb8bbd8acfecd /src/core/lib/surface | |
parent | c9c78ee96c5a73234513decd398a63d48f14aa89 (diff) | |
parent | 8d51e8d17e012f81ca8e94c18f525e1781130481 (diff) |
Merge github.com:grpc/grpc into wc
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r-- | src/core/lib/surface/alarm.c | 76 | ||||
-rw-r--r-- | src/core/lib/surface/alarm_internal.h | 40 | ||||
-rw-r--r-- | src/core/lib/surface/call.c | 323 | ||||
-rw-r--r-- | src/core/lib/surface/call_test_only.h | 12 | ||||
-rw-r--r-- | src/core/lib/surface/channel.c | 29 | ||||
-rw-r--r-- | src/core/lib/surface/channel_ping.c | 2 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.c | 151 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.h | 5 | ||||
-rw-r--r-- | src/core/lib/surface/init.c | 2 | ||||
-rw-r--r-- | src/core/lib/surface/server.c | 14 | ||||
-rw-r--r-- | src/core/lib/surface/version.c | 2 |
11 files changed, 529 insertions, 127 deletions
diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c index ef8405cca8..7d60b1de17 100644 --- a/src/core/lib/surface/alarm.c +++ b/src/core/lib/surface/alarm.c @@ -15,13 +15,21 @@ * limitations under the License. * */ +#include "src/core/lib/surface/alarm_internal.h" #include <grpc/grpc.h> #include <grpc/support/alloc.h> +#include <grpc/support/log.h> #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/surface/completion_queue.h" +#ifndef NDEBUG +grpc_tracer_flag grpc_trace_alarm_refcount = + GRPC_TRACER_INITIALIZER(false, "alarm_refcount"); +#endif + struct grpc_alarm { + gpr_refcount refs; grpc_timer alarm; grpc_closure on_alarm; grpc_cq_completion completion; @@ -31,13 +39,58 @@ struct grpc_alarm { void *tag; }; -static void do_nothing_end_completion(grpc_exec_ctx *exec_ctx, void *arg, - grpc_cq_completion *c) {} +static void alarm_ref(grpc_alarm *alarm) { gpr_ref(&alarm->refs); } + +static void alarm_unref(grpc_alarm *alarm) { + if (gpr_unref(&alarm->refs)) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, alarm->cq, "alarm"); + grpc_exec_ctx_finish(&exec_ctx); + gpr_free(alarm); + } +} + +#ifndef NDEBUG +static void alarm_ref_dbg(grpc_alarm *alarm, const char *reason, + const char *file, int line) { + if (GRPC_TRACER_ON(grpc_trace_alarm_refcount)) { + gpr_atm val = gpr_atm_no_barrier_load(&alarm->refs.count); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "Alarm:%p ref %" PRIdPTR " -> %" PRIdPTR " %s", alarm, val, + val + 1, reason); + } + + alarm_ref(alarm); +} + +static void alarm_unref_dbg(grpc_alarm *alarm, const char *reason, + const char *file, int line) { + if (GRPC_TRACER_ON(grpc_trace_alarm_refcount)) { + gpr_atm val = gpr_atm_no_barrier_load(&alarm->refs.count); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "Alarm:%p Unref %" PRIdPTR " -> %" PRIdPTR " %s", alarm, val, + val - 1, reason); + } + + alarm_unref(alarm); +} +#endif + +static void alarm_end_completion(grpc_exec_ctx *exec_ctx, void *arg, + grpc_cq_completion *c) { + grpc_alarm *alarm = arg; + GRPC_ALARM_UNREF(alarm, "dequeue-end-op"); +} static void alarm_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_alarm *alarm = arg; - grpc_cq_end_op(exec_ctx, alarm->cq, alarm->tag, error, - do_nothing_end_completion, NULL, &alarm->completion); + + /* We are queuing an op on completion queue. This means, the alarm's structure + cannot be destroyed until the op is dequeued. Adding an extra ref + here and unref'ing when the op is dequeued will achieve this */ + GRPC_ALARM_REF(alarm, "queue-end-op"); + grpc_cq_end_op(exec_ctx, alarm->cq, alarm->tag, error, alarm_end_completion, + (void *)alarm, &alarm->completion); } grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline, @@ -45,11 +98,19 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline, grpc_alarm *alarm = gpr_malloc(sizeof(grpc_alarm)); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + gpr_ref_init(&alarm->refs, 1); + +#ifndef NDEBUG + if (GRPC_TRACER_ON(grpc_trace_alarm_refcount)) { + gpr_log(GPR_DEBUG, "Alarm:%p created (ref: 1)", alarm); + } +#endif + GRPC_CQ_INTERNAL_REF(cq, "alarm"); alarm->cq = cq; alarm->tag = tag; - grpc_cq_begin_op(cq, tag); + GPR_ASSERT(grpc_cq_begin_op(cq, tag)); GRPC_CLOSURE_INIT(&alarm->on_alarm, alarm_cb, alarm, grpc_schedule_on_exec_ctx); grpc_timer_init(&exec_ctx, &alarm->alarm, @@ -66,9 +127,6 @@ void grpc_alarm_cancel(grpc_alarm *alarm) { } void grpc_alarm_destroy(grpc_alarm *alarm) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_alarm_cancel(alarm); - GRPC_CQ_INTERNAL_UNREF(&exec_ctx, alarm->cq, "alarm"); - gpr_free(alarm); - grpc_exec_ctx_finish(&exec_ctx); + GRPC_ALARM_UNREF(alarm, "alarm_destroy"); } diff --git a/src/core/lib/surface/alarm_internal.h b/src/core/lib/surface/alarm_internal.h new file mode 100644 index 0000000000..7f2126c5c9 --- /dev/null +++ b/src/core/lib/surface/alarm_internal.h @@ -0,0 +1,40 @@ +/* + * + * Copyright 2015-2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_SURFACE_ALARM_INTERNAL_H +#define GRPC_CORE_LIB_SURFACE_ALARM_INTERNAL_H + +#include <grpc/support/log.h> +#include "src/core/lib/debug/trace.h" + +#ifndef NDEBUG + +extern grpc_tracer_flag grpc_trace_alarm_refcount; + +#define GRPC_ALARM_REF(a, reason) alarm_ref_dbg(a, reason, __FILE__, __LINE__) +#define GRPC_ALARM_UNREF(a, reason) \ + alarm_unref_dbg(a, reason, __FILE__, __LINE__) + +#else /* !defined(NDEBUG) */ + +#define GRPC_ALARM_REF(a, reason) alarm_ref(a) +#define GRPC_ALARM_UNREF(a, reason) alarm_unref(a) + +#endif /* defined(NDEBUG) */ + +#endif /* GRPC_CORE_LIB_SURFACE_ALARM_INTERNAL_H */ diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 2365d27307..62d542d2c5 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -144,6 +144,9 @@ typedef struct { grpc_call *sibling_prev; } child_call; +#define RECV_NONE ((gpr_atm)0) +#define RECV_INITIAL_METADATA_FIRST ((gpr_atm)1) + struct grpc_call { gpr_refcount ext_ref; gpr_arena *arena; @@ -170,9 +173,6 @@ struct grpc_call { gpr_atm any_ops_sent_atm; gpr_atm received_final_op_atm; - /* have we received initial metadata */ - bool has_initial_md_been_received; - batch_control *active_batches[MAX_CONCURRENT_BATCHES]; grpc_transport_stream_op_batch_payload stream_op_payload; @@ -192,8 +192,12 @@ struct grpc_call { /* Compression algorithm for *incoming* data */ grpc_compression_algorithm incoming_compression_algorithm; + /* Stream compression algorithm for *incoming* data */ + grpc_stream_compression_algorithm incoming_stream_compression_algorithm; /* Supported encodings (compression algorithms), a bitset */ uint32_t encodings_accepted_by_peer; + /* Supported stream encodings (stream compression algorithms), a bitset */ + uint32_t stream_encodings_accepted_by_peer; /* Contexts for various subsystems (security, tracing, ...). */ grpc_call_context_element context[GRPC_CONTEXT_COUNT]; @@ -226,7 +230,23 @@ struct grpc_call { } server; } final_op; - void *saved_receiving_stream_ready_bctlp; + /* recv_state can contain one of the following values: + RECV_NONE : : no initial metadata and messages received + RECV_INITIAL_METADATA_FIRST : received initial metadata first + a batch_control* : received messages first + + +------1------RECV_NONE------3-----+ + | | + | | + v v + RECV_INITIAL_METADATA_FIRST receiving_stream_ready_bctlp + | ^ | ^ + | | | | + +-----2-----+ +-----4-----+ + + For 1, 4: See receiving_initial_metadata_ready() function + For 2, 3: See receiving_stream_ready() function */ + gpr_atm recv_state; }; grpc_tracer_flag grpc_call_error_trace = @@ -644,6 +664,8 @@ static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, static grpc_error *error_from_status(grpc_status_code status, const char *description) { + // copying 'description' is needed to ensure the grpc_call_cancel_with_status + // guarantee that can be short-lived. return grpc_error_set_int( grpc_error_set_str(GRPC_ERROR_CREATE_FROM_COPIED_STRING(description), GRPC_ERROR_STR_GRPC_MESSAGE, @@ -750,6 +772,12 @@ static void set_incoming_compression_algorithm( call->incoming_compression_algorithm = algo; } +static void set_incoming_stream_compression_algorithm( + grpc_call *call, grpc_stream_compression_algorithm algo) { + GPR_ASSERT(algo < GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT); + call->incoming_stream_compression_algorithm = algo; +} + grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm( grpc_call *call) { grpc_compression_algorithm algorithm; @@ -763,6 +791,13 @@ static grpc_compression_algorithm compression_algorithm_for_level_locked( call->encodings_accepted_by_peer); } +static grpc_stream_compression_algorithm +stream_compression_algorithm_for_level_locked( + grpc_call *call, grpc_stream_compression_level level) { + return grpc_stream_compression_algorithm_for_level( + level, call->stream_encodings_accepted_by_peer); +} + uint32_t grpc_call_test_only_get_message_flags(grpc_call *call) { uint32_t flags; flags = call->test_only_last_message_flags; @@ -817,13 +852,71 @@ static void set_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx, (void *)(((uintptr_t)call->encodings_accepted_by_peer) + 1)); } +static void set_stream_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx, + grpc_call *call, + grpc_mdelem mdel) { + size_t i; + grpc_stream_compression_algorithm algorithm; + grpc_slice_buffer accept_encoding_parts; + grpc_slice accept_encoding_slice; + void *accepted_user_data; + + accepted_user_data = + grpc_mdelem_get_user_data(mdel, destroy_encodings_accepted_by_peer); + if (accepted_user_data != NULL) { + call->stream_encodings_accepted_by_peer = + (uint32_t)(((uintptr_t)accepted_user_data) - 1); + return; + } + + accept_encoding_slice = GRPC_MDVALUE(mdel); + grpc_slice_buffer_init(&accept_encoding_parts); + grpc_slice_split(accept_encoding_slice, ",", &accept_encoding_parts); + + /* Always support no compression */ + GPR_BITSET(&call->stream_encodings_accepted_by_peer, + GRPC_STREAM_COMPRESS_NONE); + for (i = 0; i < accept_encoding_parts.count; i++) { + grpc_slice accept_encoding_entry_slice = accept_encoding_parts.slices[i]; + if (grpc_stream_compression_algorithm_parse(accept_encoding_entry_slice, + &algorithm)) { + GPR_BITSET(&call->stream_encodings_accepted_by_peer, algorithm); + } else { + char *accept_encoding_entry_str = + grpc_slice_to_c_string(accept_encoding_entry_slice); + gpr_log(GPR_ERROR, + "Invalid entry in accept encoding metadata: '%s'. Ignoring.", + accept_encoding_entry_str); + gpr_free(accept_encoding_entry_str); + } + } + + grpc_slice_buffer_destroy_internal(exec_ctx, &accept_encoding_parts); + + grpc_mdelem_set_user_data( + mdel, destroy_encodings_accepted_by_peer, + (void *)(((uintptr_t)call->stream_encodings_accepted_by_peer) + 1)); +} + uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) { uint32_t encodings_accepted_by_peer; encodings_accepted_by_peer = call->encodings_accepted_by_peer; return encodings_accepted_by_peer; } -static grpc_linked_mdelem *linked_from_md(grpc_metadata *md) { +uint32_t grpc_call_test_only_get_stream_encodings_accepted_by_peer( + grpc_call *call) { + uint32_t stream_encodings_accepted_by_peer; + stream_encodings_accepted_by_peer = call->stream_encodings_accepted_by_peer; + return stream_encodings_accepted_by_peer; +} + +grpc_stream_compression_algorithm +grpc_call_test_only_get_incoming_stream_encodings(grpc_call *call) { + return call->incoming_stream_compression_algorithm; +} + +static grpc_linked_mdelem *linked_from_md(const grpc_metadata *md) { return (grpc_linked_mdelem *)&md->internal_data; } @@ -847,7 +940,7 @@ static int prepare_application_metadata( for (i = 0; i < total_count; i++) { const grpc_metadata *md = get_md_elem(metadata, additional_metadata, i, count); - grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data; + grpc_linked_mdelem *l = linked_from_md(md); GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data)); if (!GRPC_LOG_IF_ERROR("validate_metadata", grpc_validate_header_key_is_legal(md->key))) { @@ -864,7 +957,7 @@ static int prepare_application_metadata( for (int j = 0; j < i; j++) { const grpc_metadata *md = get_md_elem(metadata, additional_metadata, j, count); - grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data; + grpc_linked_mdelem *l = linked_from_md(md); GRPC_MDELEM_UNREF(exec_ctx, l->md); } return 0; @@ -882,9 +975,12 @@ static int prepare_application_metadata( } for (i = 0; i < total_count; i++) { grpc_metadata *md = get_md_elem(metadata, additional_metadata, i, count); - GRPC_LOG_IF_ERROR( - "prepare_application_metadata", - grpc_metadata_batch_link_tail(exec_ctx, batch, linked_from_md(md))); + grpc_linked_mdelem *l = linked_from_md(md); + grpc_error *error = grpc_metadata_batch_link_tail(exec_ctx, batch, l); + if (error != GRPC_ERROR_NONE) { + GRPC_MDELEM_UNREF(exec_ctx, l->md); + } + GRPC_LOG_IF_ERROR("prepare_application_metadata", error); } call->send_extra_metadata_count = 0; @@ -931,6 +1027,22 @@ static grpc_compression_algorithm decode_compression(grpc_mdelem md) { return algorithm; } +static grpc_stream_compression_algorithm decode_stream_compression( + grpc_mdelem md) { + grpc_stream_compression_algorithm algorithm = + grpc_stream_compression_algorithm_from_slice(GRPC_MDVALUE(md)); + if (algorithm == GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) { + char *md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md)); + gpr_log(GPR_ERROR, + "Invalid incoming stream compression algorithm: '%s'. Interpreting " + "incoming data as uncompressed.", + md_c_str); + gpr_free(md_c_str); + return GRPC_STREAM_COMPRESS_NONE; + } + return algorithm; +} + static void publish_app_metadata(grpc_call *call, grpc_metadata_batch *b, int is_trailing) { if (b->list.count == 0) return; @@ -955,7 +1067,19 @@ static void publish_app_metadata(grpc_call *call, grpc_metadata_batch *b, static void recv_initial_filter(grpc_exec_ctx *exec_ctx, grpc_call *call, grpc_metadata_batch *b) { - if (b->idx.named.grpc_encoding != NULL) { + if (b->idx.named.content_encoding != NULL) { + if (b->idx.named.grpc_encoding != NULL) { + gpr_log(GPR_ERROR, + "Received both content-encoding and grpc-encoding header. " + "Ignoring grpc-encoding."); + grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_encoding); + } + GPR_TIMER_BEGIN("incoming_stream_compression_algorithm", 0); + set_incoming_stream_compression_algorithm( + call, decode_stream_compression(b->idx.named.content_encoding->md)); + GPR_TIMER_END("incoming_stream_compression_algorithm", 0); + grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.content_encoding); + } else if (b->idx.named.grpc_encoding != NULL) { GPR_TIMER_BEGIN("incoming_compression_algorithm", 0); set_incoming_compression_algorithm( call, decode_compression(b->idx.named.grpc_encoding->md)); @@ -969,6 +1093,13 @@ static void recv_initial_filter(grpc_exec_ctx *exec_ctx, grpc_call *call, grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_accept_encoding); GPR_TIMER_END("encodings_accepted_by_peer", 0); } + if (b->idx.named.accept_encoding != NULL) { + GPR_TIMER_BEGIN("stream_encodings_accepted_by_peer", 0); + set_stream_encodings_accepted_by_peer(exec_ctx, call, + b->idx.named.accept_encoding->md); + grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.accept_encoding); + GPR_TIMER_END("stream_encodings_accepted_by_peer", 0); + } publish_app_metadata(call, b, false); } @@ -1285,19 +1416,61 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error)); } - if (call->has_initial_md_been_received || error != GRPC_ERROR_NONE || - call->receiving_stream == NULL) { + /* If recv_state is RECV_NONE, we will save the batch_control + * object with rel_cas, and will not use it after the cas. Its corresponding + * acq_load is in receiving_initial_metadata_ready() */ + if (error != GRPC_ERROR_NONE || call->receiving_stream == NULL || + !gpr_atm_rel_cas(&call->recv_state, RECV_NONE, (gpr_atm)bctlp)) { process_data_after_md(exec_ctx, bctlp); - } else { - call->saved_receiving_stream_ready_bctlp = bctlp; } } static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, batch_control *bctl) { grpc_call *call = bctl->call; - /* validate call->incoming_compression_algorithm */ - if (call->incoming_compression_algorithm != GRPC_COMPRESS_NONE) { + /* validate compression algorithms */ + if (call->incoming_stream_compression_algorithm != + GRPC_STREAM_COMPRESS_NONE) { + const grpc_stream_compression_algorithm algo = + call->incoming_stream_compression_algorithm; + char *error_msg = NULL; + const grpc_compression_options compression_options = + grpc_channel_compression_options(call->channel); + if (algo >= GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) { + gpr_asprintf(&error_msg, + "Invalid stream compression algorithm value '%d'.", algo); + gpr_log(GPR_ERROR, "%s", error_msg); + cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE, + GRPC_STATUS_UNIMPLEMENTED, error_msg); + } else if (grpc_compression_options_is_stream_compression_algorithm_enabled( + &compression_options, algo) == 0) { + /* check if algorithm is supported by current channel config */ + char *algo_name = NULL; + grpc_stream_compression_algorithm_name(algo, &algo_name); + gpr_asprintf(&error_msg, "Stream compression algorithm '%s' is disabled.", + algo_name); + gpr_log(GPR_ERROR, "%s", error_msg); + cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE, + GRPC_STATUS_UNIMPLEMENTED, error_msg); + } + gpr_free(error_msg); + + GPR_ASSERT(call->stream_encodings_accepted_by_peer != 0); + if (!GPR_BITGET(call->stream_encodings_accepted_by_peer, + call->incoming_stream_compression_algorithm)) { + if (GRPC_TRACER_ON(grpc_compression_trace)) { + char *algo_name = NULL; + grpc_stream_compression_algorithm_name( + call->incoming_stream_compression_algorithm, &algo_name); + gpr_log( + GPR_ERROR, + "Stream compression algorithm (content-encoding = '%s') not " + "present in the bitset of accepted encodings (accept-encodings: " + "'0x%x')", + algo_name, call->stream_encodings_accepted_by_peer); + } + } + } else if (call->incoming_compression_algorithm != GRPC_COMPRESS_NONE) { const grpc_compression_algorithm algo = call->incoming_compression_algorithm; char *error_msg = NULL; @@ -1324,22 +1497,20 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, call->incoming_compression_algorithm = algo; } gpr_free(error_msg); - } - /* make sure the received grpc-encoding is amongst the ones listed in - * grpc-accept-encoding */ - GPR_ASSERT(call->encodings_accepted_by_peer != 0); - if (!GPR_BITGET(call->encodings_accepted_by_peer, - call->incoming_compression_algorithm)) { - if (GRPC_TRACER_ON(grpc_compression_trace)) { - char *algo_name = NULL; - grpc_compression_algorithm_name(call->incoming_compression_algorithm, - &algo_name); - gpr_log(GPR_ERROR, - "Compression algorithm (grpc-encoding = '%s') not present in " - "the bitset of accepted encodings (grpc-accept-encodings: " - "'0x%x')", - algo_name, call->encodings_accepted_by_peer); + GPR_ASSERT(call->encodings_accepted_by_peer != 0); + if (!GPR_BITGET(call->encodings_accepted_by_peer, + call->incoming_compression_algorithm)) { + if (GRPC_TRACER_ON(grpc_compression_trace)) { + char *algo_name = NULL; + grpc_compression_algorithm_name(call->incoming_compression_algorithm, + &algo_name); + gpr_log(GPR_ERROR, + "Compression algorithm (grpc-encoding = '%s') not present in " + "the bitset of accepted encodings (grpc-accept-encodings: " + "'0x%x')", + algo_name, call->encodings_accepted_by_peer); + } } } } @@ -1379,12 +1550,31 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, } } - call->has_initial_md_been_received = true; - if (call->saved_receiving_stream_ready_bctlp != NULL) { - grpc_closure *saved_rsr_closure = GRPC_CLOSURE_CREATE( - receiving_stream_ready, call->saved_receiving_stream_ready_bctlp, - grpc_schedule_on_exec_ctx); - call->saved_receiving_stream_ready_bctlp = NULL; + grpc_closure *saved_rsr_closure = NULL; + while (true) { + gpr_atm rsr_bctlp = gpr_atm_acq_load(&call->recv_state); + /* Should only receive initial metadata once */ + GPR_ASSERT(rsr_bctlp != 1); + if (rsr_bctlp == 0) { + /* We haven't seen initial metadata and messages before, thus initial + * metadata is received first. + * no_barrier_cas is used, as this function won't access the batch_control + * object saved by receiving_stream_ready() if the initial metadata is + * received first. */ + if (gpr_atm_no_barrier_cas(&call->recv_state, RECV_NONE, + RECV_INITIAL_METADATA_FIRST)) { + break; + } + } else { + /* Already received messages */ + saved_rsr_closure = GRPC_CLOSURE_CREATE(receiving_stream_ready, + (batch_control *)rsr_bctlp, + grpc_schedule_on_exec_ctx); + /* No need to modify recv_state */ + break; + } + } + if (saved_rsr_closure != NULL) { GRPC_CLOSURE_RUN(exec_ctx, saved_rsr_closure, GRPC_ERROR_REF(error)); } @@ -1422,7 +1612,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, if (nops == 0) { if (!is_notify_tag_closure) { - grpc_cq_begin_op(call->cq, notify_tag); + GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag)); grpc_cq_end_op(exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE, free_no_op_completion, NULL, gpr_malloc(sizeof(grpc_cq_completion))); @@ -1466,29 +1656,56 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, /* process compression level */ memset(&compression_md, 0, sizeof(compression_md)); size_t additional_metadata_count = 0; - grpc_compression_level effective_compression_level; + grpc_compression_level effective_compression_level = + GRPC_COMPRESS_LEVEL_NONE; + grpc_stream_compression_level effective_stream_compression_level = + GRPC_STREAM_COMPRESS_LEVEL_NONE; bool level_set = false; - if (op->data.send_initial_metadata.maybe_compression_level.is_set) { + bool stream_compression = false; + if (op->data.send_initial_metadata.maybe_stream_compression_level + .is_set) { + effective_stream_compression_level = + op->data.send_initial_metadata.maybe_stream_compression_level + .level; + level_set = true; + stream_compression = true; + } else if (op->data.send_initial_metadata.maybe_compression_level + .is_set) { effective_compression_level = op->data.send_initial_metadata.maybe_compression_level.level; level_set = true; } else { const grpc_compression_options copts = grpc_channel_compression_options(call->channel); - level_set = copts.default_level.is_set; - if (level_set) { + if (copts.default_stream_compression_level.is_set) { + level_set = true; + effective_stream_compression_level = + copts.default_stream_compression_level.level; + stream_compression = true; + } else if (copts.default_level.is_set) { + level_set = true; effective_compression_level = copts.default_level.level; } } if (level_set && !call->is_client) { - const grpc_compression_algorithm calgo = - compression_algorithm_for_level_locked( - call, effective_compression_level); - // the following will be picked up by the compress filter and used as - // the call's compression algorithm. - compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST; - compression_md.value = grpc_compression_algorithm_slice(calgo); - additional_metadata_count++; + if (stream_compression) { + const grpc_stream_compression_algorithm calgo = + stream_compression_algorithm_for_level_locked( + call, effective_stream_compression_level); + compression_md.key = + GRPC_MDSTR_GRPC_INTERNAL_STREAM_ENCODING_REQUEST; + compression_md.value = + grpc_stream_compression_algorithm_slice(calgo); + } else { + const grpc_compression_algorithm calgo = + compression_algorithm_for_level_locked( + call, effective_compression_level); + /* the following will be picked up by the compress filter and used + * as the call's compression algorithm. */ + compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST; + compression_md.value = grpc_compression_algorithm_slice(calgo); + additional_metadata_count++; + } } if (op->data.send_initial_metadata.count + additional_metadata_count > @@ -1723,7 +1940,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, GRPC_CALL_INTERNAL_REF(call, "completion"); if (!is_notify_tag_closure) { - grpc_cq_begin_op(call->cq, notify_tag); + GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag)); } gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed); @@ -1844,6 +2061,8 @@ const char *grpc_call_error_to_string(grpc_call_error error) { return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH"; case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS: return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS"; + case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN: + return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN"; case GRPC_CALL_OK: return "GRPC_CALL_OK"; } diff --git a/src/core/lib/surface/call_test_only.h b/src/core/lib/surface/call_test_only.h index 2f1b80bfd7..a5a01b3679 100644 --- a/src/core/lib/surface/call_test_only.h +++ b/src/core/lib/surface/call_test_only.h @@ -42,6 +42,18 @@ uint32_t grpc_call_test_only_get_message_flags(grpc_call *call); * To be indexed by grpc_compression_algorithm enum values. */ uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call); +/** Returns a bitset for the stream encodings (stream compression algorithms) + * supported by \a call's peer. + * + * To be indexed by grpc_stream_compression_algorithm enum values. */ +uint32_t grpc_call_test_only_get_stream_encodings_accepted_by_peer( + grpc_call *call); + +/** Returns the incoming stream compression algorithm (content-encoding header) + * received by a call. */ +grpc_stream_compression_algorithm +grpc_call_test_only_get_incoming_stream_encodings(grpc_call *call); + #ifdef __cplusplus } #endif diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c index 5780a18ce8..850fbe6a69 100644 --- a/src/core/lib/surface/channel.c +++ b/src/core/lib/surface/channel.c @@ -142,6 +142,16 @@ grpc_channel *grpc_channel_create_with_builder( GRPC_COMPRESS_LEVEL_NONE, GRPC_COMPRESS_LEVEL_COUNT - 1}); } else if (0 == strcmp(args->args[i].key, + GRPC_STREAM_COMPRESSION_CHANNEL_DEFAULT_LEVEL)) { + channel->compression_options.default_stream_compression_level.is_set = + true; + channel->compression_options.default_stream_compression_level.level = + (grpc_stream_compression_level)grpc_channel_arg_get_integer( + &args->args[i], + (grpc_integer_options){GRPC_STREAM_COMPRESS_LEVEL_NONE, + GRPC_STREAM_COMPRESS_LEVEL_NONE, + GRPC_STREAM_COMPRESS_LEVEL_COUNT - 1}); + } else if (0 == strcmp(args->args[i].key, GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM)) { channel->compression_options.default_algorithm.is_set = true; channel->compression_options.default_algorithm.algorithm = @@ -149,12 +159,31 @@ grpc_channel *grpc_channel_create_with_builder( &args->args[i], (grpc_integer_options){GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE, GRPC_COMPRESS_ALGORITHMS_COUNT - 1}); + } else if (0 == strcmp(args->args[i].key, + GRPC_STREAM_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM)) { + channel->compression_options.default_stream_compression_algorithm.is_set = + true; + channel->compression_options.default_stream_compression_algorithm + .algorithm = + (grpc_stream_compression_algorithm)grpc_channel_arg_get_integer( + &args->args[i], + (grpc_integer_options){ + GRPC_STREAM_COMPRESS_NONE, GRPC_STREAM_COMPRESS_NONE, + GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT - 1}); } else if (0 == strcmp(args->args[i].key, GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET)) { channel->compression_options.enabled_algorithms_bitset = (uint32_t)args->args[i].value.integer | 0x1; /* always support no compression */ + } else if (0 == + strcmp( + args->args[i].key, + GRPC_STREAM_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET)) { + channel->compression_options + .enabled_stream_compression_algorithms_bitset = + (uint32_t)args->args[i].value.integer | + 0x1; /* always support no compression */ } } diff --git a/src/core/lib/surface/channel_ping.c b/src/core/lib/surface/channel_ping.c index 80eb80af78..e85b308850 100644 --- a/src/core/lib/surface/channel_ping.c +++ b/src/core/lib/surface/channel_ping.c @@ -59,7 +59,7 @@ void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq, GRPC_CLOSURE_INIT(&pr->closure, ping_done, pr, grpc_schedule_on_exec_ctx); op->send_ping = &pr->closure; op->bind_pollset = grpc_cq_pollset(cq); - grpc_cq_begin_op(cq, tag); + GPR_ASSERT(grpc_cq_begin_op(cq, tag)); top_elem->filter->start_transport_op(&exec_ctx, top_elem, op); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 978d7b4171..10e4e5ab0c 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -196,7 +196,7 @@ typedef struct cq_vtable { void (*init)(void *data); void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq); void (*destroy)(void *data); - void (*begin_op)(grpc_completion_queue *cq, void *tag); + bool (*begin_op)(grpc_completion_queue *cq, void *tag); void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, @@ -235,7 +235,8 @@ typedef struct cq_next_data { /* Number of outstanding events (+1 if not shut down) */ gpr_atm pending_events; - int shutdown_called; + /** 0 initially. 1 once we initiated shutdown */ + bool shutdown_called; } cq_next_data; typedef struct cq_pluck_data { @@ -244,15 +245,20 @@ typedef struct cq_pluck_data { grpc_cq_completion *completed_tail; /** Number of pending events (+1 if we're not shutdown) */ - gpr_refcount pending_events; + gpr_atm pending_events; /** Counter of how many things have ever been queued on this completion queue useful for avoiding locks to check the queue */ gpr_atm things_queued_ever; - /** 0 initially, 1 once we've begun shutting down */ + /** 0 initially. 1 once we completed shutting */ + /* TODO: (sreek) This is not needed since (shutdown == 1) if and only if + * (pending_events == 0). So consider removing this in future and use + * pending_events */ gpr_atm shutdown; - int shutdown_called; + + /** 0 initially. 1 once we initiated shutdown */ + bool shutdown_called; int num_pluckers; plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS]; @@ -288,8 +294,8 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq); -static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag); -static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag); +static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag); +static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag); static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag, @@ -436,7 +442,7 @@ grpc_completion_queue *grpc_completion_queue_create_internal( static void cq_init_next(void *ptr) { cq_next_data *cqd = ptr; - /* Initial ref is dropped by grpc_completion_queue_shutdown */ + /* Initial count is dropped by grpc_completion_queue_shutdown */ gpr_atm_no_barrier_store(&cqd->pending_events, 1); cqd->shutdown_called = false; gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0); @@ -451,12 +457,12 @@ static void cq_destroy_next(void *ptr) { static void cq_init_pluck(void *ptr) { cq_pluck_data *cqd = ptr; - /* Initial ref is dropped by grpc_completion_queue_shutdown */ - gpr_ref_init(&cqd->pending_events, 1); + /* Initial count is dropped by grpc_completion_queue_shutdown */ + gpr_atm_no_barrier_store(&cqd->pending_events, 1); cqd->completed_tail = &cqd->completed_head; cqd->completed_head.next = (uintptr_t)cqd->completed_tail; gpr_atm_no_barrier_store(&cqd->shutdown, 0); - cqd->shutdown_called = 0; + cqd->shutdown_called = false; cqd->num_pluckers = 0; gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0); } @@ -522,33 +528,6 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, } } -static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) { - cq_next_data *cqd = DATA_FROM_CQ(cq); - GPR_ASSERT(!cqd->shutdown_called); - gpr_atm_no_barrier_fetch_add(&cqd->pending_events, 1); -} - -static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) { - cq_pluck_data *cqd = DATA_FROM_CQ(cq); - GPR_ASSERT(!cqd->shutdown_called); - gpr_ref(&cqd->pending_events); -} - -void grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) { -#ifndef NDEBUG - gpr_mu_lock(cq->mu); - if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) { - cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity); - cq->outstanding_tags = - gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) * - cq->outstanding_tag_capacity); - } - cq->outstanding_tags[cq->outstanding_tag_count++] = tag; - gpr_mu_unlock(cq->mu); -#endif - cq->vtable->begin_op(cq, tag); -} - #ifndef NDEBUG static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) { int found = 0; @@ -576,6 +555,49 @@ static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) { static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {} #endif +/* Atomically increments a counter only if the counter is not zero. Returns + * true if the increment was successful; false if the counter is zero */ +static bool atm_inc_if_nonzero(gpr_atm *counter) { + while (true) { + gpr_atm count = gpr_atm_no_barrier_load(counter); + /* If zero, we are done. If not, we must to a CAS (instead of an atomic + * increment) to maintain the contract: do not increment the counter if it + * is zero. */ + if (count == 0) { + return false; + } else if (gpr_atm_no_barrier_cas(counter, count, count + 1)) { + break; + } + } + + return true; +} + +static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) { + cq_next_data *cqd = DATA_FROM_CQ(cq); + return atm_inc_if_nonzero(&cqd->pending_events); +} + +static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) { + cq_pluck_data *cqd = DATA_FROM_CQ(cq); + return atm_inc_if_nonzero(&cqd->pending_events); +} + +bool grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) { +#ifndef NDEBUG + gpr_mu_lock(cq->mu); + if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) { + cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity); + cq->outstanding_tags = + gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) * + cq->outstanding_tag_capacity); + } + cq->outstanding_tags[cq->outstanding_tag_count++] = tag; + gpr_mu_unlock(cq->mu); +#endif + return cq->vtable->begin_op(cq, tag); +} + /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a * completion * type of GRPC_CQ_NEXT) */ @@ -696,8 +718,10 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, ((uintptr_t)storage) | (1u & (uintptr_t)cqd->completed_tail->next); cqd->completed_tail = storage; - int shutdown = gpr_unref(&cqd->pending_events); - if (!shutdown) { + if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + cq_finish_shutdown_pluck(exec_ctx, cq); + gpr_mu_unlock(cq->mu); + } else { grpc_pollset_worker *pluck_worker = NULL; for (int i = 0; i < cqd->num_pluckers; i++) { if (cqd->pluckers[i].tag == tag) { @@ -717,9 +741,6 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(kick_error); } - } else { - cq_finish_shutdown_pluck(exec_ctx, cq); - gpr_mu_unlock(cq->mu); } GPR_TIMER_END("cq_end_op_for_pluck", 0); @@ -855,8 +876,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, inconsistent state. If it is the latter, we shold do a 0-timeout poll so that the thread comes back quickly from poll to make a second attempt at popping. Not doing this can potentially deadlock this - thread - forever (if the deadline is infinity) */ + thread forever (if the deadline is infinity) */ if (cq_event_queue_num_items(&cqd->queue) > 0) { iteration_deadline = gpr_time_0(GPR_CLOCK_MONOTONIC); } @@ -869,10 +889,8 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, if (cq_event_queue_num_items(&cqd->queue) > 0) { /* Go to the beginning of the loop. No point doing a poll because (cq->shutdown == true) is only possible when there is no pending - work - (i.e cq->pending_events == 0) and any outstanding - grpc_cq_completion - events are already queued on this cq */ + work (i.e cq->pending_events == 0) and any outstanding completion + events should have already been queued on this cq */ continue; } @@ -909,11 +927,6 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, is_finished_arg.first_loop = false; } - GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret); - GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next"); - grpc_exec_ctx_finish(&exec_ctx); - GPR_ASSERT(is_finished_arg.stolen_completion == NULL); - if (cq_event_queue_num_items(&cqd->queue) > 0 && gpr_atm_no_barrier_load(&cqd->pending_events) > 0) { gpr_mu_lock(cq->mu); @@ -921,6 +934,11 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, gpr_mu_unlock(cq->mu); } + GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret); + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next"); + grpc_exec_ctx_finish(&exec_ctx); + GPR_ASSERT(is_finished_arg.stolen_completion == NULL); + GPR_TIMER_END("grpc_completion_queue_next", 0); return ret; @@ -947,15 +965,20 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq) { cq_next_data *cqd = DATA_FROM_CQ(cq); + /* Need an extra ref for cq here because: + * We call cq_finish_shutdown_next() below, that would call pollset shutdown. + * Pollset shutdown decrements the cq ref count which can potentially destroy + * the cq (if that happens to be the last ref). + * Creating an extra ref here prevents the cq from getting destroyed while + * this function is still active */ GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); gpr_mu_lock(cq->mu); if (cqd->shutdown_called) { gpr_mu_unlock(cq->mu); GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); - GPR_TIMER_END("grpc_completion_queue_shutdown", 0); return; } - cqd->shutdown_called = 1; + cqd->shutdown_called = true; if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { cq_finish_shutdown_next(exec_ctx, cq); } @@ -1167,21 +1190,31 @@ static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx, &cq->pollset_shutdown_done); } +/* NOTE: This function is almost exactly identical to cq_shutdown_next() but + * merging them is a bit tricky and probably not worth it */ static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq) { cq_pluck_data *cqd = DATA_FROM_CQ(cq); + /* Need an extra ref for cq here because: + * We call cq_finish_shutdown_pluck() below, that would call pollset shutdown. + * Pollset shutdown decrements the cq ref count which can potentially destroy + * the cq (if that happens to be the last ref). + * Creating an extra ref here prevents the cq from getting destroyed while + * this function is still active */ + GRPC_CQ_INTERNAL_REF(cq, "shutting_down (pluck cq)"); gpr_mu_lock(cq->mu); if (cqd->shutdown_called) { gpr_mu_unlock(cq->mu); - GPR_TIMER_END("grpc_completion_queue_shutdown", 0); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down (pluck cq)"); return; } - cqd->shutdown_called = 1; - if (gpr_unref(&cqd->pending_events)) { + cqd->shutdown_called = true; + if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { cq_finish_shutdown_pluck(exec_ctx, cq); } gpr_mu_unlock(cq->mu); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down (pluck cq)"); } /* Shutdown simply drops a ref that we reserved at creation time; if we drop diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index af44482513..69d144bd95 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -72,8 +72,9 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc); /* Flag that an operation is beginning: the completion channel will not finish shutdown until a corrensponding grpc_cq_end_* call is made. - \a tag is currently used only in debug builds. */ -void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag); + \a tag is currently used only in debug builds. Return true on success, and + false if completion_queue has been shutdown. */ +bool grpc_cq_begin_op(grpc_completion_queue *cc, void *tag); /* Queue a GRPC_OP_COMPLETED operation; tag must correspond to the tag passed to grpc_cq_begin_op */ diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index db111e597f..d199ac060e 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -36,6 +36,7 @@ #include "src/core/lib/iomgr/resource_quota.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/surface/alarm_internal.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel_init.h" @@ -135,6 +136,7 @@ void grpc_init(void) { grpc_register_tracer(&grpc_call_error_trace); #ifndef NDEBUG grpc_register_tracer(&grpc_trace_pending_tags); + grpc_register_tracer(&grpc_trace_alarm_refcount); grpc_register_tracer(&grpc_trace_cq_refcount); grpc_register_tracer(&grpc_trace_closure); grpc_register_tracer(&grpc_trace_error_refcount); diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 4bdc03052f..f8c84fb4a8 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -1261,7 +1261,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server, } /* stay locked, and gather up some stuff to do */ - grpc_cq_begin_op(cq, tag); + GPR_ASSERT(grpc_cq_begin_op(cq, tag)); if (server->shutdown_published) { grpc_cq_end_op(&exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown, NULL, gpr_malloc(sizeof(grpc_cq_completion))); @@ -1448,7 +1448,11 @@ grpc_call_error grpc_server_request_call( error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; goto done; } - grpc_cq_begin_op(cq_for_notification, tag); + if (grpc_cq_begin_op(cq_for_notification, tag) == false) { + gpr_free(rc); + error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN; + goto done; + } details->reserved = NULL; rc->cq_idx = cq_idx; rc->type = BATCH_CALL; @@ -1498,7 +1502,11 @@ grpc_call_error grpc_server_request_registered_call( error = GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH; goto done; } - grpc_cq_begin_op(cq_for_notification, tag); + if (grpc_cq_begin_op(cq_for_notification, tag) == false) { + gpr_free(rc); + error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN; + goto done; + } rc->cq_idx = cq_idx; rc->type = REGISTERED_CALL; rc->server = server; diff --git a/src/core/lib/surface/version.c b/src/core/lib/surface/version.c index 8cef15da80..96c16105e7 100644 --- a/src/core/lib/surface/version.c +++ b/src/core/lib/surface/version.c @@ -23,4 +23,4 @@ const char *grpc_version_string(void) { return "4.0.0-dev"; } -const char *grpc_g_stands_for(void) { return "gregarious"; } +const char *grpc_g_stands_for(void) { return "gambit"; } |