Diffstat (limited to 'src/core/surface/call.c')
-rw-r--r--  src/core/surface/call.c  380
1 file changed, 235 insertions(+), 145 deletions(-)
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 4168c2ef0c..0b917f1561 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -163,8 +163,6 @@ struct grpc_call {
gpr_uint8 bound_pollset;
/* is an error status set */
gpr_uint8 error_status_set;
- /** should the alarm be cancelled */
- gpr_uint8 cancel_alarm;
/** bitmask of allocated completion events in completions */
gpr_uint8 allocated_completions;
/** flag indicating that cancellation is inherited */
@@ -182,15 +180,15 @@ struct grpc_call {
request_set[op] is an integer specifying a set of operations to which
the request belongs:
- - if it is < GRPC_IOREQ_OP_COUNT, then this operation is pending
- completion, and the integer represents to which group of operations
- the ioreq belongs. Each group is represented by one master, and the
- integer in request_set is an index into masters to find the master
- data.
- - if it is REQSET_EMPTY, the ioreq op is inactive and available to be
- started
- - finally, if request_set[op] is REQSET_DONE, then the operation is
- complete and unavailable to be started again
+ - if it is < GRPC_IOREQ_OP_COUNT, then this operation is pending
+ completion, and the integer represents to which group of operations
+ the ioreq belongs. Each group is represented by one master, and the
+ integer in request_set is an index into masters to find the master
+ data.
+ - if it is REQSET_EMPTY, the ioreq op is inactive and available to be
+ started
+ - finally, if request_set[op] is REQSET_DONE, then the operation is
+ complete and unavailable to be started again
request_data[op] is the request data as supplied by the initiator of
a request, and is valid iff request_set[op] <= GRPC_IOREQ_OP_COUNT.
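A note on the three-way encoding documented above: the states partition the gpr_uint8 value space, so each membership test is a single comparison. A sketch of the predicates (the concrete REQSET_* values here are assumptions for illustration; the real definitions live elsewhere in call.c):

    /* Illustrative only: the REQSET_* values below are assumed, not quoted. */
    #define REQSET_EMPTY 255
    #define REQSET_DONE 254

    static int req_is_pending(gpr_uint8 set) { return set < GRPC_IOREQ_OP_COUNT; }
    static int req_is_startable(gpr_uint8 set) { return set == REQSET_EMPTY; }
    static int req_is_done(gpr_uint8 set) { return set == REQSET_DONE; }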
@@ -256,10 +254,10 @@ struct grpc_call {
gpr_slice_buffer incoming_message;
gpr_uint32 incoming_message_length;
gpr_uint32 incoming_message_flags;
- grpc_iomgr_closure destroy_closure;
- grpc_iomgr_closure on_done_recv;
- grpc_iomgr_closure on_done_send;
- grpc_iomgr_closure on_done_bind;
+ grpc_closure destroy_closure;
+ grpc_closure on_done_recv;
+ grpc_closure on_done_send;
+ grpc_closure on_done_bind;
/** completion events - for completion queue use */
grpc_cq_completion completions[MAX_CONCURRENT_COMPLETIONS];
@@ -278,19 +276,22 @@ struct grpc_call {
#define CALL_FROM_TOP_ELEM(top_elem) \
CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
-static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline);
-static void call_on_done_recv(void *call, int success);
-static void call_on_done_send(void *call, int success);
+static void set_deadline_alarm(grpc_exec_ctx *exec_ctx, grpc_call *call,
+ gpr_timespec deadline);
+static void call_on_done_recv(grpc_exec_ctx *exec_ctx, void *call, int success);
+static void call_on_done_send(grpc_exec_ctx *exec_ctx, void *call, int success);
static int fill_send_ops(grpc_call *call, grpc_transport_stream_op *op);
-static void execute_op(grpc_call *call, grpc_transport_stream_op *op);
-static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata);
+static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
+ grpc_transport_stream_op *op);
+static void recv_metadata(grpc_exec_ctx *exec_ctx, grpc_call *call,
+ grpc_metadata_batch *metadata);
static void finish_read_ops(grpc_call *call);
static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
const char *description);
-static void finished_loose_op(void *call, int success);
+static void finished_loose_op(grpc_exec_ctx *exec_ctx, void *call, int success);
static void lock(grpc_call *call);
-static void unlock(grpc_call *call);
+static void unlock(grpc_exec_ctx *exec_ctx, grpc_call *call);
grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
gpr_uint32 propagation_mask,
@@ -303,6 +304,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
grpc_transport_stream_op initial_op;
grpc_transport_stream_op *initial_op_ptr = NULL;
grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_call *call =
gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
memset(call, 0, sizeof(grpc_call));
@@ -333,9 +335,9 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
grpc_sopb_init(&call->send_ops);
grpc_sopb_init(&call->recv_ops);
gpr_slice_buffer_init(&call->incoming_message);
- grpc_iomgr_closure_init(&call->on_done_recv, call_on_done_recv, call);
- grpc_iomgr_closure_init(&call->on_done_send, call_on_done_send, call);
- grpc_iomgr_closure_init(&call->on_done_bind, finished_loose_op, call);
+ grpc_closure_init(&call->on_done_recv, call_on_done_recv, call);
+ grpc_closure_init(&call->on_done_send, call_on_done_send, call);
+ grpc_closure_init(&call->on_done_bind, finished_loose_op, call);
/* dropped in destroy and when READ_STATE_STREAM_CLOSED received */
gpr_ref_init(&call->internal_refcount, 2);
/* server hack: start reads immediately so we can get initial metadata.
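The three closures above are rebound with the renamed grpc_closure_init, and their callbacks now receive the exec_ctx as a first argument. The new callback shape, shown with a hypothetical callback:

    /* Hypothetical callback illustrating the post-rename closure signature. */
    static void my_on_done(grpc_exec_ctx *exec_ctx, void *arg, int success) {
      grpc_call *call = arg;
      (void)exec_ctx;
      (void)success;
      (void)call;
      /* instead of recursing, further work can be deferred onto exec_ctx */
    }
    /* bound with: grpc_closure_init(&closure, my_on_done, call); */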
@@ -350,8 +352,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
GRPC_CALL_INTERNAL_REF(call, "receiving");
initial_op_ptr = &initial_op;
}
- grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr,
- CALL_STACK_FROM_CALL(call));
+ grpc_call_stack_init(&exec_ctx, channel_stack, server_transport_data,
+ initial_op_ptr, CALL_STACK_FROM_CALL(call));
if (parent_call != NULL) {
GRPC_CALL_INTERNAL_REF(parent_call, "child");
GPR_ASSERT(call->is_client);
@@ -395,19 +397,20 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
}
if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) !=
0) {
- set_deadline_alarm(call, send_deadline);
+ set_deadline_alarm(&exec_ctx, call, send_deadline);
}
+ grpc_exec_ctx_finish(&exec_ctx);
return call;
}
-void grpc_call_set_completion_queue(grpc_call *call,
+void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call,
grpc_completion_queue *cq) {
lock(call);
call->cq = cq;
if (cq) {
GRPC_CQ_INTERNAL_REF(cq, "bind");
}
- unlock(call);
+ unlock(exec_ctx, call);
}
grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) {
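grpc_call_create above is the template that every public entry point in this diff converges on: initialize a grpc_exec_ctx on the stack, thread it through internal calls, and drain it with grpc_exec_ctx_finish before returning. The lifecycle in isolation (entry_point is a hypothetical stand-in):

    /* Sketch of the exec_ctx lifecycle; entry_point is hypothetical. */
    void entry_point(grpc_call *call) {
      grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; /* stack state, no malloc */
      lock(call);
      /* ... mutate call state under call->mu ... */
      unlock(&exec_ctx, call);         /* may queue closures on the exec_ctx */
      grpc_exec_ctx_finish(&exec_ctx); /* run queued closures before returning */
    }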
@@ -429,13 +432,14 @@ static grpc_cq_completion *allocate_completion(grpc_call *call) {
abort();
}
-static void done_completion(void *call, grpc_cq_completion *completion) {
+static void done_completion(grpc_exec_ctx *exec_ctx, void *call,
+ grpc_cq_completion *completion) {
grpc_call *c = call;
gpr_mu_lock(&c->completion_mu);
c->allocated_completions &=
(gpr_uint8) ~(1u << (completion - c->completions));
gpr_mu_unlock(&c->completion_mu);
- GRPC_CALL_INTERNAL_UNREF(c, "completion", 1);
+ GRPC_CALL_INTERNAL_UNREF(exec_ctx, c, "completion");
}
#ifdef GRPC_CALL_REF_COUNT_DEBUG
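done_completion above frees its slot by clearing one bit of allocated_completions; allocation walks the same bitmask under completion_mu. A sketch of the allocate side, consistent with allocate_completion named in this hunk's header (slot exhaustion is fatal, as the abort() above shows):

    /* Sketch of the bitmask allocator paired with done_completion's free. */
    static grpc_cq_completion *allocate_completion_sketch(grpc_call *call) {
      gpr_uint8 i;
      gpr_mu_lock(&call->completion_mu);
      for (i = 0; i < MAX_CONCURRENT_COMPLETIONS; i++) {
        if (call->allocated_completions & (gpr_uint8)(1u << i)) continue;
        call->allocated_completions |= (gpr_uint8)(1u << i);
        gpr_mu_unlock(&call->completion_mu);
        return &call->completions[i];
      }
      gpr_mu_unlock(&call->completion_mu);
      abort(); /* out of slots: treated as an internal bug */
    }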
@@ -448,11 +452,11 @@ void grpc_call_internal_ref(grpc_call *c) {
gpr_ref(&c->internal_refcount);
}
-static void destroy_call(void *call, int ignored_success) {
+static void destroy_call(grpc_exec_ctx *exec_ctx, grpc_call *call) {
size_t i;
grpc_call *c = call;
- grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c));
- GRPC_CHANNEL_INTERNAL_UNREF(c->channel, "call");
+ grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c));
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call");
gpr_mu_destroy(&c->mu);
gpr_mu_destroy(&c->completion_mu);
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
@@ -486,21 +490,15 @@ static void destroy_call(void *call, int ignored_success) {
}
#ifdef GRPC_CALL_REF_COUNT_DEBUG
-void grpc_call_internal_unref(grpc_call *c, const char *reason,
- int allow_immediate_deletion) {
+void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c,
+ const char *reason) {
gpr_log(GPR_DEBUG, "CALL: unref %p %d -> %d [%s]", c,
c->internal_refcount.count, c->internal_refcount.count - 1, reason);
#else
-void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) {
+void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c) {
#endif
if (gpr_unref(&c->internal_refcount)) {
- if (allow_immediate_deletion) {
- destroy_call(c, 1);
- } else {
- c->destroy_closure.cb = destroy_call;
- c->destroy_closure.cb_arg = c;
- grpc_iomgr_add_callback(&c->destroy_closure);
- }
+ destroy_call(exec_ctx, c);
}
}
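With an exec_ctx always in hand, the old allow_immediate_deletion flag and the deferred destroy_closure path disappear: the final unref destroys the call synchronously. A usage sketch at a call site ("example" is an illustrative ref reason):

    static void ref_unref_example(grpc_call *call) {
      grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
      GRPC_CALL_INTERNAL_REF(call, "example");
      /* ... work that must keep the call alive ... */
      GRPC_CALL_INTERNAL_UNREF(&exec_ctx, call, "example"); /* may destroy */
      grpc_exec_ctx_finish(&exec_ctx);
    }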
@@ -599,7 +597,7 @@ static int need_more_data(grpc_call *call) {
(call->cancel_with_status != GRPC_STATUS_OK) || call->destroy_called;
}
-static void unlock(grpc_call *call) {
+static void unlock(grpc_exec_ctx *exec_ctx, grpc_call *call) {
grpc_transport_stream_op op;
completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
int completing_requests = 0;
@@ -607,7 +605,6 @@ static void unlock(grpc_call *call) {
int i;
const size_t MAX_RECV_PEEK_AHEAD = 65536;
size_t buffered_bytes;
- int cancel_alarm = 0;
memset(&op, 0, sizeof(op));
@@ -615,9 +612,6 @@ static void unlock(grpc_call *call) {
start_op = op.cancel_with_status != GRPC_STATUS_OK;
call->cancel_with_status = GRPC_STATUS_OK; /* reset */
- cancel_alarm = call->cancel_alarm;
- call->cancel_alarm = 0;
-
if (!call->receiving && need_more_data(call)) {
if (grpc_bbq_empty(&call->incoming_queue) && call->reading_message) {
op.max_recv_bytes = call->incoming_message_length -
@@ -667,23 +661,20 @@ static void unlock(grpc_call *call) {
gpr_mu_unlock(&call->mu);
- if (cancel_alarm) {
- grpc_alarm_cancel(&call->alarm);
- }
-
if (start_op) {
- execute_op(call, &op);
+ execute_op(exec_ctx, call, &op);
}
if (completing_requests > 0) {
for (i = 0; i < completing_requests; i++) {
- completed_requests[i].on_complete(call, completed_requests[i].success,
+ completed_requests[i].on_complete(exec_ctx, call,
+ completed_requests[i].success,
completed_requests[i].user_data);
}
lock(call);
call->completing = 0;
- unlock(call);
- GRPC_CALL_INTERNAL_UNREF(call, "completing", 0);
+ unlock(exec_ctx, call);
+ GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completing");
}
}
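unlock() is where deferred work escapes the mutex: it decides what to do while holding call->mu, releases the lock, and only then runs execute_op and the completed-request callbacks. The discipline, reduced to a sketch:

    /* Reduced sketch of the lock()/unlock() discipline in this file. */
    static void mutate_and_kick(grpc_exec_ctx *exec_ctx, grpc_call *call) {
      lock(call);
      /* ... update request_set / status flags under call->mu ... */
      unlock(exec_ctx, call); /* drops mu, then may run execute_op and the
                                 completed-request callbacks outside the lock */
    }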
@@ -828,7 +819,7 @@ static void early_out_write_ops(grpc_call *call) {
}
}
-static void call_on_done_send(void *pc, int success) {
+static void call_on_done_send(grpc_exec_ctx *exec_ctx, void *pc, int success) {
grpc_call *call = pc;
lock(call);
if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) {
@@ -851,8 +842,8 @@ static void call_on_done_send(void *pc, int success) {
call->send_ops.nops = 0;
call->last_send_contains = 0;
call->sending = 0;
- unlock(call);
- GRPC_CALL_INTERNAL_UNREF(call, "sending", 0);
+ unlock(exec_ctx, call);
+ GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "sending");
}
static void finish_message(grpc_call *call) {
@@ -958,7 +949,7 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
}
}
-static void call_on_done_recv(void *pc, int success) {
+static void call_on_done_recv(grpc_exec_ctx *exec_ctx, void *pc, int success) {
grpc_call *call = pc;
grpc_call *child_call;
grpc_call *next_child_call;
@@ -973,7 +964,7 @@ static void call_on_done_recv(void *pc, int success) {
case GRPC_NO_OP:
break;
case GRPC_OP_METADATA:
- recv_metadata(call, &op->data.metadata);
+ recv_metadata(exec_ctx, call, &op->data.metadata);
break;
case GRPC_OP_BEGIN_MESSAGE:
success = begin_message(call, op->data.begin_message);
@@ -994,7 +985,9 @@ static void call_on_done_recv(void *pc, int success) {
if (call->recv_state == GRPC_STREAM_CLOSED) {
GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED);
call->read_state = READ_STATE_STREAM_CLOSED;
- call->cancel_alarm |= call->have_alarm;
+ if (call->have_alarm) {
+ grpc_alarm_cancel(exec_ctx, &call->alarm);
+ }
/* propagate cancellation to any interested children */
child_call = call->first_child;
if (child_call != NULL) {
@@ -1003,12 +996,12 @@ static void call_on_done_recv(void *pc, int success) {
if (child_call->cancellation_is_inherited) {
GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel");
grpc_call_cancel(child_call, NULL);
- GRPC_CALL_INTERNAL_UNREF(child_call, "propagate_cancel", 0);
+ GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel");
}
child_call = next_child_call;
} while (child_call != call->first_child);
}
- GRPC_CALL_INTERNAL_UNREF(call, "closed", 0);
+ GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "closed");
}
finish_read_ops(call);
} else {
@@ -1020,9 +1013,9 @@ static void call_on_done_recv(void *pc, int success) {
finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, 0);
}
call->recv_ops.nops = 0;
- unlock(call);
+ unlock(exec_ctx, call);
- GRPC_CALL_INTERNAL_UNREF(call, "receiving", 0);
+ GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "receiving");
GRPC_TIMER_END(GRPC_PTAG_CALL_ON_DONE_RECV, 0);
}
@@ -1037,7 +1030,7 @@ static int prepare_application_metadata(grpc_call *call, size_t count,
GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
l->md = grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key,
(const gpr_uint8 *)md->value,
- md->value_length, 1);
+ md->value_length);
if (!grpc_mdstr_is_legal_header(l->md->key)) {
gpr_log(GPR_ERROR, "attempt to send invalid metadata key: %s",
grpc_mdstr_as_c_string(l->md->key));
@@ -1273,18 +1266,19 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
}
grpc_call_error grpc_call_start_ioreq_and_call_back(
- grpc_call *call, const grpc_ioreq *reqs, size_t nreqs,
- grpc_ioreq_completion_func on_complete, void *user_data) {
+ grpc_exec_ctx *exec_ctx, grpc_call *call, const grpc_ioreq *reqs,
+ size_t nreqs, grpc_ioreq_completion_func on_complete, void *user_data) {
grpc_call_error err;
lock(call);
err = start_ioreq(call, reqs, nreqs, on_complete, user_data);
- unlock(call);
+ unlock(exec_ctx, call);
return err;
}
void grpc_call_destroy(grpc_call *c) {
int cancel;
grpc_call *parent = c->parent;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
if (parent) {
gpr_mu_lock(&parent->mu);
@@ -1297,17 +1291,20 @@ void grpc_call_destroy(grpc_call *c) {
c->sibling_next->sibling_prev = c->sibling_prev;
}
gpr_mu_unlock(&parent->mu);
- GRPC_CALL_INTERNAL_UNREF(parent, "child", 1);
+ GRPC_CALL_INTERNAL_UNREF(&exec_ctx, parent, "child");
}
lock(c);
GPR_ASSERT(!c->destroy_called);
c->destroy_called = 1;
- c->cancel_alarm |= c->have_alarm;
+ if (c->have_alarm) {
+ grpc_alarm_cancel(&exec_ctx, &c->alarm);
+ }
cancel = c->read_state != READ_STATE_STREAM_CLOSED;
- unlock(c);
+ unlock(&exec_ctx, c);
if (cancel) grpc_call_cancel(c, NULL);
- GRPC_CALL_INTERNAL_UNREF(c, "destroy", 1);
+ GRPC_CALL_INTERNAL_UNREF(&exec_ctx, c, "destroy");
+ grpc_exec_ctx_finish(&exec_ctx);
}
grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) {
@@ -1321,17 +1318,19 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
const char *description,
void *reserved) {
grpc_call_error r;
- (void)reserved;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ GPR_ASSERT(reserved == NULL);
lock(c);
r = cancel_with_status(c, status, description);
- unlock(c);
+ unlock(&exec_ctx, c);
+ grpc_exec_ctx_finish(&exec_ctx);
return r;
}
static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
const char *description) {
grpc_mdstr *details =
- description ? grpc_mdstr_from_string(c->metadata_context, description, 0)
+ description ? grpc_mdstr_from_string(c->metadata_context, description)
: NULL;
GPR_ASSERT(status != GRPC_STATUS_OK);
@@ -1344,22 +1343,25 @@ static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
return GRPC_CALL_OK;
}
-static void finished_loose_op(void *call, int success_ignored) {
- GRPC_CALL_INTERNAL_UNREF(call, "loose-op", 0);
+static void finished_loose_op(grpc_exec_ctx *exec_ctx, void *call,
+ int success_ignored) {
+ GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "loose-op");
}
typedef struct {
grpc_call *call;
- grpc_iomgr_closure closure;
+ grpc_closure closure;
} finished_loose_op_allocated_args;
-static void finished_loose_op_allocated(void *alloc, int success) {
+static void finished_loose_op_allocated(grpc_exec_ctx *exec_ctx, void *alloc,
+ int success) {
finished_loose_op_allocated_args *args = alloc;
- finished_loose_op(args->call, success);
+ finished_loose_op(exec_ctx, args->call, success);
gpr_free(args);
}
-static void execute_op(grpc_call *call, grpc_transport_stream_op *op) {
+static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
+ grpc_transport_stream_op *op) {
grpc_call_element *elem;
GPR_ASSERT(op->on_consumed == NULL);
@@ -1370,27 +1372,29 @@ static void execute_op(grpc_call *call, grpc_transport_stream_op *op) {
} else {
finished_loose_op_allocated_args *args = gpr_malloc(sizeof(*args));
args->call = call;
- grpc_iomgr_closure_init(&args->closure, finished_loose_op_allocated,
- args);
+ grpc_closure_init(&args->closure, finished_loose_op_allocated, args);
op->on_consumed = &args->closure;
}
}
elem = CALL_ELEM_FROM_CALL(call, 0);
op->context = call->context;
- elem->filter->start_transport_stream_op(elem, op);
+ elem->filter->start_transport_stream_op(exec_ctx, elem, op);
}
char *grpc_call_get_peer(grpc_call *call) {
grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0);
- return elem->filter->get_peer(elem);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ char *result = elem->filter->get_peer(&exec_ctx, elem);
+ grpc_exec_ctx_finish(&exec_ctx);
+ return result;
}
grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
return CALL_FROM_TOP_ELEM(elem);
}
-static void call_alarm(void *arg, int success) {
+static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, int success) {
grpc_call *call = arg;
lock(call);
call->have_alarm = 0;
@@ -1399,11 +1403,12 @@ static void call_alarm(void *arg, int success) {
"Deadline Exceeded");
}
finish_read_ops(call);
- unlock(call);
- GRPC_CALL_INTERNAL_UNREF(call, "alarm", 1);
+ unlock(exec_ctx, call);
+ GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "alarm");
}
-static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) {
+static void set_deadline_alarm(grpc_exec_ctx *exec_ctx, grpc_call *call,
+ gpr_timespec deadline) {
if (call->have_alarm) {
gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice");
assert(0);
@@ -1412,7 +1417,7 @@ static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) {
GRPC_CALL_INTERNAL_REF(call, "alarm");
call->have_alarm = 1;
call->send_deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
- grpc_alarm_init(&call->alarm, call->send_deadline, call_alarm, call,
+ grpc_alarm_init(exec_ctx, &call->alarm, call->send_deadline, call_alarm, call,
gpr_now(GPR_CLOCK_MONOTONIC));
}
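The deadline alarm owns its own call reference: set_deadline_alarm takes a ref tagged "alarm", and call_alarm drops it whether the alarm fires or is cancelled (cancellation now happens inline via grpc_alarm_cancel, replacing the removed cancel_alarm flag). The pairing in sketch form, using only calls that appear in this diff:

    /* Deadline alarm lifecycle (sketch). */
    static void alarm_lifecycle_sketch(grpc_exec_ctx *exec_ctx, grpc_call *call,
                                       gpr_timespec deadline) {
      GRPC_CALL_INTERNAL_REF(call, "alarm");
      call->have_alarm = 1;
      grpc_alarm_init(exec_ctx, &call->alarm, deadline, call_alarm, call,
                      gpr_now(GPR_CLOCK_MONOTONIC));
      /* later, on destroy or stream close: */
      if (call->have_alarm) grpc_alarm_cancel(exec_ctx, &call->alarm);
      /* call_alarm clears have_alarm and drops the "alarm" ref either way */
    }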
@@ -1464,7 +1469,8 @@ static gpr_uint32 decode_compression(grpc_mdelem *md) {
return algorithm;
}
-static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
+static void recv_metadata(grpc_exec_ctx *exec_ctx, grpc_call *call,
+ grpc_metadata_batch *md) {
grpc_linked_mdelem *l;
grpc_metadata_array *dest;
grpc_metadata *mdusr;
@@ -1473,18 +1479,18 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA;
for (l = md->list.head; l != NULL; l = l->next) {
- grpc_mdelem *md = l->md;
- grpc_mdstr *key = md->key;
+ grpc_mdelem *mdel = l->md;
+ grpc_mdstr *key = mdel->key;
if (key == grpc_channel_get_status_string(call->channel)) {
- set_status_code(call, STATUS_FROM_WIRE, decode_status(md));
+ set_status_code(call, STATUS_FROM_WIRE, decode_status(mdel));
} else if (key == grpc_channel_get_message_string(call->channel)) {
- set_status_details(call, STATUS_FROM_WIRE, GRPC_MDSTR_REF(md->value));
+ set_status_details(call, STATUS_FROM_WIRE, GRPC_MDSTR_REF(mdel->value));
} else if (key ==
grpc_channel_get_compression_algorithm_string(call->channel)) {
- set_compression_algorithm(call, decode_compression(md));
+ set_compression_algorithm(call, decode_compression(mdel));
} else if (key == grpc_channel_get_encodings_accepted_by_peer_string(
call->channel)) {
- set_encodings_accepted_by_peer(call, md->value->slice);
+ set_encodings_accepted_by_peer(call, mdel->value->slice);
} else {
dest = &call->buffered_metadata[is_trailing];
if (dest->count == dest->capacity) {
@@ -1493,9 +1499,9 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity);
}
mdusr = &dest->metadata[dest->count++];
- mdusr->key = grpc_mdstr_as_c_string(md->key);
- mdusr->value = grpc_mdstr_as_c_string(md->value);
- mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice);
+ mdusr->key = grpc_mdstr_as_c_string(mdel->key);
+ mdusr->value = grpc_mdstr_as_c_string(mdel->value);
+ mdusr->value_length = GPR_SLICE_LENGTH(mdel->value->slice);
if (call->owned_metadata_count == call->owned_metadata_capacity) {
call->owned_metadata_capacity =
GPR_MAX(call->owned_metadata_capacity + 8,
@@ -1504,14 +1510,14 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
gpr_realloc(call->owned_metadata,
sizeof(grpc_mdelem *) * call->owned_metadata_capacity);
}
- call->owned_metadata[call->owned_metadata_count++] = md;
- l->md = 0;
+ call->owned_metadata[call->owned_metadata_count++] = mdel;
+ l->md = NULL;
}
}
if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
0 &&
!call->is_client) {
- set_deadline_alarm(call, md->deadline);
+ set_deadline_alarm(exec_ctx, call, md->deadline);
}
if (!is_trailing) {
call->read_state = READ_STATE_GOT_INITIAL_METADATA;
@@ -1543,13 +1549,15 @@ static void set_cancelled_value(grpc_status_code status, void *dest) {
*(grpc_status_code *)dest = (status != GRPC_STATUS_OK);
}
-static void finish_batch(grpc_call *call, int success, void *tag) {
- grpc_cq_end_op(call->cq, tag, success, done_completion, call,
+static void finish_batch(grpc_exec_ctx *exec_ctx, grpc_call *call, int success,
+ void *tag) {
+ grpc_cq_end_op(exec_ctx, call->cq, tag, success, done_completion, call,
allocate_completion(call));
}
-static void finish_batch_with_close(grpc_call *call, int success, void *tag) {
- grpc_cq_end_op(call->cq, tag, 1, done_completion, call,
+static void finish_batch_with_close(grpc_exec_ctx *exec_ctx, grpc_call *call,
+ int success, void *tag) {
+ grpc_cq_end_op(exec_ctx, call->cq, tag, 1, done_completion, call,
allocate_completion(call));
}
@@ -1568,30 +1576,45 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
size_t out;
const grpc_op *op;
grpc_ioreq *req;
- void (*finish_func)(grpc_call *, int, void *) = finish_batch;
+ void (*finish_func)(grpc_exec_ctx *, grpc_call *, int, void *) = finish_batch;
+ grpc_call_error error;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- if (reserved != NULL) return GRPC_CALL_ERROR;
+ if (reserved != NULL) {
+ error = GRPC_CALL_ERROR;
+ goto done;
+ }
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag);
if (nops == 0) {
grpc_cq_begin_op(call->cq);
GRPC_CALL_INTERNAL_REF(call, "completion");
- grpc_cq_end_op(call->cq, tag, 1, done_completion, call,
+ grpc_cq_end_op(&exec_ctx, call->cq, tag, 1, done_completion, call,
allocate_completion(call));
- return GRPC_CALL_OK;
+ error = GRPC_CALL_OK;
+ goto done;
}
/* rewrite batch ops into ioreq ops */
for (in = 0, out = 0; in < nops; in++) {
op = &ops[in];
- if (op->reserved != NULL) return GRPC_CALL_ERROR;
+ if (op->reserved != NULL) {
+ error = GRPC_CALL_ERROR;
+ goto done;
+ }
switch (op->op) {
case GRPC_OP_SEND_INITIAL_METADATA:
/* Flag validation: currently allow no flags */
- if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
+ if (op->flags != 0) {
+ error = GRPC_CALL_ERROR_INVALID_FLAGS;
+ goto done;
+ }
req = &reqs[out++];
- if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ if (out > GRPC_IOREQ_OP_COUNT) {
+ error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ goto done;
+ }
req->op = GRPC_IOREQ_SEND_INITIAL_METADATA;
req->data.send_metadata.count = op->data.send_initial_metadata.count;
req->data.send_metadata.metadata =
@@ -1600,36 +1623,55 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
break;
case GRPC_OP_SEND_MESSAGE:
if (!are_write_flags_valid(op->flags)) {
- return GRPC_CALL_ERROR_INVALID_FLAGS;
+ error = GRPC_CALL_ERROR_INVALID_FLAGS;
+ goto done;
}
if (op->data.send_message == NULL) {
- return GRPC_CALL_ERROR_INVALID_MESSAGE;
+ error = GRPC_CALL_ERROR_INVALID_MESSAGE;
+ goto done;
}
req = &reqs[out++];
- if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ if (out > GRPC_IOREQ_OP_COUNT) {
+ error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ goto done;
+ }
req->op = GRPC_IOREQ_SEND_MESSAGE;
req->data.send_message = op->data.send_message;
req->flags = op->flags;
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
/* Flag validation: currently allow no flags */
- if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
+ if (op->flags != 0) {
+ error = GRPC_CALL_ERROR_INVALID_FLAGS;
+ goto done;
+ }
if (!call->is_client) {
- return GRPC_CALL_ERROR_NOT_ON_SERVER;
+ error = GRPC_CALL_ERROR_NOT_ON_SERVER;
+ goto done;
}
req = &reqs[out++];
- if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ if (out > GRPC_IOREQ_OP_COUNT) {
+ error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ goto done;
+ }
req->op = GRPC_IOREQ_SEND_CLOSE;
req->flags = op->flags;
break;
case GRPC_OP_SEND_STATUS_FROM_SERVER:
/* Flag validation: currently allow no flags */
- if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
+ if (op->flags != 0) {
+ error = GRPC_CALL_ERROR_INVALID_FLAGS;
+ goto done;
+ }
if (call->is_client) {
- return GRPC_CALL_ERROR_NOT_ON_CLIENT;
+ error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
+ goto done;
}
req = &reqs[out++];
- if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ if (out > GRPC_IOREQ_OP_COUNT) {
+ error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ goto done;
+ }
req->op = GRPC_IOREQ_SEND_TRAILING_METADATA;
req->flags = op->flags;
req->data.send_metadata.count =
@@ -1637,27 +1679,40 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
req->data.send_metadata.metadata =
op->data.send_status_from_server.trailing_metadata;
req = &reqs[out++];
- if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ if (out > GRPC_IOREQ_OP_COUNT) {
+ error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ goto done;
+ }
req->op = GRPC_IOREQ_SEND_STATUS;
req->data.send_status.code = op->data.send_status_from_server.status;
req->data.send_status.details =
op->data.send_status_from_server.status_details != NULL
? grpc_mdstr_from_string(
call->metadata_context,
- op->data.send_status_from_server.status_details, 0)
+ op->data.send_status_from_server.status_details)
: NULL;
req = &reqs[out++];
- if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ if (out > GRPC_IOREQ_OP_COUNT) {
+ error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ goto done;
+ }
req->op = GRPC_IOREQ_SEND_CLOSE;
break;
case GRPC_OP_RECV_INITIAL_METADATA:
/* Flag validation: currently allow no flags */
- if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
+ if (op->flags != 0) {
+ error = GRPC_CALL_ERROR_INVALID_FLAGS;
+ goto done;
+ }
if (!call->is_client) {
- return GRPC_CALL_ERROR_NOT_ON_SERVER;
+ error = GRPC_CALL_ERROR_NOT_ON_SERVER;
+ goto done;
}
req = &reqs[out++];
- if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ if (out > GRPC_IOREQ_OP_COUNT) {
+ error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ goto done;
+ }
req->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
req->data.recv_metadata = op->data.recv_initial_metadata;
req->data.recv_metadata->count = 0;
@@ -1665,55 +1720,86 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
break;
case GRPC_OP_RECV_MESSAGE:
/* Flag validation: currently allow no flags */
- if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
+ if (op->flags != 0) {
+ error = GRPC_CALL_ERROR_INVALID_FLAGS;
+ goto done;
+ }
req = &reqs[out++];
- if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ if (out > GRPC_IOREQ_OP_COUNT) {
+ error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ goto done;
+ }
req->op = GRPC_IOREQ_RECV_MESSAGE;
req->data.recv_message = op->data.recv_message;
req->flags = op->flags;
break;
case GRPC_OP_RECV_STATUS_ON_CLIENT:
/* Flag validation: currently allow no flags */
- if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
+ if (op->flags != 0) {
+ error = GRPC_CALL_ERROR_INVALID_FLAGS;
+ goto done;
+ }
if (!call->is_client) {
- return GRPC_CALL_ERROR_NOT_ON_SERVER;
+ error = GRPC_CALL_ERROR_NOT_ON_SERVER;
+ goto done;
}
req = &reqs[out++];
- if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ if (out > GRPC_IOREQ_OP_COUNT) {
+ error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ goto done;
+ }
req->op = GRPC_IOREQ_RECV_STATUS;
req->flags = op->flags;
req->data.recv_status.set_value = set_status_value_directly;
req->data.recv_status.user_data = op->data.recv_status_on_client.status;
req = &reqs[out++];
- if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ if (out > GRPC_IOREQ_OP_COUNT) {
+ error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ goto done;
+ }
req->op = GRPC_IOREQ_RECV_STATUS_DETAILS;
req->data.recv_status_details.details =
op->data.recv_status_on_client.status_details;
req->data.recv_status_details.details_capacity =
op->data.recv_status_on_client.status_details_capacity;
req = &reqs[out++];
- if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ if (out > GRPC_IOREQ_OP_COUNT) {
+ error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ goto done;
+ }
req->op = GRPC_IOREQ_RECV_TRAILING_METADATA;
req->data.recv_metadata =
op->data.recv_status_on_client.trailing_metadata;
req->data.recv_metadata->count = 0;
req = &reqs[out++];
- if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ if (out > GRPC_IOREQ_OP_COUNT) {
+ error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ goto done;
+ }
req->op = GRPC_IOREQ_RECV_CLOSE;
finish_func = finish_batch_with_close;
break;
case GRPC_OP_RECV_CLOSE_ON_SERVER:
/* Flag validation: currently allow no flags */
- if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
+ if (op->flags != 0) {
+ error = GRPC_CALL_ERROR_INVALID_FLAGS;
+ goto done;
+ }
req = &reqs[out++];
- if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ if (out > GRPC_IOREQ_OP_COUNT) {
+ error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ goto done;
+ }
req->op = GRPC_IOREQ_RECV_STATUS;
req->flags = op->flags;
req->data.recv_status.set_value = set_cancelled_value;
req->data.recv_status.user_data =
op->data.recv_close_on_server.cancelled;
req = &reqs[out++];
- if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ if (out > GRPC_IOREQ_OP_COUNT) {
+ error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
+ goto done;
+ }
req->op = GRPC_IOREQ_RECV_CLOSE;
finish_func = finish_batch_with_close;
break;
@@ -1723,7 +1809,11 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
GRPC_CALL_INTERNAL_REF(call, "completion");
grpc_cq_begin_op(call->cq);
- return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func, tag);
+ error = grpc_call_start_ioreq_and_call_back(&exec_ctx, call, reqs, out,
+ finish_func, tag);
+done:
+ grpc_exec_ctx_finish(&exec_ctx);
+ return error;
}
void grpc_call_context_set(grpc_call *call, grpc_context_index elem,
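The grpc_call_start_batch rewrite above replaces every early return with a jump to a single done: label, so grpc_exec_ctx_finish always runs before control leaves the function. Its control flow, stripped to a compilable skeleton (validate_failed stands in for all the flag/role checks):

    /* Skeleton of the single-exit pattern grpc_call_start_batch now uses. */
    static grpc_call_error start_batch_sketch(grpc_call *call,
                                              int validate_failed) {
      grpc_call_error error;
      grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
      (void)call;
      if (validate_failed) {
        error = GRPC_CALL_ERROR; /* every failure path jumps to done */
        goto done;
      }
      error = GRPC_CALL_OK; /* stands in for starting the rewritten ioreqs */
    done:
      grpc_exec_ctx_finish(&exec_ctx); /* always drains deferred closures */
      return error;
    }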