diff options
-rw-r--r-- | src/core/surface/byte_buffer_queue.h | 2 | ||||
-rw-r--r-- | src/core/surface/call.c | 102 | ||||
-rw-r--r-- | src/core/surface/call.h | 2 |
3 files changed, 48 insertions, 58 deletions
diff --git a/src/core/surface/byte_buffer_queue.h b/src/core/surface/byte_buffer_queue.h index ffd2616d9d..01cc06077e 100644 --- a/src/core/surface/byte_buffer_queue.h +++ b/src/core/surface/byte_buffer_queue.h @@ -49,5 +49,7 @@ typedef struct { } grpc_byte_buffer_queue; grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q); +int grpc_bbq_empty(grpc_byte_buffer_queue *q); +void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *bb); #endif /* __GRPC_INTERNAL_SURFACE_BYTE_BUFFER_QUEUE_H__ */ diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 4f8ac6193a..e2e8fe23a5 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -58,7 +58,11 @@ typedef struct { /* input buffers */ grpc_metadata_array initial_md_in; grpc_metadata_array trailing_md_in; - grpc_recv_status status_in; + + size_t details_capacity; + char *details; + grpc_status_code status; + size_t msg_in_read_idx; grpc_byte_buffer *msg_in; @@ -770,9 +774,9 @@ grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata, static void maybe_finish_legacy(grpc_call *call) { legacy_state *ls = get_legacy_state(call); - if (ls->got_status && ls->msg_in_read_idx == ls->msg_in.count) { + if (ls->got_status) { grpc_cq_end_finished(call->cq, ls->finished_tag, call, do_nothing, NULL, - ls->status_in.status, ls->status_in.details, + ls->status, ls->details, ls->trailing_md_in.metadata, ls->trailing_md_in.count); } } @@ -811,7 +815,7 @@ static void finish_send_metadata(grpc_call *call, grpc_op_error status, grpc_call_error grpc_call_invoke(grpc_call *call, grpc_completion_queue *cq, void *metadata_read_tag, void *finished_tag, gpr_uint32 flags) { - grpc_ioreq reqs[2]; + grpc_ioreq reqs[3]; legacy_state *ls; grpc_call_error err; @@ -840,7 +844,10 @@ grpc_call_error grpc_call_invoke(grpc_call *call, grpc_completion_queue *cq, reqs[0].op = GRPC_IOREQ_RECV_TRAILING_METADATA; reqs[0].data.recv_metadata = &ls->trailing_md_in; reqs[1].op = GRPC_IOREQ_RECV_STATUS; - reqs[1].data.recv_status = &ls->status_in; + reqs[1].data.recv_status.details = &ls->details; + reqs[1].data.recv_status.details_capacity = &ls->details_capacity; + reqs[1].data.recv_status.code = &ls->status; + reqs[2].op = GRPC_IOREQ_RECV_CLOSE; err = start_ioreq(call, reqs, 2, finish_status, NULL); if (err != GRPC_CALL_OK) goto done; @@ -852,22 +859,28 @@ done: grpc_call_error grpc_call_server_accept(grpc_call *call, grpc_completion_queue *cq, void *finished_tag) { - grpc_ioreq req; + grpc_ioreq reqs[2]; grpc_call_error err; + legacy_state *ls; /* inform the completion queue of an incoming operation (corresponding to finished_tag) */ grpc_cq_begin_op(cq, call, GRPC_FINISHED); lock(call); + ls = get_legacy_state(call); + err = bind_cq(call, cq); if (err != GRPC_CALL_OK) return err; - get_legacy_state(call)->finished_tag = finished_tag; + ls->finished_tag = finished_tag; - req.op = GRPC_IOREQ_RECV_STATUS; - req.data.recv_status = &get_legacy_state(call)->status_in; - err = start_ioreq(call, &req, 1, finish_status, NULL); + reqs[0].op = GRPC_IOREQ_RECV_STATUS; + reqs[0].data.recv_status.details = NULL; + reqs[0].data.recv_status.details_capacity = 0; + reqs[0].data.recv_status.code = &ls->status; + reqs[1].op = GRPC_IOREQ_RECV_CLOSE; + err = start_ioreq(call, reqs, 2, finish_status, NULL); unlock(call); return err; } @@ -906,15 +919,11 @@ static void finish_read_event(void *p, grpc_op_error error) { static void finish_read(grpc_call *call, grpc_op_error error, void *tag) { legacy_state *ls; + grpc_byte_buffer *msg; lock(call); ls = get_legacy_state(call); - if (ls->msg_in.count == 0) { - grpc_cq_end_read(call->cq, tag, call, do_nothing, NULL, NULL); - } else { - grpc_byte_buffer *msg = ls->msg_in.buffers[ls->msg_in_read_idx++]; - grpc_cq_end_read(call->cq, tag, call, finish_read_event, msg, msg); - maybe_finish_legacy(call); - } + msg = ls->msg_in; + grpc_cq_end_read(call->cq, tag, call, finish_read_event, msg, msg); unlock(call); } @@ -922,24 +931,14 @@ grpc_call_error grpc_call_start_read(grpc_call *call, void *tag) { legacy_state *ls; grpc_ioreq req; grpc_call_error err; - grpc_byte_buffer *msg; grpc_cq_begin_op(call->cq, call, GRPC_READ); lock(call); ls = get_legacy_state(call); - - if (ls->msg_in_read_idx == ls->msg_in.count) { - ls->msg_in_read_idx = 0; - req.op = GRPC_IOREQ_RECV_MESSAGES; - req.data.recv_messages = &ls->msg_in; - err = start_ioreq(call, &req, 1, finish_read, tag); - } else { - err = GRPC_CALL_OK; - msg = ls->msg_in.buffers[ls->msg_in_read_idx++]; - grpc_cq_end_read(call->cq, tag, call, finish_read_event, msg, msg); - maybe_finish_legacy(call); - } + req.op = GRPC_IOREQ_RECV_MESSAGE; + req.data.recv_message = &ls->msg_in; + err = start_ioreq(call, &req, 1, finish_read, tag); unlock(call); return err; } @@ -963,9 +962,8 @@ grpc_call_error grpc_call_start_write(grpc_call *call, lock(call); ls = get_legacy_state(call); ls->msg_out = grpc_byte_buffer_copy(byte_buffer); - req.op = GRPC_IOREQ_SEND_MESSAGES; - req.data.send_messages.count = 1; - req.data.send_messages.messages = &ls->msg_out; + req.op = GRPC_IOREQ_SEND_MESSAGE; + req.data.send_message = ls->msg_out; err = start_ioreq(call, &req, 1, finish_write, tag); unlock(call); @@ -992,7 +990,7 @@ grpc_call_error grpc_call_writes_done(grpc_call *call, void *tag) { grpc_call_error grpc_call_start_write_status(grpc_call *call, grpc_status_code status, const char *details, void *tag) { - grpc_ioreq reqs[2]; + grpc_ioreq reqs[3]; grpc_call_error err; legacy_state *ls; grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED); @@ -1003,8 +1001,9 @@ grpc_call_error grpc_call_start_write_status(grpc_call *call, reqs[0].data.send_metadata.count = ls->md_out_count[ls->md_out_buffer]; reqs[0].data.send_metadata.metadata = ls->md_out[ls->md_out_buffer]; reqs[1].op = GRPC_IOREQ_SEND_CLOSE; - reqs[1].data.send_close.status = status; - reqs[1].data.send_close.details = details; + reqs[1].data.send_status.code = status; + /* MEMLEAK */ + reqs[1].data.send_status.details = gpr_strdup(details); err = start_ioreq(call, reqs, 2, finish_finish, tag); unlock(call); @@ -1044,7 +1043,7 @@ void grpc_call_read_closed(grpc_call_element *elem) { lock(call); GPR_ASSERT(!call->read_closed); call->read_closed = 1; - finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK); + finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK); finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK); finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK); unlock(call); @@ -1056,12 +1055,12 @@ void grpc_call_stream_closed(grpc_call_element *elem) { GPR_ASSERT(!call->stream_closed); if (!call->read_closed) { call->read_closed = 1; - finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK); + finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK); finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK); finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK); } call->stream_closed = 1; - if (call->buffered_messages.count == 0) { + if (grpc_bbq_empty(&call->incoming_queue)) { finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK); } unlock(call); @@ -1094,25 +1093,14 @@ static gpr_uint32 decode_status(grpc_mdelem *md) { void grpc_call_recv_message(grpc_call_element *elem, grpc_byte_buffer *byte_buffer) { grpc_call *call = CALL_FROM_TOP_ELEM(elem); - grpc_byte_buffer_array *dest; lock(call); - if (call->requests[GRPC_IOREQ_RECV_MESSAGE].master != NULL) { - if (call->requests[GRPC_IOREQ_RECV_MESSAGE].state != REQ_READY) { - call->requests[GRPC_IOREQ_RECV_MESSAGE].status = GRPC_OP_ERROR; - } else { - *call->requests[GRPC_IOREQ_RECV_MESSAGE].data.recv_message = byte_buffer; - finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK); - } + if (call->requests[GRPC_IOREQ_RECV_MESSAGE].set < GRPC_IOREQ_OP_COUNT) { + /* there's an outstanding read */ + *call->requests[GRPC_IOREQ_RECV_MESSAGE].data.recv_message = byte_buffer; + finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK); } else { - dest = &call->buffered_messages; - } - if (dest->count == dest->capacity) { - dest->capacity = GPR_MAX(dest->capacity + 1, dest->capacity * 3 / 2); - dest->buffers = - gpr_realloc(dest->buffers, sizeof(grpc_byte_buffer *) * dest->capacity); + grpc_bbq_push(&call->incoming_queue, byte_buffer); } - dest->buffers[dest->count++] = byte_buffer; - finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK); unlock(call); } @@ -1131,13 +1119,13 @@ void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) { grpc_mdelem_unref(md); } else { if (!call->got_initial_metadata) { - dest = call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA].state == REQ_READY + dest = call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA].set < GRPC_IOREQ_OP_COUNT ? call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA] .data.recv_metadata : &call->buffered_initial_metadata; } else { dest = - call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA].state == REQ_READY + call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA].set < GRPC_IOREQ_OP_COUNT ? call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA] .data.recv_metadata : &call->buffered_trailing_metadata; diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 45816a312e..936fb29f2e 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -44,7 +44,7 @@ typedef enum { GRPC_IOREQ_RECV_MESSAGE, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_IOREQ_RECV_STATUS, - GPRC_IOREQ_RECV_CLOSE, + GRPC_IOREQ_RECV_CLOSE, GRPC_IOREQ_SEND_INITIAL_METADATA, GRPC_IOREQ_SEND_MESSAGE, GRPC_IOREQ_SEND_TRAILING_METADATA, |