aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel/connected_channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/channel/connected_channel.c')
-rw-r--r--src/core/channel/connected_channel.c92
1 files changed, 61 insertions, 31 deletions
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index 62611e08f3..519f05cb66 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -60,10 +60,14 @@ typedef struct connected_channel_call_data {
gpr_uint32 max_message_length;
gpr_uint32 incoming_message_length;
gpr_uint8 reading_message;
- gpr_uint8 got_metadata_boundary;
gpr_uint8 got_read_close;
gpr_slice_buffer incoming_message;
gpr_uint32 outgoing_buffer_length_estimate;
+
+ grpc_linked_mdelem *incoming_metadata;
+ size_t incoming_metadata_count;
+ size_t incoming_metadata_capacity;
+ gpr_timespec deadline;
} call_data;
/* We perform a small hack to locate transport data alongside the connected
@@ -116,18 +120,19 @@ static void end_bufferable_op(grpc_call_op *op, channel_data *chand,
static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
grpc_call_op *op) {
call_data *calld = elem->call_data;
+ grpc_linked_mdelem *m;
channel_data *chand = elem->channel_data;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
switch (op->type) {
case GRPC_SEND_METADATA:
- grpc_sopb_add_metadata(&calld->outgoing_sopb, op->data.metadata);
- grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb,
- op->user_data);
- break;
- case GRPC_SEND_DEADLINE:
- grpc_sopb_add_deadline(&calld->outgoing_sopb, op->data.deadline);
+ for (m = op->data.metadata.list.head; m; m = m->next) {
+ grpc_sopb_add_metadata(&calld->outgoing_sopb, m->md);
+ }
+ if (gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future) != 0) {
+ grpc_sopb_add_deadline(&calld->outgoing_sopb, op->data.metadata.deadline);
+ }
grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb,
op->user_data);
break;
@@ -200,10 +205,12 @@ static void init_call_elem(grpc_call_element *elem,
grpc_sopb_init(&calld->outgoing_sopb);
calld->reading_message = 0;
- calld->got_metadata_boundary = 0;
calld->got_read_close = 0;
calld->outgoing_buffer_length_estimate = 0;
calld->max_message_length = chand->max_message_length;
+ calld->incoming_metadata = NULL;
+ calld->incoming_metadata_capacity = 0;
+ calld->incoming_metadata_count = 0;
gpr_slice_buffer_init(&calld->incoming_message);
r = grpc_transport_init_stream(chand->transport,
TRANSPORT_STREAM_FROM_CALL_DATA(calld),
@@ -320,6 +327,49 @@ static void finish_message(channel_data *chand, call_data *calld) {
grpc_call_next_op(elem, &call_op);
}
+static void metadata_done_cb(void *ptr, grpc_op_error error) {
+ gpr_free(ptr);
+}
+
+static void add_incoming_metadata(call_data *calld, grpc_mdelem *elem) {
+ if (calld->incoming_metadata_count == calld->incoming_metadata_capacity) {
+ calld->incoming_metadata_capacity = GPR_MAX(8, 2 * calld->incoming_metadata_capacity);
+ calld->incoming_metadata = gpr_realloc(calld->incoming_metadata, sizeof(*calld->incoming_metadata) * calld->incoming_metadata_capacity);
+ }
+ calld->incoming_metadata[calld->incoming_metadata_count++].md = elem;
+}
+
+static void flush_metadata(grpc_call_element *elem) {
+ grpc_call_op op;
+ call_data *calld = elem->call_data;
+ size_t i;
+
+ for (i = 1; i < calld->incoming_metadata_count; i++) {
+ calld->incoming_metadata[i].prev = &calld->incoming_metadata[i-1];
+ }
+ for (i = 0; i < calld->incoming_metadata_count - 1; i++) {
+ calld->incoming_metadata[i].next = &calld->incoming_metadata[i+1];
+ }
+
+ calld->incoming_metadata[0].prev = calld->incoming_metadata[calld->incoming_metadata_count-1].next = NULL;
+
+ op.type = GRPC_RECV_METADATA;
+ op.dir = GRPC_CALL_UP;
+ op.flags = 0;
+ op.data.metadata.list.head = &calld->incoming_metadata[0];
+ op.data.metadata.list.tail = &calld->incoming_metadata[calld->incoming_metadata_count - 1];
+ op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL;
+ op.data.metadata.deadline = calld->deadline;
+ op.done_cb = metadata_done_cb;
+ op.user_data = calld->incoming_metadata;
+
+ grpc_call_next_op(elem, &op);
+
+ calld->incoming_metadata = NULL;
+ calld->incoming_metadata_count = 0;
+ calld->incoming_metadata_capacity = 0;
+}
+
/* Handle incoming stream ops from the transport, translating them into
call_ops to pass up the call stack */
static void recv_batch(void *user_data, grpc_transport *transport,
@@ -346,33 +396,13 @@ static void recv_batch(void *user_data, grpc_transport *transport,
case GRPC_NO_OP:
break;
case GRPC_OP_METADATA:
- call_op.type = GRPC_RECV_METADATA;
- call_op.dir = GRPC_CALL_UP;
- call_op.flags = 0;
- call_op.data.metadata = stream_op->data.metadata;
- call_op.done_cb = do_nothing;
- call_op.user_data = NULL;
- grpc_call_next_op(elem, &call_op);
+ add_incoming_metadata(calld, stream_op->data.metadata);
break;
case GRPC_OP_DEADLINE:
- call_op.type = GRPC_RECV_DEADLINE;
- call_op.dir = GRPC_CALL_UP;
- call_op.flags = 0;
- call_op.data.deadline = stream_op->data.deadline;
- call_op.done_cb = do_nothing;
- call_op.user_data = NULL;
- grpc_call_next_op(elem, &call_op);
+ calld->deadline = stream_op->data.deadline;
break;
case GRPC_OP_METADATA_BOUNDARY:
- if (!calld->got_metadata_boundary) {
- calld->got_metadata_boundary = 1;
- call_op.type = GRPC_RECV_END_OF_INITIAL_METADATA;
- call_op.dir = GRPC_CALL_UP;
- call_op.flags = 0;
- call_op.done_cb = do_nothing;
- call_op.user_data = NULL;
- grpc_call_next_op(elem, &call_op);
- }
+ flush_metadata(elem);
break;
case GRPC_OP_BEGIN_MESSAGE:
/* can't begin a message when we're still reading a message */