aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel/connected_channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/channel/connected_channel.c')
-rw-r--r--src/core/channel/connected_channel.c72
1 files changed, 7 insertions, 65 deletions
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index f7fed7cae9..17abba06be 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -63,11 +63,6 @@ typedef struct connected_channel_call_data {
gpr_uint8 got_read_close;
gpr_slice_buffer incoming_message;
gpr_uint32 outgoing_buffer_length_estimate;
-
- grpc_linked_mdelem *incoming_metadata;
- size_t incoming_metadata_count;
- size_t incoming_metadata_capacity;
- gpr_timespec deadline;
} call_data;
/* We perform a small hack to locate transport data alongside the connected
@@ -120,26 +115,18 @@ static void end_bufferable_op(grpc_call_op *op, channel_data *chand,
static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
grpc_call_op *op) {
call_data *calld = elem->call_data;
- grpc_linked_mdelem *m;
channel_data *chand = elem->channel_data;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
switch (op->type) {
case GRPC_SEND_METADATA:
- for (m = op->data.metadata.list.head; m; m = m->next) {
- grpc_sopb_add_metadata(&calld->outgoing_sopb, m->md);
- }
- if (gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future) != 0) {
- grpc_sopb_add_deadline(&calld->outgoing_sopb,
- op->data.metadata.deadline);
- }
+ grpc_sopb_add_metadata(&calld->outgoing_sopb, op->data.metadata);
grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb,
op->user_data);
break;
case GRPC_SEND_START:
grpc_transport_add_to_pollset(chand->transport, op->data.start.pollset);
- grpc_sopb_add_metadata_boundary(&calld->outgoing_sopb);
end_bufferable_op(op, chand, calld, 0);
break;
case GRPC_SEND_MESSAGE:
@@ -209,9 +196,6 @@ static void init_call_elem(grpc_call_element *elem,
calld->got_read_close = 0;
calld->outgoing_buffer_length_estimate = 0;
calld->max_message_length = chand->max_message_length;
- calld->incoming_metadata = NULL;
- calld->incoming_metadata_capacity = 0;
- calld->incoming_metadata_count = 0;
gpr_slice_buffer_init(&calld->incoming_message);
r = grpc_transport_init_stream(chand->transport,
TRANSPORT_STREAM_FROM_CALL_DATA(calld),
@@ -328,50 +312,16 @@ static void finish_message(channel_data *chand, call_data *calld) {
grpc_call_next_op(elem, &call_op);
}
-static void metadata_done_cb(void *ptr, grpc_op_error error) { gpr_free(ptr); }
-
-static void add_incoming_metadata(call_data *calld, grpc_mdelem *elem) {
- if (calld->incoming_metadata_count == calld->incoming_metadata_capacity) {
- calld->incoming_metadata_capacity =
- GPR_MAX(8, 2 * calld->incoming_metadata_capacity);
- calld->incoming_metadata = gpr_realloc(
- calld->incoming_metadata,
- sizeof(*calld->incoming_metadata) * calld->incoming_metadata_capacity);
- }
- calld->incoming_metadata[calld->incoming_metadata_count++].md = elem;
-}
-
-static void flush_metadata(grpc_call_element *elem) {
+static void got_metadata(grpc_call_element *elem, grpc_metadata_batch metadata) {
grpc_call_op op;
- call_data *calld = elem->call_data;
- size_t i;
-
- for (i = 1; i < calld->incoming_metadata_count; i++) {
- calld->incoming_metadata[i].prev = &calld->incoming_metadata[i - 1];
- }
- for (i = 0; i < calld->incoming_metadata_count - 1; i++) {
- calld->incoming_metadata[i].next = &calld->incoming_metadata[i + 1];
- }
-
- calld->incoming_metadata[0].prev =
- calld->incoming_metadata[calld->incoming_metadata_count - 1].next = NULL;
-
op.type = GRPC_RECV_METADATA;
op.dir = GRPC_CALL_UP;
op.flags = 0;
- op.data.metadata.list.head = &calld->incoming_metadata[0];
- op.data.metadata.list.tail =
- &calld->incoming_metadata[calld->incoming_metadata_count - 1];
- op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL;
- op.data.metadata.deadline = calld->deadline;
- op.done_cb = metadata_done_cb;
- op.user_data = calld->incoming_metadata;
+ op.data.metadata = metadata;
+ op.done_cb = do_nothing;
+ op.user_data = NULL;
grpc_call_next_op(elem, &op);
-
- calld->incoming_metadata = NULL;
- calld->incoming_metadata_count = 0;
- calld->incoming_metadata_capacity = 0;
}
/* Handle incoming stream ops from the transport, translating them into
@@ -393,20 +343,12 @@ static void recv_batch(void *user_data, grpc_transport *transport,
stream_op = ops + i;
switch (stream_op->type) {
case GRPC_OP_FLOW_CTL_CB:
- gpr_log(GPR_ERROR,
- "should not receive flow control ops from transport");
- abort();
+ stream_op->data.flow_ctl_cb.cb(stream_op->data.flow_ctl_cb.arg, 1);
break;
case GRPC_NO_OP:
break;
case GRPC_OP_METADATA:
- add_incoming_metadata(calld, stream_op->data.metadata);
- break;
- case GRPC_OP_DEADLINE:
- calld->deadline = stream_op->data.deadline;
- break;
- case GRPC_OP_METADATA_BOUNDARY:
- flush_metadata(elem);
+ got_metadata(elem, stream_op->data.metadata);
break;
case GRPC_OP_BEGIN_MESSAGE:
/* can't begin a message when we're still reading a message */