aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/transport
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-04-17 14:57:44 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-04-17 14:57:44 -0700
commit8b282cbd0b1df8ae07b563b1a11eecb8e3bbe0a6 (patch)
tree4d4c653b8f351a0ec3bb332403e6c68b2451a6b8 /src/core/transport
parent9c1043e757a050b789c21498c9c6984133a04d8e (diff)
Got rid of GRPC_SEND_START
Diffstat (limited to 'src/core/transport')
-rw-r--r--src/core/transport/chttp2/stream_encoder.c9
-rw-r--r--src/core/transport/stream_op.c58
2 files changed, 58 insertions, 9 deletions
diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c
index 8c66171b07..f50d1428ab 100644
--- a/src/core/transport/chttp2/stream_encoder.c
+++ b/src/core/transport/chttp2/stream_encoder.c
@@ -479,8 +479,9 @@ gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count,
/* skip */
curop++;
break;
- case GRPC_OP_FLOW_CTL_CB:
case GRPC_OP_METADATA:
+ grpc_metadata_batch_assert_ok(&op->data.metadata);
+ case GRPC_OP_FLOW_CTL_CB:
/* these just get copied as they don't impact the number of flow
controlled bytes */
grpc_sopb_append(outops, op, 1);
@@ -527,6 +528,12 @@ exit_loop:
*inops_count -= curop;
memmove(inops, inops + curop, *inops_count * sizeof(grpc_stream_op));
+ for (curop = 0; curop < *inops_count; curop++) {
+ if (inops[curop].type == GRPC_OP_METADATA) {
+ grpc_metadata_batch_assert_ok(&inops[curop].data.metadata);
+ }
+ }
+
return flow_controlled_bytes_taken;
}
diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c
index 1a8b4174ff..9589599257 100644
--- a/src/core/transport/stream_op.c
+++ b/src/core/transport/stream_op.c
@@ -33,11 +33,11 @@
#include "src/core/transport/stream_op.h"
+#include <string.h>
+
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include <string.h>
-
/* Exponential growth function: Given x, return a larger x.
Currently we grow by 1.5 times upon reallocation. */
#define GROW(x) (3 * (x) / 2)
@@ -91,19 +91,32 @@ void grpc_stream_ops_unref_owned_objects(grpc_stream_op *ops, size_t nops) {
}
}
+static void assert_contained_metadata_ok(grpc_stream_op *ops, size_t nops) {
+ size_t i;
+ for (i = 0; i < nops; i++) {
+ if (ops[i].type == GRPC_OP_METADATA) {
+ grpc_metadata_batch_assert_ok(&ops[i].data.metadata);
+ }
+ }
+}
+
static void expandto(grpc_stream_op_buffer *sopb, size_t new_capacity) {
sopb->capacity = new_capacity;
+ assert_contained_metadata_ok(sopb->ops, sopb->nops);
if (sopb->ops == sopb->inlined_ops) {
sopb->ops = gpr_malloc(sizeof(grpc_stream_op) * new_capacity);
memcpy(sopb->ops, sopb->inlined_ops, sopb->nops * sizeof(grpc_stream_op));
} else {
sopb->ops = gpr_realloc(sopb->ops, sizeof(grpc_stream_op) * new_capacity);
}
+ assert_contained_metadata_ok(sopb->ops, sopb->nops);
}
static grpc_stream_op *add(grpc_stream_op_buffer *sopb) {
grpc_stream_op *out;
+ assert_contained_metadata_ok(sopb->ops, sopb->nops);
+
if (sopb->nops == sopb->capacity) {
expandto(sopb, GROW(sopb->capacity));
}
@@ -114,6 +127,7 @@ static grpc_stream_op *add(grpc_stream_op_buffer *sopb) {
void grpc_sopb_add_no_op(grpc_stream_op_buffer *sopb) {
add(sopb)->type = GRPC_NO_OP;
+ assert_contained_metadata_ok(sopb->ops, sopb->nops);
}
void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length,
@@ -122,6 +136,7 @@ void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length,
op->type = GRPC_OP_BEGIN_MESSAGE;
op->data.begin_message.length = length;
op->data.begin_message.flags = flags;
+ assert_contained_metadata_ok(sopb->ops, sopb->nops);
}
void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_metadata_batch b) {
@@ -129,12 +144,15 @@ void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_metadata_batch b)
grpc_metadata_batch_assert_ok(&b);
op->type = GRPC_OP_METADATA;
op->data.metadata = b;
+ grpc_metadata_batch_assert_ok(&op->data.metadata);
+ assert_contained_metadata_ok(sopb->ops, sopb->nops);
}
void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice) {
grpc_stream_op *op = add(sopb);
op->type = GRPC_OP_SLICE;
op->data.slice = slice;
+ assert_contained_metadata_ok(sopb->ops, sopb->nops);
}
void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb,
@@ -144,6 +162,7 @@ void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb,
op->type = GRPC_OP_FLOW_CTL_CB;
op->data.flow_ctl_cb.cb = cb;
op->data.flow_ctl_cb.arg = arg;
+ assert_contained_metadata_ok(sopb->ops, sopb->nops);
}
void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops,
@@ -151,12 +170,15 @@ void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops,
size_t orig_nops = sopb->nops;
size_t new_nops = orig_nops + nops;
+ assert_contained_metadata_ok(ops, nops);
+ assert_contained_metadata_ok(sopb->ops, sopb->nops);
if (new_nops > sopb->capacity) {
expandto(sopb, GPR_MAX(GROW(sopb->capacity), new_nops));
}
memcpy(sopb->ops + orig_nops, ops, sizeof(grpc_stream_op) * nops);
sopb->nops = new_nops;
+ assert_contained_metadata_ok(sopb->ops, sopb->nops);
}
@@ -183,13 +205,19 @@ void grpc_metadata_batch_assert_ok(grpc_metadata_batch *comd) {
assert_valid_list(&comd->garbage);
}
-void grpc_metadata_batch_init(grpc_metadata_batch *comd) { abort(); }
-
-void grpc_metadata_batch_destroy(grpc_metadata_batch *comd) { abort(); }
+void grpc_metadata_batch_init(grpc_metadata_batch *comd) {
+ comd->list.head = comd->list.tail = comd->garbage.head = comd->garbage.tail = NULL;
+ comd->deadline = gpr_inf_future;
+}
-void grpc_metadata_batch_merge(grpc_metadata_batch *target,
- grpc_metadata_batch *add) {
- abort();
+void grpc_metadata_batch_destroy(grpc_metadata_batch *comd) {
+ grpc_linked_mdelem *l;
+ for (l = comd->list.head; l; l = l->next) {
+ grpc_mdelem_unref(l->md);
+ }
+ for (l = comd->garbage.head; l; l = l->next) {
+ grpc_mdelem_unref(l->md);
+ }
}
void grpc_metadata_batch_add_head(grpc_metadata_batch *comd,
@@ -246,6 +274,20 @@ void grpc_metadata_batch_link_tail(grpc_metadata_batch *comd,
link_tail(&comd->list, storage);
}
+void grpc_metadata_batch_merge(grpc_metadata_batch *target,
+ grpc_metadata_batch *add) {
+ grpc_linked_mdelem *l;
+ grpc_linked_mdelem *next;
+ for (l = add->list.head; l; l = next) {
+ next = l->next;
+ link_tail(&target->list, l);
+ }
+ for (l = add->garbage.head; l; l = next) {
+ next = l->next;
+ link_tail(&target->garbage, l);
+ }
+}
+
void grpc_metadata_batch_filter(grpc_metadata_batch *comd,
grpc_mdelem *(*filter)(void *user_data,
grpc_mdelem *elem),