diff options
Diffstat (limited to 'src/core/surface/call.c')
-rw-r--r-- | src/core/surface/call.c | 305 |
1 files changed, 159 insertions, 146 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 5040aeef13..4f8ac6193a 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -36,6 +36,7 @@ #include "src/core/channel/metadata_buffer.h" #include "src/core/iomgr/alarm.h" #include "src/core/support/string.h" +#include "src/core/surface/byte_buffer_queue.h" #include "src/core/surface/channel.h" #include "src/core/surface/completion_queue.h" #include <grpc/support/alloc.h> @@ -59,7 +60,7 @@ typedef struct { grpc_metadata_array trailing_md_in; grpc_recv_status status_in; size_t msg_in_read_idx; - grpc_byte_buffer_array msg_in; + grpc_byte_buffer *msg_in; gpr_uint8 got_status; void *finished_tag; @@ -72,6 +73,7 @@ typedef enum { SEND_INITIAL_METADATA, SEND_MESSAGE, SEND_TRAILING_METADATA, + SEND_STATUS, SEND_FINISH } send_action; @@ -81,10 +83,24 @@ typedef struct { grpc_op_error status; } completed_request; +/* See reqinfo.set below for a description */ +#define REQSET_EMPTY 255 +#define REQSET_DONE 254 + +/* The state of an ioreq */ typedef struct reqinfo { - req_state state; + /* User supplied parameters */ grpc_ioreq_data data; - struct reqinfo *master; + /* In which set is this ioreq? + This value could be: + - an element of grpc_ioreq_op enumeration, in which case + it designates the master ioreq in a set of requests + - REQSET_EMPTY, in which case this reqinfo type has no application + request against it + - REQSET_DONE, in which case this reqinfo has been satisfied for + all time for this call, and no further use will be made of it */ + gpr_uint8 set; + grpc_op_error status; grpc_ioreq_completion_func on_complete; void *user_data; gpr_uint32 need_mask; @@ -122,10 +138,9 @@ struct grpc_call { reqinfo requests[GRPC_IOREQ_OP_COUNT]; completed_request completed_requests[GRPC_IOREQ_OP_COUNT]; - grpc_byte_buffer_array buffered_messages; + grpc_byte_buffer_queue incoming_queue; grpc_metadata_array buffered_initial_metadata; grpc_metadata_array buffered_trailing_metadata; - size_t write_index; grpc_mdelem **owned_metadata; size_t owned_metadata_count; size_t owned_metadata_capacity; @@ -159,6 +174,7 @@ static void enact_send_action(grpc_call *call, send_action sa); grpc_call *grpc_call_create(grpc_channel *channel, const void *server_transport_data) { + size_t i; grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel); grpc_call *call = gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size); @@ -166,8 +182,11 @@ grpc_call *grpc_call_create(grpc_channel *channel, gpr_mu_init(&call->mu); call->channel = channel; call->is_client = server_transport_data == NULL; + for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { + call->requests[i].set = REQSET_EMPTY; + } if (call->is_client) { - call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].state = REQ_DONE; + call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].set = REQSET_DONE; } grpc_channel_internal_ref(channel); call->metadata_context = grpc_channel_get_metadata_context(channel); @@ -189,15 +208,6 @@ legacy_state *get_legacy_state(grpc_call *call) { void grpc_call_internal_ref(grpc_call *c) { gpr_ref(&c->internal_refcount); } -static void destroy_message_array(grpc_byte_buffer_array *array, - size_t start_idx) { - size_t i; - for (i = start_idx; i < array->count; i++) { - grpc_byte_buffer_destroy(array->buffers[i]); - } - gpr_free(array->buffers); -} - static void destroy_call(void *call, int ignored_success) { size_t i, j; grpc_call *c = call; @@ -213,7 +223,6 @@ static void destroy_call(void *call, int ignored_success) { grpc_mdelem_unref(c->owned_metadata[i]); } gpr_free(c->owned_metadata); - destroy_message_array(&c->buffered_messages, 0); gpr_free(c->buffered_initial_metadata.metadata); gpr_free(c->buffered_trailing_metadata.metadata); if (c->legacy_state) { @@ -226,8 +235,6 @@ static void destroy_call(void *call, int ignored_success) { } gpr_free(c->legacy_state->initial_md_in.metadata); gpr_free(c->legacy_state->trailing_md_in.metadata); - destroy_message_array(&c->legacy_state->msg_in, - c->legacy_state->msg_in_read_idx); gpr_free(c->legacy_state); } gpr_free(c); @@ -284,7 +291,7 @@ static void unlock(grpc_call *call) { int num_completed_requests = call->num_completed_requests; int need_more_data = call->need_more_data && - call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].state == REQ_DONE; + call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].set == REQSET_DONE; int i; if (need_more_data) { @@ -321,124 +328,131 @@ static void unlock(grpc_call *call) { } } -static void get_final_status(grpc_call *call, grpc_status_code *code, - const char **details) { +static void get_final_status(grpc_call *call, grpc_recv_status_args args) { int i; for (i = 0; i < STATUS_SOURCE_COUNT; i++) { if (call->status[i].set) { - *code = call->status[i].code; - *details = call->status[i].details - ? grpc_mdstr_as_c_string(call->status[i].details) - : NULL; + *args.code = call->status[i].code; + if (call->status[i].details) { + gpr_slice details = call->status[i].details->slice; + size_t len = GPR_SLICE_LENGTH(details); + if (len + 1 > *args.details_capacity) { + *args.details_capacity = GPR_MAX(len + 1, *args.details_capacity * 3 / 2); + *args.details = gpr_realloc(*args.details, *args.details_capacity); + } + memcpy(*args.details, GPR_SLICE_START_PTR(details), len); + (*args.details)[len] = 0; + } else { + goto no_details; + } return; } } - *code = GRPC_STATUS_UNKNOWN; - *details = NULL; + *args.code = GRPC_STATUS_UNKNOWN; + +no_details: + if (0 == *args.details_capacity) { + *args.details_capacity = 8; + *args.details = gpr_malloc(*args.details_capacity); + } + **args.details = 0; } static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, grpc_op_error status) { - reqinfo *master = call->requests[op].master; completed_request *cr; size_t i; - switch (call->requests[op].state) { - case REQ_INITIAL: /* not started yet */ - return; - case REQ_DONE: /* already finished */ - return; - case REQ_READY: - master->complete_mask |= 1 << op; - call->requests[op].state = - (op == GRPC_IOREQ_SEND_MESSAGES || op == GRPC_IOREQ_RECV_MESSAGES) - ? REQ_INITIAL - : REQ_DONE; - if (master->complete_mask == master->need_mask || - status == GRPC_OP_ERROR) { - if (OP_IN_MASK(GRPC_IOREQ_RECV_STATUS, master->need_mask)) { - get_final_status( - call, - &call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status->status, - &call->requests[GRPC_IOREQ_RECV_STATUS] - .data.recv_status->details); - } - for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { - if (call->requests[i].master == master) { - call->requests[i].master = NULL; + if (call->requests[op].set < GRPC_IOREQ_OP_COUNT) { + reqinfo *master = &call->requests[call->requests[op].set]; + /* ioreq is live: we need to do something */ + master->complete_mask |= 1 << op; + call->requests[op].set = + (op == GRPC_IOREQ_SEND_MESSAGE || op == GRPC_IOREQ_RECV_MESSAGE) + ? REQSET_EMPTY + : REQSET_DONE; + if (master->complete_mask == master->need_mask || + status == GRPC_OP_ERROR) { + if (OP_IN_MASK(GRPC_IOREQ_RECV_STATUS, master->need_mask)) { + get_final_status( + call, + call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status); + } + for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { + if (call->requests[i].set == op) { + if (call->requests[i].status != GRPC_OP_OK) { + status = GRPC_OP_ERROR; } + call->requests[i].set = REQSET_EMPTY; } - cr = &call->completed_requests[call->num_completed_requests++]; - cr->status = status; - cr->on_complete = master->on_complete; - cr->user_data = master->user_data; } + cr = &call->completed_requests[call->num_completed_requests++]; + cr->status = status; + cr->on_complete = master->on_complete; + cr->user_data = master->user_data; + } } } -static void finish_write_step(void *pc, grpc_op_error error) { - grpc_call *call = pc; +static void finish_send_op(grpc_call *call, grpc_ioreq_op op, grpc_op_error error) { lock(call); - if (error == GRPC_OP_OK) { - if (call->write_index == - call->requests[GRPC_IOREQ_SEND_MESSAGES].data.send_messages.count) { - finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGES, GRPC_OP_OK); - } - } else { - finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGES, GRPC_OP_ERROR); - } + finish_ioreq_op(call, op, error); call->sending = 0; unlock(call); grpc_call_internal_unref(call, 0); } +static void finish_write_step(void *pc, grpc_op_error error) { + finish_send_op(pc, GRPC_IOREQ_SEND_MESSAGE, error); +} + static void finish_finish_step(void *pc, grpc_op_error error) { - grpc_call *call = pc; - lock(call); - finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, error); - call->sending = 0; - unlock(call); - grpc_call_internal_unref(call, 0); + finish_send_op(pc, GRPC_IOREQ_SEND_CLOSE, error); } static void finish_start_step(void *pc, grpc_op_error error) { - grpc_call *call = pc; - lock(call); - finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, error); - call->sending = 0; - unlock(call); - grpc_call_internal_unref(call, 0); + finish_send_op(pc, GRPC_IOREQ_SEND_INITIAL_METADATA, error); } static send_action choose_send_action(grpc_call *call) { - switch (call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].state) { - case REQ_INITIAL: + switch (call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].set) { + case REQSET_EMPTY: return SEND_NOTHING; - case REQ_READY: + default: return SEND_INITIAL_METADATA; - case REQ_DONE: + case REQSET_DONE: break; } - switch (call->requests[GRPC_IOREQ_SEND_MESSAGES].state) { - case REQ_INITIAL: + switch (call->requests[GRPC_IOREQ_SEND_MESSAGE].set) { + case REQSET_EMPTY: return SEND_NOTHING; - case REQ_READY: + default: return SEND_MESSAGE; - case REQ_DONE: + case REQSET_DONE: break; } - switch (call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].state) { - case REQ_INITIAL: + switch (call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].set) { + case REQSET_EMPTY: return SEND_NOTHING; - case REQ_READY: + default: finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_OK); return SEND_TRAILING_METADATA; - case REQ_DONE: + case REQSET_DONE: break; } - switch (call->requests[GRPC_IOREQ_SEND_CLOSE].state) { + switch (call->requests[GRPC_IOREQ_SEND_STATUS].set) { + case REQSET_EMPTY: + return SEND_NOTHING; default: + finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, GRPC_OP_OK); + return SEND_STATUS; + case REQSET_DONE: + break; + } + switch (call->requests[GRPC_IOREQ_SEND_CLOSE].set) { + case REQSET_EMPTY: + case REQSET_DONE: return SEND_NOTHING; - case REQ_READY: + default: return SEND_FINISH; } } @@ -458,6 +472,7 @@ static void enact_send_action(grpc_call *call, send_action sa) { grpc_ioreq_data data; grpc_call_op op; size_t i; + char status_str[GPR_LTOA_MIN_BUFSIZE]; switch (sa) { case SEND_NOTHING: @@ -481,11 +496,11 @@ static void enact_send_action(grpc_call *call, send_action sa) { grpc_call_execute_op(call, &op); break; case SEND_MESSAGE: - data = call->requests[GRPC_IOREQ_SEND_MESSAGES].data; + data = call->requests[GRPC_IOREQ_SEND_MESSAGE].data; op.type = GRPC_SEND_MESSAGE; op.dir = GRPC_CALL_DOWN; op.flags = 0; - op.data.message = data.send_messages.messages[call->write_index++]; + op.data.message = data.send_message; op.done_cb = finish_write_step; op.user_data = call; grpc_call_execute_op(call, &op); @@ -504,28 +519,27 @@ static void enact_send_action(grpc_call *call, send_action sa) { unlock(call); grpc_call_internal_unref(call, 0); break; - case SEND_FINISH: - if (!call->is_client) { - /* TODO(ctiller): cache common status values */ - char status_str[GPR_LTOA_MIN_BUFSIZE]; - data = call->requests[GRPC_IOREQ_SEND_CLOSE].data; - gpr_ltoa(data.send_close.status, status_str); - send_metadata( - call, - grpc_mdelem_from_metadata_strings( - call->metadata_context, - grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)), - grpc_mdstr_from_string(call->metadata_context, status_str))); - if (data.send_close.details) { - send_metadata(call, - grpc_mdelem_from_metadata_strings( - call->metadata_context, - grpc_mdstr_ref( - grpc_channel_get_message_string(call->channel)), - grpc_mdstr_from_string(call->metadata_context, - data.send_close.details))); - } + case SEND_STATUS: + /* TODO(ctiller): cache common status values */ + data = call->requests[GRPC_IOREQ_SEND_CLOSE].data; + gpr_ltoa(data.send_status.code, status_str); + send_metadata( + call, + grpc_mdelem_from_metadata_strings( + call->metadata_context, + grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)), + grpc_mdstr_from_string(call->metadata_context, status_str))); + if (data.send_status.details) { + send_metadata(call, + grpc_mdelem_from_metadata_strings( + call->metadata_context, + grpc_mdstr_ref( + grpc_channel_get_message_string(call->channel)), + grpc_mdstr_from_string(call->metadata_context, + data.send_status.details))); } + break; + case SEND_FINISH: op.type = GRPC_SEND_FINISH; op.dir = GRPC_CALL_DOWN; op.flags = 0; @@ -542,7 +556,7 @@ static grpc_call_error start_ioreq_error(grpc_call *call, size_t i; for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { if (mutated_ops & (1 << i)) { - call->requests[i].master = NULL; + call->requests[i].set = REQSET_EMPTY; } } return ret; @@ -555,35 +569,32 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, size_t i; gpr_uint32 have_ops = 0; grpc_ioreq_op op; - reqinfo *master = NULL; reqinfo *requests = call->requests; + reqinfo *master; grpc_ioreq_data data; + gpr_uint8 set; + + if (nreqs == 0) { + return GRPC_CALL_OK; + } + + set = reqs[0].op; + master = &requests[set]; for (i = 0; i < nreqs; i++) { op = reqs[i].op; - if (requests[op].master) { + if (requests[op].set < GRPC_IOREQ_OP_COUNT) { return start_ioreq_error(call, have_ops, GRPC_CALL_ERROR_TOO_MANY_OPERATIONS); - } - switch (requests[op].state) { - case REQ_INITIAL: - break; - case REQ_READY: - return start_ioreq_error(call, have_ops, - GRPC_CALL_ERROR_TOO_MANY_OPERATIONS); - case REQ_DONE: - return start_ioreq_error(call, have_ops, - GRPC_CALL_ERROR_ALREADY_INVOKED); - } - if (master == NULL) { - master = &requests[op]; + } else if (requests[op].set == REQSET_DONE) { + return start_ioreq_error(call, have_ops, + GRPC_CALL_ERROR_ALREADY_INVOKED); } have_ops |= 1 << op; data = reqs[i].data; - requests[op].state = REQ_READY; requests[op].data = data; - requests[op].master = master; + requests[op].set = set; } GPR_ASSERT(master != NULL); @@ -598,12 +609,10 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, switch (op) { default: break; - case GRPC_IOREQ_RECV_MESSAGES: - data.recv_messages->count = 0; - if (call->buffered_messages.count > 0 || call->read_closed) { - SWAP(grpc_byte_buffer_array, *data.recv_messages, - call->buffered_messages); - finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK); + case GRPC_IOREQ_RECV_MESSAGE: + *data.recv_message = grpc_bbq_pop(&call->incoming_queue); + if (*data.recv_message) { + finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK); } else { call->need_more_data = 1; } @@ -612,19 +621,18 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, } break; case GRPC_IOREQ_RECV_STATUS: - if (call->stream_closed && call->buffered_messages.count == 0) { + if (call->stream_closed) { finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK); } break; - case GRPC_IOREQ_SEND_MESSAGES: + case GRPC_IOREQ_SEND_MESSAGE: if (call->stream_closed) { - finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGES, GRPC_OP_ERROR); + finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, GRPC_OP_ERROR); } - call->write_index = 0; break; case GRPC_IOREQ_SEND_CLOSE: - if (requests[GRPC_IOREQ_SEND_MESSAGES].state == REQ_INITIAL) { - requests[GRPC_IOREQ_SEND_MESSAGES].state = REQ_DONE; + if (requests[GRPC_IOREQ_SEND_MESSAGE].set == REQSET_EMPTY) { + requests[GRPC_IOREQ_SEND_MESSAGE].set = REQSET_DONE; } break; case GRPC_IOREQ_SEND_INITIAL_METADATA: @@ -1088,8 +1096,13 @@ void grpc_call_recv_message(grpc_call_element *elem, grpc_call *call = CALL_FROM_TOP_ELEM(elem); grpc_byte_buffer_array *dest; lock(call); - if (call->requests[GRPC_IOREQ_RECV_MESSAGES].master != NULL) { - dest = call->requests[GRPC_IOREQ_RECV_MESSAGES].data.recv_messages; + if (call->requests[GRPC_IOREQ_RECV_MESSAGE].master != NULL) { + if (call->requests[GRPC_IOREQ_RECV_MESSAGE].state != REQ_READY) { + call->requests[GRPC_IOREQ_RECV_MESSAGE].status = GRPC_OP_ERROR; + } else { + *call->requests[GRPC_IOREQ_RECV_MESSAGE].data.recv_message = byte_buffer; + finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK); + } } else { dest = &call->buffered_messages; } |