aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/surface/call.c387
1 files changed, 200 insertions, 187 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 96cdb9651b..4de453ffd0 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -48,26 +48,8 @@
#define OP_IN_MASK(op, mask) (((1 << (op)) & (mask)) != 0)
-typedef struct {
- gpr_uint8 md_out_buffer;
- size_t md_out_count[2];
- size_t md_out_capacity[2];
- grpc_metadata *md_out[2];
- grpc_byte_buffer *msg_out;
-
- /* input buffers */
- grpc_metadata_array initial_md_in;
- grpc_metadata_array trailing_md_in;
-
- size_t details_capacity;
- char *details;
- grpc_status_code status;
-
- size_t msg_in_read_idx;
- grpc_byte_buffer *msg_in;
-
- void *finished_tag;
-} legacy_state;
+typedef struct legacy_state legacy_state;
+static void destroy_legacy_state(legacy_state *ls);
typedef enum { REQ_INITIAL = 0, REQ_READY, REQ_DONE } req_state;
@@ -200,18 +182,10 @@ grpc_call *grpc_call_create(grpc_channel *channel,
return call;
}
-legacy_state *get_legacy_state(grpc_call *call) {
- if (call->legacy_state == NULL) {
- call->legacy_state = gpr_malloc(sizeof(legacy_state));
- memset(call->legacy_state, 0, sizeof(legacy_state));
- }
- return call->legacy_state;
-}
-
void grpc_call_internal_ref(grpc_call *c) { gpr_ref(&c->internal_refcount); }
static void destroy_call(void *call, int ignored_success) {
- size_t i, j;
+ size_t i;
grpc_call *c = call;
grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c));
grpc_channel_internal_unref(c->channel);
@@ -228,16 +202,7 @@ static void destroy_call(void *call, int ignored_success) {
gpr_free(c->buffered_initial_metadata.metadata);
gpr_free(c->buffered_trailing_metadata.metadata);
if (c->legacy_state) {
- for (i = 0; i < 2; i++) {
- for (j = 0; j < c->legacy_state->md_out_count[i]; j++) {
- gpr_free(c->legacy_state->md_out[i][j].key);
- gpr_free(c->legacy_state->md_out[i][j].value);
- }
- gpr_free(c->legacy_state->md_out[i]);
- }
- gpr_free(c->legacy_state->initial_md_in.metadata);
- gpr_free(c->legacy_state->trailing_md_in.metadata);
- gpr_free(c->legacy_state);
+ destroy_legacy_state(c->legacy_state);
}
gpr_free(c);
}
@@ -749,6 +714,202 @@ void grpc_call_execute_op(grpc_call *call, grpc_call_op *op) {
elem->filter->call_op(elem, NULL, op);
}
+grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
+ return CALL_FROM_TOP_ELEM(elem);
+}
+
+static void call_alarm(void *arg, int success) {
+ grpc_call *call = arg;
+ if (success) {
+ if (call->is_client) {
+ grpc_call_cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED,
+ "Deadline Exceeded");
+ } else {
+ grpc_call_cancel(call);
+ }
+ }
+ grpc_call_internal_unref(call, 1);
+}
+
+void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) {
+ grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+
+ if (call->have_alarm) {
+ gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice");
+ }
+ grpc_call_internal_ref(call);
+ call->have_alarm = 1;
+ grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now());
+}
+
+static void mark_read_closed(grpc_call *call) {
+ call->read_closed = 1;
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK);
+}
+
+void grpc_call_read_closed(grpc_call_element *elem) {
+ grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+ lock(call);
+ GPR_ASSERT(!call->read_closed);
+ mark_read_closed(call);
+ unlock(call);
+}
+
+void grpc_call_stream_closed(grpc_call_element *elem) {
+ grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+ lock(call);
+ GPR_ASSERT(!call->stream_closed);
+ if (!call->read_closed) {
+ mark_read_closed(call);
+ }
+ call->stream_closed = 1;
+ if (grpc_bbq_empty(&call->incoming_queue)) {
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK);
+ }
+ unlock(call);
+ grpc_call_internal_unref(call, 0);
+}
+
+/* we offset status by a small amount when storing it into transport metadata
+ as metadata cannot store a 0 value (which is used as OK for grpc_status_codes
+ */
+#define STATUS_OFFSET 1
+static void destroy_status(void *ignored) {}
+
+static gpr_uint32 decode_status(grpc_mdelem *md) {
+ gpr_uint32 status;
+ void *user_data = grpc_mdelem_get_user_data(md, destroy_status);
+ if (user_data) {
+ status = ((gpr_uint32)(gpr_intptr) user_data) - STATUS_OFFSET;
+ } else {
+ if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
+ GPR_SLICE_LENGTH(md->value->slice),
+ &status)) {
+ status = GRPC_STATUS_UNKNOWN; /* could not parse status code */
+ }
+ grpc_mdelem_set_user_data(md, destroy_status,
+ (void *)(gpr_intptr)(status + STATUS_OFFSET));
+ }
+ return status;
+}
+
+void grpc_call_recv_message(grpc_call_element *elem,
+ grpc_byte_buffer *byte_buffer) {
+ grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+ lock(call);
+ if (call->requests[GRPC_IOREQ_RECV_MESSAGE].set < GRPC_IOREQ_OP_COUNT) {
+ /* there's an outstanding read */
+ *call->requests[GRPC_IOREQ_RECV_MESSAGE].data.recv_message = byte_buffer;
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
+ } else {
+ grpc_bbq_push(&call->incoming_queue, byte_buffer);
+ }
+ unlock(call);
+}
+
+void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) {
+ grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+ grpc_mdstr *key = md->key;
+ grpc_metadata_array *dest;
+ grpc_metadata *mdusr;
+
+ lock(call);
+ if (key == grpc_channel_get_status_string(call->channel)) {
+ set_status_code(call, STATUS_FROM_WIRE, decode_status(md));
+ grpc_mdelem_unref(md);
+ } else if (key == grpc_channel_get_message_string(call->channel)) {
+ set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
+ grpc_mdelem_unref(md);
+ } else {
+ if (!call->got_initial_metadata) {
+ dest = call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA].set <
+ GRPC_IOREQ_OP_COUNT
+ ? call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA]
+ .data.recv_metadata
+ : &call->buffered_initial_metadata;
+ } else {
+ dest = call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA].set <
+ GRPC_IOREQ_OP_COUNT
+ ? call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA]
+ .data.recv_metadata
+ : &call->buffered_trailing_metadata;
+ }
+ if (dest->count == dest->capacity) {
+ dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2);
+ dest->metadata =
+ gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity);
+ }
+ mdusr = &dest->metadata[dest->count++];
+ mdusr->key = (char *)grpc_mdstr_as_c_string(md->key);
+ mdusr->value = (char *)grpc_mdstr_as_c_string(md->value);
+ mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice);
+ if (call->owned_metadata_count == call->owned_metadata_capacity) {
+ call->owned_metadata_capacity = GPR_MAX(
+ call->owned_metadata_capacity + 8, call->owned_metadata_capacity * 2);
+ call->owned_metadata =
+ gpr_realloc(call->owned_metadata,
+ sizeof(grpc_mdelem *) * call->owned_metadata_capacity);
+ }
+ call->owned_metadata[call->owned_metadata_count++] = md;
+ }
+ unlock(call);
+}
+
+grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
+ return CALL_STACK_FROM_CALL(call);
+}
+
+/*
+ * LEGACY API IMPLEMENTATION
+ * All this code will disappear as soon as wrappings are updated
+ */
+
+struct legacy_state {
+ gpr_uint8 md_out_buffer;
+ size_t md_out_count[2];
+ size_t md_out_capacity[2];
+ grpc_metadata *md_out[2];
+ grpc_byte_buffer *msg_out;
+
+ /* input buffers */
+ grpc_metadata_array initial_md_in;
+ grpc_metadata_array trailing_md_in;
+
+ size_t details_capacity;
+ char *details;
+ grpc_status_code status;
+
+ size_t msg_in_read_idx;
+ grpc_byte_buffer *msg_in;
+
+ void *finished_tag;
+};
+
+static legacy_state *get_legacy_state(grpc_call *call) {
+ if (call->legacy_state == NULL) {
+ call->legacy_state = gpr_malloc(sizeof(legacy_state));
+ memset(call->legacy_state, 0, sizeof(legacy_state));
+ }
+ return call->legacy_state;
+}
+
+static void destroy_legacy_state(legacy_state *ls) {
+ size_t i, j;
+ for (i = 0; i < 2; i++) {
+ for (j = 0; j < ls->md_out_count[i]; j++) {
+ gpr_free(ls->md_out[i][j].key);
+ gpr_free(ls->md_out[i][j].value);
+ }
+ gpr_free(ls->md_out[i]);
+ }
+ gpr_free(ls->initial_md_in.metadata);
+ gpr_free(ls->trailing_md_in.metadata);
+ gpr_free(ls);
+}
+
grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata,
gpr_uint32 flags) {
legacy_state *ls;
@@ -1008,151 +1169,3 @@ grpc_call_error grpc_call_start_write_status(grpc_call *call,
return err;
}
-
-grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
- return CALL_FROM_TOP_ELEM(elem);
-}
-
-static void call_alarm(void *arg, int success) {
- grpc_call *call = arg;
- if (success) {
- if (call->is_client) {
- grpc_call_cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED,
- "Deadline Exceeded");
- } else {
- grpc_call_cancel(call);
- }
- }
- grpc_call_internal_unref(call, 1);
-}
-
-void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) {
- grpc_call *call = CALL_FROM_TOP_ELEM(elem);
-
- if (call->have_alarm) {
- gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice");
- }
- grpc_call_internal_ref(call);
- call->have_alarm = 1;
- grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now());
-}
-
-static void mark_read_closed(grpc_call *call) {
- call->read_closed = 1;
- finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
- finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
- finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK);
- finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK);
-}
-
-void grpc_call_read_closed(grpc_call_element *elem) {
- grpc_call *call = CALL_FROM_TOP_ELEM(elem);
- lock(call);
- GPR_ASSERT(!call->read_closed);
- mark_read_closed(call);
- unlock(call);
-}
-
-void grpc_call_stream_closed(grpc_call_element *elem) {
- grpc_call *call = CALL_FROM_TOP_ELEM(elem);
- lock(call);
- GPR_ASSERT(!call->stream_closed);
- if (!call->read_closed) {
- mark_read_closed(call);
- }
- call->stream_closed = 1;
- if (grpc_bbq_empty(&call->incoming_queue)) {
- finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK);
- }
- unlock(call);
- grpc_call_internal_unref(call, 0);
-}
-
-/* we offset status by a small amount when storing it into transport metadata
- as metadata cannot store a 0 value (which is used as OK for grpc_status_codes
- */
-#define STATUS_OFFSET 1
-static void destroy_status(void *ignored) {}
-
-static gpr_uint32 decode_status(grpc_mdelem *md) {
- gpr_uint32 status;
- void *user_data = grpc_mdelem_get_user_data(md, destroy_status);
- if (user_data) {
- status = ((gpr_uint32)(gpr_intptr) user_data) - STATUS_OFFSET;
- } else {
- if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
- GPR_SLICE_LENGTH(md->value->slice),
- &status)) {
- status = GRPC_STATUS_UNKNOWN; /* could not parse status code */
- }
- grpc_mdelem_set_user_data(md, destroy_status,
- (void *)(gpr_intptr)(status + STATUS_OFFSET));
- }
- return status;
-}
-
-void grpc_call_recv_message(grpc_call_element *elem,
- grpc_byte_buffer *byte_buffer) {
- grpc_call *call = CALL_FROM_TOP_ELEM(elem);
- lock(call);
- if (call->requests[GRPC_IOREQ_RECV_MESSAGE].set < GRPC_IOREQ_OP_COUNT) {
- /* there's an outstanding read */
- *call->requests[GRPC_IOREQ_RECV_MESSAGE].data.recv_message = byte_buffer;
- finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
- } else {
- grpc_bbq_push(&call->incoming_queue, byte_buffer);
- }
- unlock(call);
-}
-
-void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) {
- grpc_call *call = CALL_FROM_TOP_ELEM(elem);
- grpc_mdstr *key = md->key;
- grpc_metadata_array *dest;
- grpc_metadata *mdusr;
-
- lock(call);
- if (key == grpc_channel_get_status_string(call->channel)) {
- set_status_code(call, STATUS_FROM_WIRE, decode_status(md));
- grpc_mdelem_unref(md);
- } else if (key == grpc_channel_get_message_string(call->channel)) {
- set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
- grpc_mdelem_unref(md);
- } else {
- if (!call->got_initial_metadata) {
- dest = call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA].set <
- GRPC_IOREQ_OP_COUNT
- ? call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA]
- .data.recv_metadata
- : &call->buffered_initial_metadata;
- } else {
- dest = call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA].set <
- GRPC_IOREQ_OP_COUNT
- ? call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA]
- .data.recv_metadata
- : &call->buffered_trailing_metadata;
- }
- if (dest->count == dest->capacity) {
- dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2);
- dest->metadata =
- gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity);
- }
- mdusr = &dest->metadata[dest->count++];
- mdusr->key = (char *)grpc_mdstr_as_c_string(md->key);
- mdusr->value = (char *)grpc_mdstr_as_c_string(md->value);
- mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice);
- if (call->owned_metadata_count == call->owned_metadata_capacity) {
- call->owned_metadata_capacity = GPR_MAX(
- call->owned_metadata_capacity + 8, call->owned_metadata_capacity * 2);
- call->owned_metadata =
- gpr_realloc(call->owned_metadata,
- sizeof(grpc_mdelem *) * call->owned_metadata_capacity);
- }
- call->owned_metadata[call->owned_metadata_count++] = md;
- }
- unlock(call);
-}
-
-grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
- return CALL_STACK_FROM_CALL(call);
-}