aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface/call.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/surface/call.c')
-rw-r--r--src/core/surface/call.c56
1 files changed, 33 insertions, 23 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index ec6fd65ea3..4d2ba7cd7d 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -536,9 +536,8 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
switch ((grpc_ioreq_op)i) {
case GRPC_IOREQ_RECV_MESSAGE:
case GRPC_IOREQ_SEND_MESSAGE:
- if (master->success) {
- call->request_set[i] = REQSET_EMPTY;
- } else {
+ call->request_set[i] = REQSET_EMPTY;
+ if (!master->success) {
call->write_state = WRITE_STATE_WRITE_CLOSED;
}
break;
@@ -583,11 +582,29 @@ static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, int success) {
}
}
+static void early_out_write_ops(grpc_call *call) {
+ switch (call->write_state) {
+ case WRITE_STATE_WRITE_CLOSED:
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, 0);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, 0);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, 0);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1);
+ /* fallthrough */
+ case WRITE_STATE_STARTED:
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, 0);
+ /* fallthrough */
+ case WRITE_STATE_INITIAL:
+ /* do nothing */
+ break;
+ }
+}
+
static void call_on_done_send(void *pc, int success) {
grpc_call *call = pc;
lock(call);
if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) {
finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, success);
+ call->write_state = WRITE_STATE_STARTED;
}
if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_MESSAGE)) {
finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, success);
@@ -596,6 +613,11 @@ static void call_on_done_send(void *pc, int success) {
finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, success);
finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, success);
finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1);
+ call->write_state = WRITE_STATE_WRITE_CLOSED;
+ }
+ if (!success) {
+ call->write_state = WRITE_STATE_WRITE_CLOSED;
+ early_out_write_ops(call);
}
call->send_ops.nops = 0;
call->last_send_contains = 0;
@@ -811,7 +833,6 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
op->send_ops = &call->send_ops;
op->bind_pollset = grpc_cq_pollset(call->cq);
call->last_send_contains |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA;
- call->write_state = WRITE_STATE_STARTED;
call->send_initial_metadata_count = 0;
/* fall through intended */
case WRITE_STATE_STARTED:
@@ -827,7 +848,6 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
op->is_last_send = 1;
op->send_ops = &call->send_ops;
call->last_send_contains |= 1 << GRPC_IOREQ_SEND_CLOSE;
- call->write_state = WRITE_STATE_WRITE_CLOSED;
if (!call->is_client) {
/* send trailing metadata */
data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA];
@@ -919,23 +939,6 @@ static void finish_read_ops(grpc_call *call) {
}
}
-static void early_out_write_ops(grpc_call *call) {
- switch (call->write_state) {
- case WRITE_STATE_WRITE_CLOSED:
- finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, 0);
- finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, 0);
- finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, 0);
- finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1);
- /* fallthrough */
- case WRITE_STATE_STARTED:
- finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, 0);
- /* fallthrough */
- case WRITE_STATE_INITIAL:
- /* do nothing */
- break;
- }
-}
-
static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
size_t nreqs,
grpc_ioreq_completion_func completion,
@@ -1176,6 +1179,10 @@ static void set_cancelled_value(grpc_status_code status, void *dest) {
}
static void finish_batch(grpc_call *call, int success, void *tag) {
+ grpc_cq_end_op(call->cq, tag, call, success);
+}
+
+static void finish_batch_with_close(grpc_call *call, int success, void *tag) {
grpc_cq_end_op(call->cq, tag, call, 1);
}
@@ -1186,6 +1193,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
size_t out;
const grpc_op *op;
grpc_ioreq *req;
+ void (*finish_func)(grpc_call *, int, void *) = finish_batch;
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag);
@@ -1269,6 +1277,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
op->data.recv_status_on_client.trailing_metadata;
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_CLOSE;
+ finish_func = finish_batch_with_close;
break;
case GRPC_OP_RECV_CLOSE_ON_SERVER:
req = &reqs[out++];
@@ -1278,13 +1287,14 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
op->data.recv_close_on_server.cancelled;
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_CLOSE;
+ finish_func = finish_batch_with_close;
break;
}
}
grpc_cq_begin_op(call->cq, call);
- return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_batch,
+ return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func,
tag);
}