diff options
author | Craig Tiller <ctiller@google.com> | 2017-08-31 12:24:15 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-08-31 12:24:15 -0700 |
commit | 890f542498a8af4c05f22e42d818f3b0eeafaea8 (patch) | |
tree | 4ccf813375c1744629ed43c5c5eaa62fbdffe7ac /src/core | |
parent | 5489d41c15926abbf12a5b8d27b24d1d605d7f0f (diff) | |
parent | ccad38227f63797318d7cffcba8a2df783394ccd (diff) |
Merge branch 'stats' into stats_histo
Diffstat (limited to 'src/core')
38 files changed, 829 insertions, 1211 deletions
diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c index 3e10f61154..13fe2e6b1c 100644 --- a/src/core/ext/census/grpc_filter.c +++ b/src/core/ext/census/grpc_filter.c @@ -179,6 +179,7 @@ const grpc_channel_filter grpc_client_census_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, grpc_channel_next_get_info, "census-client"}; @@ -192,5 +193,6 @@ const grpc_channel_filter grpc_server_census_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, grpc_channel_next_get_info, "census-server"}; diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c index e6822ce801..58e31d7b45 100644 --- a/src/core/ext/filters/client_channel/client_channel.c +++ b/src/core/ext/filters/client_channel/client_channel.c @@ -796,8 +796,7 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, // send_message // recv_trailing_metadata // send_trailing_metadata -// We also add room for a single cancel_stream batch. -#define MAX_WAITING_BATCHES 7 +#define MAX_WAITING_BATCHES 6 /** Call data. Holds a pointer to grpc_subchannel_call and the associated machinery to create such a pointer. @@ -809,25 +808,23 @@ typedef struct client_channel_call_data { // The code in deadline_filter.c requires this to be the first field. // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state // and this struct both independently store a pointer to the call - // combiner. If/when we have time, find a way to avoid this without - // breaking the grpc_deadline_state abstraction. + // stack and each has its own mutex. If/when we have time, find a way + // to avoid this without breaking the grpc_deadline_state abstraction. grpc_deadline_state deadline_state; grpc_slice path; // Request path. gpr_timespec call_start_time; gpr_timespec deadline; - gpr_arena *arena; - grpc_call_combiner *call_combiner; - grpc_server_retry_throttle_data *retry_throttle_data; method_parameters *method_params; - grpc_subchannel_call *subchannel_call; - grpc_error *error; + /** either 0 for no call, a pointer to a grpc_subchannel_call (if the lowest + bit is 0), or a pointer to an error (if the lowest bit is 1) */ + gpr_atm subchannel_call_or_error; + gpr_arena *arena; grpc_lb_policy *lb_policy; // Holds ref while LB pick is pending. grpc_closure lb_pick_closure; - grpc_closure cancel_closure; grpc_connected_subchannel *connected_subchannel; grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT]; @@ -835,9 +832,10 @@ typedef struct client_channel_call_data { grpc_transport_stream_op_batch *waiting_for_pick_batches[MAX_WAITING_BATCHES]; size_t waiting_for_pick_batches_count; - grpc_closure handle_pending_batch_in_call_combiner[MAX_WAITING_BATCHES]; - grpc_transport_stream_op_batch *initial_metadata_batch; + grpc_transport_stream_op_batch_payload *initial_metadata_payload; + + grpc_call_stack *owning_call; grpc_linked_mdelem lb_token_mdelem; @@ -845,42 +843,55 @@ typedef struct client_channel_call_data { grpc_closure *original_on_complete; } call_data; -grpc_subchannel_call *grpc_client_channel_get_subchannel_call( - grpc_call_element *elem) { - call_data *calld = elem->call_data; - return calld->subchannel_call; +typedef struct { + grpc_subchannel_call *subchannel_call; + grpc_error *error; +} call_or_error; + +static call_or_error get_call_or_error(call_data *p) { + gpr_atm c = gpr_atm_acq_load(&p->subchannel_call_or_error); + if (c == 0) + return (call_or_error){NULL, NULL}; + else if (c & 1) + return (call_or_error){NULL, (grpc_error *)((c) & ~(gpr_atm)1)}; + else + return (call_or_error){(grpc_subchannel_call *)c, NULL}; } -// This is called via the call combiner, so access to calld is synchronized. -static void waiting_for_pick_batches_add( - call_data *calld, grpc_transport_stream_op_batch *batch) { - if (batch->send_initial_metadata) { - GPR_ASSERT(calld->initial_metadata_batch == NULL); - calld->initial_metadata_batch = batch; +static bool set_call_or_error(call_data *p, call_or_error coe) { + // this should always be under a lock + call_or_error existing = get_call_or_error(p); + if (existing.error != GRPC_ERROR_NONE) { + GRPC_ERROR_UNREF(coe.error); + return false; + } + GPR_ASSERT(existing.subchannel_call == NULL); + if (coe.error != GRPC_ERROR_NONE) { + GPR_ASSERT(coe.subchannel_call == NULL); + gpr_atm_rel_store(&p->subchannel_call_or_error, 1 | (gpr_atm)coe.error); } else { - GPR_ASSERT(calld->waiting_for_pick_batches_count < MAX_WAITING_BATCHES); - calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count++] = - batch; + GPR_ASSERT(coe.subchannel_call != NULL); + gpr_atm_rel_store(&p->subchannel_call_or_error, + (gpr_atm)coe.subchannel_call); } + return true; } -// This is called via the call combiner, so access to calld is synchronized. -static void fail_pending_batch_in_call_combiner(grpc_exec_ctx *exec_ctx, - void *arg, grpc_error *error) { - call_data *calld = arg; - if (calld->waiting_for_pick_batches_count > 0) { - --calld->waiting_for_pick_batches_count; - grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, - calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count], - GRPC_ERROR_REF(error), calld->call_combiner); - } +grpc_subchannel_call *grpc_client_channel_get_subchannel_call( + grpc_call_element *call_elem) { + return get_call_or_error(call_elem->call_data).subchannel_call; } -// This is called via the call combiner, so access to calld is synchronized. -static void waiting_for_pick_batches_fail(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_error *error) { +static void waiting_for_pick_batches_add_locked( + call_data *calld, grpc_transport_stream_op_batch *batch) { + GPR_ASSERT(calld->waiting_for_pick_batches_count < MAX_WAITING_BATCHES); + calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count++] = + batch; +} + +static void waiting_for_pick_batches_fail_locked(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_error *error) { call_data *calld = elem->call_data; if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, @@ -889,60 +900,34 @@ static void waiting_for_pick_batches_fail(grpc_exec_ctx *exec_ctx, grpc_error_string(error)); } for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) { - GRPC_CLOSURE_INIT(&calld->handle_pending_batch_in_call_combiner[i], - fail_pending_batch_in_call_combiner, calld, - grpc_schedule_on_exec_ctx); - GRPC_CALL_COMBINER_START(exec_ctx, calld->call_combiner, - &calld->handle_pending_batch_in_call_combiner[i], - GRPC_ERROR_REF(error), - "waiting_for_pick_batches_fail"); - } - if (calld->initial_metadata_batch != NULL) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, calld->initial_metadata_batch, GRPC_ERROR_REF(error), - calld->call_combiner); - } else { - GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner, - "waiting_for_pick_batches_fail"); + exec_ctx, calld->waiting_for_pick_batches[i], GRPC_ERROR_REF(error)); } + calld->waiting_for_pick_batches_count = 0; GRPC_ERROR_UNREF(error); } -// This is called via the call combiner, so access to calld is synchronized. -static void run_pending_batch_in_call_combiner(grpc_exec_ctx *exec_ctx, - void *arg, grpc_error *ignored) { - call_data *calld = arg; - if (calld->waiting_for_pick_batches_count > 0) { - --calld->waiting_for_pick_batches_count; - grpc_subchannel_call_process_op( - exec_ctx, calld->subchannel_call, - calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count]); - } -} - -// This is called via the call combiner, so access to calld is synchronized. -static void waiting_for_pick_batches_resume(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { - channel_data *chand = elem->channel_data; +static void waiting_for_pick_batches_resume_locked(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { call_data *calld = elem->call_data; + if (calld->waiting_for_pick_batches_count == 0) return; + call_or_error coe = get_call_or_error(calld); + if (coe.error != GRPC_ERROR_NONE) { + waiting_for_pick_batches_fail_locked(exec_ctx, elem, + GRPC_ERROR_REF(coe.error)); + return; + } if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: sending %" PRIdPTR " pending batches to subchannel_call=%p", - chand, calld, calld->waiting_for_pick_batches_count, - calld->subchannel_call); + elem->channel_data, calld, calld->waiting_for_pick_batches_count, + coe.subchannel_call); } for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) { - GRPC_CLOSURE_INIT(&calld->handle_pending_batch_in_call_combiner[i], - run_pending_batch_in_call_combiner, calld, - grpc_schedule_on_exec_ctx); - GRPC_CALL_COMBINER_START(exec_ctx, calld->call_combiner, - &calld->handle_pending_batch_in_call_combiner[i], - GRPC_ERROR_NONE, - "waiting_for_pick_batches_resume"); - } - GPR_ASSERT(calld->initial_metadata_batch != NULL); - grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, - calld->initial_metadata_batch); + grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, + calld->waiting_for_pick_batches[i]); + } + calld->waiting_for_pick_batches_count = 0; } // Applies service config to the call. Must be invoked once we know @@ -983,28 +968,29 @@ static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx, static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_error *error) { - channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; + grpc_subchannel_call *subchannel_call = NULL; const grpc_connected_subchannel_call_args call_args = { .pollent = calld->pollent, .path = calld->path, .start_time = calld->call_start_time, .deadline = calld->deadline, .arena = calld->arena, - .context = calld->subchannel_call_context, - .call_combiner = calld->call_combiner}; + .context = calld->subchannel_call_context}; grpc_error *new_error = grpc_connected_subchannel_create_call( - exec_ctx, calld->connected_subchannel, &call_args, - &calld->subchannel_call); + exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s", - chand, calld, calld->subchannel_call, grpc_error_string(new_error)); + elem->channel_data, calld, subchannel_call, + grpc_error_string(new_error)); } + GPR_ASSERT(set_call_or_error( + calld, (call_or_error){.subchannel_call = subchannel_call})); if (new_error != GRPC_ERROR_NONE) { new_error = grpc_error_add_child(new_error, error); - waiting_for_pick_batches_fail(exec_ctx, elem, new_error); + waiting_for_pick_batches_fail_locked(exec_ctx, elem, new_error); } else { - waiting_for_pick_batches_resume(exec_ctx, elem); + waiting_for_pick_batches_resume_locked(exec_ctx, elem); } GRPC_ERROR_UNREF(error); } @@ -1016,27 +1002,60 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, channel_data *chand = elem->channel_data; grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent, chand->interested_parties); + call_or_error coe = get_call_or_error(calld); if (calld->connected_subchannel == NULL) { // Failed to create subchannel. - GRPC_ERROR_UNREF(calld->error); - calld->error = error == GRPC_ERROR_NONE - ? GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Call dropped by load balancing policy") - : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Failed to create subchannel", &error, 1); + grpc_error *failure = + error == GRPC_ERROR_NONE + ? GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Call dropped by load balancing policy") + : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Failed to create subchannel", &error, 1); if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: failed to create subchannel: error=%s", chand, - calld, grpc_error_string(calld->error)); + calld, grpc_error_string(failure)); + } + set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(failure)}); + waiting_for_pick_batches_fail_locked(exec_ctx, elem, failure); + } else if (coe.error != GRPC_ERROR_NONE) { + /* already cancelled before subchannel became ready */ + grpc_error *child_errors[] = {error, coe.error}; + grpc_error *cancellation_error = + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Cancelled before creating subchannel", child_errors, + GPR_ARRAY_SIZE(child_errors)); + /* if due to deadline, attach the deadline exceeded status to the error */ + if (gpr_time_cmp(calld->deadline, gpr_now(GPR_CLOCK_MONOTONIC)) < 0) { + cancellation_error = + grpc_error_set_int(cancellation_error, GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_DEADLINE_EXCEEDED); } - waiting_for_pick_batches_fail(exec_ctx, elem, GRPC_ERROR_REF(calld->error)); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: cancelled before subchannel became ready: %s", + chand, calld, grpc_error_string(cancellation_error)); + } + waiting_for_pick_batches_fail_locked(exec_ctx, elem, cancellation_error); } else { /* Create call on subchannel. */ create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); } + GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); GRPC_ERROR_UNREF(error); } +static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { + call_data *calld = elem->call_data; + grpc_subchannel_call *subchannel_call = + get_call_or_error(calld).subchannel_call; + if (subchannel_call == NULL) { + return NULL; + } else { + return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call); + } +} + /** Return true if subchannel is available immediately (in which case subchannel_ready_locked() should not be called), or false otherwise (in which case subchannel_ready_locked() should be called when the subchannel @@ -1050,44 +1069,6 @@ typedef struct { grpc_closure closure; } pick_after_resolver_result_args; -// Note: This runs under the client_channel combiner, but will NOT be -// holding the call combiner. -static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx *exec_ctx, - void *arg, - grpc_error *error) { - grpc_call_element *elem = arg; - channel_data *chand = elem->channel_data; - call_data *calld = elem->call_data; - // If we don't yet have a resolver result, then a closure for - // pick_after_resolver_result_done_locked() will have been added to - // chand->waiting_for_resolver_result_closures, and it may not be invoked - // until after this call has been destroyed. We mark the operation as - // cancelled, so that when pick_after_resolver_result_done_locked() - // is called, it will be a no-op. We also immediately invoke - // subchannel_ready_locked() to propagate the error back to the caller. - for (grpc_closure *closure = chand->waiting_for_resolver_result_closures.head; - closure != NULL; closure = closure->next_data.next) { - pick_after_resolver_result_args *args = closure->cb_arg; - if (!args->cancelled && args->elem == elem) { - if (GRPC_TRACER_ON(grpc_client_channel_trace)) { - gpr_log(GPR_DEBUG, - "chand=%p calld=%p: " - "cancelling pick waiting for resolver result", - chand, calld); - } - args->cancelled = true; - // Note: Although we are not in the call combiner here, we are - // basically stealing the call combiner from the pending pick, so - // it's safe to call subchannel_ready_locked() here -- we are - // essentially calling it here instead of calling it in - // pick_after_resolver_result_done_locked(). - subchannel_ready_locked(exec_ctx, elem, - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Pick cancelled", &error, 1)); - } - } -} - static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { @@ -1098,24 +1079,21 @@ static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx, gpr_log(GPR_DEBUG, "call cancelled before resolver result"); } } else { - grpc_call_element *elem = args->elem; - channel_data *chand = elem->channel_data; - call_data *calld = elem->call_data; - grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner, - NULL); + channel_data *chand = args->elem->channel_data; + call_data *calld = args->elem->call_data; if (error != GRPC_ERROR_NONE) { if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver failed to return data", chand, calld); } - subchannel_ready_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); + subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error)); } else { if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver returned, doing pick", chand, calld); } - if (pick_subchannel_locked(exec_ctx, elem)) { - subchannel_ready_locked(exec_ctx, elem, GRPC_ERROR_NONE); + if (pick_subchannel_locked(exec_ctx, args->elem)) { + subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_NONE); } } } @@ -1138,33 +1116,41 @@ static void pick_after_resolver_result_start_locked(grpc_exec_ctx *exec_ctx, args, grpc_combiner_scheduler(chand->combiner)); grpc_closure_list_append(&chand->waiting_for_resolver_result_closures, &args->closure, GRPC_ERROR_NONE); - grpc_call_combiner_set_notify_on_cancel( - exec_ctx, calld->call_combiner, - GRPC_CLOSURE_INIT(&calld->cancel_closure, - pick_after_resolver_result_cancel_locked, elem, - grpc_combiner_scheduler(chand->combiner))); } -// Note: This runs under the client_channel combiner, but will NOT be -// holding the call combiner. -static void pick_callback_cancel_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_call_element *elem = arg; +static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_error *error) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - if (calld->lb_policy != NULL) { - if (GRPC_TRACER_ON(grpc_client_channel_trace)) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p", - chand, calld, calld->lb_policy); + // If we don't yet have a resolver result, then a closure for + // pick_after_resolver_result_done_locked() will have been added to + // chand->waiting_for_resolver_result_closures, and it may not be invoked + // until after this call has been destroyed. We mark the operation as + // cancelled, so that when pick_after_resolver_result_done_locked() + // is called, it will be a no-op. We also immediately invoke + // subchannel_ready_locked() to propagate the error back to the caller. + for (grpc_closure *closure = chand->waiting_for_resolver_result_closures.head; + closure != NULL; closure = closure->next_data.next) { + pick_after_resolver_result_args *args = closure->cb_arg; + if (!args->cancelled && args->elem == elem) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: " + "cancelling pick waiting for resolver result", + chand, calld); + } + args->cancelled = true; + subchannel_ready_locked(exec_ctx, elem, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Pick cancelled", &error, 1)); } - grpc_lb_policy_cancel_pick_locked(exec_ctx, calld->lb_policy, - &calld->connected_subchannel, - GRPC_ERROR_REF(error)); } + GRPC_ERROR_UNREF(error); } // Callback invoked by grpc_lb_policy_pick_locked() for async picks. -// Unrefs the LB policy and invokes subchannel_ready_locked(). +// Unrefs the LB policy after invoking subchannel_ready_locked(). static void pick_callback_done_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_call_element *elem = arg; @@ -1174,7 +1160,6 @@ static void pick_callback_done_locked(grpc_exec_ctx *exec_ctx, void *arg, gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously", chand, calld); } - grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner, NULL); GPR_ASSERT(calld->lb_policy != NULL); GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel"); calld->lb_policy = NULL; @@ -1209,15 +1194,24 @@ static bool pick_callback_start_locked(grpc_exec_ctx *exec_ctx, } GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel"); calld->lb_policy = NULL; - } else { - grpc_call_combiner_set_notify_on_cancel( - exec_ctx, calld->call_combiner, - GRPC_CLOSURE_INIT(&calld->cancel_closure, pick_callback_cancel_locked, - elem, grpc_combiner_scheduler(chand->combiner))); } return pick_done; } +static void pick_callback_cancel_locked(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_error *error) { + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + GPR_ASSERT(calld->lb_policy != NULL); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p", + chand, calld, calld->lb_policy); + } + grpc_lb_policy_cancel_pick_locked(exec_ctx, calld->lb_policy, + &calld->connected_subchannel, error); +} + static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { GPR_TIMER_BEGIN("pick_subchannel", 0); @@ -1230,7 +1224,7 @@ static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx, // Otherwise, if the service config specified a value for this // method, use that. uint32_t initial_metadata_flags = - calld->initial_metadata_batch->payload->send_initial_metadata + calld->initial_metadata_payload->send_initial_metadata .send_initial_metadata_flags; const bool wait_for_ready_set_from_api = initial_metadata_flags & @@ -1247,7 +1241,7 @@ static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx, } } const grpc_lb_policy_pick_args inputs = { - calld->initial_metadata_batch->payload->send_initial_metadata + calld->initial_metadata_payload->send_initial_metadata .send_initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem}; pick_done = pick_callback_start_locked(exec_ctx, elem, &inputs); @@ -1264,33 +1258,91 @@ static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx, return pick_done; } -static void start_pick_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error_ignored) { - GPR_TIMER_BEGIN("start_pick_locked", 0); - grpc_call_element *elem = (grpc_call_element *)arg; - call_data *calld = (call_data *)elem->call_data; - channel_data *chand = (channel_data *)elem->channel_data; - GPR_ASSERT(calld->connected_subchannel == NULL); - if (pick_subchannel_locked(exec_ctx, elem)) { - // Pick was returned synchronously. - if (calld->connected_subchannel == NULL) { - GRPC_ERROR_UNREF(calld->error); - calld->error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Call dropped by load balancing policy"); - waiting_for_pick_batches_fail(exec_ctx, elem, - GRPC_ERROR_REF(calld->error)); +static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx, + void *arg, + grpc_error *error_ignored) { + GPR_TIMER_BEGIN("start_transport_stream_op_batch_locked", 0); + grpc_transport_stream_op_batch *batch = arg; + grpc_call_element *elem = batch->handler_private.extra_arg; + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + /* need to recheck that another thread hasn't set the call */ + call_or_error coe = get_call_or_error(calld); + if (coe.error != GRPC_ERROR_NONE) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s", + chand, calld, grpc_error_string(coe.error)); + } + grpc_transport_stream_op_batch_finish_with_failure( + exec_ctx, batch, GRPC_ERROR_REF(coe.error)); + goto done; + } + if (coe.subchannel_call != NULL) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: sending batch to subchannel_call=%p", chand, + calld, coe.subchannel_call); + } + grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, batch); + goto done; + } + // Add to waiting-for-pick list. If we succeed in getting a + // subchannel call below, we'll handle this batch (along with any + // other waiting batches) in waiting_for_pick_batches_resume_locked(). + waiting_for_pick_batches_add_locked(calld, batch); + // If this is a cancellation, cancel the pending pick (if any) and + // fail any pending batches. + if (batch->cancel_stream) { + grpc_error *error = batch->payload->cancel_stream.cancel_error; + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: recording cancel_error=%s", chand, + calld, grpc_error_string(error)); + } + /* Stash a copy of cancel_error in our call data, so that we can use + it for subsequent operations. This ensures that if the call is + cancelled before any batches are passed down (e.g., if the deadline + is in the past when the call starts), we can return the right + error to the caller when the first batch does get passed down. */ + set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(error)}); + if (calld->lb_policy != NULL) { + pick_callback_cancel_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); } else { - // Create subchannel call. - create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_NONE); + pick_after_resolver_result_cancel_locked(exec_ctx, elem, + GRPC_ERROR_REF(error)); } - } else { - // Pick will be done asynchronously. Add the call's polling entity to - // the channel's interested_parties, so that I/O for the resolver - // and LB policy can be done under it. - grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent, - chand->interested_parties); + waiting_for_pick_batches_fail_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); + goto done; } - GPR_TIMER_END("start_pick_locked", 0); + /* if we don't have a subchannel, try to get one */ + if (batch->send_initial_metadata) { + GPR_ASSERT(calld->connected_subchannel == NULL); + calld->initial_metadata_payload = batch->payload; + GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); + /* If a subchannel is not available immediately, the polling entity from + call_data should be provided to channel_data's interested_parties, so + that IO of the lb_policy and resolver could be done under it. */ + if (pick_subchannel_locked(exec_ctx, elem)) { + // Pick was returned synchronously. + GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); + if (calld->connected_subchannel == NULL) { + grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Call dropped by load balancing policy"); + set_call_or_error(calld, + (call_or_error){.error = GRPC_ERROR_REF(error)}); + waiting_for_pick_batches_fail_locked(exec_ctx, elem, error); + } else { + // Create subchannel call. + create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_NONE); + } + } else { + grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent, + chand->interested_parties); + } + } +done: + GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, + "start_transport_stream_op_batch"); + GPR_TIMER_END("start_transport_stream_op_batch_locked", 0); } static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { @@ -1313,49 +1365,27 @@ static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { GRPC_ERROR_REF(error)); } +/* The logic here is fairly complicated, due to (a) the fact that we + need to handle the case where we receive the send op before the + initial metadata op, and (b) the need for efficiency, especially in + the streaming case. + + We use double-checked locking to initially see if initialization has been + performed. If it has not, we acquire the combiner and perform initialization. + If it has, we proceed on the fast path. */ static void cc_start_transport_stream_op_batch( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op_batch *batch) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; + if (GRPC_TRACER_ON(grpc_client_channel_trace) || + GRPC_TRACER_ON(grpc_trace_channel)) { + grpc_call_log_op(GPR_INFO, elem, batch); + } if (chand->deadline_checking_enabled) { grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem, batch); } - GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0); - // If we've previously been cancelled, immediately fail any new batches. - if (calld->error != GRPC_ERROR_NONE) { - if (GRPC_TRACER_ON(grpc_client_channel_trace)) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s", - chand, calld, grpc_error_string(calld->error)); - } - grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, batch, GRPC_ERROR_REF(calld->error), calld->call_combiner); - goto done; - } - if (batch->cancel_stream) { - // Stash a copy of cancel_error in our call data, so that we can use - // it for subsequent operations. This ensures that if the call is - // cancelled before any batches are passed down (e.g., if the deadline - // is in the past when the call starts), we can return the right - // error to the caller when the first batch does get passed down. - GRPC_ERROR_UNREF(calld->error); - calld->error = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error); - if (GRPC_TRACER_ON(grpc_client_channel_trace)) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: recording cancel_error=%s", chand, - calld, grpc_error_string(calld->error)); - } - // If we have a subchannel call, send the cancellation batch down. - // Otherwise, fail all pending batches. - if (calld->subchannel_call != NULL) { - grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, batch); - } else { - waiting_for_pick_batches_add(calld, batch); - waiting_for_pick_batches_fail(exec_ctx, elem, - GRPC_ERROR_REF(calld->error)); - } - goto done; - } // Intercept on_complete for recv_trailing_metadata so that we can // check retry throttle status. if (batch->recv_trailing_metadata) { @@ -1365,43 +1395,38 @@ static void cc_start_transport_stream_op_batch( grpc_schedule_on_exec_ctx); batch->on_complete = &calld->on_complete; } - // Check if we've already gotten a subchannel call. - // Note that once we have completed the pick, we do not need to enter - // the channel combiner, which is more efficient (especially for - // streaming calls). - if (calld->subchannel_call != NULL) { + /* try to (atomically) get the call */ + call_or_error coe = get_call_or_error(calld); + GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0); + if (coe.error != GRPC_ERROR_NONE) { if (GRPC_TRACER_ON(grpc_client_channel_trace)) { - gpr_log(GPR_DEBUG, - "chand=%p calld=%p: sending batch to subchannel_call=%p", chand, - calld, calld->subchannel_call); + gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s", + chand, calld, grpc_error_string(coe.error)); } - grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, batch); + grpc_transport_stream_op_batch_finish_with_failure( + exec_ctx, batch, GRPC_ERROR_REF(coe.error)); goto done; } - // We do not yet have a subchannel call. - // Add the batch to the waiting-for-pick list. - waiting_for_pick_batches_add(calld, batch); - // For batches containing a send_initial_metadata op, enter the channel - // combiner to start a pick. - if (batch->send_initial_metadata) { - if (GRPC_TRACER_ON(grpc_client_channel_trace)) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: entering combiner", chand, calld); - } - GRPC_CLOSURE_SCHED( - exec_ctx, - GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked, - elem, grpc_combiner_scheduler(chand->combiner)), - GRPC_ERROR_NONE); - } else { - // For all other batches, release the call combiner. + if (coe.subchannel_call != NULL) { if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, - "chand=%p calld=%p: saved batch, yeilding call combiner", chand, - calld); + "chand=%p calld=%p: sending batch to subchannel_call=%p", chand, + calld, coe.subchannel_call); } - GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner, - "batch does not include send_initial_metadata"); + grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, batch); + goto done; + } + /* we failed; lock and figure out what to do */ + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: entering combiner", chand, calld); } + GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op_batch"); + batch->handler_private.extra_arg = elem; + GRPC_CLOSURE_SCHED( + exec_ctx, GRPC_CLOSURE_INIT(&batch->handler_private.closure, + start_transport_stream_op_batch_locked, batch, + grpc_combiner_scheduler(chand->combiner)), + GRPC_ERROR_NONE); done: GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); } @@ -1416,11 +1441,10 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, calld->path = grpc_slice_ref_internal(args->path); calld->call_start_time = args->start_time; calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC); + calld->owning_call = args->call_stack; calld->arena = args->arena; - calld->call_combiner = args->call_combiner; if (chand->deadline_checking_enabled) { - grpc_deadline_state_init(exec_ctx, elem, args->call_stack, - args->call_combiner, calld->deadline); + grpc_deadline_state_init(exec_ctx, elem, args->call_stack, calld->deadline); } return GRPC_ERROR_NONE; } @@ -1439,12 +1463,13 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, if (calld->method_params != NULL) { method_parameters_unref(calld->method_params); } - GRPC_ERROR_UNREF(calld->error); - if (calld->subchannel_call != NULL) { - grpc_subchannel_call_set_cleanup_closure(calld->subchannel_call, + call_or_error coe = get_call_or_error(calld); + GRPC_ERROR_UNREF(coe.error); + if (coe.subchannel_call != NULL) { + grpc_subchannel_call_set_cleanup_closure(coe.subchannel_call, then_schedule_closure); then_schedule_closure = NULL; - GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, calld->subchannel_call, + GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, coe.subchannel_call, "client_channel_destroy_call"); } GPR_ASSERT(calld->lb_policy == NULL); @@ -1483,6 +1508,7 @@ const grpc_channel_filter grpc_client_channel_filter = { sizeof(channel_data), cc_init_channel_elem, cc_destroy_channel_elem, + cc_get_peer, cc_get_channel_info, "client-channel", }; diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c index 299f26b4de..568bb2ba8d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c @@ -132,5 +132,6 @@ const grpc_channel_filter grpc_client_load_reporting_filter = { 0, // sizeof(channel_data) init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, grpc_channel_next_get_info, "client_load_reporting"}; diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c index fd0fb41fb9..a50ba09bf5 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c @@ -296,8 +296,6 @@ static void stop_connectivity_watchers(grpc_exec_ctx *exec_ctx, static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const grpc_lb_policy_args *args) { pick_first_lb_policy *p = (pick_first_lb_policy *)policy; - /* Find the number of backend addresses. We ignore balancer - * addresses, since we don't know how to handle them. */ const grpc_arg *arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); if (arg == NULL || arg->type != GRPC_ARG_POINTER) { @@ -317,11 +315,7 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, return; } const grpc_lb_addresses *addresses = arg->value.pointer.p; - size_t num_addrs = 0; - for (size_t i = 0; i < addresses->num_addresses; i++) { - if (!addresses->addresses[i].is_balancer) ++num_addrs; - } - if (num_addrs == 0) { + if (addresses->num_addresses == 0) { // Empty update. Unsubscribe from all current subchannels and put the // channel in TRANSIENT_FAILURE. grpc_connectivity_state_set( @@ -333,9 +327,10 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, } if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses", - (void *)p, (unsigned long)num_addrs); + (void *)p, (unsigned long)addresses->num_addresses); } - grpc_subchannel_args *sc_args = gpr_zalloc(sizeof(*sc_args) * num_addrs); + grpc_subchannel_args *sc_args = + gpr_zalloc(sizeof(*sc_args) * addresses->num_addresses); /* We remove the following keys in order for subchannel keys belonging to * subchannels point to the same address to match. */ static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, @@ -344,7 +339,8 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, /* Create list of subchannel args for new addresses in \a args. */ for (size_t i = 0; i < addresses->num_addresses; i++) { - if (addresses->addresses[i].is_balancer) continue; + // If there were any balancer, we would have chosen grpclb policy instead. + GPR_ASSERT(!addresses->addresses[i].is_balancer); if (addresses->addresses[i].user_data != NULL) { gpr_log(GPR_ERROR, "This LB policy doesn't support user data. It will be ignored"); diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index 110a9c8047..866fb9a1eb 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -737,8 +737,6 @@ static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const grpc_lb_policy_args *args) { round_robin_lb_policy *p = (round_robin_lb_policy *)policy; - /* Find the number of backend addresses. We ignore balancer addresses, since - * we don't know how to handle them. */ const grpc_arg *arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); if (arg == NULL || arg->type != GRPC_ARG_POINTER) { @@ -757,12 +755,9 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, return; } grpc_lb_addresses *addresses = arg->value.pointer.p; - size_t num_addrs = 0; - for (size_t i = 0; i < addresses->num_addresses; i++) { - if (!addresses->addresses[i].is_balancer) ++num_addrs; - } - rr_subchannel_list *subchannel_list = rr_subchannel_list_create(p, num_addrs); - if (num_addrs == 0) { + rr_subchannel_list *subchannel_list = + rr_subchannel_list_create(p, addresses->num_addresses); + if (addresses->num_addresses == 0) { grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), @@ -794,9 +789,8 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, GRPC_ARG_LB_ADDRESSES}; /* Create subchannels for addresses in the update. */ for (size_t i = 0; i < addresses->num_addresses; i++) { - /* Skip balancer addresses, since we only know how to handle backends. */ - if (addresses->addresses[i].is_balancer) continue; - GPR_ASSERT(i < num_addrs); + // If there were any balancer, we would have chosen grpclb policy instead. + GPR_ASSERT(!addresses->addresses[i].is_balancer); memset(&sc_args, 0, sizeof(grpc_subchannel_args)); grpc_arg addr_arg = grpc_create_subchannel_address_arg(&addresses->addresses[i].address); diff --git a/src/core/ext/filters/client_channel/subchannel.c b/src/core/ext/filters/client_channel/subchannel.c index 5cc8be7628..5788819331 100644 --- a/src/core/ext/filters/client_channel/subchannel.c +++ b/src/core/ext/filters/client_channel/subchannel.c @@ -724,6 +724,13 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, GRPC_CALL_STACK_UNREF(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); } +char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call *call) { + grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call); + grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0); + return top_elem->filter->get_peer(exec_ctx, top_elem); +} + void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *call, grpc_transport_stream_op_batch *op) { @@ -753,15 +760,13 @@ grpc_error *grpc_connected_subchannel_create_call( args->arena, sizeof(grpc_subchannel_call) + chanstk->call_stack_size); grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); (*call)->connection = GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call"); - const grpc_call_element_args call_args = { - .call_stack = callstk, - .server_transport_data = NULL, - .context = args->context, - .path = args->path, - .start_time = args->start_time, - .deadline = args->deadline, - .arena = args->arena, - .call_combiner = args->call_combiner}; + const grpc_call_element_args call_args = {.call_stack = callstk, + .server_transport_data = NULL, + .context = args->context, + .path = args->path, + .start_time = args->start_time, + .deadline = args->deadline, + .arena = args->arena}; grpc_error *error = grpc_call_stack_init( exec_ctx, chanstk, 1, subchannel_call_destroy, *call, &call_args); if (error != GRPC_ERROR_NONE) { diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index 51d712f6a7..6d2abb04df 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -106,7 +106,6 @@ typedef struct { gpr_timespec deadline; gpr_arena *arena; grpc_call_context_element *context; - grpc_call_combiner *call_combiner; } grpc_connected_subchannel_call_args; grpc_error *grpc_connected_subchannel_create_call( @@ -151,6 +150,10 @@ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *subchannel_call, grpc_transport_stream_op_batch *op); +/** continue querying for peer */ +char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call *subchannel_call); + /** Must be called once per call. Sets the 'then_schedule_closure' argument for call stack destruction. */ void grpc_subchannel_call_set_cleanup_closure( diff --git a/src/core/ext/filters/deadline/deadline_filter.c b/src/core/ext/filters/deadline/deadline_filter.c index 565b0679dc..6789903c95 100644 --- a/src/core/ext/filters/deadline/deadline_filter.c +++ b/src/core/ext/filters/deadline/deadline_filter.c @@ -34,56 +34,22 @@ // grpc_deadline_state // -// The on_complete callback used when sending a cancel_error batch down the -// filter stack. Yields the call combiner when the batch returns. -static void yield_call_combiner(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* ignored) { - grpc_deadline_state* deadline_state = arg; - GRPC_CALL_COMBINER_STOP(exec_ctx, deadline_state->call_combiner, - "got on_complete from cancel_stream batch"); - GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, "deadline_timer"); -} - -// This is called via the call combiner, so access to deadline_state is -// synchronized. -static void send_cancel_op_in_call_combiner(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { - grpc_call_element* elem = arg; - grpc_deadline_state* deadline_state = elem->call_data; - grpc_transport_stream_op_batch* batch = grpc_make_transport_stream_op( - GRPC_CLOSURE_INIT(&deadline_state->timer_callback, yield_call_combiner, - deadline_state, grpc_schedule_on_exec_ctx)); - batch->cancel_stream = true; - batch->payload->cancel_stream.cancel_error = GRPC_ERROR_REF(error); - elem->filter->start_transport_stream_op_batch(exec_ctx, elem, batch); -} - // Timer callback. static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { grpc_call_element* elem = (grpc_call_element*)arg; grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data; if (error != GRPC_ERROR_CANCELLED) { - error = grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Deadline Exceeded"), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED); - grpc_call_combiner_cancel(exec_ctx, deadline_state->call_combiner, - GRPC_ERROR_REF(error)); - GRPC_CLOSURE_INIT(&deadline_state->timer_callback, - send_cancel_op_in_call_combiner, elem, - grpc_schedule_on_exec_ctx); - GRPC_CALL_COMBINER_START(exec_ctx, deadline_state->call_combiner, - &deadline_state->timer_callback, error, - "deadline exceeded -- sending cancel_stream op"); - } else { - GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, - "deadline_timer"); + grpc_call_element_signal_error( + exec_ctx, elem, + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Deadline Exceeded"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED)); } + GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, "deadline_timer"); } // Starts the deadline timer. -// This is called via the call combiner, so access to deadline_state is -// synchronized. static void start_timer_if_needed(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, gpr_timespec deadline) { @@ -92,39 +58,51 @@ static void start_timer_if_needed(grpc_exec_ctx* exec_ctx, return; } grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data; + grpc_deadline_timer_state cur_state; grpc_closure* closure = NULL; - switch (deadline_state->timer_state) { +retry: + cur_state = + (grpc_deadline_timer_state)gpr_atm_acq_load(&deadline_state->timer_state); + switch (cur_state) { case GRPC_DEADLINE_STATE_PENDING: // Note: We do not start the timer if there is already a timer return; case GRPC_DEADLINE_STATE_FINISHED: - deadline_state->timer_state = GRPC_DEADLINE_STATE_PENDING; - // If we've already created and destroyed a timer, we always create a - // new closure: we have no other guarantee that the inlined closure is - // not in use (it may hold a pending call to timer_callback) - closure = - GRPC_CLOSURE_CREATE(timer_callback, elem, grpc_schedule_on_exec_ctx); + if (gpr_atm_rel_cas(&deadline_state->timer_state, + GRPC_DEADLINE_STATE_FINISHED, + GRPC_DEADLINE_STATE_PENDING)) { + // If we've already created and destroyed a timer, we always create a + // new closure: we have no other guarantee that the inlined closure is + // not in use (it may hold a pending call to timer_callback) + closure = GRPC_CLOSURE_CREATE(timer_callback, elem, + grpc_schedule_on_exec_ctx); + } else { + goto retry; + } break; case GRPC_DEADLINE_STATE_INITIAL: - deadline_state->timer_state = GRPC_DEADLINE_STATE_PENDING; - closure = - GRPC_CLOSURE_INIT(&deadline_state->timer_callback, timer_callback, - elem, grpc_schedule_on_exec_ctx); + if (gpr_atm_rel_cas(&deadline_state->timer_state, + GRPC_DEADLINE_STATE_INITIAL, + GRPC_DEADLINE_STATE_PENDING)) { + closure = + GRPC_CLOSURE_INIT(&deadline_state->timer_callback, timer_callback, + elem, grpc_schedule_on_exec_ctx); + } else { + goto retry; + } break; } - GPR_ASSERT(closure != NULL); + GPR_ASSERT(closure); GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer"); grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, closure, gpr_now(GPR_CLOCK_MONOTONIC)); } // Cancels the deadline timer. -// This is called via the call combiner, so access to deadline_state is -// synchronized. static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx, grpc_deadline_state* deadline_state) { - if (deadline_state->timer_state == GRPC_DEADLINE_STATE_PENDING) { - deadline_state->timer_state = GRPC_DEADLINE_STATE_FINISHED; + if (gpr_atm_rel_cas(&deadline_state->timer_state, GRPC_DEADLINE_STATE_PENDING, + GRPC_DEADLINE_STATE_FINISHED)) { grpc_timer_cancel(exec_ctx, &deadline_state->timer); } else { // timer was either in STATE_INITAL (nothing to cancel) @@ -153,7 +131,6 @@ static void inject_on_complete_cb(grpc_deadline_state* deadline_state, // Callback and associated state for starting the timer after call stack // initialization has been completed. struct start_timer_after_init_state { - bool in_call_combiner; grpc_call_element* elem; gpr_timespec deadline; grpc_closure closure; @@ -161,29 +138,15 @@ struct start_timer_after_init_state { static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { struct start_timer_after_init_state* state = arg; - grpc_deadline_state* deadline_state = state->elem->call_data; - if (!state->in_call_combiner) { - // We are initially called without holding the call combiner, so we - // need to bounce ourselves into it. - state->in_call_combiner = true; - GRPC_CALL_COMBINER_START(exec_ctx, deadline_state->call_combiner, - &state->closure, GRPC_ERROR_REF(error), - "scheduling deadline timer"); - return; - } start_timer_if_needed(exec_ctx, state->elem, state->deadline); gpr_free(state); - GRPC_CALL_COMBINER_STOP(exec_ctx, deadline_state->call_combiner, - "done scheduling deadline timer"); } void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_call_stack* call_stack, - grpc_call_combiner* call_combiner, gpr_timespec deadline) { grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data; deadline_state->call_stack = call_stack; - deadline_state->call_combiner = call_combiner; // Deadline will always be infinite on servers, so the timer will only be // set on clients with a finite deadline. deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); @@ -195,7 +158,7 @@ void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, // call stack initialization is finished. To avoid that problem, we // create a closure to start the timer, and we schedule that closure // to be run after call stack initialization is done. - struct start_timer_after_init_state* state = gpr_zalloc(sizeof(*state)); + struct start_timer_after_init_state* state = gpr_malloc(sizeof(*state)); state->elem = elem; state->deadline = deadline; GRPC_CLOSURE_INIT(&state->closure, start_timer_after_init, state, @@ -269,8 +232,7 @@ typedef struct server_call_data { static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, const grpc_call_element_args* args) { - grpc_deadline_state_init(exec_ctx, elem, args->call_stack, - args->call_combiner, args->deadline); + grpc_deadline_state_init(exec_ctx, elem, args->call_stack, args->deadline); return GRPC_ERROR_NONE; } @@ -348,6 +310,7 @@ const grpc_channel_filter grpc_client_deadline_filter = { 0, // sizeof(channel_data) init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, grpc_channel_next_get_info, "deadline", }; @@ -362,6 +325,7 @@ const grpc_channel_filter grpc_server_deadline_filter = { 0, // sizeof(channel_data) init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, grpc_channel_next_get_info, "deadline", }; diff --git a/src/core/ext/filters/deadline/deadline_filter.h b/src/core/ext/filters/deadline/deadline_filter.h index 3eb102ad28..420bf7065a 100644 --- a/src/core/ext/filters/deadline/deadline_filter.h +++ b/src/core/ext/filters/deadline/deadline_filter.h @@ -31,8 +31,7 @@ typedef enum grpc_deadline_timer_state { typedef struct grpc_deadline_state { // We take a reference to the call stack for the timer callback. grpc_call_stack* call_stack; - grpc_call_combiner* call_combiner; - grpc_deadline_timer_state timer_state; + gpr_atm timer_state; grpc_timer timer; grpc_closure timer_callback; // Closure to invoke when the call is complete. @@ -51,7 +50,6 @@ typedef struct grpc_deadline_state { // assumes elem->call_data is zero'd void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_call_stack* call_stack, - grpc_call_combiner* call_combiner, gpr_timespec deadline); void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx, grpc_call_element* elem); @@ -63,8 +61,6 @@ void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx, // to ensure that the timer callback is not invoked while it is in the // process of being reset, which means that attempting to increase the // deadline may result in the timer being called twice. -// -// Note: Must be called while holding the call combiner. void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, gpr_timespec new_deadline); @@ -74,8 +70,6 @@ void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, // // Note: It is the caller's responsibility to chain to the next filter if // necessary after this function returns. -// -// Note: Must be called while holding the call combiner. void grpc_deadline_state_client_start_transport_stream_op_batch( grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_transport_stream_op_batch* op); diff --git a/src/core/ext/filters/http/client/http_client_filter.c b/src/core/ext/filters/http/client/http_client_filter.c index 99ddd08e6a..3ca01a41b5 100644 --- a/src/core/ext/filters/http/client/http_client_filter.c +++ b/src/core/ext/filters/http/client/http_client_filter.c @@ -36,7 +36,6 @@ static const size_t kMaxPayloadSizeForGet = 2048; typedef struct call_data { - grpc_call_combiner *call_combiner; // State for handling send_initial_metadata ops. grpc_linked_mdelem method; grpc_linked_mdelem scheme; @@ -216,13 +215,13 @@ static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg, call_data *calld = (call_data *)elem->call_data; if (error != GRPC_ERROR_NONE) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, calld->send_message_batch, error, calld->call_combiner); + exec_ctx, calld->send_message_batch, error); return; } error = pull_slice_from_send_message(exec_ctx, calld); if (error != GRPC_ERROR_NONE) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, calld->send_message_batch, error, calld->call_combiner); + exec_ctx, calld->send_message_batch, error); return; } // There may or may not be more to read, but we don't care. If we got @@ -415,7 +414,7 @@ static void hc_start_transport_stream_op_batch( done: if (error != GRPC_ERROR_NONE) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, calld->send_message_batch, error, calld->call_combiner); + exec_ctx, calld->send_message_batch, error); } else if (!batch_will_be_handled_asynchronously) { grpc_call_next_op(exec_ctx, elem, batch); } @@ -427,7 +426,6 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_element_args *args) { call_data *calld = (call_data *)elem->call_data; - calld->call_combiner = args->call_combiner; GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, recv_initial_metadata_ready, elem, grpc_schedule_on_exec_ctx); @@ -567,5 +565,6 @@ const grpc_channel_filter grpc_http_client_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, grpc_channel_next_get_info, "http-client"}; diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.c b/src/core/ext/filters/http/message_compress/message_compress_filter.c index 98a503cafc..a32819bfe4 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.c +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.c @@ -35,29 +35,35 @@ #include "src/core/lib/surface/call.h" #include "src/core/lib/transport/static_metadata.h" -typedef enum { - // Initial metadata not yet seen. - INITIAL_METADATA_UNSEEN = 0, - // Initial metadata seen; compression algorithm set. - HAS_COMPRESSION_ALGORITHM, - // Initial metadata seen; no compression algorithm set. - NO_COMPRESSION_ALGORITHM, -} initial_metadata_state; +#define INITIAL_METADATA_UNSEEN 0 +#define HAS_COMPRESSION_ALGORITHM 2 +#define NO_COMPRESSION_ALGORITHM 4 + +#define CANCELLED_BIT ((gpr_atm)1) typedef struct call_data { - grpc_call_combiner *call_combiner; + grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */ grpc_linked_mdelem compression_algorithm_storage; grpc_linked_mdelem stream_compression_algorithm_storage; grpc_linked_mdelem accept_encoding_storage; grpc_linked_mdelem accept_stream_encoding_storage; + uint32_t remaining_slice_bytes; /** Compression algorithm we'll try to use. It may be given by incoming * metadata, or by the channel's default compression settings. */ grpc_compression_algorithm compression_algorithm; - initial_metadata_state send_initial_metadata_state; - grpc_error *cancel_error; - grpc_closure start_send_message_batch_in_call_combiner; + + /* Atomic recording the state of initial metadata; allowed values: + INITIAL_METADATA_UNSEEN - initial metadata op not seen + HAS_COMPRESSION_ALGORITHM - initial metadata seen; compression algorithm + set + NO_COMPRESSION_ALGORITHM - initial metadata seen; no compression algorithm + set + pointer - a stalled op containing a send_message that's waiting on initial + metadata + pointer | CANCELLED_BIT - request was cancelled with error pointed to */ + gpr_atm send_initial_metadata_state; + grpc_transport_stream_op_batch *send_message_batch; - grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */ grpc_slice_buffer_stream replacement_stream; grpc_closure *original_send_message_on_complete; grpc_closure send_message_on_complete; @@ -86,13 +92,13 @@ static bool skip_compression(grpc_call_element *elem, uint32_t flags, channel_data *channeld = elem->channel_data; if (flags & (GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_INTERNAL_COMPRESS)) { - return true; + return 1; } if (has_compression_algorithm) { if (calld->compression_algorithm == GRPC_COMPRESS_NONE) { - return true; + return 1; } - return false; /* we have an actual call-specific algorithm */ + return 0; /* we have an actual call-specific algorithm */ } /* no per-call compression override */ return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE; @@ -220,18 +226,6 @@ static void send_message_on_complete(grpc_exec_ctx *exec_ctx, void *arg, GRPC_ERROR_REF(error)); } -static void send_message_batch_continue(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { - call_data *calld = (call_data *)elem->call_data; - // Note: The call to grpc_call_next_op() results in yielding the - // call combiner, so we need to clear calld->send_message_batch - // before we do that. - grpc_transport_stream_op_batch *send_message_batch = - calld->send_message_batch; - calld->send_message_batch = NULL; - grpc_call_next_op(exec_ctx, elem, send_message_batch); -} - static void finish_send_message(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { call_data *calld = (call_data *)elem->call_data; @@ -240,8 +234,8 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, grpc_slice_buffer_init(&tmp); uint32_t send_flags = calld->send_message_batch->payload->send_message.send_message->flags; - bool did_compress = grpc_msg_compress(exec_ctx, calld->compression_algorithm, - &calld->slices, &tmp); + const bool did_compress = grpc_msg_compress( + exec_ctx, calld->compression_algorithm, &calld->slices, &tmp); if (did_compress) { if (GRPC_TRACER_ON(grpc_compression_trace)) { char *algo_name; @@ -279,19 +273,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, calld->original_send_message_on_complete = calld->send_message_batch->on_complete; calld->send_message_batch->on_complete = &calld->send_message_on_complete; - send_message_batch_continue(exec_ctx, elem); -} - -static void fail_send_message_batch_in_call_combiner(grpc_exec_ctx *exec_ctx, - void *arg, - grpc_error *error) { - call_data *calld = arg; - if (calld->send_message_batch != NULL) { - grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, calld->send_message_batch, GRPC_ERROR_REF(error), - calld->call_combiner); - calld->send_message_batch = NULL; - } + grpc_call_next_op(exec_ctx, elem, calld->send_message_batch); } // Pulls a slice from the send_message byte stream and adds it to calld->slices. @@ -311,25 +293,21 @@ static grpc_error *pull_slice_from_send_message(grpc_exec_ctx *exec_ctx, // If all data has been read, invokes finish_send_message(). Otherwise, // an async call to grpc_byte_stream_next() has been started, which will // eventually result in calling on_send_message_next_done(). -static void continue_reading_send_message(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { +static grpc_error *continue_reading_send_message(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { call_data *calld = (call_data *)elem->call_data; while (grpc_byte_stream_next( exec_ctx, calld->send_message_batch->payload->send_message.send_message, ~(size_t)0, &calld->on_send_message_next_done)) { grpc_error *error = pull_slice_from_send_message(exec_ctx, calld); - if (error != GRPC_ERROR_NONE) { - // Closure callback; does not take ownership of error. - fail_send_message_batch_in_call_combiner(exec_ctx, calld, error); - GRPC_ERROR_UNREF(error); - return; - } + if (error != GRPC_ERROR_NONE) return error; if (calld->slices.length == calld->send_message_batch->payload->send_message.send_message->length) { finish_send_message(exec_ctx, elem); break; } } + return GRPC_ERROR_NONE; } // Async callback for grpc_byte_stream_next(). @@ -337,37 +315,46 @@ static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_call_element *elem = (grpc_call_element *)arg; call_data *calld = (call_data *)elem->call_data; - if (error != GRPC_ERROR_NONE) { - // Closure callback; does not take ownership of error. - fail_send_message_batch_in_call_combiner(exec_ctx, calld, error); - return; - } + if (error != GRPC_ERROR_NONE) goto fail; error = pull_slice_from_send_message(exec_ctx, calld); - if (error != GRPC_ERROR_NONE) { - // Closure callback; does not take ownership of error. - fail_send_message_batch_in_call_combiner(exec_ctx, calld, error); - GRPC_ERROR_UNREF(error); - return; - } + if (error != GRPC_ERROR_NONE) goto fail; if (calld->slices.length == calld->send_message_batch->payload->send_message.send_message->length) { finish_send_message(exec_ctx, elem); } else { - continue_reading_send_message(exec_ctx, elem); + // This will either finish reading all of the data and invoke + // finish_send_message(), or else it will make an async call to + // grpc_byte_stream_next(), which will eventually result in calling + // this function again. + error = continue_reading_send_message(exec_ctx, elem); + if (error != GRPC_ERROR_NONE) goto fail; } + return; +fail: + grpc_transport_stream_op_batch_finish_with_failure( + exec_ctx, calld->send_message_batch, error); } -static void start_send_message_batch(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *unused) { - grpc_call_element *elem = (grpc_call_element *)arg; +static void start_send_message_batch(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op_batch *batch, + bool has_compression_algorithm) { call_data *calld = (call_data *)elem->call_data; - if (skip_compression( - elem, - calld->send_message_batch->payload->send_message.send_message->flags, - calld->send_initial_metadata_state == HAS_COMPRESSION_ALGORITHM)) { - send_message_batch_continue(exec_ctx, elem); + if (!skip_compression(elem, batch->payload->send_message.send_message->flags, + has_compression_algorithm)) { + calld->send_message_batch = batch; + // This will either finish reading all of the data and invoke + // finish_send_message(), or else it will make an async call to + // grpc_byte_stream_next(), which will eventually result in calling + // on_send_message_next_done(). + grpc_error *error = continue_reading_send_message(exec_ctx, elem); + if (error != GRPC_ERROR_NONE) { + grpc_transport_stream_op_batch_finish_with_failure( + exec_ctx, calld->send_message_batch, error); + } } else { - continue_reading_send_message(exec_ctx, elem); + /* pass control down the stack */ + grpc_call_next_op(exec_ctx, elem, batch); } } @@ -375,80 +362,95 @@ static void compress_start_transport_stream_op_batch( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op_batch *batch) { call_data *calld = elem->call_data; + GPR_TIMER_BEGIN("compress_start_transport_stream_op_batch", 0); - // Handle cancel_stream. + if (batch->cancel_stream) { - GRPC_ERROR_UNREF(calld->cancel_error); - calld->cancel_error = - GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error); - if (calld->send_message_batch != NULL) { - if (calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN) { - GRPC_CALL_COMBINER_START( - exec_ctx, calld->call_combiner, - GRPC_CLOSURE_CREATE(fail_send_message_batch_in_call_combiner, calld, - grpc_schedule_on_exec_ctx), - GRPC_ERROR_REF(calld->cancel_error), "failing send_message op"); - } else { - grpc_byte_stream_shutdown( - exec_ctx, - calld->send_message_batch->payload->send_message.send_message, - GRPC_ERROR_REF(calld->cancel_error)); - } + // TODO(roth): As part of the upcoming call combiner work, change + // this to call grpc_byte_stream_shutdown() on the incoming byte + // stream, to cancel any in-flight calls to grpc_byte_stream_next(). + GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error); + gpr_atm cur = gpr_atm_full_xchg( + &calld->send_initial_metadata_state, + CANCELLED_BIT | (gpr_atm)batch->payload->cancel_stream.cancel_error); + switch (cur) { + case HAS_COMPRESSION_ALGORITHM: + case NO_COMPRESSION_ALGORITHM: + case INITIAL_METADATA_UNSEEN: + break; + default: + if ((cur & CANCELLED_BIT) == 0) { + grpc_transport_stream_op_batch_finish_with_failure( + exec_ctx, (grpc_transport_stream_op_batch *)cur, + GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error)); + } else { + GRPC_ERROR_UNREF((grpc_error *)(cur & ~CANCELLED_BIT)); + } + break; } - } else if (calld->cancel_error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, batch, GRPC_ERROR_REF(calld->cancel_error), - calld->call_combiner); - goto done; } - // Handle send_initial_metadata. + if (batch->send_initial_metadata) { - GPR_ASSERT(calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN); bool has_compression_algorithm; grpc_error *error = process_send_initial_metadata( exec_ctx, elem, batch->payload->send_initial_metadata.send_initial_metadata, &has_compression_algorithm); if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error, - calld->call_combiner); - goto done; + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, + error); + return; } - calld->send_initial_metadata_state = has_compression_algorithm - ? HAS_COMPRESSION_ALGORITHM - : NO_COMPRESSION_ALGORITHM; - // If we had previously received a batch containing a send_message op, - // handle it now. Note that we need to re-enter the call combiner - // for this, since we can't send two batches down while holding the - // call combiner, since the connected_channel filter (at the bottom of - // the call stack) will release the call combiner for each batch it sees. - if (calld->send_message_batch != NULL) { - GRPC_CALL_COMBINER_START( - exec_ctx, calld->call_combiner, - &calld->start_send_message_batch_in_call_combiner, GRPC_ERROR_NONE, - "starting send_message after send_initial_metadata"); + gpr_atm cur; + retry_send_im: + cur = gpr_atm_acq_load(&calld->send_initial_metadata_state); + GPR_ASSERT(cur != HAS_COMPRESSION_ALGORITHM && + cur != NO_COMPRESSION_ALGORITHM); + if ((cur & CANCELLED_BIT) == 0) { + if (!gpr_atm_rel_cas(&calld->send_initial_metadata_state, cur, + has_compression_algorithm + ? HAS_COMPRESSION_ALGORITHM + : NO_COMPRESSION_ALGORITHM)) { + goto retry_send_im; + } + if (cur != INITIAL_METADATA_UNSEEN) { + start_send_message_batch(exec_ctx, elem, + (grpc_transport_stream_op_batch *)cur, + has_compression_algorithm); + } } } - // Handle send_message. if (batch->send_message) { - GPR_ASSERT(calld->send_message_batch == NULL); - calld->send_message_batch = batch; - // If we have not yet seen send_initial_metadata, then we have to - // wait. We save the batch in calld and then drop the call - // combiner, which we'll have to pick up again later when we get - // send_initial_metadata. - if (calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN) { - GRPC_CALL_COMBINER_STOP( - exec_ctx, calld->call_combiner, - "send_message batch pending send_initial_metadata"); - goto done; + gpr_atm cur; + retry_send: + cur = gpr_atm_acq_load(&calld->send_initial_metadata_state); + switch (cur) { + case INITIAL_METADATA_UNSEEN: + if (!gpr_atm_rel_cas(&calld->send_initial_metadata_state, cur, + (gpr_atm)batch)) { + goto retry_send; + } + break; + case HAS_COMPRESSION_ALGORITHM: + case NO_COMPRESSION_ALGORITHM: + start_send_message_batch(exec_ctx, elem, batch, + cur == HAS_COMPRESSION_ALGORITHM); + break; + default: + if (cur & CANCELLED_BIT) { + grpc_transport_stream_op_batch_finish_with_failure( + exec_ctx, batch, + GRPC_ERROR_REF((grpc_error *)(cur & ~CANCELLED_BIT))); + } else { + /* >1 send_message concurrently */ + GPR_UNREACHABLE_CODE(break); + } } - start_send_message_batch(exec_ctx, elem, GRPC_ERROR_NONE); } else { - // Pass control down the stack. + /* pass control down the stack */ grpc_call_next_op(exec_ctx, elem, batch); } -done: + GPR_TIMER_END("compress_start_transport_stream_op_batch", 0); } @@ -456,16 +458,16 @@ done: static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_element_args *args) { - call_data *calld = (call_data *)elem->call_data; - calld->call_combiner = args->call_combiner; - calld->cancel_error = GRPC_ERROR_NONE; + /* grab pointers to our data from the call element */ + call_data *calld = elem->call_data; + + /* initialize members */ grpc_slice_buffer_init(&calld->slices); - GRPC_CLOSURE_INIT(&calld->start_send_message_batch_in_call_combiner, - start_send_message_batch, elem, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&calld->on_send_message_next_done, on_send_message_next_done, elem, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete, elem, grpc_schedule_on_exec_ctx); + return GRPC_ERROR_NONE; } @@ -473,9 +475,14 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, grpc_closure *ignored) { + /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices); - GRPC_ERROR_UNREF(calld->cancel_error); + gpr_atm imstate = + gpr_atm_no_barrier_load(&calld->send_initial_metadata_state); + if (imstate & CANCELLED_BIT) { + GRPC_ERROR_UNREF((grpc_error *)(imstate & ~CANCELLED_BIT)); + } } /* Constructor for channel_data */ @@ -543,5 +550,6 @@ const grpc_channel_filter grpc_message_compress_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, grpc_channel_next_get_info, - "message_compress"}; + "compress"}; diff --git a/src/core/ext/filters/http/server/http_server_filter.c b/src/core/ext/filters/http/server/http_server_filter.c index a10e69ba59..b145f12aff 100644 --- a/src/core/ext/filters/http/server/http_server_filter.c +++ b/src/core/ext/filters/http/server/http_server_filter.c @@ -32,8 +32,6 @@ #define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1 typedef struct call_data { - grpc_call_combiner *call_combiner; - grpc_linked_mdelem status; grpc_linked_mdelem content_type; @@ -283,11 +281,7 @@ static void hs_on_complete(grpc_exec_ctx *exec_ctx, void *user_data, *calld->pp_recv_message = calld->payload_bin_delivered ? NULL : (grpc_byte_stream *)&calld->read_stream; - // Re-enter call combiner for recv_message_ready, since the surface - // code will release the call combiner for each callback it receives. - GRPC_CALL_COMBINER_START(exec_ctx, calld->call_combiner, - calld->recv_message_ready, GRPC_ERROR_REF(err), - "resuming recv_message_ready from on_complete"); + GRPC_CLOSURE_RUN(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err)); calld->recv_message_ready = NULL; calld->payload_bin_delivered = true; } @@ -299,20 +293,15 @@ static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data, grpc_call_element *elem = user_data; call_data *calld = elem->call_data; if (calld->seen_path_with_query) { - // Do nothing. This is probably a GET request, and payload will be - // returned in hs_on_complete callback. - // Note that we release the call combiner here, so that other - // callbacks can run. - GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner, - "pausing recv_message_ready until on_complete"); + /* do nothing. This is probably a GET request, and payload will be returned + in hs_on_complete callback. */ } else { GRPC_CLOSURE_RUN(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err)); } } -static grpc_error *hs_mutate_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { +static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op_batch *op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; @@ -334,7 +323,10 @@ static grpc_error *hs_mutate_op(grpc_exec_ctx *exec_ctx, server_filter_outgoing_metadata( exec_ctx, elem, op->payload->send_initial_metadata.send_initial_metadata)); - if (error != GRPC_ERROR_NONE) return error; + if (error != GRPC_ERROR_NONE) { + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); + return; + } } if (op->recv_initial_metadata) { @@ -367,25 +359,21 @@ static grpc_error *hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_error *error = server_filter_outgoing_metadata( exec_ctx, elem, op->payload->send_trailing_metadata.send_trailing_metadata); - if (error != GRPC_ERROR_NONE) return error; + if (error != GRPC_ERROR_NONE) { + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); + return; + } } - - return GRPC_ERROR_NONE; } -static void hs_start_transport_stream_op_batch( - grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { - call_data *calld = elem->call_data; - GPR_TIMER_BEGIN("hs_start_transport_stream_op_batch", 0); - grpc_error *error = hs_mutate_op(exec_ctx, elem, op); - if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error, - calld->call_combiner); - } else { - grpc_call_next_op(exec_ctx, elem, op); - } - GPR_TIMER_END("hs_start_transport_stream_op_batch", 0); +static void hs_start_transport_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op_batch *op) { + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + GPR_TIMER_BEGIN("hs_start_transport_op", 0); + hs_mutate_op(exec_ctx, elem, op); + grpc_call_next_op(exec_ctx, elem, op); + GPR_TIMER_END("hs_start_transport_op", 0); } /* Constructor for call_data */ @@ -395,7 +383,6 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; /* initialize members */ - calld->call_combiner = args->call_combiner; GRPC_CLOSURE_INIT(&calld->hs_on_recv, hs_on_recv, elem, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&calld->hs_on_complete, hs_on_complete, elem, @@ -427,7 +414,7 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) {} const grpc_channel_filter grpc_http_server_filter = { - hs_start_transport_stream_op_batch, + hs_start_transport_op, grpc_channel_next_op, sizeof(call_data), init_call_elem, @@ -436,5 +423,6 @@ const grpc_channel_filter grpc_http_server_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, grpc_channel_next_get_info, "http-server"}; diff --git a/src/core/ext/filters/load_reporting/load_reporting_filter.c b/src/core/ext/filters/load_reporting/load_reporting_filter.c index 17e946937f..08474efb2e 100644 --- a/src/core/ext/filters/load_reporting/load_reporting_filter.c +++ b/src/core/ext/filters/load_reporting/load_reporting_filter.c @@ -223,5 +223,6 @@ const grpc_channel_filter grpc_load_reporting_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, grpc_channel_next_get_info, "load_reporting"}; diff --git a/src/core/ext/filters/max_age/max_age_filter.c b/src/core/ext/filters/max_age/max_age_filter.c index 16c85a70d0..7d748b9c32 100644 --- a/src/core/ext/filters/max_age/max_age_filter.c +++ b/src/core/ext/filters/max_age/max_age_filter.c @@ -391,6 +391,7 @@ const grpc_channel_filter grpc_max_age_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, grpc_channel_next_get_info, "max_age"}; diff --git a/src/core/ext/filters/message_size/message_size_filter.c b/src/core/ext/filters/message_size/message_size_filter.c index 47763b1deb..846c7df69a 100644 --- a/src/core/ext/filters/message_size/message_size_filter.c +++ b/src/core/ext/filters/message_size/message_size_filter.c @@ -68,7 +68,6 @@ static void* message_size_limits_create_from_json(const grpc_json* json) { } typedef struct call_data { - grpc_call_combiner* call_combiner; message_size_limits limits; // Receive closures are chained: we inject this closure as the // recv_message_ready up-call on transport_stream_op, and remember to @@ -132,8 +131,7 @@ static void start_transport_stream_op_batch( exec_ctx, op, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string), GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_RESOURCE_EXHAUSTED), - calld->call_combiner); + GRPC_STATUS_RESOURCE_EXHAUSTED)); gpr_free(message_string); return; } @@ -154,7 +152,6 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, const grpc_call_element_args* args) { channel_data* chand = (channel_data*)elem->channel_data; call_data* calld = (call_data*)elem->call_data; - calld->call_combiner = args->call_combiner; calld->next_recv_message_ready = NULL; GRPC_CLOSURE_INIT(&calld->recv_message_ready, recv_message_ready, elem, grpc_schedule_on_exec_ctx); @@ -262,6 +259,7 @@ const grpc_channel_filter grpc_message_size_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, grpc_channel_next_get_info, "message_size"}; diff --git a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c index c8b2fe5f99..b4d2cb4b8c 100644 --- a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c +++ b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c @@ -177,6 +177,7 @@ const grpc_channel_filter grpc_workaround_cronet_compression_filter = { 0, init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, grpc_channel_next_get_info, "workaround_cronet_compression"}; diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 2f0ac85152..7541bd5c92 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -1370,28 +1370,17 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, "send_initial_metadata_finished"); } } - if (op_payload->send_initial_metadata.peer_string != NULL) { - gpr_atm_rel_store(op_payload->send_initial_metadata.peer_string, - (gpr_atm)gpr_strdup(t->peer_string)); - } } if (op->send_message) { on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; s->fetching_send_message_finished = add_closure_barrier(op->on_complete); if (s->write_closed) { - // Return an error unless the client has already received trailing - // metadata from the server, since an application using a - // streaming call might send another message before getting a - // recv_message failure, breaking out of its loop, and then - // starting recv_trailing_metadata. grpc_chttp2_complete_closure_step( exec_ctx, t, s, &s->fetching_send_message_finished, - t->is_client && s->received_trailing_metadata - ? GRPC_ERROR_NONE - : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Attempt to send message after stream was closed", - &s->write_closed_error, 1), + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Attempt to send message after stream was closed", + &s->write_closed_error, 1), "fetching_send_message_finished"); } else { GPR_ASSERT(s->fetching_send_message == NULL); @@ -1477,10 +1466,6 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, op_payload->recv_initial_metadata.recv_initial_metadata; s->trailing_metadata_available = op_payload->recv_initial_metadata.trailing_metadata_available; - if (op_payload->recv_initial_metadata.peer_string != NULL) { - gpr_atm_rel_store(op_payload->recv_initial_metadata.peer_string, - (gpr_atm)gpr_strdup(t->peer_string)); - } grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s); } @@ -1839,7 +1824,8 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, } } } - if (s->read_closed && s->frame_storage.length == 0 && !pending_data && + if (s->read_closed && s->frame_storage.length == 0 && + (!pending_data || s->seen_error) && s->recv_trailing_metadata_finished != NULL) { grpc_chttp2_incoming_metadata_buffer_publish( exec_ctx, &s->metadata_buffer[1], s->recv_trailing_metadata); @@ -2946,6 +2932,14 @@ static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, } /******************************************************************************* + * INTEGRATION GLUE + */ + +static char *chttp2_get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *t) { + return gpr_strdup(((grpc_chttp2_transport *)t)->peer_string); +} + +/******************************************************************************* * MONITORING */ static grpc_endpoint *chttp2_get_endpoint(grpc_exec_ctx *exec_ctx, @@ -2962,6 +2956,7 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), perform_transport_op, destroy_stream, destroy_transport, + chttp2_get_peer, chttp2_get_endpoint}; grpc_transport *grpc_create_chttp2_transport( diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 9fff30d54f..3c41a8958f 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -509,8 +509,6 @@ struct grpc_chttp2_stream { /** Are we buffering writes on this stream? If yes, we won't become writable until there's enough queued up in the flow_controlled_buffer */ bool write_buffering; - /** Has trailing metadata been received. */ - bool received_trailing_metadata; /** the error that resulted in this stream being read-closed */ grpc_error *read_closed_error; diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index 19bd86fd0c..18d163ee98 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -623,7 +623,6 @@ static grpc_error *init_header_frame_parser(grpc_exec_ctx *exec_ctx, *s->trailing_metadata_available = true; } t->hpack_parser.on_header = on_trailing_header; - s->received_trailing_metadata = true; } else { GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "parsing initial_metadata")); t->hpack_parser.on_header = on_initial_header; @@ -632,7 +631,6 @@ static grpc_error *init_header_frame_parser(grpc_exec_ctx *exec_ctx, case 1: GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "parsing trailing_metadata")); t->hpack_parser.on_header = on_trailing_header; - s->received_trailing_metadata = true; break; case 2: gpr_log(GPR_ERROR, "too many header frames received"); diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 09420d92e7..abb558982b 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -1386,6 +1386,10 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {} +static char *get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { + return NULL; +} + static grpc_endpoint *get_endpoint(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { return NULL; @@ -1404,6 +1408,7 @@ static const grpc_transport_vtable grpc_cronet_vtable = { perform_op, destroy_stream, destroy_transport, + get_peer, get_endpoint}; grpc_transport *grpc_create_cronet_transport(void *engine, const char *target, diff --git a/src/core/ext/transport/inproc/inproc_transport.c b/src/core/ext/transport/inproc/inproc_transport.c index b2d6f2d0c9..6f4b429ee2 100644 --- a/src/core/ext/transport/inproc/inproc_transport.c +++ b/src/core/ext/transport/inproc/inproc_transport.c @@ -1251,14 +1251,20 @@ static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt, // Nothing to do here } +static char *get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *t) { + return gpr_strdup("inproc"); +} + static grpc_endpoint *get_endpoint(grpc_exec_ctx *exec_ctx, grpc_transport *t) { return NULL; } static const grpc_transport_vtable inproc_vtable = { - sizeof(inproc_stream), "inproc", init_stream, - set_pollset, set_pollset_set, perform_stream_op, - perform_transport_op, destroy_stream, destroy_transport, + sizeof(inproc_stream), "inproc", + init_stream, set_pollset, + set_pollset_set, perform_stream_op, + perform_transport_op, destroy_stream, + destroy_transport, get_peer, get_endpoint}; /******************************************************************************* diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c index 775c8bc667..0f8e33c4be 100644 --- a/src/core/lib/channel/channel_stack.c +++ b/src/core/lib/channel/channel_stack.c @@ -233,10 +233,15 @@ void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack, void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op_batch *op) { grpc_call_element *next_elem = elem + 1; - GRPC_CALL_LOG_OP(GPR_INFO, next_elem, op); next_elem->filter->start_transport_stream_op_batch(exec_ctx, next_elem, op); } +char *grpc_call_next_get_peer(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { + grpc_call_element *next_elem = elem + 1; + return next_elem->filter->get_peer(exec_ctx, next_elem); +} + void grpc_channel_next_get_info(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, const grpc_channel_info *channel_info) { @@ -260,3 +265,12 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) { return (grpc_call_stack *)((char *)(elem)-ROUND_UP_TO_ALIGNMENT_SIZE( sizeof(grpc_call_stack))); } + +void grpc_call_element_signal_error(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_error *error) { + grpc_transport_stream_op_batch *op = grpc_make_transport_stream_op(NULL); + op->cancel_stream = true; + op->payload->cancel_stream.cancel_error = error; + elem->filter->start_transport_stream_op_batch(exec_ctx, elem, op); +} diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index ae1cac31f7..a80f8aa826 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -40,7 +40,6 @@ #include <grpc/support/time.h> #include "src/core/lib/debug/trace.h" -#include "src/core/lib/iomgr/call_combiner.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/support/arena.h" #include "src/core/lib/transport/transport.h" @@ -72,7 +71,6 @@ typedef struct { gpr_timespec start_time; gpr_timespec deadline; gpr_arena *arena; - grpc_call_combiner *call_combiner; } grpc_call_element_args; typedef struct { @@ -152,6 +150,9 @@ typedef struct { void (*destroy_channel_elem)(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem); + /* Implement grpc_call_get_peer() */ + char *(*get_peer)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); + /* Implement grpc_channel_get_info() */ void (*get_channel_info)(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, const grpc_channel_info *channel_info); @@ -270,6 +271,8 @@ void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, stack */ void grpc_channel_next_op(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_transport_op *op); +/* Pass through a request to get_peer to the next child element */ +char *grpc_call_next_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); /* Pass through a request to get_channel_info() to the next child element */ void grpc_channel_next_get_info(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, @@ -285,6 +288,10 @@ void grpc_call_log_op(char *file, int line, gpr_log_severity severity, grpc_call_element *elem, grpc_transport_stream_op_batch *op); +void grpc_call_element_signal_error(grpc_exec_ctx *exec_ctx, + grpc_call_element *cur_elem, + grpc_error *error); + extern grpc_tracer_flag grpc_trace_channel; #define GRPC_CALL_LOG_OP(sev, elem, op) \ diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c index 8285226fc4..af06ca802e 100644 --- a/src/core/lib/channel/connected_channel.c +++ b/src/core/lib/channel/connected_channel.c @@ -36,57 +36,7 @@ typedef struct connected_channel_channel_data { grpc_transport *transport; } channel_data; -typedef struct { - grpc_closure closure; - grpc_closure *original_closure; - grpc_call_combiner *call_combiner; - const char *reason; -} callback_state; - -typedef struct connected_channel_call_data { - grpc_call_combiner *call_combiner; - // Closures used for returning results on the call combiner. - callback_state on_complete[6]; // Max number of pending batches. - callback_state recv_initial_metadata_ready; - callback_state recv_message_ready; -} call_data; - -static void run_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - callback_state *state = (callback_state *)arg; - GRPC_CALL_COMBINER_START(exec_ctx, state->call_combiner, - state->original_closure, GRPC_ERROR_REF(error), - state->reason); -} - -static void run_cancel_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - run_in_call_combiner(exec_ctx, arg, error); - gpr_free(arg); -} - -static void intercept_callback(call_data *calld, callback_state *state, - bool free_when_done, const char *reason, - grpc_closure **original_closure) { - state->original_closure = *original_closure; - state->call_combiner = calld->call_combiner; - state->reason = reason; - *original_closure = GRPC_CLOSURE_INIT( - &state->closure, - free_when_done ? run_cancel_in_call_combiner : run_in_call_combiner, - state, grpc_schedule_on_exec_ctx); -} - -static callback_state *get_state_for_batch( - call_data *calld, grpc_transport_stream_op_batch *batch) { - if (batch->send_initial_metadata) return &calld->on_complete[0]; - if (batch->send_message) return &calld->on_complete[1]; - if (batch->send_trailing_metadata) return &calld->on_complete[2]; - if (batch->recv_initial_metadata) return &calld->on_complete[3]; - if (batch->recv_message) return &calld->on_complete[4]; - if (batch->recv_trailing_metadata) return &calld->on_complete[5]; - GPR_UNREACHABLE_CODE(return NULL); -} +typedef struct connected_channel_call_data { void *unused; } call_data; /* We perform a small hack to locate transport data alongside the connected channel data in call allocations, to allow everything to be pulled in minimal @@ -99,38 +49,13 @@ static callback_state *get_state_for_batch( into transport stream operations */ static void con_start_transport_stream_op_batch( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op_batch *batch) { + grpc_transport_stream_op_batch *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - if (batch->recv_initial_metadata) { - callback_state *state = &calld->recv_initial_metadata_ready; - intercept_callback( - calld, state, false, "recv_initial_metadata_ready", - &batch->payload->recv_initial_metadata.recv_initial_metadata_ready); - } - if (batch->recv_message) { - callback_state *state = &calld->recv_message_ready; - intercept_callback(calld, state, false, "recv_message_ready", - &batch->payload->recv_message.recv_message_ready); - } - if (batch->cancel_stream) { - // There can be more than one cancellation batch in flight at any - // given time, so we can't just pick out a fixed index into - // calld->on_complete like we can for the other ops. However, - // cancellation isn't in the fast path, so we just allocate a new - // closure for each one. - callback_state *state = (callback_state *)gpr_malloc(sizeof(*state)); - intercept_callback(calld, state, true, "on_complete (cancel_stream)", - &batch->on_complete); - } else { - callback_state *state = get_state_for_batch(calld, batch); - intercept_callback(calld, state, false, "on_complete", &batch->on_complete); - } + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + grpc_transport_perform_stream_op(exec_ctx, chand->transport, - TRANSPORT_STREAM_FROM_CALL_DATA(calld), - batch); - GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner, - "passed batch to transport"); + TRANSPORT_STREAM_FROM_CALL_DATA(calld), op); } static void con_start_transport_op(grpc_exec_ctx *exec_ctx, @@ -146,7 +71,6 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, const grpc_call_element_args *args) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - calld->call_combiner = args->call_combiner; int r = grpc_transport_init_stream( exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), &args->call_stack->refcount, args->server_transport_data, args->arena); @@ -194,6 +118,11 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, } } +static char *con_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { + channel_data *chand = elem->channel_data; + return grpc_transport_get_peer(exec_ctx, chand->transport); +} + /* No-op. */ static void con_get_channel_info(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, @@ -209,6 +138,7 @@ const grpc_channel_filter grpc_connected_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, + con_get_peer, con_get_channel_info, "connected", }; diff --git a/src/core/lib/iomgr/call_combiner.c b/src/core/lib/iomgr/call_combiner.c deleted file mode 100644 index 899f98552d..0000000000 --- a/src/core/lib/iomgr/call_combiner.c +++ /dev/null @@ -1,180 +0,0 @@ -/* - * - * Copyright 2017 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "src/core/lib/iomgr/call_combiner.h" - -#include <grpc/support/log.h> - -grpc_tracer_flag grpc_call_combiner_trace = - GRPC_TRACER_INITIALIZER(false, "call_combiner"); - -static grpc_error* decode_cancel_state_error(gpr_atm cancel_state) { - if (cancel_state & 1) { - return (grpc_error*)(cancel_state & ~(gpr_atm)1); - } - return GRPC_ERROR_NONE; -} - -static gpr_atm encode_cancel_state_error(grpc_error* error) { - return (gpr_atm)1 | (gpr_atm)error; -} - -void grpc_call_combiner_init(grpc_call_combiner* call_combiner) { - gpr_mpscq_init(&call_combiner->queue); -} - -void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner) { - gpr_mpscq_destroy(&call_combiner->queue); - GRPC_ERROR_UNREF(decode_cancel_state_error(call_combiner->cancel_state)); -} - -#ifndef NDEBUG -#define DEBUG_ARGS , const char *file, int line -#define DEBUG_FMT_STR "%s:%d: " -#define DEBUG_FMT_ARGS , file, line -#else -#define DEBUG_ARGS -#define DEBUG_FMT_STR -#define DEBUG_FMT_ARGS -#endif - -void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx, - grpc_call_combiner* call_combiner, - grpc_closure* closure, - grpc_error* error DEBUG_ARGS, - const char* reason) { - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_DEBUG, - "==> grpc_call_combiner_start() [%p] closure=%p [" DEBUG_FMT_STR - "%s] error=%s", - call_combiner, closure DEBUG_FMT_ARGS, reason, - grpc_error_string(error)); - } - size_t prev_size = - (size_t)gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)1); - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size, - prev_size + 1); - } - if (prev_size == 0) { - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_DEBUG, " EXECUTING IMMEDIATELY"); - } - // Queue was empty, so execute this closure immediately. - GRPC_CLOSURE_SCHED(exec_ctx, closure, error); - } else { - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_INFO, " QUEUING"); - } - // Queue was not empty, so add closure to queue. - closure->error_data.error = error; - gpr_mpscq_push(&call_combiner->queue, (gpr_mpscq_node*)closure); - } -} - -void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx, - grpc_call_combiner* call_combiner DEBUG_ARGS, - const char* reason) { - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_DEBUG, - "==> grpc_call_combiner_stop() [%p] [" DEBUG_FMT_STR "%s]", - call_combiner DEBUG_FMT_ARGS, reason); - } - size_t prev_size = - (size_t)gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)-1); - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size, - prev_size - 1); - } - GPR_ASSERT(prev_size >= 1); - if (prev_size > 1) { - while (true) { - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_DEBUG, " checking queue"); - } - bool empty; - grpc_closure* closure = (grpc_closure*)gpr_mpscq_pop_and_check_end( - &call_combiner->queue, &empty); - if (closure == NULL) { - // This can happen either due to a race condition within the mpscq - // code or because of a race with grpc_call_combiner_start(). - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_DEBUG, " queue returned no result; checking again"); - } - continue; - } - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_DEBUG, " EXECUTING FROM QUEUE: closure=%p error=%s", - closure, grpc_error_string(closure->error_data.error)); - } - GRPC_CLOSURE_SCHED(exec_ctx, closure, closure->error_data.error); - break; - } - } else if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_DEBUG, " queue empty"); - } -} - -void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx, - grpc_call_combiner* call_combiner, - grpc_closure* closure) { - while (true) { - // Decode original state. - gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state); - grpc_error* original_error = decode_cancel_state_error(original_state); - // If error is set, invoke the cancellation closure immediately. - // Otherwise, store the new closure. - if (original_error != GRPC_ERROR_NONE) { - GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_REF(original_error)); - break; - } else { - if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state, - (gpr_atm)closure)) { - break; - } - } - // cas failed, try again. - } -} - -void grpc_call_combiner_cancel(grpc_exec_ctx* exec_ctx, - grpc_call_combiner* call_combiner, - grpc_error* error) { - while (true) { - gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state); - grpc_error* original_error = decode_cancel_state_error(original_state); - if (original_error != GRPC_ERROR_NONE) { - GRPC_ERROR_UNREF(error); - break; - } - if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state, - encode_cancel_state_error(error))) { - if (original_state != 0) { - grpc_closure* notify_on_cancel = (grpc_closure*)original_state; - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_DEBUG, - "call_combiner=%p: scheduling notify_on_cancel callback=%p", - call_combiner, notify_on_cancel); - } - GRPC_CLOSURE_SCHED(exec_ctx, notify_on_cancel, GRPC_ERROR_REF(error)); - } - break; - } - // cas failed, try again. - } -} diff --git a/src/core/lib/iomgr/call_combiner.h b/src/core/lib/iomgr/call_combiner.h deleted file mode 100644 index 621e2c3669..0000000000 --- a/src/core/lib/iomgr/call_combiner.h +++ /dev/null @@ -1,104 +0,0 @@ -/* - * - * Copyright 2017 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#ifndef GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H -#define GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H - -#include <stddef.h> - -#include <grpc/support/atm.h> - -#include "src/core/lib/iomgr/closure.h" -#include "src/core/lib/iomgr/exec_ctx.h" -#include "src/core/lib/support/mpscq.h" - -// A simple, lock-free mechanism for serializing activity related to a -// single call. This is similar to a combiner but is more lightweight. -// -// It requires the callback (or, in the common case where the callback -// actually kicks off a chain of callbacks, the last callback in that -// chain) to explicitly indicate (by calling GRPC_CALL_COMBINER_STOP()) -// when it is done with the action that was kicked off by the original -// callback. - -extern grpc_tracer_flag grpc_call_combiner_trace; - -typedef struct { - gpr_atm size; // size_t, num closures in queue or currently executing - gpr_mpscq queue; - // Either 0 (if not cancelled and no cancellation closure set), - // a grpc_closure* (if the lowest bit is 0), - // or a grpc_error* (if the lowest bit is 1). - gpr_atm cancel_state; -} grpc_call_combiner; - -// Assumes memory was initialized to zero. -void grpc_call_combiner_init(grpc_call_combiner* call_combiner); - -void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner); - -#ifndef NDEBUG -#define GRPC_CALL_COMBINER_START(exec_ctx, call_combiner, closure, error, \ - reason) \ - grpc_call_combiner_start((exec_ctx), (call_combiner), (closure), (error), \ - __FILE__, __LINE__, (reason)) -#define GRPC_CALL_COMBINER_STOP(exec_ctx, call_combiner, reason) \ - grpc_call_combiner_stop((exec_ctx), (call_combiner), __FILE__, __LINE__, \ - (reason)) -/// Starts processing \a closure on \a call_combiner. -void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx, - grpc_call_combiner* call_combiner, - grpc_closure* closure, grpc_error* error, - const char* file, int line, const char* reason); -/// Yields the call combiner to the next closure in the queue, if any. -void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx, - grpc_call_combiner* call_combiner, - const char* file, int line, const char* reason); -#else -#define GRPC_CALL_COMBINER_START(exec_ctx, call_combiner, closure, error, \ - reason) \ - grpc_call_combiner_start((exec_ctx), (call_combiner), (closure), (error), \ - (reason)) -#define GRPC_CALL_COMBINER_STOP(exec_ctx, call_combiner, reason) \ - grpc_call_combiner_stop((exec_ctx), (call_combiner), (reason)) -/// Starts processing \a closure on \a call_combiner. -void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx, - grpc_call_combiner* call_combiner, - grpc_closure* closure, grpc_error* error, - const char* reason); -/// Yields the call combiner to the next closure in the queue, if any. -void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx, - grpc_call_combiner* call_combiner, - const char* reason); -#endif - -/// Tells \a call_combiner to invoke \a closure when -/// grpc_call_combiner_cancel() is called. If grpc_call_combiner_cancel() -/// was previously called, \a closure will be invoked immediately. -/// If \a closure is NULL, then no closure will be invoked on -/// cancellation; this effectively unregisters the previously set closure. -void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx, - grpc_call_combiner* call_combiner, - grpc_closure* closure); - -/// Indicates that the call has been cancelled. -void grpc_call_combiner_cancel(grpc_exec_ctx* exec_ctx, - grpc_call_combiner* call_combiner, - grpc_error* error); - -#endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */ diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c index e3f0163a6c..531a88434f 100644 --- a/src/core/lib/security/transport/client_auth_filter.c +++ b/src/core/lib/security/transport/client_auth_filter.c @@ -39,7 +39,6 @@ /* We can have a per-call credentials. */ typedef struct { - grpc_call_combiner *call_combiner; grpc_call_credentials *creds; bool have_host; bool have_method; @@ -50,11 +49,17 @@ typedef struct { pollset_set so that work can progress when this call wants work to progress */ grpc_polling_entity *pollent; + gpr_atm security_context_set; + gpr_mu security_context_mu; grpc_credentials_mdelem_array md_array; grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT]; grpc_auth_metadata_context auth_md_context; - grpc_closure async_cancel_closure; - grpc_closure async_result_closure; + grpc_closure closure; + // Either 0 (no cancellation and no async operation in flight), + // a grpc_closure* (if the lowest bit is 0), + // or a grpc_error* (if the lowest bit is 1). + gpr_atm cancellation_state; + grpc_closure cancel_closure; } call_data; /* We can have a per-channel credentials. */ @@ -63,6 +68,43 @@ typedef struct { grpc_auth_context *auth_context; } channel_data; +static void decode_cancel_state(gpr_atm cancel_state, grpc_closure **func, + grpc_error **error) { + // If the lowest bit is 1, the value is a grpc_error*. + // Otherwise, if non-zdero, the value is a grpc_closure*. + if (cancel_state & 1) { + *error = (grpc_error *)(cancel_state & ~(gpr_atm)1); + } else if (cancel_state != 0) { + *func = (grpc_closure *)cancel_state; + } +} + +static gpr_atm encode_cancel_state_error(grpc_error *error) { + // Set the lowest bit to 1 to indicate that it's an error. + return (gpr_atm)1 | (gpr_atm)error; +} + +// Returns an error if the call has been cancelled. Otherwise, sets the +// cancellation function to be called upon cancellation. +static grpc_error *set_cancel_func(grpc_call_element *elem, + grpc_iomgr_cb_func func) { + call_data *calld = (call_data *)elem->call_data; + // Decode original state. + gpr_atm original_state = gpr_atm_acq_load(&calld->cancellation_state); + grpc_error *original_error = GRPC_ERROR_NONE; + grpc_closure *original_func = NULL; + decode_cancel_state(original_state, &original_func, &original_error); + // If error is set, return it. + if (original_error != GRPC_ERROR_NONE) return GRPC_ERROR_REF(original_error); + // Otherwise, store func. + GRPC_CLOSURE_INIT(&calld->cancel_closure, func, elem, + grpc_schedule_on_exec_ctx); + GPR_ASSERT(((gpr_atm)&calld->cancel_closure & (gpr_atm)1) == 0); + gpr_atm_rel_store(&calld->cancellation_state, + (gpr_atm)&calld->cancel_closure); + return GRPC_ERROR_NONE; +} + static void reset_auth_metadata_context( grpc_auth_metadata_context *auth_md_context) { if (auth_md_context->service_url != NULL) { @@ -93,7 +135,6 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *arg, grpc_transport_stream_op_batch *batch = (grpc_transport_stream_op_batch *)arg; grpc_call_element *elem = batch->handler_private.extra_arg; call_data *calld = elem->call_data; - grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner, NULL); reset_auth_metadata_context(&calld->auth_md_context); grpc_error *error = GRPC_ERROR_REF(input_error); if (error == GRPC_ERROR_NONE) { @@ -112,8 +153,7 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *arg, } else { error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED); - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error, - calld->call_combiner); + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error); } } @@ -183,8 +223,7 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx, grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Incompatible credentials set on channel and call."), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED), - calld->call_combiner); + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED)); return; } } else { @@ -195,23 +234,22 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx, build_auth_metadata_context(&chand->security_connector->base, chand->auth_context, calld); + grpc_error *cancel_error = set_cancel_func(elem, cancel_get_request_metadata); + if (cancel_error != GRPC_ERROR_NONE) { + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, + cancel_error); + return; + } GPR_ASSERT(calld->pollent != NULL); - - GRPC_CLOSURE_INIT(&calld->async_result_closure, on_credentials_metadata, - batch, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&calld->closure, on_credentials_metadata, batch, + grpc_schedule_on_exec_ctx); grpc_error *error = GRPC_ERROR_NONE; if (grpc_call_credentials_get_request_metadata( exec_ctx, calld->creds, calld->pollent, calld->auth_md_context, - &calld->md_array, &calld->async_result_closure, &error)) { + &calld->md_array, &calld->closure, &error)) { // Synchronous return; invoke on_credentials_metadata() directly. on_credentials_metadata(exec_ctx, batch, error); GRPC_ERROR_UNREF(error); - } else { - // Async return; register cancellation closure with call combiner. - GRPC_CLOSURE_INIT(&calld->async_cancel_closure, cancel_get_request_metadata, - elem, grpc_schedule_on_exec_ctx); - grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner, - &calld->async_cancel_closure); } } @@ -220,7 +258,7 @@ static void on_host_checked(grpc_exec_ctx *exec_ctx, void *arg, grpc_transport_stream_op_batch *batch = (grpc_transport_stream_op_batch *)arg; grpc_call_element *elem = batch->handler_private.extra_arg; call_data *calld = elem->call_data; - grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner, NULL); + if (error == GRPC_ERROR_NONE) { send_security_metadata(exec_ctx, elem, batch); } else { @@ -233,8 +271,7 @@ static void on_host_checked(grpc_exec_ctx *exec_ctx, void *arg, exec_ctx, batch, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg), GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_UNAUTHENTICATED), - calld->call_combiner); + GRPC_STATUS_UNAUTHENTICATED)); gpr_free(error_msg); } } @@ -245,7 +282,7 @@ static void cancel_check_call_host(grpc_exec_ctx *exec_ctx, void *arg, call_data *calld = (call_data *)elem->call_data; channel_data *chand = (channel_data *)elem->channel_data; grpc_channel_security_connector_cancel_check_call_host( - exec_ctx, chand->security_connector, &calld->async_result_closure, + exec_ctx, chand->security_connector, &calld->closure, GRPC_ERROR_REF(error)); } @@ -258,19 +295,52 @@ static void auth_start_transport_stream_op_batch( call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - if (!batch->cancel_stream) { - GPR_ASSERT(batch->payload->context != NULL); - if (batch->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) { - batch->payload->context[GRPC_CONTEXT_SECURITY].value = - grpc_client_security_context_create(); - batch->payload->context[GRPC_CONTEXT_SECURITY].destroy = - grpc_client_security_context_destroy; + if (batch->cancel_stream) { + while (true) { + // Decode the original cancellation state. + gpr_atm original_state = gpr_atm_acq_load(&calld->cancellation_state); + grpc_error *cancel_error = GRPC_ERROR_NONE; + grpc_closure *func = NULL; + decode_cancel_state(original_state, &func, &cancel_error); + // If we had already set a cancellation error, there's nothing + // more to do. + if (cancel_error != GRPC_ERROR_NONE) break; + // If there's a cancel func, call it. + // Note that even if the cancel func has been changed by some + // other thread between when we decoded it and now, it will just + // be a no-op. + cancel_error = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error); + if (func != NULL) { + GRPC_CLOSURE_SCHED(exec_ctx, func, GRPC_ERROR_REF(cancel_error)); + } + // Encode the new error into cancellation state. + if (gpr_atm_full_cas(&calld->cancellation_state, original_state, + encode_cancel_state_error(cancel_error))) { + break; // Success. + } + // The cas failed, so try again. + } + } else { + /* double checked lock over security context to ensure it's set once */ + if (gpr_atm_acq_load(&calld->security_context_set) == 0) { + gpr_mu_lock(&calld->security_context_mu); + if (gpr_atm_acq_load(&calld->security_context_set) == 0) { + GPR_ASSERT(batch->payload->context != NULL); + if (batch->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) { + batch->payload->context[GRPC_CONTEXT_SECURITY].value = + grpc_client_security_context_create(); + batch->payload->context[GRPC_CONTEXT_SECURITY].destroy = + grpc_client_security_context_destroy; + } + grpc_client_security_context *sec_ctx = + batch->payload->context[GRPC_CONTEXT_SECURITY].value; + GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter"); + sec_ctx->auth_context = + GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter"); + gpr_atm_rel_store(&calld->security_context_set, 1); + } + gpr_mu_unlock(&calld->security_context_mu); } - grpc_client_security_context *sec_ctx = - batch->payload->context[GRPC_CONTEXT_SECURITY].value; - GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter"); - sec_ctx->auth_context = - GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter"); } if (batch->send_initial_metadata) { @@ -295,25 +365,26 @@ static void auth_start_transport_stream_op_batch( } } if (calld->have_host) { - batch->handler_private.extra_arg = elem; - GRPC_CLOSURE_INIT(&calld->async_result_closure, on_host_checked, batch, - grpc_schedule_on_exec_ctx); - char *call_host = grpc_slice_to_c_string(calld->host); - grpc_error *error = GRPC_ERROR_NONE; - if (grpc_channel_security_connector_check_call_host( - exec_ctx, chand->security_connector, call_host, - chand->auth_context, &calld->async_result_closure, &error)) { - // Synchronous return; invoke on_host_checked() directly. - on_host_checked(exec_ctx, batch, error); - GRPC_ERROR_UNREF(error); + grpc_error *cancel_error = set_cancel_func(elem, cancel_check_call_host); + if (cancel_error != GRPC_ERROR_NONE) { + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, + cancel_error); } else { - // Async return; register cancellation closure with call combiner. - GRPC_CLOSURE_INIT(&calld->async_cancel_closure, cancel_check_call_host, - elem, grpc_schedule_on_exec_ctx); - grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner, - &calld->async_cancel_closure); + char *call_host = grpc_slice_to_c_string(calld->host); + batch->handler_private.extra_arg = elem; + grpc_error *error = GRPC_ERROR_NONE; + if (grpc_channel_security_connector_check_call_host( + exec_ctx, chand->security_connector, call_host, + chand->auth_context, + GRPC_CLOSURE_INIT(&calld->closure, on_host_checked, batch, + grpc_schedule_on_exec_ctx), + &error)) { + // Synchronous return; invoke on_host_checked() directly. + on_host_checked(exec_ctx, batch, error); + GRPC_ERROR_UNREF(error); + } + gpr_free(call_host); } - gpr_free(call_host); GPR_TIMER_END("auth_start_transport_stream_op_batch", 0); return; /* early exit */ } @@ -329,7 +400,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_element_args *args) { call_data *calld = elem->call_data; - calld->call_combiner = args->call_combiner; + memset(calld, 0, sizeof(*calld)); + gpr_mu_init(&calld->security_context_mu); return GRPC_ERROR_NONE; } @@ -354,6 +426,12 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_slice_unref_internal(exec_ctx, calld->method); } reset_auth_metadata_context(&calld->auth_md_context); + gpr_mu_destroy(&calld->security_context_mu); + gpr_atm cancel_state = gpr_atm_acq_load(&calld->cancellation_state); + grpc_error *cancel_error = GRPC_ERROR_NONE; + grpc_closure *cancel_func = NULL; + decode_cancel_state(cancel_state, &cancel_func, &cancel_error); + GRPC_ERROR_UNREF(cancel_error); } /* Constructor for channel_data */ @@ -412,5 +490,6 @@ const grpc_channel_filter grpc_client_auth_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, grpc_channel_next_get_info, "client-auth"}; diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c index b721ce4a22..9bf3f0ca0f 100644 --- a/src/core/lib/security/transport/server_auth_filter.c +++ b/src/core/lib/security/transport/server_auth_filter.c @@ -26,15 +26,7 @@ #include "src/core/lib/security/transport/auth_filters.h" #include "src/core/lib/slice/slice_internal.h" -typedef enum { - STATE_INIT = 0, - STATE_DONE, - STATE_CANCELLED, -} async_state; - typedef struct call_data { - grpc_call_combiner *call_combiner; - grpc_call_stack *owning_call; grpc_transport_stream_op_batch *recv_initial_metadata_batch; grpc_closure *original_recv_initial_metadata_ready; grpc_closure recv_initial_metadata_ready; @@ -42,8 +34,6 @@ typedef struct call_data { const grpc_metadata *consumed_md; size_t num_consumed_md; grpc_auth_context *auth_context; - grpc_closure cancel_closure; - gpr_atm state; // async_state } call_data; typedef struct channel_data { @@ -88,92 +78,54 @@ static grpc_filtered_mdelem remove_consumed_md(grpc_exec_ctx *exec_ctx, return GRPC_FILTERED_MDELEM(md); } -static void on_md_processing_done_inner(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - const grpc_metadata *consumed_md, - size_t num_consumed_md, - const grpc_metadata *response_md, - size_t num_response_md, - grpc_error *error) { +/* called from application code */ +static void on_md_processing_done( + void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md, + const grpc_metadata *response_md, size_t num_response_md, + grpc_status_code status, const char *error_details) { + grpc_call_element *elem = user_data; call_data *calld = elem->call_data; grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch; - grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner, NULL); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; /* TODO(jboeuf): Implement support for response_md. */ if (response_md != NULL && num_response_md > 0) { gpr_log(GPR_INFO, "response_md in auth metadata processing not supported for now. " "Ignoring..."); } - if (error == GRPC_ERROR_NONE) { + grpc_error *error = GRPC_ERROR_NONE; + if (status == GRPC_STATUS_OK) { calld->consumed_md = consumed_md; calld->num_consumed_md = num_consumed_md; error = grpc_metadata_batch_filter( - exec_ctx, batch->payload->recv_initial_metadata.recv_initial_metadata, + &exec_ctx, batch->payload->recv_initial_metadata.recv_initial_metadata, remove_consumed_md, elem, "Response metadata filtering error"); - } - GRPC_CLOSURE_SCHED(exec_ctx, calld->original_recv_initial_metadata_ready, - error); -} - -// Called from application code. -static void on_md_processing_done( - void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md, - const grpc_metadata *response_md, size_t num_response_md, - grpc_status_code status, const char *error_details) { - grpc_call_element *elem = user_data; - call_data *calld = elem->call_data; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - // If the call was not cancelled while we were in flight, process the result. - if (gpr_atm_full_cas(&calld->state, (gpr_atm)STATE_INIT, - (gpr_atm)STATE_DONE)) { - grpc_error *error = GRPC_ERROR_NONE; - if (status != GRPC_STATUS_OK) { - if (error_details == NULL) { - error_details = "Authentication metadata processing failed."; - } - error = grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details), - GRPC_ERROR_INT_GRPC_STATUS, status); + } else { + if (error_details == NULL) { + error_details = "Authentication metadata processing failed."; } - on_md_processing_done_inner(&exec_ctx, elem, consumed_md, num_consumed_md, - response_md, num_response_md, error); + error = + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details), + GRPC_ERROR_INT_GRPC_STATUS, status); } - // Clean up. for (size_t i = 0; i < calld->md.count; i++) { grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].key); grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].value); } grpc_metadata_array_destroy(&calld->md); - GRPC_CALL_STACK_UNREF(&exec_ctx, calld->owning_call, "server_auth_metadata"); + GRPC_CLOSURE_SCHED(&exec_ctx, calld->original_recv_initial_metadata_ready, + error); grpc_exec_ctx_finish(&exec_ctx); } -static void cancel_call(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_call_element *elem = (grpc_call_element *)arg; - call_data *calld = elem->call_data; - // If the result was not already processed, invoke the callback now. - if (gpr_atm_full_cas(&calld->state, (gpr_atm)STATE_INIT, - (gpr_atm)STATE_CANCELLED)) { - on_md_processing_done_inner(exec_ctx, elem, NULL, 0, NULL, 0, - GRPC_ERROR_REF(error)); - } -} - static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_call_element *elem = (grpc_call_element *)arg; + grpc_call_element *elem = arg; channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch; if (error == GRPC_ERROR_NONE) { if (chand->creds != NULL && chand->creds->processor.process != NULL) { - // We're calling out to the application, so we need to make sure - // to drop the call combiner early if we get cancelled. - GRPC_CLOSURE_INIT(&calld->cancel_closure, cancel_call, elem, - grpc_schedule_on_exec_ctx); - grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner, - &calld->cancel_closure); - GRPC_CALL_STACK_REF(calld->owning_call, "server_auth_metadata"); calld->md = metadata_batch_to_md_array( batch->payload->recv_initial_metadata.recv_initial_metadata); chand->creds->processor.process( @@ -207,8 +159,6 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, const grpc_call_element_args *args) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - calld->call_combiner = args->call_combiner; - calld->owning_call = args->call_stack; GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, recv_initial_metadata_ready, elem, grpc_schedule_on_exec_ctx); @@ -268,5 +218,6 @@ const grpc_channel_filter grpc_server_auth_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, grpc_channel_next_get_info, "server-auth"}; diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 03eaaf99ac..e68c201134 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -122,7 +122,6 @@ typedef struct batch_control { bool is_closure; } notify_tag; } completion_data; - grpc_closure start_batch; grpc_closure finish_batch; gpr_refcount steps_to_complete; @@ -152,7 +151,6 @@ typedef struct { struct grpc_call { gpr_refcount ext_ref; gpr_arena *arena; - grpc_call_combiner call_combiner; grpc_completion_queue *cq; grpc_polling_entity pollent; grpc_channel *channel; @@ -186,11 +184,6 @@ struct grpc_call { Element 0 is initial metadata, element 1 is trailing metadata. */ grpc_metadata_array *buffered_metadata[2]; - grpc_metadata compression_md; - - // A char* indicating the peer name. - gpr_atm peer_string; - /* Packed received call statuses from various sources */ gpr_atm status[STATUS_SOURCE_COUNT]; @@ -269,9 +262,8 @@ grpc_tracer_flag grpc_compression_trace = #define CALL_FROM_TOP_ELEM(top_elem) \ CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem)) -static void execute_batch(grpc_exec_ctx *exec_ctx, grpc_call *call, - grpc_transport_stream_op_batch *op, - grpc_closure *start_batch_closure); +static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, + grpc_transport_stream_op_batch *op); static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, status_source source, grpc_status_code status, const char *description); @@ -336,7 +328,6 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, sizeof(grpc_call) + channel_stack->call_stack_size); gpr_ref_init(&call->ext_ref, 1); call->arena = arena; - grpc_call_combiner_init(&call->call_combiner); *out_call = call; call->channel = args->channel; call->cq = args->cq; @@ -445,8 +436,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, .path = path, .start_time = call->start_time, .deadline = send_deadline, - .arena = call->arena, - .call_combiner = &call->call_combiner}; + .arena = call->arena}; add_init_error(&error, grpc_call_stack_init(exec_ctx, channel_stack, 1, destroy_call, call, &call_args)); if (error != GRPC_ERROR_NONE) { @@ -513,8 +503,6 @@ static void release_call(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { grpc_call *c = call; grpc_channel *channel = c->channel; - grpc_call_combiner_destroy(&c->call_combiner); - gpr_free((char *)c->peer_string); grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena)); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call"); } @@ -614,37 +602,30 @@ grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) { return GRPC_CALL_OK; } -// This is called via the call combiner to start sending a batch down -// the filter stack. -static void execute_batch_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *ignored) { - grpc_transport_stream_op_batch *batch = arg; - grpc_call *call = batch->handler_private.extra_arg; - GPR_TIMER_BEGIN("execute_batch", 0); - grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0); - GRPC_CALL_LOG_OP(GPR_INFO, elem, batch); - elem->filter->start_transport_stream_op_batch(exec_ctx, elem, batch); - GPR_TIMER_END("execute_batch", 0); -} - -// start_batch_closure points to a caller-allocated closure to be used -// for entering the call combiner. -static void execute_batch(grpc_exec_ctx *exec_ctx, grpc_call *call, - grpc_transport_stream_op_batch *batch, - grpc_closure *start_batch_closure) { - batch->handler_private.extra_arg = call; - GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch, - grpc_schedule_on_exec_ctx); - GRPC_CALL_COMBINER_START(exec_ctx, &call->call_combiner, start_batch_closure, - GRPC_ERROR_NONE, "executing batch"); +static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, + grpc_transport_stream_op_batch *op) { + grpc_call_element *elem; + + GPR_TIMER_BEGIN("execute_op", 0); + elem = CALL_ELEM_FROM_CALL(call, 0); + elem->filter->start_transport_stream_op_batch(exec_ctx, elem, op); + GPR_TIMER_END("execute_op", 0); } char *grpc_call_get_peer(grpc_call *call) { - char *peer_string = (char *)gpr_atm_acq_load(&call->peer_string); - if (peer_string != NULL) return gpr_strdup(peer_string); - peer_string = grpc_channel_get_target(call->channel); - if (peer_string != NULL) return peer_string; - return gpr_strdup("unknown"); + grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + char *result; + GRPC_API_TRACE("grpc_call_get_peer(%p)", 1, (call)); + result = elem->filter->get_peer(&exec_ctx, elem); + if (result == NULL) { + result = grpc_channel_get_target(call->channel); + } + if (result == NULL) { + result = gpr_strdup("unknown"); + } + grpc_exec_ctx_finish(&exec_ctx); + return result; } grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { @@ -671,41 +652,20 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c, return GRPC_CALL_OK; } -typedef struct { - grpc_call *call; - grpc_closure start_batch; - grpc_closure finish_batch; -} cancel_state; - -// The on_complete callback used when sending a cancel_stream batch down -// the filter stack. Yields the call combiner when the batch is done. -static void done_termination(grpc_exec_ctx *exec_ctx, void *arg, +static void done_termination(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { - cancel_state *state = (cancel_state *)arg; - GRPC_CALL_COMBINER_STOP(exec_ctx, &state->call->call_combiner, - "on_complete for cancel_stream op"); - GRPC_CALL_INTERNAL_UNREF(exec_ctx, state->call, "termination"); - gpr_free(state); + GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "termination"); } static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, status_source source, grpc_error *error) { GRPC_CALL_INTERNAL_REF(c, "termination"); - // Inform the call combiner of the cancellation, so that it can cancel - // any in-flight asynchronous actions that may be holding the call - // combiner. This ensures that the cancel_stream batch can be sent - // down the filter stack in a timely manner. - grpc_call_combiner_cancel(exec_ctx, &c->call_combiner, GRPC_ERROR_REF(error)); set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error)); - cancel_state *state = (cancel_state *)gpr_malloc(sizeof(*state)); - state->call = c; - GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state, - grpc_schedule_on_exec_ctx); - grpc_transport_stream_op_batch *op = - grpc_make_transport_stream_op(&state->finish_batch); + grpc_transport_stream_op_batch *op = grpc_make_transport_stream_op( + GRPC_CLOSURE_CREATE(done_termination, c, grpc_schedule_on_exec_ctx)); op->cancel_stream = true; op->payload->cancel_stream.cancel_error = error; - execute_batch(exec_ctx, c, op, &state->start_batch); + execute_op(exec_ctx, c, op); } static grpc_error *error_from_status(grpc_status_code status, @@ -1471,18 +1431,6 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, } } -// The recv_message_ready callback used when sending a batch containing -// a recv_message op down the filter stack. Yields the call combiner -// before processing the received message. -static void receiving_stream_ready_in_call_combiner(grpc_exec_ctx *exec_ctx, - void *bctlp, - grpc_error *error) { - batch_control *bctl = bctlp; - grpc_call *call = bctl->call; - GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, "recv_message_ready"); - receiving_stream_ready(exec_ctx, bctlp, error); -} - static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, batch_control *bctl) { grpc_call *call = bctl->call; @@ -1589,9 +1537,6 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, batch_control *bctl = bctlp; grpc_call *call = bctl->call; - GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, - "recv_initial_metadata_ready"); - add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false); if (error == GRPC_ERROR_NONE) { grpc_metadata_batch *md = @@ -1645,8 +1590,7 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_error *error) { batch_control *bctl = bctlp; - grpc_call *call = bctl->call; - GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, "on_complete"); + add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false); finish_batch_step(exec_ctx, bctl); } @@ -1666,6 +1610,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, int num_completion_callbacks_needed = 1; grpc_call_error error = GRPC_CALL_OK; + // sent_initial_metadata guards against variable reuse. + grpc_metadata compression_md; + GPR_TIMER_BEGIN("grpc_call_start_batch", 0); GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag); @@ -1713,7 +1660,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, goto done_with_error; } /* process compression level */ - memset(&call->compression_md, 0, sizeof(call->compression_md)); + memset(&compression_md, 0, sizeof(compression_md)); size_t additional_metadata_count = 0; grpc_compression_level effective_compression_level = GRPC_COMPRESS_LEVEL_NONE; @@ -1751,9 +1698,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, const grpc_stream_compression_algorithm calgo = stream_compression_algorithm_for_level_locked( call, effective_stream_compression_level); - call->compression_md.key = + compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_STREAM_ENCODING_REQUEST; - call->compression_md.value = + compression_md.value = grpc_stream_compression_algorithm_slice(calgo); } else { const grpc_compression_algorithm calgo = @@ -1761,10 +1708,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call, effective_compression_level); /* the following will be picked up by the compress filter and used * as the call's compression algorithm. */ - call->compression_md.key = - GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST; - call->compression_md.value = - grpc_compression_algorithm_slice(calgo); + compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST; + compression_md.value = grpc_compression_algorithm_slice(calgo); additional_metadata_count++; } } @@ -1779,7 +1724,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, if (!prepare_application_metadata( exec_ctx, call, (int)op->data.send_initial_metadata.count, op->data.send_initial_metadata.metadata, 0, call->is_client, - &call->compression_md, (int)additional_metadata_count)) { + &compression_md, (int)additional_metadata_count)) { error = GRPC_CALL_ERROR_INVALID_METADATA; goto done_with_error; } @@ -1791,10 +1736,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]; stream_op_payload->send_initial_metadata.send_initial_metadata_flags = op->flags; - if (call->is_client) { - stream_op_payload->send_initial_metadata.peer_string = - &call->peer_string; - } break; case GRPC_OP_SEND_MESSAGE: if (!are_write_flags_valid(op->flags)) { @@ -1927,10 +1868,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready = &call->receiving_initial_metadata_ready; - if (!call->is_client) { - stream_op_payload->recv_initial_metadata.peer_string = - &call->peer_string; - } num_completion_callbacks_needed++; break; case GRPC_OP_RECV_MESSAGE: @@ -1947,9 +1884,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, stream_op->recv_message = true; call->receiving_buffer = op->data.recv_message.recv_message; stream_op_payload->recv_message.recv_message = &call->receiving_stream; - GRPC_CLOSURE_INIT(&call->receiving_stream_ready, - receiving_stream_ready_in_call_combiner, bctl, - grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&call->receiving_stream_ready, receiving_stream_ready, + bctl, grpc_schedule_on_exec_ctx); stream_op_payload->recv_message.recv_message_ready = &call->receiving_stream_ready; num_completion_callbacks_needed++; @@ -2019,7 +1955,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, stream_op->on_complete = &bctl->finish_batch; gpr_atm_rel_store(&call->any_ops_sent_atm, 1); - execute_batch(exec_ctx, call, stream_op, &bctl->start_batch); + execute_op(exec_ctx, call, stream_op); done: GPR_TIMER_END("grpc_call_start_batch", 0); diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index 280315036f..898476daee 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -31,7 +31,6 @@ #include "src/core/lib/debug/stats.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/http/parser.h" -#include "src/core/lib/iomgr/call_combiner.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr.h" @@ -130,7 +129,6 @@ void grpc_init(void) { grpc_register_tracer(&grpc_trace_channel_stack_builder); grpc_register_tracer(&grpc_http1_trace); grpc_register_tracer(&grpc_cq_pluck_trace); // default on - grpc_register_tracer(&grpc_call_combiner_trace); grpc_register_tracer(&grpc_combiner_trace); grpc_register_tracer(&grpc_server_channel_trace); grpc_register_tracer(&grpc_bdp_estimator_trace); diff --git a/src/core/lib/surface/lame_client.cc b/src/core/lib/surface/lame_client.cc index 6286f9159d..a0791080a9 100644 --- a/src/core/lib/surface/lame_client.cc +++ b/src/core/lib/surface/lame_client.cc @@ -40,7 +40,6 @@ namespace grpc_core { namespace { struct CallData { - grpc_call_combiner *call_combiner; grpc_linked_mdelem status; grpc_linked_mdelem details; grpc_core::atomic<bool> filled_metadata; @@ -53,14 +52,14 @@ struct ChannelData { static void fill_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_metadata_batch *mdb) { - CallData *calld = reinterpret_cast<CallData *>(elem->call_data); + CallData *calld = static_cast<CallData *>(elem->call_data); bool expected = false; if (!calld->filled_metadata.compare_exchange_strong( expected, true, grpc_core::memory_order_relaxed, grpc_core::memory_order_relaxed)) { return; } - ChannelData *chand = reinterpret_cast<ChannelData *>(elem->channel_data); + ChannelData *chand = static_cast<ChannelData *>(elem->channel_data); char tmp[GPR_LTOA_MIN_BUFSIZE]; gpr_ltoa(chand->error_code, tmp); calld->status.md = grpc_mdelem_from_slices( @@ -80,7 +79,6 @@ static void fill_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, static void lame_start_transport_stream_op_batch( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op_batch *op) { - CallData *calld = reinterpret_cast<CallData *>(elem->call_data); if (op->recv_initial_metadata) { fill_metadata(exec_ctx, elem, op->payload->recv_initial_metadata.recv_initial_metadata); @@ -89,8 +87,12 @@ static void lame_start_transport_stream_op_batch( op->payload->recv_trailing_metadata.recv_trailing_metadata); } grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, op, GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"), - calld->call_combiner); + exec_ctx, op, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel")); +} + +static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { + return NULL; } static void lame_get_channel_info(grpc_exec_ctx *exec_ctx, @@ -120,8 +122,6 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_element_args *args) { - CallData *calld = reinterpret_cast<CallData *>(elem->call_data); - calld->call_combiner = args->call_combiner; return GRPC_ERROR_NONE; } @@ -156,6 +156,7 @@ extern "C" const grpc_channel_filter grpc_lame_filter = { sizeof(grpc_core::ChannelData), grpc_core::init_channel_elem, grpc_core::destroy_channel_elem, + grpc_core::lame_get_peer, grpc_core::lame_get_channel_info, "lame-client", }; @@ -175,7 +176,7 @@ grpc_channel *grpc_lame_client_channel_create(const char *target, "error_message=%s)", 3, (target, (int)error_code, error_message)); GPR_ASSERT(elem->filter == &grpc_lame_filter); - auto chand = reinterpret_cast<grpc_core::ChannelData *>(elem->channel_data); + auto chand = static_cast<grpc_core::ChannelData *>(elem->channel_data); chand->error_code = error_code; chand->error_message = error_message; grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 8582d826ca..66dcc299aa 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -789,6 +789,7 @@ static void server_mutate_op(grpc_call_element *elem, static void server_start_transport_stream_op_batch( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op_batch *op) { + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); server_mutate_op(elem, op); grpc_call_next_op(exec_ctx, elem, op); } @@ -961,6 +962,7 @@ const grpc_channel_filter grpc_server_top_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, grpc_channel_next_get_info, "server", }; diff --git a/src/core/lib/transport/byte_stream.c b/src/core/lib/transport/byte_stream.c index fb03a10315..08f61629a9 100644 --- a/src/core/lib/transport/byte_stream.c +++ b/src/core/lib/transport/byte_stream.c @@ -85,6 +85,7 @@ static void slice_buffer_stream_shutdown(grpc_exec_ctx *exec_ctx, static void slice_buffer_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream) { grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream; + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, stream->backing_buffer); GRPC_ERROR_UNREF(stream->shutdown_error); } diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h index 1e1e8310b8..be2a35213e 100644 --- a/src/core/lib/transport/byte_stream.h +++ b/src/core/lib/transport/byte_stream.h @@ -81,7 +81,9 @@ void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx, // grpc_slice_buffer_stream // -// A grpc_byte_stream that wraps a slice buffer. +// A grpc_byte_stream that wraps a slice buffer. The stream takes +// ownership of the slices in the buffer, and on destruction will +// reset the contents of the buffer. typedef struct grpc_slice_buffer_stream { grpc_byte_stream base; diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index 650b0559aa..6c61f4b8d9 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -197,6 +197,11 @@ void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx, then_schedule_closure); } +char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx, + grpc_transport *transport) { + return transport->vtable->get_peer(exec_ctx, transport); +} + grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx, grpc_transport *transport) { return transport->vtable->get_endpoint(exec_ctx, transport); @@ -209,24 +214,24 @@ grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx, // is a function that must always unref cancel_error // though it lives in lib, it handles transport stream ops sure // it's grpc_transport_stream_op_batch_finish_with_failure + void grpc_transport_stream_op_batch_finish_with_failure( grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *batch, - grpc_error *error, grpc_call_combiner *call_combiner) { + grpc_error *error) { if (batch->send_message) { grpc_byte_stream_destroy(exec_ctx, batch->payload->send_message.send_message); } if (batch->recv_message) { - GRPC_CALL_COMBINER_START(exec_ctx, call_combiner, - batch->payload->recv_message.recv_message_ready, - GRPC_ERROR_REF(error), - "failing recv_message_ready"); + GRPC_CLOSURE_SCHED(exec_ctx, + batch->payload->recv_message.recv_message_ready, + GRPC_ERROR_REF(error)); } if (batch->recv_initial_metadata) { - GRPC_CALL_COMBINER_START( - exec_ctx, call_combiner, + GRPC_CLOSURE_SCHED( + exec_ctx, batch->payload->recv_initial_metadata.recv_initial_metadata_ready, - GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready"); + GRPC_ERROR_REF(error)); } GRPC_CLOSURE_SCHED(exec_ctx, batch->on_complete, error); if (batch->cancel_stream) { diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index fbf5dcb8b5..099138ea14 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -22,7 +22,6 @@ #include <stddef.h> #include "src/core/lib/channel/context.h" -#include "src/core/lib/iomgr/call_combiner.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/iomgr/pollset.h" @@ -153,9 +152,6 @@ struct grpc_transport_stream_op_batch_payload { /** Iff send_initial_metadata != NULL, flags associated with send_initial_metadata: a bitfield of GRPC_INITIAL_METADATA_xxx */ uint32_t send_initial_metadata_flags; - // If non-NULL, will be set by the transport to the peer string - // (a char*, which the caller takes ownership of). - gpr_atm *peer_string; } send_initial_metadata; struct { @@ -180,9 +176,6 @@ struct grpc_transport_stream_op_batch_payload { // immediately available. This may be a signal that we received a // Trailers-Only response. bool *trailing_metadata_available; - // If non-NULL, will be set by the transport to the peer string - // (a char*, which the caller takes ownership of). - gpr_atm *peer_string; } recv_initial_metadata; struct { @@ -300,7 +293,7 @@ void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx, void grpc_transport_stream_op_batch_finish_with_failure( grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op, - grpc_error *error, grpc_call_combiner *call_combiner); + grpc_error *error); char *grpc_transport_stream_op_batch_string(grpc_transport_stream_op_batch *op); char *grpc_transport_op_string(grpc_transport_op *op); @@ -339,6 +332,10 @@ void grpc_transport_close(grpc_transport *transport); /* Destroy the transport */ void grpc_transport_destroy(grpc_exec_ctx *exec_ctx, grpc_transport *transport); +/* Get the transports peer */ +char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx, + grpc_transport *transport); + /* Get the endpoint used by \a transport */ grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx, grpc_transport *transport); diff --git a/src/core/lib/transport/transport_impl.h b/src/core/lib/transport/transport_impl.h index bbae69c223..fc772c6dd1 100644 --- a/src/core/lib/transport/transport_impl.h +++ b/src/core/lib/transport/transport_impl.h @@ -59,6 +59,9 @@ typedef struct grpc_transport_vtable { /* implementation of grpc_transport_destroy */ void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_transport *self); + /* implementation of grpc_transport_get_peer */ + char *(*get_peer)(grpc_exec_ctx *exec_ctx, grpc_transport *self); + /* implementation of grpc_transport_get_endpoint */ grpc_endpoint *(*get_endpoint)(grpc_exec_ctx *exec_ctx, grpc_transport *self); } grpc_transport_vtable; diff --git a/src/core/lib/transport/transport_op_string.c b/src/core/lib/transport/transport_op_string.c index 409a6c4103..7b18229ba6 100644 --- a/src/core/lib/transport/transport_op_string.c +++ b/src/core/lib/transport/transport_op_string.c @@ -112,13 +112,6 @@ char *grpc_transport_stream_op_batch_string( gpr_strvec_add(&b, tmp); } - if (op->collect_stats) { - gpr_strvec_add(&b, gpr_strdup(" ")); - gpr_asprintf(&tmp, "COLLECT_STATS:%p", - op->payload->collect_stats.collect_stats); - gpr_strvec_add(&b, tmp); - } - out = gpr_strvec_flatten(&b, NULL); gpr_strvec_destroy(&b); |