aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-04-22 11:14:26 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-04-22 11:14:26 -0700
commit629b0ed8041f96968e8e61c2b4992d08a38cf28a (patch)
tree8b07c219b83ce999efcf8db9a81f388007ec4ba4
parent77979b08607a1e408fdbd8b0445dee3667742161 (diff)
Call compiles
-rw-r--r--src/core/channel/connected_channel.c39
-rw-r--r--src/core/surface/call.c263
-rw-r--r--src/core/surface/call.h14
-rw-r--r--src/core/surface/channel.c23
-rw-r--r--src/core/surface/channel.h1
-rw-r--r--src/core/transport/stream_op.h28
-rw-r--r--src/core/transport/transport.h15
7 files changed, 246 insertions, 137 deletions
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index d0b834a10a..9e2d92ffbc 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -45,8 +45,6 @@
#include <grpc/support/slice_buffer.h>
#define MAX_BUFFER_LENGTH 8192
-/* the protobuf library will (by default) start warning at 100megs */
-#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024)
typedef struct connected_channel_channel_data {
grpc_transport *transport;
@@ -63,24 +61,6 @@ typedef struct connected_channel_call_data {
#define CALL_DATA_FROM_TRANSPORT_STREAM(transport_stream) \
(((call_data *)(transport_stream)) - 1)
-#if 0
-/* Copy the contents of a byte buffer into stream ops */
-static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
- grpc_stream_op_buffer *sopb) {
- size_t i;
-
- switch (byte_buffer->type) {
- case GRPC_BB_SLICE_BUFFER:
- for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) {
- gpr_slice slice = byte_buffer->data.slice_buffer.slices[i];
- gpr_slice_ref(slice);
- grpc_sopb_add_slice(sopb, slice);
- }
- break;
- }
-}
-#endif
-
/* Intercept a call operation and either push it directly up or translate it
into transport stream operations */
static void con_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
@@ -145,25 +125,6 @@ static void init_channel_elem(grpc_channel_element *elem,
GPR_ASSERT(is_last);
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
cd->transport = NULL;
-
-#if 0
- cd->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH;
- if (args) {
- for (i = 0; i < args->num_args; i++) {
- if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_MESSAGE_LENGTH)) {
- if (args->args[i].type != GRPC_ARG_INTEGER) {
- gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
- GRPC_ARG_MAX_MESSAGE_LENGTH);
- } else if (args->args[i].value.integer < 0) {
- gpr_log(GPR_ERROR, "%s ignored: it must be >= 0",
- GRPC_ARG_MAX_MESSAGE_LENGTH);
- } else {
- cd->max_message_length = args->args[i].value.integer;
- }
- }
- }
- }
-#endif
}
/* Destructor for channel_data */
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index fb2efac74e..7fcf6e2b04 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -144,12 +144,14 @@ struct grpc_call {
gpr_uint8 have_alarm;
/* are we currently performing a send operation */
gpr_uint8 sending;
+ /* are we currently performing a recv operation */
+ gpr_uint8 receiving;
/* are we currently completing requests */
gpr_uint8 completing;
/* pairs with completed_requests */
gpr_uint8 num_completed_requests;
- /* flag that we need to request more data */
- gpr_uint8 need_more_data;
+ /* are we currently reading a message? */
+ gpr_uint8 reading_message;
/* flags with bits corresponding to write states allowing us to determine
what was sent */
gpr_uint16 last_send_contains;
@@ -221,6 +223,9 @@ struct grpc_call {
grpc_stream_op_buffer recv_ops;
grpc_stream_state recv_state;
+ gpr_slice_buffer incoming_message;
+ gpr_uint32 incoming_message_length;
+
/* Data that the legacy api needs to track. To be deleted at some point
soon */
legacy_state *legacy_state;
@@ -246,6 +251,8 @@ static void call_on_done_recv(void *call, int success);
static void call_on_done_send(void *call, int success);
static int fill_send_ops(grpc_call *call, grpc_transport_op *op);
static void execute_op(grpc_call *call, grpc_transport_op *op);
+static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata);
+static void finish_read_ops(grpc_call *call);
grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
const void *server_transport_data,
@@ -378,6 +385,15 @@ static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
+static int need_more_data(grpc_call *call) {
+ return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) ||
+ is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) ||
+ is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) ||
+ is_op_live(call, GRPC_IOREQ_RECV_STATUS) ||
+ is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) ||
+ is_op_live(call, GRPC_IOREQ_RECV_CLOSE);
+}
+
static void unlock(grpc_call *call) {
grpc_transport_op op;
completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
@@ -387,13 +403,14 @@ static void unlock(grpc_call *call) {
memset(&op, 0, sizeof(op));
- if (call->need_more_data &&
- (call->write_state >= WRITE_STATE_STARTED || !call->is_client)) {
+ if (!call->receiving &&
+ (call->write_state >= WRITE_STATE_STARTED || !call->is_client) &&
+ need_more_data(call)) {
op.recv_ops = &call->recv_ops;
op.recv_state = &call->recv_state;
op.on_done_recv = call_on_done_recv;
op.recv_user_data = call;
- call->need_more_data = 0;
+ call->receiving = 1;
grpc_call_internal_ref(call);
start_op = 1;
}
@@ -570,6 +587,121 @@ static void call_on_done_send(void *pc, int success) {
grpc_call_internal_unref(call, 0);
}
+static void finish_message(grpc_call *call) {
+ /* TODO(ctiller): this could be a lot faster if coded directly */
+ grpc_byte_buffer *byte_buffer = grpc_byte_buffer_create(
+ call->incoming_message.slices, call->incoming_message.count);
+ gpr_slice_buffer_reset_and_unref(&call->incoming_message);
+
+ grpc_bbq_push(&call->incoming_queue, byte_buffer);
+
+ GPR_ASSERT(call->incoming_message.count == 0);
+ call->reading_message = 0;
+}
+
+static int begin_message(grpc_call *call, grpc_begin_message msg) {
+ /* can't begin a message when we're still reading a message */
+ if (call->reading_message) {
+ char *message = NULL;
+ gpr_asprintf(
+ &message, "Message terminated early; read %d bytes, expected %d",
+ (int)call->incoming_message.length, (int)call->incoming_message_length);
+ grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
+ gpr_free(message);
+ return 0;
+ }
+ /* stash away parameters, and prepare for incoming slices */
+ if (msg.length > grpc_channel_get_max_message_length(call->channel)) {
+ char *message = NULL;
+ gpr_asprintf(
+ &message,
+ "Maximum message length of %d exceeded by a message of length %d",
+ grpc_channel_get_max_message_length(call->channel), msg.length);
+ grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
+ gpr_free(message);
+ return 0;
+ } else if (msg.length > 0) {
+ call->reading_message = 1;
+ call->incoming_message_length = msg.length;
+ return 1;
+ } else {
+ finish_message(call);
+ return 1;
+ }
+}
+
+static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
+ if (GPR_SLICE_LENGTH(slice) == 0) {
+ gpr_slice_unref(slice);
+ return 1;
+ }
+ /* we have to be reading a message to know what to do here */
+ if (!call->reading_message) {
+ grpc_call_cancel_with_status(
+ call, GRPC_STATUS_INVALID_ARGUMENT,
+ "Received payload data while not reading a message");
+ return 0;
+ }
+ /* append the slice to the incoming buffer */
+ gpr_slice_buffer_add(&call->incoming_message, slice);
+ if (call->incoming_message.length > call->incoming_message_length) {
+ /* if we got too many bytes, complain */
+ char *message = NULL;
+ gpr_asprintf(
+ &message, "Receiving message overflow; read %d bytes, expected %d",
+ (int)call->incoming_message.length, (int)call->incoming_message_length);
+ grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
+ gpr_free(message);
+ return 0;
+ } else if (call->incoming_message.length == call->incoming_message_length) {
+ finish_message(call);
+ return 1;
+ } else {
+ return 1;
+ }
+}
+
+static void call_on_done_recv(void *pc, int success) {
+ grpc_call *call = pc;
+ size_t i;
+ int unref = 0;
+ lock(call);
+ for (i = 0; success && i < call->recv_ops.nops; i++) {
+ grpc_stream_op *op = &call->recv_ops.ops[i];
+ switch (op->type) {
+ case GRPC_NO_OP:
+ break;
+ case GRPC_OP_METADATA:
+ recv_metadata(call, &op->data.metadata);
+ break;
+ case GRPC_OP_BEGIN_MESSAGE:
+ success = begin_message(call, op->data.begin_message);
+ break;
+ case GRPC_OP_SLICE:
+ success = add_slice_to_message(call, op->data.slice);
+ break;
+ }
+ }
+ if (call->recv_state == GRPC_STREAM_RECV_CLOSED) {
+ GPR_ASSERT(call->read_state <= READ_STATE_READ_CLOSED);
+ call->read_state = READ_STATE_READ_CLOSED;
+ }
+ if (call->recv_state == GRPC_STREAM_CLOSED) {
+ GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED);
+ call->read_state = READ_STATE_STREAM_CLOSED;
+ unref = 1;
+ }
+ if (!success) {
+ abort();
+ }
+ finish_read_ops(call);
+ unlock(call);
+
+ if (unref) {
+ grpc_call_internal_unref(call, 0);
+ }
+}
+
static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
grpc_metadata *metadata) {
size_t i;
@@ -595,6 +727,22 @@ static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
return out;
}
+/* Copy the contents of a byte buffer into stream ops */
+static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
+ grpc_stream_op_buffer *sopb) {
+ size_t i;
+
+ switch (byte_buffer->type) {
+ case GRPC_BB_SLICE_BUFFER:
+ for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) {
+ gpr_slice slice = byte_buffer->data.slice_buffer.slices[i];
+ gpr_slice_ref(slice);
+ grpc_sopb_add_slice(sopb, slice);
+ }
+ break;
+ }
+}
+
static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
grpc_ioreq_data data;
grpc_metadata_batch mdb;
@@ -608,24 +756,25 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
break;
}
data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA];
- mdb.list = chain_metadata_from_app(
- call, data.send_metadata.count, data.send_metadata.metadata);
+ mdb.list = chain_metadata_from_app(call, data.send_metadata.count,
+ data.send_metadata.metadata);
mdb.garbage.head = mdb.garbage.tail = NULL;
mdb.deadline = call->send_deadline;
for (i = 0; i < call->send_initial_metadata_count; i++) {
- grpc_metadata_batch_link_head(&mdb,
- &call->send_initial_metadata[i]);
+ grpc_metadata_batch_link_head(&mdb, &call->send_initial_metadata[i]);
}
grpc_sopb_add_metadata(&call->send_ops, mdb);
op->send_ops = &call->send_ops;
op->bind_pollset = grpc_cq_pollset(call->cq);
call->last_send_contains |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA;
call->write_state = WRITE_STATE_STARTED;
- /* fall through intended */
+ /* fall through intended */
case WRITE_STATE_STARTED:
if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) {
data = call->request_data[GRPC_IOREQ_SEND_MESSAGE];
- grpc_sopb_add_message(&call->send_ops, data.send_message);
+ grpc_sopb_add_begin_message(
+ &call->send_ops, grpc_byte_buffer_length(data.send_message), 0);
+ copy_byte_buffer_to_stream_ops(data.send_message, &call->send_ops);
op->send_ops = &call->send_ops;
call->last_send_contains |= 1 << GRPC_IOREQ_SEND_MESSAGE;
}
@@ -637,8 +786,8 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
if (!call->is_client) {
/* send trailing metadata */
data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA];
- mdb.list = chain_metadata_from_app(
- call, data.send_metadata.count, data.send_metadata.metadata);
+ mdb.list = chain_metadata_from_app(call, data.send_metadata.count,
+ data.send_metadata.metadata);
mdb.garbage.head = mdb.garbage.tail = NULL;
mdb.deadline = call->send_deadline;
/* send status */
@@ -656,7 +805,8 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
&mdb, &call->details_link,
grpc_mdelem_from_metadata_strings(
call->metadata_context,
- grpc_mdstr_ref(grpc_channel_get_message_string(call->channel)),
+ grpc_mdstr_ref(
+ grpc_channel_get_message_string(call->channel)),
grpc_mdstr_from_string(call->metadata_context,
data.send_status.details)));
}
@@ -779,10 +929,6 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
master->on_complete = completion;
master->user_data = user_data;
- if (have_ops & (1u << GRPC_IOREQ_RECV_MESSAGE)) {
- call->need_more_data = 1;
- }
-
finish_read_ops(call);
early_out_write_ops(call);
@@ -867,28 +1013,6 @@ static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) {
grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now());
}
-static void set_read_state_locked(grpc_call *call, read_state state) {
- GPR_ASSERT(call->read_state < state);
- call->read_state = state;
- finish_read_ops(call);
-}
-
-static void set_read_state(grpc_call *call, read_state state) {
- lock(call);
- set_read_state_locked(call, state);
- unlock(call);
-}
-
-void grpc_call_read_closed(grpc_call_element *elem) {
- set_read_state(CALL_FROM_TOP_ELEM(elem), READ_STATE_READ_CLOSED);
-}
-
-void grpc_call_stream_closed(grpc_call_element *elem) {
- grpc_call *call = CALL_FROM_TOP_ELEM(elem);
- set_read_state(call, READ_STATE_STREAM_CLOSED);
- grpc_call_internal_unref(call, 0);
-}
-
/* we offset status by a small amount when storing it into transport metadata
as metadata cannot store a 0 value (which is used as OK for grpc_status_codes
*/
@@ -912,35 +1036,13 @@ static gpr_uint32 decode_status(grpc_mdelem *md) {
return status;
}
-void grpc_call_recv_message(grpc_call_element *elem,
- grpc_byte_buffer *byte_buffer) {
- grpc_call *call = CALL_FROM_TOP_ELEM(elem);
- lock(call);
- grpc_bbq_push(&call->incoming_queue, byte_buffer);
- finish_read_ops(call);
- unlock(call);
-}
-
-void grpc_call_recv_synthetic_status(grpc_call_element *elem,
- grpc_status_code status,
- const char *message) {
- grpc_call *call = CALL_FROM_TOP_ELEM(elem);
- lock(call);
- set_status_code(call, STATUS_FROM_CORE, status);
- set_status_details(call, STATUS_FROM_CORE,
- grpc_mdstr_from_string(call->metadata_context, message));
- unlock(call);
-}
-
-int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) {
- grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
grpc_linked_mdelem *l;
grpc_metadata_array *dest;
grpc_metadata *mdusr;
int is_trailing;
grpc_mdctx *mdctx = call->metadata_context;
- lock(call);
is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA;
for (l = md->list.head; l != NULL; l = l->next) {
grpc_mdelem *md = l->md;
@@ -976,9 +1078,8 @@ int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) {
set_deadline_alarm(call, md->deadline);
}
if (!is_trailing) {
- set_read_state_locked(call, READ_STATE_GOT_INITIAL_METADATA);
+ call->read_state = READ_STATE_GOT_INITIAL_METADATA;
}
- unlock(call);
grpc_mdctx_lock(mdctx);
for (l = md->list.head; l; l = l->next) {
@@ -988,13 +1089,43 @@ int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) {
grpc_mdctx_locked_mdelem_unref(mdctx, l->md);
}
grpc_mdctx_unlock(mdctx);
+}
+
+#if 0
+void grpc_call_read_closed(grpc_call_element *elem) {
+ set_read_state(CALL_FROM_TOP_ELEM(elem), READ_STATE_READ_CLOSED);
+}
- return !is_trailing;
+void grpc_call_stream_closed(grpc_call_element *elem) {
+ grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+ set_read_state(call, READ_STATE_STREAM_CLOSED);
+ grpc_call_internal_unref(call, 0);
+}
+
+void grpc_call_recv_message(grpc_call_element *elem,
+ grpc_byte_buffer *byte_buffer) {
+ grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+ lock(call);
+ grpc_bbq_push(&call->incoming_queue, byte_buffer);
+ finish_read_ops(call);
+ unlock(call);
+}
+
+void grpc_call_recv_synthetic_status(grpc_call_element *elem,
+ grpc_status_code status,
+ const char *message) {
+ grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+ lock(call);
+ set_status_code(call, STATUS_FROM_CORE, status);
+ set_status_details(call, STATUS_FROM_CORE,
+ grpc_mdstr_from_string(call->metadata_context, message));
+ unlock(call);
}
grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
return CALL_STACK_FROM_CALL(call);
}
+#endif
/*
* BATCH API IMPLEMENTATION
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index 358e5560a3..199beb1738 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -96,26 +96,12 @@ grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call);
void grpc_call_internal_ref(grpc_call *call);
void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion);
-/* Helpers for grpc_client, grpc_server filters to publish received data to
- the completion queue/surface layer */
-/* receive metadata - returns 1 if this was initial metadata */
-int grpc_call_recv_metadata(grpc_call_element *surface_element,
- grpc_metadata_batch *md);
-void grpc_call_recv_message(grpc_call_element *surface_element,
- grpc_byte_buffer *message);
-void grpc_call_read_closed(grpc_call_element *surface_element);
-void grpc_call_stream_closed(grpc_call_element *surface_element);
-
grpc_call_error grpc_call_start_ioreq_and_call_back(
grpc_call *call, const grpc_ioreq *reqs, size_t nreqs,
grpc_ioreq_completion_func on_complete, void *user_data);
grpc_call_stack *grpc_call_get_call_stack(grpc_call *call);
-void grpc_call_recv_synthetic_status(grpc_call_element *elem,
- grpc_status_code status,
- const char *message);
-
/* Given the top call_element, get the call object. */
grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element);
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 29b042e7c1..f1d71afaf2 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -52,6 +52,7 @@ typedef struct registered_call {
struct grpc_channel {
int is_client;
gpr_refcount refs;
+ gpr_uint32 max_message_length;
grpc_mdctx *metadata_context;
grpc_mdstr *grpc_status_string;
grpc_mdstr *grpc_message_string;
@@ -68,9 +69,13 @@ struct grpc_channel {
#define CHANNEL_FROM_TOP_ELEM(top_elem) \
CHANNEL_FROM_CHANNEL_STACK(grpc_channel_stack_from_top_element(top_elem))
+/* the protobuf library will (by default) start warning at 100megs */
+#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024)
+
grpc_channel *grpc_channel_create_from_filters(
const grpc_channel_filter **filters, size_t num_filters,
const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client) {
+ size_t i;
size_t size =
sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters);
grpc_channel *channel = gpr_malloc(size);
@@ -88,6 +93,24 @@ grpc_channel *grpc_channel_create_from_filters(
CHANNEL_STACK_FROM_CHANNEL(channel));
gpr_mu_init(&channel->registered_call_mu);
channel->registered_calls = NULL;
+
+ channel->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH;
+ if (args) {
+ for (i = 0; i < args->num_args; i++) {
+ if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_MESSAGE_LENGTH)) {
+ if (args->args[i].type != GRPC_ARG_INTEGER) {
+ gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
+ GRPC_ARG_MAX_MESSAGE_LENGTH);
+ } else if (args->args[i].value.integer < 0) {
+ gpr_log(GPR_ERROR, "%s ignored: it must be >= 0",
+ GRPC_ARG_MAX_MESSAGE_LENGTH);
+ } else {
+ channel->max_message_length = args->args[i].value.integer;
+ }
+ }
+ }
+ }
+
return channel;
}
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index d3e51185ee..05d57a905b 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -44,6 +44,7 @@ grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel);
grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel);
grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel);
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
+gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);
void grpc_client_channel_closed(grpc_channel_element *elem);
diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h
index dabe68f3bd..c3901bf608 100644
--- a/src/core/transport/stream_op.h
+++ b/src/core/transport/stream_op.h
@@ -50,9 +50,22 @@ typedef enum grpc_stream_op_code {
Must be ignored by receivers */
GRPC_NO_OP,
GRPC_OP_METADATA,
- GRPC_OP_MESSAGE
+ /* Begin a message/metadata element/status - as defined by
+ grpc_message_type. */
+ GRPC_OP_BEGIN_MESSAGE,
+ /* Add a slice of data to the current message/metadata element/status.
+ Must not overflow the forward declared length. */
+ GRPC_OP_SLICE
} grpc_stream_op_code;
+/* Arguments for GRPC_OP_BEGIN */
+typedef struct grpc_begin_message {
+ /* How many bytes of data will this message contain */
+ gpr_uint32 length;
+ /* Write flags for the message: see grpc.h GRPC_WRITE_xxx */
+ gpr_uint32 flags;
+} grpc_begin_message;
+
typedef struct grpc_linked_mdelem {
grpc_mdelem *md;
struct grpc_linked_mdelem *next;
@@ -105,8 +118,9 @@ typedef struct grpc_stream_op {
/* the arguments to this operation. union fields are named according to the
associated op-code */
union {
- grpc_byte_buffer *message;
+ grpc_begin_message begin_message;
grpc_metadata_batch metadata;
+ gpr_slice slice;
} data;
} grpc_stream_op;
@@ -134,8 +148,16 @@ void grpc_stream_ops_unref_owned_objects(grpc_stream_op *ops, size_t nops);
/* Append a GRPC_NO_OP to a buffer */
void grpc_sopb_add_no_op(grpc_stream_op_buffer *sopb);
-void grpc_sopb_add_message(grpc_stream_op_buffer *sopb, grpc_byte_buffer *bb);
+/* Append a GRPC_OP_BEGIN to a buffer */
+void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length,
+ gpr_uint32 flags);
void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_metadata_batch metadata);
+/* Append a GRPC_SLICE to a buffer - does not ref/unref the slice */
+void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice);
+/* Append a GRPC_OP_FLOW_CTL_CB to a buffer */
+void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb,
+ void (*cb)(void *arg, grpc_op_error error),
+ void *arg);
/* Append a buffer to a buffer - does not ref/unref any internal objects */
void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops,
size_t nops);
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index f31011e56a..264245d351 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -113,21 +113,6 @@ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream,
void grpc_transport_destroy_stream(grpc_transport *transport,
grpc_stream *stream);
-/* Enable/disable incoming data for a stream.
-
- This effectively disables new window becoming available for a given stream,
- but does not prevent existing window from being consumed by a sender: the
- caller must still be prepared to receive some additional data after this
- call.
-
- Arguments:
- transport - the transport on which to create this stream
- stream - the grpc_stream to destroy (memory is still owned by the
- caller, but any child memory must be cleaned up)
- allow - is it allowed that new window be opened up? */
-void grpc_transport_set_allow_window_updates(grpc_transport *transport,
- grpc_stream *stream, int allow);
-
/* Transport op: a set of operations to perform on a transport */
typedef struct grpc_transport_op {
grpc_stream_op_buffer *send_ops;