diff options
Diffstat (limited to 'src/core/lib/surface/call.c')
-rw-r--r-- | src/core/lib/surface/call.c | 129 |
1 files changed, 70 insertions, 59 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 8ca3cab9d5..899e8fab3f 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -49,6 +49,7 @@ #include "src/core/lib/compression/algorithm_metadata.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/api_trace.h" @@ -226,12 +227,12 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack, static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_error *error); -grpc_error *grpc_call_create(const grpc_call_create_args *args, +grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, + const grpc_call_create_args *args, grpc_call **out_call) { size_t i, j; grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(args->channel); - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_call *call; GPR_TIMER_BEGIN("grpc_call_create", 0); call = gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size); @@ -315,14 +316,14 @@ grpc_error *grpc_call_create(const grpc_call_create_args *args, GRPC_CHANNEL_INTERNAL_REF(args->channel, "call"); /* initial refcount dropped by grpc_call_destroy */ grpc_error *error = grpc_call_stack_init( - &exec_ctx, channel_stack, 1, destroy_call, call, call->context, + exec_ctx, channel_stack, 1, destroy_call, call, call->context, args->server_transport_data, path, call->start_time, send_deadline, CALL_STACK_FROM_CALL(call)); if (error != GRPC_ERROR_NONE) { grpc_status_code status; const char *error_str; grpc_error_get_status(error, &status, &error_str); - close_with_status(&exec_ctx, call, status, error_str); + close_with_status(exec_ctx, call, status, error_str); } if (args->cq != NULL) { GPR_ASSERT( @@ -338,12 +339,11 @@ grpc_error *grpc_call_create(const grpc_call_create_args *args, } if (!grpc_polling_entity_is_empty(&call->pollent)) { grpc_call_stack_set_pollset_or_pollset_set( - &exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent); + exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent); } - if (path != NULL) GRPC_MDSTR_UNREF(path); + if (path != NULL) GRPC_MDSTR_UNREF(exec_ctx, path); - grpc_exec_ctx_finish(&exec_ctx); GPR_TIMER_END("grpc_call_create", 0); return error; } @@ -404,7 +404,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, GPR_TIMER_BEGIN("destroy_call", 0); for (i = 0; i < 2; i++) { grpc_metadata_batch_destroy( - &c->metadata_batch[1 /* is_receiving */][i /* is_initial */]); + exec_ctx, &c->metadata_batch[1 /* is_receiving */][i /* is_initial */]); } if (c->receiving_stream != NULL) { grpc_byte_stream_destroy(exec_ctx, c->receiving_stream); @@ -412,11 +412,11 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, gpr_mu_destroy(&c->mu); for (i = 0; i < STATUS_SOURCE_COUNT; i++) { if (c->status[i].details) { - GRPC_MDSTR_UNREF(c->status[i].details); + GRPC_MDSTR_UNREF(exec_ctx, c->status[i].details); } } for (ii = 0; ii < c->send_extra_metadata_count; ii++) { - GRPC_MDELEM_UNREF(c->send_extra_metadata[ii].md); + GRPC_MDELEM_UNREF(exec_ctx, c->send_extra_metadata[ii].md); } for (i = 0; i < GRPC_CONTEXT_COUNT; i++) { if (c->context[i].destroy) { @@ -446,22 +446,22 @@ static void set_status_code(grpc_call *call, status_source source, call->status[source].code = (grpc_status_code)status; } -static void set_status_details(grpc_call *call, status_source source, - grpc_mdstr *status) { +static void set_status_details(grpc_exec_ctx *exec_ctx, grpc_call *call, + status_source source, grpc_mdstr *status) { if (call->status[source].details != NULL) { - GRPC_MDSTR_UNREF(status); + GRPC_MDSTR_UNREF(exec_ctx, status); } else { call->status[source].details = status; } } -static void set_status_from_error(grpc_call *call, status_source source, - grpc_error *error) { +static void set_status_from_error(grpc_exec_ctx *exec_ctx, grpc_call *call, + status_source source, grpc_error *error) { grpc_status_code status; const char *msg; grpc_error_get_status(error, &status, &msg); set_status_code(call, source, (uint32_t)status); - set_status_details(call, source, grpc_mdstr_from_string(msg)); + set_status_details(exec_ctx, call, source, grpc_mdstr_from_string(msg)); } static void set_incoming_compression_algorithm( @@ -495,7 +495,8 @@ uint32_t grpc_call_test_only_get_message_flags(grpc_call *call) { static void destroy_encodings_accepted_by_peer(void *p) { return; } -static void set_encodings_accepted_by_peer(grpc_call *call, grpc_mdelem *mdel) { +static void set_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx, + grpc_call *call, grpc_mdelem *mdel) { size_t i; grpc_compression_algorithm algorithm; grpc_slice_buffer accept_encoding_parts; @@ -535,7 +536,7 @@ static void set_encodings_accepted_by_peer(grpc_call *call, grpc_mdelem *mdel) { } } - grpc_slice_buffer_destroy(&accept_encoding_parts); + grpc_slice_buffer_destroy_internal(exec_ctx, &accept_encoding_parts); grpc_mdelem_set_user_data( mdel, destroy_encodings_accepted_by_peer, @@ -593,12 +594,10 @@ static grpc_metadata *get_md_elem(grpc_metadata *metadata, return res; } -static int prepare_application_metadata(grpc_call *call, int count, - grpc_metadata *metadata, - int is_trailing, - int prepend_extra_metadata, - grpc_metadata *additional_metadata, - int additional_metadata_count) { +static int prepare_application_metadata( + grpc_exec_ctx *exec_ctx, grpc_call *call, int count, + grpc_metadata *metadata, int is_trailing, int prepend_extra_metadata, + grpc_metadata *additional_metadata, int additional_metadata_count) { int total_count = count + additional_metadata_count; int i; grpc_metadata_batch *batch = @@ -609,7 +608,7 @@ static int prepare_application_metadata(grpc_call *call, int count, grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data; GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data)); l->md = grpc_mdelem_from_string_and_buffer( - md->key, (const uint8_t *)md->value, md->value_length); + exec_ctx, md->key, (const uint8_t *)md->value, md->value_length); if (!grpc_header_key_is_legal(grpc_mdstr_as_c_string(l->md->key), GRPC_MDSTR_LENGTH(l->md->key))) { gpr_log(GPR_ERROR, "attempt to send invalid metadata key: %s", @@ -629,7 +628,7 @@ static int prepare_application_metadata(grpc_call *call, int count, const grpc_metadata *md = get_md_elem(metadata, additional_metadata, j, count); grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data; - GRPC_MDELEM_UNREF(l->md); + GRPC_MDELEM_UNREF(exec_ctx, l->md); } return 0; } @@ -794,7 +793,8 @@ static void send_cancel(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { memset(&tc->op, 0, sizeof(tc->op)); tc->op.cancel_error = tc->error; /* reuse closure to catch completion */ - grpc_closure_init(&tc->closure, done_termination, tc); + grpc_closure_init(&tc->closure, done_termination, tc, + grpc_schedule_on_exec_ctx); tc->op.on_complete = &tc->closure; execute_op(exec_ctx, tc->call, &tc->op); } @@ -804,23 +804,25 @@ static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { memset(&tc->op, 0, sizeof(tc->op)); tc->op.close_error = tc->error; /* reuse closure to catch completion */ - grpc_closure_init(&tc->closure, done_termination, tc); + grpc_closure_init(&tc->closure, done_termination, tc, + grpc_schedule_on_exec_ctx); tc->op.on_complete = &tc->closure; execute_op(exec_ctx, tc->call, &tc->op); } static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx, termination_closure *tc) { - set_status_from_error(tc->call, STATUS_FROM_API_OVERRIDE, tc->error); + set_status_from_error(exec_ctx, tc->call, STATUS_FROM_API_OVERRIDE, + tc->error); if (tc->type == TC_CANCEL) { - grpc_closure_init(&tc->closure, send_cancel, tc); + grpc_closure_init(&tc->closure, send_cancel, tc, grpc_schedule_on_exec_ctx); GRPC_CALL_INTERNAL_REF(tc->call, "cancel"); } else if (tc->type == TC_CLOSE) { - grpc_closure_init(&tc->closure, send_close, tc); + grpc_closure_init(&tc->closure, send_close, tc, grpc_schedule_on_exec_ctx); GRPC_CALL_INTERNAL_REF(tc->call, "close"); } - grpc_exec_ctx_sched(exec_ctx, &tc->closure, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, &tc->closure, GRPC_ERROR_NONE); return GRPC_CALL_OK; } @@ -928,7 +930,8 @@ static grpc_compression_algorithm decode_compression(grpc_mdelem *md) { return algorithm; } -static grpc_mdelem *recv_common_filter(grpc_call *call, grpc_mdelem *elem) { +static grpc_mdelem *recv_common_filter(grpc_exec_ctx *exec_ctx, grpc_call *call, + grpc_mdelem *elem) { if (elem->key == GRPC_MDSTR_GRPC_STATUS) { GPR_TIMER_BEGIN("status", 0); set_status_code(call, STATUS_FROM_WIRE, decode_status(elem)); @@ -936,7 +939,8 @@ static grpc_mdelem *recv_common_filter(grpc_call *call, grpc_mdelem *elem) { return NULL; } else if (elem->key == GRPC_MDSTR_GRPC_MESSAGE) { GPR_TIMER_BEGIN("status-details", 0); - set_status_details(call, STATUS_FROM_WIRE, GRPC_MDSTR_REF(elem->value)); + set_status_details(exec_ctx, call, STATUS_FROM_WIRE, + GRPC_MDSTR_REF(elem->value)); GPR_TIMER_END("status-details", 0); return NULL; } @@ -962,9 +966,10 @@ static grpc_mdelem *publish_app_metadata(grpc_call *call, grpc_mdelem *elem, return elem; } -static grpc_mdelem *recv_initial_filter(void *callp, grpc_mdelem *elem) { - grpc_call *call = callp; - elem = recv_common_filter(call, elem); +static grpc_mdelem *recv_initial_filter(grpc_exec_ctx *exec_ctx, void *args, + grpc_mdelem *elem) { + grpc_call *call = args; + elem = recv_common_filter(exec_ctx, call, elem); if (elem == NULL) { return NULL; } else if (elem->key == GRPC_MDSTR_GRPC_ENCODING) { @@ -974,7 +979,7 @@ static grpc_mdelem *recv_initial_filter(void *callp, grpc_mdelem *elem) { return NULL; } else if (elem->key == GRPC_MDSTR_GRPC_ACCEPT_ENCODING) { GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0); - set_encodings_accepted_by_peer(call, elem); + set_encodings_accepted_by_peer(exec_ctx, call, elem); GPR_TIMER_END("encodings_accepted_by_peer", 0); return NULL; } else { @@ -982,9 +987,10 @@ static grpc_mdelem *recv_initial_filter(void *callp, grpc_mdelem *elem) { } } -static grpc_mdelem *recv_trailing_filter(void *callp, grpc_mdelem *elem) { - grpc_call *call = callp; - elem = recv_common_filter(call, elem); +static grpc_mdelem *recv_trailing_filter(grpc_exec_ctx *exec_ctx, void *args, + grpc_mdelem *elem) { + grpc_call *call = args; + elem = recv_common_filter(exec_ctx, call, elem); if (elem == NULL) { return NULL; } else { @@ -1138,8 +1144,8 @@ static void process_data_after_md(grpc_exec_ctx *exec_ctx, } else { *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0); } - grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready, - bctl); + grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready, bctl, + grpc_schedule_on_exec_ctx); continue_receiving_slices(exec_ctx, bctl); } } @@ -1234,7 +1240,7 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, if (error == GRPC_ERROR_NONE) { grpc_metadata_batch *md = &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; - grpc_metadata_batch_filter(md, recv_initial_filter, call); + grpc_metadata_batch_filter(exec_ctx, md, recv_initial_filter, call); GPR_TIMER_BEGIN("validate_filtered_metadata", 0); validate_filtered_metadata(exec_ctx, bctl); @@ -1251,9 +1257,10 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, call->has_initial_md_been_received = true; if (call->saved_receiving_stream_ready_bctlp != NULL) { grpc_closure *saved_rsr_closure = grpc_closure_create( - receiving_stream_ready, call->saved_receiving_stream_ready_bctlp); + receiving_stream_ready, call->saved_receiving_stream_ready_bctlp, + grpc_schedule_on_exec_ctx); call->saved_receiving_stream_ready_bctlp = NULL; - grpc_exec_ctx_sched(exec_ctx, saved_rsr_closure, error, NULL); + grpc_closure_sched(exec_ctx, saved_rsr_closure, error); } gpr_mu_unlock(&call->mu); @@ -1278,14 +1285,15 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, intptr_t status; if (error != GRPC_ERROR_NONE && grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) { - set_status_from_error(call, STATUS_FROM_CORE, error); + set_status_from_error(exec_ctx, call, STATUS_FROM_CORE, error); } if (bctl->send_initial_metadata) { if (error != GRPC_ERROR_NONE) { - set_status_from_error(call, STATUS_FROM_CORE, error); + set_status_from_error(exec_ctx, call, STATUS_FROM_CORE, error); } grpc_metadata_batch_destroy( + exec_ctx, &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]); } if (bctl->send_message) { @@ -1293,12 +1301,13 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, } if (bctl->send_final_op) { grpc_metadata_batch_destroy( + exec_ctx, &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]); } if (bctl->recv_final_op) { grpc_metadata_batch *md = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - grpc_metadata_batch_filter(md, recv_trailing_filter, call); + grpc_metadata_batch_filter(exec_ctx, md, recv_trailing_filter, call); call->received_final_op = true; /* propagate cancellation to any interested children */ @@ -1435,7 +1444,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, bctl->send_initial_metadata = 1; call->sent_initial_metadata = 1; if (!prepare_application_metadata( - call, (int)op->data.send_initial_metadata.count, + exec_ctx, call, (int)op->data.send_initial_metadata.count, op->data.send_initial_metadata.metadata, 0, call->is_client, &compression_md, (int)additional_metadata_count)) { error = GRPC_CALL_ERROR_INVALID_METADATA; @@ -1515,15 +1524,15 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->sent_final_op = 1; call->send_extra_metadata_count = 1; call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem( - call->channel, op->data.send_status_from_server.status); + exec_ctx, call->channel, op->data.send_status_from_server.status); if (op->data.send_status_from_server.status_details != NULL) { call->send_extra_metadata[1].md = grpc_mdelem_from_metadata_strings( - GRPC_MDSTR_GRPC_MESSAGE, + exec_ctx, GRPC_MDSTR_GRPC_MESSAGE, grpc_mdstr_from_string( op->data.send_status_from_server.status_details)); call->send_extra_metadata_count++; set_status_details( - call, STATUS_FROM_API_OVERRIDE, + exec_ctx, call, STATUS_FROM_API_OVERRIDE, GRPC_MDSTR_REF(call->send_extra_metadata[1].md->value)); } if (op->data.send_status_from_server.status != GRPC_STATUS_OK) { @@ -1531,7 +1540,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, (uint32_t)op->data.send_status_from_server.status); } if (!prepare_application_metadata( - call, + exec_ctx, call, (int)op->data.send_status_from_server.trailing_metadata_count, op->data.send_status_from_server.trailing_metadata, 1, 1, NULL, 0)) { @@ -1558,7 +1567,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->received_initial_metadata = 1; call->buffered_metadata[0] = op->data.recv_initial_metadata; grpc_closure_init(&call->receiving_initial_metadata_ready, - receiving_initial_metadata_ready, bctl); + receiving_initial_metadata_ready, bctl, + grpc_schedule_on_exec_ctx); bctl->recv_initial_metadata = 1; stream_op->recv_initial_metadata = &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; @@ -1581,7 +1591,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->receiving_buffer = op->data.recv_message; stream_op->recv_message = &call->receiving_stream; grpc_closure_init(&call->receiving_stream_ready, receiving_stream_ready, - bctl); + bctl, grpc_schedule_on_exec_ctx); stream_op->recv_message_ready = &call->receiving_stream_ready; num_completion_callbacks_needed++; break; @@ -1646,7 +1656,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed); stream_op->context = call->context; - grpc_closure_init(&bctl->finish_batch, finish_batch, bctl); + grpc_closure_init(&bctl->finish_batch, finish_batch, bctl, + grpc_schedule_on_exec_ctx); stream_op->on_complete = &bctl->finish_batch; gpr_mu_unlock(&call->mu); @@ -1660,7 +1671,7 @@ done_with_error: /* reverse any mutations that occured */ if (bctl->send_initial_metadata) { call->sent_initial_metadata = 0; - grpc_metadata_batch_clear(&call->metadata_batch[0][0]); + grpc_metadata_batch_clear(exec_ctx, &call->metadata_batch[0][0]); } if (bctl->send_message) { call->sending_message = 0; @@ -1668,7 +1679,7 @@ done_with_error: } if (bctl->send_final_op) { call->sent_final_op = 0; - grpc_metadata_batch_clear(&call->metadata_batch[0][1]); + grpc_metadata_batch_clear(exec_ctx, &call->metadata_batch[0][1]); } if (bctl->recv_initial_metadata) { call->received_initial_metadata = 0; |