diff options
-rw-r--r-- | src/core/surface/call.c | 387 |
1 files changed, 200 insertions, 187 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 96cdb9651b..4de453ffd0 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -48,26 +48,8 @@ #define OP_IN_MASK(op, mask) (((1 << (op)) & (mask)) != 0) -typedef struct { - gpr_uint8 md_out_buffer; - size_t md_out_count[2]; - size_t md_out_capacity[2]; - grpc_metadata *md_out[2]; - grpc_byte_buffer *msg_out; - - /* input buffers */ - grpc_metadata_array initial_md_in; - grpc_metadata_array trailing_md_in; - - size_t details_capacity; - char *details; - grpc_status_code status; - - size_t msg_in_read_idx; - grpc_byte_buffer *msg_in; - - void *finished_tag; -} legacy_state; +typedef struct legacy_state legacy_state; +static void destroy_legacy_state(legacy_state *ls); typedef enum { REQ_INITIAL = 0, REQ_READY, REQ_DONE } req_state; @@ -200,18 +182,10 @@ grpc_call *grpc_call_create(grpc_channel *channel, return call; } -legacy_state *get_legacy_state(grpc_call *call) { - if (call->legacy_state == NULL) { - call->legacy_state = gpr_malloc(sizeof(legacy_state)); - memset(call->legacy_state, 0, sizeof(legacy_state)); - } - return call->legacy_state; -} - void grpc_call_internal_ref(grpc_call *c) { gpr_ref(&c->internal_refcount); } static void destroy_call(void *call, int ignored_success) { - size_t i, j; + size_t i; grpc_call *c = call; grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c)); grpc_channel_internal_unref(c->channel); @@ -228,16 +202,7 @@ static void destroy_call(void *call, int ignored_success) { gpr_free(c->buffered_initial_metadata.metadata); gpr_free(c->buffered_trailing_metadata.metadata); if (c->legacy_state) { - for (i = 0; i < 2; i++) { - for (j = 0; j < c->legacy_state->md_out_count[i]; j++) { - gpr_free(c->legacy_state->md_out[i][j].key); - gpr_free(c->legacy_state->md_out[i][j].value); - } - gpr_free(c->legacy_state->md_out[i]); - } - gpr_free(c->legacy_state->initial_md_in.metadata); - gpr_free(c->legacy_state->trailing_md_in.metadata); - gpr_free(c->legacy_state); + destroy_legacy_state(c->legacy_state); } gpr_free(c); } @@ -749,6 +714,202 @@ void grpc_call_execute_op(grpc_call *call, grpc_call_op *op) { elem->filter->call_op(elem, NULL, op); } +grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { + return CALL_FROM_TOP_ELEM(elem); +} + +static void call_alarm(void *arg, int success) { + grpc_call *call = arg; + if (success) { + if (call->is_client) { + grpc_call_cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED, + "Deadline Exceeded"); + } else { + grpc_call_cancel(call); + } + } + grpc_call_internal_unref(call, 1); +} + +void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) { + grpc_call *call = CALL_FROM_TOP_ELEM(elem); + + if (call->have_alarm) { + gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice"); + } + grpc_call_internal_ref(call); + call->have_alarm = 1; + grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now()); +} + +static void mark_read_closed(grpc_call *call) { + call->read_closed = 1; + finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK); + finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK); + finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK); + finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK); +} + +void grpc_call_read_closed(grpc_call_element *elem) { + grpc_call *call = CALL_FROM_TOP_ELEM(elem); + lock(call); + GPR_ASSERT(!call->read_closed); + mark_read_closed(call); + unlock(call); +} + +void grpc_call_stream_closed(grpc_call_element *elem) { + grpc_call *call = CALL_FROM_TOP_ELEM(elem); + lock(call); + GPR_ASSERT(!call->stream_closed); + if (!call->read_closed) { + mark_read_closed(call); + } + call->stream_closed = 1; + if (grpc_bbq_empty(&call->incoming_queue)) { + finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK); + } + unlock(call); + grpc_call_internal_unref(call, 0); +} + +/* we offset status by a small amount when storing it into transport metadata + as metadata cannot store a 0 value (which is used as OK for grpc_status_codes + */ +#define STATUS_OFFSET 1 +static void destroy_status(void *ignored) {} + +static gpr_uint32 decode_status(grpc_mdelem *md) { + gpr_uint32 status; + void *user_data = grpc_mdelem_get_user_data(md, destroy_status); + if (user_data) { + status = ((gpr_uint32)(gpr_intptr) user_data) - STATUS_OFFSET; + } else { + if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value), + GPR_SLICE_LENGTH(md->value->slice), + &status)) { + status = GRPC_STATUS_UNKNOWN; /* could not parse status code */ + } + grpc_mdelem_set_user_data(md, destroy_status, + (void *)(gpr_intptr)(status + STATUS_OFFSET)); + } + return status; +} + +void grpc_call_recv_message(grpc_call_element *elem, + grpc_byte_buffer *byte_buffer) { + grpc_call *call = CALL_FROM_TOP_ELEM(elem); + lock(call); + if (call->requests[GRPC_IOREQ_RECV_MESSAGE].set < GRPC_IOREQ_OP_COUNT) { + /* there's an outstanding read */ + *call->requests[GRPC_IOREQ_RECV_MESSAGE].data.recv_message = byte_buffer; + finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK); + } else { + grpc_bbq_push(&call->incoming_queue, byte_buffer); + } + unlock(call); +} + +void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) { + grpc_call *call = CALL_FROM_TOP_ELEM(elem); + grpc_mdstr *key = md->key; + grpc_metadata_array *dest; + grpc_metadata *mdusr; + + lock(call); + if (key == grpc_channel_get_status_string(call->channel)) { + set_status_code(call, STATUS_FROM_WIRE, decode_status(md)); + grpc_mdelem_unref(md); + } else if (key == grpc_channel_get_message_string(call->channel)) { + set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); + grpc_mdelem_unref(md); + } else { + if (!call->got_initial_metadata) { + dest = call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA].set < + GRPC_IOREQ_OP_COUNT + ? call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA] + .data.recv_metadata + : &call->buffered_initial_metadata; + } else { + dest = call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA].set < + GRPC_IOREQ_OP_COUNT + ? call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA] + .data.recv_metadata + : &call->buffered_trailing_metadata; + } + if (dest->count == dest->capacity) { + dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2); + dest->metadata = + gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity); + } + mdusr = &dest->metadata[dest->count++]; + mdusr->key = (char *)grpc_mdstr_as_c_string(md->key); + mdusr->value = (char *)grpc_mdstr_as_c_string(md->value); + mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice); + if (call->owned_metadata_count == call->owned_metadata_capacity) { + call->owned_metadata_capacity = GPR_MAX( + call->owned_metadata_capacity + 8, call->owned_metadata_capacity * 2); + call->owned_metadata = + gpr_realloc(call->owned_metadata, + sizeof(grpc_mdelem *) * call->owned_metadata_capacity); + } + call->owned_metadata[call->owned_metadata_count++] = md; + } + unlock(call); +} + +grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) { + return CALL_STACK_FROM_CALL(call); +} + +/* + * LEGACY API IMPLEMENTATION + * All this code will disappear as soon as wrappings are updated + */ + +struct legacy_state { + gpr_uint8 md_out_buffer; + size_t md_out_count[2]; + size_t md_out_capacity[2]; + grpc_metadata *md_out[2]; + grpc_byte_buffer *msg_out; + + /* input buffers */ + grpc_metadata_array initial_md_in; + grpc_metadata_array trailing_md_in; + + size_t details_capacity; + char *details; + grpc_status_code status; + + size_t msg_in_read_idx; + grpc_byte_buffer *msg_in; + + void *finished_tag; +}; + +static legacy_state *get_legacy_state(grpc_call *call) { + if (call->legacy_state == NULL) { + call->legacy_state = gpr_malloc(sizeof(legacy_state)); + memset(call->legacy_state, 0, sizeof(legacy_state)); + } + return call->legacy_state; +} + +static void destroy_legacy_state(legacy_state *ls) { + size_t i, j; + for (i = 0; i < 2; i++) { + for (j = 0; j < ls->md_out_count[i]; j++) { + gpr_free(ls->md_out[i][j].key); + gpr_free(ls->md_out[i][j].value); + } + gpr_free(ls->md_out[i]); + } + gpr_free(ls->initial_md_in.metadata); + gpr_free(ls->trailing_md_in.metadata); + gpr_free(ls); +} + grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata, gpr_uint32 flags) { legacy_state *ls; @@ -1008,151 +1169,3 @@ grpc_call_error grpc_call_start_write_status(grpc_call *call, return err; } - -grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { - return CALL_FROM_TOP_ELEM(elem); -} - -static void call_alarm(void *arg, int success) { - grpc_call *call = arg; - if (success) { - if (call->is_client) { - grpc_call_cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED, - "Deadline Exceeded"); - } else { - grpc_call_cancel(call); - } - } - grpc_call_internal_unref(call, 1); -} - -void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); - - if (call->have_alarm) { - gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice"); - } - grpc_call_internal_ref(call); - call->have_alarm = 1; - grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now()); -} - -static void mark_read_closed(grpc_call *call) { - call->read_closed = 1; - finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK); - finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK); - finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK); - finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK); -} - -void grpc_call_read_closed(grpc_call_element *elem) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); - lock(call); - GPR_ASSERT(!call->read_closed); - mark_read_closed(call); - unlock(call); -} - -void grpc_call_stream_closed(grpc_call_element *elem) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); - lock(call); - GPR_ASSERT(!call->stream_closed); - if (!call->read_closed) { - mark_read_closed(call); - } - call->stream_closed = 1; - if (grpc_bbq_empty(&call->incoming_queue)) { - finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK); - } - unlock(call); - grpc_call_internal_unref(call, 0); -} - -/* we offset status by a small amount when storing it into transport metadata - as metadata cannot store a 0 value (which is used as OK for grpc_status_codes - */ -#define STATUS_OFFSET 1 -static void destroy_status(void *ignored) {} - -static gpr_uint32 decode_status(grpc_mdelem *md) { - gpr_uint32 status; - void *user_data = grpc_mdelem_get_user_data(md, destroy_status); - if (user_data) { - status = ((gpr_uint32)(gpr_intptr) user_data) - STATUS_OFFSET; - } else { - if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value), - GPR_SLICE_LENGTH(md->value->slice), - &status)) { - status = GRPC_STATUS_UNKNOWN; /* could not parse status code */ - } - grpc_mdelem_set_user_data(md, destroy_status, - (void *)(gpr_intptr)(status + STATUS_OFFSET)); - } - return status; -} - -void grpc_call_recv_message(grpc_call_element *elem, - grpc_byte_buffer *byte_buffer) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); - lock(call); - if (call->requests[GRPC_IOREQ_RECV_MESSAGE].set < GRPC_IOREQ_OP_COUNT) { - /* there's an outstanding read */ - *call->requests[GRPC_IOREQ_RECV_MESSAGE].data.recv_message = byte_buffer; - finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK); - } else { - grpc_bbq_push(&call->incoming_queue, byte_buffer); - } - unlock(call); -} - -void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); - grpc_mdstr *key = md->key; - grpc_metadata_array *dest; - grpc_metadata *mdusr; - - lock(call); - if (key == grpc_channel_get_status_string(call->channel)) { - set_status_code(call, STATUS_FROM_WIRE, decode_status(md)); - grpc_mdelem_unref(md); - } else if (key == grpc_channel_get_message_string(call->channel)) { - set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); - grpc_mdelem_unref(md); - } else { - if (!call->got_initial_metadata) { - dest = call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA].set < - GRPC_IOREQ_OP_COUNT - ? call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA] - .data.recv_metadata - : &call->buffered_initial_metadata; - } else { - dest = call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA].set < - GRPC_IOREQ_OP_COUNT - ? call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA] - .data.recv_metadata - : &call->buffered_trailing_metadata; - } - if (dest->count == dest->capacity) { - dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2); - dest->metadata = - gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity); - } - mdusr = &dest->metadata[dest->count++]; - mdusr->key = (char *)grpc_mdstr_as_c_string(md->key); - mdusr->value = (char *)grpc_mdstr_as_c_string(md->value); - mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice); - if (call->owned_metadata_count == call->owned_metadata_capacity) { - call->owned_metadata_capacity = GPR_MAX( - call->owned_metadata_capacity + 8, call->owned_metadata_capacity * 2); - call->owned_metadata = - gpr_realloc(call->owned_metadata, - sizeof(grpc_mdelem *) * call->owned_metadata_capacity); - } - call->owned_metadata[call->owned_metadata_count++] = md; - } - unlock(call); -} - -grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) { - return CALL_STACK_FROM_CALL(call); -} |