diff options
Diffstat (limited to 'src/core/surface/call.c')
-rw-r--r-- | src/core/surface/call.c | 225 |
1 files changed, 153 insertions, 72 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index dba63058b8..2949805622 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -33,7 +33,6 @@ #include "src/core/surface/call.h" #include "src/core/channel/channel_stack.h" -#include "src/core/channel/metadata_buffer.h" #include "src/core/iomgr/alarm.h" #include "src/core/support/string.h" #include "src/core/surface/byte_buffer_queue.h" @@ -41,6 +40,7 @@ #include "src/core/surface/completion_queue.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <assert.h> #include <stdio.h> #include <stdlib.h> @@ -68,8 +68,10 @@ typedef struct { } completed_request; /* See request_set in grpc_call below for a description */ -#define REQSET_EMPTY 255 -#define REQSET_DONE 254 +#define REQSET_EMPTY 'X' +#define REQSET_DONE 'Y' + +#define MAX_SEND_INITIAL_METADATA_COUNT 3 typedef struct { /* Overall status of the operation: starts OK, may degrade to @@ -92,6 +94,8 @@ typedef enum { /* Status came from the application layer overriding whatever the wire says */ STATUS_FROM_API_OVERRIDE = 0, + /* Status was created by some internal channel stack operation */ + STATUS_FROM_CORE, /* Status came from 'the wire' - or somewhere below the surface layer */ STATUS_FROM_WIRE, @@ -204,12 +208,18 @@ struct grpc_call { /* Call refcount - to keep the call alive during asynchronous operations */ gpr_refcount internal_refcount; + grpc_linked_mdelem send_initial_metadata[MAX_SEND_INITIAL_METADATA_COUNT]; + grpc_linked_mdelem status_link; + grpc_linked_mdelem details_link; + size_t send_initial_metadata_count; + gpr_timespec send_deadline; + /* Data that the legacy api needs to track. To be deleted at some point soon */ legacy_state *legacy_state; }; -#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call)+1)) +#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) #define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1) #define CALL_ELEM_FROM_CALL(call, idx) \ grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx) @@ -226,9 +236,13 @@ struct grpc_call { static void do_nothing(void *ignored, grpc_op_error also_ignored) {} static send_action choose_send_action(grpc_call *call); static void enact_send_action(grpc_call *call, send_action sa); +static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline); grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, - const void *server_transport_data) { + const void *server_transport_data, + grpc_mdelem **add_initial_metadata, + size_t add_initial_metadata_count, + gpr_timespec send_deadline) { size_t i; grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel); grpc_call *call = @@ -245,6 +259,12 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, call->request_set[GRPC_IOREQ_SEND_TRAILING_METADATA] = REQSET_DONE; call->request_set[GRPC_IOREQ_SEND_STATUS] = REQSET_DONE; } + GPR_ASSERT(add_initial_metadata_count < MAX_SEND_INITIAL_METADATA_COUNT); + for (i = 0; i < add_initial_metadata_count; i++) { + call->send_initial_metadata[i].md = add_initial_metadata[i]; + } + call->send_initial_metadata_count = add_initial_metadata_count; + call->send_deadline = send_deadline; grpc_channel_internal_ref(channel); call->metadata_context = grpc_channel_get_metadata_context(channel); /* one ref is dropped in response to destroy, the other in @@ -252,6 +272,9 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, gpr_ref_init(&call->internal_refcount, 2); grpc_call_stack_init(channel_stack, server_transport_data, CALL_STACK_FROM_CALL(call)); + if (gpr_time_cmp(send_deadline, gpr_inf_future) != 0) { + set_deadline_alarm(call, send_deadline); + } return call; } @@ -284,6 +307,9 @@ static void destroy_call(void *call, int ignored_success) { for (i = 0; i < GPR_ARRAY_SIZE(c->buffered_metadata); i++) { gpr_free(c->buffered_metadata[i].metadata); } + for (i = 0; i < c->send_initial_metadata_count; i++) { + grpc_mdelem_unref(c->send_initial_metadata[i].md); + } if (c->legacy_state) { destroy_legacy_state(c->legacy_state); } @@ -342,6 +368,7 @@ static void request_more_data(grpc_call *call) { op.flags = 0; op.done_cb = do_nothing; op.user_data = NULL; + op.bind_pollset = NULL; grpc_call_execute_op(call, &op); } @@ -587,15 +614,29 @@ static send_action choose_send_action(grpc_call *call) { return SEND_NOTHING; } -static void send_metadata(grpc_call *call, grpc_mdelem *elem) { - grpc_call_op op; - op.type = GRPC_SEND_METADATA; - op.dir = GRPC_CALL_DOWN; - op.flags = GRPC_WRITE_BUFFER_HINT; - op.data.metadata = elem; - op.done_cb = do_nothing; - op.user_data = NULL; - grpc_call_execute_op(call, &op); +static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count, + grpc_metadata *metadata) { + size_t i; + grpc_mdelem_list out; + if (count == 0) { + out.head = out.tail = NULL; + return out; + } + for (i = 0; i < count; i++) { + grpc_metadata *md = &metadata[i]; + grpc_metadata *next_md = (i == count - 1) ? NULL : &metadata[i + 1]; + grpc_metadata *prev_md = (i == 0) ? NULL : &metadata[i - 1]; + grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data; + GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data)); + l->md = grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key, + (const gpr_uint8 *)md->value, + md->value_length); + l->next = next_md ? (grpc_linked_mdelem *)&next_md->internal_data : NULL; + l->prev = prev_md ? (grpc_linked_mdelem *)&prev_md->internal_data : NULL; + } + out.head = (grpc_linked_mdelem *)&(metadata[0].internal_data); + out.tail = (grpc_linked_mdelem *)&(metadata[count - 1].internal_data); + return out; } static void enact_send_action(grpc_call *call, send_action sa) { @@ -614,19 +655,21 @@ static void enact_send_action(grpc_call *call, send_action sa) { /* fallthrough */ case SEND_INITIAL_METADATA: data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA]; - for (i = 0; i < data.send_metadata.count; i++) { - const grpc_metadata *md = &data.send_metadata.metadata[i]; - send_metadata(call, - grpc_mdelem_from_string_and_buffer( - call->metadata_context, md->key, - (const gpr_uint8 *)md->value, md->value_length)); - } - op.type = GRPC_SEND_START; + op.type = GRPC_SEND_METADATA; op.dir = GRPC_CALL_DOWN; op.flags = flags; - op.data.start.pollset = grpc_cq_pollset(call->cq); + op.data.metadata.list = chain_metadata_from_app( + call, data.send_metadata.count, data.send_metadata.metadata); + op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL; + op.data.metadata.deadline = call->send_deadline; + for (i = 0; i < call->send_initial_metadata_count; i++) { + grpc_metadata_batch_link_head(&op.data.metadata, + &call->send_initial_metadata[i]); + } + call->send_initial_metadata_count = 0; op.done_cb = finish_start_step; op.user_data = call; + op.bind_pollset = grpc_cq_pollset(call->cq); grpc_call_execute_op(call, &op); break; case SEND_BUFFERED_MESSAGE: @@ -640,37 +683,42 @@ static void enact_send_action(grpc_call *call, send_action sa) { op.data.message = data.send_message; op.done_cb = finish_write_step; op.user_data = call; + op.bind_pollset = NULL; grpc_call_execute_op(call, &op); break; case SEND_TRAILING_METADATA_AND_FINISH: /* send trailing metadata */ data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA]; - for (i = 0; i < data.send_metadata.count; i++) { - const grpc_metadata *md = &data.send_metadata.metadata[i]; - send_metadata(call, - grpc_mdelem_from_string_and_buffer( - call->metadata_context, md->key, - (const gpr_uint8 *)md->value, md->value_length)); - } + op.type = GRPC_SEND_METADATA; + op.dir = GRPC_CALL_DOWN; + op.flags = flags; + op.data.metadata.list = chain_metadata_from_app( + call, data.send_metadata.count, data.send_metadata.metadata); + op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL; + op.data.metadata.deadline = call->send_deadline; + op.bind_pollset = NULL; /* send status */ /* TODO(ctiller): cache common status values */ data = call->request_data[GRPC_IOREQ_SEND_STATUS]; gpr_ltoa(data.send_status.code, status_str); - send_metadata( - call, + grpc_metadata_batch_add_tail( + &op.data.metadata, &call->status_link, grpc_mdelem_from_metadata_strings( call->metadata_context, grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)), grpc_mdstr_from_string(call->metadata_context, status_str))); if (data.send_status.details) { - send_metadata( - call, + grpc_metadata_batch_add_tail( + &op.data.metadata, &call->details_link, grpc_mdelem_from_metadata_strings( call->metadata_context, grpc_mdstr_ref(grpc_channel_get_message_string(call->channel)), grpc_mdstr_from_string(call->metadata_context, data.send_status.details))); } + op.done_cb = do_nothing; + op.user_data = NULL; + grpc_call_execute_op(call, &op); /* fallthrough: see choose_send_action for details */ case SEND_FINISH: op.type = GRPC_SEND_FINISH; @@ -678,6 +726,7 @@ static void enact_send_action(grpc_call *call, send_action sa) { op.flags = 0; op.done_cb = finish_finish_step; op.user_data = call; + op.bind_pollset = NULL; grpc_call_execute_op(call, &op); break; } @@ -831,6 +880,7 @@ grpc_call_error grpc_call_cancel(grpc_call *c) { op.flags = 0; op.done_cb = do_nothing; op.user_data = NULL; + op.bind_pollset = NULL; elem = CALL_ELEM_FROM_CALL(c, 0); elem->filter->call_op(elem, NULL, &op); @@ -875,9 +925,7 @@ static void call_alarm(void *arg, int success) { grpc_call_internal_unref(call, 1); } -void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); - +static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) { if (call->have_alarm) { gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice"); } @@ -886,11 +934,15 @@ void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) { grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now()); } -static void set_read_state(grpc_call *call, read_state state) { - lock(call); +static void set_read_state_locked(grpc_call *call, read_state state) { GPR_ASSERT(call->read_state < state); call->read_state = state; finish_read_ops(call); +} + +static void set_read_state(grpc_call *call, read_state state) { + lock(call); + set_read_state_locked(call, state); unlock(call); } @@ -914,7 +966,7 @@ static gpr_uint32 decode_status(grpc_mdelem *md) { gpr_uint32 status; void *user_data = grpc_mdelem_get_user_data(md, destroy_status); if (user_data) { - status = ((gpr_uint32)(gpr_intptr) user_data) - STATUS_OFFSET; + status = ((gpr_uint32)(gpr_intptr)user_data) - STATUS_OFFSET; } else { if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value), GPR_SLICE_LENGTH(md->value->slice), @@ -936,52 +988,81 @@ void grpc_call_recv_message(grpc_call_element *elem, unlock(call); } -void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) { +void grpc_call_recv_synthetic_status(grpc_call_element *elem, + grpc_status_code status, + const char *message) { grpc_call *call = CALL_FROM_TOP_ELEM(elem); - grpc_mdstr *key = md->key; + lock(call); + set_status_code(call, STATUS_FROM_CORE, status); + set_status_details(call, STATUS_FROM_CORE, + grpc_mdstr_from_string(call->metadata_context, message)); + unlock(call); +} + +int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) { + grpc_call *call = CALL_FROM_TOP_ELEM(elem); + grpc_linked_mdelem *l; grpc_metadata_array *dest; grpc_metadata *mdusr; + int is_trailing; + grpc_mdctx *mdctx = call->metadata_context; lock(call); - if (key == grpc_channel_get_status_string(call->channel)) { - set_status_code(call, STATUS_FROM_WIRE, decode_status(md)); - grpc_mdelem_unref(md); - } else if (key == grpc_channel_get_message_string(call->channel)) { - set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); - grpc_mdelem_unref(md); - } else { - dest = &call->buffered_metadata[call->read_state >= - READ_STATE_GOT_INITIAL_METADATA]; - if (dest->count == dest->capacity) { - dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2); - dest->metadata = - gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity); - } - mdusr = &dest->metadata[dest->count++]; - mdusr->key = grpc_mdstr_as_c_string(md->key); - mdusr->value = grpc_mdstr_as_c_string(md->value); - mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice); - if (call->owned_metadata_count == call->owned_metadata_capacity) { - call->owned_metadata_capacity = GPR_MAX( - call->owned_metadata_capacity + 8, call->owned_metadata_capacity * 2); - call->owned_metadata = - gpr_realloc(call->owned_metadata, - sizeof(grpc_mdelem *) * call->owned_metadata_capacity); + is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA; + for (l = md->list.head; l != NULL; l = l->next) { + grpc_mdelem *md = l->md; + grpc_mdstr *key = md->key; + if (key == grpc_channel_get_status_string(call->channel)) { + set_status_code(call, STATUS_FROM_WIRE, decode_status(md)); + } else if (key == grpc_channel_get_message_string(call->channel)) { + set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); + } else { + dest = &call->buffered_metadata[is_trailing]; + if (dest->count == dest->capacity) { + dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2); + dest->metadata = + gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity); + } + mdusr = &dest->metadata[dest->count++]; + mdusr->key = grpc_mdstr_as_c_string(md->key); + mdusr->value = grpc_mdstr_as_c_string(md->value); + mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice); + if (call->owned_metadata_count == call->owned_metadata_capacity) { + call->owned_metadata_capacity = + GPR_MAX(call->owned_metadata_capacity + 8, + call->owned_metadata_capacity * 2); + call->owned_metadata = + gpr_realloc(call->owned_metadata, + sizeof(grpc_mdelem *) * call->owned_metadata_capacity); + } + call->owned_metadata[call->owned_metadata_count++] = md; + l->md = 0; } - call->owned_metadata[call->owned_metadata_count++] = md; + } + if (gpr_time_cmp(md->deadline, gpr_inf_future) != 0) { + set_deadline_alarm(call, md->deadline); + } + if (!is_trailing) { + set_read_state_locked(call, READ_STATE_GOT_INITIAL_METADATA); } unlock(call); + + grpc_mdctx_lock(mdctx); + for (l = md->list.head; l; l = l->next) { + if (l->md) grpc_mdctx_locked_mdelem_unref(mdctx, l->md); + } + for (l = md->garbage.head; l; l = l->next) { + grpc_mdctx_locked_mdelem_unref(mdctx, l->md); + } + grpc_mdctx_unlock(mdctx); + + return !is_trailing; } grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) { return CALL_STACK_FROM_CALL(call); } -void grpc_call_initial_metadata_complete(grpc_call_element *surface_element) { - grpc_call *call = grpc_call_from_top_element(surface_element); - set_read_state(call, READ_STATE_GOT_INITIAL_METADATA); -} - /* * BATCH API IMPLEMENTATION */ |