diff options
-rw-r--r-- | src/core/channel/connected_channel.c | 39 | ||||
-rw-r--r-- | src/core/surface/call.c | 263 | ||||
-rw-r--r-- | src/core/surface/call.h | 14 | ||||
-rw-r--r-- | src/core/surface/channel.c | 23 | ||||
-rw-r--r-- | src/core/surface/channel.h | 1 | ||||
-rw-r--r-- | src/core/transport/stream_op.h | 28 | ||||
-rw-r--r-- | src/core/transport/transport.h | 15 |
7 files changed, 246 insertions, 137 deletions
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index d0b834a10a..9e2d92ffbc 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -45,8 +45,6 @@ #include <grpc/support/slice_buffer.h> #define MAX_BUFFER_LENGTH 8192 -/* the protobuf library will (by default) start warning at 100megs */ -#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024) typedef struct connected_channel_channel_data { grpc_transport *transport; @@ -63,24 +61,6 @@ typedef struct connected_channel_call_data { #define CALL_DATA_FROM_TRANSPORT_STREAM(transport_stream) \ (((call_data *)(transport_stream)) - 1) -#if 0 -/* Copy the contents of a byte buffer into stream ops */ -static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer, - grpc_stream_op_buffer *sopb) { - size_t i; - - switch (byte_buffer->type) { - case GRPC_BB_SLICE_BUFFER: - for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) { - gpr_slice slice = byte_buffer->data.slice_buffer.slices[i]; - gpr_slice_ref(slice); - grpc_sopb_add_slice(sopb, slice); - } - break; - } -} -#endif - /* Intercept a call operation and either push it directly up or translate it into transport stream operations */ static void con_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { @@ -145,25 +125,6 @@ static void init_channel_elem(grpc_channel_element *elem, GPR_ASSERT(is_last); GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); cd->transport = NULL; - -#if 0 - cd->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH; - if (args) { - for (i = 0; i < args->num_args; i++) { - if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_MESSAGE_LENGTH)) { - if (args->args[i].type != GRPC_ARG_INTEGER) { - gpr_log(GPR_ERROR, "%s ignored: it must be an integer", - GRPC_ARG_MAX_MESSAGE_LENGTH); - } else if (args->args[i].value.integer < 0) { - gpr_log(GPR_ERROR, "%s ignored: it must be >= 0", - GRPC_ARG_MAX_MESSAGE_LENGTH); - } else { - cd->max_message_length = args->args[i].value.integer; - } - } - } - } -#endif } /* Destructor for channel_data */ diff --git a/src/core/surface/call.c b/src/core/surface/call.c index fb2efac74e..7fcf6e2b04 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -144,12 +144,14 @@ struct grpc_call { gpr_uint8 have_alarm; /* are we currently performing a send operation */ gpr_uint8 sending; + /* are we currently performing a recv operation */ + gpr_uint8 receiving; /* are we currently completing requests */ gpr_uint8 completing; /* pairs with completed_requests */ gpr_uint8 num_completed_requests; - /* flag that we need to request more data */ - gpr_uint8 need_more_data; + /* are we currently reading a message? */ + gpr_uint8 reading_message; /* flags with bits corresponding to write states allowing us to determine what was sent */ gpr_uint16 last_send_contains; @@ -221,6 +223,9 @@ struct grpc_call { grpc_stream_op_buffer recv_ops; grpc_stream_state recv_state; + gpr_slice_buffer incoming_message; + gpr_uint32 incoming_message_length; + /* Data that the legacy api needs to track. To be deleted at some point soon */ legacy_state *legacy_state; @@ -246,6 +251,8 @@ static void call_on_done_recv(void *call, int success); static void call_on_done_send(void *call, int success); static int fill_send_ops(grpc_call *call, grpc_transport_op *op); static void execute_op(grpc_call *call, grpc_transport_op *op); +static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata); +static void finish_read_ops(grpc_call *call); grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, const void *server_transport_data, @@ -378,6 +385,15 @@ static int is_op_live(grpc_call *call, grpc_ioreq_op op) { static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); } +static int need_more_data(grpc_call *call) { + return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) || + is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) || + is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) || + is_op_live(call, GRPC_IOREQ_RECV_STATUS) || + is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) || + is_op_live(call, GRPC_IOREQ_RECV_CLOSE); +} + static void unlock(grpc_call *call) { grpc_transport_op op; completed_request completed_requests[GRPC_IOREQ_OP_COUNT]; @@ -387,13 +403,14 @@ static void unlock(grpc_call *call) { memset(&op, 0, sizeof(op)); - if (call->need_more_data && - (call->write_state >= WRITE_STATE_STARTED || !call->is_client)) { + if (!call->receiving && + (call->write_state >= WRITE_STATE_STARTED || !call->is_client) && + need_more_data(call)) { op.recv_ops = &call->recv_ops; op.recv_state = &call->recv_state; op.on_done_recv = call_on_done_recv; op.recv_user_data = call; - call->need_more_data = 0; + call->receiving = 1; grpc_call_internal_ref(call); start_op = 1; } @@ -570,6 +587,121 @@ static void call_on_done_send(void *pc, int success) { grpc_call_internal_unref(call, 0); } +static void finish_message(grpc_call *call) { + /* TODO(ctiller): this could be a lot faster if coded directly */ + grpc_byte_buffer *byte_buffer = grpc_byte_buffer_create( + call->incoming_message.slices, call->incoming_message.count); + gpr_slice_buffer_reset_and_unref(&call->incoming_message); + + grpc_bbq_push(&call->incoming_queue, byte_buffer); + + GPR_ASSERT(call->incoming_message.count == 0); + call->reading_message = 0; +} + +static int begin_message(grpc_call *call, grpc_begin_message msg) { + /* can't begin a message when we're still reading a message */ + if (call->reading_message) { + char *message = NULL; + gpr_asprintf( + &message, "Message terminated early; read %d bytes, expected %d", + (int)call->incoming_message.length, (int)call->incoming_message_length); + grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message); + gpr_free(message); + return 0; + } + /* stash away parameters, and prepare for incoming slices */ + if (msg.length > grpc_channel_get_max_message_length(call->channel)) { + char *message = NULL; + gpr_asprintf( + &message, + "Maximum message length of %d exceeded by a message of length %d", + grpc_channel_get_max_message_length(call->channel), msg.length); + grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message); + gpr_free(message); + return 0; + } else if (msg.length > 0) { + call->reading_message = 1; + call->incoming_message_length = msg.length; + return 1; + } else { + finish_message(call); + return 1; + } +} + +static int add_slice_to_message(grpc_call *call, gpr_slice slice) { + if (GPR_SLICE_LENGTH(slice) == 0) { + gpr_slice_unref(slice); + return 1; + } + /* we have to be reading a message to know what to do here */ + if (!call->reading_message) { + grpc_call_cancel_with_status( + call, GRPC_STATUS_INVALID_ARGUMENT, + "Received payload data while not reading a message"); + return 0; + } + /* append the slice to the incoming buffer */ + gpr_slice_buffer_add(&call->incoming_message, slice); + if (call->incoming_message.length > call->incoming_message_length) { + /* if we got too many bytes, complain */ + char *message = NULL; + gpr_asprintf( + &message, "Receiving message overflow; read %d bytes, expected %d", + (int)call->incoming_message.length, (int)call->incoming_message_length); + grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message); + gpr_free(message); + return 0; + } else if (call->incoming_message.length == call->incoming_message_length) { + finish_message(call); + return 1; + } else { + return 1; + } +} + +static void call_on_done_recv(void *pc, int success) { + grpc_call *call = pc; + size_t i; + int unref = 0; + lock(call); + for (i = 0; success && i < call->recv_ops.nops; i++) { + grpc_stream_op *op = &call->recv_ops.ops[i]; + switch (op->type) { + case GRPC_NO_OP: + break; + case GRPC_OP_METADATA: + recv_metadata(call, &op->data.metadata); + break; + case GRPC_OP_BEGIN_MESSAGE: + success = begin_message(call, op->data.begin_message); + break; + case GRPC_OP_SLICE: + success = add_slice_to_message(call, op->data.slice); + break; + } + } + if (call->recv_state == GRPC_STREAM_RECV_CLOSED) { + GPR_ASSERT(call->read_state <= READ_STATE_READ_CLOSED); + call->read_state = READ_STATE_READ_CLOSED; + } + if (call->recv_state == GRPC_STREAM_CLOSED) { + GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED); + call->read_state = READ_STATE_STREAM_CLOSED; + unref = 1; + } + if (!success) { + abort(); + } + finish_read_ops(call); + unlock(call); + + if (unref) { + grpc_call_internal_unref(call, 0); + } +} + static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count, grpc_metadata *metadata) { size_t i; @@ -595,6 +727,22 @@ static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count, return out; } +/* Copy the contents of a byte buffer into stream ops */ +static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer, + grpc_stream_op_buffer *sopb) { + size_t i; + + switch (byte_buffer->type) { + case GRPC_BB_SLICE_BUFFER: + for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) { + gpr_slice slice = byte_buffer->data.slice_buffer.slices[i]; + gpr_slice_ref(slice); + grpc_sopb_add_slice(sopb, slice); + } + break; + } +} + static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { grpc_ioreq_data data; grpc_metadata_batch mdb; @@ -608,24 +756,25 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { break; } data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA]; - mdb.list = chain_metadata_from_app( - call, data.send_metadata.count, data.send_metadata.metadata); + mdb.list = chain_metadata_from_app(call, data.send_metadata.count, + data.send_metadata.metadata); mdb.garbage.head = mdb.garbage.tail = NULL; mdb.deadline = call->send_deadline; for (i = 0; i < call->send_initial_metadata_count; i++) { - grpc_metadata_batch_link_head(&mdb, - &call->send_initial_metadata[i]); + grpc_metadata_batch_link_head(&mdb, &call->send_initial_metadata[i]); } grpc_sopb_add_metadata(&call->send_ops, mdb); op->send_ops = &call->send_ops; op->bind_pollset = grpc_cq_pollset(call->cq); call->last_send_contains |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA; call->write_state = WRITE_STATE_STARTED; - /* fall through intended */ + /* fall through intended */ case WRITE_STATE_STARTED: if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) { data = call->request_data[GRPC_IOREQ_SEND_MESSAGE]; - grpc_sopb_add_message(&call->send_ops, data.send_message); + grpc_sopb_add_begin_message( + &call->send_ops, grpc_byte_buffer_length(data.send_message), 0); + copy_byte_buffer_to_stream_ops(data.send_message, &call->send_ops); op->send_ops = &call->send_ops; call->last_send_contains |= 1 << GRPC_IOREQ_SEND_MESSAGE; } @@ -637,8 +786,8 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { if (!call->is_client) { /* send trailing metadata */ data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA]; - mdb.list = chain_metadata_from_app( - call, data.send_metadata.count, data.send_metadata.metadata); + mdb.list = chain_metadata_from_app(call, data.send_metadata.count, + data.send_metadata.metadata); mdb.garbage.head = mdb.garbage.tail = NULL; mdb.deadline = call->send_deadline; /* send status */ @@ -656,7 +805,8 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { &mdb, &call->details_link, grpc_mdelem_from_metadata_strings( call->metadata_context, - grpc_mdstr_ref(grpc_channel_get_message_string(call->channel)), + grpc_mdstr_ref( + grpc_channel_get_message_string(call->channel)), grpc_mdstr_from_string(call->metadata_context, data.send_status.details))); } @@ -779,10 +929,6 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, master->on_complete = completion; master->user_data = user_data; - if (have_ops & (1u << GRPC_IOREQ_RECV_MESSAGE)) { - call->need_more_data = 1; - } - finish_read_ops(call); early_out_write_ops(call); @@ -867,28 +1013,6 @@ static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) { grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now()); } -static void set_read_state_locked(grpc_call *call, read_state state) { - GPR_ASSERT(call->read_state < state); - call->read_state = state; - finish_read_ops(call); -} - -static void set_read_state(grpc_call *call, read_state state) { - lock(call); - set_read_state_locked(call, state); - unlock(call); -} - -void grpc_call_read_closed(grpc_call_element *elem) { - set_read_state(CALL_FROM_TOP_ELEM(elem), READ_STATE_READ_CLOSED); -} - -void grpc_call_stream_closed(grpc_call_element *elem) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); - set_read_state(call, READ_STATE_STREAM_CLOSED); - grpc_call_internal_unref(call, 0); -} - /* we offset status by a small amount when storing it into transport metadata as metadata cannot store a 0 value (which is used as OK for grpc_status_codes */ @@ -912,35 +1036,13 @@ static gpr_uint32 decode_status(grpc_mdelem *md) { return status; } -void grpc_call_recv_message(grpc_call_element *elem, - grpc_byte_buffer *byte_buffer) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); - lock(call); - grpc_bbq_push(&call->incoming_queue, byte_buffer); - finish_read_ops(call); - unlock(call); -} - -void grpc_call_recv_synthetic_status(grpc_call_element *elem, - grpc_status_code status, - const char *message) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); - lock(call); - set_status_code(call, STATUS_FROM_CORE, status); - set_status_details(call, STATUS_FROM_CORE, - grpc_mdstr_from_string(call->metadata_context, message)); - unlock(call); -} - -int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); +static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { grpc_linked_mdelem *l; grpc_metadata_array *dest; grpc_metadata *mdusr; int is_trailing; grpc_mdctx *mdctx = call->metadata_context; - lock(call); is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA; for (l = md->list.head; l != NULL; l = l->next) { grpc_mdelem *md = l->md; @@ -976,9 +1078,8 @@ int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) { set_deadline_alarm(call, md->deadline); } if (!is_trailing) { - set_read_state_locked(call, READ_STATE_GOT_INITIAL_METADATA); + call->read_state = READ_STATE_GOT_INITIAL_METADATA; } - unlock(call); grpc_mdctx_lock(mdctx); for (l = md->list.head; l; l = l->next) { @@ -988,13 +1089,43 @@ int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) { grpc_mdctx_locked_mdelem_unref(mdctx, l->md); } grpc_mdctx_unlock(mdctx); +} + +#if 0 +void grpc_call_read_closed(grpc_call_element *elem) { + set_read_state(CALL_FROM_TOP_ELEM(elem), READ_STATE_READ_CLOSED); +} - return !is_trailing; +void grpc_call_stream_closed(grpc_call_element *elem) { + grpc_call *call = CALL_FROM_TOP_ELEM(elem); + set_read_state(call, READ_STATE_STREAM_CLOSED); + grpc_call_internal_unref(call, 0); +} + +void grpc_call_recv_message(grpc_call_element *elem, + grpc_byte_buffer *byte_buffer) { + grpc_call *call = CALL_FROM_TOP_ELEM(elem); + lock(call); + grpc_bbq_push(&call->incoming_queue, byte_buffer); + finish_read_ops(call); + unlock(call); +} + +void grpc_call_recv_synthetic_status(grpc_call_element *elem, + grpc_status_code status, + const char *message) { + grpc_call *call = CALL_FROM_TOP_ELEM(elem); + lock(call); + set_status_code(call, STATUS_FROM_CORE, status); + set_status_details(call, STATUS_FROM_CORE, + grpc_mdstr_from_string(call->metadata_context, message)); + unlock(call); } grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) { return CALL_STACK_FROM_CALL(call); } +#endif /* * BATCH API IMPLEMENTATION diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 358e5560a3..199beb1738 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -96,26 +96,12 @@ grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call); void grpc_call_internal_ref(grpc_call *call); void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion); -/* Helpers for grpc_client, grpc_server filters to publish received data to - the completion queue/surface layer */ -/* receive metadata - returns 1 if this was initial metadata */ -int grpc_call_recv_metadata(grpc_call_element *surface_element, - grpc_metadata_batch *md); -void grpc_call_recv_message(grpc_call_element *surface_element, - grpc_byte_buffer *message); -void grpc_call_read_closed(grpc_call_element *surface_element); -void grpc_call_stream_closed(grpc_call_element *surface_element); - grpc_call_error grpc_call_start_ioreq_and_call_back( grpc_call *call, const grpc_ioreq *reqs, size_t nreqs, grpc_ioreq_completion_func on_complete, void *user_data); grpc_call_stack *grpc_call_get_call_stack(grpc_call *call); -void grpc_call_recv_synthetic_status(grpc_call_element *elem, - grpc_status_code status, - const char *message); - /* Given the top call_element, get the call object. */ grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element); diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 29b042e7c1..f1d71afaf2 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -52,6 +52,7 @@ typedef struct registered_call { struct grpc_channel { int is_client; gpr_refcount refs; + gpr_uint32 max_message_length; grpc_mdctx *metadata_context; grpc_mdstr *grpc_status_string; grpc_mdstr *grpc_message_string; @@ -68,9 +69,13 @@ struct grpc_channel { #define CHANNEL_FROM_TOP_ELEM(top_elem) \ CHANNEL_FROM_CHANNEL_STACK(grpc_channel_stack_from_top_element(top_elem)) +/* the protobuf library will (by default) start warning at 100megs */ +#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024) + grpc_channel *grpc_channel_create_from_filters( const grpc_channel_filter **filters, size_t num_filters, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client) { + size_t i; size_t size = sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters); grpc_channel *channel = gpr_malloc(size); @@ -88,6 +93,24 @@ grpc_channel *grpc_channel_create_from_filters( CHANNEL_STACK_FROM_CHANNEL(channel)); gpr_mu_init(&channel->registered_call_mu); channel->registered_calls = NULL; + + channel->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH; + if (args) { + for (i = 0; i < args->num_args; i++) { + if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_MESSAGE_LENGTH)) { + if (args->args[i].type != GRPC_ARG_INTEGER) { + gpr_log(GPR_ERROR, "%s ignored: it must be an integer", + GRPC_ARG_MAX_MESSAGE_LENGTH); + } else if (args->args[i].value.integer < 0) { + gpr_log(GPR_ERROR, "%s ignored: it must be >= 0", + GRPC_ARG_MAX_MESSAGE_LENGTH); + } else { + channel->max_message_length = args->args[i].value.integer; + } + } + } + } + return channel; } diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index d3e51185ee..05d57a905b 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -44,6 +44,7 @@ grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel); grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel); grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel); grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel); +gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel); void grpc_client_channel_closed(grpc_channel_element *elem); diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h index dabe68f3bd..c3901bf608 100644 --- a/src/core/transport/stream_op.h +++ b/src/core/transport/stream_op.h @@ -50,9 +50,22 @@ typedef enum grpc_stream_op_code { Must be ignored by receivers */ GRPC_NO_OP, GRPC_OP_METADATA, - GRPC_OP_MESSAGE + /* Begin a message/metadata element/status - as defined by + grpc_message_type. */ + GRPC_OP_BEGIN_MESSAGE, + /* Add a slice of data to the current message/metadata element/status. + Must not overflow the forward declared length. */ + GRPC_OP_SLICE } grpc_stream_op_code; +/* Arguments for GRPC_OP_BEGIN */ +typedef struct grpc_begin_message { + /* How many bytes of data will this message contain */ + gpr_uint32 length; + /* Write flags for the message: see grpc.h GRPC_WRITE_xxx */ + gpr_uint32 flags; +} grpc_begin_message; + typedef struct grpc_linked_mdelem { grpc_mdelem *md; struct grpc_linked_mdelem *next; @@ -105,8 +118,9 @@ typedef struct grpc_stream_op { /* the arguments to this operation. union fields are named according to the associated op-code */ union { - grpc_byte_buffer *message; + grpc_begin_message begin_message; grpc_metadata_batch metadata; + gpr_slice slice; } data; } grpc_stream_op; @@ -134,8 +148,16 @@ void grpc_stream_ops_unref_owned_objects(grpc_stream_op *ops, size_t nops); /* Append a GRPC_NO_OP to a buffer */ void grpc_sopb_add_no_op(grpc_stream_op_buffer *sopb); -void grpc_sopb_add_message(grpc_stream_op_buffer *sopb, grpc_byte_buffer *bb); +/* Append a GRPC_OP_BEGIN to a buffer */ +void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length, + gpr_uint32 flags); void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_metadata_batch metadata); +/* Append a GRPC_SLICE to a buffer - does not ref/unref the slice */ +void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice); +/* Append a GRPC_OP_FLOW_CTL_CB to a buffer */ +void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb, + void (*cb)(void *arg, grpc_op_error error), + void *arg); /* Append a buffer to a buffer - does not ref/unref any internal objects */ void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops, size_t nops); diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index f31011e56a..264245d351 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -113,21 +113,6 @@ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, void grpc_transport_destroy_stream(grpc_transport *transport, grpc_stream *stream); -/* Enable/disable incoming data for a stream. - - This effectively disables new window becoming available for a given stream, - but does not prevent existing window from being consumed by a sender: the - caller must still be prepared to receive some additional data after this - call. - - Arguments: - transport - the transport on which to create this stream - stream - the grpc_stream to destroy (memory is still owned by the - caller, but any child memory must be cleaned up) - allow - is it allowed that new window be opened up? */ -void grpc_transport_set_allow_window_updates(grpc_transport *transport, - grpc_stream *stream, int allow); - /* Transport op: a set of operations to perform on a transport */ typedef struct grpc_transport_op { grpc_stream_op_buffer *send_ops; |