diff options
Diffstat (limited to 'src/core/surface/call.c')
-rw-r--r-- | src/core/surface/call.c | 56 |
1 files changed, 33 insertions, 23 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index ec6fd65ea3..4d2ba7cd7d 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -536,9 +536,8 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op, switch ((grpc_ioreq_op)i) { case GRPC_IOREQ_RECV_MESSAGE: case GRPC_IOREQ_SEND_MESSAGE: - if (master->success) { - call->request_set[i] = REQSET_EMPTY; - } else { + call->request_set[i] = REQSET_EMPTY; + if (!master->success) { call->write_state = WRITE_STATE_WRITE_CLOSED; } break; @@ -583,11 +582,29 @@ static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, int success) { } } +static void early_out_write_ops(grpc_call *call) { + switch (call->write_state) { + case WRITE_STATE_WRITE_CLOSED: + finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, 0); + finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, 0); + finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, 0); + finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1); + /* fallthrough */ + case WRITE_STATE_STARTED: + finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, 0); + /* fallthrough */ + case WRITE_STATE_INITIAL: + /* do nothing */ + break; + } +} + static void call_on_done_send(void *pc, int success) { grpc_call *call = pc; lock(call); if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) { finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, success); + call->write_state = WRITE_STATE_STARTED; } if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_MESSAGE)) { finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, success); @@ -596,6 +613,11 @@ static void call_on_done_send(void *pc, int success) { finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, success); finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, success); finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1); + call->write_state = WRITE_STATE_WRITE_CLOSED; + } + if (!success) { + call->write_state = WRITE_STATE_WRITE_CLOSED; + early_out_write_ops(call); } call->send_ops.nops = 0; call->last_send_contains = 0; @@ -811,7 +833,6 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { op->send_ops = &call->send_ops; op->bind_pollset = grpc_cq_pollset(call->cq); call->last_send_contains |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA; - call->write_state = WRITE_STATE_STARTED; call->send_initial_metadata_count = 0; /* fall through intended */ case WRITE_STATE_STARTED: @@ -827,7 +848,6 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { op->is_last_send = 1; op->send_ops = &call->send_ops; call->last_send_contains |= 1 << GRPC_IOREQ_SEND_CLOSE; - call->write_state = WRITE_STATE_WRITE_CLOSED; if (!call->is_client) { /* send trailing metadata */ data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA]; @@ -919,23 +939,6 @@ static void finish_read_ops(grpc_call *call) { } } -static void early_out_write_ops(grpc_call *call) { - switch (call->write_state) { - case WRITE_STATE_WRITE_CLOSED: - finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, 0); - finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, 0); - finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, 0); - finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1); - /* fallthrough */ - case WRITE_STATE_STARTED: - finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, 0); - /* fallthrough */ - case WRITE_STATE_INITIAL: - /* do nothing */ - break; - } -} - static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, size_t nreqs, grpc_ioreq_completion_func completion, @@ -1176,6 +1179,10 @@ static void set_cancelled_value(grpc_status_code status, void *dest) { } static void finish_batch(grpc_call *call, int success, void *tag) { + grpc_cq_end_op(call->cq, tag, call, success); +} + +static void finish_batch_with_close(grpc_call *call, int success, void *tag) { grpc_cq_end_op(call->cq, tag, call, 1); } @@ -1186,6 +1193,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, size_t out; const grpc_op *op; grpc_ioreq *req; + void (*finish_func)(grpc_call *, int, void *) = finish_batch; GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag); @@ -1269,6 +1277,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, op->data.recv_status_on_client.trailing_metadata; req = &reqs[out++]; req->op = GRPC_IOREQ_RECV_CLOSE; + finish_func = finish_batch_with_close; break; case GRPC_OP_RECV_CLOSE_ON_SERVER: req = &reqs[out++]; @@ -1278,13 +1287,14 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, op->data.recv_close_on_server.cancelled; req = &reqs[out++]; req->op = GRPC_IOREQ_RECV_CLOSE; + finish_func = finish_batch_with_close; break; } } grpc_cq_begin_op(call->cq, call); - return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_batch, + return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func, tag); } |