diff options
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r-- | src/core/lib/surface/alarm.c | 37 | ||||
-rw-r--r-- | src/core/lib/surface/byte_buffer.c | 6 | ||||
-rw-r--r-- | src/core/lib/surface/call.c | 583 | ||||
-rw-r--r-- | src/core/lib/surface/call.h | 12 | ||||
-rw-r--r-- | src/core/lib/surface/call_log_batch.c | 2 | ||||
-rw-r--r-- | src/core/lib/surface/call_test_only.h | 12 | ||||
-rw-r--r-- | src/core/lib/surface/channel.c | 43 | ||||
-rw-r--r-- | src/core/lib/surface/channel_init.c | 12 | ||||
-rw-r--r-- | src/core/lib/surface/channel_ping.c | 4 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.c | 113 | ||||
-rw-r--r-- | src/core/lib/surface/init.c | 14 | ||||
-rw-r--r-- | src/core/lib/surface/lame_client.cc | 19 | ||||
-rw-r--r-- | src/core/lib/surface/server.c | 153 | ||||
-rw-r--r-- | src/core/lib/surface/version.c | 2 |
14 files changed, 692 insertions, 320 deletions
diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c index 7d60b1de17..7712f560b9 100644 --- a/src/core/lib/surface/alarm.c +++ b/src/core/lib/surface/alarm.c @@ -44,7 +44,9 @@ static void alarm_ref(grpc_alarm *alarm) { gpr_ref(&alarm->refs); } static void alarm_unref(grpc_alarm *alarm) { if (gpr_unref(&alarm->refs)) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GRPC_CQ_INTERNAL_UNREF(&exec_ctx, alarm->cq, "alarm"); + if (alarm->cq != NULL) { + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, alarm->cq, "alarm"); + } grpc_exec_ctx_finish(&exec_ctx); gpr_free(alarm); } @@ -78,12 +80,12 @@ static void alarm_unref_dbg(grpc_alarm *alarm, const char *reason, static void alarm_end_completion(grpc_exec_ctx *exec_ctx, void *arg, grpc_cq_completion *c) { - grpc_alarm *alarm = arg; + grpc_alarm *alarm = (grpc_alarm *)arg; GRPC_ALARM_UNREF(alarm, "dequeue-end-op"); } static void alarm_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_alarm *alarm = arg; + grpc_alarm *alarm = (grpc_alarm *)arg; /* We are queuing an op on completion queue. This means, the alarm's structure cannot be destroyed until the op is dequeued. Adding an extra ref @@ -93,12 +95,8 @@ static void alarm_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { (void *)alarm, &alarm->completion); } -grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline, - void *tag) { - grpc_alarm *alarm = gpr_malloc(sizeof(grpc_alarm)); - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - - gpr_ref_init(&alarm->refs, 1); +grpc_alarm *grpc_alarm_create(void *reserved) { + grpc_alarm *alarm = (grpc_alarm *)gpr_malloc(sizeof(grpc_alarm)); #ifndef NDEBUG if (GRPC_TRACER_ON(grpc_trace_alarm_refcount)) { @@ -106,27 +104,36 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline, } #endif + gpr_ref_init(&alarm->refs, 1); + grpc_timer_init_unset(&alarm->alarm); + alarm->cq = NULL; + GRPC_CLOSURE_INIT(&alarm->on_alarm, alarm_cb, alarm, + grpc_schedule_on_exec_ctx); + return alarm; +} + +void grpc_alarm_set(grpc_alarm *alarm, grpc_completion_queue *cq, + gpr_timespec deadline, void *tag, void *reserved) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_CQ_INTERNAL_REF(cq, "alarm"); alarm->cq = cq; alarm->tag = tag; GPR_ASSERT(grpc_cq_begin_op(cq, tag)); - GRPC_CLOSURE_INIT(&alarm->on_alarm, alarm_cb, alarm, - grpc_schedule_on_exec_ctx); grpc_timer_init(&exec_ctx, &alarm->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), &alarm->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC)); grpc_exec_ctx_finish(&exec_ctx); - return alarm; } -void grpc_alarm_cancel(grpc_alarm *alarm) { +void grpc_alarm_cancel(grpc_alarm *alarm, void *reserved) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_timer_cancel(&exec_ctx, &alarm->alarm); grpc_exec_ctx_finish(&exec_ctx); } -void grpc_alarm_destroy(grpc_alarm *alarm) { - grpc_alarm_cancel(alarm); +void grpc_alarm_destroy(grpc_alarm *alarm, void *reserved) { + grpc_alarm_cancel(alarm, reserved); GRPC_ALARM_UNREF(alarm, "alarm_destroy"); } diff --git a/src/core/lib/surface/byte_buffer.c b/src/core/lib/surface/byte_buffer.c index 0bc990d487..7ed550ef87 100644 --- a/src/core/lib/surface/byte_buffer.c +++ b/src/core/lib/surface/byte_buffer.c @@ -32,7 +32,8 @@ grpc_byte_buffer *grpc_raw_compressed_byte_buffer_create( grpc_slice *slices, size_t nslices, grpc_compression_algorithm compression) { size_t i; - grpc_byte_buffer *bb = gpr_malloc(sizeof(grpc_byte_buffer)); + grpc_byte_buffer *bb = + (grpc_byte_buffer *)gpr_malloc(sizeof(grpc_byte_buffer)); bb->type = GRPC_BB_RAW; bb->data.raw.compression = compression; grpc_slice_buffer_init(&bb->data.raw.slice_buffer); @@ -45,7 +46,8 @@ grpc_byte_buffer *grpc_raw_compressed_byte_buffer_create( grpc_byte_buffer *grpc_raw_byte_buffer_from_reader( grpc_byte_buffer_reader *reader) { - grpc_byte_buffer *bb = gpr_malloc(sizeof(grpc_byte_buffer)); + grpc_byte_buffer *bb = + (grpc_byte_buffer *)gpr_malloc(sizeof(grpc_byte_buffer)); grpc_slice slice; bb->type = GRPC_BB_RAW; bb->data.raw.compression = GRPC_COMPRESS_NONE; diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 00ec9c7c9a..03f47553a1 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -32,6 +32,7 @@ #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/compression/algorithm_metadata.h" +#include "src/core/lib/debug/stats.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" @@ -121,6 +122,7 @@ typedef struct batch_control { bool is_closure; } notify_tag; } completion_data; + grpc_closure start_batch; grpc_closure finish_batch; gpr_refcount steps_to_complete; @@ -144,15 +146,19 @@ typedef struct { grpc_call *sibling_prev; } child_call; +#define RECV_NONE ((gpr_atm)0) +#define RECV_INITIAL_METADATA_FIRST ((gpr_atm)1) + struct grpc_call { gpr_refcount ext_ref; gpr_arena *arena; + grpc_call_combiner call_combiner; grpc_completion_queue *cq; grpc_polling_entity pollent; grpc_channel *channel; gpr_timespec start_time; /* parent_call* */ gpr_atm parent_call_atm; - child_call *child_call; + child_call *child; /* client or server call */ bool is_client; @@ -170,9 +176,6 @@ struct grpc_call { gpr_atm any_ops_sent_atm; gpr_atm received_final_op_atm; - /* have we received initial metadata */ - bool has_initial_md_been_received; - batch_control *active_batches[MAX_CONCURRENT_BATCHES]; grpc_transport_stream_op_batch_payload stream_op_payload; @@ -183,6 +186,11 @@ struct grpc_call { Element 0 is initial metadata, element 1 is trailing metadata. */ grpc_metadata_array *buffered_metadata[2]; + grpc_metadata compression_md; + + // A char* indicating the peer name. + gpr_atm peer_string; + /* Packed received call statuses from various sources */ gpr_atm status[STATUS_SOURCE_COUNT]; @@ -192,8 +200,12 @@ struct grpc_call { /* Compression algorithm for *incoming* data */ grpc_compression_algorithm incoming_compression_algorithm; + /* Stream compression algorithm for *incoming* data */ + grpc_stream_compression_algorithm incoming_stream_compression_algorithm; /* Supported encodings (compression algorithms), a bitset */ uint32_t encodings_accepted_by_peer; + /* Supported stream encodings (stream compression algorithms), a bitset */ + uint32_t stream_encodings_accepted_by_peer; /* Contexts for various subsystems (security, tracing, ...). */ grpc_call_context_element context[GRPC_CONTEXT_COUNT]; @@ -226,7 +238,23 @@ struct grpc_call { } server; } final_op; - void *saved_receiving_stream_ready_bctlp; + /* recv_state can contain one of the following values: + RECV_NONE : : no initial metadata and messages received + RECV_INITIAL_METADATA_FIRST : received initial metadata first + a batch_control* : received messages first + + +------1------RECV_NONE------3-----+ + | | + | | + v v + RECV_INITIAL_METADATA_FIRST receiving_stream_ready_bctlp + | ^ | ^ + | | | | + +-----2-----+ +-----4-----+ + + For 1, 4: See receiving_initial_metadata_ready() function + For 2, 3: See receiving_stream_ready() function */ + gpr_atm recv_state; }; grpc_tracer_flag grpc_call_error_trace = @@ -241,8 +269,9 @@ grpc_tracer_flag grpc_compression_trace = #define CALL_FROM_TOP_ELEM(top_elem) \ CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem)) -static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, - grpc_transport_stream_op_batch *op); +static void execute_batch(grpc_exec_ctx *exec_ctx, grpc_call *call, + grpc_transport_stream_op_batch *op, + grpc_closure *start_batch_closure); static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, status_source source, grpc_status_code status, const char *description); @@ -264,11 +293,11 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, batch_control *bctl); static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl, grpc_error *error, bool has_cancelled); -static void add_init_error(grpc_error **composite, grpc_error *new) { - if (new == GRPC_ERROR_NONE) return; +static void add_init_error(grpc_error **composite, grpc_error *new_err) { + if (new_err == GRPC_ERROR_NONE) return; if (*composite == GRPC_ERROR_NONE) *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Call creation failed"); - *composite = grpc_error_add_child(*composite, new); + *composite = grpc_error_add_child(*composite, new_err); } void *grpc_call_arena_alloc(grpc_call *call, size_t size) { @@ -278,7 +307,7 @@ void *grpc_call_arena_alloc(grpc_call *call, size_t size) { static parent_call *get_or_create_parent_call(grpc_call *call) { parent_call *p = (parent_call *)gpr_atm_acq_load(&call->parent_call_atm); if (p == NULL) { - p = gpr_arena_alloc(call->arena, sizeof(*p)); + p = (parent_call *)gpr_arena_alloc(call->arena, sizeof(*p)); gpr_mu_init(&p->child_list_mu); if (!gpr_atm_rel_cas(&call->parent_call_atm, (gpr_atm)NULL, (gpr_atm)p)) { gpr_mu_destroy(&p->child_list_mu); @@ -301,12 +330,14 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, grpc_channel_get_channel_stack(args->channel); grpc_call *call; GPR_TIMER_BEGIN("grpc_call_create", 0); - gpr_arena *arena = - gpr_arena_create(grpc_channel_get_call_size_estimate(args->channel)); - call = gpr_arena_alloc(arena, - sizeof(grpc_call) + channel_stack->call_stack_size); + size_t initial_size = grpc_channel_get_call_size_estimate(args->channel); + GRPC_STATS_INC_CALL_INITIAL_SIZE(exec_ctx, initial_size); + gpr_arena *arena = gpr_arena_create(initial_size); + call = (grpc_call *)gpr_arena_alloc( + arena, sizeof(grpc_call) + channel_stack->call_stack_size); gpr_ref_init(&call->ext_ref, 1); call->arena = arena; + grpc_call_combiner_init(&call->call_combiner); *out_call = call; call->channel = args->channel; call->cq = args->cq; @@ -314,6 +345,11 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, /* Always support no compression */ GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE); call->is_client = args->server_transport_data == NULL; + if (call->is_client) { + GRPC_STATS_INC_CLIENT_CALLS_CREATED(exec_ctx); + } else { + GRPC_STATS_INC_SERVER_CALLS_CREATED(exec_ctx); + } call->stream_op_payload.context = call->context; grpc_slice path = grpc_empty_slice(); if (call->is_client) { @@ -342,24 +378,24 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, bool immediately_cancel = false; - if (args->parent_call != NULL) { - child_call *cc = call->child_call = - gpr_arena_alloc(arena, sizeof(child_call)); - call->child_call->parent = args->parent_call; + if (args->parent != NULL) { + child_call *cc = call->child = + (child_call *)gpr_arena_alloc(arena, sizeof(child_call)); + call->child->parent = args->parent; - GRPC_CALL_INTERNAL_REF(args->parent_call, "child"); + GRPC_CALL_INTERNAL_REF(args->parent, "child"); GPR_ASSERT(call->is_client); - GPR_ASSERT(!args->parent_call->is_client); + GPR_ASSERT(!args->parent->is_client); - parent_call *pc = get_or_create_parent_call(args->parent_call); + parent_call *pc = get_or_create_parent_call(args->parent); gpr_mu_lock(&pc->child_list_mu); if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) { send_deadline = gpr_time_min( gpr_convert_clock_type(send_deadline, - args->parent_call->send_deadline.clock_type), - args->parent_call->send_deadline); + args->parent->send_deadline.clock_type), + args->parent->send_deadline); } /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with * GRPC_PROPAGATE_STATS_CONTEXT */ @@ -371,9 +407,9 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, "Census tracing propagation requested " "without Census context propagation")); } - grpc_call_context_set( - call, GRPC_CONTEXT_TRACING, - args->parent_call->context[GRPC_CONTEXT_TRACING].value, NULL); + grpc_call_context_set(call, GRPC_CONTEXT_TRACING, + args->parent->context[GRPC_CONTEXT_TRACING].value, + NULL); } else if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) { add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Census context propagation requested " @@ -381,7 +417,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, } if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) { call->cancellation_is_inherited = 1; - if (gpr_atm_acq_load(&args->parent_call->received_final_op_atm)) { + if (gpr_atm_acq_load(&args->parent->received_final_op_atm)) { immediately_cancel = true; } } @@ -391,9 +427,9 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, cc->sibling_next = cc->sibling_prev = call; } else { cc->sibling_next = pc->first_child; - cc->sibling_prev = pc->first_child->child_call->sibling_prev; - cc->sibling_next->child_call->sibling_prev = - cc->sibling_prev->child_call->sibling_next = call; + cc->sibling_prev = pc->first_child->child->sibling_prev; + cc->sibling_next->child->sibling_prev = + cc->sibling_prev->child->sibling_next = call; } gpr_mu_unlock(&pc->child_list_mu); @@ -410,7 +446,8 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, .path = path, .start_time = call->start_time, .deadline = send_deadline, - .arena = call->arena}; + .arena = call->arena, + .call_combiner = &call->call_combiner}; add_init_error(&error, grpc_call_stack_init(exec_ctx, channel_stack, 1, destroy_call, call, &call_args)); if (error != GRPC_ERROR_NONE) { @@ -475,8 +512,10 @@ void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) { static void release_call(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { - grpc_call *c = call; + grpc_call *c = (grpc_call *)call; grpc_channel *channel = c->channel; + grpc_call_combiner_destroy(&c->call_combiner); + gpr_free((char *)c->peer_string); grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena)); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call"); } @@ -486,7 +525,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { size_t i; int ii; - grpc_call *c = call; + grpc_call *c = (grpc_call *)call; GPR_TIMER_BEGIN("destroy_call", 0); for (i = 0; i < 2; i++) { grpc_metadata_batch_destroy( @@ -511,7 +550,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, GRPC_CQ_INTERNAL_UNREF(exec_ctx, c->cq, "bind"); } - get_final_status(call, set_status_value_directly, &c->final_info.final_status, + get_final_status(c, set_status_value_directly, &c->final_info.final_status, NULL); c->final_info.stats.latency = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time); @@ -532,7 +571,7 @@ void grpc_call_ref(grpc_call *c) { gpr_ref(&c->ext_ref); } void grpc_call_unref(grpc_call *c) { if (!gpr_unref(&c->ext_ref)) return; - child_call *cc = c->child_call; + child_call *cc = c->child; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GPR_TIMER_BEGIN("grpc_call_unref", 0); @@ -547,8 +586,8 @@ void grpc_call_unref(grpc_call *c) { pc->first_child = NULL; } } - cc->sibling_prev->child_call->sibling_next = cc->sibling_next; - cc->sibling_next->child_call->sibling_prev = cc->sibling_prev; + cc->sibling_prev->child->sibling_next = cc->sibling_next; + cc->sibling_next->child->sibling_prev = cc->sibling_prev; gpr_mu_unlock(&pc->child_list_mu); GRPC_CALL_INTERNAL_UNREF(&exec_ctx, cc->parent, "child"); } @@ -560,6 +599,12 @@ void grpc_call_unref(grpc_call *c) { if (cancel) { cancel_with_error(&exec_ctx, c, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED); + } else { + // Unset the call combiner cancellation closure. This has the + // effect of scheduling the previously set cancellation closure, if + // any, so that it can release any internal references it may be + // holding to the call stack. + grpc_call_combiner_set_notify_on_cancel(&exec_ctx, &c->call_combiner, NULL); } GRPC_CALL_INTERNAL_UNREF(&exec_ctx, c, "destroy"); grpc_exec_ctx_finish(&exec_ctx); @@ -576,30 +621,37 @@ grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) { return GRPC_CALL_OK; } -static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, - grpc_transport_stream_op_batch *op) { - grpc_call_element *elem; - - GPR_TIMER_BEGIN("execute_op", 0); - elem = CALL_ELEM_FROM_CALL(call, 0); - elem->filter->start_transport_stream_op_batch(exec_ctx, elem, op); - GPR_TIMER_END("execute_op", 0); +// This is called via the call combiner to start sending a batch down +// the filter stack. +static void execute_batch_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *ignored) { + grpc_transport_stream_op_batch *batch = (grpc_transport_stream_op_batch *)arg; + grpc_call *call = (grpc_call *)batch->handler_private.extra_arg; + GPR_TIMER_BEGIN("execute_batch", 0); + grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0); + GRPC_CALL_LOG_OP(GPR_INFO, elem, batch); + elem->filter->start_transport_stream_op_batch(exec_ctx, elem, batch); + GPR_TIMER_END("execute_batch", 0); +} + +// start_batch_closure points to a caller-allocated closure to be used +// for entering the call combiner. +static void execute_batch(grpc_exec_ctx *exec_ctx, grpc_call *call, + grpc_transport_stream_op_batch *batch, + grpc_closure *start_batch_closure) { + batch->handler_private.extra_arg = call; + GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch, + grpc_schedule_on_exec_ctx); + GRPC_CALL_COMBINER_START(exec_ctx, &call->call_combiner, start_batch_closure, + GRPC_ERROR_NONE, "executing batch"); } char *grpc_call_get_peer(grpc_call *call) { - grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0); - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - char *result; - GRPC_API_TRACE("grpc_call_get_peer(%p)", 1, (call)); - result = elem->filter->get_peer(&exec_ctx, elem); - if (result == NULL) { - result = grpc_channel_get_target(call->channel); - } - if (result == NULL) { - result = gpr_strdup("unknown"); - } - grpc_exec_ctx_finish(&exec_ctx); - return result; + char *peer_string = (char *)gpr_atm_acq_load(&call->peer_string); + if (peer_string != NULL) return gpr_strdup(peer_string); + peer_string = grpc_channel_get_target(call->channel); + if (peer_string != NULL) return peer_string; + return gpr_strdup("unknown"); } grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { @@ -626,20 +678,41 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c, return GRPC_CALL_OK; } -static void done_termination(grpc_exec_ctx *exec_ctx, void *call, +typedef struct { + grpc_call *call; + grpc_closure start_batch; + grpc_closure finish_batch; +} cancel_state; + +// The on_complete callback used when sending a cancel_stream batch down +// the filter stack. Yields the call combiner when the batch is done. +static void done_termination(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "termination"); + cancel_state *state = (cancel_state *)arg; + GRPC_CALL_COMBINER_STOP(exec_ctx, &state->call->call_combiner, + "on_complete for cancel_stream op"); + GRPC_CALL_INTERNAL_UNREF(exec_ctx, state->call, "termination"); + gpr_free(state); } static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, status_source source, grpc_error *error) { GRPC_CALL_INTERNAL_REF(c, "termination"); + // Inform the call combiner of the cancellation, so that it can cancel + // any in-flight asynchronous actions that may be holding the call + // combiner. This ensures that the cancel_stream batch can be sent + // down the filter stack in a timely manner. + grpc_call_combiner_cancel(exec_ctx, &c->call_combiner, GRPC_ERROR_REF(error)); set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error)); - grpc_transport_stream_op_batch *op = grpc_make_transport_stream_op( - GRPC_CLOSURE_CREATE(done_termination, c, grpc_schedule_on_exec_ctx)); + cancel_state *state = (cancel_state *)gpr_malloc(sizeof(*state)); + state->call = c; + GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state, + grpc_schedule_on_exec_ctx); + grpc_transport_stream_op_batch *op = + grpc_make_transport_stream_op(&state->finish_batch); op->cancel_stream = true; op->payload->cancel_stream.cancel_error = error; - execute_op(exec_ctx, c, op); + execute_batch(exec_ctx, c, op, &state->start_batch); } static grpc_error *error_from_status(grpc_status_code status, @@ -752,6 +825,12 @@ static void set_incoming_compression_algorithm( call->incoming_compression_algorithm = algo; } +static void set_incoming_stream_compression_algorithm( + grpc_call *call, grpc_stream_compression_algorithm algo) { + GPR_ASSERT(algo < GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT); + call->incoming_stream_compression_algorithm = algo; +} + grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm( grpc_call *call) { grpc_compression_algorithm algorithm; @@ -765,6 +844,13 @@ static grpc_compression_algorithm compression_algorithm_for_level_locked( call->encodings_accepted_by_peer); } +static grpc_stream_compression_algorithm +stream_compression_algorithm_for_level_locked( + grpc_call *call, grpc_stream_compression_level level) { + return grpc_stream_compression_algorithm_for_level( + level, call->stream_encodings_accepted_by_peer); +} + uint32_t grpc_call_test_only_get_message_flags(grpc_call *call) { uint32_t flags; flags = call->test_only_last_message_flags; @@ -819,12 +905,70 @@ static void set_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx, (void *)(((uintptr_t)call->encodings_accepted_by_peer) + 1)); } +static void set_stream_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx, + grpc_call *call, + grpc_mdelem mdel) { + size_t i; + grpc_stream_compression_algorithm algorithm; + grpc_slice_buffer accept_encoding_parts; + grpc_slice accept_encoding_slice; + void *accepted_user_data; + + accepted_user_data = + grpc_mdelem_get_user_data(mdel, destroy_encodings_accepted_by_peer); + if (accepted_user_data != NULL) { + call->stream_encodings_accepted_by_peer = + (uint32_t)(((uintptr_t)accepted_user_data) - 1); + return; + } + + accept_encoding_slice = GRPC_MDVALUE(mdel); + grpc_slice_buffer_init(&accept_encoding_parts); + grpc_slice_split(accept_encoding_slice, ",", &accept_encoding_parts); + + /* Always support no compression */ + GPR_BITSET(&call->stream_encodings_accepted_by_peer, + GRPC_STREAM_COMPRESS_NONE); + for (i = 0; i < accept_encoding_parts.count; i++) { + grpc_slice accept_encoding_entry_slice = accept_encoding_parts.slices[i]; + if (grpc_stream_compression_algorithm_parse(accept_encoding_entry_slice, + &algorithm)) { + GPR_BITSET(&call->stream_encodings_accepted_by_peer, algorithm); + } else { + char *accept_encoding_entry_str = + grpc_slice_to_c_string(accept_encoding_entry_slice); + gpr_log(GPR_ERROR, + "Invalid entry in accept encoding metadata: '%s'. Ignoring.", + accept_encoding_entry_str); + gpr_free(accept_encoding_entry_str); + } + } + + grpc_slice_buffer_destroy_internal(exec_ctx, &accept_encoding_parts); + + grpc_mdelem_set_user_data( + mdel, destroy_encodings_accepted_by_peer, + (void *)(((uintptr_t)call->stream_encodings_accepted_by_peer) + 1)); +} + uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) { uint32_t encodings_accepted_by_peer; encodings_accepted_by_peer = call->encodings_accepted_by_peer; return encodings_accepted_by_peer; } +uint32_t grpc_call_test_only_get_stream_encodings_accepted_by_peer( + grpc_call *call) { + uint32_t stream_encodings_accepted_by_peer; + stream_encodings_accepted_by_peer = call->stream_encodings_accepted_by_peer; + return stream_encodings_accepted_by_peer; +} + +grpc_stream_compression_algorithm +grpc_call_test_only_get_incoming_stream_encodings(grpc_call *call) { + return call->incoming_stream_compression_algorithm; +} + static grpc_linked_mdelem *linked_from_md(const grpc_metadata *md) { return (grpc_linked_mdelem *)&md->internal_data; } @@ -936,6 +1080,22 @@ static grpc_compression_algorithm decode_compression(grpc_mdelem md) { return algorithm; } +static grpc_stream_compression_algorithm decode_stream_compression( + grpc_mdelem md) { + grpc_stream_compression_algorithm algorithm = + grpc_stream_compression_algorithm_from_slice(GRPC_MDVALUE(md)); + if (algorithm == GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) { + char *md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md)); + gpr_log(GPR_ERROR, + "Invalid incoming stream compression algorithm: '%s'. Interpreting " + "incoming data as uncompressed.", + md_c_str); + gpr_free(md_c_str); + return GRPC_STREAM_COMPRESS_NONE; + } + return algorithm; +} + static void publish_app_metadata(grpc_call *call, grpc_metadata_batch *b, int is_trailing) { if (b->list.count == 0) return; @@ -946,8 +1106,8 @@ static void publish_app_metadata(grpc_call *call, grpc_metadata_batch *b, if (dest->count + b->list.count > dest->capacity) { dest->capacity = GPR_MAX(dest->capacity + b->list.count, dest->capacity * 3 / 2); - dest->metadata = - gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity); + dest->metadata = (grpc_metadata *)gpr_realloc( + dest->metadata, sizeof(grpc_metadata) * dest->capacity); } for (grpc_linked_mdelem *l = b->list.head; l != NULL; l = l->next) { mdusr = &dest->metadata[dest->count++]; @@ -960,7 +1120,19 @@ static void publish_app_metadata(grpc_call *call, grpc_metadata_batch *b, static void recv_initial_filter(grpc_exec_ctx *exec_ctx, grpc_call *call, grpc_metadata_batch *b) { - if (b->idx.named.grpc_encoding != NULL) { + if (b->idx.named.content_encoding != NULL) { + if (b->idx.named.grpc_encoding != NULL) { + gpr_log(GPR_ERROR, + "Received both content-encoding and grpc-encoding header. " + "Ignoring grpc-encoding."); + grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_encoding); + } + GPR_TIMER_BEGIN("incoming_stream_compression_algorithm", 0); + set_incoming_stream_compression_algorithm( + call, decode_stream_compression(b->idx.named.content_encoding->md)); + GPR_TIMER_END("incoming_stream_compression_algorithm", 0); + grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.content_encoding); + } else if (b->idx.named.grpc_encoding != NULL) { GPR_TIMER_BEGIN("incoming_compression_algorithm", 0); set_incoming_compression_algorithm( call, decode_compression(b->idx.named.grpc_encoding->md)); @@ -974,12 +1146,19 @@ static void recv_initial_filter(grpc_exec_ctx *exec_ctx, grpc_call *call, grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_accept_encoding); GPR_TIMER_END("encodings_accepted_by_peer", 0); } + if (b->idx.named.accept_encoding != NULL) { + GPR_TIMER_BEGIN("stream_encodings_accepted_by_peer", 0); + set_stream_encodings_accepted_by_peer(exec_ctx, call, + b->idx.named.accept_encoding->md); + grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.accept_encoding); + GPR_TIMER_END("stream_encodings_accepted_by_peer", 0); + } publish_app_metadata(call, b, false); } static void recv_trailing_filter(grpc_exec_ctx *exec_ctx, void *args, grpc_metadata_batch *b) { - grpc_call *call = args; + grpc_call *call = (grpc_call *)args; if (b->idx.named.grpc_status != NULL) { uint32_t status_code = decode_status(b->idx.named.grpc_status->md); grpc_error *error = @@ -1063,7 +1242,8 @@ static batch_control *allocate_batch_control(grpc_call *call, int slot = batch_slot_for_op(ops[0].op); batch_control **pslot = &call->active_batches[slot]; if (*pslot == NULL) { - *pslot = gpr_arena_alloc(call->arena, sizeof(batch_control)); + *pslot = + (batch_control *)gpr_arena_alloc(call->arena, sizeof(batch_control)); } batch_control *bctl = *pslot; if (bctl->call != NULL) { @@ -1077,7 +1257,7 @@ static batch_control *allocate_batch_control(grpc_call *call, static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data, grpc_cq_completion *storage) { - batch_control *bctl = user_data; + batch_control *bctl = (batch_control *)user_data; grpc_call *call = bctl->call; bctl->call = NULL; GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion"); @@ -1137,7 +1317,7 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, child = pc->first_child; if (child != NULL) { do { - next_child_call = child->child_call->sibling_next; + next_child_call = child->child->sibling_next; if (child->cancellation_is_inherited) { GRPC_CALL_INTERNAL_REF(child, "propagate_cancel"); cancel_with_error(exec_ctx, child, STATUS_FROM_API_OVERRIDE, @@ -1166,7 +1346,8 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, if (bctl->completion_data.notify_tag.is_closure) { /* unrefs bctl->error */ bctl->call = NULL; - GRPC_CLOSURE_RUN(exec_ctx, bctl->completion_data.notify_tag.tag, error); + GRPC_CLOSURE_RUN( + exec_ctx, (grpc_closure *)bctl->completion_data.notify_tag.tag, error); GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion"); } else { /* unrefs bctl->error */ @@ -1220,7 +1401,7 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx, static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_error *error) { - batch_control *bctl = bctlp; + batch_control *bctl = (batch_control *)bctlp; grpc_call *call = bctl->call; grpc_byte_stream *bs = call->receiving_stream; bool release_error = false; @@ -1279,7 +1460,7 @@ static void process_data_after_md(grpc_exec_ctx *exec_ctx, static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_error *error) { - batch_control *bctl = bctlp; + batch_control *bctl = (batch_control *)bctlp; grpc_call *call = bctl->call; if (error != GRPC_ERROR_NONE) { if (call->receiving_stream != NULL) { @@ -1290,19 +1471,73 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error)); } - if (call->has_initial_md_been_received || error != GRPC_ERROR_NONE || - call->receiving_stream == NULL) { - process_data_after_md(exec_ctx, bctlp); - } else { - call->saved_receiving_stream_ready_bctlp = bctlp; + /* If recv_state is RECV_NONE, we will save the batch_control + * object with rel_cas, and will not use it after the cas. Its corresponding + * acq_load is in receiving_initial_metadata_ready() */ + if (error != GRPC_ERROR_NONE || call->receiving_stream == NULL || + !gpr_atm_rel_cas(&call->recv_state, RECV_NONE, (gpr_atm)bctlp)) { + process_data_after_md(exec_ctx, bctl); } } +// The recv_message_ready callback used when sending a batch containing +// a recv_message op down the filter stack. Yields the call combiner +// before processing the received message. +static void receiving_stream_ready_in_call_combiner(grpc_exec_ctx *exec_ctx, + void *bctlp, + grpc_error *error) { + batch_control *bctl = (batch_control *)bctlp; + grpc_call *call = bctl->call; + GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, "recv_message_ready"); + receiving_stream_ready(exec_ctx, bctlp, error); +} + static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, batch_control *bctl) { grpc_call *call = bctl->call; - /* validate call->incoming_compression_algorithm */ - if (call->incoming_compression_algorithm != GRPC_COMPRESS_NONE) { + /* validate compression algorithms */ + if (call->incoming_stream_compression_algorithm != + GRPC_STREAM_COMPRESS_NONE) { + const grpc_stream_compression_algorithm algo = + call->incoming_stream_compression_algorithm; + char *error_msg = NULL; + const grpc_compression_options compression_options = + grpc_channel_compression_options(call->channel); + if (algo >= GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) { + gpr_asprintf(&error_msg, + "Invalid stream compression algorithm value '%d'.", algo); + gpr_log(GPR_ERROR, "%s", error_msg); + cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE, + GRPC_STATUS_UNIMPLEMENTED, error_msg); + } else if (grpc_compression_options_is_stream_compression_algorithm_enabled( + &compression_options, algo) == 0) { + /* check if algorithm is supported by current channel config */ + const char *algo_name = NULL; + grpc_stream_compression_algorithm_name(algo, &algo_name); + gpr_asprintf(&error_msg, "Stream compression algorithm '%s' is disabled.", + algo_name); + gpr_log(GPR_ERROR, "%s", error_msg); + cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE, + GRPC_STATUS_UNIMPLEMENTED, error_msg); + } + gpr_free(error_msg); + + GPR_ASSERT(call->stream_encodings_accepted_by_peer != 0); + if (!GPR_BITGET(call->stream_encodings_accepted_by_peer, + call->incoming_stream_compression_algorithm)) { + if (GRPC_TRACER_ON(grpc_compression_trace)) { + const char *algo_name = NULL; + grpc_stream_compression_algorithm_name( + call->incoming_stream_compression_algorithm, &algo_name); + gpr_log( + GPR_ERROR, + "Stream compression algorithm (content-encoding = '%s') not " + "present in the bitset of accepted encodings (accept-encodings: " + "'0x%x')", + algo_name, call->stream_encodings_accepted_by_peer); + } + } + } else if (call->incoming_compression_algorithm != GRPC_COMPRESS_NONE) { const grpc_compression_algorithm algo = call->incoming_compression_algorithm; char *error_msg = NULL; @@ -1318,7 +1553,7 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, } else if (grpc_compression_options_is_algorithm_enabled( &compression_options, algo) == 0) { /* check if algorithm is supported by current channel config */ - char *algo_name = NULL; + const char *algo_name = NULL; grpc_compression_algorithm_name(algo, &algo_name); gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.", algo_name); @@ -1329,22 +1564,20 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, call->incoming_compression_algorithm = algo; } gpr_free(error_msg); - } - /* make sure the received grpc-encoding is amongst the ones listed in - * grpc-accept-encoding */ - GPR_ASSERT(call->encodings_accepted_by_peer != 0); - if (!GPR_BITGET(call->encodings_accepted_by_peer, - call->incoming_compression_algorithm)) { - if (GRPC_TRACER_ON(grpc_compression_trace)) { - char *algo_name = NULL; - grpc_compression_algorithm_name(call->incoming_compression_algorithm, - &algo_name); - gpr_log(GPR_ERROR, - "Compression algorithm (grpc-encoding = '%s') not present in " - "the bitset of accepted encodings (grpc-accept-encodings: " - "'0x%x')", - algo_name, call->encodings_accepted_by_peer); + GPR_ASSERT(call->encodings_accepted_by_peer != 0); + if (!GPR_BITGET(call->encodings_accepted_by_peer, + call->incoming_compression_algorithm)) { + if (GRPC_TRACER_ON(grpc_compression_trace)) { + const char *algo_name = NULL; + grpc_compression_algorithm_name(call->incoming_compression_algorithm, + &algo_name); + gpr_log(GPR_ERROR, + "Compression algorithm (grpc-encoding = '%s') not present in " + "the bitset of accepted encodings (grpc-accept-encodings: " + "'0x%x')", + algo_name, call->encodings_accepted_by_peer); + } } } } @@ -1362,9 +1595,12 @@ static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl, static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_error *error) { - batch_control *bctl = bctlp; + batch_control *bctl = (batch_control *)bctlp; grpc_call *call = bctl->call; + GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, + "recv_initial_metadata_ready"); + add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false); if (error == GRPC_ERROR_NONE) { grpc_metadata_batch *md = @@ -1384,12 +1620,31 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, } } - call->has_initial_md_been_received = true; - if (call->saved_receiving_stream_ready_bctlp != NULL) { - grpc_closure *saved_rsr_closure = GRPC_CLOSURE_CREATE( - receiving_stream_ready, call->saved_receiving_stream_ready_bctlp, - grpc_schedule_on_exec_ctx); - call->saved_receiving_stream_ready_bctlp = NULL; + grpc_closure *saved_rsr_closure = NULL; + while (true) { + gpr_atm rsr_bctlp = gpr_atm_acq_load(&call->recv_state); + /* Should only receive initial metadata once */ + GPR_ASSERT(rsr_bctlp != 1); + if (rsr_bctlp == 0) { + /* We haven't seen initial metadata and messages before, thus initial + * metadata is received first. + * no_barrier_cas is used, as this function won't access the batch_control + * object saved by receiving_stream_ready() if the initial metadata is + * received first. */ + if (gpr_atm_no_barrier_cas(&call->recv_state, RECV_NONE, + RECV_INITIAL_METADATA_FIRST)) { + break; + } + } else { + /* Already received messages */ + saved_rsr_closure = GRPC_CLOSURE_CREATE(receiving_stream_ready, + (batch_control *)rsr_bctlp, + grpc_schedule_on_exec_ctx); + /* No need to modify recv_state */ + break; + } + } + if (saved_rsr_closure != NULL) { GRPC_CLOSURE_RUN(exec_ctx, saved_rsr_closure, GRPC_ERROR_REF(error)); } @@ -1398,8 +1653,9 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_error *error) { - batch_control *bctl = bctlp; - + batch_control *bctl = (batch_control *)bctlp; + grpc_call *call = bctl->call; + GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, "on_complete"); add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false); finish_batch_step(exec_ctx, bctl); } @@ -1418,9 +1674,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, batch_control *bctl; int num_completion_callbacks_needed = 1; grpc_call_error error = GRPC_CALL_OK; - - // sent_initial_metadata guards against variable reuse. - grpc_metadata compression_md; + grpc_transport_stream_op_batch *stream_op; + grpc_transport_stream_op_batch_payload *stream_op_payload; GPR_TIMER_BEGIN("grpc_call_start_batch", 0); GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag); @@ -1428,11 +1683,12 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, if (nops == 0) { if (!is_notify_tag_closure) { GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag)); - grpc_cq_end_op(exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE, - free_no_op_completion, NULL, - gpr_malloc(sizeof(grpc_cq_completion))); + grpc_cq_end_op( + exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE, + free_no_op_completion, NULL, + (grpc_cq_completion *)gpr_malloc(sizeof(grpc_cq_completion))); } else { - GRPC_CLOSURE_SCHED(exec_ctx, notify_tag, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, (grpc_closure *)notify_tag, GRPC_ERROR_NONE); } error = GRPC_CALL_OK; goto done; @@ -1446,9 +1702,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, bctl->completion_data.notify_tag.is_closure = (uint8_t)(is_notify_tag_closure != 0); - grpc_transport_stream_op_batch *stream_op = &bctl->op; - grpc_transport_stream_op_batch_payload *stream_op_payload = - &call->stream_op_payload; + stream_op = &bctl->op; + stream_op_payload = &call->stream_op_payload; /* rewrite batch ops into a transport op */ for (i = 0; i < nops; i++) { @@ -1458,7 +1713,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, goto done_with_error; } switch (op->op) { - case GRPC_OP_SEND_INITIAL_METADATA: + case GRPC_OP_SEND_INITIAL_METADATA: { /* Flag validation: currently allow no flags */ if (!are_initial_metadata_flags_valid(op->flags, call->is_client)) { error = GRPC_CALL_ERROR_INVALID_FLAGS; @@ -1469,31 +1724,60 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, goto done_with_error; } /* process compression level */ - memset(&compression_md, 0, sizeof(compression_md)); + memset(&call->compression_md, 0, sizeof(call->compression_md)); size_t additional_metadata_count = 0; - grpc_compression_level effective_compression_level; + grpc_compression_level effective_compression_level = + GRPC_COMPRESS_LEVEL_NONE; + grpc_stream_compression_level effective_stream_compression_level = + GRPC_STREAM_COMPRESS_LEVEL_NONE; bool level_set = false; - if (op->data.send_initial_metadata.maybe_compression_level.is_set) { + bool stream_compression = false; + if (op->data.send_initial_metadata.maybe_stream_compression_level + .is_set) { + effective_stream_compression_level = + op->data.send_initial_metadata.maybe_stream_compression_level + .level; + level_set = true; + stream_compression = true; + } else if (op->data.send_initial_metadata.maybe_compression_level + .is_set) { effective_compression_level = op->data.send_initial_metadata.maybe_compression_level.level; level_set = true; } else { const grpc_compression_options copts = grpc_channel_compression_options(call->channel); - level_set = copts.default_level.is_set; - if (level_set) { + if (copts.default_stream_compression_level.is_set) { + level_set = true; + effective_stream_compression_level = + copts.default_stream_compression_level.level; + stream_compression = true; + } else if (copts.default_level.is_set) { + level_set = true; effective_compression_level = copts.default_level.level; } } if (level_set && !call->is_client) { - const grpc_compression_algorithm calgo = - compression_algorithm_for_level_locked( - call, effective_compression_level); - // the following will be picked up by the compress filter and used as - // the call's compression algorithm. - compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST; - compression_md.value = grpc_compression_algorithm_slice(calgo); - additional_metadata_count++; + if (stream_compression) { + const grpc_stream_compression_algorithm calgo = + stream_compression_algorithm_for_level_locked( + call, effective_stream_compression_level); + call->compression_md.key = + GRPC_MDSTR_GRPC_INTERNAL_STREAM_ENCODING_REQUEST; + call->compression_md.value = + grpc_stream_compression_algorithm_slice(calgo); + } else { + const grpc_compression_algorithm calgo = + compression_algorithm_for_level_locked( + call, effective_compression_level); + /* the following will be picked up by the compress filter and used + * as the call's compression algorithm. */ + call->compression_md.key = + GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST; + call->compression_md.value = + grpc_compression_algorithm_slice(calgo); + additional_metadata_count++; + } } if (op->data.send_initial_metadata.count + additional_metadata_count > @@ -1506,7 +1790,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, if (!prepare_application_metadata( exec_ctx, call, (int)op->data.send_initial_metadata.count, op->data.send_initial_metadata.metadata, 0, call->is_client, - &compression_md, (int)additional_metadata_count)) { + &call->compression_md, (int)additional_metadata_count)) { error = GRPC_CALL_ERROR_INVALID_METADATA; goto done_with_error; } @@ -1518,8 +1802,13 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]; stream_op_payload->send_initial_metadata.send_initial_metadata_flags = op->flags; + if (call->is_client) { + stream_op_payload->send_initial_metadata.peer_string = + &call->peer_string; + } break; - case GRPC_OP_SEND_MESSAGE: + } + case GRPC_OP_SEND_MESSAGE: { if (!are_write_flags_valid(op->flags)) { error = GRPC_CALL_ERROR_INVALID_FLAGS; goto done_with_error; @@ -1548,7 +1837,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, stream_op_payload->send_message.send_message = &call->sending_stream.base; break; - case GRPC_OP_SEND_CLOSE_FROM_CLIENT: + } + case GRPC_OP_SEND_CLOSE_FROM_CLIENT: { /* Flag validation: currently allow no flags */ if (op->flags != 0) { error = GRPC_CALL_ERROR_INVALID_FLAGS; @@ -1567,7 +1857,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, stream_op_payload->send_trailing_metadata.send_trailing_metadata = &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; break; - case GRPC_OP_SEND_STATUS_FROM_SERVER: + } + case GRPC_OP_SEND_STATUS_FROM_SERVER: { /* Flag validation: currently allow no flags */ if (op->flags != 0) { error = GRPC_CALL_ERROR_INVALID_FLAGS; @@ -1629,7 +1920,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, stream_op_payload->send_trailing_metadata.send_trailing_metadata = &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; break; - case GRPC_OP_RECV_INITIAL_METADATA: + } + case GRPC_OP_RECV_INITIAL_METADATA: { /* Flag validation: currently allow no flags */ if (op->flags != 0) { error = GRPC_CALL_ERROR_INVALID_FLAGS; @@ -1650,9 +1942,14 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready = &call->receiving_initial_metadata_ready; + if (!call->is_client) { + stream_op_payload->recv_initial_metadata.peer_string = + &call->peer_string; + } num_completion_callbacks_needed++; break; - case GRPC_OP_RECV_MESSAGE: + } + case GRPC_OP_RECV_MESSAGE: { /* Flag validation: currently allow no flags */ if (op->flags != 0) { error = GRPC_CALL_ERROR_INVALID_FLAGS; @@ -1666,13 +1963,15 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, stream_op->recv_message = true; call->receiving_buffer = op->data.recv_message.recv_message; stream_op_payload->recv_message.recv_message = &call->receiving_stream; - GRPC_CLOSURE_INIT(&call->receiving_stream_ready, receiving_stream_ready, - bctl, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&call->receiving_stream_ready, + receiving_stream_ready_in_call_combiner, bctl, + grpc_schedule_on_exec_ctx); stream_op_payload->recv_message.recv_message_ready = &call->receiving_stream_ready; num_completion_callbacks_needed++; break; - case GRPC_OP_RECV_STATUS_ON_CLIENT: + } + case GRPC_OP_RECV_STATUS_ON_CLIENT: { /* Flag validation: currently allow no flags */ if (op->flags != 0) { error = GRPC_CALL_ERROR_INVALID_FLAGS; @@ -1699,7 +1998,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, stream_op_payload->collect_stats.collect_stats = &call->final_info.stats.transport_stream_stats; break; - case GRPC_OP_RECV_CLOSE_ON_SERVER: + } + case GRPC_OP_RECV_CLOSE_ON_SERVER: { /* Flag validation: currently allow no flags */ if (op->flags != 0) { error = GRPC_CALL_ERROR_INVALID_FLAGS; @@ -1723,6 +2023,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, stream_op_payload->collect_stats.collect_stats = &call->final_info.stats.transport_stream_stats; break; + } } } @@ -1737,7 +2038,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, stream_op->on_complete = &bctl->finish_batch; gpr_atm_rel_store(&call->any_ops_sent_atm, 1); - execute_op(exec_ctx, call, stream_op); + execute_batch(exec_ctx, call, stream_op, &bctl->start_batch); done: GPR_TIMER_END("grpc_call_start_batch", 0); diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h index 185bfccb77..c680139cf6 100644 --- a/src/core/lib/surface/call.h +++ b/src/core/lib/surface/call.h @@ -19,6 +19,10 @@ #ifndef GRPC_CORE_LIB_SURFACE_CALL_H #define GRPC_CORE_LIB_SURFACE_CALL_H +#ifdef __cplusplus +extern "C" { +#endif + #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/context.h" #include "src/core/lib/surface/api_trace.h" @@ -26,10 +30,6 @@ #include <grpc/grpc.h> #include <grpc/impl/codegen/compression_types.h> -#ifdef __cplusplus -extern "C" { -#endif - typedef void (*grpc_ioreq_completion_func)(grpc_exec_ctx *exec_ctx, grpc_call *call, int success, void *user_data); @@ -37,7 +37,7 @@ typedef void (*grpc_ioreq_completion_func)(grpc_exec_ctx *exec_ctx, typedef struct grpc_call_create_args { grpc_channel *channel; - grpc_call *parent_call; + grpc_call *parent; uint32_t propagation_mask; grpc_completion_queue *cq; @@ -89,7 +89,7 @@ grpc_call_error grpc_call_start_batch_and_execute(grpc_exec_ctx *exec_ctx, /* Given the top call_element, get the call object. */ grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element); -void grpc_call_log_batch(char *file, int line, gpr_log_severity severity, +void grpc_call_log_batch(const char *file, int line, gpr_log_severity severity, grpc_call *call, const grpc_op *ops, size_t nops, void *tag); diff --git a/src/core/lib/surface/call_log_batch.c b/src/core/lib/surface/call_log_batch.c index 4443aba58a..4a1c265817 100644 --- a/src/core/lib/surface/call_log_batch.c +++ b/src/core/lib/surface/call_log_batch.c @@ -103,7 +103,7 @@ char *grpc_op_string(const grpc_op *op) { return out; } -void grpc_call_log_batch(char *file, int line, gpr_log_severity severity, +void grpc_call_log_batch(const char *file, int line, gpr_log_severity severity, grpc_call *call, const grpc_op *ops, size_t nops, void *tag) { char *tmp; diff --git a/src/core/lib/surface/call_test_only.h b/src/core/lib/surface/call_test_only.h index 2f1b80bfd7..a5a01b3679 100644 --- a/src/core/lib/surface/call_test_only.h +++ b/src/core/lib/surface/call_test_only.h @@ -42,6 +42,18 @@ uint32_t grpc_call_test_only_get_message_flags(grpc_call *call); * To be indexed by grpc_compression_algorithm enum values. */ uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call); +/** Returns a bitset for the stream encodings (stream compression algorithms) + * supported by \a call's peer. + * + * To be indexed by grpc_stream_compression_algorithm enum values. */ +uint32_t grpc_call_test_only_get_stream_encodings_accepted_by_peer( + grpc_call *call); + +/** Returns the incoming stream compression algorithm (content-encoding header) + * received by a call. */ +grpc_stream_compression_algorithm +grpc_call_test_only_get_incoming_stream_encodings(grpc_call *call); + #ifdef __cplusplus } #endif diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c index 5780a18ce8..48962e5e45 100644 --- a/src/core/lib/surface/channel.c +++ b/src/core/lib/surface/channel.c @@ -27,6 +27,7 @@ #include <grpc/support/string_util.h> #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/debug/stats.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/support/string.h" @@ -77,6 +78,11 @@ grpc_channel *grpc_channel_create_with_builder( grpc_channel_args *args = grpc_channel_args_copy( grpc_channel_stack_builder_get_channel_arguments(builder)); grpc_channel *channel; + if (channel_stack_type == GRPC_SERVER_CHANNEL) { + GRPC_STATS_INC_SERVER_CHANNELS_CREATED(exec_ctx); + } else { + GRPC_STATS_INC_CLIENT_CHANNELS_CREATED(exec_ctx); + } grpc_error *error = grpc_channel_stack_builder_finish( exec_ctx, builder, sizeof(grpc_channel), 1, destroy_channel, NULL, (void **)&channel); @@ -142,6 +148,16 @@ grpc_channel *grpc_channel_create_with_builder( GRPC_COMPRESS_LEVEL_NONE, GRPC_COMPRESS_LEVEL_COUNT - 1}); } else if (0 == strcmp(args->args[i].key, + GRPC_STREAM_COMPRESSION_CHANNEL_DEFAULT_LEVEL)) { + channel->compression_options.default_stream_compression_level.is_set = + true; + channel->compression_options.default_stream_compression_level.level = + (grpc_stream_compression_level)grpc_channel_arg_get_integer( + &args->args[i], + (grpc_integer_options){GRPC_STREAM_COMPRESS_LEVEL_NONE, + GRPC_STREAM_COMPRESS_LEVEL_NONE, + GRPC_STREAM_COMPRESS_LEVEL_COUNT - 1}); + } else if (0 == strcmp(args->args[i].key, GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM)) { channel->compression_options.default_algorithm.is_set = true; channel->compression_options.default_algorithm.algorithm = @@ -149,12 +165,31 @@ grpc_channel *grpc_channel_create_with_builder( &args->args[i], (grpc_integer_options){GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE, GRPC_COMPRESS_ALGORITHMS_COUNT - 1}); + } else if (0 == strcmp(args->args[i].key, + GRPC_STREAM_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM)) { + channel->compression_options.default_stream_compression_algorithm.is_set = + true; + channel->compression_options.default_stream_compression_algorithm + .algorithm = + (grpc_stream_compression_algorithm)grpc_channel_arg_get_integer( + &args->args[i], + (grpc_integer_options){ + GRPC_STREAM_COMPRESS_NONE, GRPC_STREAM_COMPRESS_NONE, + GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT - 1}); } else if (0 == strcmp(args->args[i].key, GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET)) { channel->compression_options.enabled_algorithms_bitset = (uint32_t)args->args[i].value.integer | 0x1; /* always support no compression */ + } else if (0 == + strcmp( + args->args[i].key, + GRPC_STREAM_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET)) { + channel->compression_options + .enabled_stream_compression_algorithms_bitset = + (uint32_t)args->args[i].value.integer | + 0x1; /* always support no compression */ } } @@ -247,7 +282,7 @@ static grpc_call *grpc_channel_create_call_internal( grpc_call_create_args args; memset(&args, 0, sizeof(args)); args.channel = channel; - args.parent_call = parent_call; + args.parent = parent_call; args.propagation_mask = propagation_mask; args.cq = cq; args.pollset_set_alternative = pollset_set_alternative; @@ -298,7 +333,7 @@ grpc_call *grpc_channel_create_pollset_set_call( void *grpc_channel_register_call(grpc_channel *channel, const char *method, const char *host, void *reserved) { - registered_call *rc = gpr_malloc(sizeof(registered_call)); + registered_call *rc = (registered_call *)gpr_malloc(sizeof(registered_call)); GRPC_API_TRACE( "grpc_channel_register_call(channel=%p, method=%s, host=%s, reserved=%p)", 4, (channel, method, host, reserved)); @@ -325,7 +360,7 @@ grpc_call *grpc_channel_create_registered_call( grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, grpc_completion_queue *completion_queue, void *registered_call_handle, gpr_timespec deadline, void *reserved) { - registered_call *rc = registered_call_handle; + registered_call *rc = (registered_call *)registered_call_handle; GRPC_API_TRACE( "grpc_channel_create_registered_call(" "channel=%p, parent_call=%p, propagation_mask=%x, completion_queue=%p, " @@ -363,7 +398,7 @@ void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx, static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_channel *channel = arg; + grpc_channel *channel = (grpc_channel *)arg; grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CHANNEL(channel)); while (channel->registered_calls) { registered_call *rc = channel->registered_calls; diff --git a/src/core/lib/surface/channel_init.c b/src/core/lib/surface/channel_init.c index a1391ffe56..33f444b89e 100644 --- a/src/core/lib/surface/channel_init.c +++ b/src/core/lib/surface/channel_init.c @@ -53,9 +53,9 @@ void grpc_channel_init_register_stage(grpc_channel_stack_type type, GPR_ASSERT(!g_finalized); if (g_slots[type].cap_slots == g_slots[type].num_slots) { g_slots[type].cap_slots = GPR_MAX(8, 3 * g_slots[type].cap_slots / 2); - g_slots[type].slots = - gpr_realloc(g_slots[type].slots, - g_slots[type].cap_slots * sizeof(*g_slots[type].slots)); + g_slots[type].slots = (stage_slot *)gpr_realloc( + g_slots[type].slots, + g_slots[type].cap_slots * sizeof(*g_slots[type].slots)); } stage_slot *s = &g_slots[type].slots[g_slots[type].num_slots++]; s->insertion_order = g_slots[type].num_slots; @@ -65,8 +65,8 @@ void grpc_channel_init_register_stage(grpc_channel_stack_type type, } static int compare_slots(const void *a, const void *b) { - const stage_slot *sa = a; - const stage_slot *sb = b; + const stage_slot *sa = (const stage_slot *)a; + const stage_slot *sb = (const stage_slot *)b; int c = GPR_ICMP(sa->priority, sb->priority); if (c != 0) return c; @@ -85,7 +85,7 @@ void grpc_channel_init_finalize(void) { void grpc_channel_init_shutdown(void) { for (int i = 0; i < GRPC_NUM_CHANNEL_STACK_TYPES; i++) { gpr_free(g_slots[i].slots); - g_slots[i].slots = (void *)(uintptr_t)0xdeadbeef; + g_slots[i].slots = (stage_slot *)(void *)(uintptr_t)0xdeadbeef; } } diff --git a/src/core/lib/surface/channel_ping.c b/src/core/lib/surface/channel_ping.c index e85b308850..f45b568958 100644 --- a/src/core/lib/surface/channel_ping.c +++ b/src/core/lib/surface/channel_ping.c @@ -39,7 +39,7 @@ static void ping_destroy(grpc_exec_ctx *exec_ctx, void *arg, } static void ping_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - ping_result *pr = arg; + ping_result *pr = (ping_result *)arg; grpc_cq_end_op(exec_ctx, pr->cq, pr->tag, GRPC_ERROR_REF(error), ping_destroy, pr, &pr->completion_storage); } @@ -49,7 +49,7 @@ void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq, GRPC_API_TRACE("grpc_channel_ping(channel=%p, cq=%p, tag=%p, reserved=%p)", 4, (channel, cq, tag, reserved)); grpc_transport_op *op = grpc_make_transport_op(NULL); - ping_result *pr = gpr_malloc(sizeof(*pr)); + ping_result *pr = (ping_result *)gpr_malloc(sizeof(*pr)); grpc_channel_element *top_elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index c20cfbc740..fed66e3a20 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -26,6 +26,7 @@ #include <grpc/support/string_util.h> #include <grpc/support/time.h> +#include "src/core/lib/debug/stats.h" #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" @@ -54,7 +55,7 @@ typedef struct { bool can_listen; size_t (*size)(void); void (*init)(grpc_pollset *pollset, gpr_mu **mu); - grpc_error *(*kick)(grpc_pollset *pollset, + grpc_error *(*kick)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *specific_worker); grpc_error *(*work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker, gpr_timespec now, @@ -130,7 +131,8 @@ static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx, } static grpc_error *non_polling_poller_kick( - grpc_pollset *pollset, grpc_pollset_worker *specific_worker) { + grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker *specific_worker) { non_polling_poller *p = (non_polling_poller *)pollset; if (specific_worker == NULL) specific_worker = (grpc_pollset_worker *)p->root; if (specific_worker != NULL) { @@ -327,25 +329,12 @@ static void cq_destroy_pluck(void *data); /* Completion queue vtables based on the completion-type */ static const cq_vtable g_cq_vtable[] = { /* GRPC_CQ_NEXT */ - {.data_size = sizeof(cq_next_data), - .cq_completion_type = GRPC_CQ_NEXT, - .init = cq_init_next, - .shutdown = cq_shutdown_next, - .destroy = cq_destroy_next, - .begin_op = cq_begin_op_for_next, - .end_op = cq_end_op_for_next, - .next = cq_next, - .pluck = NULL}, + {GRPC_CQ_NEXT, sizeof(cq_next_data), cq_init_next, cq_shutdown_next, + cq_destroy_next, cq_begin_op_for_next, cq_end_op_for_next, cq_next, NULL}, /* GRPC_CQ_PLUCK */ - {.data_size = sizeof(cq_pluck_data), - .cq_completion_type = GRPC_CQ_PLUCK, - .init = cq_init_pluck, - .shutdown = cq_shutdown_pluck, - .destroy = cq_destroy_pluck, - .begin_op = cq_begin_op_for_pluck, - .end_op = cq_end_op_for_pluck, - .next = NULL, - .pluck = cq_pluck}, + {GRPC_CQ_PLUCK, sizeof(cq_pluck_data), cq_init_pluck, cq_shutdown_pluck, + cq_destroy_pluck, cq_begin_op_for_pluck, cq_end_op_for_pluck, NULL, + cq_pluck}, }; #define DATA_FROM_CQ(cq) ((void *)(cq + 1)) @@ -420,8 +409,13 @@ grpc_completion_queue *grpc_completion_queue_create_internal( const cq_poller_vtable *poller_vtable = &g_poller_vtable_by_poller_type[polling_type]; - cq = gpr_zalloc(sizeof(grpc_completion_queue) + vtable->data_size + - poller_vtable->size()); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_STATS_INC_CQS_CREATED(&exec_ctx); + grpc_exec_ctx_finish(&exec_ctx); + + cq = (grpc_completion_queue *)gpr_zalloc(sizeof(grpc_completion_queue) + + vtable->data_size + + poller_vtable->size()); cq->vtable = vtable; cq->poller_vtable = poller_vtable; @@ -441,7 +435,7 @@ grpc_completion_queue *grpc_completion_queue_create_internal( } static void cq_init_next(void *ptr) { - cq_next_data *cqd = ptr; + cq_next_data *cqd = (cq_next_data *)ptr; /* Initial count is dropped by grpc_completion_queue_shutdown */ gpr_atm_no_barrier_store(&cqd->pending_events, 1); cqd->shutdown_called = false; @@ -450,13 +444,13 @@ static void cq_init_next(void *ptr) { } static void cq_destroy_next(void *ptr) { - cq_next_data *cqd = ptr; + cq_next_data *cqd = (cq_next_data *)ptr; GPR_ASSERT(cq_event_queue_num_items(&cqd->queue) == 0); cq_event_queue_destroy(&cqd->queue); } static void cq_init_pluck(void *ptr) { - cq_pluck_data *cqd = ptr; + cq_pluck_data *cqd = (cq_pluck_data *)ptr; /* Initial count is dropped by grpc_completion_queue_shutdown */ gpr_atm_no_barrier_store(&cqd->pending_events, 1); cqd->completed_tail = &cqd->completed_head; @@ -468,7 +462,7 @@ static void cq_init_pluck(void *ptr) { } static void cq_destroy_pluck(void *ptr) { - cq_pluck_data *cqd = ptr; + cq_pluck_data *cqd = (cq_pluck_data *)ptr; GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head); } @@ -501,7 +495,7 @@ void grpc_cq_internal_ref(grpc_completion_queue *cq) { static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_completion_queue *cq = arg; + grpc_completion_queue *cq = (grpc_completion_queue *)arg; GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "pollset_destroy"); } @@ -559,13 +553,13 @@ static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {} * true if the increment was successful; false if the counter is zero */ static bool atm_inc_if_nonzero(gpr_atm *counter) { while (true) { - gpr_atm count = gpr_atm_no_barrier_load(counter); + gpr_atm count = gpr_atm_acq_load(counter); /* If zero, we are done. If not, we must to a CAS (instead of an atomic * increment) to maintain the contract: do not increment the counter if it * is zero. */ if (count == 0) { return false; - } else if (gpr_atm_no_barrier_cas(counter, count, count + 1)) { + } else if (gpr_atm_full_cas(counter, count, count + 1)) { break; } } @@ -574,12 +568,12 @@ static bool atm_inc_if_nonzero(gpr_atm *counter) { } static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) { - cq_next_data *cqd = DATA_FROM_CQ(cq); + cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq); return atm_inc_if_nonzero(&cqd->pending_events); } static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) { - cq_pluck_data *cqd = DATA_FROM_CQ(cq); + cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq); return atm_inc_if_nonzero(&cqd->pending_events); } @@ -588,9 +582,9 @@ bool grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) { gpr_mu_lock(cq->mu); if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) { cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity); - cq->outstanding_tags = - gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) * - cq->outstanding_tag_capacity); + cq->outstanding_tags = (void **)gpr_realloc( + cq->outstanding_tags, + sizeof(*cq->outstanding_tags) * cq->outstanding_tag_capacity); } cq->outstanding_tags[cq->outstanding_tag_count++] = tag; gpr_mu_unlock(cq->mu); @@ -624,7 +618,7 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, } } - cq_next_data *cqd = DATA_FROM_CQ(cq); + cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq); int is_success = (error == GRPC_ERROR_NONE); storage->tag = tag; @@ -637,15 +631,19 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, /* Add the completion to the queue */ bool is_first = cq_event_queue_push(&cqd->queue, storage); gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); - bool will_definitely_shutdown = - gpr_atm_no_barrier_load(&cqd->pending_events) == 1; + + /* Since we do not hold the cq lock here, it is important to do an 'acquire' + load here (instead of a 'no_barrier' load) to match with the release store + (done via gpr_atm_full_fetch_add(pending_events, -1)) in cq_shutdown_next + */ + bool will_definitely_shutdown = gpr_atm_acq_load(&cqd->pending_events) == 1; if (!will_definitely_shutdown) { /* Only kick if this is the first item queued */ if (is_first) { gpr_mu_lock(cq->mu); grpc_error *kick_error = - cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL); + cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), NULL); gpr_mu_unlock(cq->mu); if (kick_error != GRPC_ERROR_NONE) { @@ -685,7 +683,7 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage) { - cq_pluck_data *cqd = DATA_FROM_CQ(cq); + cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq); int is_success = (error == GRPC_ERROR_NONE); GPR_TIMER_BEGIN("cq_end_op_for_pluck", 0); @@ -731,7 +729,7 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, } grpc_error *kick_error = - cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker); + cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), pluck_worker); gpr_mu_unlock(cq->mu); @@ -766,9 +764,9 @@ typedef struct { } cq_is_finished_arg; static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { - cq_is_finished_arg *a = arg; + cq_is_finished_arg *a = (cq_is_finished_arg *)arg; grpc_completion_queue *cq = a->cq; - cq_next_data *cqd = DATA_FROM_CQ(cq); + cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq); GPR_ASSERT(a->stolen_completion == NULL); gpr_atm current_last_seen_things_queued_ever = @@ -819,7 +817,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved) { grpc_event ret; gpr_timespec now; - cq_next_data *cqd = DATA_FROM_CQ(cq); + cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq); GPR_TIMER_BEGIN("grpc_completion_queue_next", 0); @@ -882,7 +880,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, } } - if (gpr_atm_no_barrier_load(&cqd->pending_events) == 0) { + if (gpr_atm_acq_load(&cqd->pending_events) == 0) { /* Before returning, check if the queue has any items left over (since gpr_mpscq_pop() can sometimes return NULL even if the queue is not empty. If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */ @@ -928,9 +926,9 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, } if (cq_event_queue_num_items(&cqd->queue) > 0 && - gpr_atm_no_barrier_load(&cqd->pending_events) > 0) { + gpr_atm_acq_load(&cqd->pending_events) > 0) { gpr_mu_lock(cq->mu); - cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL); + cq->poller_vtable->kick(&exec_ctx, POLLSET_FROM_CQ(cq), NULL); gpr_mu_unlock(cq->mu); } @@ -952,7 +950,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, this function */ static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq) { - cq_next_data *cqd = DATA_FROM_CQ(cq); + cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq); GPR_ASSERT(cqd->shutdown_called); GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0); @@ -963,7 +961,7 @@ static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx, static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq) { - cq_next_data *cqd = DATA_FROM_CQ(cq); + cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq); /* Need an extra ref for cq here because: * We call cq_finish_shutdown_next() below, that would call pollset shutdown. @@ -976,10 +974,12 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, if (cqd->shutdown_called) { gpr_mu_unlock(cq->mu); GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); - GPR_TIMER_END("grpc_completion_queue_shutdown", 0); return; } cqd->shutdown_called = true; + /* Doing a full_fetch_add (i.e acq/release) here to match with + * cq_begin_op_for_next and and cq_end_op_for_next functions which read/write + * on this counter without necessarily holding a lock on cq */ if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { cq_finish_shutdown_next(exec_ctx, cq); } @@ -994,7 +994,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cq, static int add_plucker(grpc_completion_queue *cq, void *tag, grpc_pollset_worker **worker) { - cq_pluck_data *cqd = DATA_FROM_CQ(cq); + cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq); if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) { return 0; } @@ -1006,7 +1006,7 @@ static int add_plucker(grpc_completion_queue *cq, void *tag, static void del_plucker(grpc_completion_queue *cq, void *tag, grpc_pollset_worker **worker) { - cq_pluck_data *cqd = DATA_FROM_CQ(cq); + cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq); for (int i = 0; i < cqd->num_pluckers; i++) { if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) { cqd->num_pluckers--; @@ -1018,9 +1018,9 @@ static void del_plucker(grpc_completion_queue *cq, void *tag, } static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) { - cq_is_finished_arg *a = arg; + cq_is_finished_arg *a = (cq_is_finished_arg *)arg; grpc_completion_queue *cq = a->cq; - cq_pluck_data *cqd = DATA_FROM_CQ(cq); + cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq); GPR_ASSERT(a->stolen_completion == NULL); gpr_atm current_last_seen_things_queued_ever = @@ -1057,7 +1057,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, grpc_cq_completion *prev; grpc_pollset_worker *worker = NULL; gpr_timespec now; - cq_pluck_data *cqd = DATA_FROM_CQ(cq); + cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq); GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0); @@ -1181,7 +1181,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag, static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq) { - cq_pluck_data *cqd = DATA_FROM_CQ(cq); + cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq); GPR_ASSERT(cqd->shutdown_called); GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown)); @@ -1195,7 +1195,7 @@ static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx, * merging them is a bit tricky and probably not worth it */ static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq) { - cq_pluck_data *cqd = DATA_FROM_CQ(cq); + cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq); /* Need an extra ref for cq here because: * We call cq_finish_shutdown_pluck() below, that would call pollset shutdown. @@ -1208,7 +1208,6 @@ static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx, if (cqd->shutdown_called) { gpr_mu_unlock(cq->mu); GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down (pluck cq)"); - GPR_TIMER_END("grpc_completion_queue_shutdown", 0); return; } cqd->shutdown_called = true; diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index d199ac060e..b089da2c54 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -28,12 +28,15 @@ #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/connected_channel.h" #include "src/core/lib/channel/handshaker_registry.h" +#include "src/core/lib/debug/stats.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/http/parser.h" +#include "src/core/lib/iomgr/call_combiner.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/resource_quota.h" +#include "src/core/lib/iomgr/timer_manager.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/alarm_internal.h" @@ -118,6 +121,7 @@ void grpc_init(void) { gpr_mu_lock(&g_init_mu); if (++g_initializations == 1) { gpr_time_init(); + grpc_stats_init(); grpc_slice_intern_init(); grpc_mdctx_global_init(); grpc_channel_init_init(); @@ -127,6 +131,7 @@ void grpc_init(void) { grpc_register_tracer(&grpc_trace_channel_stack_builder); grpc_register_tracer(&grpc_http1_trace); grpc_register_tracer(&grpc_cq_pluck_trace); // default on + grpc_register_tracer(&grpc_call_combiner_trace); grpc_register_tracer(&grpc_combiner_trace); grpc_register_tracer(&grpc_server_channel_trace); grpc_register_tracer(&grpc_bdp_estimator_trace); @@ -175,17 +180,20 @@ void grpc_shutdown(void) { GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL); gpr_mu_lock(&g_init_mu); if (--g_initializations == 0) { - grpc_iomgr_shutdown(&exec_ctx); - gpr_timers_global_destroy(); - grpc_tracer_shutdown(); + grpc_executor_shutdown(&exec_ctx); + grpc_timer_manager_set_threading(false); // shutdown timer_manager thread for (i = g_number_of_plugins; i >= 0; i--) { if (g_all_of_the_plugins[i].destroy != NULL) { g_all_of_the_plugins[i].destroy(); } } + grpc_iomgr_shutdown(&exec_ctx); + gpr_timers_global_destroy(); + grpc_tracer_shutdown(); grpc_mdctx_global_shutdown(&exec_ctx); grpc_handshaker_factory_registry_shutdown(&exec_ctx); grpc_slice_intern_shutdown(); + grpc_stats_shutdown(); } gpr_mu_unlock(&g_init_mu); grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/lib/surface/lame_client.cc b/src/core/lib/surface/lame_client.cc index a0791080a9..6286f9159d 100644 --- a/src/core/lib/surface/lame_client.cc +++ b/src/core/lib/surface/lame_client.cc @@ -40,6 +40,7 @@ namespace grpc_core { namespace { struct CallData { + grpc_call_combiner *call_combiner; grpc_linked_mdelem status; grpc_linked_mdelem details; grpc_core::atomic<bool> filled_metadata; @@ -52,14 +53,14 @@ struct ChannelData { static void fill_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_metadata_batch *mdb) { - CallData *calld = static_cast<CallData *>(elem->call_data); + CallData *calld = reinterpret_cast<CallData *>(elem->call_data); bool expected = false; if (!calld->filled_metadata.compare_exchange_strong( expected, true, grpc_core::memory_order_relaxed, grpc_core::memory_order_relaxed)) { return; } - ChannelData *chand = static_cast<ChannelData *>(elem->channel_data); + ChannelData *chand = reinterpret_cast<ChannelData *>(elem->channel_data); char tmp[GPR_LTOA_MIN_BUFSIZE]; gpr_ltoa(chand->error_code, tmp); calld->status.md = grpc_mdelem_from_slices( @@ -79,6 +80,7 @@ static void fill_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, static void lame_start_transport_stream_op_batch( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op_batch *op) { + CallData *calld = reinterpret_cast<CallData *>(elem->call_data); if (op->recv_initial_metadata) { fill_metadata(exec_ctx, elem, op->payload->recv_initial_metadata.recv_initial_metadata); @@ -87,12 +89,8 @@ static void lame_start_transport_stream_op_batch( op->payload->recv_trailing_metadata.recv_trailing_metadata); } grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, op, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel")); -} - -static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - return NULL; + exec_ctx, op, GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"), + calld->call_combiner); } static void lame_get_channel_info(grpc_exec_ctx *exec_ctx, @@ -122,6 +120,8 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_element_args *args) { + CallData *calld = reinterpret_cast<CallData *>(elem->call_data); + calld->call_combiner = args->call_combiner; return GRPC_ERROR_NONE; } @@ -156,7 +156,6 @@ extern "C" const grpc_channel_filter grpc_lame_filter = { sizeof(grpc_core::ChannelData), grpc_core::init_channel_elem, grpc_core::destroy_channel_elem, - grpc_core::lame_get_peer, grpc_core::lame_get_channel_info, "lame-client", }; @@ -176,7 +175,7 @@ grpc_channel *grpc_lame_client_channel_create(const char *target, "error_message=%s)", 3, (target, (int)error_code, error_message)); GPR_ASSERT(elem->filter == &grpc_lame_filter); - auto chand = static_cast<grpc_core::ChannelData *>(elem->channel_data); + auto chand = reinterpret_cast<grpc_core::ChannelData *>(elem->channel_data); chand->error_code = error_code; chand->error_message = error_message; grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 66dcc299aa..1d0fd472d0 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -29,6 +29,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" +#include "src/core/lib/debug/stats.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/slice/slice_internal.h" @@ -75,7 +76,7 @@ typedef struct requested_call { grpc_call_details *details; } batch; struct { - registered_method *registered_method; + registered_method *method; gpr_timespec *deadline; grpc_byte_buffer **optional_payload; } registered; @@ -144,7 +145,7 @@ struct call_data { uint32_t recv_initial_metadata_flags; grpc_metadata_array initial_metadata; - request_matcher *request_matcher; + request_matcher *matcher; grpc_byte_buffer *payload; grpc_closure got_initial_metadata; @@ -170,7 +171,7 @@ struct registered_method { grpc_server_register_method_payload_handling payload_handling; uint32_t flags; /* one request matcher per method */ - request_matcher request_matcher; + request_matcher matcher; registered_method *next; }; @@ -250,7 +251,8 @@ static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) { count++; } cb->num_channels = count; - cb->channels = gpr_malloc(sizeof(*cb->channels) * cb->num_channels); + cb->channels = + (grpc_channel **)gpr_malloc(sizeof(*cb->channels) * cb->num_channels); count = 0; for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) { cb->channels[count++] = c->channel; @@ -265,14 +267,15 @@ struct shutdown_cleanup_args { static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - struct shutdown_cleanup_args *a = arg; + struct shutdown_cleanup_args *a = (struct shutdown_cleanup_args *)arg; grpc_slice_unref_internal(exec_ctx, a->slice); gpr_free(a); } static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel, bool send_goaway, grpc_error *send_disconnect) { - struct shutdown_cleanup_args *sc = gpr_malloc(sizeof(*sc)); + struct shutdown_cleanup_args *sc = + (struct shutdown_cleanup_args *)gpr_malloc(sizeof(*sc)); GRPC_CLOSURE_INIT(&sc->closure, shutdown_cleanup, sc, grpc_schedule_on_exec_ctx); grpc_transport_op *op = grpc_make_transport_op(&sc->closure); @@ -314,8 +317,8 @@ static void request_matcher_init(request_matcher *rm, size_t entries, grpc_server *server) { memset(rm, 0, sizeof(*rm)); rm->server = server; - rm->requests_per_cq = - gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count); + rm->requests_per_cq = (gpr_stack_lockfree **)gpr_malloc( + sizeof(*rm->requests_per_cq) * server->cq_count); for (size_t i = 0; i < server->cq_count; i++) { rm->requests_per_cq[i] = gpr_stack_lockfree_create(entries); } @@ -331,7 +334,7 @@ static void request_matcher_destroy(request_matcher *rm) { static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, grpc_error *error) { - grpc_call_unref(grpc_call_from_top_element(elem)); + grpc_call_unref(grpc_call_from_top_element((grpc_call_element *)elem)); } static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx, @@ -384,7 +387,7 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) { while ((rm = server->registered_methods) != NULL) { server->registered_methods = rm->next; if (server->started) { - request_matcher_destroy(&rm->request_matcher); + request_matcher_destroy(&rm->matcher); } gpr_free(rm->method); gpr_free(rm->host); @@ -426,7 +429,7 @@ static void orphan_channel(channel_data *chand) { static void finish_destroy_channel(grpc_exec_ctx *exec_ctx, void *cd, grpc_error *error) { - channel_data *chand = cd; + channel_data *chand = (channel_data *)cd; grpc_server *server = chand->server; GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->channel, "server"); server_unref(exec_ctx, server); @@ -459,7 +462,7 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand, static void done_request_event(grpc_exec_ctx *exec_ctx, void *req, grpc_cq_completion *c) { - requested_call *rc = req; + requested_call *rc = (requested_call *)req; grpc_server *server = rc->server; if (rc >= server->requested_calls_per_cq[rc->cq_idx] && @@ -505,7 +508,7 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, grpc_call_element *elem = grpc_call_stack_element(grpc_call_get_call_stack(call), 0); - channel_data *chand = elem->channel_data; + channel_data *chand = (channel_data *)elem->channel_data; server_ref(chand->server); grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event, rc, &rc->completion); @@ -513,10 +516,10 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_call_element *call_elem = arg; - call_data *calld = call_elem->call_data; - channel_data *chand = call_elem->channel_data; - request_matcher *rm = calld->request_matcher; + grpc_call_element *call_elem = (grpc_call_element *)arg; + call_data *calld = (call_data *)call_elem->call_data; + channel_data *chand = (channel_data *)call_elem->channel_data; + request_matcher *rm = calld->matcher; grpc_server *server = rm->server; if (error != GRPC_ERROR_NONE || gpr_atm_acq_load(&server->shutdown_flag)) { @@ -538,6 +541,7 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, if (request_id == -1) { continue; } else { + GRPC_STATS_INC_SERVER_CQS_CHECKED(exec_ctx, i); gpr_mu_lock(&calld->mu_state); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); @@ -548,6 +552,7 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, } /* no cq to take the request found: queue it on the slow list */ + GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED(exec_ctx); gpr_mu_lock(&server->mu_call); gpr_mu_lock(&calld->mu_state); calld->state = PENDING; @@ -566,7 +571,7 @@ static void finish_start_new_rpc( grpc_exec_ctx *exec_ctx, grpc_server *server, grpc_call_element *elem, request_matcher *rm, grpc_server_register_method_payload_handling payload_handling) { - call_data *calld = elem->call_data; + call_data *calld = (call_data *)elem->call_data; if (gpr_atm_acq_load(&server->shutdown_flag)) { gpr_mu_lock(&calld->mu_state); @@ -578,7 +583,7 @@ static void finish_start_new_rpc( return; } - calld->request_matcher = rm; + calld->matcher = rm; switch (payload_handling) { case GRPC_SRM_PAYLOAD_NONE: @@ -599,8 +604,8 @@ static void finish_start_new_rpc( } static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - channel_data *chand = elem->channel_data; - call_data *calld = elem->call_data; + channel_data *chand = (channel_data *)elem->channel_data; + call_data *calld = (call_data *)elem->call_data; grpc_server *server = chand->server; uint32_t i; uint32_t hash; @@ -624,7 +629,7 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { continue; } finish_start_new_rpc(exec_ctx, server, elem, - &rm->server_registered_method->request_matcher, + &rm->server_registered_method->matcher, rm->server_registered_method->payload_handling); return; } @@ -642,7 +647,7 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { continue; } finish_start_new_rpc(exec_ctx, server, elem, - &rm->server_registered_method->request_matcher, + &rm->server_registered_method->matcher, rm->server_registered_method->payload_handling); return; } @@ -663,7 +668,7 @@ static int num_listeners(grpc_server *server) { static void done_shutdown_event(grpc_exec_ctx *exec_ctx, void *server, grpc_cq_completion *completion) { - server_unref(exec_ctx, server); + server_unref(exec_ctx, (grpc_server *)server); } static int num_channels(grpc_server *server) { @@ -686,9 +691,9 @@ static void kill_pending_work_locked(grpc_exec_ctx *exec_ctx, exec_ctx, &server->unregistered_request_matcher); for (registered_method *rm = server->registered_methods; rm; rm = rm->next) { - request_matcher_kill_requests(exec_ctx, server, &rm->request_matcher, + request_matcher_kill_requests(exec_ctx, server, &rm->matcher, GRPC_ERROR_REF(error)); - request_matcher_zombify_all_pending_calls(exec_ctx, &rm->request_matcher); + request_matcher_zombify_all_pending_calls(exec_ctx, &rm->matcher); } } GRPC_ERROR_UNREF(error); @@ -732,8 +737,8 @@ static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, grpc_error *error) { - grpc_call_element *elem = ptr; - call_data *calld = elem->call_data; + grpc_call_element *elem = (grpc_call_element *)ptr; + call_data *calld = (call_data *)elem->call_data; gpr_timespec op_deadline; if (error == GRPC_ERROR_NONE) { @@ -771,7 +776,7 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, static void server_mutate_op(grpc_call_element *elem, grpc_transport_stream_op_batch *op) { - call_data *calld = elem->call_data; + call_data *calld = (call_data *)elem->call_data; if (op->recv_initial_metadata) { GPR_ASSERT(op->payload->recv_initial_metadata.recv_flags == NULL); @@ -789,15 +794,14 @@ static void server_mutate_op(grpc_call_element *elem, static void server_start_transport_stream_op_batch( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op_batch *op) { - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); server_mutate_op(elem, op); grpc_call_next_op(exec_ctx, elem, op); } static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, grpc_error *error) { - grpc_call_element *elem = ptr; - call_data *calld = elem->call_data; + grpc_call_element *elem = (grpc_call_element *)ptr; + call_data *calld = (call_data *)elem->call_data; if (error == GRPC_ERROR_NONE) { start_new_rpc(exec_ctx, elem); } else { @@ -823,7 +827,7 @@ static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd, grpc_transport *transport, const void *transport_server_data) { - channel_data *chand = cd; + channel_data *chand = (channel_data *)cd; /* create a call */ grpc_call_create_args args; memset(&args, 0, sizeof(args)); @@ -839,7 +843,7 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd, GRPC_ERROR_UNREF(error); return; } - call_data *calld = elem->call_data; + call_data *calld = (call_data *)elem->call_data; grpc_op op; memset(&op, 0, sizeof(op)); op.op = GRPC_OP_RECV_INITIAL_METADATA; @@ -853,7 +857,7 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd, static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd, grpc_error *error) { - channel_data *chand = cd; + channel_data *chand = (channel_data *)cd; grpc_server *server = chand->server; if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) { grpc_transport_op *op = grpc_make_transport_op(NULL); @@ -874,8 +878,8 @@ static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd, static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_element_args *args) { - call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; + call_data *calld = (call_data *)elem->call_data; + channel_data *chand = (channel_data *)elem->channel_data; memset(calld, 0, sizeof(call_data)); calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); calld->call = grpc_call_from_top_element(elem); @@ -892,8 +896,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, grpc_closure *ignored) { - channel_data *chand = elem->channel_data; - call_data *calld = elem->call_data; + channel_data *chand = (channel_data *)elem->channel_data; + call_data *calld = (call_data *)elem->call_data; GPR_ASSERT(calld->state != PENDING); @@ -914,7 +918,7 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_channel_element_args *args) { - channel_data *chand = elem->channel_data; + channel_data *chand = (channel_data *)elem->channel_data; GPR_ASSERT(args->is_first); GPR_ASSERT(!args->is_last); chand->server = NULL; @@ -931,7 +935,7 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) { size_t i; - channel_data *chand = elem->channel_data; + channel_data *chand = (channel_data *)elem->channel_data; if (chand->registered_methods) { for (i = 0; i < chand->registered_method_slots; i++) { grpc_slice_unref_internal(exec_ctx, chand->registered_methods[i].method); @@ -962,7 +966,6 @@ const grpc_channel_filter grpc_server_top_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, - grpc_call_next_get_peer, grpc_channel_next_get_info, "server", }; @@ -978,8 +981,8 @@ static void register_completion_queue(grpc_server *server, GRPC_CQ_INTERNAL_REF(cq, "server"); n = server->cq_count++; - server->cqs = gpr_realloc(server->cqs, - server->cq_count * sizeof(grpc_completion_queue *)); + server->cqs = (grpc_completion_queue **)gpr_realloc( + server->cqs, server->cq_count * sizeof(grpc_completion_queue *)); server->cqs[n] = cq; } @@ -1004,7 +1007,7 @@ void grpc_server_register_completion_queue(grpc_server *server, grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved)); - grpc_server *server = gpr_zalloc(sizeof(grpc_server)); + grpc_server *server = (grpc_server *)gpr_zalloc(sizeof(grpc_server)); gpr_mu_init(&server->mu_global); gpr_mu_init(&server->mu_call); @@ -1055,7 +1058,7 @@ void *grpc_server_register_method( flags); return NULL; } - m = gpr_zalloc(sizeof(registered_method)); + m = (registered_method *)gpr_zalloc(sizeof(registered_method)); m->method = gpr_strdup(method); m->host = gpr_strdup(host); m->next = server->registered_methods; @@ -1067,7 +1070,7 @@ void *grpc_server_register_method( static void start_listeners(grpc_exec_ctx *exec_ctx, void *s, grpc_error *error) { - grpc_server *server = s; + grpc_server *server = (grpc_server *)s; for (listener *l = server->listeners; l; l = l->next) { l->start(exec_ctx, server, l->arg, server->pollsets, server->pollset_count); } @@ -1088,11 +1091,12 @@ void grpc_server_start(grpc_server *server) { server->started = true; server->pollset_count = 0; - server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count); - server->request_freelist_per_cq = - gpr_malloc(sizeof(*server->request_freelist_per_cq) * server->cq_count); - server->requested_calls_per_cq = - gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count); + server->pollsets = + (grpc_pollset **)gpr_malloc(sizeof(grpc_pollset *) * server->cq_count); + server->request_freelist_per_cq = (gpr_stack_lockfree **)gpr_malloc( + sizeof(*server->request_freelist_per_cq) * server->cq_count); + server->requested_calls_per_cq = (requested_call **)gpr_malloc( + sizeof(*server->requested_calls_per_cq) * server->cq_count); for (i = 0; i < server->cq_count; i++) { if (grpc_cq_can_listen(server->cqs[i])) { server->pollsets[server->pollset_count++] = @@ -1103,22 +1107,24 @@ void grpc_server_start(grpc_server *server) { for (int j = 0; j < server->max_requested_calls_per_cq; j++) { gpr_stack_lockfree_push(server->request_freelist_per_cq[i], j); } - server->requested_calls_per_cq[i] = - gpr_malloc((size_t)server->max_requested_calls_per_cq * - sizeof(*server->requested_calls_per_cq[i])); + server->requested_calls_per_cq[i] = (requested_call *)gpr_malloc( + (size_t)server->max_requested_calls_per_cq * + sizeof(*server->requested_calls_per_cq[i])); } request_matcher_init(&server->unregistered_request_matcher, (size_t)server->max_requested_calls_per_cq, server); for (registered_method *rm = server->registered_methods; rm; rm = rm->next) { - request_matcher_init(&rm->request_matcher, + request_matcher_init(&rm->matcher, (size_t)server->max_requested_calls_per_cq, server); } server_ref(server); server->starting = true; - GRPC_CLOSURE_SCHED(&exec_ctx, GRPC_CLOSURE_CREATE(start_listeners, server, - grpc_executor_scheduler), - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED( + &exec_ctx, + GRPC_CLOSURE_CREATE(start_listeners, server, + grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)), + GRPC_ERROR_NONE); grpc_exec_ctx_finish(&exec_ctx); } @@ -1173,7 +1179,7 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, if (num_registered_methods > 0) { slots = 2 * num_registered_methods; alloc = sizeof(channel_registered_method) * slots; - chand->registered_methods = gpr_zalloc(alloc); + chand->registered_methods = (channel_registered_method *)gpr_zalloc(alloc); for (rm = s->registered_methods; rm; rm = rm->next) { grpc_slice host; bool has_host; @@ -1234,7 +1240,7 @@ void done_published_shutdown(grpc_exec_ctx *exec_ctx, void *done_arg, static void listener_destroy_done(grpc_exec_ctx *exec_ctx, void *s, grpc_error *error) { - grpc_server *server = s; + grpc_server *server = (grpc_server *)s; gpr_mu_lock(&server->mu_global); server->listeners_destroyed++; maybe_finish_shutdown(exec_ctx, server); @@ -1261,14 +1267,15 @@ void grpc_server_shutdown_and_notify(grpc_server *server, /* stay locked, and gather up some stuff to do */ GPR_ASSERT(grpc_cq_begin_op(cq, tag)); if (server->shutdown_published) { - grpc_cq_end_op(&exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown, - NULL, gpr_malloc(sizeof(grpc_cq_completion))); + grpc_cq_end_op( + &exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown, NULL, + (grpc_cq_completion *)gpr_malloc(sizeof(grpc_cq_completion))); gpr_mu_unlock(&server->mu_global); goto done; } - server->shutdown_tags = - gpr_realloc(server->shutdown_tags, - sizeof(shutdown_tag) * (server->num_shutdown_tags + 1)); + server->shutdown_tags = (shutdown_tag *)gpr_realloc( + server->shutdown_tags, + sizeof(shutdown_tag) * (server->num_shutdown_tags + 1)); sdt = &server->shutdown_tags[server->num_shutdown_tags++]; sdt->tag = tag; sdt->cq = cq; @@ -1351,7 +1358,7 @@ void grpc_server_add_listener( grpc_pollset **pollsets, size_t pollset_count), void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_server *server, void *arg, grpc_closure *on_done)) { - listener *l = gpr_malloc(sizeof(listener)); + listener *l = (listener *)gpr_malloc(sizeof(listener)); l->arg = arg; l->start = start; l->destroy = destroy; @@ -1384,7 +1391,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, rm = &server->unregistered_request_matcher; break; case REGISTERED_CALL: - rm = &rc->data.registered.registered_method->request_matcher; + rm = &rc->data.registered.method->matcher; break; } server->requested_calls_per_cq[cq_idx][request_id] = *rc; @@ -1428,7 +1435,8 @@ grpc_call_error grpc_server_request_call( grpc_completion_queue *cq_for_notification, void *tag) { grpc_call_error error; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - requested_call *rc = gpr_malloc(sizeof(*rc)); + requested_call *rc = (requested_call *)gpr_malloc(sizeof(*rc)); + GRPC_STATS_INC_SERVER_REQUESTED_CALLS(&exec_ctx); GRPC_API_TRACE( "grpc_server_request_call(" "server=%p, call=%p, details=%p, initial_metadata=%p, " @@ -1473,8 +1481,9 @@ grpc_call_error grpc_server_request_registered_call( grpc_completion_queue *cq_for_notification, void *tag) { grpc_call_error error; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - requested_call *rc = gpr_malloc(sizeof(*rc)); - registered_method *rm = rmp; + requested_call *rc = (requested_call *)gpr_malloc(sizeof(*rc)); + registered_method *rm = (registered_method *)rmp; + GRPC_STATS_INC_SERVER_REQUESTED_CALLS(&exec_ctx); GRPC_API_TRACE( "grpc_server_request_registered_call(" "server=%p, rmp=%p, call=%p, deadline=%p, initial_metadata=%p, " @@ -1511,7 +1520,7 @@ grpc_call_error grpc_server_request_registered_call( rc->tag = tag; rc->cq_bound_to_call = cq_bound_to_call; rc->call = call; - rc->data.registered.registered_method = rm; + rc->data.registered.method = rm; rc->data.registered.deadline = deadline; rc->initial_metadata = initial_metadata; rc->data.registered.optional_payload = optional_payload; diff --git a/src/core/lib/surface/version.c b/src/core/lib/surface/version.c index 96c16105e7..fd6ea4daa9 100644 --- a/src/core/lib/surface/version.c +++ b/src/core/lib/surface/version.c @@ -21,6 +21,6 @@ #include <grpc/grpc.h> -const char *grpc_version_string(void) { return "4.0.0-dev"; } +const char *grpc_version_string(void) { return "5.0.0-dev"; } const char *grpc_g_stands_for(void) { return "gambit"; } |