diff options
Diffstat (limited to 'src/core/surface/call.c')
-rw-r--r-- | src/core/surface/call.c | 380 |
1 files changed, 235 insertions, 145 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 4168c2ef0c..0b917f1561 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -163,8 +163,6 @@ struct grpc_call { gpr_uint8 bound_pollset; /* is an error status set */ gpr_uint8 error_status_set; - /** should the alarm be cancelled */ - gpr_uint8 cancel_alarm; /** bitmask of allocated completion events in completions */ gpr_uint8 allocated_completions; /** flag indicating that cancellation is inherited */ @@ -182,15 +180,15 @@ struct grpc_call { request_set[op] is an integer specifying a set of operations to which the request belongs: - - if it is < GRPC_IOREQ_OP_COUNT, then this operation is pending - completion, and the integer represents to which group of operations - the ioreq belongs. Each group is represented by one master, and the - integer in request_set is an index into masters to find the master - data. - - if it is REQSET_EMPTY, the ioreq op is inactive and available to be - started - - finally, if request_set[op] is REQSET_DONE, then the operation is - complete and unavailable to be started again + - if it is < GRPC_IOREQ_OP_COUNT, then this operation is pending + completion, and the integer represents to which group of operations + the ioreq belongs. Each group is represented by one master, and the + integer in request_set is an index into masters to find the master + data. + - if it is REQSET_EMPTY, the ioreq op is inactive and available to be + started + - finally, if request_set[op] is REQSET_DONE, then the operation is + complete and unavailable to be started again request_data[op] is the request data as supplied by the initiator of a request, and is valid iff request_set[op] <= GRPC_IOREQ_OP_COUNT. @@ -256,10 +254,10 @@ struct grpc_call { gpr_slice_buffer incoming_message; gpr_uint32 incoming_message_length; gpr_uint32 incoming_message_flags; - grpc_iomgr_closure destroy_closure; - grpc_iomgr_closure on_done_recv; - grpc_iomgr_closure on_done_send; - grpc_iomgr_closure on_done_bind; + grpc_closure destroy_closure; + grpc_closure on_done_recv; + grpc_closure on_done_send; + grpc_closure on_done_bind; /** completion events - for completion queue use */ grpc_cq_completion completions[MAX_CONCURRENT_COMPLETIONS]; @@ -278,19 +276,22 @@ struct grpc_call { #define CALL_FROM_TOP_ELEM(top_elem) \ CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem)) -static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline); -static void call_on_done_recv(void *call, int success); -static void call_on_done_send(void *call, int success); +static void set_deadline_alarm(grpc_exec_ctx *exec_ctx, grpc_call *call, + gpr_timespec deadline); +static void call_on_done_recv(grpc_exec_ctx *exec_ctx, void *call, int success); +static void call_on_done_send(grpc_exec_ctx *exec_ctx, void *call, int success); static int fill_send_ops(grpc_call *call, grpc_transport_stream_op *op); -static void execute_op(grpc_call *call, grpc_transport_stream_op *op); -static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata); +static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, + grpc_transport_stream_op *op); +static void recv_metadata(grpc_exec_ctx *exec_ctx, grpc_call *call, + grpc_metadata_batch *metadata); static void finish_read_ops(grpc_call *call); static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status, const char *description); -static void finished_loose_op(void *call, int success); +static void finished_loose_op(grpc_exec_ctx *exec_ctx, void *call, int success); static void lock(grpc_call *call); -static void unlock(grpc_call *call); +static void unlock(grpc_exec_ctx *exec_ctx, grpc_call *call); grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, gpr_uint32 propagation_mask, @@ -303,6 +304,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, grpc_transport_stream_op initial_op; grpc_transport_stream_op *initial_op_ptr = NULL; grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_call *call = gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size); memset(call, 0, sizeof(grpc_call)); @@ -333,9 +335,9 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, grpc_sopb_init(&call->send_ops); grpc_sopb_init(&call->recv_ops); gpr_slice_buffer_init(&call->incoming_message); - grpc_iomgr_closure_init(&call->on_done_recv, call_on_done_recv, call); - grpc_iomgr_closure_init(&call->on_done_send, call_on_done_send, call); - grpc_iomgr_closure_init(&call->on_done_bind, finished_loose_op, call); + grpc_closure_init(&call->on_done_recv, call_on_done_recv, call); + grpc_closure_init(&call->on_done_send, call_on_done_send, call); + grpc_closure_init(&call->on_done_bind, finished_loose_op, call); /* dropped in destroy and when READ_STATE_STREAM_CLOSED received */ gpr_ref_init(&call->internal_refcount, 2); /* server hack: start reads immediately so we can get initial metadata. @@ -350,8 +352,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, GRPC_CALL_INTERNAL_REF(call, "receiving"); initial_op_ptr = &initial_op; } - grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr, - CALL_STACK_FROM_CALL(call)); + grpc_call_stack_init(&exec_ctx, channel_stack, server_transport_data, + initial_op_ptr, CALL_STACK_FROM_CALL(call)); if (parent_call != NULL) { GRPC_CALL_INTERNAL_REF(parent_call, "child"); GPR_ASSERT(call->is_client); @@ -395,19 +397,20 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, } if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) != 0) { - set_deadline_alarm(call, send_deadline); + set_deadline_alarm(&exec_ctx, call, send_deadline); } + grpc_exec_ctx_finish(&exec_ctx); return call; } -void grpc_call_set_completion_queue(grpc_call *call, +void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call, grpc_completion_queue *cq) { lock(call); call->cq = cq; if (cq) { GRPC_CQ_INTERNAL_REF(cq, "bind"); } - unlock(call); + unlock(exec_ctx, call); } grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) { @@ -429,13 +432,14 @@ static grpc_cq_completion *allocate_completion(grpc_call *call) { abort(); } -static void done_completion(void *call, grpc_cq_completion *completion) { +static void done_completion(grpc_exec_ctx *exec_ctx, void *call, + grpc_cq_completion *completion) { grpc_call *c = call; gpr_mu_lock(&c->completion_mu); c->allocated_completions &= (gpr_uint8) ~(1u << (completion - c->completions)); gpr_mu_unlock(&c->completion_mu); - GRPC_CALL_INTERNAL_UNREF(c, "completion", 1); + GRPC_CALL_INTERNAL_UNREF(exec_ctx, c, "completion"); } #ifdef GRPC_CALL_REF_COUNT_DEBUG @@ -448,11 +452,11 @@ void grpc_call_internal_ref(grpc_call *c) { gpr_ref(&c->internal_refcount); } -static void destroy_call(void *call, int ignored_success) { +static void destroy_call(grpc_exec_ctx *exec_ctx, grpc_call *call) { size_t i; grpc_call *c = call; - grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c)); - GRPC_CHANNEL_INTERNAL_UNREF(c->channel, "call"); + grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c)); + GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call"); gpr_mu_destroy(&c->mu); gpr_mu_destroy(&c->completion_mu); for (i = 0; i < STATUS_SOURCE_COUNT; i++) { @@ -486,21 +490,15 @@ static void destroy_call(void *call, int ignored_success) { } #ifdef GRPC_CALL_REF_COUNT_DEBUG -void grpc_call_internal_unref(grpc_call *c, const char *reason, - int allow_immediate_deletion) { +void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c, + const char *reason) { gpr_log(GPR_DEBUG, "CALL: unref %p %d -> %d [%s]", c, c->internal_refcount.count, c->internal_refcount.count - 1, reason); #else -void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) { +void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c) { #endif if (gpr_unref(&c->internal_refcount)) { - if (allow_immediate_deletion) { - destroy_call(c, 1); - } else { - c->destroy_closure.cb = destroy_call; - c->destroy_closure.cb_arg = c; - grpc_iomgr_add_callback(&c->destroy_closure); - } + destroy_call(exec_ctx, c); } } @@ -599,7 +597,7 @@ static int need_more_data(grpc_call *call) { (call->cancel_with_status != GRPC_STATUS_OK) || call->destroy_called; } -static void unlock(grpc_call *call) { +static void unlock(grpc_exec_ctx *exec_ctx, grpc_call *call) { grpc_transport_stream_op op; completed_request completed_requests[GRPC_IOREQ_OP_COUNT]; int completing_requests = 0; @@ -607,7 +605,6 @@ static void unlock(grpc_call *call) { int i; const size_t MAX_RECV_PEEK_AHEAD = 65536; size_t buffered_bytes; - int cancel_alarm = 0; memset(&op, 0, sizeof(op)); @@ -615,9 +612,6 @@ static void unlock(grpc_call *call) { start_op = op.cancel_with_status != GRPC_STATUS_OK; call->cancel_with_status = GRPC_STATUS_OK; /* reset */ - cancel_alarm = call->cancel_alarm; - call->cancel_alarm = 0; - if (!call->receiving && need_more_data(call)) { if (grpc_bbq_empty(&call->incoming_queue) && call->reading_message) { op.max_recv_bytes = call->incoming_message_length - @@ -667,23 +661,20 @@ static void unlock(grpc_call *call) { gpr_mu_unlock(&call->mu); - if (cancel_alarm) { - grpc_alarm_cancel(&call->alarm); - } - if (start_op) { - execute_op(call, &op); + execute_op(exec_ctx, call, &op); } if (completing_requests > 0) { for (i = 0; i < completing_requests; i++) { - completed_requests[i].on_complete(call, completed_requests[i].success, + completed_requests[i].on_complete(exec_ctx, call, + completed_requests[i].success, completed_requests[i].user_data); } lock(call); call->completing = 0; - unlock(call); - GRPC_CALL_INTERNAL_UNREF(call, "completing", 0); + unlock(exec_ctx, call); + GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completing"); } } @@ -828,7 +819,7 @@ static void early_out_write_ops(grpc_call *call) { } } -static void call_on_done_send(void *pc, int success) { +static void call_on_done_send(grpc_exec_ctx *exec_ctx, void *pc, int success) { grpc_call *call = pc; lock(call); if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) { @@ -851,8 +842,8 @@ static void call_on_done_send(void *pc, int success) { call->send_ops.nops = 0; call->last_send_contains = 0; call->sending = 0; - unlock(call); - GRPC_CALL_INTERNAL_UNREF(call, "sending", 0); + unlock(exec_ctx, call); + GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "sending"); } static void finish_message(grpc_call *call) { @@ -958,7 +949,7 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) { } } -static void call_on_done_recv(void *pc, int success) { +static void call_on_done_recv(grpc_exec_ctx *exec_ctx, void *pc, int success) { grpc_call *call = pc; grpc_call *child_call; grpc_call *next_child_call; @@ -973,7 +964,7 @@ static void call_on_done_recv(void *pc, int success) { case GRPC_NO_OP: break; case GRPC_OP_METADATA: - recv_metadata(call, &op->data.metadata); + recv_metadata(exec_ctx, call, &op->data.metadata); break; case GRPC_OP_BEGIN_MESSAGE: success = begin_message(call, op->data.begin_message); @@ -994,7 +985,9 @@ static void call_on_done_recv(void *pc, int success) { if (call->recv_state == GRPC_STREAM_CLOSED) { GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED); call->read_state = READ_STATE_STREAM_CLOSED; - call->cancel_alarm |= call->have_alarm; + if (call->have_alarm) { + grpc_alarm_cancel(exec_ctx, &call->alarm); + } /* propagate cancellation to any interested children */ child_call = call->first_child; if (child_call != NULL) { @@ -1003,12 +996,12 @@ static void call_on_done_recv(void *pc, int success) { if (child_call->cancellation_is_inherited) { GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel"); grpc_call_cancel(child_call, NULL); - GRPC_CALL_INTERNAL_UNREF(child_call, "propagate_cancel", 0); + GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel"); } child_call = next_child_call; } while (child_call != call->first_child); } - GRPC_CALL_INTERNAL_UNREF(call, "closed", 0); + GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "closed"); } finish_read_ops(call); } else { @@ -1020,9 +1013,9 @@ static void call_on_done_recv(void *pc, int success) { finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, 0); } call->recv_ops.nops = 0; - unlock(call); + unlock(exec_ctx, call); - GRPC_CALL_INTERNAL_UNREF(call, "receiving", 0); + GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "receiving"); GRPC_TIMER_END(GRPC_PTAG_CALL_ON_DONE_RECV, 0); } @@ -1037,7 +1030,7 @@ static int prepare_application_metadata(grpc_call *call, size_t count, GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data)); l->md = grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key, (const gpr_uint8 *)md->value, - md->value_length, 1); + md->value_length); if (!grpc_mdstr_is_legal_header(l->md->key)) { gpr_log(GPR_ERROR, "attempt to send invalid metadata key: %s", grpc_mdstr_as_c_string(l->md->key)); @@ -1273,18 +1266,19 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, } grpc_call_error grpc_call_start_ioreq_and_call_back( - grpc_call *call, const grpc_ioreq *reqs, size_t nreqs, - grpc_ioreq_completion_func on_complete, void *user_data) { + grpc_exec_ctx *exec_ctx, grpc_call *call, const grpc_ioreq *reqs, + size_t nreqs, grpc_ioreq_completion_func on_complete, void *user_data) { grpc_call_error err; lock(call); err = start_ioreq(call, reqs, nreqs, on_complete, user_data); - unlock(call); + unlock(exec_ctx, call); return err; } void grpc_call_destroy(grpc_call *c) { int cancel; grpc_call *parent = c->parent; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; if (parent) { gpr_mu_lock(&parent->mu); @@ -1297,17 +1291,20 @@ void grpc_call_destroy(grpc_call *c) { c->sibling_next->sibling_prev = c->sibling_prev; } gpr_mu_unlock(&parent->mu); - GRPC_CALL_INTERNAL_UNREF(parent, "child", 1); + GRPC_CALL_INTERNAL_UNREF(&exec_ctx, parent, "child"); } lock(c); GPR_ASSERT(!c->destroy_called); c->destroy_called = 1; - c->cancel_alarm |= c->have_alarm; + if (c->have_alarm) { + grpc_alarm_cancel(&exec_ctx, &c->alarm); + } cancel = c->read_state != READ_STATE_STREAM_CLOSED; - unlock(c); + unlock(&exec_ctx, c); if (cancel) grpc_call_cancel(c, NULL); - GRPC_CALL_INTERNAL_UNREF(c, "destroy", 1); + GRPC_CALL_INTERNAL_UNREF(&exec_ctx, c, "destroy"); + grpc_exec_ctx_finish(&exec_ctx); } grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) { @@ -1321,17 +1318,19 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c, const char *description, void *reserved) { grpc_call_error r; - (void)reserved; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GPR_ASSERT(reserved == NULL); lock(c); r = cancel_with_status(c, status, description); - unlock(c); + unlock(&exec_ctx, c); + grpc_exec_ctx_finish(&exec_ctx); return r; } static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status, const char *description) { grpc_mdstr *details = - description ? grpc_mdstr_from_string(c->metadata_context, description, 0) + description ? grpc_mdstr_from_string(c->metadata_context, description) : NULL; GPR_ASSERT(status != GRPC_STATUS_OK); @@ -1344,22 +1343,25 @@ static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status, return GRPC_CALL_OK; } -static void finished_loose_op(void *call, int success_ignored) { - GRPC_CALL_INTERNAL_UNREF(call, "loose-op", 0); +static void finished_loose_op(grpc_exec_ctx *exec_ctx, void *call, + int success_ignored) { + GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "loose-op"); } typedef struct { grpc_call *call; - grpc_iomgr_closure closure; + grpc_closure closure; } finished_loose_op_allocated_args; -static void finished_loose_op_allocated(void *alloc, int success) { +static void finished_loose_op_allocated(grpc_exec_ctx *exec_ctx, void *alloc, + int success) { finished_loose_op_allocated_args *args = alloc; - finished_loose_op(args->call, success); + finished_loose_op(exec_ctx, args->call, success); gpr_free(args); } -static void execute_op(grpc_call *call, grpc_transport_stream_op *op) { +static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, + grpc_transport_stream_op *op) { grpc_call_element *elem; GPR_ASSERT(op->on_consumed == NULL); @@ -1370,27 +1372,29 @@ static void execute_op(grpc_call *call, grpc_transport_stream_op *op) { } else { finished_loose_op_allocated_args *args = gpr_malloc(sizeof(*args)); args->call = call; - grpc_iomgr_closure_init(&args->closure, finished_loose_op_allocated, - args); + grpc_closure_init(&args->closure, finished_loose_op_allocated, args); op->on_consumed = &args->closure; } } elem = CALL_ELEM_FROM_CALL(call, 0); op->context = call->context; - elem->filter->start_transport_stream_op(elem, op); + elem->filter->start_transport_stream_op(exec_ctx, elem, op); } char *grpc_call_get_peer(grpc_call *call) { grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0); - return elem->filter->get_peer(elem); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + char *result = elem->filter->get_peer(&exec_ctx, elem); + grpc_exec_ctx_finish(&exec_ctx); + return result; } grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { return CALL_FROM_TOP_ELEM(elem); } -static void call_alarm(void *arg, int success) { +static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, int success) { grpc_call *call = arg; lock(call); call->have_alarm = 0; @@ -1399,11 +1403,12 @@ static void call_alarm(void *arg, int success) { "Deadline Exceeded"); } finish_read_ops(call); - unlock(call); - GRPC_CALL_INTERNAL_UNREF(call, "alarm", 1); + unlock(exec_ctx, call); + GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "alarm"); } -static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) { +static void set_deadline_alarm(grpc_exec_ctx *exec_ctx, grpc_call *call, + gpr_timespec deadline) { if (call->have_alarm) { gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice"); assert(0); @@ -1412,7 +1417,7 @@ static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) { GRPC_CALL_INTERNAL_REF(call, "alarm"); call->have_alarm = 1; call->send_deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); - grpc_alarm_init(&call->alarm, call->send_deadline, call_alarm, call, + grpc_alarm_init(exec_ctx, &call->alarm, call->send_deadline, call_alarm, call, gpr_now(GPR_CLOCK_MONOTONIC)); } @@ -1464,7 +1469,8 @@ static gpr_uint32 decode_compression(grpc_mdelem *md) { return algorithm; } -static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { +static void recv_metadata(grpc_exec_ctx *exec_ctx, grpc_call *call, + grpc_metadata_batch *md) { grpc_linked_mdelem *l; grpc_metadata_array *dest; grpc_metadata *mdusr; @@ -1473,18 +1479,18 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA; for (l = md->list.head; l != NULL; l = l->next) { - grpc_mdelem *md = l->md; - grpc_mdstr *key = md->key; + grpc_mdelem *mdel = l->md; + grpc_mdstr *key = mdel->key; if (key == grpc_channel_get_status_string(call->channel)) { - set_status_code(call, STATUS_FROM_WIRE, decode_status(md)); + set_status_code(call, STATUS_FROM_WIRE, decode_status(mdel)); } else if (key == grpc_channel_get_message_string(call->channel)) { - set_status_details(call, STATUS_FROM_WIRE, GRPC_MDSTR_REF(md->value)); + set_status_details(call, STATUS_FROM_WIRE, GRPC_MDSTR_REF(mdel->value)); } else if (key == grpc_channel_get_compression_algorithm_string(call->channel)) { - set_compression_algorithm(call, decode_compression(md)); + set_compression_algorithm(call, decode_compression(mdel)); } else if (key == grpc_channel_get_encodings_accepted_by_peer_string( call->channel)) { - set_encodings_accepted_by_peer(call, md->value->slice); + set_encodings_accepted_by_peer(call, mdel->value->slice); } else { dest = &call->buffered_metadata[is_trailing]; if (dest->count == dest->capacity) { @@ -1493,9 +1499,9 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity); } mdusr = &dest->metadata[dest->count++]; - mdusr->key = grpc_mdstr_as_c_string(md->key); - mdusr->value = grpc_mdstr_as_c_string(md->value); - mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice); + mdusr->key = grpc_mdstr_as_c_string(mdel->key); + mdusr->value = grpc_mdstr_as_c_string(mdel->value); + mdusr->value_length = GPR_SLICE_LENGTH(mdel->value->slice); if (call->owned_metadata_count == call->owned_metadata_capacity) { call->owned_metadata_capacity = GPR_MAX(call->owned_metadata_capacity + 8, @@ -1504,14 +1510,14 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { gpr_realloc(call->owned_metadata, sizeof(grpc_mdelem *) * call->owned_metadata_capacity); } - call->owned_metadata[call->owned_metadata_count++] = md; - l->md = 0; + call->owned_metadata[call->owned_metadata_count++] = mdel; + l->md = NULL; } } if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) != 0 && !call->is_client) { - set_deadline_alarm(call, md->deadline); + set_deadline_alarm(exec_ctx, call, md->deadline); } if (!is_trailing) { call->read_state = READ_STATE_GOT_INITIAL_METADATA; @@ -1543,13 +1549,15 @@ static void set_cancelled_value(grpc_status_code status, void *dest) { *(grpc_status_code *)dest = (status != GRPC_STATUS_OK); } -static void finish_batch(grpc_call *call, int success, void *tag) { - grpc_cq_end_op(call->cq, tag, success, done_completion, call, +static void finish_batch(grpc_exec_ctx *exec_ctx, grpc_call *call, int success, + void *tag) { + grpc_cq_end_op(exec_ctx, call->cq, tag, success, done_completion, call, allocate_completion(call)); } -static void finish_batch_with_close(grpc_call *call, int success, void *tag) { - grpc_cq_end_op(call->cq, tag, 1, done_completion, call, +static void finish_batch_with_close(grpc_exec_ctx *exec_ctx, grpc_call *call, + int success, void *tag) { + grpc_cq_end_op(exec_ctx, call->cq, tag, 1, done_completion, call, allocate_completion(call)); } @@ -1568,30 +1576,45 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, size_t out; const grpc_op *op; grpc_ioreq *req; - void (*finish_func)(grpc_call *, int, void *) = finish_batch; + void (*finish_func)(grpc_exec_ctx *, grpc_call *, int, void *) = finish_batch; + grpc_call_error error; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - if (reserved != NULL) return GRPC_CALL_ERROR; + if (reserved != NULL) { + error = GRPC_CALL_ERROR; + goto done; + } GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag); if (nops == 0) { grpc_cq_begin_op(call->cq); GRPC_CALL_INTERNAL_REF(call, "completion"); - grpc_cq_end_op(call->cq, tag, 1, done_completion, call, + grpc_cq_end_op(&exec_ctx, call->cq, tag, 1, done_completion, call, allocate_completion(call)); - return GRPC_CALL_OK; + error = GRPC_CALL_OK; + goto done; } /* rewrite batch ops into ioreq ops */ for (in = 0, out = 0; in < nops; in++) { op = &ops[in]; - if (op->reserved != NULL) return GRPC_CALL_ERROR; + if (op->reserved != NULL) { + error = GRPC_CALL_ERROR; + goto done; + } switch (op->op) { case GRPC_OP_SEND_INITIAL_METADATA: /* Flag validation: currently allow no flags */ - if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; + if (op->flags != 0) { + error = GRPC_CALL_ERROR_INVALID_FLAGS; + goto done; + } req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; + if (out > GRPC_IOREQ_OP_COUNT) { + error = GRPC_CALL_ERROR_BATCH_TOO_BIG; + goto done; + } req->op = GRPC_IOREQ_SEND_INITIAL_METADATA; req->data.send_metadata.count = op->data.send_initial_metadata.count; req->data.send_metadata.metadata = @@ -1600,36 +1623,55 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, break; case GRPC_OP_SEND_MESSAGE: if (!are_write_flags_valid(op->flags)) { - return GRPC_CALL_ERROR_INVALID_FLAGS; + error = GRPC_CALL_ERROR_INVALID_FLAGS; + goto done; } if (op->data.send_message == NULL) { - return GRPC_CALL_ERROR_INVALID_MESSAGE; + error = GRPC_CALL_ERROR_INVALID_MESSAGE; + goto done; } req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; + if (out > GRPC_IOREQ_OP_COUNT) { + error = GRPC_CALL_ERROR_BATCH_TOO_BIG; + goto done; + } req->op = GRPC_IOREQ_SEND_MESSAGE; req->data.send_message = op->data.send_message; req->flags = op->flags; break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: /* Flag validation: currently allow no flags */ - if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; + if (op->flags != 0) { + error = GRPC_CALL_ERROR_INVALID_FLAGS; + goto done; + } if (!call->is_client) { - return GRPC_CALL_ERROR_NOT_ON_SERVER; + error = GRPC_CALL_ERROR_NOT_ON_SERVER; + goto done; } req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; + if (out > GRPC_IOREQ_OP_COUNT) { + error = GRPC_CALL_ERROR_BATCH_TOO_BIG; + goto done; + } req->op = GRPC_IOREQ_SEND_CLOSE; req->flags = op->flags; break; case GRPC_OP_SEND_STATUS_FROM_SERVER: /* Flag validation: currently allow no flags */ - if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; + if (op->flags != 0) { + error = GRPC_CALL_ERROR_INVALID_FLAGS; + goto done; + } if (call->is_client) { - return GRPC_CALL_ERROR_NOT_ON_CLIENT; + error = GRPC_CALL_ERROR_NOT_ON_CLIENT; + goto done; } req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; + if (out > GRPC_IOREQ_OP_COUNT) { + error = GRPC_CALL_ERROR_BATCH_TOO_BIG; + goto done; + } req->op = GRPC_IOREQ_SEND_TRAILING_METADATA; req->flags = op->flags; req->data.send_metadata.count = @@ -1637,27 +1679,40 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, req->data.send_metadata.metadata = op->data.send_status_from_server.trailing_metadata; req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; + if (out > GRPC_IOREQ_OP_COUNT) { + error = GRPC_CALL_ERROR_BATCH_TOO_BIG; + goto done; + } req->op = GRPC_IOREQ_SEND_STATUS; req->data.send_status.code = op->data.send_status_from_server.status; req->data.send_status.details = op->data.send_status_from_server.status_details != NULL ? grpc_mdstr_from_string( call->metadata_context, - op->data.send_status_from_server.status_details, 0) + op->data.send_status_from_server.status_details) : NULL; req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; + if (out > GRPC_IOREQ_OP_COUNT) { + error = GRPC_CALL_ERROR_BATCH_TOO_BIG; + goto done; + } req->op = GRPC_IOREQ_SEND_CLOSE; break; case GRPC_OP_RECV_INITIAL_METADATA: /* Flag validation: currently allow no flags */ - if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; + if (op->flags != 0) { + error = GRPC_CALL_ERROR_INVALID_FLAGS; + goto done; + } if (!call->is_client) { - return GRPC_CALL_ERROR_NOT_ON_SERVER; + error = GRPC_CALL_ERROR_NOT_ON_SERVER; + goto done; } req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; + if (out > GRPC_IOREQ_OP_COUNT) { + error = GRPC_CALL_ERROR_BATCH_TOO_BIG; + goto done; + } req->op = GRPC_IOREQ_RECV_INITIAL_METADATA; req->data.recv_metadata = op->data.recv_initial_metadata; req->data.recv_metadata->count = 0; @@ -1665,55 +1720,86 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, break; case GRPC_OP_RECV_MESSAGE: /* Flag validation: currently allow no flags */ - if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; + if (op->flags != 0) { + error = GRPC_CALL_ERROR_INVALID_FLAGS; + goto done; + } req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; + if (out > GRPC_IOREQ_OP_COUNT) { + error = GRPC_CALL_ERROR_BATCH_TOO_BIG; + goto done; + } req->op = GRPC_IOREQ_RECV_MESSAGE; req->data.recv_message = op->data.recv_message; req->flags = op->flags; break; case GRPC_OP_RECV_STATUS_ON_CLIENT: /* Flag validation: currently allow no flags */ - if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; + if (op->flags != 0) { + error = GRPC_CALL_ERROR_INVALID_FLAGS; + goto done; + } if (!call->is_client) { - return GRPC_CALL_ERROR_NOT_ON_SERVER; + error = GRPC_CALL_ERROR_NOT_ON_SERVER; + goto done; } req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; + if (out > GRPC_IOREQ_OP_COUNT) { + error = GRPC_CALL_ERROR_BATCH_TOO_BIG; + goto done; + } req->op = GRPC_IOREQ_RECV_STATUS; req->flags = op->flags; req->data.recv_status.set_value = set_status_value_directly; req->data.recv_status.user_data = op->data.recv_status_on_client.status; req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; + if (out > GRPC_IOREQ_OP_COUNT) { + error = GRPC_CALL_ERROR_BATCH_TOO_BIG; + goto done; + } req->op = GRPC_IOREQ_RECV_STATUS_DETAILS; req->data.recv_status_details.details = op->data.recv_status_on_client.status_details; req->data.recv_status_details.details_capacity = op->data.recv_status_on_client.status_details_capacity; req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; + if (out > GRPC_IOREQ_OP_COUNT) { + error = GRPC_CALL_ERROR_BATCH_TOO_BIG; + goto done; + } req->op = GRPC_IOREQ_RECV_TRAILING_METADATA; req->data.recv_metadata = op->data.recv_status_on_client.trailing_metadata; req->data.recv_metadata->count = 0; req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; + if (out > GRPC_IOREQ_OP_COUNT) { + error = GRPC_CALL_ERROR_BATCH_TOO_BIG; + goto done; + } req->op = GRPC_IOREQ_RECV_CLOSE; finish_func = finish_batch_with_close; break; case GRPC_OP_RECV_CLOSE_ON_SERVER: /* Flag validation: currently allow no flags */ - if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; + if (op->flags != 0) { + error = GRPC_CALL_ERROR_INVALID_FLAGS; + goto done; + } req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; + if (out > GRPC_IOREQ_OP_COUNT) { + error = GRPC_CALL_ERROR_BATCH_TOO_BIG; + goto done; + } req->op = GRPC_IOREQ_RECV_STATUS; req->flags = op->flags; req->data.recv_status.set_value = set_cancelled_value; req->data.recv_status.user_data = op->data.recv_close_on_server.cancelled; req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; + if (out > GRPC_IOREQ_OP_COUNT) { + error = GRPC_CALL_ERROR_BATCH_TOO_BIG; + goto done; + } req->op = GRPC_IOREQ_RECV_CLOSE; finish_func = finish_batch_with_close; break; @@ -1723,7 +1809,11 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, GRPC_CALL_INTERNAL_REF(call, "completion"); grpc_cq_begin_op(call->cq); - return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func, tag); + error = grpc_call_start_ioreq_and_call_back(&exec_ctx, call, reqs, out, + finish_func, tag); +done: + grpc_exec_ctx_finish(&exec_ctx); + return error; } void grpc_call_context_set(grpc_call *call, grpc_context_index elem, |