aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface/call.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/surface/call.c')
-rw-r--r--src/core/surface/call.c125
1 files changed, 91 insertions, 34 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 5cdd7cd0f6..cf0a595147 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -99,6 +99,8 @@ typedef enum {
/* Status came from 'the wire' - or somewhere below the surface
layer */
STATUS_FROM_WIRE,
+ /* Status came from the server sending status */
+ STATUS_FROM_SERVER_STATUS,
STATUS_SOURCE_COUNT
} status_source;
@@ -152,9 +154,13 @@ struct grpc_call {
gpr_uint8 num_completed_requests;
/* are we currently reading a message? */
gpr_uint8 reading_message;
+ /* have we bound a pollset yet? */
+ gpr_uint8 bound_pollset;
/* flags with bits corresponding to write states allowing us to determine
what was sent */
gpr_uint16 last_send_contains;
+ /* cancel with this status on the next outgoing transport op */
+ grpc_status_code cancel_with_status;
/* Active ioreqs.
request_set and request_data contain one element per active ioreq
@@ -248,8 +254,10 @@ static void execute_op(grpc_call *call, grpc_transport_op *op);
static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata);
static void finish_read_ops(grpc_call *call);
static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
- const char *description,
- gpr_uint8 locked);
+ const char *description);
+
+static void lock(grpc_call *call);
+static void unlock(grpc_call *call);
grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
const void *server_transport_data,
@@ -266,6 +274,9 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
gpr_mu_init(&call->mu);
call->channel = channel;
call->cq = cq;
+ if (cq) {
+ GRPC_CQ_INTERNAL_REF(cq, "bind");
+ }
call->is_client = server_transport_data == NULL;
for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
call->request_set[i] = REQSET_EMPTY;
@@ -282,7 +293,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
}
call->send_initial_metadata_count = add_initial_metadata_count;
call->send_deadline = send_deadline;
- grpc_channel_internal_ref(channel);
+ GRPC_CHANNEL_INTERNAL_REF(channel, "call");
call->metadata_context = grpc_channel_get_metadata_context(channel);
grpc_sopb_init(&call->send_ops);
grpc_sopb_init(&call->recv_ops);
@@ -312,7 +323,12 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
void grpc_call_set_completion_queue(grpc_call *call,
grpc_completion_queue *cq) {
+ lock(call);
call->cq = cq;
+ if (cq) {
+ GRPC_CQ_INTERNAL_REF(cq, "bind");
+ }
+ unlock(call);
}
grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) {
@@ -333,7 +349,7 @@ static void destroy_call(void *call, int ignored_success) {
size_t i;
grpc_call *c = call;
grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c));
- grpc_channel_internal_unref(c->channel);
+ GRPC_CHANNEL_INTERNAL_UNREF(c->channel, "call");
gpr_mu_destroy(&c->mu);
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (c->status[i].details) {
@@ -359,6 +375,9 @@ static void destroy_call(void *call, int ignored_success) {
grpc_sopb_destroy(&c->recv_ops);
grpc_bbq_destroy(&c->incoming_queue);
gpr_slice_buffer_destroy(&c->incoming_message);
+ if (c->cq) {
+ GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
+ }
gpr_free(c);
}
@@ -411,6 +430,7 @@ static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
static int need_more_data(grpc_call *call) {
if (call->read_state == READ_STATE_STREAM_CLOSED) return 0;
+ /* TODO(ctiller): this needs some serious cleanup */
return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) ||
(is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) &&
grpc_bbq_empty(&call->incoming_queue)) ||
@@ -419,7 +439,8 @@ static int need_more_data(grpc_call *call) {
is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) ||
(is_op_live(call, GRPC_IOREQ_RECV_CLOSE) &&
grpc_bbq_empty(&call->incoming_queue)) ||
- (call->write_state == WRITE_STATE_INITIAL && !call->is_client);
+ (call->write_state == WRITE_STATE_INITIAL && !call->is_client) ||
+ (call->cancel_with_status != GRPC_STATUS_OK);
}
static void unlock(grpc_call *call) {
@@ -431,6 +452,10 @@ static void unlock(grpc_call *call) {
memset(&op, 0, sizeof(op));
+ op.cancel_with_status = call->cancel_with_status;
+ start_op = op.cancel_with_status != GRPC_STATUS_OK;
+ call->cancel_with_status = GRPC_STATUS_OK; /* reset */
+
if (!call->receiving && need_more_data(call)) {
op.recv_ops = &call->recv_ops;
op.recv_state = &call->recv_state;
@@ -449,6 +474,12 @@ static void unlock(grpc_call *call) {
}
}
+ if (!call->bound_pollset && call->cq && (!call->is_client || start_op)) {
+ call->bound_pollset = 1;
+ op.bind_pollset = grpc_cq_pollset(call->cq);
+ start_op = 1;
+ }
+
if (!call->completing && call->num_completed_requests != 0) {
completing_requests = call->num_completed_requests;
memcpy(completed_requests, call->completed_requests,
@@ -551,10 +582,18 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
call->write_state = WRITE_STATE_WRITE_CLOSED;
}
break;
+ case GRPC_IOREQ_SEND_STATUS:
+ if (call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details !=
+ NULL) {
+ grpc_mdstr_unref(
+ call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details);
+ call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details =
+ NULL;
+ }
+ break;
case GRPC_IOREQ_RECV_CLOSE:
case GRPC_IOREQ_SEND_INITIAL_METADATA:
case GRPC_IOREQ_SEND_TRAILING_METADATA:
- case GRPC_IOREQ_SEND_STATUS:
case GRPC_IOREQ_SEND_CLOSE:
break;
case GRPC_IOREQ_RECV_STATUS:
@@ -655,7 +694,7 @@ static int begin_message(grpc_call *call, grpc_begin_message msg) {
gpr_asprintf(
&message, "Message terminated early; read %d bytes, expected %d",
(int)call->incoming_message.length, (int)call->incoming_message_length);
- cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message, 1);
+ cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
gpr_free(message);
return 0;
}
@@ -666,7 +705,7 @@ static int begin_message(grpc_call *call, grpc_begin_message msg) {
&message,
"Maximum message length of %d exceeded by a message of length %d",
grpc_channel_get_max_message_length(call->channel), msg.length);
- cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message, 1);
+ cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
gpr_free(message);
return 0;
} else if (msg.length > 0) {
@@ -688,7 +727,7 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
/* we have to be reading a message to know what to do here */
if (!call->reading_message) {
cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT,
- "Received payload data while not reading a message", 1);
+ "Received payload data while not reading a message");
return 0;
}
/* append the slice to the incoming buffer */
@@ -699,7 +738,7 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
gpr_asprintf(
&message, "Receiving message overflow; read %d bytes, expected %d",
(int)call->incoming_message.length, (int)call->incoming_message_length);
- cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message, 1);
+ cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
gpr_free(message);
return 0;
} else if (call->incoming_message.length == call->incoming_message_length) {
@@ -841,7 +880,6 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
}
grpc_sopb_add_metadata(&call->send_ops, mdb);
op->send_ops = &call->send_ops;
- op->bind_pollset = grpc_cq_pollset(call->cq);
call->last_send_contains |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA;
call->send_initial_metadata_count = 0;
/* fall through intended */
@@ -880,8 +918,9 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
call->metadata_context,
grpc_mdstr_ref(
grpc_channel_get_message_string(call->channel)),
- grpc_mdstr_from_string(call->metadata_context,
- data.send_status.details)));
+ data.send_status.details));
+ call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details =
+ NULL;
}
grpc_sopb_add_metadata(&call->send_ops, mdb);
}
@@ -981,6 +1020,14 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
GRPC_CALL_ERROR_INVALID_METADATA);
}
}
+ if (op == GRPC_IOREQ_SEND_STATUS) {
+ set_status_code(call, STATUS_FROM_SERVER_STATUS,
+ reqs[i].data.send_status.code);
+ if (reqs[i].data.send_status.details) {
+ set_status_details(call, STATUS_FROM_SERVER_STATUS,
+ grpc_mdstr_ref(reqs[i].data.send_status.details));
+ }
+ }
have_ops |= 1u << op;
call->request_data[op] = data;
@@ -1031,35 +1078,43 @@ grpc_call_error grpc_call_cancel(grpc_call *call) {
grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
grpc_status_code status,
const char *description) {
- return cancel_with_status(c, status, description, 0);
+ grpc_call_error r;
+ lock(c);
+ r = cancel_with_status(c, status, description);
+ unlock(c);
+ return r;
}
static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
- const char *description,
- gpr_uint8 locked) {
- grpc_transport_op op;
+ const char *description) {
grpc_mdstr *details =
description ? grpc_mdstr_from_string(c->metadata_context, description)
: NULL;
- memset(&op, 0, sizeof(op));
- op.cancel_with_status = status;
- if (locked == 0) {
- lock(c);
- }
+ GPR_ASSERT(status != GRPC_STATUS_OK);
+
set_status_code(c, STATUS_FROM_API_OVERRIDE, status);
set_status_details(c, STATUS_FROM_API_OVERRIDE, details);
- if (locked == 0) {
- unlock(c);
- }
- execute_op(c, &op);
+ c->cancel_with_status = status;
return GRPC_CALL_OK;
}
+static void finished_loose_op(void *call, int success_ignored) {
+ GRPC_CALL_INTERNAL_UNREF(call, "loose-op", 0);
+}
+
static void execute_op(grpc_call *call, grpc_transport_op *op) {
grpc_call_element *elem;
+
+ GPR_ASSERT(op->on_consumed == NULL);
+ if (op->cancel_with_status != GRPC_STATUS_OK || op->bind_pollset) {
+ GRPC_CALL_INTERNAL_REF(call, "loose-op");
+ op->on_consumed = finished_loose_op;
+ op->on_consumed_user_data = call;
+ }
+
elem = CALL_ELEM_FROM_CALL(call, 0);
op->context = call->context;
elem->filter->start_transport_op(elem, op);
@@ -1072,12 +1127,10 @@ grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
static void call_alarm(void *arg, int success) {
grpc_call *call = arg;
if (success) {
- if (call->is_client) {
- cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED,
- "Deadline Exceeded", 0);
- } else {
- grpc_call_cancel(call);
- }
+ lock(call);
+ cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED,
+ "Deadline Exceeded");
+ unlock(call);
}
GRPC_CALL_INTERNAL_UNREF(call, "alarm", 1);
}
@@ -1235,7 +1288,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
req->flags = op->flags;
break;
case GRPC_OP_SEND_MESSAGE:
- if (!are_write_flags_valid(op->flags)){
+ if (!are_write_flags_valid(op->flags)) {
return GRPC_CALL_ERROR_INVALID_FLAGS;
}
req = &reqs[out++];
@@ -1270,7 +1323,11 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
req->op = GRPC_IOREQ_SEND_STATUS;
req->data.send_status.code = op->data.send_status_from_server.status;
req->data.send_status.details =
- op->data.send_status_from_server.status_details;
+ op->data.send_status_from_server.status_details != NULL
+ ? grpc_mdstr_from_string(
+ call->metadata_context,
+ op->data.send_status_from_server.status_details)
+ : NULL;
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_CLOSE;
break;