diff options
Diffstat (limited to 'src/core/surface/call.c')
-rw-r--r-- | src/core/surface/call.c | 198 |
1 files changed, 133 insertions, 65 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 46502fb6b1..8bb0779bf7 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -35,11 +35,11 @@ #include "src/core/channel/channel_stack.h" #include "src/core/channel/metadata_buffer.h" #include "src/core/iomgr/alarm.h" +#include "src/core/support/string.h" #include "src/core/surface/channel.h" #include "src/core/surface/completion_queue.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include <grpc/support/string.h> #include <stdio.h> #include <stdlib.h> @@ -173,11 +173,14 @@ struct grpc_call { /* protects variables in this section */ gpr_mu read_mu; + gpr_uint8 received_start; + gpr_uint8 start_ok; gpr_uint8 reads_done; gpr_uint8 received_finish; gpr_uint8 received_metadata; gpr_uint8 have_read; gpr_uint8 have_alarm; + gpr_uint8 pending_writes_done; gpr_uint8 got_status_code; /* The current outstanding read message tag (only valid if have_read == 1) */ void *read_tag; @@ -190,6 +193,8 @@ struct grpc_call { /* The current outstanding send message/context/invoke/end tag (only valid if have_write == 1) */ void *write_tag; + grpc_byte_buffer *pending_write; + gpr_uint32 pending_write_flags; /* The final status of the call */ grpc_status_code status_code; @@ -198,7 +203,7 @@ struct grpc_call { gpr_refcount internal_refcount; }; -#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call)+1)) +#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) #define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1) #define CALL_ELEM_FROM_CALL(call, idx) \ grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx) @@ -227,11 +232,15 @@ grpc_call *grpc_call_create(grpc_channel *channel, call->have_alarm = 0; call->received_metadata = 0; call->got_status_code = 0; + call->start_ok = 0; call->status_code = server_transport_data != NULL ? GRPC_STATUS_OK : GRPC_STATUS_UNKNOWN; call->status_details = NULL; call->received_finish = 0; call->reads_done = 0; + call->received_start = 0; + call->pending_write = NULL; + call->pending_writes_done = 0; grpc_metadata_buffer_init(&call->incoming_metadata); gpr_ref_init(&call->internal_refcount, 1); grpc_call_stack_init(channel_stack, server_transport_data, @@ -360,16 +369,6 @@ grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata, return GRPC_CALL_OK; } -static void done_invoke(void *user_data, grpc_op_error error) { - grpc_call *call = user_data; - void *tag = call->write_tag; - - GPR_ASSERT(call->have_write); - call->have_write = 0; - call->write_tag = INVALID_TAG; - grpc_cq_end_invoke_accepted(call->cq, tag, call, NULL, NULL, error); -} - static void finish_call(grpc_call *call) { size_t count; grpc_metadata *elements; @@ -384,11 +383,81 @@ static void finish_call(grpc_call *call) { elements, count); } -grpc_call_error grpc_call_start_invoke(grpc_call *call, - grpc_completion_queue *cq, - void *invoke_accepted_tag, - void *metadata_read_tag, - void *finished_tag, gpr_uint32 flags) { +static void done_write(void *user_data, grpc_op_error error) { + grpc_call *call = user_data; + void *tag = call->write_tag; + + GPR_ASSERT(call->have_write); + call->have_write = 0; + call->write_tag = INVALID_TAG; + grpc_cq_end_write_accepted(call->cq, tag, call, NULL, NULL, error); +} + +static void done_writes_done(void *user_data, grpc_op_error error) { + grpc_call *call = user_data; + void *tag = call->write_tag; + + GPR_ASSERT(call->have_write); + call->have_write = 0; + call->write_tag = INVALID_TAG; + grpc_cq_end_finish_accepted(call->cq, tag, call, NULL, NULL, error); +} + +static void call_started(void *user_data, grpc_op_error error) { + grpc_call *call = user_data; + grpc_call_element *elem; + grpc_byte_buffer *pending_write = NULL; + gpr_uint32 pending_write_flags = 0; + gpr_uint8 pending_writes_done = 0; + int ok; + grpc_call_op op; + + gpr_mu_lock(&call->read_mu); + GPR_ASSERT(!call->received_start); + call->received_start = 1; + ok = call->start_ok = (error == GRPC_OP_OK); + pending_write = call->pending_write; + pending_write_flags = call->pending_write_flags; + pending_writes_done = call->pending_writes_done; + gpr_mu_unlock(&call->read_mu); + + if (pending_write) { + if (ok) { + op.type = GRPC_SEND_MESSAGE; + op.dir = GRPC_CALL_DOWN; + op.flags = pending_write_flags; + op.done_cb = done_write; + op.user_data = call; + op.data.message = pending_write; + + elem = CALL_ELEM_FROM_CALL(call, 0); + elem->filter->call_op(elem, NULL, &op); + } else { + done_write(call, error); + } + grpc_byte_buffer_destroy(pending_write); + } + if (pending_writes_done) { + if (ok) { + op.type = GRPC_SEND_FINISH; + op.dir = GRPC_CALL_DOWN; + op.flags = 0; + op.done_cb = done_writes_done; + op.user_data = call; + + elem = CALL_ELEM_FROM_CALL(call, 0); + elem->filter->call_op(elem, NULL, &op); + } else { + done_writes_done(call, error); + } + } + + grpc_call_internal_unref(call); +} + +grpc_call_error grpc_call_invoke(grpc_call *call, grpc_completion_queue *cq, + void *metadata_read_tag, void *finished_tag, + gpr_uint32 flags) { grpc_call_element *elem; grpc_call_op op; @@ -420,7 +489,6 @@ grpc_call_error grpc_call_start_invoke(grpc_call *call, /* inform the completion queue of an incoming operation */ grpc_cq_begin_op(cq, call, GRPC_FINISHED); grpc_cq_begin_op(cq, call, GRPC_CLIENT_METADATA_READ); - grpc_cq_begin_op(cq, call, GRPC_INVOKE_ACCEPTED); gpr_mu_lock(&call->read_mu); @@ -431,8 +499,6 @@ grpc_call_error grpc_call_start_invoke(grpc_call *call, if (call->received_finish) { /* handle early cancellation */ - grpc_cq_end_invoke_accepted(call->cq, invoke_accepted_tag, call, NULL, NULL, - GRPC_OP_ERROR); grpc_cq_end_client_metadata_read(call->cq, metadata_read_tag, call, NULL, NULL, 0, NULL); finish_call(call); @@ -442,20 +508,18 @@ grpc_call_error grpc_call_start_invoke(grpc_call *call, return GRPC_CALL_OK; } - call->write_tag = invoke_accepted_tag; call->metadata_tag = metadata_read_tag; - call->have_write = 1; - gpr_mu_unlock(&call->read_mu); /* call down the filter stack */ op.type = GRPC_SEND_START; op.dir = GRPC_CALL_DOWN; op.flags = flags; - op.done_cb = done_invoke; + op.done_cb = call_started; op.data.start.pollset = grpc_cq_pollset(cq); op.user_data = call; + grpc_call_internal_ref(call); elem = CALL_ELEM_FROM_CALL(call, 0); elem->filter->call_op(elem, NULL, &op); @@ -486,6 +550,7 @@ grpc_call_error grpc_call_server_accept(grpc_call *call, call->state = CALL_BOUNDCQ; call->cq = cq; call->finished_tag = finished_tag; + call->received_start = 1; if (prq_is_empty(&call->prq) && call->received_finish) { finish_call(call); @@ -535,26 +600,6 @@ grpc_call_error grpc_call_server_end_initial_metadata(grpc_call *call, return GRPC_CALL_OK; } -static void done_writes_done(void *user_data, grpc_op_error error) { - grpc_call *call = user_data; - void *tag = call->write_tag; - - GPR_ASSERT(call->have_write); - call->have_write = 0; - call->write_tag = INVALID_TAG; - grpc_cq_end_finish_accepted(call->cq, tag, call, NULL, NULL, error); -} - -static void done_write(void *user_data, grpc_op_error error) { - grpc_call *call = user_data; - void *tag = call->write_tag; - - GPR_ASSERT(call->have_write); - call->have_write = 0; - call->write_tag = INVALID_TAG; - grpc_cq_end_write_accepted(call->cq, tag, call, NULL, NULL, error); -} - void grpc_call_client_initial_metadata_complete( grpc_call_element *surface_element) { grpc_call *call = grpc_call_from_top_element(surface_element); @@ -617,7 +662,7 @@ grpc_call_error grpc_call_start_read(grpc_call *call, void *tag) { } else { call->read_tag = tag; call->have_read = 1; - request_more = 1; + request_more = call->received_start; } } else if (prq_is_empty(&call->prq) && call->received_finish) { finish_call(call); @@ -654,8 +699,6 @@ grpc_call_error grpc_call_start_write(grpc_call *call, grpc_cq_begin_op(call->cq, call, GRPC_WRITE_ACCEPTED); - /* for now we do no buffering, so a NULL byte_buffer can have no impact - on our behavior -- succeed immediately */ /* TODO(ctiller): if flags & GRPC_WRITE_BUFFER_HINT == 0, this indicates a flush, and that flush should be propogated down from here */ if (byte_buffer == NULL) { @@ -666,15 +709,25 @@ grpc_call_error grpc_call_start_write(grpc_call *call, call->write_tag = tag; call->have_write = 1; - op.type = GRPC_SEND_MESSAGE; - op.dir = GRPC_CALL_DOWN; - op.flags = flags; - op.done_cb = done_write; - op.user_data = call; - op.data.message = byte_buffer; + gpr_mu_lock(&call->read_mu); + if (!call->received_start) { + call->pending_write = grpc_byte_buffer_copy(byte_buffer); + call->pending_write_flags = flags; - elem = CALL_ELEM_FROM_CALL(call, 0); - elem->filter->call_op(elem, NULL, &op); + gpr_mu_unlock(&call->read_mu); + } else { + gpr_mu_unlock(&call->read_mu); + + op.type = GRPC_SEND_MESSAGE; + op.dir = GRPC_CALL_DOWN; + op.flags = flags; + op.done_cb = done_write; + op.user_data = call; + op.data.message = byte_buffer; + + elem = CALL_ELEM_FROM_CALL(call, 0); + elem->filter->call_op(elem, NULL, &op); + } return GRPC_CALL_OK; } @@ -706,14 +759,23 @@ grpc_call_error grpc_call_writes_done(grpc_call *call, void *tag) { call->write_tag = tag; call->have_write = 1; - op.type = GRPC_SEND_FINISH; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.done_cb = done_writes_done; - op.user_data = call; + gpr_mu_lock(&call->read_mu); + if (!call->received_start) { + call->pending_writes_done = 1; - elem = CALL_ELEM_FROM_CALL(call, 0); - elem->filter->call_op(elem, NULL, &op); + gpr_mu_unlock(&call->read_mu); + } else { + gpr_mu_unlock(&call->read_mu); + + op.type = GRPC_SEND_FINISH; + op.dir = GRPC_CALL_DOWN; + op.flags = 0; + op.done_cb = done_writes_done; + op.user_data = call; + + elem = CALL_ELEM_FROM_CALL(call, 0); + elem->filter->call_op(elem, NULL, &op); + } return GRPC_CALL_OK; } @@ -760,8 +822,8 @@ grpc_call_error grpc_call_start_write_status(grpc_call *call, /* always send status */ { grpc_mdelem *md; - char buffer[32]; - sprintf(buffer, "%d", status); + char buffer[GPR_LTOA_MIN_BUFSIZE]; + gpr_ltoa(status, buffer); md = grpc_mdelem_from_strings(call->metadata_context, "grpc-status", buffer); @@ -801,7 +863,7 @@ static gpr_uint32 decode_status(grpc_mdelem *md) { gpr_uint32 status; void *user_data = grpc_mdelem_get_user_data(md, destroy_status); if (user_data) { - status = ((gpr_uint32)(gpr_intptr) user_data) - STATUS_OFFSET; + status = ((gpr_uint32)(gpr_intptr)user_data) - STATUS_OFFSET; } else { if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value), GPR_SLICE_LENGTH(md->value->slice), @@ -818,6 +880,7 @@ void grpc_call_recv_metadata(grpc_call_element *elem, grpc_call_op *op) { grpc_call *call = CALL_FROM_TOP_ELEM(elem); grpc_mdelem *md = op->data.metadata; grpc_mdstr *key = md->key; + if (key == grpc_channel_get_status_string(call->channel)) { maybe_set_status_code(call, decode_status(md)); grpc_mdelem_unref(md); @@ -917,3 +980,8 @@ void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) { call->have_alarm = 1; grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now()); } + +grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) { + return CALL_STACK_FROM_CALL(call); +} + |