aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/surface/byte_buffer_queue.h2
-rw-r--r--src/core/surface/call.c102
-rw-r--r--src/core/surface/call.h2
3 files changed, 48 insertions, 58 deletions
diff --git a/src/core/surface/byte_buffer_queue.h b/src/core/surface/byte_buffer_queue.h
index ffd2616d9d..01cc06077e 100644
--- a/src/core/surface/byte_buffer_queue.h
+++ b/src/core/surface/byte_buffer_queue.h
@@ -49,5 +49,7 @@ typedef struct {
} grpc_byte_buffer_queue;
grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q);
+int grpc_bbq_empty(grpc_byte_buffer_queue *q);
+void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *bb);
#endif /* __GRPC_INTERNAL_SURFACE_BYTE_BUFFER_QUEUE_H__ */
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 4f8ac6193a..e2e8fe23a5 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -58,7 +58,11 @@ typedef struct {
/* input buffers */
grpc_metadata_array initial_md_in;
grpc_metadata_array trailing_md_in;
- grpc_recv_status status_in;
+
+ size_t details_capacity;
+ char *details;
+ grpc_status_code status;
+
size_t msg_in_read_idx;
grpc_byte_buffer *msg_in;
@@ -770,9 +774,9 @@ grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata,
static void maybe_finish_legacy(grpc_call *call) {
legacy_state *ls = get_legacy_state(call);
- if (ls->got_status && ls->msg_in_read_idx == ls->msg_in.count) {
+ if (ls->got_status) {
grpc_cq_end_finished(call->cq, ls->finished_tag, call, do_nothing, NULL,
- ls->status_in.status, ls->status_in.details,
+ ls->status, ls->details,
ls->trailing_md_in.metadata, ls->trailing_md_in.count);
}
}
@@ -811,7 +815,7 @@ static void finish_send_metadata(grpc_call *call, grpc_op_error status,
grpc_call_error grpc_call_invoke(grpc_call *call, grpc_completion_queue *cq,
void *metadata_read_tag, void *finished_tag,
gpr_uint32 flags) {
- grpc_ioreq reqs[2];
+ grpc_ioreq reqs[3];
legacy_state *ls;
grpc_call_error err;
@@ -840,7 +844,10 @@ grpc_call_error grpc_call_invoke(grpc_call *call, grpc_completion_queue *cq,
reqs[0].op = GRPC_IOREQ_RECV_TRAILING_METADATA;
reqs[0].data.recv_metadata = &ls->trailing_md_in;
reqs[1].op = GRPC_IOREQ_RECV_STATUS;
- reqs[1].data.recv_status = &ls->status_in;
+ reqs[1].data.recv_status.details = &ls->details;
+ reqs[1].data.recv_status.details_capacity = &ls->details_capacity;
+ reqs[1].data.recv_status.code = &ls->status;
+ reqs[2].op = GRPC_IOREQ_RECV_CLOSE;
err = start_ioreq(call, reqs, 2, finish_status, NULL);
if (err != GRPC_CALL_OK) goto done;
@@ -852,22 +859,28 @@ done:
grpc_call_error grpc_call_server_accept(grpc_call *call,
grpc_completion_queue *cq,
void *finished_tag) {
- grpc_ioreq req;
+ grpc_ioreq reqs[2];
grpc_call_error err;
+ legacy_state *ls;
/* inform the completion queue of an incoming operation (corresponding to
finished_tag) */
grpc_cq_begin_op(cq, call, GRPC_FINISHED);
lock(call);
+ ls = get_legacy_state(call);
+
err = bind_cq(call, cq);
if (err != GRPC_CALL_OK) return err;
- get_legacy_state(call)->finished_tag = finished_tag;
+ ls->finished_tag = finished_tag;
- req.op = GRPC_IOREQ_RECV_STATUS;
- req.data.recv_status = &get_legacy_state(call)->status_in;
- err = start_ioreq(call, &req, 1, finish_status, NULL);
+ reqs[0].op = GRPC_IOREQ_RECV_STATUS;
+ reqs[0].data.recv_status.details = NULL;
+ reqs[0].data.recv_status.details_capacity = 0;
+ reqs[0].data.recv_status.code = &ls->status;
+ reqs[1].op = GRPC_IOREQ_RECV_CLOSE;
+ err = start_ioreq(call, reqs, 2, finish_status, NULL);
unlock(call);
return err;
}
@@ -906,15 +919,11 @@ static void finish_read_event(void *p, grpc_op_error error) {
static void finish_read(grpc_call *call, grpc_op_error error, void *tag) {
legacy_state *ls;
+ grpc_byte_buffer *msg;
lock(call);
ls = get_legacy_state(call);
- if (ls->msg_in.count == 0) {
- grpc_cq_end_read(call->cq, tag, call, do_nothing, NULL, NULL);
- } else {
- grpc_byte_buffer *msg = ls->msg_in.buffers[ls->msg_in_read_idx++];
- grpc_cq_end_read(call->cq, tag, call, finish_read_event, msg, msg);
- maybe_finish_legacy(call);
- }
+ msg = ls->msg_in;
+ grpc_cq_end_read(call->cq, tag, call, finish_read_event, msg, msg);
unlock(call);
}
@@ -922,24 +931,14 @@ grpc_call_error grpc_call_start_read(grpc_call *call, void *tag) {
legacy_state *ls;
grpc_ioreq req;
grpc_call_error err;
- grpc_byte_buffer *msg;
grpc_cq_begin_op(call->cq, call, GRPC_READ);
lock(call);
ls = get_legacy_state(call);
-
- if (ls->msg_in_read_idx == ls->msg_in.count) {
- ls->msg_in_read_idx = 0;
- req.op = GRPC_IOREQ_RECV_MESSAGES;
- req.data.recv_messages = &ls->msg_in;
- err = start_ioreq(call, &req, 1, finish_read, tag);
- } else {
- err = GRPC_CALL_OK;
- msg = ls->msg_in.buffers[ls->msg_in_read_idx++];
- grpc_cq_end_read(call->cq, tag, call, finish_read_event, msg, msg);
- maybe_finish_legacy(call);
- }
+ req.op = GRPC_IOREQ_RECV_MESSAGE;
+ req.data.recv_message = &ls->msg_in;
+ err = start_ioreq(call, &req, 1, finish_read, tag);
unlock(call);
return err;
}
@@ -963,9 +962,8 @@ grpc_call_error grpc_call_start_write(grpc_call *call,
lock(call);
ls = get_legacy_state(call);
ls->msg_out = grpc_byte_buffer_copy(byte_buffer);
- req.op = GRPC_IOREQ_SEND_MESSAGES;
- req.data.send_messages.count = 1;
- req.data.send_messages.messages = &ls->msg_out;
+ req.op = GRPC_IOREQ_SEND_MESSAGE;
+ req.data.send_message = ls->msg_out;
err = start_ioreq(call, &req, 1, finish_write, tag);
unlock(call);
@@ -992,7 +990,7 @@ grpc_call_error grpc_call_writes_done(grpc_call *call, void *tag) {
grpc_call_error grpc_call_start_write_status(grpc_call *call,
grpc_status_code status,
const char *details, void *tag) {
- grpc_ioreq reqs[2];
+ grpc_ioreq reqs[3];
grpc_call_error err;
legacy_state *ls;
grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED);
@@ -1003,8 +1001,9 @@ grpc_call_error grpc_call_start_write_status(grpc_call *call,
reqs[0].data.send_metadata.count = ls->md_out_count[ls->md_out_buffer];
reqs[0].data.send_metadata.metadata = ls->md_out[ls->md_out_buffer];
reqs[1].op = GRPC_IOREQ_SEND_CLOSE;
- reqs[1].data.send_close.status = status;
- reqs[1].data.send_close.details = details;
+ reqs[1].data.send_status.code = status;
+ /* MEMLEAK */
+ reqs[1].data.send_status.details = gpr_strdup(details);
err = start_ioreq(call, reqs, 2, finish_finish, tag);
unlock(call);
@@ -1044,7 +1043,7 @@ void grpc_call_read_closed(grpc_call_element *elem) {
lock(call);
GPR_ASSERT(!call->read_closed);
call->read_closed = 1;
- finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK);
unlock(call);
@@ -1056,12 +1055,12 @@ void grpc_call_stream_closed(grpc_call_element *elem) {
GPR_ASSERT(!call->stream_closed);
if (!call->read_closed) {
call->read_closed = 1;
- finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK);
}
call->stream_closed = 1;
- if (call->buffered_messages.count == 0) {
+ if (grpc_bbq_empty(&call->incoming_queue)) {
finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK);
}
unlock(call);
@@ -1094,25 +1093,14 @@ static gpr_uint32 decode_status(grpc_mdelem *md) {
void grpc_call_recv_message(grpc_call_element *elem,
grpc_byte_buffer *byte_buffer) {
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
- grpc_byte_buffer_array *dest;
lock(call);
- if (call->requests[GRPC_IOREQ_RECV_MESSAGE].master != NULL) {
- if (call->requests[GRPC_IOREQ_RECV_MESSAGE].state != REQ_READY) {
- call->requests[GRPC_IOREQ_RECV_MESSAGE].status = GRPC_OP_ERROR;
- } else {
- *call->requests[GRPC_IOREQ_RECV_MESSAGE].data.recv_message = byte_buffer;
- finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
- }
+ if (call->requests[GRPC_IOREQ_RECV_MESSAGE].set < GRPC_IOREQ_OP_COUNT) {
+ /* there's an outstanding read */
+ *call->requests[GRPC_IOREQ_RECV_MESSAGE].data.recv_message = byte_buffer;
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
} else {
- dest = &call->buffered_messages;
- }
- if (dest->count == dest->capacity) {
- dest->capacity = GPR_MAX(dest->capacity + 1, dest->capacity * 3 / 2);
- dest->buffers =
- gpr_realloc(dest->buffers, sizeof(grpc_byte_buffer *) * dest->capacity);
+ grpc_bbq_push(&call->incoming_queue, byte_buffer);
}
- dest->buffers[dest->count++] = byte_buffer;
- finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK);
unlock(call);
}
@@ -1131,13 +1119,13 @@ void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) {
grpc_mdelem_unref(md);
} else {
if (!call->got_initial_metadata) {
- dest = call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA].state == REQ_READY
+ dest = call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA].set < GRPC_IOREQ_OP_COUNT
? call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA]
.data.recv_metadata
: &call->buffered_initial_metadata;
} else {
dest =
- call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA].state == REQ_READY
+ call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA].set < GRPC_IOREQ_OP_COUNT
? call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA]
.data.recv_metadata
: &call->buffered_trailing_metadata;
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index 45816a312e..936fb29f2e 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -44,7 +44,7 @@ typedef enum {
GRPC_IOREQ_RECV_MESSAGE,
GRPC_IOREQ_RECV_TRAILING_METADATA,
GRPC_IOREQ_RECV_STATUS,
- GPRC_IOREQ_RECV_CLOSE,
+ GRPC_IOREQ_RECV_CLOSE,
GRPC_IOREQ_SEND_INITIAL_METADATA,
GRPC_IOREQ_SEND_MESSAGE,
GRPC_IOREQ_SEND_TRAILING_METADATA,