aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface/call.c
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-02-02 14:02:52 -0800
committerGravatar Craig Tiller <ctiller@google.com>2015-02-02 14:02:52 -0800
commit9cc6141f998c499c92053ad1148541e43d6641a9 (patch)
tree0a15ddb45540a04cc621720902d98e0c64c73bc3 /src/core/surface/call.c
parent39af15104d49dd3ba243105bb14c358e53c01b4b (diff)
clean up finish a little more
Diffstat (limited to 'src/core/surface/call.c')
-rw-r--r--src/core/surface/call.c58
1 files changed, 26 insertions, 32 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index a2fcc45984..a8be3db728 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -554,18 +554,10 @@ static grpc_call_error start_ioreq_error(grpc_call *call,
return ret;
}
-static void dump_req_state(const char *debug, const char *stage, grpc_call *call) {
- size_t i;
- for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
- gpr_log(GPR_DEBUG, "%p:%s:%s:%d:%d", call, debug, stage, i, call->requests[i].set);
- }
-}
-
static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
size_t nreqs,
grpc_ioreq_completion_func completion,
- void *user_data,
- const char *debug) {
+ void *user_data) {
size_t i;
gpr_uint32 have_ops = 0;
grpc_ioreq_op op;
@@ -574,8 +566,6 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
grpc_ioreq_data data;
gpr_uint8 set;
- dump_req_state(debug, "before", call);
-
if (nreqs == 0) {
return GRPC_CALL_OK;
}
@@ -615,10 +605,20 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
*data.recv_message = grpc_bbq_pop(&call->incoming_queue);
if (*data.recv_message) {
finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
- } else if (call->stream_closed) {
- finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
+ if (call->stream_closed && grpc_bbq_empty(&call->incoming_queue)) {
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK);
+ }
} else {
- call->need_more_data = 1;
+ /* no message: either end of stream or we need more bytes */
+ if (call->read_closed) {
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
+ if (call->stream_closed) {
+ /* stream closed AND we've drained all messages: signal to the application */
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK);
+ }
+ } else {
+ call->need_more_data = 1;
+ }
}
break;
case GRPC_IOREQ_RECV_STATUS:
@@ -627,11 +627,6 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
finish_ioreq_op(call, op, GRPC_OP_OK);
}
break;
- case GRPC_IOREQ_SEND_MESSAGE:
- if (call->stream_closed) {
- finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, GRPC_OP_ERROR);
- }
- break;
case GRPC_IOREQ_SEND_CLOSE:
if (requests[GRPC_IOREQ_SEND_MESSAGE].set == REQSET_EMPTY) {
requests[GRPC_IOREQ_SEND_MESSAGE].set = REQSET_DONE;
@@ -640,6 +635,7 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, GRPC_OP_ERROR);
}
break;
+ case GRPC_IOREQ_SEND_MESSAGE:
case GRPC_IOREQ_SEND_INITIAL_METADATA:
case GRPC_IOREQ_SEND_TRAILING_METADATA:
case GRPC_IOREQ_SEND_STATUS:
@@ -673,8 +669,6 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
}
}
- dump_req_state(debug, "after", call);
-
return GRPC_CALL_OK;
}
@@ -687,7 +681,7 @@ grpc_call_error grpc_call_start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
size_t nreqs, void *tag) {
grpc_call_error err;
lock(call);
- err = start_ioreq(call, reqs, nreqs, call_start_ioreq_done, tag, __FUNCTION__);
+ err = start_ioreq(call, reqs, nreqs, call_start_ioreq_done, tag);
unlock(call);
return err;
}
@@ -697,7 +691,7 @@ grpc_call_error grpc_call_start_ioreq_and_call_back(
grpc_ioreq_completion_func on_complete, void *user_data) {
grpc_call_error err;
lock(call);
- err = start_ioreq(call, reqs, nreqs, on_complete, user_data, __FUNCTION__);
+ err = start_ioreq(call, reqs, nreqs, on_complete, user_data);
unlock(call);
return err;
}
@@ -833,12 +827,12 @@ grpc_call_error grpc_call_invoke(grpc_call *call, grpc_completion_queue *cq,
reqs[0].data.send_metadata.count = ls->md_out_count[ls->md_out_buffer];
reqs[0].data.send_metadata.metadata = ls->md_out[ls->md_out_buffer];
ls->md_out_buffer++;
- err = start_ioreq(call, reqs, 1, finish_send_metadata, NULL, __FUNCTION__);
+ err = start_ioreq(call, reqs, 1, finish_send_metadata, NULL);
if (err != GRPC_CALL_OK) goto done;
reqs[0].op = GRPC_IOREQ_RECV_INITIAL_METADATA;
reqs[0].data.recv_metadata = &ls->initial_md_in;
- err = start_ioreq(call, reqs, 1, finish_recv_metadata, metadata_read_tag, __FUNCTION__);
+ err = start_ioreq(call, reqs, 1, finish_recv_metadata, metadata_read_tag);
if (err != GRPC_CALL_OK) goto done;
reqs[0].op = GRPC_IOREQ_RECV_TRAILING_METADATA;
@@ -848,7 +842,7 @@ grpc_call_error grpc_call_invoke(grpc_call *call, grpc_completion_queue *cq,
reqs[1].data.recv_status.details_capacity = &ls->details_capacity;
reqs[1].data.recv_status.code = &ls->status;
reqs[2].op = GRPC_IOREQ_RECV_CLOSE;
- err = start_ioreq(call, reqs, 3, finish_status, NULL, __FUNCTION__);
+ err = start_ioreq(call, reqs, 3, finish_status, NULL);
if (err != GRPC_CALL_OK) goto done;
done:
@@ -880,7 +874,7 @@ grpc_call_error grpc_call_server_accept(grpc_call *call,
reqs[0].data.recv_status.details_capacity = 0;
reqs[0].data.recv_status.code = &ls->status;
reqs[1].op = GRPC_IOREQ_RECV_CLOSE;
- err = start_ioreq(call, reqs, 2, finish_status, NULL, __FUNCTION__);
+ err = start_ioreq(call, reqs, 2, finish_status, NULL);
unlock(call);
return err;
}
@@ -899,7 +893,7 @@ grpc_call_error grpc_call_server_end_initial_metadata(grpc_call *call,
req.op = GRPC_IOREQ_SEND_INITIAL_METADATA;
req.data.send_metadata.count = ls->md_out_count[ls->md_out_buffer];
req.data.send_metadata.metadata = ls->md_out[ls->md_out_buffer];
- err = start_ioreq(call, &req, 1, finish_send_initial_metadata, NULL, __FUNCTION__);
+ err = start_ioreq(call, &req, 1, finish_send_initial_metadata, NULL);
unlock(call);
return err;
@@ -938,7 +932,7 @@ grpc_call_error grpc_call_start_read(grpc_call *call, void *tag) {
ls = get_legacy_state(call);
req.op = GRPC_IOREQ_RECV_MESSAGE;
req.data.recv_message = &ls->msg_in;
- err = start_ioreq(call, &req, 1, finish_read, tag, __FUNCTION__);
+ err = start_ioreq(call, &req, 1, finish_read, tag);
unlock(call);
return err;
}
@@ -964,7 +958,7 @@ grpc_call_error grpc_call_start_write(grpc_call *call,
ls->msg_out = grpc_byte_buffer_copy(byte_buffer);
req.op = GRPC_IOREQ_SEND_MESSAGE;
req.data.send_message = ls->msg_out;
- err = start_ioreq(call, &req, 1, finish_write, tag, __FUNCTION__);
+ err = start_ioreq(call, &req, 1, finish_write, tag);
unlock(call);
return err;
@@ -981,7 +975,7 @@ grpc_call_error grpc_call_writes_done(grpc_call *call, void *tag) {
lock(call);
req.op = GRPC_IOREQ_SEND_CLOSE;
- err = start_ioreq(call, &req, 1, finish_finish, tag, __FUNCTION__);
+ err = start_ioreq(call, &req, 1, finish_finish, tag);
unlock(call);
return err;
@@ -1005,7 +999,7 @@ grpc_call_error grpc_call_start_write_status(grpc_call *call,
/* MEMLEAK */
reqs[1].data.send_status.details = gpr_strdup(details);
reqs[2].op = GRPC_IOREQ_SEND_CLOSE;
- err = start_ioreq(call, reqs, 3, finish_finish, tag, __FUNCTION__);
+ err = start_ioreq(call, reqs, 3, finish_finish, tag);
unlock(call);
return err;