aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/surface/call.c420
1 files changed, 194 insertions, 226 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index da966c874a..382909c865 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -46,8 +46,6 @@
#include <stdlib.h>
#include <string.h>
-#define OP_IN_MASK(op, mask) (((1 << (op)) & (mask)) != 0)
-
typedef struct legacy_state legacy_state;
static void destroy_legacy_state(legacy_state *ls);
@@ -67,31 +65,10 @@ typedef struct {
grpc_op_error status;
} completed_request;
-/* See reqinfo.set below for a description */
+/* See request_set in grpc_call below for a description */
#define REQSET_EMPTY 255
#define REQSET_DONE 254
-/* The state of an ioreq - we keep one of these on the call for each
- grpc_ioreq_op type.
-
- These structures are manipulated in sets, where a set is a set of
- operations begin with the same call to start_ioreq and the various
- public and private api's that call it. Each set has a master reqinfo
- in which we set a few additional fields - see reqinfo_master. */
-typedef struct {
- /* User supplied parameters */
- grpc_ioreq_data data;
- /* In which set is this ioreq?
- This value could be:
- - an element of grpc_ioreq_op enumeration, in which case
- it designates the master ioreq in a set of requests
- - REQSET_EMPTY, in which case this reqinfo type has no application
- request against it
- - REQSET_DONE, in which case this reqinfo has been satisfied for
- all time for this call, and no further use will be made of it */
- gpr_uint8 set;
-} reqinfo;
-
typedef struct {
/* Overall status of the operation: starts OK, may degrade to
non-OK */
@@ -128,7 +105,7 @@ typedef struct {
/* How far through the GRPC stream have we read? */
typedef enum {
/* We are still waiting for initial metadata to complete */
- READ_STATE_INITIAL,
+ READ_STATE_INITIAL = 0,
/* We have gotten initial metadata, and are reading either
messages or trailing metadata */
READ_STATE_GOT_INITIAL_METADATA,
@@ -138,6 +115,12 @@ typedef enum {
READ_STATE_STREAM_CLOSED
} read_state;
+typedef enum {
+ WRITE_STATE_INITIAL = 0,
+ WRITE_STATE_STARTED,
+ WRITE_STATE_WRITE_CLOSED
+} write_state;
+
struct grpc_call {
grpc_completion_queue *cq;
grpc_channel *channel;
@@ -147,17 +130,18 @@ struct grpc_call {
gpr_uint8 is_client;
read_state read_state;
+ write_state write_state;
gpr_uint8 have_alarm;
gpr_uint8 sending;
gpr_uint8 num_completed_requests;
gpr_uint8 need_more_data;
- reqinfo requests[GRPC_IOREQ_OP_COUNT];
+ gpr_uint8 request_set[GRPC_IOREQ_OP_COUNT];
+ grpc_ioreq_data request_data[GRPC_IOREQ_OP_COUNT];
reqinfo_master masters[GRPC_IOREQ_OP_COUNT];
completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
grpc_byte_buffer_queue incoming_queue;
- grpc_metadata_array buffered_initial_metadata;
- grpc_metadata_array buffered_trailing_metadata;
+ grpc_metadata_array buffered_metadata[2];
grpc_mdelem **owned_metadata;
size_t owned_metadata_count;
size_t owned_metadata_capacity;
@@ -171,7 +155,7 @@ struct grpc_call {
legacy_state *legacy_state;
};
-#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call)+1))
+#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1)
#define CALL_ELEM_FROM_CALL(call, idx) \
grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
@@ -200,11 +184,11 @@ grpc_call *grpc_call_create(grpc_channel *channel,
call->channel = channel;
call->is_client = server_transport_data == NULL;
for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
- call->requests[i].set = REQSET_EMPTY;
+ call->request_set[i] = REQSET_EMPTY;
}
if (call->is_client) {
- call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].set = REQSET_DONE;
- call->requests[GRPC_IOREQ_SEND_STATUS].set = REQSET_DONE;
+ call->request_set[GRPC_IOREQ_SEND_TRAILING_METADATA] = REQSET_DONE;
+ call->request_set[GRPC_IOREQ_SEND_STATUS] = REQSET_DONE;
}
grpc_channel_internal_ref(channel);
call->metadata_context = grpc_channel_get_metadata_context(channel);
@@ -233,8 +217,9 @@ static void destroy_call(void *call, int ignored_success) {
grpc_mdelem_unref(c->owned_metadata[i]);
}
gpr_free(c->owned_metadata);
- gpr_free(c->buffered_initial_metadata.metadata);
- gpr_free(c->buffered_trailing_metadata.metadata);
+ for (i = 0; i < GPR_ARRAY_SIZE(c->buffered_metadata); i++) {
+ gpr_free(c->buffered_metadata[i].metadata);
+ }
if (c->legacy_state) {
destroy_legacy_state(c->legacy_state);
}
@@ -284,6 +269,14 @@ static void request_more_data(grpc_call *call) {
grpc_call_execute_op(call, &op);
}
+static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
+ gpr_uint8 set = call->request_set[op];
+ reqinfo_master *master;
+ if (set >= GRPC_IOREQ_OP_COUNT) return 0;
+ master = &call->masters[set];
+ return (master->complete_mask & (1 << op)) == 0;
+}
+
static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
static void unlock(grpc_call *call) {
@@ -291,8 +284,7 @@ static void unlock(grpc_call *call) {
completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
int num_completed_requests = call->num_completed_requests;
int need_more_data =
- call->need_more_data &&
- call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].set == REQSET_DONE;
+ call->need_more_data && !is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA);
int i;
if (need_more_data) {
@@ -362,36 +354,70 @@ no_details:
**args.details = 0;
}
-static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op,
- grpc_op_error status) {
+static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
+ grpc_op_error status) {
completed_request *cr;
+ gpr_uint8 master_set = call->request_set[op];
+ reqinfo_master *master;
size_t i;
- if (call->requests[op].set < GRPC_IOREQ_OP_COUNT) {
- reqinfo_master *master = &call->masters[call->requests[op].set];
- /* ioreq is live: we need to do something */
- master->complete_mask |= 1 << op;
- if (status != GRPC_OP_OK) {
- master->status = status;
- }
- call->requests[op].set =
- (op == GRPC_IOREQ_SEND_MESSAGE || op == GRPC_IOREQ_RECV_MESSAGE)
- ? REQSET_EMPTY
- : REQSET_DONE;
- if (master->complete_mask == master->need_mask || status == GRPC_OP_ERROR) {
- if (OP_IN_MASK(GRPC_IOREQ_RECV_STATUS, master->need_mask)) {
- get_final_status(
- call, call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status);
+ /* ioreq is live: we need to do something */
+ master = &call->masters[master_set];
+ master->complete_mask |= 1 << op;
+ if (status != GRPC_OP_OK) {
+ master->status = status;
+ master->complete_mask = master->need_mask;
+ }
+ if (master->complete_mask == master->need_mask) {
+ for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
+ if (call->request_set[i] != master_set) {
+ continue;
}
- for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
- if (call->requests[i].set == op) {
- call->requests[i].set = REQSET_EMPTY;
- }
+ call->request_set[i] = REQSET_DONE;
+ switch ((grpc_ioreq_op)i) {
+ case GRPC_IOREQ_RECV_MESSAGE:
+ case GRPC_IOREQ_SEND_MESSAGE:
+ if (master->status == GRPC_OP_OK) {
+ call->request_set[i] = REQSET_EMPTY;
+ } else {
+ call->write_state = WRITE_STATE_WRITE_CLOSED;
+ }
+ break;
+ case GRPC_IOREQ_RECV_CLOSE:
+ case GRPC_IOREQ_SEND_INITIAL_METADATA:
+ case GRPC_IOREQ_SEND_TRAILING_METADATA:
+ case GRPC_IOREQ_SEND_STATUS:
+ case GRPC_IOREQ_SEND_CLOSE:
+ break;
+ case GRPC_IOREQ_RECV_STATUS:
+ get_final_status(
+ call, call->request_data[GRPC_IOREQ_RECV_STATUS].recv_status);
+ break;
+ case GRPC_IOREQ_RECV_INITIAL_METADATA:
+ SWAP(grpc_metadata_array, call->buffered_metadata[0],
+ *call->request_data[GRPC_IOREQ_RECV_INITIAL_METADATA]
+ .recv_metadata);
+ break;
+ case GRPC_IOREQ_RECV_TRAILING_METADATA:
+ SWAP(grpc_metadata_array, call->buffered_metadata[1],
+ *call->request_data[GRPC_IOREQ_RECV_TRAILING_METADATA]
+ .recv_metadata);
+ break;
+ case GRPC_IOREQ_OP_COUNT:
+ abort();
+ break;
}
- cr = &call->completed_requests[call->num_completed_requests++];
- cr->status = master->status;
- cr->on_complete = master->on_complete;
- cr->user_data = master->user_data;
}
+ cr = &call->completed_requests[call->num_completed_requests++];
+ cr->status = master->status;
+ cr->on_complete = master->on_complete;
+ cr->user_data = master->user_data;
+ }
+}
+
+static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op,
+ grpc_op_error status) {
+ if (is_op_live(call, op)) {
+ finish_live_ioreq_op(call, op, status);
}
}
@@ -417,39 +443,32 @@ static void finish_start_step(void *pc, grpc_op_error error) {
}
static send_action choose_send_action(grpc_call *call) {
- switch (call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].set) {
- case REQSET_EMPTY:
- return SEND_NOTHING;
- default:
- return SEND_INITIAL_METADATA;
- case REQSET_DONE:
- break;
- }
- switch (call->requests[GRPC_IOREQ_SEND_MESSAGE].set) {
- case REQSET_EMPTY:
- return SEND_NOTHING;
- default:
- return SEND_MESSAGE;
- case REQSET_DONE:
- break;
- }
- switch (call->requests[GRPC_IOREQ_SEND_CLOSE].set) {
- case REQSET_EMPTY:
- case REQSET_DONE:
+ switch (call->write_state) {
+ case WRITE_STATE_INITIAL:
+ if (call->request_set[GRPC_IOREQ_SEND_INITIAL_METADATA] !=
+ REQSET_EMPTY) {
+ call->write_state = WRITE_STATE_STARTED;
+ return SEND_INITIAL_METADATA;
+ }
return SEND_NOTHING;
- default:
- if (call->is_client) {
- return SEND_FINISH;
- } else if (call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].set !=
- REQSET_EMPTY &&
- call->requests[GRPC_IOREQ_SEND_STATUS].set != REQSET_EMPTY) {
+ case WRITE_STATE_STARTED:
+ if (call->request_set[GRPC_IOREQ_SEND_MESSAGE] != REQSET_EMPTY) {
+ return SEND_MESSAGE;
+ }
+ if (call->request_set[GRPC_IOREQ_SEND_CLOSE] != REQSET_EMPTY) {
+ call->write_state = WRITE_STATE_WRITE_CLOSED;
finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_OK);
finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, GRPC_OP_OK);
- return SEND_TRAILING_METADATA_AND_FINISH;
- } else {
- return SEND_NOTHING;
+ return call->is_client ? SEND_FINISH
+ : SEND_TRAILING_METADATA_AND_FINISH;
}
+ return SEND_NOTHING;
+ case WRITE_STATE_WRITE_CLOSED:
+ return SEND_NOTHING;
}
+ gpr_log(GPR_ERROR, "should never reach here");
+ abort();
+ return SEND_NOTHING;
}
static void send_metadata(grpc_call *call, grpc_mdelem *elem) {
@@ -474,7 +493,7 @@ static void enact_send_action(grpc_call *call, send_action sa) {
abort();
break;
case SEND_INITIAL_METADATA:
- data = call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].data;
+ data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA];
for (i = 0; i < data.send_metadata.count; i++) {
const grpc_metadata *md = &data.send_metadata.metadata[i];
send_metadata(call,
@@ -491,7 +510,7 @@ static void enact_send_action(grpc_call *call, send_action sa) {
grpc_call_execute_op(call, &op);
break;
case SEND_MESSAGE:
- data = call->requests[GRPC_IOREQ_SEND_MESSAGE].data;
+ data = call->request_data[GRPC_IOREQ_SEND_MESSAGE];
op.type = GRPC_SEND_MESSAGE;
op.dir = GRPC_CALL_DOWN;
op.flags = 0;
@@ -502,7 +521,7 @@ static void enact_send_action(grpc_call *call, send_action sa) {
break;
case SEND_TRAILING_METADATA_AND_FINISH:
/* send trailing metadata */
- data = call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].data;
+ data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA];
for (i = 0; i < data.send_metadata.count; i++) {
const grpc_metadata *md = &data.send_metadata.metadata[i];
send_metadata(call,
@@ -512,7 +531,7 @@ static void enact_send_action(grpc_call *call, send_action sa) {
}
/* send status */
/* TODO(ctiller): cache common status values */
- data = call->requests[GRPC_IOREQ_SEND_STATUS].data;
+ data = call->request_data[GRPC_IOREQ_SEND_STATUS];
gpr_ltoa(data.send_status.code, status_str);
send_metadata(
call,
@@ -547,12 +566,66 @@ static grpc_call_error start_ioreq_error(grpc_call *call,
size_t i;
for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
if (mutated_ops & (1 << i)) {
- call->requests[i].set = REQSET_EMPTY;
+ call->request_set[i] = REQSET_EMPTY;
}
}
return ret;
}
+static void finish_read_ops(grpc_call *call) {
+ int empty;
+
+ if (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE)) {
+ empty =
+ (NULL == (*call->request_data[GRPC_IOREQ_RECV_MESSAGE].recv_message =
+ grpc_bbq_pop(&call->incoming_queue)));
+ if (!empty) {
+ finish_live_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
+ empty = grpc_bbq_empty(&call->incoming_queue);
+ }
+ } else {
+ empty = grpc_bbq_empty(&call->incoming_queue);
+ }
+
+ switch (call->read_state) {
+ case READ_STATE_STREAM_CLOSED:
+ if (empty) {
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK);
+ }
+ /* fallthrough */
+ case READ_STATE_READ_CLOSED:
+ if (empty) {
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
+ }
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK);
+ /* fallthrough */
+ case READ_STATE_GOT_INITIAL_METADATA:
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
+ /* fallthrough */
+ case READ_STATE_INITIAL:
+ /* do nothing */
+ break;
+ }
+}
+
+static void early_out_write_ops(grpc_call *call) {
+ switch (call->write_state) {
+ case WRITE_STATE_WRITE_CLOSED:
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, GRPC_OP_ERROR);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, GRPC_OP_ERROR);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_ERROR);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, GRPC_OP_OK);
+ /* fallthrough */
+ case WRITE_STATE_STARTED:
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, GRPC_OP_ERROR);
+ /* fallthrough */
+ case WRITE_STATE_INITIAL:
+ /* do nothing */
+ break;
+ }
+}
+
static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
size_t nreqs,
grpc_ioreq_completion_func completion,
@@ -560,7 +633,6 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
size_t i;
gpr_uint32 have_ops = 0;
grpc_ioreq_op op;
- reqinfo *requests = call->requests;
reqinfo_master *master;
grpc_ioreq_data data;
gpr_uint8 set;
@@ -573,17 +645,17 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
for (i = 0; i < nreqs; i++) {
op = reqs[i].op;
- if (requests[op].set < GRPC_IOREQ_OP_COUNT) {
+ if (call->request_set[op] < GRPC_IOREQ_OP_COUNT) {
return start_ioreq_error(call, have_ops,
GRPC_CALL_ERROR_TOO_MANY_OPERATIONS);
- } else if (requests[op].set == REQSET_DONE) {
+ } else if (call->request_set[op] == REQSET_DONE) {
return start_ioreq_error(call, have_ops, GRPC_CALL_ERROR_ALREADY_INVOKED);
}
have_ops |= 1 << op;
data = reqs[i].data;
- requests[op].data = data;
- requests[op].set = set;
+ call->request_data[op] = data;
+ call->request_set[op] = set;
}
master = &call->masters[set];
@@ -593,83 +665,13 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
master->on_complete = completion;
master->user_data = user_data;
- for (i = 0; i < nreqs; i++) {
- op = reqs[i].op;
- data = reqs[i].data;
- switch (op) {
- case GRPC_IOREQ_OP_COUNT:
- gpr_log(GPR_ERROR, "should never reach here");
- abort();
- break;
- case GRPC_IOREQ_RECV_MESSAGE:
- *data.recv_message = grpc_bbq_pop(&call->incoming_queue);
- if (*data.recv_message) {
- finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
- if (call->read_state == READ_STATE_STREAM_CLOSED && grpc_bbq_empty(&call->incoming_queue)) {
- finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK);
- }
- } else {
- /* no message: either end of stream or we need more bytes */
- if (call->read_state >= READ_STATE_READ_CLOSED) {
- finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
- if (call->read_state == READ_STATE_STREAM_CLOSED) {
- /* stream closed AND we've drained all messages: signal to the application */
- finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK);
- }
- } else {
- call->need_more_data = 1;
- }
- }
- break;
- case GRPC_IOREQ_RECV_STATUS:
- if (call->read_state >= READ_STATE_READ_CLOSED) {
- finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK);
- }
- break;
- case GRPC_IOREQ_RECV_CLOSE:
- if (call->read_state == READ_STATE_STREAM_CLOSED) {
- finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK);
- }
- break;
- case GRPC_IOREQ_SEND_CLOSE:
- if (requests[GRPC_IOREQ_SEND_MESSAGE].set == REQSET_EMPTY) {
- requests[GRPC_IOREQ_SEND_MESSAGE].set = REQSET_DONE;
- }
- if (call->read_state == READ_STATE_STREAM_CLOSED) {
- finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, GRPC_OP_ERROR);
- }
- break;
- case GRPC_IOREQ_SEND_MESSAGE:
- case GRPC_IOREQ_SEND_INITIAL_METADATA:
- case GRPC_IOREQ_SEND_TRAILING_METADATA:
- case GRPC_IOREQ_SEND_STATUS:
- if (call->read_state == READ_STATE_STREAM_CLOSED) {
- finish_ioreq_op(call, op, GRPC_OP_ERROR);
- }
- break;
- case GRPC_IOREQ_RECV_INITIAL_METADATA:
- data.recv_metadata->count = 0;
- if (call->buffered_initial_metadata.count > 0) {
- SWAP(grpc_metadata_array, *data.recv_metadata,
- call->buffered_initial_metadata);
- }
- if (call->read_state >= READ_STATE_GOT_INITIAL_METADATA) {
- finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
- }
- break;
- case GRPC_IOREQ_RECV_TRAILING_METADATA:
- data.recv_metadata->count = 0;
- if (call->buffered_trailing_metadata.count > 0) {
- SWAP(grpc_metadata_array, *data.recv_metadata,
- call->buffered_trailing_metadata);
- }
- if (call->read_state >= READ_STATE_READ_CLOSED) {
- finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK);
- }
- break;
- }
+ if (have_ops & (1 << GRPC_IOREQ_RECV_MESSAGE)) {
+ call->need_more_data = 1;
}
+ finish_read_ops(call);
+ early_out_write_ops(call);
+
return GRPC_CALL_OK;
}
@@ -774,34 +776,21 @@ void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) {
grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now());
}
-static void mark_read_closed(grpc_call *call) {
- call->read_state = READ_STATE_READ_CLOSED;
- finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
- finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
- finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK);
- finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK);
+static void set_read_state(grpc_call *call, read_state state) {
+ lock(call);
+ GPR_ASSERT(call->read_state < state);
+ call->read_state = state;
+ finish_read_ops(call);
+ unlock(call);
}
void grpc_call_read_closed(grpc_call_element *elem) {
- grpc_call *call = CALL_FROM_TOP_ELEM(elem);
- lock(call);
- GPR_ASSERT(call->read_state < READ_STATE_READ_CLOSED);
- mark_read_closed(call);
- unlock(call);
+ set_read_state(CALL_FROM_TOP_ELEM(elem), READ_STATE_READ_CLOSED);
}
void grpc_call_stream_closed(grpc_call_element *elem) {
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
- lock(call);
- GPR_ASSERT(call->read_state < READ_STATE_STREAM_CLOSED);
- if (call->read_state < READ_STATE_READ_CLOSED) {
- mark_read_closed(call);
- }
- call->read_state = READ_STATE_STREAM_CLOSED;
- if (grpc_bbq_empty(&call->incoming_queue)) {
- finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK);
- }
- unlock(call);
+ set_read_state(call, READ_STATE_STREAM_CLOSED);
grpc_call_internal_unref(call, 0);
}
@@ -815,7 +804,7 @@ static gpr_uint32 decode_status(grpc_mdelem *md) {
gpr_uint32 status;
void *user_data = grpc_mdelem_get_user_data(md, destroy_status);
if (user_data) {
- status = ((gpr_uint32)(gpr_intptr) user_data) - STATUS_OFFSET;
+ status = ((gpr_uint32)(gpr_intptr)user_data) - STATUS_OFFSET;
} else {
if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
GPR_SLICE_LENGTH(md->value->slice),
@@ -832,13 +821,8 @@ void grpc_call_recv_message(grpc_call_element *elem,
grpc_byte_buffer *byte_buffer) {
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
lock(call);
- if (call->requests[GRPC_IOREQ_RECV_MESSAGE].set < GRPC_IOREQ_OP_COUNT) {
- /* there's an outstanding read */
- *call->requests[GRPC_IOREQ_RECV_MESSAGE].data.recv_message = byte_buffer;
- finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
- } else {
- grpc_bbq_push(&call->incoming_queue, byte_buffer);
- }
+ grpc_bbq_push(&call->incoming_queue, byte_buffer);
+ finish_read_ops(call);
unlock(call);
}
@@ -856,19 +840,8 @@ void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) {
set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
grpc_mdelem_unref(md);
} else {
- if (call->read_state < READ_STATE_GOT_INITIAL_METADATA) {
- dest = call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA].set <
- GRPC_IOREQ_OP_COUNT
- ? call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA]
- .data.recv_metadata
- : &call->buffered_initial_metadata;
- } else {
- dest = call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA].set <
- GRPC_IOREQ_OP_COUNT
- ? call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA]
- .data.recv_metadata
- : &call->buffered_trailing_metadata;
- }
+ dest = &call->buffered_metadata[call->read_state >=
+ READ_STATE_GOT_INITIAL_METADATA];
if (dest->count == dest->capacity) {
dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2);
dest->metadata =
@@ -894,6 +867,11 @@ grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
return CALL_STACK_FROM_CALL(call);
}
+void grpc_call_initial_metadata_complete(grpc_call_element *surface_element) {
+ grpc_call *call = grpc_call_from_top_element(surface_element);
+ set_read_state(call, READ_STATE_GOT_INITIAL_METADATA);
+}
+
/*
* LEGACY API IMPLEMENTATION
* All this code will disappear as soon as wrappings are updated
@@ -1097,16 +1075,6 @@ grpc_call_error grpc_call_server_end_initial_metadata_old(grpc_call *call,
return err;
}
-void grpc_call_initial_metadata_complete(grpc_call_element *surface_element) {
- grpc_call *call = grpc_call_from_top_element(surface_element);
- lock(call);
- if (call->read_state < READ_STATE_GOT_INITIAL_METADATA) {
- call->read_state = READ_STATE_GOT_INITIAL_METADATA;
- }
- finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
- unlock(call);
-}
-
static void finish_read_event(void *p, grpc_op_error error) {
if (p) grpc_byte_buffer_destroy(p);
}