aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/channel/call_op_string.c51
-rw-r--r--src/core/channel/census_filter.c24
-rw-r--r--src/core/channel/channel_stack.c57
-rw-r--r--src/core/channel/channel_stack.h31
-rw-r--r--src/core/channel/client_channel.c99
-rw-r--r--src/core/channel/connected_channel.c77
-rw-r--r--src/core/channel/http_client_filter.c77
-rw-r--r--src/core/channel/http_server_filter.c240
-rw-r--r--src/core/channel/metadata_buffer.c149
-rw-r--r--src/core/channel/metadata_buffer.h70
-rw-r--r--src/core/security/auth.c61
-rw-r--r--src/core/surface/call.c225
-rw-r--r--src/core/surface/call.h25
-rw-r--r--src/core/surface/channel.c52
-rw-r--r--src/core/surface/client.c28
-rw-r--r--src/core/surface/lame_client.c42
-rw-r--r--src/core/surface/server.c40
-rw-r--r--src/core/transport/chttp2/stream_encoder.c42
-rw-r--r--src/core/transport/chttp2_transport.c106
-rw-r--r--src/core/transport/metadata.c2
-rw-r--r--src/core/transport/stream_op.c198
-rw-r--r--src/core/transport/stream_op.h54
22 files changed, 834 insertions, 916 deletions
diff --git a/src/core/channel/call_op_string.c b/src/core/channel/call_op_string.c
index 08f2e95deb..5f7e1be268 100644
--- a/src/core/channel/call_op_string.c
+++ b/src/core/channel/call_op_string.c
@@ -43,12 +43,27 @@
static void put_metadata(gpr_strvec *b, grpc_mdelem *md) {
gpr_strvec_add(b, gpr_strdup(" key="));
- gpr_strvec_add(b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->key->slice),
- GPR_SLICE_LENGTH(md->key->slice), GPR_HEXDUMP_PLAINTEXT));
+ gpr_strvec_add(
+ b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->key->slice),
+ GPR_SLICE_LENGTH(md->key->slice), GPR_HEXDUMP_PLAINTEXT));
gpr_strvec_add(b, gpr_strdup(" value="));
gpr_strvec_add(b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->value->slice),
- GPR_SLICE_LENGTH(md->value->slice), GPR_HEXDUMP_PLAINTEXT));
+ GPR_SLICE_LENGTH(md->value->slice),
+ GPR_HEXDUMP_PLAINTEXT));
+}
+
+static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) {
+ grpc_linked_mdelem *m;
+ for (m = md.list.head; m != NULL; m = m->next) {
+ put_metadata(b, m->md);
+ }
+ if (gpr_time_cmp(md.deadline, gpr_inf_future) != 0) {
+ char *tmp;
+ gpr_asprintf(&tmp, " deadline=%d.%09d", md.deadline.tv_sec,
+ md.deadline.tv_nsec);
+ gpr_strvec_add(b, tmp);
+ }
}
char *grpc_call_op_string(grpc_call_op *op) {
@@ -69,16 +84,7 @@ char *grpc_call_op_string(grpc_call_op *op) {
switch (op->type) {
case GRPC_SEND_METADATA:
gpr_strvec_add(&b, gpr_strdup("SEND_METADATA"));
- put_metadata(&b, op->data.metadata);
- break;
- case GRPC_SEND_DEADLINE:
- gpr_asprintf(&tmp, "SEND_DEADLINE %d.%09d", op->data.deadline.tv_sec,
- op->data.deadline.tv_nsec);
- gpr_strvec_add(&b, tmp);
- break;
- case GRPC_SEND_START:
- gpr_asprintf(&tmp, "SEND_START pollset=%p", op->data.start.pollset);
- gpr_strvec_add(&b, tmp);
+ put_metadata_list(&b, op->data.metadata);
break;
case GRPC_SEND_MESSAGE:
gpr_strvec_add(&b, gpr_strdup("SEND_MESSAGE"));
@@ -94,15 +100,7 @@ char *grpc_call_op_string(grpc_call_op *op) {
break;
case GRPC_RECV_METADATA:
gpr_strvec_add(&b, gpr_strdup("RECV_METADATA"));
- put_metadata(&b, op->data.metadata);
- break;
- case GRPC_RECV_DEADLINE:
- gpr_asprintf(&tmp, "RECV_DEADLINE %d.%09d", op->data.deadline.tv_sec,
- op->data.deadline.tv_nsec);
- gpr_strvec_add(&b, tmp);
- break;
- case GRPC_RECV_END_OF_INITIAL_METADATA:
- gpr_strvec_add(&b, gpr_strdup("RECV_END_OF_INITIAL_METADATA"));
+ put_metadata_list(&b, op->data.metadata);
break;
case GRPC_RECV_MESSAGE:
gpr_strvec_add(&b, gpr_strdup("RECV_MESSAGE"));
@@ -113,12 +111,21 @@ char *grpc_call_op_string(grpc_call_op *op) {
case GRPC_RECV_FINISH:
gpr_strvec_add(&b, gpr_strdup("RECV_FINISH"));
break;
+ case GRPC_RECV_SYNTHETIC_STATUS:
+ gpr_asprintf(&tmp, "RECV_SYNTHETIC_STATUS status=%d message='%s'",
+ op->data.synthetic_status.status,
+ op->data.synthetic_status.message);
+ gpr_strvec_add(&b, tmp);
+ break;
case GRPC_CANCEL_OP:
gpr_strvec_add(&b, gpr_strdup("CANCEL_OP"));
break;
}
gpr_asprintf(&tmp, " flags=0x%08x", op->flags);
gpr_strvec_add(&b, tmp);
+ if (op->bind_pollset) {
+ gpr_strvec_add(&b, gpr_strdup("bind_pollset"));
+ }
out = gpr_strvec_flatten(&b, NULL);
gpr_strvec_destroy(&b);
diff --git a/src/core/channel/census_filter.c b/src/core/channel/census_filter.c
index ba7b7ba59c..9c0c20af22 100644
--- a/src/core/channel/census_filter.c
+++ b/src/core/channel/census_filter.c
@@ -62,11 +62,13 @@ static void init_rpc_stats(census_rpc_stats* stats) {
static void extract_and_annotate_method_tag(grpc_call_op* op, call_data* calld,
channel_data* chand) {
- if (op->data.metadata->key == chand->path_str) {
- gpr_log(GPR_DEBUG,
- (const char*)GPR_SLICE_START_PTR(op->data.metadata->value->slice));
- census_add_method_tag(calld->op_id, (const char*)GPR_SLICE_START_PTR(
- op->data.metadata->value->slice));
+ grpc_linked_mdelem* m;
+ for (m = op->data.metadata.list.head; m != NULL; m = m->next) {
+ if (m->md->key == chand->path_str) {
+ gpr_log(GPR_DEBUG, "%s", (const char*)GPR_SLICE_START_PTR(m->md->value->slice));
+ census_add_method_tag(
+ calld->op_id, (const char*)GPR_SLICE_START_PTR(m->md->value->slice));
+ }
}
}
@@ -178,11 +180,11 @@ static void destroy_channel_elem(grpc_channel_element* elem) {
}
const grpc_channel_filter grpc_client_census_filter = {
- client_call_op, channel_op, sizeof(call_data),
- client_init_call_elem, client_destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "census-client"};
+ client_call_op, channel_op, sizeof(call_data), client_init_call_elem,
+ client_destroy_call_elem, sizeof(channel_data), init_channel_elem,
+ destroy_channel_elem, "census-client"};
const grpc_channel_filter grpc_server_census_filter = {
- server_call_op, channel_op, sizeof(call_data),
- server_init_call_elem, server_destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "census-server"};
+ server_call_op, channel_op, sizeof(call_data), server_init_call_elem,
+ server_destroy_call_elem, sizeof(channel_data), init_channel_elem,
+ destroy_channel_elem, "census-server"};
diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c
index 21df9771ce..3a3a3a75b7 100644
--- a/src/core/channel/channel_stack.c
+++ b/src/core/channel/channel_stack.c
@@ -77,9 +77,9 @@ size_t grpc_channel_stack_size(const grpc_channel_filter **filters,
return size;
}
-#define CHANNEL_ELEMS_FROM_STACK(stk) \
- ((grpc_channel_element *)( \
- (char *)(stk) + ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_channel_stack))))
+#define CHANNEL_ELEMS_FROM_STACK(stk) \
+ ((grpc_channel_element *)((char *)(stk) + ROUND_UP_TO_ALIGNMENT_SIZE( \
+ sizeof(grpc_channel_stack))))
#define CALL_ELEMS_FROM_STACK(stk) \
((grpc_call_element *)((char *)(stk) + \
@@ -183,6 +183,9 @@ void grpc_call_stack_destroy(grpc_call_stack *stack) {
void grpc_call_next_op(grpc_call_element *elem, grpc_call_op *op) {
grpc_call_element *next_elem = elem + op->dir;
+ if (op->type == GRPC_SEND_METADATA || op->type == GRPC_RECV_METADATA) {
+ grpc_metadata_batch_assert_ok(&op->data.metadata);
+ }
next_elem->filter->call_op(next_elem, elem, op);
}
@@ -193,42 +196,17 @@ void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op) {
grpc_channel_stack *grpc_channel_stack_from_top_element(
grpc_channel_element *elem) {
- return (grpc_channel_stack *)((char *)(elem) -
- ROUND_UP_TO_ALIGNMENT_SIZE(
- sizeof(grpc_channel_stack)));
+ return (grpc_channel_stack *)((char *)(elem)-ROUND_UP_TO_ALIGNMENT_SIZE(
+ sizeof(grpc_channel_stack)));
}
grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) {
- return (grpc_call_stack *)((char *)(elem) - ROUND_UP_TO_ALIGNMENT_SIZE(
- sizeof(grpc_call_stack)));
+ return (grpc_call_stack *)((char *)(elem)-ROUND_UP_TO_ALIGNMENT_SIZE(
+ sizeof(grpc_call_stack)));
}
static void do_nothing(void *user_data, grpc_op_error error) {}
-void grpc_call_element_recv_metadata(grpc_call_element *cur_elem,
- grpc_mdelem *mdelem) {
- grpc_call_op metadata_op;
- metadata_op.type = GRPC_RECV_METADATA;
- metadata_op.dir = GRPC_CALL_UP;
- metadata_op.done_cb = do_nothing;
- metadata_op.user_data = NULL;
- metadata_op.flags = 0;
- metadata_op.data.metadata = mdelem;
- grpc_call_next_op(cur_elem, &metadata_op);
-}
-
-void grpc_call_element_send_metadata(grpc_call_element *cur_elem,
- grpc_mdelem *mdelem) {
- grpc_call_op metadata_op;
- metadata_op.type = GRPC_SEND_METADATA;
- metadata_op.dir = GRPC_CALL_DOWN;
- metadata_op.done_cb = do_nothing;
- metadata_op.user_data = NULL;
- metadata_op.flags = 0;
- metadata_op.data.metadata = mdelem;
- grpc_call_next_op(cur_elem, &metadata_op);
-}
-
void grpc_call_element_send_cancel(grpc_call_element *cur_elem) {
grpc_call_op cancel_op;
cancel_op.type = GRPC_CANCEL_OP;
@@ -236,6 +214,7 @@ void grpc_call_element_send_cancel(grpc_call_element *cur_elem) {
cancel_op.done_cb = do_nothing;
cancel_op.user_data = NULL;
cancel_op.flags = 0;
+ cancel_op.bind_pollset = NULL;
grpc_call_next_op(cur_elem, &cancel_op);
}
@@ -246,5 +225,19 @@ void grpc_call_element_send_finish(grpc_call_element *cur_elem) {
finish_op.done_cb = do_nothing;
finish_op.user_data = NULL;
finish_op.flags = 0;
+ finish_op.bind_pollset = NULL;
grpc_call_next_op(cur_elem, &finish_op);
}
+
+void grpc_call_element_recv_status(grpc_call_element *cur_elem,
+ grpc_status_code status,
+ const char *message) {
+ grpc_call_op op;
+ op.type = GRPC_RECV_SYNTHETIC_STATUS;
+ op.dir = GRPC_CALL_UP;
+ op.done_cb = do_nothing;
+ op.user_data = NULL;
+ op.data.synthetic_status.status = status;
+ op.data.synthetic_status.message = message;
+ grpc_call_next_op(cur_elem, &op);
+}
diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h
index ef1da7b33b..addc92b272 100644
--- a/src/core/channel/channel_stack.h
+++ b/src/core/channel/channel_stack.h
@@ -62,10 +62,6 @@ typedef struct grpc_call_element grpc_call_element;
typedef enum {
/* send metadata to the channels peer */
GRPC_SEND_METADATA,
- /* send a deadline */
- GRPC_SEND_DEADLINE,
- /* start a connection (corresponds to start_invoke/accept) */
- GRPC_SEND_START,
/* send a message to the channels peer */
GRPC_SEND_MESSAGE,
/* send a pre-formatted message to the channels peer */
@@ -76,16 +72,14 @@ typedef enum {
GRPC_REQUEST_DATA,
/* metadata was received from the channels peer */
GRPC_RECV_METADATA,
- /* receive a deadline */
- GRPC_RECV_DEADLINE,
- /* the end of the first batch of metadata was received */
- GRPC_RECV_END_OF_INITIAL_METADATA,
/* a message was received from the channels peer */
GRPC_RECV_MESSAGE,
/* half-close was received from the channels peer */
GRPC_RECV_HALF_CLOSE,
/* full close was received from the channels peer */
GRPC_RECV_FINISH,
+ /* a status has been sythesized locally */
+ GRPC_RECV_SYNTHETIC_STATUS,
/* the call has been abnormally terminated */
GRPC_CANCEL_OP
} grpc_call_op_type;
@@ -109,14 +103,16 @@ typedef struct {
/* Argument data, matching up with grpc_call_op_type names */
union {
- struct {
- grpc_pollset *pollset;
- } start;
grpc_byte_buffer *message;
- grpc_mdelem *metadata;
- gpr_timespec deadline;
+ grpc_metadata_batch metadata;
+ struct {
+ grpc_status_code status;
+ const char *message;
+ } synthetic_status;
} data;
+ grpc_pollset *bind_pollset;
+
/* Must be called when processing of this call-op is complete.
Signature chosen to match transport flow control callbacks */
void (*done_cb)(void *user_data, grpc_op_error error);
@@ -291,16 +287,15 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem);
void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
grpc_call_element *elem, grpc_call_op *op);
-void grpc_call_element_send_metadata(grpc_call_element *cur_elem,
- grpc_mdelem *elem);
-void grpc_call_element_recv_metadata(grpc_call_element *cur_elem,
- grpc_mdelem *elem);
void grpc_call_element_send_cancel(grpc_call_element *cur_elem);
void grpc_call_element_send_finish(grpc_call_element *cur_elem);
+void grpc_call_element_recv_status(grpc_call_element *cur_elem,
+ grpc_status_code status,
+ const char *message);
extern int grpc_trace_channel;
#define GRPC_CALL_LOG_OP(sev, elem, op) \
if (grpc_trace_channel) grpc_call_log_op(sev, elem, op)
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_STACK_H */
+#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_STACK_H */
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 9791f98be8..bc481e59ca 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -38,7 +38,6 @@
#include "src/core/channel/channel_args.h"
#include "src/core/channel/child_channel.h"
#include "src/core/channel/connected_channel.h"
-#include "src/core/channel/metadata_buffer.h"
#include "src/core/iomgr/iomgr.h"
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
@@ -70,9 +69,6 @@ typedef struct {
int transport_setup_initiated;
grpc_channel_args *args;
-
- /* metadata cache */
- grpc_mdelem *cancel_status;
} channel_data;
typedef enum {
@@ -86,20 +82,16 @@ struct call_data {
/* owning element */
grpc_call_element *elem;
+ gpr_uint8 got_first_send;
+
call_state state;
- grpc_metadata_buffer pending_metadata;
gpr_timespec deadline;
union {
struct {
/* our child call stack */
grpc_child_call *child_call;
} active;
- struct {
- void (*on_complete)(void *user_data, grpc_op_error error);
- void *on_complete_user_data;
- gpr_uint32 start_flags;
- grpc_pollset *pollset;
- } waiting;
+ grpc_call_op waiting_op;
} s;
};
@@ -127,20 +119,6 @@ static void complete_activate(grpc_call_element *elem, grpc_call_op *op) {
GPR_ASSERT(calld->state == CALL_ACTIVE);
- /* sending buffered metadata down the stack before the start call */
- grpc_metadata_buffer_flush(&calld->pending_metadata, child_elem);
-
- if (gpr_time_cmp(calld->deadline, gpr_inf_future) != 0) {
- grpc_call_op dop;
- dop.type = GRPC_SEND_DEADLINE;
- dop.dir = GRPC_CALL_DOWN;
- dop.flags = 0;
- dop.data.deadline = calld->deadline;
- dop.done_cb = do_nothing;
- dop.user_data = NULL;
- child_elem->filter->call_op(child_elem, elem, &dop);
- }
-
/* continue the start call down the stack, this nees to happen after metadata
are flushed*/
child_elem->filter->call_op(child_elem, elem, op);
@@ -152,6 +130,7 @@ static void start_rpc(grpc_call_element *elem, grpc_call_op *op) {
gpr_mu_lock(&chand->mu);
if (calld->state == CALL_CANCELLED) {
gpr_mu_unlock(&chand->mu);
+ grpc_metadata_batch_destroy(&op->data.metadata);
op->done_cb(op->user_data, GRPC_OP_ERROR);
return;
}
@@ -184,10 +163,7 @@ static void start_rpc(grpc_call_element *elem, grpc_call_op *op) {
gpr_realloc(chand->waiting_children,
chand->waiting_child_capacity * sizeof(call_data *));
}
- calld->s.waiting.on_complete = op->done_cb;
- calld->s.waiting.on_complete_user_data = op->user_data;
- calld->s.waiting.start_flags = op->flags;
- calld->s.waiting.pollset = op->data.start.pollset;
+ calld->s.waiting_op = *op;
chand->waiting_children[chand->waiting_child_count++] = calld;
gpr_mu_unlock(&chand->mu);
@@ -212,15 +188,8 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) {
static void send_up_cancelled_ops(grpc_call_element *elem) {
grpc_call_op finish_op;
- channel_data *chand = elem->channel_data;
/* send up a synthesized status */
- finish_op.type = GRPC_RECV_METADATA;
- finish_op.dir = GRPC_CALL_UP;
- finish_op.flags = 0;
- finish_op.data.metadata = grpc_mdelem_ref(chand->cancel_status);
- finish_op.done_cb = do_nothing;
- finish_op.user_data = NULL;
- grpc_call_next_op(elem, &finish_op);
+ grpc_call_element_recv_status(elem, GRPC_STATUS_CANCELLED, "Cancelled");
/* send up a finish */
finish_op.type = GRPC_RECV_FINISH;
finish_op.dir = GRPC_CALL_UP;
@@ -243,12 +212,12 @@ static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
child_elem->filter->call_op(child_elem, elem, op);
return; /* early out */
case CALL_WAITING:
+ grpc_metadata_batch_destroy(&calld->s.waiting_op.data.metadata);
remove_waiting_child(chand, calld);
calld->state = CALL_CANCELLED;
gpr_mu_unlock(&chand->mu);
send_up_cancelled_ops(elem);
- calld->s.waiting.on_complete(calld->s.waiting.on_complete_user_data,
- GRPC_OP_ERROR);
+ calld->s.waiting_op.done_cb(calld->s.waiting_op.user_data, GRPC_OP_ERROR);
return; /* early out */
case CALL_CREATED:
calld->state = CALL_CANCELLED;
@@ -271,15 +240,13 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
switch (op->type) {
case GRPC_SEND_METADATA:
- grpc_metadata_buffer_queue(&calld->pending_metadata, op);
- break;
- case GRPC_SEND_DEADLINE:
- calld->deadline = op->data.deadline;
- op->done_cb(op->user_data, GRPC_OP_OK);
- break;
- case GRPC_SEND_START:
- /* filter out the start event to find which child to send on */
- start_rpc(elem, op);
+ if (!calld->got_first_send) {
+ /* filter out the start event to find which child to send on */
+ calld->got_first_send = 1;
+ start_rpc(elem, op);
+ } else {
+ grpc_call_next_op(elem, op);
+ }
break;
case GRPC_CANCEL_OP:
cancel_rpc(elem, op);
@@ -382,12 +349,6 @@ static void channel_op(grpc_channel_element *elem,
}
}
-static void error_bad_on_complete(void *arg, grpc_op_error error) {
- gpr_log(GPR_ERROR,
- "Waiting finished but not started? Bad on_complete callback");
- abort();
-}
-
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data) {
@@ -398,23 +359,22 @@ static void init_call_elem(grpc_call_element *elem,
calld->elem = elem;
calld->state = CALL_CREATED;
calld->deadline = gpr_inf_future;
- calld->s.waiting.on_complete = error_bad_on_complete;
- calld->s.waiting.on_complete_user_data = NULL;
- grpc_metadata_buffer_init(&calld->pending_metadata);
+ calld->got_first_send = 0;
}
/* Destructor for call_data */
static void destroy_call_elem(grpc_call_element *elem) {
call_data *calld = elem->call_data;
- /* if the metadata buffer is not flushed, destroy it here. */
- grpc_metadata_buffer_destroy(&calld->pending_metadata, GRPC_OP_OK);
/* if the call got activated, we need to destroy the child stack also, and
remove it from the in-flight requests tracked by the child_entry we
picked */
if (calld->state == CALL_ACTIVE) {
grpc_child_call_destroy(calld->s.active.child_call);
}
+ if (calld->state == CALL_WAITING) {
+ grpc_metadata_batch_destroy(&calld->s.waiting_op.data.metadata);
+ }
}
/* Constructor for channel_data */
@@ -423,7 +383,6 @@ static void init_channel_elem(grpc_channel_element *elem,
grpc_mdctx *metadata_context, int is_first,
int is_last) {
channel_data *chand = elem->channel_data;
- char temp[GPR_LTOA_MIN_BUFSIZE];
GPR_ASSERT(!is_first);
GPR_ASSERT(is_last);
@@ -437,10 +396,6 @@ static void init_channel_elem(grpc_channel_element *elem,
chand->transport_setup = NULL;
chand->transport_setup_initiated = 0;
chand->args = grpc_channel_args_copy(args);
-
- gpr_ltoa(GRPC_STATUS_CANCELLED, temp);
- chand->cancel_status =
- grpc_mdelem_from_strings(metadata_context, "grpc-status", temp);
}
/* Destructor for channel_data */
@@ -455,7 +410,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
grpc_channel_args_destroy(chand->args);
- grpc_mdelem_unref(chand->cancel_status);
gpr_mu_destroy(&chand->mu);
GPR_ASSERT(chand->waiting_child_count == 0);
@@ -463,9 +417,10 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_client_channel_filter = {
- call_op, channel_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "client-channel", };
+ call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
+ sizeof(channel_data), init_channel_elem, destroy_channel_elem,
+ "client-channel",
+};
grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
grpc_channel_stack *channel_stack, grpc_transport *transport,
@@ -520,13 +475,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
call_ops = gpr_malloc(sizeof(grpc_call_op) * waiting_child_count);
for (i = 0; i < waiting_child_count; i++) {
- call_ops[i].type = GRPC_SEND_START;
- call_ops[i].dir = GRPC_CALL_DOWN;
- call_ops[i].flags = waiting_children[i]->s.waiting.start_flags;
- call_ops[i].done_cb = waiting_children[i]->s.waiting.on_complete;
- call_ops[i].user_data =
- waiting_children[i]->s.waiting.on_complete_user_data;
- call_ops[i].data.start.pollset = waiting_children[i]->s.waiting.pollset;
+ call_ops[i] = waiting_children[i]->s.waiting_op;
if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) {
waiting_children[i] = NULL;
call_ops[i].done_cb(call_ops[i].user_data, GRPC_OP_ERROR);
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index 62611e08f3..711274bfe1 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -60,7 +60,6 @@ typedef struct connected_channel_call_data {
gpr_uint32 max_message_length;
gpr_uint32 incoming_message_length;
gpr_uint8 reading_message;
- gpr_uint8 got_metadata_boundary;
gpr_uint8 got_read_close;
gpr_slice_buffer incoming_message;
gpr_uint32 outgoing_buffer_length_estimate;
@@ -120,27 +119,20 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ if (op->bind_pollset) {
+ grpc_transport_add_to_pollset(chand->transport, op->bind_pollset);
+ }
+
switch (op->type) {
case GRPC_SEND_METADATA:
grpc_sopb_add_metadata(&calld->outgoing_sopb, op->data.metadata);
- grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb,
- op->user_data);
- break;
- case GRPC_SEND_DEADLINE:
- grpc_sopb_add_deadline(&calld->outgoing_sopb, op->data.deadline);
- grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb,
- op->user_data);
- break;
- case GRPC_SEND_START:
- grpc_transport_add_to_pollset(chand->transport, op->data.start.pollset);
- grpc_sopb_add_metadata_boundary(&calld->outgoing_sopb);
end_bufferable_op(op, chand, calld, 0);
break;
case GRPC_SEND_MESSAGE:
grpc_sopb_add_begin_message(&calld->outgoing_sopb,
grpc_byte_buffer_length(op->data.message),
op->flags);
- /* fall-through */
+ /* fall-through */
case GRPC_SEND_PREFORMATTED_MESSAGE:
copy_byte_buffer_to_stream_ops(op->data.message, &calld->outgoing_sopb);
calld->outgoing_buffer_length_estimate +=
@@ -200,7 +192,6 @@ static void init_call_elem(grpc_call_element *elem,
grpc_sopb_init(&calld->outgoing_sopb);
calld->reading_message = 0;
- calld->got_metadata_boundary = 0;
calld->got_read_close = 0;
calld->outgoing_buffer_length_estimate = 0;
calld->max_message_length = chand->max_message_length;
@@ -259,9 +250,9 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_connected_channel_filter = {
- call_op, channel_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "connected", };
+ call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
+ sizeof(channel_data), init_channel_elem, destroy_channel_elem, "connected",
+};
static gpr_slice alloc_recv_buffer(void *user_data, grpc_transport *transport,
grpc_stream *stream, size_t size_hint) {
@@ -307,8 +298,8 @@ static void finish_message(channel_data *chand, call_data *calld) {
call_op.type = GRPC_RECV_MESSAGE;
call_op.done_cb = do_nothing;
/* TODO(ctiller): this could be a lot faster if coded directly */
- call_op.data.message = grpc_byte_buffer_create(
- calld->incoming_message.slices, calld->incoming_message.count);
+ call_op.data.message = grpc_byte_buffer_create(calld->incoming_message.slices,
+ calld->incoming_message.count);
gpr_slice_buffer_reset_and_unref(&calld->incoming_message);
/* disable window updates until we get a request more from above */
@@ -320,6 +311,19 @@ static void finish_message(channel_data *chand, call_data *calld) {
grpc_call_next_op(elem, &call_op);
}
+static void got_metadata(grpc_call_element *elem,
+ grpc_metadata_batch metadata) {
+ grpc_call_op op;
+ op.type = GRPC_RECV_METADATA;
+ op.dir = GRPC_CALL_UP;
+ op.flags = 0;
+ op.data.metadata = metadata;
+ op.done_cb = do_nothing;
+ op.user_data = NULL;
+
+ grpc_call_next_op(elem, &op);
+}
+
/* Handle incoming stream ops from the transport, translating them into
call_ops to pass up the call stack */
static void recv_batch(void *user_data, grpc_transport *transport,
@@ -339,40 +343,12 @@ static void recv_batch(void *user_data, grpc_transport *transport,
stream_op = ops + i;
switch (stream_op->type) {
case GRPC_OP_FLOW_CTL_CB:
- gpr_log(GPR_ERROR,
- "should not receive flow control ops from transport");
- abort();
+ stream_op->data.flow_ctl_cb.cb(stream_op->data.flow_ctl_cb.arg, 1);
break;
case GRPC_NO_OP:
break;
case GRPC_OP_METADATA:
- call_op.type = GRPC_RECV_METADATA;
- call_op.dir = GRPC_CALL_UP;
- call_op.flags = 0;
- call_op.data.metadata = stream_op->data.metadata;
- call_op.done_cb = do_nothing;
- call_op.user_data = NULL;
- grpc_call_next_op(elem, &call_op);
- break;
- case GRPC_OP_DEADLINE:
- call_op.type = GRPC_RECV_DEADLINE;
- call_op.dir = GRPC_CALL_UP;
- call_op.flags = 0;
- call_op.data.deadline = stream_op->data.deadline;
- call_op.done_cb = do_nothing;
- call_op.user_data = NULL;
- grpc_call_next_op(elem, &call_op);
- break;
- case GRPC_OP_METADATA_BOUNDARY:
- if (!calld->got_metadata_boundary) {
- calld->got_metadata_boundary = 1;
- call_op.type = GRPC_RECV_END_OF_INITIAL_METADATA;
- call_op.dir = GRPC_CALL_UP;
- call_op.flags = 0;
- call_op.done_cb = do_nothing;
- call_op.user_data = NULL;
- grpc_call_next_op(elem, &call_op);
- }
+ got_metadata(elem, stream_op->data.metadata);
break;
case GRPC_OP_BEGIN_MESSAGE:
/* can't begin a message when we're still reading a message */
@@ -495,7 +471,8 @@ static void transport_closed(void *user_data, grpc_transport *transport) {
const grpc_transport_callbacks connected_channel_transport_callbacks = {
alloc_recv_buffer, accept_stream, recv_batch,
- transport_goaway, transport_closed, };
+ transport_goaway, transport_closed,
+};
grpc_transport_setup_result grpc_connected_channel_bind_transport(
grpc_channel_stack *channel_stack, grpc_transport *transport) {
diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c
index 3ccc39b717..56e12342d7 100644
--- a/src/core/channel/http_client_filter.c
+++ b/src/core/channel/http_client_filter.c
@@ -35,7 +35,10 @@
#include <grpc/support/log.h>
typedef struct call_data {
- int sent_headers;
+ grpc_linked_mdelem method;
+ grpc_linked_mdelem scheme;
+ grpc_linked_mdelem te_trailers;
+ grpc_linked_mdelem content_type;
} call_data;
typedef struct channel_data {
@@ -49,6 +52,18 @@ typedef struct channel_data {
/* used to silence 'variable not used' warnings */
static void ignore_unused(void *ignored) {}
+static grpc_mdelem *client_filter(void *user_data, grpc_mdelem *md) {
+ grpc_call_element *elem = user_data;
+ channel_data *channeld = elem->channel_data;
+ if (md == channeld->status) {
+ return NULL;
+ } else if (md->key == channeld->status->key) {
+ grpc_call_element_send_cancel(elem);
+ return NULL;
+ }
+ return md;
+}
+
/* Called either:
- in response to an API call (or similar) from above, to send something
- a network event (or similar) from below, to receive something
@@ -61,42 +76,23 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
channel_data *channeld = elem->channel_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- ignore_unused(calld);
-
switch (op->type) {
case GRPC_SEND_METADATA:
- if (!calld->sent_headers) {
- /* Send : prefixed headers, which have to be before any application
- * layer headers. */
- calld->sent_headers = 1;
- grpc_call_element_send_metadata(elem, grpc_mdelem_ref(channeld->method));
- grpc_call_element_send_metadata(elem, grpc_mdelem_ref(channeld->scheme));
- }
- grpc_call_next_op(elem, op);
- break;
- case GRPC_SEND_START:
- if (!calld->sent_headers) {
- /* Send : prefixed headers, if we haven't already */
- calld->sent_headers = 1;
- grpc_call_element_send_metadata(elem, grpc_mdelem_ref(channeld->method));
- grpc_call_element_send_metadata(elem, grpc_mdelem_ref(channeld->scheme));
- }
- /* Send non : prefixed headers */
- grpc_call_element_send_metadata(elem, grpc_mdelem_ref(channeld->te_trailers));
- grpc_call_element_send_metadata(elem, grpc_mdelem_ref(channeld->content_type));
+ /* Send : prefixed headers, which have to be before any application
+ * layer headers. */
+ grpc_metadata_batch_add_head(&op->data.metadata, &calld->method,
+ grpc_mdelem_ref(channeld->method));
+ grpc_metadata_batch_add_head(&op->data.metadata, &calld->scheme,
+ grpc_mdelem_ref(channeld->scheme));
+ grpc_metadata_batch_add_tail(&op->data.metadata, &calld->te_trailers,
+ grpc_mdelem_ref(channeld->te_trailers));
+ grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type,
+ grpc_mdelem_ref(channeld->content_type));
grpc_call_next_op(elem, op);
break;
case GRPC_RECV_METADATA:
- if (op->data.metadata == channeld->status) {
- grpc_mdelem_unref(op->data.metadata);
- op->done_cb(op->user_data, GRPC_OP_OK);
- } else if (op->data.metadata->key == channeld->status->key) {
- grpc_mdelem_unref(op->data.metadata);
- op->done_cb(op->user_data, GRPC_OP_OK);
- grpc_call_element_send_cancel(elem);
- } else {
- grpc_call_next_op(elem, op);
- }
+ grpc_metadata_batch_filter(&op->data.metadata, client_filter, elem);
+ grpc_call_next_op(elem, op);
break;
default:
/* pass control up or down the stack depending on op->dir */
@@ -124,16 +120,7 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {
- /* grab pointers to our data from the call element */
- call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
-
- ignore_unused(channeld);
-
- /* initialize members */
- calld->sent_headers = 0;
-}
+ const void *server_transport_data) {}
/* Destructor for call_data */
static void destroy_call_elem(grpc_call_element *elem) {
@@ -194,6 +181,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_http_client_filter = {
- call_op, channel_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "http-client"};
+ call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
+ sizeof(channel_data), init_channel_elem, destroy_channel_elem,
+ "http-client"};
diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c
index 9da8b333ca..0bfe2f2e30 100644
--- a/src/core/channel/http_server_filter.c
+++ b/src/core/channel/http_server_filter.c
@@ -38,8 +38,6 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-typedef enum { NOT_RECEIVED, POST, GET } known_method_type;
-
typedef struct {
grpc_mdelem *path;
grpc_mdelem *content_type;
@@ -47,16 +45,17 @@ typedef struct {
} gettable;
typedef struct call_data {
- known_method_type seen_method;
+ gpr_uint8 got_initial_metadata;
+ gpr_uint8 seen_path;
+ gpr_uint8 seen_post;
gpr_uint8 sent_status;
gpr_uint8 seen_scheme;
gpr_uint8 seen_te_trailers;
- grpc_mdelem *path;
+ grpc_linked_mdelem status;
} call_data;
typedef struct channel_data {
grpc_mdelem *te_trailers;
- grpc_mdelem *method_get;
grpc_mdelem *method_post;
grpc_mdelem *http_scheme;
grpc_mdelem *https_scheme;
@@ -78,38 +77,70 @@ typedef struct channel_data {
/* used to silence 'variable not used' warnings */
static void ignore_unused(void *ignored) {}
-/* Handle 'GET': not technically grpc, so probably a web browser hitting
- us */
-static void payload_done(void *elem, grpc_op_error error) {
- if (error == GRPC_OP_OK) {
- grpc_call_element_send_finish(elem);
- }
-}
-
-static void handle_get(grpc_call_element *elem) {
+static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
+ grpc_call_element *elem = user_data;
channel_data *channeld = elem->channel_data;
call_data *calld = elem->call_data;
- grpc_call_op op;
- size_t i;
- for (i = 0; i < channeld->gettable_count; i++) {
- if (channeld->gettables[i].path == calld->path) {
- grpc_call_element_send_metadata(elem,
- grpc_mdelem_ref(channeld->status_ok));
- grpc_call_element_send_metadata(
- elem, grpc_mdelem_ref(channeld->gettables[i].content_type));
- op.type = GRPC_SEND_PREFORMATTED_MESSAGE;
- op.dir = GRPC_CALL_DOWN;
- op.flags = 0;
- op.data.message = channeld->gettables[i].content;
- op.done_cb = payload_done;
- op.user_data = elem;
- grpc_call_next_op(elem, &op);
+ /* Check if it is one of the headers we care about. */
+ if (md == channeld->te_trailers || md == channeld->method_post ||
+ md == channeld->http_scheme || md == channeld->https_scheme ||
+ md == channeld->grpc_scheme || md == channeld->content_type) {
+ /* swallow it */
+ if (md == channeld->method_post) {
+ calld->seen_post = 1;
+ } else if (md->key == channeld->http_scheme->key) {
+ calld->seen_scheme = 1;
+ } else if (md == channeld->te_trailers) {
+ calld->seen_te_trailers = 1;
}
+ /* TODO(klempner): Track that we've seen all the headers we should
+ require */
+ return NULL;
+ } else if (md->key == channeld->content_type->key) {
+ if (strncmp(grpc_mdstr_as_c_string(md->value), "application/grpc+", 17) ==
+ 0) {
+ /* Although the C implementation doesn't (currently) generate them,
+ any custom +-suffix is explicitly valid. */
+ /* TODO(klempner): We should consider preallocating common values such
+ as +proto or +json, or at least stashing them if we see them. */
+ /* TODO(klempner): Should we be surfacing this to application code? */
+ } else {
+ /* TODO(klempner): We're currently allowing this, but we shouldn't
+ see it without a proxy so log for now. */
+ gpr_log(GPR_INFO, "Unexpected content-type %s",
+ channeld->content_type->key);
+ }
+ return NULL;
+ } else if (md->key == channeld->te_trailers->key ||
+ md->key == channeld->method_post->key ||
+ md->key == channeld->http_scheme->key ||
+ md->key == channeld->content_type->key) {
+ gpr_log(GPR_ERROR, "Invalid %s: header: '%s'",
+ grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value));
+ /* swallow it and error everything out. */
+ /* TODO(klempner): We ought to generate more descriptive error messages
+ on the wire here. */
+ grpc_call_element_send_cancel(elem);
+ return NULL;
+ } else if (md->key == channeld->path_key) {
+ if (calld->seen_path) {
+ gpr_log(GPR_ERROR, "Received :path twice");
+ return NULL;
+ }
+ calld->seen_path = 1;
+ return md;
+ } else if (md->key == channeld->host_key) {
+ /* translate host to :authority since :authority may be
+ omitted */
+ grpc_mdelem *authority = grpc_mdelem_from_metadata_strings(
+ channeld->mdctx, grpc_mdstr_ref(channeld->authority_key),
+ grpc_mdstr_ref(md->value));
+ grpc_mdelem_unref(md);
+ return authority;
+ } else {
+ return md;
}
- grpc_call_element_send_metadata(elem,
- grpc_mdelem_ref(channeld->status_not_found));
- grpc_call_element_send_finish(elem);
}
/* Called either:
@@ -126,115 +157,44 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
switch (op->type) {
case GRPC_RECV_METADATA:
- /* Check if it is one of the headers we care about. */
- if (op->data.metadata == channeld->te_trailers ||
- op->data.metadata == channeld->method_get ||
- op->data.metadata == channeld->method_post ||
- op->data.metadata == channeld->http_scheme ||
- op->data.metadata == channeld->https_scheme ||
- op->data.metadata == channeld->grpc_scheme ||
- op->data.metadata == channeld->content_type) {
- /* swallow it */
- if (op->data.metadata == channeld->method_get) {
- calld->seen_method = GET;
- } else if (op->data.metadata == channeld->method_post) {
- calld->seen_method = POST;
- } else if (op->data.metadata->key == channeld->http_scheme->key) {
- calld->seen_scheme = 1;
- } else if (op->data.metadata == channeld->te_trailers) {
- calld->seen_te_trailers = 1;
- }
- /* TODO(klempner): Track that we've seen all the headers we should
- require */
- grpc_mdelem_unref(op->data.metadata);
- op->done_cb(op->user_data, GRPC_OP_OK);
- } else if (op->data.metadata->key == channeld->content_type->key) {
- if (strncmp(grpc_mdstr_as_c_string(op->data.metadata->value),
- "application/grpc+", 17) == 0) {
- /* Although the C implementation doesn't (currently) generate them,
- any
- custom +-suffix is explicitly valid. */
- /* TODO(klempner): We should consider preallocating common values such
- as +proto or +json, or at least stashing them if we see them. */
- /* TODO(klempner): Should we be surfacing this to application code? */
+ grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
+ if (!calld->got_initial_metadata) {
+ calld->got_initial_metadata = 1;
+ /* Have we seen the required http2 transport headers?
+ (:method, :scheme, content-type, with :path and :authority covered
+ at the channel level right now) */
+ if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers &&
+ calld->seen_path) {
+ grpc_call_next_op(elem, op);
} else {
- /* TODO(klempner): We're currently allowing this, but we shouldn't
- see it without a proxy so log for now. */
- gpr_log(GPR_INFO, "Unexpected content-type %s",
- channeld->content_type->key);
- }
- grpc_mdelem_unref(op->data.metadata);
- op->done_cb(op->user_data, GRPC_OP_OK);
- } else if (op->data.metadata->key == channeld->te_trailers->key ||
- op->data.metadata->key == channeld->method_post->key ||
- op->data.metadata->key == channeld->http_scheme->key ||
- op->data.metadata->key == channeld->content_type->key) {
- gpr_log(GPR_ERROR, "Invalid %s: header: '%s'",
- grpc_mdstr_as_c_string(op->data.metadata->key),
- grpc_mdstr_as_c_string(op->data.metadata->value));
- /* swallow it and error everything out. */
- /* TODO(klempner): We ought to generate more descriptive error messages
- on the wire here. */
- grpc_mdelem_unref(op->data.metadata);
- op->done_cb(op->user_data, GRPC_OP_OK);
- grpc_call_element_send_cancel(elem);
- } else if (op->data.metadata->key == channeld->path_key) {
- if (calld->path != NULL) {
- gpr_log(GPR_ERROR, "Received :path twice");
- grpc_mdelem_unref(calld->path);
+ if (!calld->seen_path) {
+ gpr_log(GPR_ERROR, "Missing :path header");
+ }
+ if (!calld->seen_post) {
+ gpr_log(GPR_ERROR, "Missing :method header");
+ }
+ if (!calld->seen_scheme) {
+ gpr_log(GPR_ERROR, "Missing :scheme header");
+ }
+ if (!calld->seen_te_trailers) {
+ gpr_log(GPR_ERROR, "Missing te trailers header");
+ }
+ /* Error this call out */
+ grpc_metadata_batch_destroy(&op->data.metadata);
+ op->done_cb(op->user_data, GRPC_OP_OK);
+ grpc_call_element_send_cancel(elem);
}
- calld->path = op->data.metadata;
- op->done_cb(op->user_data, GRPC_OP_OK);
- } else if (op->data.metadata->key == channeld->host_key) {
- /* translate host to :authority since :authority may be
- omitted */
- grpc_mdelem *authority = grpc_mdelem_from_metadata_strings(
- channeld->mdctx, grpc_mdstr_ref(channeld->authority_key),
- grpc_mdstr_ref(op->data.metadata->value));
- grpc_mdelem_unref(op->data.metadata);
- op->data.metadata = authority;
- /* pass the event up */
- grpc_call_next_op(elem, op);
} else {
- /* pass the event up */
grpc_call_next_op(elem, op);
}
break;
- case GRPC_RECV_END_OF_INITIAL_METADATA:
- /* Have we seen the required http2 transport headers?
- (:method, :scheme, content-type, with :path and :authority covered
- at the channel level right now) */
- if (calld->seen_method == POST && calld->seen_scheme &&
- calld->seen_te_trailers && calld->path) {
- grpc_call_element_recv_metadata(elem, calld->path);
- calld->path = NULL;
- grpc_call_next_op(elem, op);
- } else if (calld->seen_method == GET) {
- handle_get(elem);
- } else {
- if (calld->seen_method == NOT_RECEIVED) {
- gpr_log(GPR_ERROR, "Missing :method header");
- }
- if (!calld->seen_scheme) {
- gpr_log(GPR_ERROR, "Missing :scheme header");
- }
- if (!calld->seen_te_trailers) {
- gpr_log(GPR_ERROR, "Missing te trailers header");
- }
- /* Error this call out */
- op->done_cb(op->user_data, GRPC_OP_OK);
- grpc_call_element_send_cancel(elem);
- }
- break;
- case GRPC_SEND_START:
case GRPC_SEND_METADATA:
/* If we haven't sent status 200 yet, we need to so so because it needs to
come before any non : prefixed metadata. */
if (!calld->sent_status) {
calld->sent_status = 1;
- /* status is reffed by grpc_call_element_send_metadata */
- grpc_call_element_send_metadata(elem,
- grpc_mdelem_ref(channeld->status_ok));
+ grpc_metadata_batch_add_head(&op->data.metadata, &calld->status,
+ grpc_mdelem_ref(channeld->status_ok));
}
grpc_call_next_op(elem, op);
break;
@@ -272,25 +232,11 @@ static void init_call_elem(grpc_call_element *elem,
ignore_unused(channeld);
/* initialize members */
- calld->path = NULL;
- calld->sent_status = 0;
- calld->seen_scheme = 0;
- calld->seen_method = NOT_RECEIVED;
- calld->seen_te_trailers = 0;
+ memset(calld, 0, sizeof(*calld));
}
/* Destructor for call_data */
-static void destroy_call_elem(grpc_call_element *elem) {
- /* grab pointers to our data from the call element */
- call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
-
- ignore_unused(channeld);
-
- if (calld->path) {
- grpc_mdelem_unref(calld->path);
- }
-}
+static void destroy_call_elem(grpc_call_element *elem) {}
/* Constructor for channel_data */
static void init_channel_elem(grpc_channel_element *elem,
@@ -314,7 +260,6 @@ static void init_channel_elem(grpc_channel_element *elem,
channeld->status_not_found =
grpc_mdelem_from_strings(mdctx, ":status", "404");
channeld->method_post = grpc_mdelem_from_strings(mdctx, ":method", "POST");
- channeld->method_get = grpc_mdelem_from_strings(mdctx, ":method", "GET");
channeld->http_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "http");
channeld->https_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "https");
channeld->grpc_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "grpc");
@@ -369,7 +314,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
grpc_mdelem_unref(channeld->status_ok);
grpc_mdelem_unref(channeld->status_not_found);
grpc_mdelem_unref(channeld->method_post);
- grpc_mdelem_unref(channeld->method_get);
grpc_mdelem_unref(channeld->http_scheme);
grpc_mdelem_unref(channeld->https_scheme);
grpc_mdelem_unref(channeld->grpc_scheme);
diff --git a/src/core/channel/metadata_buffer.c b/src/core/channel/metadata_buffer.c
deleted file mode 100644
index eac852e4a4..0000000000
--- a/src/core/channel/metadata_buffer.c
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/channel/metadata_buffer.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/useful.h>
-
-#include <string.h>
-
-#define INITIAL_ELEM_CAP 8
-
-/* One queued call; we track offsets to string data in a shared buffer to
- reduce allocations. See grpc_metadata_buffer_impl for the memory use
- strategy */
-typedef struct {
- grpc_mdelem *md;
- void (*cb)(void *user_data, grpc_op_error error);
- void *user_data;
- gpr_uint32 flags;
-} qelem;
-
-/* Memory layout:
-
- grpc_metadata_buffer_impl
- followed by an array of qelem */
-struct grpc_metadata_buffer_impl {
- /* number of elements in q */
- size_t elems;
- /* capacity of q */
- size_t elem_cap;
-};
-
-#define ELEMS(buffer) ((qelem *)((buffer) + 1))
-
-void grpc_metadata_buffer_init(grpc_metadata_buffer *buffer) {
- /* start buffer as NULL, indicating no elements */
- *buffer = NULL;
-}
-
-void grpc_metadata_buffer_destroy(grpc_metadata_buffer *buffer,
- grpc_op_error error) {
- size_t i;
- qelem *qe;
- if (*buffer) {
- for (i = 0; i < (*buffer)->elems; i++) {
- qe = &ELEMS(*buffer)[i];
- grpc_mdelem_unref(qe->md);
- qe->cb(qe->user_data, error);
- }
- gpr_free(*buffer);
- }
-}
-
-void grpc_metadata_buffer_queue(grpc_metadata_buffer *buffer,
- grpc_call_op *op) {
- grpc_metadata_buffer_impl *impl = *buffer;
- qelem *qe;
- size_t bytes;
-
- GPR_ASSERT(op->type == GRPC_SEND_METADATA || op->type == GRPC_RECV_METADATA);
-
- if (!impl) {
- /* this is the first element: allocate enough space to hold the
- header object and the initial element capacity of qelems */
- bytes =
- sizeof(grpc_metadata_buffer_impl) + INITIAL_ELEM_CAP * sizeof(qelem);
- impl = gpr_malloc(bytes);
- /* initialize the header object */
- impl->elems = 0;
- impl->elem_cap = INITIAL_ELEM_CAP;
- } else if (impl->elems == impl->elem_cap) {
- /* more qelems than what we can deal with: grow by doubling size */
- impl->elem_cap *= 2;
- bytes = sizeof(grpc_metadata_buffer_impl) + impl->elem_cap * sizeof(qelem);
- impl = gpr_realloc(impl, bytes);
- }
-
- /* append an element to the queue */
- qe = &ELEMS(impl)[impl->elems];
- impl->elems++;
-
- qe->md = op->data.metadata;
- qe->cb = op->done_cb;
- qe->user_data = op->user_data;
- qe->flags = op->flags;
-
- /* header object may have changed location: store it back */
- *buffer = impl;
-}
-
-void grpc_metadata_buffer_flush(grpc_metadata_buffer *buffer,
- grpc_call_element *elem) {
- grpc_metadata_buffer_impl *impl = *buffer;
- grpc_call_op op;
- qelem *qe;
- size_t i;
-
- if (!impl) {
- /* nothing to send */
- return;
- }
-
- /* construct call_op's, and push them down the stack */
- op.type = GRPC_SEND_METADATA;
- op.dir = GRPC_CALL_DOWN;
- for (i = 0; i < impl->elems; i++) {
- qe = &ELEMS(impl)[i];
- op.done_cb = qe->cb;
- op.user_data = qe->user_data;
- op.flags = qe->flags;
- op.data.metadata = qe->md;
- grpc_call_next_op(elem, &op);
- }
-
- /* free data structures and reset to NULL: we can only flush once */
- gpr_free(impl);
- *buffer = NULL;
-}
diff --git a/src/core/channel/metadata_buffer.h b/src/core/channel/metadata_buffer.h
deleted file mode 100644
index b7cc5170d1..0000000000
--- a/src/core/channel/metadata_buffer.h
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_INTERNAL_CORE_CHANNEL_METADATA_BUFFER_H
-#define GRPC_INTERNAL_CORE_CHANNEL_METADATA_BUFFER_H
-
-#include "src/core/channel/channel_stack.h"
-
-/* Utility code to buffer GRPC_SEND_METADATA calls and pass them down the stack
- all at once at some otherwise-determined time. Useful for implementing
- filters that want to queue metadata until a START event chooses some
- underlying filter stack to send an rpc on. */
-
-/* Clients should declare a member of grpc_metadata_buffer. This may at some
- point become a typedef for a struct, but for now a pointer suffices */
-typedef struct grpc_metadata_buffer_impl grpc_metadata_buffer_impl;
-typedef grpc_metadata_buffer_impl *grpc_metadata_buffer;
-
-/* Initializes the metadata buffer. Allocates no memory. */
-void grpc_metadata_buffer_init(grpc_metadata_buffer *buffer);
-/* Destroy the metadata buffer. */
-void grpc_metadata_buffer_destroy(grpc_metadata_buffer *buffer,
- grpc_op_error error);
-/* Append a call to the end of a metadata buffer: may allocate memory */
-void grpc_metadata_buffer_queue(grpc_metadata_buffer *buffer, grpc_call_op *op);
-/* Flush all queued operations from the metadata buffer to the element below
- self */
-void grpc_metadata_buffer_flush(grpc_metadata_buffer *buffer,
- grpc_call_element *self);
-/* Count the number of queued elements in the buffer. */
-size_t grpc_metadata_buffer_count(const grpc_metadata_buffer *buffer);
-/* Extract elements as a grpc_metadata*, for presentation to applications.
- The returned buffer must be freed with
- grpc_metadata_buffer_cleanup_elements.
- Clears the metadata buffer (this is a one-shot operation) */
-grpc_metadata *grpc_metadata_buffer_extract_elements(
- grpc_metadata_buffer *buffer);
-void grpc_metadata_buffer_cleanup_elements(void *elements, grpc_op_error error);
-
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_METADATA_BUFFER_H */
diff --git a/src/core/security/auth.c b/src/core/security/auth.c
index 130698c11b..4af2c67d83 100644
--- a/src/core/security/auth.c
+++ b/src/core/security/auth.c
@@ -44,12 +44,15 @@
#include "src/core/security/credentials.h"
#include "src/core/surface/call.h"
+#define MAX_CREDENTIALS_METADATA_COUNT 4
+
/* We can have a per-call credentials. */
typedef struct {
grpc_credentials *creds;
grpc_mdstr *host;
grpc_mdstr *method;
grpc_call_op op;
+ grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT];
} call_data;
/* We can have a per-channel credentials. */
@@ -62,30 +65,8 @@ typedef struct {
grpc_mdstr *status_key;
} channel_data;
-static void do_nothing(void *ignored, grpc_op_error error) {}
-
static void bubbleup_error(grpc_call_element *elem, const char *error_msg) {
- grpc_call_op finish_op;
- channel_data *channeld = elem->channel_data;
- char status[GPR_LTOA_MIN_BUFSIZE];
-
- gpr_log(GPR_ERROR, "%s", error_msg);
- finish_op.type = GRPC_RECV_METADATA;
- finish_op.dir = GRPC_CALL_UP;
- finish_op.flags = 0;
- finish_op.data.metadata = grpc_mdelem_from_metadata_strings(
- channeld->md_ctx, grpc_mdstr_ref(channeld->error_msg_key),
- grpc_mdstr_from_string(channeld->md_ctx, error_msg));
- finish_op.done_cb = do_nothing;
- finish_op.user_data = NULL;
- grpc_call_next_op(elem, &finish_op);
-
- gpr_ltoa(GRPC_STATUS_UNAUTHENTICATED, status);
- finish_op.data.metadata = grpc_mdelem_from_metadata_strings(
- channeld->md_ctx, grpc_mdstr_ref(channeld->status_key),
- grpc_mdstr_from_string(channeld->md_ctx, status));
- grpc_call_next_op(elem, &finish_op);
-
+ grpc_call_element_recv_status(elem, GRPC_STATUS_UNAUTHENTICATED, error_msg);
grpc_call_element_send_cancel(elem);
}
@@ -93,11 +74,15 @@ static void on_credentials_metadata(void *user_data, grpc_mdelem **md_elems,
size_t num_md,
grpc_credentials_status status) {
grpc_call_element *elem = (grpc_call_element *)user_data;
+ call_data *calld = elem->call_data;
+ grpc_call_op op = calld->op;
size_t i;
+ GPR_ASSERT(num_md <= MAX_CREDENTIALS_METADATA_COUNT);
for (i = 0; i < num_md; i++) {
- grpc_call_element_send_metadata(elem, grpc_mdelem_ref(md_elems[i]));
+ grpc_metadata_batch_add_tail(&op.data.metadata, &calld->md_links[i],
+ grpc_mdelem_ref(md_elems[i]));
}
- grpc_call_next_op(elem, &((call_data *)elem->call_data)->op);
+ grpc_call_next_op(elem, &op);
}
static char *build_service_url(const char *url_scheme, call_data *calld) {
@@ -159,6 +144,7 @@ static void on_host_checked(void *user_data, grpc_security_status status) {
gpr_asprintf(&error_msg, "Invalid host %s set in :authority metadata.",
grpc_mdstr_as_c_string(calld->host));
bubbleup_error(elem, error_msg);
+ grpc_metadata_batch_destroy(&calld->op.data.metadata);
gpr_free(error_msg);
calld->op.done_cb(calld->op.user_data, GRPC_OP_ERROR);
}
@@ -174,21 +160,22 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
+ grpc_linked_mdelem *l;
switch (op->type) {
case GRPC_SEND_METADATA:
- /* Pointer comparison is OK for md_elems created from the same context. */
- if (op->data.metadata->key == channeld->authority_string) {
- if (calld->host != NULL) grpc_mdstr_unref(calld->host);
- calld->host = grpc_mdstr_ref(op->data.metadata->value);
- } else if (op->data.metadata->key == channeld->path_string) {
- if (calld->method != NULL) grpc_mdstr_unref(calld->method);
- calld->method = grpc_mdstr_ref(op->data.metadata->value);
+ for (l = op->data.metadata.list.head; l != NULL; l = l->next) {
+ grpc_mdelem *md = l->md;
+ /* Pointer comparison is OK for md_elems created from the same context.
+ */
+ if (md->key == channeld->authority_string) {
+ if (calld->host != NULL) grpc_mdstr_unref(calld->host);
+ calld->host = grpc_mdstr_ref(md->value);
+ } else if (md->key == channeld->path_string) {
+ if (calld->method != NULL) grpc_mdstr_unref(calld->method);
+ calld->method = grpc_mdstr_ref(md->value);
+ }
}
- grpc_call_next_op(elem, op);
- break;
-
- case GRPC_SEND_START:
if (calld->host != NULL) {
grpc_security_status status;
const char *call_host = grpc_mdstr_as_c_string(calld->host);
@@ -202,6 +189,7 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
"Invalid host %s set in :authority metadata.",
call_host);
bubbleup_error(elem, error_msg);
+ grpc_metadata_batch_destroy(&calld->op.data.metadata);
gpr_free(error_msg);
op->done_cb(op->user_data, GRPC_OP_ERROR);
}
@@ -210,7 +198,6 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
}
send_security_metadata(elem, op);
break;
-
default:
/* pass control up or down the stack depending on op->dir */
grpc_call_next_op(elem, op);
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index dba63058b8..2949805622 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -33,7 +33,6 @@
#include "src/core/surface/call.h"
#include "src/core/channel/channel_stack.h"
-#include "src/core/channel/metadata_buffer.h"
#include "src/core/iomgr/alarm.h"
#include "src/core/support/string.h"
#include "src/core/surface/byte_buffer_queue.h"
@@ -41,6 +40,7 @@
#include "src/core/surface/completion_queue.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
@@ -68,8 +68,10 @@ typedef struct {
} completed_request;
/* See request_set in grpc_call below for a description */
-#define REQSET_EMPTY 255
-#define REQSET_DONE 254
+#define REQSET_EMPTY 'X'
+#define REQSET_DONE 'Y'
+
+#define MAX_SEND_INITIAL_METADATA_COUNT 3
typedef struct {
/* Overall status of the operation: starts OK, may degrade to
@@ -92,6 +94,8 @@ typedef enum {
/* Status came from the application layer overriding whatever
the wire says */
STATUS_FROM_API_OVERRIDE = 0,
+ /* Status was created by some internal channel stack operation */
+ STATUS_FROM_CORE,
/* Status came from 'the wire' - or somewhere below the surface
layer */
STATUS_FROM_WIRE,
@@ -204,12 +208,18 @@ struct grpc_call {
/* Call refcount - to keep the call alive during asynchronous operations */
gpr_refcount internal_refcount;
+ grpc_linked_mdelem send_initial_metadata[MAX_SEND_INITIAL_METADATA_COUNT];
+ grpc_linked_mdelem status_link;
+ grpc_linked_mdelem details_link;
+ size_t send_initial_metadata_count;
+ gpr_timespec send_deadline;
+
/* Data that the legacy api needs to track. To be deleted at some point
soon */
legacy_state *legacy_state;
};
-#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call)+1))
+#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1)
#define CALL_ELEM_FROM_CALL(call, idx) \
grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
@@ -226,9 +236,13 @@ struct grpc_call {
static void do_nothing(void *ignored, grpc_op_error also_ignored) {}
static send_action choose_send_action(grpc_call *call);
static void enact_send_action(grpc_call *call, send_action sa);
+static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline);
grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
- const void *server_transport_data) {
+ const void *server_transport_data,
+ grpc_mdelem **add_initial_metadata,
+ size_t add_initial_metadata_count,
+ gpr_timespec send_deadline) {
size_t i;
grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
grpc_call *call =
@@ -245,6 +259,12 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
call->request_set[GRPC_IOREQ_SEND_TRAILING_METADATA] = REQSET_DONE;
call->request_set[GRPC_IOREQ_SEND_STATUS] = REQSET_DONE;
}
+ GPR_ASSERT(add_initial_metadata_count < MAX_SEND_INITIAL_METADATA_COUNT);
+ for (i = 0; i < add_initial_metadata_count; i++) {
+ call->send_initial_metadata[i].md = add_initial_metadata[i];
+ }
+ call->send_initial_metadata_count = add_initial_metadata_count;
+ call->send_deadline = send_deadline;
grpc_channel_internal_ref(channel);
call->metadata_context = grpc_channel_get_metadata_context(channel);
/* one ref is dropped in response to destroy, the other in
@@ -252,6 +272,9 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
gpr_ref_init(&call->internal_refcount, 2);
grpc_call_stack_init(channel_stack, server_transport_data,
CALL_STACK_FROM_CALL(call));
+ if (gpr_time_cmp(send_deadline, gpr_inf_future) != 0) {
+ set_deadline_alarm(call, send_deadline);
+ }
return call;
}
@@ -284,6 +307,9 @@ static void destroy_call(void *call, int ignored_success) {
for (i = 0; i < GPR_ARRAY_SIZE(c->buffered_metadata); i++) {
gpr_free(c->buffered_metadata[i].metadata);
}
+ for (i = 0; i < c->send_initial_metadata_count; i++) {
+ grpc_mdelem_unref(c->send_initial_metadata[i].md);
+ }
if (c->legacy_state) {
destroy_legacy_state(c->legacy_state);
}
@@ -342,6 +368,7 @@ static void request_more_data(grpc_call *call) {
op.flags = 0;
op.done_cb = do_nothing;
op.user_data = NULL;
+ op.bind_pollset = NULL;
grpc_call_execute_op(call, &op);
}
@@ -587,15 +614,29 @@ static send_action choose_send_action(grpc_call *call) {
return SEND_NOTHING;
}
-static void send_metadata(grpc_call *call, grpc_mdelem *elem) {
- grpc_call_op op;
- op.type = GRPC_SEND_METADATA;
- op.dir = GRPC_CALL_DOWN;
- op.flags = GRPC_WRITE_BUFFER_HINT;
- op.data.metadata = elem;
- op.done_cb = do_nothing;
- op.user_data = NULL;
- grpc_call_execute_op(call, &op);
+static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
+ grpc_metadata *metadata) {
+ size_t i;
+ grpc_mdelem_list out;
+ if (count == 0) {
+ out.head = out.tail = NULL;
+ return out;
+ }
+ for (i = 0; i < count; i++) {
+ grpc_metadata *md = &metadata[i];
+ grpc_metadata *next_md = (i == count - 1) ? NULL : &metadata[i + 1];
+ grpc_metadata *prev_md = (i == 0) ? NULL : &metadata[i - 1];
+ grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
+ GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
+ l->md = grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key,
+ (const gpr_uint8 *)md->value,
+ md->value_length);
+ l->next = next_md ? (grpc_linked_mdelem *)&next_md->internal_data : NULL;
+ l->prev = prev_md ? (grpc_linked_mdelem *)&prev_md->internal_data : NULL;
+ }
+ out.head = (grpc_linked_mdelem *)&(metadata[0].internal_data);
+ out.tail = (grpc_linked_mdelem *)&(metadata[count - 1].internal_data);
+ return out;
}
static void enact_send_action(grpc_call *call, send_action sa) {
@@ -614,19 +655,21 @@ static void enact_send_action(grpc_call *call, send_action sa) {
/* fallthrough */
case SEND_INITIAL_METADATA:
data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA];
- for (i = 0; i < data.send_metadata.count; i++) {
- const grpc_metadata *md = &data.send_metadata.metadata[i];
- send_metadata(call,
- grpc_mdelem_from_string_and_buffer(
- call->metadata_context, md->key,
- (const gpr_uint8 *)md->value, md->value_length));
- }
- op.type = GRPC_SEND_START;
+ op.type = GRPC_SEND_METADATA;
op.dir = GRPC_CALL_DOWN;
op.flags = flags;
- op.data.start.pollset = grpc_cq_pollset(call->cq);
+ op.data.metadata.list = chain_metadata_from_app(
+ call, data.send_metadata.count, data.send_metadata.metadata);
+ op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL;
+ op.data.metadata.deadline = call->send_deadline;
+ for (i = 0; i < call->send_initial_metadata_count; i++) {
+ grpc_metadata_batch_link_head(&op.data.metadata,
+ &call->send_initial_metadata[i]);
+ }
+ call->send_initial_metadata_count = 0;
op.done_cb = finish_start_step;
op.user_data = call;
+ op.bind_pollset = grpc_cq_pollset(call->cq);
grpc_call_execute_op(call, &op);
break;
case SEND_BUFFERED_MESSAGE:
@@ -640,37 +683,42 @@ static void enact_send_action(grpc_call *call, send_action sa) {
op.data.message = data.send_message;
op.done_cb = finish_write_step;
op.user_data = call;
+ op.bind_pollset = NULL;
grpc_call_execute_op(call, &op);
break;
case SEND_TRAILING_METADATA_AND_FINISH:
/* send trailing metadata */
data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA];
- for (i = 0; i < data.send_metadata.count; i++) {
- const grpc_metadata *md = &data.send_metadata.metadata[i];
- send_metadata(call,
- grpc_mdelem_from_string_and_buffer(
- call->metadata_context, md->key,
- (const gpr_uint8 *)md->value, md->value_length));
- }
+ op.type = GRPC_SEND_METADATA;
+ op.dir = GRPC_CALL_DOWN;
+ op.flags = flags;
+ op.data.metadata.list = chain_metadata_from_app(
+ call, data.send_metadata.count, data.send_metadata.metadata);
+ op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL;
+ op.data.metadata.deadline = call->send_deadline;
+ op.bind_pollset = NULL;
/* send status */
/* TODO(ctiller): cache common status values */
data = call->request_data[GRPC_IOREQ_SEND_STATUS];
gpr_ltoa(data.send_status.code, status_str);
- send_metadata(
- call,
+ grpc_metadata_batch_add_tail(
+ &op.data.metadata, &call->status_link,
grpc_mdelem_from_metadata_strings(
call->metadata_context,
grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
grpc_mdstr_from_string(call->metadata_context, status_str)));
if (data.send_status.details) {
- send_metadata(
- call,
+ grpc_metadata_batch_add_tail(
+ &op.data.metadata, &call->details_link,
grpc_mdelem_from_metadata_strings(
call->metadata_context,
grpc_mdstr_ref(grpc_channel_get_message_string(call->channel)),
grpc_mdstr_from_string(call->metadata_context,
data.send_status.details)));
}
+ op.done_cb = do_nothing;
+ op.user_data = NULL;
+ grpc_call_execute_op(call, &op);
/* fallthrough: see choose_send_action for details */
case SEND_FINISH:
op.type = GRPC_SEND_FINISH;
@@ -678,6 +726,7 @@ static void enact_send_action(grpc_call *call, send_action sa) {
op.flags = 0;
op.done_cb = finish_finish_step;
op.user_data = call;
+ op.bind_pollset = NULL;
grpc_call_execute_op(call, &op);
break;
}
@@ -831,6 +880,7 @@ grpc_call_error grpc_call_cancel(grpc_call *c) {
op.flags = 0;
op.done_cb = do_nothing;
op.user_data = NULL;
+ op.bind_pollset = NULL;
elem = CALL_ELEM_FROM_CALL(c, 0);
elem->filter->call_op(elem, NULL, &op);
@@ -875,9 +925,7 @@ static void call_alarm(void *arg, int success) {
grpc_call_internal_unref(call, 1);
}
-void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) {
- grpc_call *call = CALL_FROM_TOP_ELEM(elem);
-
+static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) {
if (call->have_alarm) {
gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice");
}
@@ -886,11 +934,15 @@ void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) {
grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now());
}
-static void set_read_state(grpc_call *call, read_state state) {
- lock(call);
+static void set_read_state_locked(grpc_call *call, read_state state) {
GPR_ASSERT(call->read_state < state);
call->read_state = state;
finish_read_ops(call);
+}
+
+static void set_read_state(grpc_call *call, read_state state) {
+ lock(call);
+ set_read_state_locked(call, state);
unlock(call);
}
@@ -914,7 +966,7 @@ static gpr_uint32 decode_status(grpc_mdelem *md) {
gpr_uint32 status;
void *user_data = grpc_mdelem_get_user_data(md, destroy_status);
if (user_data) {
- status = ((gpr_uint32)(gpr_intptr) user_data) - STATUS_OFFSET;
+ status = ((gpr_uint32)(gpr_intptr)user_data) - STATUS_OFFSET;
} else {
if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
GPR_SLICE_LENGTH(md->value->slice),
@@ -936,52 +988,81 @@ void grpc_call_recv_message(grpc_call_element *elem,
unlock(call);
}
-void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) {
+void grpc_call_recv_synthetic_status(grpc_call_element *elem,
+ grpc_status_code status,
+ const char *message) {
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
- grpc_mdstr *key = md->key;
+ lock(call);
+ set_status_code(call, STATUS_FROM_CORE, status);
+ set_status_details(call, STATUS_FROM_CORE,
+ grpc_mdstr_from_string(call->metadata_context, message));
+ unlock(call);
+}
+
+int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) {
+ grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+ grpc_linked_mdelem *l;
grpc_metadata_array *dest;
grpc_metadata *mdusr;
+ int is_trailing;
+ grpc_mdctx *mdctx = call->metadata_context;
lock(call);
- if (key == grpc_channel_get_status_string(call->channel)) {
- set_status_code(call, STATUS_FROM_WIRE, decode_status(md));
- grpc_mdelem_unref(md);
- } else if (key == grpc_channel_get_message_string(call->channel)) {
- set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
- grpc_mdelem_unref(md);
- } else {
- dest = &call->buffered_metadata[call->read_state >=
- READ_STATE_GOT_INITIAL_METADATA];
- if (dest->count == dest->capacity) {
- dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2);
- dest->metadata =
- gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity);
- }
- mdusr = &dest->metadata[dest->count++];
- mdusr->key = grpc_mdstr_as_c_string(md->key);
- mdusr->value = grpc_mdstr_as_c_string(md->value);
- mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice);
- if (call->owned_metadata_count == call->owned_metadata_capacity) {
- call->owned_metadata_capacity = GPR_MAX(
- call->owned_metadata_capacity + 8, call->owned_metadata_capacity * 2);
- call->owned_metadata =
- gpr_realloc(call->owned_metadata,
- sizeof(grpc_mdelem *) * call->owned_metadata_capacity);
+ is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA;
+ for (l = md->list.head; l != NULL; l = l->next) {
+ grpc_mdelem *md = l->md;
+ grpc_mdstr *key = md->key;
+ if (key == grpc_channel_get_status_string(call->channel)) {
+ set_status_code(call, STATUS_FROM_WIRE, decode_status(md));
+ } else if (key == grpc_channel_get_message_string(call->channel)) {
+ set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
+ } else {
+ dest = &call->buffered_metadata[is_trailing];
+ if (dest->count == dest->capacity) {
+ dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2);
+ dest->metadata =
+ gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity);
+ }
+ mdusr = &dest->metadata[dest->count++];
+ mdusr->key = grpc_mdstr_as_c_string(md->key);
+ mdusr->value = grpc_mdstr_as_c_string(md->value);
+ mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice);
+ if (call->owned_metadata_count == call->owned_metadata_capacity) {
+ call->owned_metadata_capacity =
+ GPR_MAX(call->owned_metadata_capacity + 8,
+ call->owned_metadata_capacity * 2);
+ call->owned_metadata =
+ gpr_realloc(call->owned_metadata,
+ sizeof(grpc_mdelem *) * call->owned_metadata_capacity);
+ }
+ call->owned_metadata[call->owned_metadata_count++] = md;
+ l->md = 0;
}
- call->owned_metadata[call->owned_metadata_count++] = md;
+ }
+ if (gpr_time_cmp(md->deadline, gpr_inf_future) != 0) {
+ set_deadline_alarm(call, md->deadline);
+ }
+ if (!is_trailing) {
+ set_read_state_locked(call, READ_STATE_GOT_INITIAL_METADATA);
}
unlock(call);
+
+ grpc_mdctx_lock(mdctx);
+ for (l = md->list.head; l; l = l->next) {
+ if (l->md) grpc_mdctx_locked_mdelem_unref(mdctx, l->md);
+ }
+ for (l = md->garbage.head; l; l = l->next) {
+ grpc_mdctx_locked_mdelem_unref(mdctx, l->md);
+ }
+ grpc_mdctx_unlock(mdctx);
+
+ return !is_trailing;
}
grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
return CALL_STACK_FROM_CALL(call);
}
-void grpc_call_initial_metadata_complete(grpc_call_element *surface_element) {
- grpc_call *call = grpc_call_from_top_element(surface_element);
- set_read_state(call, READ_STATE_GOT_INITIAL_METADATA);
-}
-
/*
* BATCH API IMPLEMENTATION
*/
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index 06434f87ac..f8d0915349 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -35,7 +35,6 @@
#define GRPC_INTERNAL_CORE_SURFACE_CALL_H
#include "src/core/channel/channel_stack.h"
-#include "src/core/channel/metadata_buffer.h"
#include <grpc/grpc.h>
/* Primitive operation types - grpc_op's get rewritten into these */
@@ -67,7 +66,7 @@ typedef union {
} recv_status_details;
struct {
size_t count;
- const grpc_metadata *metadata;
+ grpc_metadata *metadata;
} send_metadata;
grpc_byte_buffer *send_message;
struct {
@@ -86,7 +85,10 @@ typedef void (*grpc_ioreq_completion_func)(grpc_call *call,
void *user_data);
grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
- const void *server_transport_data);
+ const void *server_transport_data,
+ grpc_mdelem **add_initial_metadata,
+ size_t add_initial_metadata_count,
+ gpr_timespec send_deadline);
void grpc_call_set_completion_queue(grpc_call *call, grpc_completion_queue *cq);
grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call);
@@ -96,8 +98,9 @@ void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion);
/* Helpers for grpc_client, grpc_server filters to publish received data to
the completion queue/surface layer */
-void grpc_call_recv_metadata(grpc_call_element *surface_element,
- grpc_mdelem *md);
+/* receive metadata - returns 1 if this was initial metadata */
+int grpc_call_recv_metadata(grpc_call_element *surface_element,
+ grpc_metadata_batch *md);
void grpc_call_recv_message(grpc_call_element *surface_element,
grpc_byte_buffer *message);
void grpc_call_read_closed(grpc_call_element *surface_element);
@@ -108,14 +111,12 @@ grpc_call_error grpc_call_start_ioreq_and_call_back(
grpc_call *call, const grpc_ioreq *reqs, size_t nreqs,
grpc_ioreq_completion_func on_complete, void *user_data);
-/* Called when it's known that the initial batch of metadata is complete */
-void grpc_call_initial_metadata_complete(grpc_call_element *surface_element);
-
-void grpc_call_set_deadline(grpc_call_element *surface_element,
- gpr_timespec deadline);
-
grpc_call_stack *grpc_call_get_call_stack(grpc_call *call);
+void grpc_call_recv_synthetic_status(grpc_call_element *elem,
+ grpc_status_code status,
+ const char *message);
+
/* Given the top call_element, get the call object. */
grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element);
@@ -128,4 +129,4 @@ void grpc_call_log_batch(char *file, int line, gpr_log_severity severity,
#define GRPC_CALL_LOG_BATCH(sev, call, ops, nops, tag) \
if (grpc_trace_batch) grpc_call_log_batch(sev, call, ops, nops, tag)
-#endif /* GRPC_INTERNAL_CORE_SURFACE_CALL_H */
+#endif /* GRPC_INTERNAL_CORE_SURFACE_CALL_H */
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index d3962a00c4..29b042e7c1 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -62,7 +62,7 @@ struct grpc_channel {
registered_call *registered_calls;
};
-#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c)+1))
+#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1))
#define CHANNEL_FROM_CHANNEL_STACK(channel_stack) \
(((grpc_channel *)(channel_stack)) - 1)
#define CHANNEL_FROM_TOP_ELEM(top_elem) \
@@ -91,44 +91,25 @@ grpc_channel *grpc_channel_create_from_filters(
return channel;
}
-static void do_nothing(void *ignored, grpc_op_error error) {}
-
static grpc_call *grpc_channel_create_call_internal(
grpc_channel *channel, grpc_completion_queue *cq, grpc_mdelem *path_mdelem,
grpc_mdelem *authority_mdelem, gpr_timespec deadline) {
- grpc_call *call;
- grpc_call_op op;
+ grpc_mdelem *send_metadata[2];
- if (!channel->is_client) {
- gpr_log(GPR_ERROR, "Cannot create a call on the server.");
- return NULL;
- }
+ GPR_ASSERT(channel->is_client);
- call = grpc_call_create(channel, cq, NULL);
+ send_metadata[0] = path_mdelem;
+ send_metadata[1] = authority_mdelem;
- /* Add :path and :authority headers. */
- op.type = GRPC_SEND_METADATA;
- op.dir = GRPC_CALL_DOWN;
- op.flags = 0;
- op.data.metadata = path_mdelem;
- op.done_cb = do_nothing;
- op.user_data = NULL;
- grpc_call_execute_op(call, &op);
-
- op.data.metadata = authority_mdelem;
- grpc_call_execute_op(call, &op);
-
- if (0 != gpr_time_cmp(deadline, gpr_inf_future)) {
- op.type = GRPC_SEND_DEADLINE;
- op.dir = GRPC_CALL_DOWN;
- op.flags = 0;
- op.data.deadline = deadline;
- op.done_cb = do_nothing;
- op.user_data = NULL;
- grpc_call_execute_op(call, &op);
- }
+ return grpc_call_create(channel, cq, NULL, send_metadata,
+ GPR_ARRAY_SIZE(send_metadata), deadline);
+}
- return call;
+grpc_call *grpc_channel_create_call_old(grpc_channel *channel,
+ const char *method, const char *host,
+ gpr_timespec absolute_deadline) {
+ return grpc_channel_create_call(channel, NULL, method, host,
+ absolute_deadline);
}
grpc_call *grpc_channel_create_call(grpc_channel *channel,
@@ -146,13 +127,6 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel,
deadline);
}
-grpc_call *grpc_channel_create_call_old(grpc_channel *channel,
- const char *method, const char *host,
- gpr_timespec absolute_deadline) {
- return grpc_channel_create_call(channel, NULL, method, host,
- absolute_deadline);
-}
-
void *grpc_channel_register_call(grpc_channel *channel, const char *method,
const char *host) {
registered_call *rc = gpr_malloc(sizeof(registered_call));
diff --git a/src/core/surface/client.c b/src/core/surface/client.c
index 4d54865d16..2f898ff7d7 100644
--- a/src/core/surface/client.c
+++ b/src/core/surface/client.c
@@ -39,28 +39,17 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-typedef struct {
- void *unused;
-} call_data;
+typedef struct { void *unused; } call_data;
-typedef struct {
- void *unused;
-} channel_data;
+typedef struct { void *unused; } channel_data;
static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
grpc_call_op *op) {
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
switch (op->type) {
- case GRPC_SEND_DEADLINE:
- grpc_call_set_deadline(elem, op->data.deadline);
- grpc_call_next_op(elem, op);
- break;
case GRPC_RECV_METADATA:
- grpc_call_recv_metadata(elem, op->data.metadata);
- break;
- case GRPC_RECV_DEADLINE:
- gpr_log(GPR_ERROR, "Deadline received by client (ignored)");
+ grpc_call_recv_metadata(elem, &op->data.metadata);
break;
case GRPC_RECV_MESSAGE:
grpc_call_recv_message(elem, op->data.message);
@@ -72,8 +61,9 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
case GRPC_RECV_FINISH:
grpc_call_stream_closed(elem);
break;
- case GRPC_RECV_END_OF_INITIAL_METADATA:
- grpc_call_initial_metadata_complete(elem);
+ case GRPC_RECV_SYNTHETIC_STATUS:
+ grpc_call_recv_synthetic_status(elem, op->data.synthetic_status.status,
+ op->data.synthetic_status.message);
break;
default:
GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
@@ -114,6 +104,6 @@ static void init_channel_elem(grpc_channel_element *elem,
static void destroy_channel_elem(grpc_channel_element *elem) {}
const grpc_channel_filter grpc_client_surface_filter = {
- call_op, channel_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "client", };
+ call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
+ sizeof(channel_data), init_channel_elem, destroy_channel_elem, "client",
+};
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index b40c48381f..78170806f1 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -42,28 +42,20 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-typedef struct {
- void *unused;
-} call_data;
+typedef struct { void *unused; } call_data;
-typedef struct {
- grpc_mdelem *status;
- grpc_mdelem *message;
-} channel_data;
+typedef struct { void *unused; } channel_data;
static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
grpc_call_op *op) {
- channel_data *channeld = elem->channel_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
switch (op->type) {
- case GRPC_SEND_START:
- grpc_call_recv_metadata(elem, grpc_mdelem_ref(channeld->status));
- grpc_call_recv_metadata(elem, grpc_mdelem_ref(channeld->message));
- grpc_call_stream_closed(elem);
- break;
case GRPC_SEND_METADATA:
- grpc_mdelem_unref(op->data.metadata);
+ grpc_metadata_batch_destroy(&op->data.metadata);
+ grpc_call_recv_synthetic_status(elem, GRPC_STATUS_UNKNOWN,
+ "Rpc sent on a lame channel.");
+ grpc_call_stream_closed(elem);
break;
default:
break;
@@ -94,29 +86,17 @@ static void destroy_call_elem(grpc_call_element *elem) {}
static void init_channel_elem(grpc_channel_element *elem,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
- channel_data *channeld = elem->channel_data;
- char status[12];
-
GPR_ASSERT(is_first);
GPR_ASSERT(is_last);
-
- channeld->message = grpc_mdelem_from_strings(mdctx, "grpc-message",
- "Rpc sent on a lame channel.");
- gpr_ltoa(GRPC_STATUS_UNKNOWN, status);
- channeld->status = grpc_mdelem_from_strings(mdctx, "grpc-status", status);
}
-static void destroy_channel_elem(grpc_channel_element *elem) {
- channel_data *channeld = elem->channel_data;
-
- grpc_mdelem_unref(channeld->message);
- grpc_mdelem_unref(channeld->status);
-}
+static void destroy_channel_elem(grpc_channel_element *elem) {}
static const grpc_channel_filter lame_filter = {
- call_op, channel_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "lame-client", };
+ call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
+ sizeof(channel_data), init_channel_elem, destroy_channel_elem,
+ "lame-client",
+};
grpc_channel *grpc_lame_client_channel_create(void) {
static const grpc_channel_filter *filters[] = {&lame_filter};
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 17cba9a505..e771929870 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -411,29 +411,32 @@ static void read_closed(grpc_call_element *elem) {
gpr_mu_unlock(&chand->server->mu);
}
+static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
+ grpc_call_element *elem = user_data;
+ channel_data *chand = elem->channel_data;
+ call_data *calld = elem->call_data;
+ if (md->key == chand->path_key) {
+ calld->path = grpc_mdstr_ref(md->value);
+ return NULL;
+ } else if (md->key == chand->authority_key) {
+ calld->host = grpc_mdstr_ref(md->value);
+ return NULL;
+ }
+ return md;
+}
+
static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn,
grpc_call_op *op) {
- channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
- grpc_mdelem *md;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
switch (op->type) {
case GRPC_RECV_METADATA:
- md = op->data.metadata;
- if (md->key == chand->path_key) {
- calld->path = grpc_mdstr_ref(md->value);
- grpc_mdelem_unref(md);
- } else if (md->key == chand->authority_key) {
- calld->host = grpc_mdstr_ref(md->value);
- grpc_mdelem_unref(md);
- } else {
- grpc_call_recv_metadata(elem, md);
+ grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
+ if (grpc_call_recv_metadata(elem, &op->data.metadata)) {
+ calld->deadline = op->data.metadata.deadline;
+ start_new_rpc(elem);
}
break;
- case GRPC_RECV_END_OF_INITIAL_METADATA:
- start_new_rpc(elem);
- grpc_call_initial_metadata_complete(elem);
- break;
case GRPC_RECV_MESSAGE:
grpc_call_recv_message(elem, op->data.message);
op->done_cb(op->user_data, GRPC_OP_OK);
@@ -444,10 +447,6 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn,
case GRPC_RECV_FINISH:
stream_closed(elem);
break;
- case GRPC_RECV_DEADLINE:
- grpc_call_set_deadline(elem, op->data.deadline);
- ((call_data *)elem->call_data)->deadline = op->data.deadline;
- break;
default:
GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
grpc_call_next_op(elem, op);
@@ -464,7 +463,8 @@ static void channel_op(grpc_channel_element *elem,
case GRPC_ACCEPT_CALL:
/* create a call */
grpc_call_create(chand->channel, NULL,
- op->data.accept_call.transport_server_data);
+ op->data.accept_call.transport_server_data, NULL, 0,
+ gpr_inf_future);
break;
case GRPC_TRANSPORT_CLOSED:
/* if the transport is closed for a server channel, we destroy the
diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c
index 708bb06c7f..5ca31d6bc7 100644
--- a/src/core/transport/chttp2/stream_encoder.c
+++ b/src/core/transport/chttp2/stream_encoder.c
@@ -43,7 +43,7 @@
#include "src/core/transport/chttp2/timeout_encoding.h"
#include "src/core/transport/chttp2/varint.h"
-#define HASH_FRAGMENT_1(x) ((x) & 255)
+#define HASH_FRAGMENT_1(x) ((x)&255)
#define HASH_FRAGMENT_2(x) ((x >> 8) & 255)
#define HASH_FRAGMENT_3(x) ((x >> 16) & 255)
#define HASH_FRAGMENT_4(x) ((x >> 24) & 255)
@@ -479,10 +479,9 @@ gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count,
/* skip */
curop++;
break;
- case GRPC_OP_FLOW_CTL_CB:
- case GRPC_OP_DEADLINE:
case GRPC_OP_METADATA:
- case GRPC_OP_METADATA_BOUNDARY:
+ grpc_metadata_batch_assert_ok(&op->data.metadata);
+ case GRPC_OP_FLOW_CTL_CB:
/* these just get copied as they don't impact the number of flow
controlled bytes */
grpc_sopb_append(outops, op, 1);
@@ -529,6 +528,12 @@ exit_loop:
*inops_count -= curop;
memmove(inops, inops + curop, *inops_count * sizeof(grpc_stream_op));
+ for (curop = 0; curop < *inops_count; curop++) {
+ if (inops[curop].type == GRPC_OP_METADATA) {
+ grpc_metadata_batch_assert_ok(&inops[curop].data.metadata);
+ }
+ }
+
return flow_controlled_bytes_taken;
}
@@ -543,6 +548,7 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof,
gpr_uint32 curop = 0;
gpr_uint32 unref_op;
grpc_mdctx *mdctx = compressor->mdctx;
+ grpc_linked_mdelem *l;
int need_unref = 0;
GPR_ASSERT(stream_id != 0);
@@ -566,19 +572,19 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof,
curop++;
break;
case GRPC_OP_METADATA:
- /* Encode a metadata element; store the returned value, representing
+ /* Encode a metadata batch; store the returned values, representing
a metadata element that needs to be unreffed back into the metadata
slot. THIS MAY NOT BE THE SAME ELEMENT (if a decoder table slot got
updated). After this loop, we'll do a batch unref of elements. */
- op->data.metadata = hpack_enc(compressor, op->data.metadata, &st);
- need_unref |= op->data.metadata != NULL;
- curop++;
- break;
- case GRPC_OP_DEADLINE:
- deadline_enc(compressor, op->data.deadline, &st);
- curop++;
- break;
- case GRPC_OP_METADATA_BOUNDARY:
+ need_unref |= op->data.metadata.garbage.head != NULL;
+ grpc_metadata_batch_assert_ok(&op->data.metadata);
+ for (l = op->data.metadata.list.head; l; l = l->next) {
+ l->md = hpack_enc(compressor, l->md, &st);
+ need_unref |= l->md != NULL;
+ }
+ if (gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future) != 0) {
+ deadline_enc(compressor, op->data.metadata.deadline, &st);
+ }
ensure_frame_type(&st, HEADER, 0);
finish_frame(&st, 1, 0);
st.last_was_header = 0; /* force a new header frame */
@@ -614,8 +620,12 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof,
for (unref_op = 0; unref_op < curop; unref_op++) {
op = &ops[unref_op];
if (op->type != GRPC_OP_METADATA) continue;
- if (!op->data.metadata) continue;
- grpc_mdctx_locked_mdelem_unref(mdctx, op->data.metadata);
+ for (l = op->data.metadata.list.head; l; l = l->next) {
+ if (l->md) grpc_mdctx_locked_mdelem_unref(mdctx, l->md);
+ }
+ for (l = op->data.metadata.garbage.head; l; l = l->next) {
+ grpc_mdctx_locked_mdelem_unref(mdctx, l->md);
+ }
}
grpc_mdctx_unlock(mdctx);
}
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 995d64015a..e32ee284e0 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -68,10 +68,10 @@ int grpc_http_trace = 0;
typedef struct transport transport;
typedef struct stream stream;
-#define IF_TRACING(stmt) \
- if (!(grpc_http_trace)) \
- ; \
- else \
+#define IF_TRACING(stmt) \
+ if (!(grpc_http_trace)) \
+ ; \
+ else \
stmt
/* streams are kept in various linked lists depending on what things need to
@@ -292,6 +292,12 @@ struct stream {
stream_link links[STREAM_LIST_COUNT];
gpr_uint8 included[STREAM_LIST_COUNT];
+ /* incoming metadata */
+ grpc_linked_mdelem *incoming_metadata;
+ size_t incoming_metadata_count;
+ size_t incoming_metadata_capacity;
+ gpr_timespec incoming_deadline;
+
/* sops from application */
grpc_stream_op_buffer outgoing_sopb;
/* sops that have passed flow control to be written */
@@ -577,7 +583,7 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
lock(t);
s->id = 0;
} else {
- s->id = (gpr_uint32)(gpr_uintptr) server_data;
+ s->id = (gpr_uint32)(gpr_uintptr)server_data;
t->incoming_stream = s;
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
}
@@ -593,6 +599,10 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
s->cancelled = 0;
s->allow_window_updates = 0;
s->published_close = 0;
+ s->incoming_metadata_count = 0;
+ s->incoming_metadata_capacity = 0;
+ s->incoming_metadata = NULL;
+ s->incoming_deadline = gpr_inf_future;
memset(&s->links, 0, sizeof(s->links));
memset(&s->included, 0, sizeof(s->included));
grpc_sopb_init(&s->outgoing_sopb);
@@ -698,7 +708,8 @@ static void stream_list_add_tail(transport *t, stream *s, stream_list_id id) {
}
static void stream_list_join(transport *t, stream *s, stream_list_id id) {
- if (id == PENDING_CALLBACKS) GPR_ASSERT(t->cb != NULL || t->error_state == ERROR_STATE_NONE);
+ if (id == PENDING_CALLBACKS)
+ GPR_ASSERT(t->cb != NULL || t->error_state == ERROR_STATE_NONE);
if (s->included[id]) {
return;
}
@@ -760,7 +771,7 @@ static void unlock(transport *t) {
if (t->error_state == ERROR_STATE_SEEN && !t->writing) {
call_closed = 1;
t->calling_back = 1;
- t->cb = NULL; /* no more callbacks */
+ t->cb = NULL; /* no more callbacks */
t->error_state = ERROR_STATE_NOTIFIED;
}
if (t->num_pending_goaways) {
@@ -782,8 +793,7 @@ static void unlock(transport *t) {
/* perform some callbacks if necessary */
for (i = 0; i < num_goaways; i++) {
- cb->goaway(t->cb_user_data, &t->base, goaways[i].status,
- goaways[i].debug);
+ cb->goaway(t->cb_user_data, &t->base, goaways[i].status, goaways[i].debug);
}
if (perform_callbacks) {
@@ -1058,6 +1068,17 @@ static void finalize_cancellations(transport *t) {
}
}
+static void add_incoming_metadata(transport *t, stream *s, grpc_mdelem *elem) {
+ if (s->incoming_metadata_capacity == s->incoming_metadata_count) {
+ s->incoming_metadata_capacity =
+ GPR_MAX(8, 2 * s->incoming_metadata_capacity);
+ s->incoming_metadata =
+ gpr_realloc(s->incoming_metadata, sizeof(*s->incoming_metadata) *
+ s->incoming_metadata_capacity);
+ }
+ s->incoming_metadata[s->incoming_metadata_count++].md = elem;
+}
+
static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
grpc_status_code local_status,
grpc_chttp2_error_code error_code,
@@ -1077,9 +1098,18 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
stream_list_join(t, s, CANCELLED);
gpr_ltoa(local_status, buffer);
- grpc_sopb_add_metadata(
- &s->parser.incoming_sopb,
+ add_incoming_metadata(
+ t, s,
grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer));
+ switch (local_status) {
+ case GRPC_STATUS_CANCELLED:
+ add_incoming_metadata(
+ t, s, grpc_mdelem_from_strings(t->metadata_context,
+ "grpc-message", "Cancelled"));
+ break;
+ default:
+ break;
+ }
stream_list_join(t, s, PENDING_CALLBACKS);
}
@@ -1255,11 +1285,10 @@ static void on_header(void *tp, grpc_mdelem *md) {
}
grpc_mdelem_set_user_data(md, free_timeout, cached_timeout);
}
- grpc_sopb_add_deadline(&s->parser.incoming_sopb,
- gpr_time_add(gpr_now(), *cached_timeout));
+ s->incoming_deadline = gpr_time_add(gpr_now(), *cached_timeout);
grpc_mdelem_unref(md);
} else {
- grpc_sopb_add_metadata(&s->parser.incoming_sopb, md);
+ add_incoming_metadata(t, s, md);
}
}
@@ -1304,7 +1333,7 @@ static int init_header_frame_parser(transport *t, int is_continuation) {
t->incoming_stream = NULL;
/* if stream is accepted, we set incoming_stream in init_stream */
t->cb->accept_stream(t->cb_user_data, &t->base,
- (void *)(gpr_uintptr) t->incoming_stream_id);
+ (void *)(gpr_uintptr)t->incoming_stream_id);
s = t->incoming_stream;
if (!s) {
gpr_log(GPR_ERROR, "stream not accepted");
@@ -1435,6 +1464,35 @@ static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) {
return window + window_update < MAX_WINDOW;
}
+static void free_md(void *p, grpc_op_error result) { gpr_free(p); }
+
+static void add_metadata_batch(transport *t, stream *s) {
+ grpc_metadata_batch b;
+ size_t i;
+
+ b.list.head = &s->incoming_metadata[0];
+ b.list.tail = &s->incoming_metadata[s->incoming_metadata_count - 1];
+ b.garbage.head = b.garbage.tail = NULL;
+ b.deadline = s->incoming_deadline;
+
+ for (i = 1; i < s->incoming_metadata_count; i++) {
+ s->incoming_metadata[i].prev = &s->incoming_metadata[i - 1];
+ s->incoming_metadata[i - 1].next = &s->incoming_metadata[i];
+ }
+ s->incoming_metadata[0].prev = NULL;
+ s->incoming_metadata[s->incoming_metadata_count - 1].next = NULL;
+
+ grpc_sopb_add_metadata(&s->parser.incoming_sopb, b);
+ grpc_sopb_add_flow_ctl_cb(&s->parser.incoming_sopb, free_md,
+ s->incoming_metadata);
+
+ /* reset */
+ s->incoming_deadline = gpr_inf_future;
+ s->incoming_metadata = NULL;
+ s->incoming_metadata_count = 0;
+ s->incoming_metadata_capacity = 0;
+}
+
static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
grpc_chttp2_parse_state st;
size_t i;
@@ -1449,8 +1507,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
}
if (st.metadata_boundary) {
- grpc_sopb_add_metadata_boundary(
- &t->incoming_stream->parser.incoming_sopb);
+ add_metadata_batch(t, t->incoming_stream);
stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
}
if (st.ack_settings) {
@@ -1580,8 +1637,8 @@ static int process_read(transport *t, gpr_slice slice) {
"Connect string mismatch: expected '%c' (%d) got '%c' (%d) "
"at byte %d",
CLIENT_CONNECT_STRING[t->deframe_state],
- (int)(gpr_uint8) CLIENT_CONNECT_STRING[t->deframe_state],
- *cur, (int)*cur, t->deframe_state);
+ (int)(gpr_uint8)CLIENT_CONNECT_STRING[t->deframe_state], *cur,
+ (int)*cur, t->deframe_state);
drop_connection(t);
return 0;
}
@@ -1778,17 +1835,20 @@ static int prepare_callbacks(transport *t) {
int n = 0;
while ((s = stream_list_remove_head(t, PENDING_CALLBACKS))) {
int execute = 1;
- grpc_sopb_swap(&s->parser.incoming_sopb, &s->callback_sopb);
s->callback_state = compute_state(s->sent_write_closed, s->read_closed);
if (s->callback_state == GRPC_STREAM_CLOSED) {
remove_from_stream_map(t, s);
if (s->published_close) {
execute = 0;
+ } else if (s->incoming_metadata_count) {
+ add_metadata_batch(t, s);
}
s->published_close = 1;
}
+ grpc_sopb_swap(&s->parser.incoming_sopb, &s->callback_sopb);
+
if (execute) {
stream_list_add_tail(t, s, EXECUTING_CALLBACKS);
n = 1;
@@ -1825,9 +1885,9 @@ static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
*/
static const grpc_transport_vtable vtable = {
- sizeof(stream), init_stream, send_batch, set_allow_window_updates,
- add_to_pollset, destroy_stream, abort_stream, goaway,
- close_transport, send_ping, destroy_transport};
+ sizeof(stream), init_stream, send_batch, set_allow_window_updates,
+ add_to_pollset, destroy_stream, abort_stream, goaway, close_transport,
+ send_ping, destroy_transport};
void grpc_create_chttp2_transport(grpc_transport_setup_callback setup,
void *arg,
diff --git a/src/core/transport/metadata.c b/src/core/transport/metadata.c
index 44f6591c95..74e94b2c24 100644
--- a/src/core/transport/metadata.c
+++ b/src/core/transport/metadata.c
@@ -120,7 +120,7 @@ static void unlock(grpc_mdctx *ctx) {
if (ctx->refs == 0) {
/* uncomment if you're having trouble diagnosing an mdelem leak to make
things clearer (slows down destruction a lot, however) */
- /* gc_mdtab(ctx); */
+ gc_mdtab(ctx);
if (ctx->mdtab_count && ctx->mdtab_count == ctx->mdtab_free) {
discard_metadata(ctx);
}
diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c
index c30e3a27f1..882c078d51 100644
--- a/src/core/transport/stream_op.c
+++ b/src/core/transport/stream_op.c
@@ -33,11 +33,11 @@
#include "src/core/transport/stream_op.h"
+#include <string.h>
+
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include <string.h>
-
/* Exponential growth function: Given x, return a larger x.
Currently we grow by 1.5 times upon reallocation. */
#define GROW(x) (3 * (x) / 2)
@@ -79,33 +79,46 @@ void grpc_stream_ops_unref_owned_objects(grpc_stream_op *ops, size_t nops) {
gpr_slice_unref(ops[i].data.slice);
break;
case GRPC_OP_METADATA:
- grpc_mdelem_unref(ops[i].data.metadata);
+ grpc_metadata_batch_destroy(&ops[i].data.metadata);
break;
case GRPC_OP_FLOW_CTL_CB:
ops[i].data.flow_ctl_cb.cb(ops[i].data.flow_ctl_cb.arg, GRPC_OP_ERROR);
break;
case GRPC_NO_OP:
- case GRPC_OP_DEADLINE:
- case GRPC_OP_METADATA_BOUNDARY:
case GRPC_OP_BEGIN_MESSAGE:
break;
}
}
}
+static void assert_contained_metadata_ok(grpc_stream_op *ops, size_t nops) {
+#ifndef NDEBUG
+ size_t i;
+ for (i = 0; i < nops; i++) {
+ if (ops[i].type == GRPC_OP_METADATA) {
+ grpc_metadata_batch_assert_ok(&ops[i].data.metadata);
+ }
+ }
+#endif /* NDEBUG */
+}
+
static void expandto(grpc_stream_op_buffer *sopb, size_t new_capacity) {
sopb->capacity = new_capacity;
+ assert_contained_metadata_ok(sopb->ops, sopb->nops);
if (sopb->ops == sopb->inlined_ops) {
sopb->ops = gpr_malloc(sizeof(grpc_stream_op) * new_capacity);
memcpy(sopb->ops, sopb->inlined_ops, sopb->nops * sizeof(grpc_stream_op));
} else {
sopb->ops = gpr_realloc(sopb->ops, sizeof(grpc_stream_op) * new_capacity);
}
+ assert_contained_metadata_ok(sopb->ops, sopb->nops);
}
static grpc_stream_op *add(grpc_stream_op_buffer *sopb) {
grpc_stream_op *out;
+ assert_contained_metadata_ok(sopb->ops, sopb->nops);
+
if (sopb->nops == sopb->capacity) {
expandto(sopb, GROW(sopb->capacity));
}
@@ -116,6 +129,7 @@ static grpc_stream_op *add(grpc_stream_op_buffer *sopb) {
void grpc_sopb_add_no_op(grpc_stream_op_buffer *sopb) {
add(sopb)->type = GRPC_NO_OP;
+ assert_contained_metadata_ok(sopb->ops, sopb->nops);
}
void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length,
@@ -124,30 +138,24 @@ void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length,
op->type = GRPC_OP_BEGIN_MESSAGE;
op->data.begin_message.length = length;
op->data.begin_message.flags = flags;
+ assert_contained_metadata_ok(sopb->ops, sopb->nops);
}
-void grpc_sopb_add_metadata_boundary(grpc_stream_op_buffer *sopb) {
- grpc_stream_op *op = add(sopb);
- op->type = GRPC_OP_METADATA_BOUNDARY;
-}
-
-void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_mdelem *md) {
+void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb,
+ grpc_metadata_batch b) {
grpc_stream_op *op = add(sopb);
+ grpc_metadata_batch_assert_ok(&b);
op->type = GRPC_OP_METADATA;
- op->data.metadata = md;
-}
-
-void grpc_sopb_add_deadline(grpc_stream_op_buffer *sopb,
- gpr_timespec deadline) {
- grpc_stream_op *op = add(sopb);
- op->type = GRPC_OP_DEADLINE;
- op->data.deadline = deadline;
+ op->data.metadata = b;
+ grpc_metadata_batch_assert_ok(&op->data.metadata);
+ assert_contained_metadata_ok(sopb->ops, sopb->nops);
}
void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice) {
grpc_stream_op *op = add(sopb);
op->type = GRPC_OP_SLICE;
op->data.slice = slice;
+ assert_contained_metadata_ok(sopb->ops, sopb->nops);
}
void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb,
@@ -157,6 +165,7 @@ void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb,
op->type = GRPC_OP_FLOW_CTL_CB;
op->data.flow_ctl_cb.cb = cb;
op->data.flow_ctl_cb.arg = arg;
+ assert_contained_metadata_ok(sopb->ops, sopb->nops);
}
void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops,
@@ -164,10 +173,161 @@ void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops,
size_t orig_nops = sopb->nops;
size_t new_nops = orig_nops + nops;
+ assert_contained_metadata_ok(ops, nops);
+ assert_contained_metadata_ok(sopb->ops, sopb->nops);
if (new_nops > sopb->capacity) {
expandto(sopb, GPR_MAX(GROW(sopb->capacity), new_nops));
}
memcpy(sopb->ops + orig_nops, ops, sizeof(grpc_stream_op) * nops);
sopb->nops = new_nops;
+ assert_contained_metadata_ok(sopb->ops, sopb->nops);
+}
+
+static void assert_valid_list(grpc_mdelem_list *list) {
+#ifndef NDEBUG
+ grpc_linked_mdelem *l;
+
+ GPR_ASSERT((list->head == NULL) == (list->tail == NULL));
+ if (!list->head) return;
+ GPR_ASSERT(list->head->prev == NULL);
+ GPR_ASSERT(list->tail->next == NULL);
+ GPR_ASSERT((list->head == list->tail) == (list->head->next == NULL));
+
+ for (l = list->head; l; l = l->next) {
+ GPR_ASSERT(l->md);
+ GPR_ASSERT((l->prev == NULL) == (l == list->head));
+ GPR_ASSERT((l->next == NULL) == (l == list->tail));
+ if (l->next) GPR_ASSERT(l->next->prev == l);
+ if (l->prev) GPR_ASSERT(l->prev->next == l);
+ }
+#endif /* NDEBUG */
+}
+
+#ifndef NDEBUG
+void grpc_metadata_batch_assert_ok(grpc_metadata_batch *comd) {
+ assert_valid_list(&comd->list);
+ assert_valid_list(&comd->garbage);
+}
+#endif /* NDEBUG */
+
+void grpc_metadata_batch_init(grpc_metadata_batch *comd) {
+ comd->list.head = comd->list.tail = comd->garbage.head = comd->garbage.tail =
+ NULL;
+ comd->deadline = gpr_inf_future;
+}
+
+void grpc_metadata_batch_destroy(grpc_metadata_batch *comd) {
+ grpc_linked_mdelem *l;
+ for (l = comd->list.head; l; l = l->next) {
+ grpc_mdelem_unref(l->md);
+ }
+ for (l = comd->garbage.head; l; l = l->next) {
+ grpc_mdelem_unref(l->md);
+ }
+}
+
+void grpc_metadata_batch_add_head(grpc_metadata_batch *comd,
+ grpc_linked_mdelem *storage,
+ grpc_mdelem *elem_to_add) {
+ GPR_ASSERT(elem_to_add);
+ storage->md = elem_to_add;
+ grpc_metadata_batch_link_head(comd, storage);
+}
+
+static void link_head(grpc_mdelem_list *list, grpc_linked_mdelem *storage) {
+ assert_valid_list(list);
+ GPR_ASSERT(storage->md);
+ storage->prev = NULL;
+ storage->next = list->head;
+ if (list->head != NULL) {
+ list->head->prev = storage;
+ } else {
+ list->tail = storage;
+ }
+ list->head = storage;
+ assert_valid_list(list);
+}
+
+void grpc_metadata_batch_link_head(grpc_metadata_batch *comd,
+ grpc_linked_mdelem *storage) {
+ link_head(&comd->list, storage);
+}
+
+void grpc_metadata_batch_add_tail(grpc_metadata_batch *comd,
+ grpc_linked_mdelem *storage,
+ grpc_mdelem *elem_to_add) {
+ GPR_ASSERT(elem_to_add);
+ storage->md = elem_to_add;
+ grpc_metadata_batch_link_tail(comd, storage);
+}
+
+static void link_tail(grpc_mdelem_list *list, grpc_linked_mdelem *storage) {
+ assert_valid_list(list);
+ GPR_ASSERT(storage->md);
+ storage->prev = list->tail;
+ storage->next = NULL;
+ if (list->tail != NULL) {
+ list->tail->next = storage;
+ } else {
+ list->head = storage;
+ }
+ list->tail = storage;
+ assert_valid_list(list);
+}
+
+void grpc_metadata_batch_link_tail(grpc_metadata_batch *comd,
+ grpc_linked_mdelem *storage) {
+ link_tail(&comd->list, storage);
+}
+
+void grpc_metadata_batch_merge(grpc_metadata_batch *target,
+ grpc_metadata_batch *add) {
+ grpc_linked_mdelem *l;
+ grpc_linked_mdelem *next;
+ for (l = add->list.head; l; l = next) {
+ next = l->next;
+ link_tail(&target->list, l);
+ }
+ for (l = add->garbage.head; l; l = next) {
+ next = l->next;
+ link_tail(&target->garbage, l);
+ }
+}
+
+void grpc_metadata_batch_filter(grpc_metadata_batch *comd,
+ grpc_mdelem *(*filter)(void *user_data,
+ grpc_mdelem *elem),
+ void *user_data) {
+ grpc_linked_mdelem *l;
+ grpc_linked_mdelem *next;
+
+ assert_valid_list(&comd->list);
+ assert_valid_list(&comd->garbage);
+ for (l = comd->list.head; l; l = next) {
+ grpc_mdelem *orig = l->md;
+ grpc_mdelem *filt = filter(user_data, orig);
+ next = l->next;
+ if (filt == NULL) {
+ if (l->prev) {
+ l->prev->next = l->next;
+ }
+ if (l->next) {
+ l->next->prev = l->prev;
+ }
+ if (comd->list.head == l) {
+ comd->list.head = l->next;
+ }
+ if (comd->list.tail == l) {
+ comd->list.tail = l->prev;
+ }
+ assert_valid_list(&comd->list);
+ link_head(&comd->garbage, l);
+ } else if (filt != orig) {
+ grpc_mdelem_unref(orig);
+ l->md = filt;
+ }
+ }
+ assert_valid_list(&comd->list);
+ assert_valid_list(&comd->garbage);
}
diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h
index 2ffbcce87b..20146b9af2 100644
--- a/src/core/transport/stream_op.h
+++ b/src/core/transport/stream_op.h
@@ -50,8 +50,6 @@ typedef enum grpc_stream_op_code {
Must be ignored by receivers */
GRPC_NO_OP,
GRPC_OP_METADATA,
- GRPC_OP_DEADLINE,
- GRPC_OP_METADATA_BOUNDARY,
/* Begin a message/metadata element/status - as defined by
grpc_message_type. */
GRPC_OP_BEGIN_MESSAGE,
@@ -76,6 +74,51 @@ typedef struct grpc_flow_ctl_cb {
void *arg;
} grpc_flow_ctl_cb;
+typedef struct grpc_linked_mdelem {
+ grpc_mdelem *md;
+ struct grpc_linked_mdelem *next;
+ struct grpc_linked_mdelem *prev;
+} grpc_linked_mdelem;
+
+typedef struct grpc_mdelem_list {
+ grpc_linked_mdelem *head;
+ grpc_linked_mdelem *tail;
+} grpc_mdelem_list;
+
+typedef struct grpc_metadata_batch {
+ grpc_mdelem_list list;
+ grpc_mdelem_list garbage;
+ gpr_timespec deadline;
+} grpc_metadata_batch;
+
+void grpc_metadata_batch_init(grpc_metadata_batch *comd);
+void grpc_metadata_batch_destroy(grpc_metadata_batch *comd);
+void grpc_metadata_batch_merge(grpc_metadata_batch *target,
+ grpc_metadata_batch *add);
+
+void grpc_metadata_batch_link_head(grpc_metadata_batch *comd,
+ grpc_linked_mdelem *storage);
+void grpc_metadata_batch_link_tail(grpc_metadata_batch *comd,
+ grpc_linked_mdelem *storage);
+
+void grpc_metadata_batch_add_head(grpc_metadata_batch *comd,
+ grpc_linked_mdelem *storage,
+ grpc_mdelem *elem_to_add);
+void grpc_metadata_batch_add_tail(grpc_metadata_batch *comd,
+ grpc_linked_mdelem *storage,
+ grpc_mdelem *elem_to_add);
+
+void grpc_metadata_batch_filter(grpc_metadata_batch *comd,
+ grpc_mdelem *(*filter)(void *user_data,
+ grpc_mdelem *elem),
+ void *user_data);
+
+#ifndef NDEBUG
+void grpc_metadata_batch_assert_ok(grpc_metadata_batch *comd);
+#else
+#define grpc_metadata_batch_assert_ok(comd) do {} while (0)
+#endif
+
/* Represents a single operation performed on a stream/transport */
typedef struct grpc_stream_op {
/* the operation to be applied */
@@ -84,8 +127,7 @@ typedef struct grpc_stream_op {
associated op-code */
union {
grpc_begin_message begin_message;
- grpc_mdelem *metadata;
- gpr_timespec deadline;
+ grpc_metadata_batch metadata;
gpr_slice slice;
grpc_flow_ctl_cb flow_ctl_cb;
} data;
@@ -118,9 +160,7 @@ void grpc_sopb_add_no_op(grpc_stream_op_buffer *sopb);
/* Append a GRPC_OP_BEGIN to a buffer */
void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length,
gpr_uint32 flags);
-void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_mdelem *metadata);
-void grpc_sopb_add_deadline(grpc_stream_op_buffer *sopb, gpr_timespec deadline);
-void grpc_sopb_add_metadata_boundary(grpc_stream_op_buffer *sopb);
+void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_metadata_batch metadata);
/* Append a GRPC_SLICE to a buffer - does not ref/unref the slice */
void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice);
/* Append a GRPC_OP_FLOW_CTL_CB to a buffer */