diff options
author | 2015-04-24 08:04:59 -0700 | |
---|---|---|
committer | 2015-04-24 08:04:59 -0700 | |
commit | 48b9fde74e291d1fcbc2f975c58befb44d7e90d9 (patch) | |
tree | 907c29c05bdfc63a839b906c92a7dea27add8962 /src/core | |
parent | 7d4a96a58b9770ad1cb41492f2afb49d7f3d9fd2 (diff) |
call progress
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/surface/call.c | 79 |
1 files changed, 43 insertions, 36 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 135ab040ae..21390b59f0 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -610,7 +610,7 @@ static void call_on_done_send(void *pc, int success) { if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_CLOSE)) { finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, error); finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, error); - finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, error); + finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, GRPC_OP_OK); } call->last_send_contains = 0; call->sending = 0; @@ -698,35 +698,41 @@ static void call_on_done_recv(void *pc, int success) { int unref = 0; lock(call); call->receiving = 0; - for (i = 0; success && i < call->recv_ops.nops; i++) { - grpc_stream_op *op = &call->recv_ops.ops[i]; - switch (op->type) { - case GRPC_NO_OP: - break; - case GRPC_OP_METADATA: - recv_metadata(call, &op->data.metadata); - break; - case GRPC_OP_BEGIN_MESSAGE: - success = begin_message(call, op->data.begin_message); - break; - case GRPC_OP_SLICE: - success = add_slice_to_message(call, op->data.slice); - break; + if (success) { + for (i = 0; success && i < call->recv_ops.nops; i++) { + grpc_stream_op *op = &call->recv_ops.ops[i]; + switch (op->type) { + case GRPC_NO_OP: + break; + case GRPC_OP_METADATA: + recv_metadata(call, &op->data.metadata); + break; + case GRPC_OP_BEGIN_MESSAGE: + success = begin_message(call, op->data.begin_message); + break; + case GRPC_OP_SLICE: + success = add_slice_to_message(call, op->data.slice); + break; + } } + if (call->recv_state == GRPC_STREAM_RECV_CLOSED) { + GPR_ASSERT(call->read_state <= READ_STATE_READ_CLOSED); + call->read_state = READ_STATE_READ_CLOSED; + } + if (call->recv_state == GRPC_STREAM_CLOSED) { + GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED); + call->read_state = READ_STATE_STREAM_CLOSED; + unref = 1; + } + finish_read_ops(call); + } else { + finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_ERROR); + finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_ERROR); + finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_ERROR); + finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_ERROR); + finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_ERROR); + finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, GRPC_OP_ERROR); } - if (call->recv_state == GRPC_STREAM_RECV_CLOSED) { - GPR_ASSERT(call->read_state <= READ_STATE_READ_CLOSED); - call->read_state = READ_STATE_READ_CLOSED; - } - if (call->recv_state == GRPC_STREAM_CLOSED) { - GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED); - call->read_state = READ_STATE_STREAM_CLOSED; - unref = 1; - } - if (!success) { - abort(); - } - finish_read_ops(call); unlock(call); if (unref) { @@ -992,26 +998,27 @@ void grpc_call_destroy(grpc_call *c) { } grpc_call_error grpc_call_cancel(grpc_call *call) { - grpc_transport_op op; - memset(&op, 0, sizeof(op)); - op.cancel_with_status = GRPC_STATUS_CANCELLED; - - execute_op(call, &op); - - return GRPC_CALL_OK; + return grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, "Cancelled"); } grpc_call_error grpc_call_cancel_with_status(grpc_call *c, grpc_status_code status, const char *description) { + grpc_transport_op op; grpc_mdstr *details = description ? grpc_mdstr_from_string(c->metadata_context, description) : NULL; + memset(&op, 0, sizeof(op)); + op.cancel_with_status = status; + lock(c); set_status_code(c, STATUS_FROM_API_OVERRIDE, status); set_status_details(c, STATUS_FROM_API_OVERRIDE, details); unlock(c); - return grpc_call_cancel(c); + + execute_op(c, &op); + + return GRPC_CALL_OK; } static void execute_op(grpc_call *call, grpc_transport_op *op) { |