diff options
-rw-r--r-- | src/core/surface/call.c | 420 |
1 files changed, 194 insertions, 226 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index da966c874a..382909c865 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -46,8 +46,6 @@ #include <stdlib.h> #include <string.h> -#define OP_IN_MASK(op, mask) (((1 << (op)) & (mask)) != 0) - typedef struct legacy_state legacy_state; static void destroy_legacy_state(legacy_state *ls); @@ -67,31 +65,10 @@ typedef struct { grpc_op_error status; } completed_request; -/* See reqinfo.set below for a description */ +/* See request_set in grpc_call below for a description */ #define REQSET_EMPTY 255 #define REQSET_DONE 254 -/* The state of an ioreq - we keep one of these on the call for each - grpc_ioreq_op type. - - These structures are manipulated in sets, where a set is a set of - operations begin with the same call to start_ioreq and the various - public and private api's that call it. Each set has a master reqinfo - in which we set a few additional fields - see reqinfo_master. */ -typedef struct { - /* User supplied parameters */ - grpc_ioreq_data data; - /* In which set is this ioreq? - This value could be: - - an element of grpc_ioreq_op enumeration, in which case - it designates the master ioreq in a set of requests - - REQSET_EMPTY, in which case this reqinfo type has no application - request against it - - REQSET_DONE, in which case this reqinfo has been satisfied for - all time for this call, and no further use will be made of it */ - gpr_uint8 set; -} reqinfo; - typedef struct { /* Overall status of the operation: starts OK, may degrade to non-OK */ @@ -128,7 +105,7 @@ typedef struct { /* How far through the GRPC stream have we read? */ typedef enum { /* We are still waiting for initial metadata to complete */ - READ_STATE_INITIAL, + READ_STATE_INITIAL = 0, /* We have gotten initial metadata, and are reading either messages or trailing metadata */ READ_STATE_GOT_INITIAL_METADATA, @@ -138,6 +115,12 @@ typedef enum { READ_STATE_STREAM_CLOSED } read_state; +typedef enum { + WRITE_STATE_INITIAL = 0, + WRITE_STATE_STARTED, + WRITE_STATE_WRITE_CLOSED +} write_state; + struct grpc_call { grpc_completion_queue *cq; grpc_channel *channel; @@ -147,17 +130,18 @@ struct grpc_call { gpr_uint8 is_client; read_state read_state; + write_state write_state; gpr_uint8 have_alarm; gpr_uint8 sending; gpr_uint8 num_completed_requests; gpr_uint8 need_more_data; - reqinfo requests[GRPC_IOREQ_OP_COUNT]; + gpr_uint8 request_set[GRPC_IOREQ_OP_COUNT]; + grpc_ioreq_data request_data[GRPC_IOREQ_OP_COUNT]; reqinfo_master masters[GRPC_IOREQ_OP_COUNT]; completed_request completed_requests[GRPC_IOREQ_OP_COUNT]; grpc_byte_buffer_queue incoming_queue; - grpc_metadata_array buffered_initial_metadata; - grpc_metadata_array buffered_trailing_metadata; + grpc_metadata_array buffered_metadata[2]; grpc_mdelem **owned_metadata; size_t owned_metadata_count; size_t owned_metadata_capacity; @@ -171,7 +155,7 @@ struct grpc_call { legacy_state *legacy_state; }; -#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call)+1)) +#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) #define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1) #define CALL_ELEM_FROM_CALL(call, idx) \ grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx) @@ -200,11 +184,11 @@ grpc_call *grpc_call_create(grpc_channel *channel, call->channel = channel; call->is_client = server_transport_data == NULL; for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { - call->requests[i].set = REQSET_EMPTY; + call->request_set[i] = REQSET_EMPTY; } if (call->is_client) { - call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].set = REQSET_DONE; - call->requests[GRPC_IOREQ_SEND_STATUS].set = REQSET_DONE; + call->request_set[GRPC_IOREQ_SEND_TRAILING_METADATA] = REQSET_DONE; + call->request_set[GRPC_IOREQ_SEND_STATUS] = REQSET_DONE; } grpc_channel_internal_ref(channel); call->metadata_context = grpc_channel_get_metadata_context(channel); @@ -233,8 +217,9 @@ static void destroy_call(void *call, int ignored_success) { grpc_mdelem_unref(c->owned_metadata[i]); } gpr_free(c->owned_metadata); - gpr_free(c->buffered_initial_metadata.metadata); - gpr_free(c->buffered_trailing_metadata.metadata); + for (i = 0; i < GPR_ARRAY_SIZE(c->buffered_metadata); i++) { + gpr_free(c->buffered_metadata[i].metadata); + } if (c->legacy_state) { destroy_legacy_state(c->legacy_state); } @@ -284,6 +269,14 @@ static void request_more_data(grpc_call *call) { grpc_call_execute_op(call, &op); } +static int is_op_live(grpc_call *call, grpc_ioreq_op op) { + gpr_uint8 set = call->request_set[op]; + reqinfo_master *master; + if (set >= GRPC_IOREQ_OP_COUNT) return 0; + master = &call->masters[set]; + return (master->complete_mask & (1 << op)) == 0; +} + static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); } static void unlock(grpc_call *call) { @@ -291,8 +284,7 @@ static void unlock(grpc_call *call) { completed_request completed_requests[GRPC_IOREQ_OP_COUNT]; int num_completed_requests = call->num_completed_requests; int need_more_data = - call->need_more_data && - call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].set == REQSET_DONE; + call->need_more_data && !is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA); int i; if (need_more_data) { @@ -362,36 +354,70 @@ no_details: **args.details = 0; } -static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, - grpc_op_error status) { +static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op, + grpc_op_error status) { completed_request *cr; + gpr_uint8 master_set = call->request_set[op]; + reqinfo_master *master; size_t i; - if (call->requests[op].set < GRPC_IOREQ_OP_COUNT) { - reqinfo_master *master = &call->masters[call->requests[op].set]; - /* ioreq is live: we need to do something */ - master->complete_mask |= 1 << op; - if (status != GRPC_OP_OK) { - master->status = status; - } - call->requests[op].set = - (op == GRPC_IOREQ_SEND_MESSAGE || op == GRPC_IOREQ_RECV_MESSAGE) - ? REQSET_EMPTY - : REQSET_DONE; - if (master->complete_mask == master->need_mask || status == GRPC_OP_ERROR) { - if (OP_IN_MASK(GRPC_IOREQ_RECV_STATUS, master->need_mask)) { - get_final_status( - call, call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status); + /* ioreq is live: we need to do something */ + master = &call->masters[master_set]; + master->complete_mask |= 1 << op; + if (status != GRPC_OP_OK) { + master->status = status; + master->complete_mask = master->need_mask; + } + if (master->complete_mask == master->need_mask) { + for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { + if (call->request_set[i] != master_set) { + continue; } - for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { - if (call->requests[i].set == op) { - call->requests[i].set = REQSET_EMPTY; - } + call->request_set[i] = REQSET_DONE; + switch ((grpc_ioreq_op)i) { + case GRPC_IOREQ_RECV_MESSAGE: + case GRPC_IOREQ_SEND_MESSAGE: + if (master->status == GRPC_OP_OK) { + call->request_set[i] = REQSET_EMPTY; + } else { + call->write_state = WRITE_STATE_WRITE_CLOSED; + } + break; + case GRPC_IOREQ_RECV_CLOSE: + case GRPC_IOREQ_SEND_INITIAL_METADATA: + case GRPC_IOREQ_SEND_TRAILING_METADATA: + case GRPC_IOREQ_SEND_STATUS: + case GRPC_IOREQ_SEND_CLOSE: + break; + case GRPC_IOREQ_RECV_STATUS: + get_final_status( + call, call->request_data[GRPC_IOREQ_RECV_STATUS].recv_status); + break; + case GRPC_IOREQ_RECV_INITIAL_METADATA: + SWAP(grpc_metadata_array, call->buffered_metadata[0], + *call->request_data[GRPC_IOREQ_RECV_INITIAL_METADATA] + .recv_metadata); + break; + case GRPC_IOREQ_RECV_TRAILING_METADATA: + SWAP(grpc_metadata_array, call->buffered_metadata[1], + *call->request_data[GRPC_IOREQ_RECV_TRAILING_METADATA] + .recv_metadata); + break; + case GRPC_IOREQ_OP_COUNT: + abort(); + break; } - cr = &call->completed_requests[call->num_completed_requests++]; - cr->status = master->status; - cr->on_complete = master->on_complete; - cr->user_data = master->user_data; } + cr = &call->completed_requests[call->num_completed_requests++]; + cr->status = master->status; + cr->on_complete = master->on_complete; + cr->user_data = master->user_data; + } +} + +static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, + grpc_op_error status) { + if (is_op_live(call, op)) { + finish_live_ioreq_op(call, op, status); } } @@ -417,39 +443,32 @@ static void finish_start_step(void *pc, grpc_op_error error) { } static send_action choose_send_action(grpc_call *call) { - switch (call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].set) { - case REQSET_EMPTY: - return SEND_NOTHING; - default: - return SEND_INITIAL_METADATA; - case REQSET_DONE: - break; - } - switch (call->requests[GRPC_IOREQ_SEND_MESSAGE].set) { - case REQSET_EMPTY: - return SEND_NOTHING; - default: - return SEND_MESSAGE; - case REQSET_DONE: - break; - } - switch (call->requests[GRPC_IOREQ_SEND_CLOSE].set) { - case REQSET_EMPTY: - case REQSET_DONE: + switch (call->write_state) { + case WRITE_STATE_INITIAL: + if (call->request_set[GRPC_IOREQ_SEND_INITIAL_METADATA] != + REQSET_EMPTY) { + call->write_state = WRITE_STATE_STARTED; + return SEND_INITIAL_METADATA; + } return SEND_NOTHING; - default: - if (call->is_client) { - return SEND_FINISH; - } else if (call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].set != - REQSET_EMPTY && - call->requests[GRPC_IOREQ_SEND_STATUS].set != REQSET_EMPTY) { + case WRITE_STATE_STARTED: + if (call->request_set[GRPC_IOREQ_SEND_MESSAGE] != REQSET_EMPTY) { + return SEND_MESSAGE; + } + if (call->request_set[GRPC_IOREQ_SEND_CLOSE] != REQSET_EMPTY) { + call->write_state = WRITE_STATE_WRITE_CLOSED; finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_OK); finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, GRPC_OP_OK); - return SEND_TRAILING_METADATA_AND_FINISH; - } else { - return SEND_NOTHING; + return call->is_client ? SEND_FINISH + : SEND_TRAILING_METADATA_AND_FINISH; } + return SEND_NOTHING; + case WRITE_STATE_WRITE_CLOSED: + return SEND_NOTHING; } + gpr_log(GPR_ERROR, "should never reach here"); + abort(); + return SEND_NOTHING; } static void send_metadata(grpc_call *call, grpc_mdelem *elem) { @@ -474,7 +493,7 @@ static void enact_send_action(grpc_call *call, send_action sa) { abort(); break; case SEND_INITIAL_METADATA: - data = call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].data; + data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA]; for (i = 0; i < data.send_metadata.count; i++) { const grpc_metadata *md = &data.send_metadata.metadata[i]; send_metadata(call, @@ -491,7 +510,7 @@ static void enact_send_action(grpc_call *call, send_action sa) { grpc_call_execute_op(call, &op); break; case SEND_MESSAGE: - data = call->requests[GRPC_IOREQ_SEND_MESSAGE].data; + data = call->request_data[GRPC_IOREQ_SEND_MESSAGE]; op.type = GRPC_SEND_MESSAGE; op.dir = GRPC_CALL_DOWN; op.flags = 0; @@ -502,7 +521,7 @@ static void enact_send_action(grpc_call *call, send_action sa) { break; case SEND_TRAILING_METADATA_AND_FINISH: /* send trailing metadata */ - data = call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].data; + data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA]; for (i = 0; i < data.send_metadata.count; i++) { const grpc_metadata *md = &data.send_metadata.metadata[i]; send_metadata(call, @@ -512,7 +531,7 @@ static void enact_send_action(grpc_call *call, send_action sa) { } /* send status */ /* TODO(ctiller): cache common status values */ - data = call->requests[GRPC_IOREQ_SEND_STATUS].data; + data = call->request_data[GRPC_IOREQ_SEND_STATUS]; gpr_ltoa(data.send_status.code, status_str); send_metadata( call, @@ -547,12 +566,66 @@ static grpc_call_error start_ioreq_error(grpc_call *call, size_t i; for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { if (mutated_ops & (1 << i)) { - call->requests[i].set = REQSET_EMPTY; + call->request_set[i] = REQSET_EMPTY; } } return ret; } +static void finish_read_ops(grpc_call *call) { + int empty; + + if (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE)) { + empty = + (NULL == (*call->request_data[GRPC_IOREQ_RECV_MESSAGE].recv_message = + grpc_bbq_pop(&call->incoming_queue))); + if (!empty) { + finish_live_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK); + empty = grpc_bbq_empty(&call->incoming_queue); + } + } else { + empty = grpc_bbq_empty(&call->incoming_queue); + } + + switch (call->read_state) { + case READ_STATE_STREAM_CLOSED: + if (empty) { + finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK); + } + /* fallthrough */ + case READ_STATE_READ_CLOSED: + if (empty) { + finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK); + } + finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK); + finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK); + /* fallthrough */ + case READ_STATE_GOT_INITIAL_METADATA: + finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK); + /* fallthrough */ + case READ_STATE_INITIAL: + /* do nothing */ + break; + } +} + +static void early_out_write_ops(grpc_call *call) { + switch (call->write_state) { + case WRITE_STATE_WRITE_CLOSED: + finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, GRPC_OP_ERROR); + finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, GRPC_OP_ERROR); + finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_ERROR); + finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, GRPC_OP_OK); + /* fallthrough */ + case WRITE_STATE_STARTED: + finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, GRPC_OP_ERROR); + /* fallthrough */ + case WRITE_STATE_INITIAL: + /* do nothing */ + break; + } +} + static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, size_t nreqs, grpc_ioreq_completion_func completion, @@ -560,7 +633,6 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, size_t i; gpr_uint32 have_ops = 0; grpc_ioreq_op op; - reqinfo *requests = call->requests; reqinfo_master *master; grpc_ioreq_data data; gpr_uint8 set; @@ -573,17 +645,17 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, for (i = 0; i < nreqs; i++) { op = reqs[i].op; - if (requests[op].set < GRPC_IOREQ_OP_COUNT) { + if (call->request_set[op] < GRPC_IOREQ_OP_COUNT) { return start_ioreq_error(call, have_ops, GRPC_CALL_ERROR_TOO_MANY_OPERATIONS); - } else if (requests[op].set == REQSET_DONE) { + } else if (call->request_set[op] == REQSET_DONE) { return start_ioreq_error(call, have_ops, GRPC_CALL_ERROR_ALREADY_INVOKED); } have_ops |= 1 << op; data = reqs[i].data; - requests[op].data = data; - requests[op].set = set; + call->request_data[op] = data; + call->request_set[op] = set; } master = &call->masters[set]; @@ -593,83 +665,13 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, master->on_complete = completion; master->user_data = user_data; - for (i = 0; i < nreqs; i++) { - op = reqs[i].op; - data = reqs[i].data; - switch (op) { - case GRPC_IOREQ_OP_COUNT: - gpr_log(GPR_ERROR, "should never reach here"); - abort(); - break; - case GRPC_IOREQ_RECV_MESSAGE: - *data.recv_message = grpc_bbq_pop(&call->incoming_queue); - if (*data.recv_message) { - finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK); - if (call->read_state == READ_STATE_STREAM_CLOSED && grpc_bbq_empty(&call->incoming_queue)) { - finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK); - } - } else { - /* no message: either end of stream or we need more bytes */ - if (call->read_state >= READ_STATE_READ_CLOSED) { - finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK); - if (call->read_state == READ_STATE_STREAM_CLOSED) { - /* stream closed AND we've drained all messages: signal to the application */ - finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK); - } - } else { - call->need_more_data = 1; - } - } - break; - case GRPC_IOREQ_RECV_STATUS: - if (call->read_state >= READ_STATE_READ_CLOSED) { - finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK); - } - break; - case GRPC_IOREQ_RECV_CLOSE: - if (call->read_state == READ_STATE_STREAM_CLOSED) { - finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK); - } - break; - case GRPC_IOREQ_SEND_CLOSE: - if (requests[GRPC_IOREQ_SEND_MESSAGE].set == REQSET_EMPTY) { - requests[GRPC_IOREQ_SEND_MESSAGE].set = REQSET_DONE; - } - if (call->read_state == READ_STATE_STREAM_CLOSED) { - finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, GRPC_OP_ERROR); - } - break; - case GRPC_IOREQ_SEND_MESSAGE: - case GRPC_IOREQ_SEND_INITIAL_METADATA: - case GRPC_IOREQ_SEND_TRAILING_METADATA: - case GRPC_IOREQ_SEND_STATUS: - if (call->read_state == READ_STATE_STREAM_CLOSED) { - finish_ioreq_op(call, op, GRPC_OP_ERROR); - } - break; - case GRPC_IOREQ_RECV_INITIAL_METADATA: - data.recv_metadata->count = 0; - if (call->buffered_initial_metadata.count > 0) { - SWAP(grpc_metadata_array, *data.recv_metadata, - call->buffered_initial_metadata); - } - if (call->read_state >= READ_STATE_GOT_INITIAL_METADATA) { - finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK); - } - break; - case GRPC_IOREQ_RECV_TRAILING_METADATA: - data.recv_metadata->count = 0; - if (call->buffered_trailing_metadata.count > 0) { - SWAP(grpc_metadata_array, *data.recv_metadata, - call->buffered_trailing_metadata); - } - if (call->read_state >= READ_STATE_READ_CLOSED) { - finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK); - } - break; - } + if (have_ops & (1 << GRPC_IOREQ_RECV_MESSAGE)) { + call->need_more_data = 1; } + finish_read_ops(call); + early_out_write_ops(call); + return GRPC_CALL_OK; } @@ -774,34 +776,21 @@ void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) { grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now()); } -static void mark_read_closed(grpc_call *call) { - call->read_state = READ_STATE_READ_CLOSED; - finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK); - finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK); - finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK); - finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK); +static void set_read_state(grpc_call *call, read_state state) { + lock(call); + GPR_ASSERT(call->read_state < state); + call->read_state = state; + finish_read_ops(call); + unlock(call); } void grpc_call_read_closed(grpc_call_element *elem) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); - lock(call); - GPR_ASSERT(call->read_state < READ_STATE_READ_CLOSED); - mark_read_closed(call); - unlock(call); + set_read_state(CALL_FROM_TOP_ELEM(elem), READ_STATE_READ_CLOSED); } void grpc_call_stream_closed(grpc_call_element *elem) { grpc_call *call = CALL_FROM_TOP_ELEM(elem); - lock(call); - GPR_ASSERT(call->read_state < READ_STATE_STREAM_CLOSED); - if (call->read_state < READ_STATE_READ_CLOSED) { - mark_read_closed(call); - } - call->read_state = READ_STATE_STREAM_CLOSED; - if (grpc_bbq_empty(&call->incoming_queue)) { - finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK); - } - unlock(call); + set_read_state(call, READ_STATE_STREAM_CLOSED); grpc_call_internal_unref(call, 0); } @@ -815,7 +804,7 @@ static gpr_uint32 decode_status(grpc_mdelem *md) { gpr_uint32 status; void *user_data = grpc_mdelem_get_user_data(md, destroy_status); if (user_data) { - status = ((gpr_uint32)(gpr_intptr) user_data) - STATUS_OFFSET; + status = ((gpr_uint32)(gpr_intptr)user_data) - STATUS_OFFSET; } else { if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value), GPR_SLICE_LENGTH(md->value->slice), @@ -832,13 +821,8 @@ void grpc_call_recv_message(grpc_call_element *elem, grpc_byte_buffer *byte_buffer) { grpc_call *call = CALL_FROM_TOP_ELEM(elem); lock(call); - if (call->requests[GRPC_IOREQ_RECV_MESSAGE].set < GRPC_IOREQ_OP_COUNT) { - /* there's an outstanding read */ - *call->requests[GRPC_IOREQ_RECV_MESSAGE].data.recv_message = byte_buffer; - finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK); - } else { - grpc_bbq_push(&call->incoming_queue, byte_buffer); - } + grpc_bbq_push(&call->incoming_queue, byte_buffer); + finish_read_ops(call); unlock(call); } @@ -856,19 +840,8 @@ void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) { set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); grpc_mdelem_unref(md); } else { - if (call->read_state < READ_STATE_GOT_INITIAL_METADATA) { - dest = call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA].set < - GRPC_IOREQ_OP_COUNT - ? call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA] - .data.recv_metadata - : &call->buffered_initial_metadata; - } else { - dest = call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA].set < - GRPC_IOREQ_OP_COUNT - ? call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA] - .data.recv_metadata - : &call->buffered_trailing_metadata; - } + dest = &call->buffered_metadata[call->read_state >= + READ_STATE_GOT_INITIAL_METADATA]; if (dest->count == dest->capacity) { dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2); dest->metadata = @@ -894,6 +867,11 @@ grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) { return CALL_STACK_FROM_CALL(call); } +void grpc_call_initial_metadata_complete(grpc_call_element *surface_element) { + grpc_call *call = grpc_call_from_top_element(surface_element); + set_read_state(call, READ_STATE_GOT_INITIAL_METADATA); +} + /* * LEGACY API IMPLEMENTATION * All this code will disappear as soon as wrappings are updated @@ -1097,16 +1075,6 @@ grpc_call_error grpc_call_server_end_initial_metadata_old(grpc_call *call, return err; } -void grpc_call_initial_metadata_complete(grpc_call_element *surface_element) { - grpc_call *call = grpc_call_from_top_element(surface_element); - lock(call); - if (call->read_state < READ_STATE_GOT_INITIAL_METADATA) { - call->read_state = READ_STATE_GOT_INITIAL_METADATA; - } - finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK); - unlock(call); -} - static void finish_read_event(void *p, grpc_op_error error) { if (p) grpc_byte_buffer_destroy(p); } |