aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-04-17 14:57:44 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-04-17 14:57:44 -0700
commit8b282cbd0b1df8ae07b563b1a11eecb8e3bbe0a6 (patch)
tree4d4c653b8f351a0ec3bb332403e6c68b2451a6b8 /src/core
parent9c1043e757a050b789c21498c9c6984133a04d8e (diff)
Got rid of GRPC_SEND_START
Diffstat (limited to 'src/core')
-rw-r--r--src/core/channel/call_op_string.c11
-rw-r--r--src/core/channel/channel_stack.c14
-rw-r--r--src/core/channel/channel_stack.h13
-rw-r--r--src/core/channel/client_channel.c61
-rw-r--r--src/core/channel/connected_channel.c9
-rw-r--r--src/core/channel/http_server_filter.c7
-rw-r--r--src/core/security/auth.c5
-rw-r--r--src/core/surface/call.c36
-rw-r--r--src/core/surface/call.h2
-rw-r--r--src/core/surface/client.c3
-rw-r--r--src/core/surface/lame_client.c5
-rw-r--r--src/core/transport/chttp2/stream_encoder.c9
-rw-r--r--src/core/transport/stream_op.c58
13 files changed, 142 insertions, 91 deletions
diff --git a/src/core/channel/call_op_string.c b/src/core/channel/call_op_string.c
index 3e30d822f4..12e9f03b41 100644
--- a/src/core/channel/call_op_string.c
+++ b/src/core/channel/call_op_string.c
@@ -86,10 +86,6 @@ char *grpc_call_op_string(grpc_call_op *op) {
gpr_strvec_add(&b, gpr_strdup("SEND_METADATA"));
put_metadata_list(&b, op->data.metadata);
break;
- case GRPC_SEND_START:
- gpr_asprintf(&tmp, "SEND_START pollset=%p", op->data.start.pollset);
- gpr_strvec_add(&b, tmp);
- break;
case GRPC_SEND_MESSAGE:
gpr_strvec_add(&b, gpr_strdup("SEND_MESSAGE"));
break;
@@ -115,12 +111,19 @@ char *grpc_call_op_string(grpc_call_op *op) {
case GRPC_RECV_FINISH:
gpr_strvec_add(&b, gpr_strdup("RECV_FINISH"));
break;
+ case GRPC_RECV_SYNTHETIC_STATUS:
+ gpr_asprintf(&tmp, "RECV_SYNTHETIC_STATUS status=%d message='%s'", op->data.synthetic_status.status, op->data.synthetic_status.message);
+ gpr_strvec_add(&b, tmp);
+ break;
case GRPC_CANCEL_OP:
gpr_strvec_add(&b, gpr_strdup("CANCEL_OP"));
break;
}
gpr_asprintf(&tmp, " flags=0x%08x", op->flags);
gpr_strvec_add(&b, tmp);
+ if (op->bind_pollset) {
+ gpr_strvec_add(&b, gpr_strdup("bind_pollset"));
+ }
out = gpr_strvec_flatten(&b, NULL);
gpr_strvec_destroy(&b);
diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c
index cd3496b42b..3a3a3a75b7 100644
--- a/src/core/channel/channel_stack.c
+++ b/src/core/channel/channel_stack.c
@@ -183,6 +183,9 @@ void grpc_call_stack_destroy(grpc_call_stack *stack) {
void grpc_call_next_op(grpc_call_element *elem, grpc_call_op *op) {
grpc_call_element *next_elem = elem + op->dir;
+ if (op->type == GRPC_SEND_METADATA || op->type == GRPC_RECV_METADATA) {
+ grpc_metadata_batch_assert_ok(&op->data.metadata);
+ }
next_elem->filter->call_op(next_elem, elem, op);
}
@@ -211,6 +214,7 @@ void grpc_call_element_send_cancel(grpc_call_element *cur_elem) {
cancel_op.done_cb = do_nothing;
cancel_op.user_data = NULL;
cancel_op.flags = 0;
+ cancel_op.bind_pollset = NULL;
grpc_call_next_op(cur_elem, &cancel_op);
}
@@ -221,11 +225,19 @@ void grpc_call_element_send_finish(grpc_call_element *cur_elem) {
finish_op.done_cb = do_nothing;
finish_op.user_data = NULL;
finish_op.flags = 0;
+ finish_op.bind_pollset = NULL;
grpc_call_next_op(cur_elem, &finish_op);
}
void grpc_call_element_recv_status(grpc_call_element *cur_elem,
grpc_status_code status,
const char *message) {
- abort();
+ grpc_call_op op;
+ op.type = GRPC_RECV_SYNTHETIC_STATUS;
+ op.dir = GRPC_CALL_UP;
+ op.done_cb = do_nothing;
+ op.user_data = NULL;
+ op.data.synthetic_status.status = status;
+ op.data.synthetic_status.message = message;
+ grpc_call_next_op(cur_elem, &op);
}
diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h
index cc1eab7cf2..addc92b272 100644
--- a/src/core/channel/channel_stack.h
+++ b/src/core/channel/channel_stack.h
@@ -62,8 +62,6 @@ typedef struct grpc_call_element grpc_call_element;
typedef enum {
/* send metadata to the channels peer */
GRPC_SEND_METADATA,
- /* start a connection (corresponds to start_invoke/accept) */
- GRPC_SEND_START,
/* send a message to the channels peer */
GRPC_SEND_MESSAGE,
/* send a pre-formatted message to the channels peer */
@@ -80,6 +78,8 @@ typedef enum {
GRPC_RECV_HALF_CLOSE,
/* full close was received from the channels peer */
GRPC_RECV_FINISH,
+ /* a status has been sythesized locally */
+ GRPC_RECV_SYNTHETIC_STATUS,
/* the call has been abnormally terminated */
GRPC_CANCEL_OP
} grpc_call_op_type;
@@ -103,13 +103,16 @@ typedef struct {
/* Argument data, matching up with grpc_call_op_type names */
union {
- struct {
- grpc_pollset *pollset;
- } start;
grpc_byte_buffer *message;
grpc_metadata_batch metadata;
+ struct {
+ grpc_status_code status;
+ const char *message;
+ } synthetic_status;
} data;
+ grpc_pollset *bind_pollset;
+
/* Must be called when processing of this call-op is complete.
Signature chosen to match transport flow control callbacks */
void (*done_cb)(void *user_data, grpc_op_error error);
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 018228b4e6..fb984f2878 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -82,21 +82,16 @@ struct call_data {
/* owning element */
grpc_call_element *elem;
+ gpr_uint8 got_first_send;
+
call_state state;
- grpc_metadata_batch pending_metadata;
- gpr_uint32 pending_metadata_flags;
gpr_timespec deadline;
union {
struct {
/* our child call stack */
grpc_child_call *child_call;
} active;
- struct {
- void (*on_complete)(void *user_data, grpc_op_error error);
- void *on_complete_user_data;
- gpr_uint32 start_flags;
- grpc_pollset *pollset;
- } waiting;
+ grpc_call_op waiting_op;
} s;
};
@@ -121,19 +116,9 @@ static void complete_activate(grpc_call_element *elem, grpc_call_op *op) {
call_data *calld = elem->call_data;
grpc_call_element *child_elem =
grpc_child_call_get_top_element(calld->s.active.child_call);
- grpc_call_op mop;
GPR_ASSERT(calld->state == CALL_ACTIVE);
- /* sending buffered metadata down the stack before the start call */
- mop.type = GRPC_SEND_METADATA;
- mop.dir = GRPC_CALL_DOWN;
- mop.flags = calld->pending_metadata_flags;
- mop.data.metadata = calld->pending_metadata;
- mop.done_cb = do_nothing;
- mop.user_data = NULL;
- child_elem->filter->call_op(child_elem, elem, &mop);
-
/* continue the start call down the stack, this nees to happen after metadata
are flushed*/
child_elem->filter->call_op(child_elem, elem, op);
@@ -177,10 +162,7 @@ static void start_rpc(grpc_call_element *elem, grpc_call_op *op) {
gpr_realloc(chand->waiting_children,
chand->waiting_child_capacity * sizeof(call_data *));
}
- calld->s.waiting.on_complete = op->done_cb;
- calld->s.waiting.on_complete_user_data = op->user_data;
- calld->s.waiting.start_flags = op->flags;
- calld->s.waiting.pollset = op->data.start.pollset;
+ calld->s.waiting_op = *op;
chand->waiting_children[chand->waiting_child_count++] = calld;
gpr_mu_unlock(&chand->mu);
@@ -233,7 +215,7 @@ static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
calld->state = CALL_CANCELLED;
gpr_mu_unlock(&chand->mu);
send_up_cancelled_ops(elem);
- calld->s.waiting.on_complete(calld->s.waiting.on_complete_user_data,
+ calld->s.waiting_op.done_cb(calld->s.waiting_op.user_data,
GRPC_OP_ERROR);
return; /* early out */
case CALL_CREATED:
@@ -257,12 +239,13 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
switch (op->type) {
case GRPC_SEND_METADATA:
- grpc_metadata_batch_merge(&calld->pending_metadata, &op->data.metadata);
- op->done_cb(op->user_data, GRPC_OP_OK);
- break;
- case GRPC_SEND_START:
- /* filter out the start event to find which child to send on */
- start_rpc(elem, op);
+ if (!calld->got_first_send) {
+ /* filter out the start event to find which child to send on */
+ calld->got_first_send = 1;
+ start_rpc(elem, op);
+ } else {
+ grpc_call_next_op(elem, op);
+ }
break;
case GRPC_CANCEL_OP:
cancel_rpc(elem, op);
@@ -365,12 +348,6 @@ static void channel_op(grpc_channel_element *elem,
}
}
-static void error_bad_on_complete(void *arg, grpc_op_error error) {
- gpr_log(GPR_ERROR,
- "Waiting finished but not started? Bad on_complete callback");
- abort();
-}
-
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data) {
@@ -381,17 +358,13 @@ static void init_call_elem(grpc_call_element *elem,
calld->elem = elem;
calld->state = CALL_CREATED;
calld->deadline = gpr_inf_future;
- calld->s.waiting.on_complete = error_bad_on_complete;
- calld->s.waiting.on_complete_user_data = NULL;
- grpc_metadata_batch_init(&calld->pending_metadata);
+ calld->got_first_send = 0;
}
/* Destructor for call_data */
static void destroy_call_elem(grpc_call_element *elem) {
call_data *calld = elem->call_data;
- /* if the metadata buffer is not flushed, destroy it here. */
- grpc_metadata_batch_destroy(&calld->pending_metadata);
/* if the call got activated, we need to destroy the child stack also, and
remove it from the in-flight requests tracked by the child_entry we
picked */
@@ -498,13 +471,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
call_ops = gpr_malloc(sizeof(grpc_call_op) * waiting_child_count);
for (i = 0; i < waiting_child_count; i++) {
- call_ops[i].type = GRPC_SEND_START;
- call_ops[i].dir = GRPC_CALL_DOWN;
- call_ops[i].flags = waiting_children[i]->s.waiting.start_flags;
- call_ops[i].done_cb = waiting_children[i]->s.waiting.on_complete;
- call_ops[i].user_data =
- waiting_children[i]->s.waiting.on_complete_user_data;
- call_ops[i].data.start.pollset = waiting_children[i]->s.waiting.pollset;
+ call_ops[i] = waiting_children[i]->s.waiting_op;
if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) {
waiting_children[i] = NULL;
call_ops[i].done_cb(call_ops[i].user_data, GRPC_OP_ERROR);
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index 17abba06be..84d1f3569d 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -119,14 +119,13 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ if (op->bind_pollset) {
+ grpc_transport_add_to_pollset(chand->transport, op->bind_pollset);
+ }
+
switch (op->type) {
case GRPC_SEND_METADATA:
grpc_sopb_add_metadata(&calld->outgoing_sopb, op->data.metadata);
- grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb,
- op->user_data);
- break;
- case GRPC_SEND_START:
- grpc_transport_add_to_pollset(chand->transport, op->data.start.pollset);
end_bufferable_op(op, chand, calld, 0);
break;
case GRPC_SEND_MESSAGE:
diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c
index 0e50f408b5..77f204c7a7 100644
--- a/src/core/channel/http_server_filter.c
+++ b/src/core/channel/http_server_filter.c
@@ -188,8 +188,11 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
case GRPC_SEND_METADATA:
/* If we haven't sent status 200 yet, we need to so so because it needs to
come before any non : prefixed metadata. */
- grpc_metadata_batch_add_head(&op->data.metadata, &calld->status,
- grpc_mdelem_ref(channeld->status_ok));
+ if (!calld->sent_status) {
+ calld->sent_status = 1;
+ grpc_metadata_batch_add_head(&op->data.metadata, &calld->status,
+ grpc_mdelem_ref(channeld->status_ok));
+ }
grpc_call_next_op(elem, op);
break;
default:
diff --git a/src/core/security/auth.c b/src/core/security/auth.c
index f61287ad3f..0b9d566c90 100644
--- a/src/core/security/auth.c
+++ b/src/core/security/auth.c
@@ -175,10 +175,6 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
calld->method = grpc_mdstr_ref(md->value);
}
}
- grpc_call_next_op(elem, op);
- break;
-
- case GRPC_SEND_START:
if (calld->host != NULL) {
grpc_security_status status;
const char *call_host = grpc_mdstr_as_c_string(calld->host);
@@ -200,7 +196,6 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
}
send_security_metadata(elem, op);
break;
-
default:
/* pass control up or down the stack depending on op->dir */
grpc_call_next_op(elem, op);
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 5facaa503d..0d5fe225d9 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -94,6 +94,8 @@ typedef enum {
/* Status came from the application layer overriding whatever
the wire says */
STATUS_FROM_API_OVERRIDE = 0,
+ /* Status was created by some internal channel stack operation */
+ STATUS_FROM_CORE,
/* Status came from 'the wire' - or somewhere below the surface
layer */
STATUS_FROM_WIRE,
@@ -363,6 +365,7 @@ static void request_more_data(grpc_call *call) {
op.flags = 0;
op.done_cb = do_nothing;
op.user_data = NULL;
+ op.bind_pollset = NULL;
grpc_call_execute_op(call, &op);
}
@@ -660,15 +663,9 @@ static void enact_send_action(grpc_call *call, send_action sa) {
grpc_metadata_batch_link_head(&op.data.metadata,
&call->send_initial_metadata[i]);
}
- op.done_cb = do_nothing;
- op.user_data = NULL;
- grpc_call_execute_op(call, &op);
- op.type = GRPC_SEND_START;
- op.dir = GRPC_CALL_DOWN;
- op.flags = flags;
- op.data.start.pollset = grpc_cq_pollset(call->cq);
op.done_cb = finish_start_step;
op.user_data = call;
+ op.bind_pollset = grpc_cq_pollset(call->cq);
grpc_call_execute_op(call, &op);
break;
case SEND_BUFFERED_MESSAGE:
@@ -682,6 +679,7 @@ static void enact_send_action(grpc_call *call, send_action sa) {
op.data.message = data.send_message;
op.done_cb = finish_write_step;
op.user_data = call;
+ op.bind_pollset = NULL;
grpc_call_execute_op(call, &op);
break;
case SEND_TRAILING_METADATA_AND_FINISH:
@@ -694,6 +692,7 @@ static void enact_send_action(grpc_call *call, send_action sa) {
call, data.send_metadata.count, data.send_metadata.metadata);
op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL;
op.data.metadata.deadline = call->send_deadline;
+ op.bind_pollset = NULL;
/* send status */
/* TODO(ctiller): cache common status values */
data = call->request_data[GRPC_IOREQ_SEND_STATUS];
@@ -723,6 +722,7 @@ static void enact_send_action(grpc_call *call, send_action sa) {
op.flags = 0;
op.done_cb = finish_finish_step;
op.user_data = call;
+ op.bind_pollset = NULL;
grpc_call_execute_op(call, &op);
break;
}
@@ -876,6 +876,7 @@ grpc_call_error grpc_call_cancel(grpc_call *c) {
op.flags = 0;
op.done_cb = do_nothing;
op.user_data = NULL;
+ op.bind_pollset = NULL;
elem = CALL_ELEM_FROM_CALL(c, 0);
elem->filter->call_op(elem, NULL, &op);
@@ -983,6 +984,14 @@ void grpc_call_recv_message(grpc_call_element *elem,
unlock(call);
}
+void grpc_call_recv_synthetic_status(grpc_call_element *elem, grpc_status_code status, const char *message) {
+ grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+ lock(call);
+ set_status_code(call, STATUS_FROM_CORE, status);
+ set_status_details(call, STATUS_FROM_CORE, grpc_mdstr_from_string(call->metadata_context, message));
+ unlock(call);
+}
+
int grpc_call_recv_metadata(grpc_call_element *elem,
grpc_metadata_batch *md) {
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
@@ -990,6 +999,7 @@ int grpc_call_recv_metadata(grpc_call_element *elem,
grpc_metadata_array *dest;
grpc_metadata *mdusr;
int is_trailing;
+ grpc_mdctx *mdctx = call->metadata_context;
lock(call);
is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA;
@@ -998,10 +1008,8 @@ int grpc_call_recv_metadata(grpc_call_element *elem,
grpc_mdstr *key = md->key;
if (key == grpc_channel_get_status_string(call->channel)) {
set_status_code(call, STATUS_FROM_WIRE, decode_status(md));
- grpc_mdelem_unref(md);
} else if (key == grpc_channel_get_message_string(call->channel)) {
set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
- grpc_mdelem_unref(md);
} else {
dest = &call->buffered_metadata[is_trailing];
if (dest->count == dest->capacity) {
@@ -1022,6 +1030,7 @@ int grpc_call_recv_metadata(grpc_call_element *elem,
sizeof(grpc_mdelem *) * call->owned_metadata_capacity);
}
call->owned_metadata[call->owned_metadata_count++] = md;
+ l->md = 0;
}
}
if (gpr_time_cmp(md->deadline, gpr_inf_future) != 0) {
@@ -1032,6 +1041,15 @@ int grpc_call_recv_metadata(grpc_call_element *elem,
}
unlock(call);
+ grpc_mdctx_lock(mdctx);
+ for (l = md->list.head; l; l = l->next) {
+ if (l->md) grpc_mdctx_locked_mdelem_unref(mdctx, l->md);
+ }
+ for (l = md->garbage.head; l; l = l->next) {
+ grpc_mdctx_locked_mdelem_unref(mdctx, l->md);
+ }
+ grpc_mdctx_unlock(mdctx);
+
return !is_trailing;
}
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index 2848ccc5a1..16de197af1 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -113,6 +113,8 @@ grpc_call_error grpc_call_start_ioreq_and_call_back(
grpc_call_stack *grpc_call_get_call_stack(grpc_call *call);
+void grpc_call_recv_synthetic_status(grpc_call_element *elem, grpc_status_code status, const char *message);
+
/* Given the top call_element, get the call object. */
grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element);
diff --git a/src/core/surface/client.c b/src/core/surface/client.c
index b987c2de5f..719346689a 100644
--- a/src/core/surface/client.c
+++ b/src/core/surface/client.c
@@ -61,6 +61,9 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
case GRPC_RECV_FINISH:
grpc_call_stream_closed(elem);
break;
+ case GRPC_RECV_SYNTHETIC_STATUS:
+ grpc_call_recv_synthetic_status(elem, op->data.synthetic_status.status, op->data.synthetic_status.message);
+ break;
default:
GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
grpc_call_next_op(elem, op);
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index 74e9628385..b7b3d92958 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -51,14 +51,11 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
switch (op->type) {
- case GRPC_SEND_START:
+ case GRPC_SEND_METADATA:
grpc_call_element_recv_status(elem, GRPC_STATUS_UNKNOWN,
"Rpc sent on a lame channel.");
grpc_call_stream_closed(elem);
break;
- case GRPC_SEND_METADATA:
- abort();
- break;
default:
break;
}
diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c
index 8c66171b07..f50d1428ab 100644
--- a/src/core/transport/chttp2/stream_encoder.c
+++ b/src/core/transport/chttp2/stream_encoder.c
@@ -479,8 +479,9 @@ gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count,
/* skip */
curop++;
break;
- case GRPC_OP_FLOW_CTL_CB:
case GRPC_OP_METADATA:
+ grpc_metadata_batch_assert_ok(&op->data.metadata);
+ case GRPC_OP_FLOW_CTL_CB:
/* these just get copied as they don't impact the number of flow
controlled bytes */
grpc_sopb_append(outops, op, 1);
@@ -527,6 +528,12 @@ exit_loop:
*inops_count -= curop;
memmove(inops, inops + curop, *inops_count * sizeof(grpc_stream_op));
+ for (curop = 0; curop < *inops_count; curop++) {
+ if (inops[curop].type == GRPC_OP_METADATA) {
+ grpc_metadata_batch_assert_ok(&inops[curop].data.metadata);
+ }
+ }
+
return flow_controlled_bytes_taken;
}
diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c
index 1a8b4174ff..9589599257 100644
--- a/src/core/transport/stream_op.c
+++ b/src/core/transport/stream_op.c
@@ -33,11 +33,11 @@
#include "src/core/transport/stream_op.h"
+#include <string.h>
+
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include <string.h>
-
/* Exponential growth function: Given x, return a larger x.
Currently we grow by 1.5 times upon reallocation. */
#define GROW(x) (3 * (x) / 2)
@@ -91,19 +91,32 @@ void grpc_stream_ops_unref_owned_objects(grpc_stream_op *ops, size_t nops) {
}
}
+static void assert_contained_metadata_ok(grpc_stream_op *ops, size_t nops) {
+ size_t i;
+ for (i = 0; i < nops; i++) {
+ if (ops[i].type == GRPC_OP_METADATA) {
+ grpc_metadata_batch_assert_ok(&ops[i].data.metadata);
+ }
+ }
+}
+
static void expandto(grpc_stream_op_buffer *sopb, size_t new_capacity) {
sopb->capacity = new_capacity;
+ assert_contained_metadata_ok(sopb->ops, sopb->nops);
if (sopb->ops == sopb->inlined_ops) {
sopb->ops = gpr_malloc(sizeof(grpc_stream_op) * new_capacity);
memcpy(sopb->ops, sopb->inlined_ops, sopb->nops * sizeof(grpc_stream_op));
} else {
sopb->ops = gpr_realloc(sopb->ops, sizeof(grpc_stream_op) * new_capacity);
}
+ assert_contained_metadata_ok(sopb->ops, sopb->nops);
}
static grpc_stream_op *add(grpc_stream_op_buffer *sopb) {
grpc_stream_op *out;
+ assert_contained_metadata_ok(sopb->ops, sopb->nops);
+
if (sopb->nops == sopb->capacity) {
expandto(sopb, GROW(sopb->capacity));
}
@@ -114,6 +127,7 @@ static grpc_stream_op *add(grpc_stream_op_buffer *sopb) {
void grpc_sopb_add_no_op(grpc_stream_op_buffer *sopb) {
add(sopb)->type = GRPC_NO_OP;
+ assert_contained_metadata_ok(sopb->ops, sopb->nops);
}
void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length,
@@ -122,6 +136,7 @@ void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length,
op->type = GRPC_OP_BEGIN_MESSAGE;
op->data.begin_message.length = length;
op->data.begin_message.flags = flags;
+ assert_contained_metadata_ok(sopb->ops, sopb->nops);
}
void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_metadata_batch b) {
@@ -129,12 +144,15 @@ void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_metadata_batch b)
grpc_metadata_batch_assert_ok(&b);
op->type = GRPC_OP_METADATA;
op->data.metadata = b;
+ grpc_metadata_batch_assert_ok(&op->data.metadata);
+ assert_contained_metadata_ok(sopb->ops, sopb->nops);
}
void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice) {
grpc_stream_op *op = add(sopb);
op->type = GRPC_OP_SLICE;
op->data.slice = slice;
+ assert_contained_metadata_ok(sopb->ops, sopb->nops);
}
void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb,
@@ -144,6 +162,7 @@ void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb,
op->type = GRPC_OP_FLOW_CTL_CB;
op->data.flow_ctl_cb.cb = cb;
op->data.flow_ctl_cb.arg = arg;
+ assert_contained_metadata_ok(sopb->ops, sopb->nops);
}
void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops,
@@ -151,12 +170,15 @@ void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops,
size_t orig_nops = sopb->nops;
size_t new_nops = orig_nops + nops;
+ assert_contained_metadata_ok(ops, nops);
+ assert_contained_metadata_ok(sopb->ops, sopb->nops);
if (new_nops > sopb->capacity) {
expandto(sopb, GPR_MAX(GROW(sopb->capacity), new_nops));
}
memcpy(sopb->ops + orig_nops, ops, sizeof(grpc_stream_op) * nops);
sopb->nops = new_nops;
+ assert_contained_metadata_ok(sopb->ops, sopb->nops);
}
@@ -183,13 +205,19 @@ void grpc_metadata_batch_assert_ok(grpc_metadata_batch *comd) {
assert_valid_list(&comd->garbage);
}
-void grpc_metadata_batch_init(grpc_metadata_batch *comd) { abort(); }
-
-void grpc_metadata_batch_destroy(grpc_metadata_batch *comd) { abort(); }
+void grpc_metadata_batch_init(grpc_metadata_batch *comd) {
+ comd->list.head = comd->list.tail = comd->garbage.head = comd->garbage.tail = NULL;
+ comd->deadline = gpr_inf_future;
+}
-void grpc_metadata_batch_merge(grpc_metadata_batch *target,
- grpc_metadata_batch *add) {
- abort();
+void grpc_metadata_batch_destroy(grpc_metadata_batch *comd) {
+ grpc_linked_mdelem *l;
+ for (l = comd->list.head; l; l = l->next) {
+ grpc_mdelem_unref(l->md);
+ }
+ for (l = comd->garbage.head; l; l = l->next) {
+ grpc_mdelem_unref(l->md);
+ }
}
void grpc_metadata_batch_add_head(grpc_metadata_batch *comd,
@@ -246,6 +274,20 @@ void grpc_metadata_batch_link_tail(grpc_metadata_batch *comd,
link_tail(&comd->list, storage);
}
+void grpc_metadata_batch_merge(grpc_metadata_batch *target,
+ grpc_metadata_batch *add) {
+ grpc_linked_mdelem *l;
+ grpc_linked_mdelem *next;
+ for (l = add->list.head; l; l = next) {
+ next = l->next;
+ link_tail(&target->list, l);
+ }
+ for (l = add->garbage.head; l; l = next) {
+ next = l->next;
+ link_tail(&target->garbage, l);
+ }
+}
+
void grpc_metadata_batch_filter(grpc_metadata_batch *comd,
grpc_mdelem *(*filter)(void *user_data,
grpc_mdelem *elem),