From bf19961d0a49b43cb528392efeb4880eeebb9b5e Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 29 Aug 2017 16:59:07 -0700 Subject: Revert "Implement call combiner" --- src/core/lib/surface/call.c | 148 +++++++++++++------------------------------- 1 file changed, 42 insertions(+), 106 deletions(-) (limited to 'src/core/lib/surface/call.c') diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index d1589da74f..62d542d2c5 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -121,7 +121,6 @@ typedef struct batch_control { bool is_closure; } notify_tag; } completion_data; - grpc_closure start_batch; grpc_closure finish_batch; gpr_refcount steps_to_complete; @@ -151,7 +150,6 @@ typedef struct { struct grpc_call { gpr_refcount ext_ref; gpr_arena *arena; - grpc_call_combiner call_combiner; grpc_completion_queue *cq; grpc_polling_entity pollent; grpc_channel *channel; @@ -185,11 +183,6 @@ struct grpc_call { Element 0 is initial metadata, element 1 is trailing metadata. */ grpc_metadata_array *buffered_metadata[2]; - grpc_metadata compression_md; - - // A char* indicating the peer name. - gpr_atm peer_string; - /* Packed received call statuses from various sources */ gpr_atm status[STATUS_SOURCE_COUNT]; @@ -268,9 +261,8 @@ grpc_tracer_flag grpc_compression_trace = #define CALL_FROM_TOP_ELEM(top_elem) \ CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem)) -static void execute_batch(grpc_exec_ctx *exec_ctx, grpc_call *call, - grpc_transport_stream_op_batch *op, - grpc_closure *start_batch_closure); +static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, + grpc_transport_stream_op_batch *op); static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, status_source source, grpc_status_code status, const char *description); @@ -335,7 +327,6 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, sizeof(grpc_call) + channel_stack->call_stack_size); gpr_ref_init(&call->ext_ref, 1); call->arena = arena; - grpc_call_combiner_init(&call->call_combiner); *out_call = call; call->channel = args->channel; call->cq = args->cq; @@ -439,8 +430,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, .path = path, .start_time = call->start_time, .deadline = send_deadline, - .arena = call->arena, - .call_combiner = &call->call_combiner}; + .arena = call->arena}; add_init_error(&error, grpc_call_stack_init(exec_ctx, channel_stack, 1, destroy_call, call, &call_args)); if (error != GRPC_ERROR_NONE) { @@ -507,8 +497,6 @@ static void release_call(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { grpc_call *c = call; grpc_channel *channel = c->channel; - grpc_call_combiner_destroy(&c->call_combiner); - gpr_free((char *)c->peer_string); grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena)); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call"); } @@ -608,37 +596,30 @@ grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) { return GRPC_CALL_OK; } -// This is called via the call combiner to start sending a batch down -// the filter stack. -static void execute_batch_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *ignored) { - grpc_transport_stream_op_batch *batch = arg; - grpc_call *call = batch->handler_private.extra_arg; - GPR_TIMER_BEGIN("execute_batch", 0); - grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0); - GRPC_CALL_LOG_OP(GPR_INFO, elem, batch); - elem->filter->start_transport_stream_op_batch(exec_ctx, elem, batch); - GPR_TIMER_END("execute_batch", 0); -} - -// start_batch_closure points to a caller-allocated closure to be used -// for entering the call combiner. -static void execute_batch(grpc_exec_ctx *exec_ctx, grpc_call *call, - grpc_transport_stream_op_batch *batch, - grpc_closure *start_batch_closure) { - batch->handler_private.extra_arg = call; - GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch, - grpc_schedule_on_exec_ctx); - GRPC_CALL_COMBINER_START(exec_ctx, &call->call_combiner, start_batch_closure, - GRPC_ERROR_NONE, "executing batch"); +static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, + grpc_transport_stream_op_batch *op) { + grpc_call_element *elem; + + GPR_TIMER_BEGIN("execute_op", 0); + elem = CALL_ELEM_FROM_CALL(call, 0); + elem->filter->start_transport_stream_op_batch(exec_ctx, elem, op); + GPR_TIMER_END("execute_op", 0); } char *grpc_call_get_peer(grpc_call *call) { - char *peer_string = (char *)gpr_atm_acq_load(&call->peer_string); - if (peer_string != NULL) return gpr_strdup(peer_string); - peer_string = grpc_channel_get_target(call->channel); - if (peer_string != NULL) return peer_string; - return gpr_strdup("unknown"); + grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + char *result; + GRPC_API_TRACE("grpc_call_get_peer(%p)", 1, (call)); + result = elem->filter->get_peer(&exec_ctx, elem); + if (result == NULL) { + result = grpc_channel_get_target(call->channel); + } + if (result == NULL) { + result = gpr_strdup("unknown"); + } + grpc_exec_ctx_finish(&exec_ctx); + return result; } grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { @@ -665,41 +646,20 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c, return GRPC_CALL_OK; } -typedef struct { - grpc_call *call; - grpc_closure start_batch; - grpc_closure finish_batch; -} cancel_state; - -// The on_complete callback used when sending a cancel_stream batch down -// the filter stack. Yields the call combiner when the batch is done. -static void done_termination(grpc_exec_ctx *exec_ctx, void *arg, +static void done_termination(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { - cancel_state *state = (cancel_state *)arg; - GRPC_CALL_COMBINER_STOP(exec_ctx, &state->call->call_combiner, - "on_complete for cancel_stream op"); - GRPC_CALL_INTERNAL_UNREF(exec_ctx, state->call, "termination"); - gpr_free(state); + GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "termination"); } static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, status_source source, grpc_error *error) { GRPC_CALL_INTERNAL_REF(c, "termination"); - // Inform the call combiner of the cancellation, so that it can cancel - // any in-flight asynchronous actions that may be holding the call - // combiner. This ensures that the cancel_stream batch can be sent - // down the filter stack in a timely manner. - grpc_call_combiner_cancel(exec_ctx, &c->call_combiner, GRPC_ERROR_REF(error)); set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error)); - cancel_state *state = (cancel_state *)gpr_malloc(sizeof(*state)); - state->call = c; - GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state, - grpc_schedule_on_exec_ctx); - grpc_transport_stream_op_batch *op = - grpc_make_transport_stream_op(&state->finish_batch); + grpc_transport_stream_op_batch *op = grpc_make_transport_stream_op( + GRPC_CLOSURE_CREATE(done_termination, c, grpc_schedule_on_exec_ctx)); op->cancel_stream = true; op->payload->cancel_stream.cancel_error = error; - execute_batch(exec_ctx, c, op, &state->start_batch); + execute_op(exec_ctx, c, op); } static grpc_error *error_from_status(grpc_status_code status, @@ -1465,18 +1425,6 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, } } -// The recv_message_ready callback used when sending a batch containing -// a recv_message op down the filter stack. Yields the call combiner -// before processing the received message. -static void receiving_stream_ready_in_call_combiner(grpc_exec_ctx *exec_ctx, - void *bctlp, - grpc_error *error) { - batch_control *bctl = bctlp; - grpc_call *call = bctl->call; - GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, "recv_message_ready"); - receiving_stream_ready(exec_ctx, bctlp, error); -} - static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, batch_control *bctl) { grpc_call *call = bctl->call; @@ -1583,9 +1531,6 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, batch_control *bctl = bctlp; grpc_call *call = bctl->call; - GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, - "recv_initial_metadata_ready"); - add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false); if (error == GRPC_ERROR_NONE) { grpc_metadata_batch *md = @@ -1639,8 +1584,7 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_error *error) { batch_control *bctl = bctlp; - grpc_call *call = bctl->call; - GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, "on_complete"); + add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false); finish_batch_step(exec_ctx, bctl); } @@ -1660,6 +1604,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, int num_completion_callbacks_needed = 1; grpc_call_error error = GRPC_CALL_OK; + // sent_initial_metadata guards against variable reuse. + grpc_metadata compression_md; + GPR_TIMER_BEGIN("grpc_call_start_batch", 0); GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag); @@ -1707,7 +1654,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, goto done_with_error; } /* process compression level */ - memset(&call->compression_md, 0, sizeof(call->compression_md)); + memset(&compression_md, 0, sizeof(compression_md)); size_t additional_metadata_count = 0; grpc_compression_level effective_compression_level = GRPC_COMPRESS_LEVEL_NONE; @@ -1745,9 +1692,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, const grpc_stream_compression_algorithm calgo = stream_compression_algorithm_for_level_locked( call, effective_stream_compression_level); - call->compression_md.key = + compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_STREAM_ENCODING_REQUEST; - call->compression_md.value = + compression_md.value = grpc_stream_compression_algorithm_slice(calgo); } else { const grpc_compression_algorithm calgo = @@ -1755,10 +1702,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call, effective_compression_level); /* the following will be picked up by the compress filter and used * as the call's compression algorithm. */ - call->compression_md.key = - GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST; - call->compression_md.value = - grpc_compression_algorithm_slice(calgo); + compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST; + compression_md.value = grpc_compression_algorithm_slice(calgo); additional_metadata_count++; } } @@ -1773,7 +1718,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, if (!prepare_application_metadata( exec_ctx, call, (int)op->data.send_initial_metadata.count, op->data.send_initial_metadata.metadata, 0, call->is_client, - &call->compression_md, (int)additional_metadata_count)) { + &compression_md, (int)additional_metadata_count)) { error = GRPC_CALL_ERROR_INVALID_METADATA; goto done_with_error; } @@ -1785,10 +1730,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]; stream_op_payload->send_initial_metadata.send_initial_metadata_flags = op->flags; - if (call->is_client) { - stream_op_payload->send_initial_metadata.peer_string = - &call->peer_string; - } break; case GRPC_OP_SEND_MESSAGE: if (!are_write_flags_valid(op->flags)) { @@ -1921,10 +1862,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready = &call->receiving_initial_metadata_ready; - if (!call->is_client) { - stream_op_payload->recv_initial_metadata.peer_string = - &call->peer_string; - } num_completion_callbacks_needed++; break; case GRPC_OP_RECV_MESSAGE: @@ -1941,9 +1878,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, stream_op->recv_message = true; call->receiving_buffer = op->data.recv_message.recv_message; stream_op_payload->recv_message.recv_message = &call->receiving_stream; - GRPC_CLOSURE_INIT(&call->receiving_stream_ready, - receiving_stream_ready_in_call_combiner, bctl, - grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&call->receiving_stream_ready, receiving_stream_ready, + bctl, grpc_schedule_on_exec_ctx); stream_op_payload->recv_message.recv_message_ready = &call->receiving_stream_ready; num_completion_callbacks_needed++; @@ -2013,7 +1949,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, stream_op->on_complete = &bctl->finish_batch; gpr_atm_rel_store(&call->any_ops_sent_atm, 1); - execute_batch(exec_ctx, call, stream_op, &bctl->start_batch); + execute_op(exec_ctx, call, stream_op); done: GPR_TIMER_END("grpc_call_start_batch", 0); -- cgit v1.2.3