aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface/call.c
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-02-02 16:15:53 -0800
committerGravatar Craig Tiller <ctiller@google.com>2015-02-02 16:15:53 -0800
commitdaceea867002a097e64e869837ac5348ac498bcc (patch)
treeb3c1c495d62d97b7a68d5176ccbf93a66f048797 /src/core/surface/call.c
parentc18c56e40c23586c0f87eac96973053bf725c8c9 (diff)
Add some documentation, simplify state
Diffstat (limited to 'src/core/surface/call.c')
-rw-r--r--src/core/surface/call.c77
1 files changed, 50 insertions, 27 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index dfee93eb7d..da966c874a 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -77,7 +77,7 @@ typedef struct {
These structures are manipulated in sets, where a set is a set of
operations begin with the same call to start_ioreq and the various
public and private api's that call it. Each set has a master reqinfo
- in which we set a few additional fields. */
+ in which we set a few additional fields - see reqinfo_master. */
typedef struct {
/* User supplied parameters */
grpc_ioreq_data data;
@@ -93,25 +93,51 @@ typedef struct {
} reqinfo;
typedef struct {
+ /* Overall status of the operation: starts OK, may degrade to
+ non-OK */
grpc_op_error status;
+ /* Completion function to call at the end of the operation */
grpc_ioreq_completion_func on_complete;
void *user_data;
+ /* a bit mask of which request ops are needed (1 << opid) */
gpr_uint32 need_mask;
+ /* a bit mask of which request ops are now completed */
gpr_uint32 complete_mask;
} reqinfo_master;
+/* Status data for a request can come from several sources; this
+ enumerates them all, and acts as a priority sorting for which
+ status to return to the application - earlier entries override
+ later ones */
typedef enum {
+ /* Status came from the application layer overriding whatever
+ the wire says */
STATUS_FROM_API_OVERRIDE = 0,
+ /* Status came from 'the wire' - or somewhere below the surface
+ layer */
STATUS_FROM_WIRE,
STATUS_SOURCE_COUNT
} status_source;
typedef struct {
- gpr_uint8 set;
+ gpr_uint8 is_set;
grpc_status_code code;
grpc_mdstr *details;
} received_status;
+/* How far through the GRPC stream have we read? */
+typedef enum {
+ /* We are still waiting for initial metadata to complete */
+ READ_STATE_INITIAL,
+ /* We have gotten initial metadata, and are reading either
+ messages or trailing metadata */
+ READ_STATE_GOT_INITIAL_METADATA,
+ /* The stream is closed for reading */
+ READ_STATE_READ_CLOSED,
+ /* The stream is closed for reading & writing */
+ READ_STATE_STREAM_CLOSED
+} read_state;
+
struct grpc_call {
grpc_completion_queue *cq;
grpc_channel *channel;
@@ -120,10 +146,8 @@ struct grpc_call {
gpr_mu mu;
gpr_uint8 is_client;
- gpr_uint8 got_initial_metadata;
+ read_state read_state;
gpr_uint8 have_alarm;
- gpr_uint8 read_closed;
- gpr_uint8 stream_closed;
gpr_uint8 sending;
gpr_uint8 num_completed_requests;
gpr_uint8 need_more_data;
@@ -229,7 +253,7 @@ void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) {
static void set_status_code(grpc_call *call, status_source source,
gpr_uint32 status) {
- call->status[source].set = 1;
+ call->status[source].is_set = 1;
call->status[source].code = status;
}
@@ -308,7 +332,7 @@ static void unlock(grpc_call *call) {
static void get_final_status(grpc_call *call, grpc_recv_status_args args) {
int i;
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
- if (call->status[i].set) {
+ if (call->status[i].is_set) {
*args.code = call->status[i].code;
if (!args.details) return;
if (call->status[i].details) {
@@ -581,14 +605,14 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
*data.recv_message = grpc_bbq_pop(&call->incoming_queue);
if (*data.recv_message) {
finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
- if (call->stream_closed && grpc_bbq_empty(&call->incoming_queue)) {
+ if (call->read_state == READ_STATE_STREAM_CLOSED && grpc_bbq_empty(&call->incoming_queue)) {
finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK);
}
} else {
/* no message: either end of stream or we need more bytes */
- if (call->read_closed) {
+ if (call->read_state >= READ_STATE_READ_CLOSED) {
finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
- if (call->stream_closed) {
+ if (call->read_state == READ_STATE_STREAM_CLOSED) {
/* stream closed AND we've drained all messages: signal to the application */
finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK);
}
@@ -598,12 +622,12 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
}
break;
case GRPC_IOREQ_RECV_STATUS:
- if (call->read_closed) {
+ if (call->read_state >= READ_STATE_READ_CLOSED) {
finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK);
}
break;
case GRPC_IOREQ_RECV_CLOSE:
- if (call->stream_closed) {
+ if (call->read_state == READ_STATE_STREAM_CLOSED) {
finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK);
}
break;
@@ -611,7 +635,7 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
if (requests[GRPC_IOREQ_SEND_MESSAGE].set == REQSET_EMPTY) {
requests[GRPC_IOREQ_SEND_MESSAGE].set = REQSET_DONE;
}
- if (call->stream_closed) {
+ if (call->read_state == READ_STATE_STREAM_CLOSED) {
finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, GRPC_OP_ERROR);
}
break;
@@ -619,7 +643,7 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
case GRPC_IOREQ_SEND_INITIAL_METADATA:
case GRPC_IOREQ_SEND_TRAILING_METADATA:
case GRPC_IOREQ_SEND_STATUS:
- if (call->stream_closed) {
+ if (call->read_state == READ_STATE_STREAM_CLOSED) {
finish_ioreq_op(call, op, GRPC_OP_ERROR);
}
break;
@@ -629,11 +653,8 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
SWAP(grpc_metadata_array, *data.recv_metadata,
call->buffered_initial_metadata);
}
- if (call->got_initial_metadata) {
+ if (call->read_state >= READ_STATE_GOT_INITIAL_METADATA) {
finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
- } else if (call->stream_closed) {
- finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA,
- GRPC_OP_ERROR);
}
break;
case GRPC_IOREQ_RECV_TRAILING_METADATA:
@@ -642,7 +663,7 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
SWAP(grpc_metadata_array, *data.recv_metadata,
call->buffered_trailing_metadata);
}
- if (call->read_closed) {
+ if (call->read_state >= READ_STATE_READ_CLOSED) {
finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK);
}
break;
@@ -683,7 +704,7 @@ void grpc_call_destroy(grpc_call *c) {
grpc_alarm_cancel(&c->alarm);
c->have_alarm = 0;
}
- cancel = !c->stream_closed;
+ cancel = c->read_state != READ_STATE_STREAM_CLOSED;
unlock(c);
if (cancel) grpc_call_cancel(c);
grpc_call_internal_unref(c, 1);
@@ -754,7 +775,7 @@ void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) {
}
static void mark_read_closed(grpc_call *call) {
- call->read_closed = 1;
+ call->read_state = READ_STATE_READ_CLOSED;
finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK);
@@ -764,7 +785,7 @@ static void mark_read_closed(grpc_call *call) {
void grpc_call_read_closed(grpc_call_element *elem) {
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
lock(call);
- GPR_ASSERT(!call->read_closed);
+ GPR_ASSERT(call->read_state < READ_STATE_READ_CLOSED);
mark_read_closed(call);
unlock(call);
}
@@ -772,11 +793,11 @@ void grpc_call_read_closed(grpc_call_element *elem) {
void grpc_call_stream_closed(grpc_call_element *elem) {
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
lock(call);
- GPR_ASSERT(!call->stream_closed);
- if (!call->read_closed) {
+ GPR_ASSERT(call->read_state < READ_STATE_STREAM_CLOSED);
+ if (call->read_state < READ_STATE_READ_CLOSED) {
mark_read_closed(call);
}
- call->stream_closed = 1;
+ call->read_state = READ_STATE_STREAM_CLOSED;
if (grpc_bbq_empty(&call->incoming_queue)) {
finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK);
}
@@ -835,7 +856,7 @@ void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) {
set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
grpc_mdelem_unref(md);
} else {
- if (!call->got_initial_metadata) {
+ if (call->read_state < READ_STATE_GOT_INITIAL_METADATA) {
dest = call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA].set <
GRPC_IOREQ_OP_COUNT
? call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA]
@@ -1079,7 +1100,9 @@ grpc_call_error grpc_call_server_end_initial_metadata_old(grpc_call *call,
void grpc_call_initial_metadata_complete(grpc_call_element *surface_element) {
grpc_call *call = grpc_call_from_top_element(surface_element);
lock(call);
- call->got_initial_metadata = 1;
+ if (call->read_state < READ_STATE_GOT_INITIAL_METADATA) {
+ call->read_state = READ_STATE_GOT_INITIAL_METADATA;
+ }
finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
unlock(call);
}