diff options
Diffstat (limited to 'src/core/surface/call.c')
-rw-r--r-- | src/core/surface/call.c | 125 |
1 files changed, 91 insertions, 34 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 5cdd7cd0f6..cf0a595147 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -99,6 +99,8 @@ typedef enum { /* Status came from 'the wire' - or somewhere below the surface layer */ STATUS_FROM_WIRE, + /* Status came from the server sending status */ + STATUS_FROM_SERVER_STATUS, STATUS_SOURCE_COUNT } status_source; @@ -152,9 +154,13 @@ struct grpc_call { gpr_uint8 num_completed_requests; /* are we currently reading a message? */ gpr_uint8 reading_message; + /* have we bound a pollset yet? */ + gpr_uint8 bound_pollset; /* flags with bits corresponding to write states allowing us to determine what was sent */ gpr_uint16 last_send_contains; + /* cancel with this status on the next outgoing transport op */ + grpc_status_code cancel_with_status; /* Active ioreqs. request_set and request_data contain one element per active ioreq @@ -248,8 +254,10 @@ static void execute_op(grpc_call *call, grpc_transport_op *op); static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata); static void finish_read_ops(grpc_call *call); static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status, - const char *description, - gpr_uint8 locked); + const char *description); + +static void lock(grpc_call *call); +static void unlock(grpc_call *call); grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, const void *server_transport_data, @@ -266,6 +274,9 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, gpr_mu_init(&call->mu); call->channel = channel; call->cq = cq; + if (cq) { + GRPC_CQ_INTERNAL_REF(cq, "bind"); + } call->is_client = server_transport_data == NULL; for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { call->request_set[i] = REQSET_EMPTY; @@ -282,7 +293,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, } call->send_initial_metadata_count = add_initial_metadata_count; call->send_deadline = send_deadline; - grpc_channel_internal_ref(channel); + GRPC_CHANNEL_INTERNAL_REF(channel, "call"); call->metadata_context = grpc_channel_get_metadata_context(channel); grpc_sopb_init(&call->send_ops); grpc_sopb_init(&call->recv_ops); @@ -312,7 +323,12 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, void grpc_call_set_completion_queue(grpc_call *call, grpc_completion_queue *cq) { + lock(call); call->cq = cq; + if (cq) { + GRPC_CQ_INTERNAL_REF(cq, "bind"); + } + unlock(call); } grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) { @@ -333,7 +349,7 @@ static void destroy_call(void *call, int ignored_success) { size_t i; grpc_call *c = call; grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c)); - grpc_channel_internal_unref(c->channel); + GRPC_CHANNEL_INTERNAL_UNREF(c->channel, "call"); gpr_mu_destroy(&c->mu); for (i = 0; i < STATUS_SOURCE_COUNT; i++) { if (c->status[i].details) { @@ -359,6 +375,9 @@ static void destroy_call(void *call, int ignored_success) { grpc_sopb_destroy(&c->recv_ops); grpc_bbq_destroy(&c->incoming_queue); gpr_slice_buffer_destroy(&c->incoming_message); + if (c->cq) { + GRPC_CQ_INTERNAL_UNREF(c->cq, "bind"); + } gpr_free(c); } @@ -411,6 +430,7 @@ static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); } static int need_more_data(grpc_call *call) { if (call->read_state == READ_STATE_STREAM_CLOSED) return 0; + /* TODO(ctiller): this needs some serious cleanup */ return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) || (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) && grpc_bbq_empty(&call->incoming_queue)) || @@ -419,7 +439,8 @@ static int need_more_data(grpc_call *call) { is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) || (is_op_live(call, GRPC_IOREQ_RECV_CLOSE) && grpc_bbq_empty(&call->incoming_queue)) || - (call->write_state == WRITE_STATE_INITIAL && !call->is_client); + (call->write_state == WRITE_STATE_INITIAL && !call->is_client) || + (call->cancel_with_status != GRPC_STATUS_OK); } static void unlock(grpc_call *call) { @@ -431,6 +452,10 @@ static void unlock(grpc_call *call) { memset(&op, 0, sizeof(op)); + op.cancel_with_status = call->cancel_with_status; + start_op = op.cancel_with_status != GRPC_STATUS_OK; + call->cancel_with_status = GRPC_STATUS_OK; /* reset */ + if (!call->receiving && need_more_data(call)) { op.recv_ops = &call->recv_ops; op.recv_state = &call->recv_state; @@ -449,6 +474,12 @@ static void unlock(grpc_call *call) { } } + if (!call->bound_pollset && call->cq && (!call->is_client || start_op)) { + call->bound_pollset = 1; + op.bind_pollset = grpc_cq_pollset(call->cq); + start_op = 1; + } + if (!call->completing && call->num_completed_requests != 0) { completing_requests = call->num_completed_requests; memcpy(completed_requests, call->completed_requests, @@ -551,10 +582,18 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op, call->write_state = WRITE_STATE_WRITE_CLOSED; } break; + case GRPC_IOREQ_SEND_STATUS: + if (call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details != + NULL) { + grpc_mdstr_unref( + call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details); + call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details = + NULL; + } + break; case GRPC_IOREQ_RECV_CLOSE: case GRPC_IOREQ_SEND_INITIAL_METADATA: case GRPC_IOREQ_SEND_TRAILING_METADATA: - case GRPC_IOREQ_SEND_STATUS: case GRPC_IOREQ_SEND_CLOSE: break; case GRPC_IOREQ_RECV_STATUS: @@ -655,7 +694,7 @@ static int begin_message(grpc_call *call, grpc_begin_message msg) { gpr_asprintf( &message, "Message terminated early; read %d bytes, expected %d", (int)call->incoming_message.length, (int)call->incoming_message_length); - cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message, 1); + cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message); gpr_free(message); return 0; } @@ -666,7 +705,7 @@ static int begin_message(grpc_call *call, grpc_begin_message msg) { &message, "Maximum message length of %d exceeded by a message of length %d", grpc_channel_get_max_message_length(call->channel), msg.length); - cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message, 1); + cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message); gpr_free(message); return 0; } else if (msg.length > 0) { @@ -688,7 +727,7 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) { /* we have to be reading a message to know what to do here */ if (!call->reading_message) { cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, - "Received payload data while not reading a message", 1); + "Received payload data while not reading a message"); return 0; } /* append the slice to the incoming buffer */ @@ -699,7 +738,7 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) { gpr_asprintf( &message, "Receiving message overflow; read %d bytes, expected %d", (int)call->incoming_message.length, (int)call->incoming_message_length); - cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message, 1); + cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message); gpr_free(message); return 0; } else if (call->incoming_message.length == call->incoming_message_length) { @@ -841,7 +880,6 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { } grpc_sopb_add_metadata(&call->send_ops, mdb); op->send_ops = &call->send_ops; - op->bind_pollset = grpc_cq_pollset(call->cq); call->last_send_contains |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA; call->send_initial_metadata_count = 0; /* fall through intended */ @@ -880,8 +918,9 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { call->metadata_context, grpc_mdstr_ref( grpc_channel_get_message_string(call->channel)), - grpc_mdstr_from_string(call->metadata_context, - data.send_status.details))); + data.send_status.details)); + call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details = + NULL; } grpc_sopb_add_metadata(&call->send_ops, mdb); } @@ -981,6 +1020,14 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, GRPC_CALL_ERROR_INVALID_METADATA); } } + if (op == GRPC_IOREQ_SEND_STATUS) { + set_status_code(call, STATUS_FROM_SERVER_STATUS, + reqs[i].data.send_status.code); + if (reqs[i].data.send_status.details) { + set_status_details(call, STATUS_FROM_SERVER_STATUS, + grpc_mdstr_ref(reqs[i].data.send_status.details)); + } + } have_ops |= 1u << op; call->request_data[op] = data; @@ -1031,35 +1078,43 @@ grpc_call_error grpc_call_cancel(grpc_call *call) { grpc_call_error grpc_call_cancel_with_status(grpc_call *c, grpc_status_code status, const char *description) { - return cancel_with_status(c, status, description, 0); + grpc_call_error r; + lock(c); + r = cancel_with_status(c, status, description); + unlock(c); + return r; } static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status, - const char *description, - gpr_uint8 locked) { - grpc_transport_op op; + const char *description) { grpc_mdstr *details = description ? grpc_mdstr_from_string(c->metadata_context, description) : NULL; - memset(&op, 0, sizeof(op)); - op.cancel_with_status = status; - if (locked == 0) { - lock(c); - } + GPR_ASSERT(status != GRPC_STATUS_OK); + set_status_code(c, STATUS_FROM_API_OVERRIDE, status); set_status_details(c, STATUS_FROM_API_OVERRIDE, details); - if (locked == 0) { - unlock(c); - } - execute_op(c, &op); + c->cancel_with_status = status; return GRPC_CALL_OK; } +static void finished_loose_op(void *call, int success_ignored) { + GRPC_CALL_INTERNAL_UNREF(call, "loose-op", 0); +} + static void execute_op(grpc_call *call, grpc_transport_op *op) { grpc_call_element *elem; + + GPR_ASSERT(op->on_consumed == NULL); + if (op->cancel_with_status != GRPC_STATUS_OK || op->bind_pollset) { + GRPC_CALL_INTERNAL_REF(call, "loose-op"); + op->on_consumed = finished_loose_op; + op->on_consumed_user_data = call; + } + elem = CALL_ELEM_FROM_CALL(call, 0); op->context = call->context; elem->filter->start_transport_op(elem, op); @@ -1072,12 +1127,10 @@ grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { static void call_alarm(void *arg, int success) { grpc_call *call = arg; if (success) { - if (call->is_client) { - cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED, - "Deadline Exceeded", 0); - } else { - grpc_call_cancel(call); - } + lock(call); + cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED, + "Deadline Exceeded"); + unlock(call); } GRPC_CALL_INTERNAL_UNREF(call, "alarm", 1); } @@ -1235,7 +1288,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, req->flags = op->flags; break; case GRPC_OP_SEND_MESSAGE: - if (!are_write_flags_valid(op->flags)){ + if (!are_write_flags_valid(op->flags)) { return GRPC_CALL_ERROR_INVALID_FLAGS; } req = &reqs[out++]; @@ -1270,7 +1323,11 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, req->op = GRPC_IOREQ_SEND_STATUS; req->data.send_status.code = op->data.send_status_from_server.status; req->data.send_status.details = - op->data.send_status_from_server.status_details; + op->data.send_status_from_server.status_details != NULL + ? grpc_mdstr_from_string( + call->metadata_context, + op->data.send_status_from_server.status_details) + : NULL; req = &reqs[out++]; req->op = GRPC_IOREQ_SEND_CLOSE; break; |