Diffstat (limited to 'src/core/surface/call.c')
-rw-r--r-- | src/core/surface/call.c | 530 |
1 file changed, 297 insertions, 233 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 2949805622..7fcf6e2b04 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -81,9 +81,9 @@ typedef struct {
   grpc_ioreq_completion_func on_complete;
   void *user_data;
   /* a bit mask of which request ops are needed (1u << opid) */
-  gpr_uint32 need_mask;
+  gpr_uint16 need_mask;
   /* a bit mask of which request ops are now completed */
-  gpr_uint32 complete_mask;
+  gpr_uint16 complete_mask;
 } reqinfo_master;
 
 /* Status data for a request can come from several sources; this
@@ -144,12 +144,17 @@ struct grpc_call {
   gpr_uint8 have_alarm;
   /* are we currently performing a send operation */
   gpr_uint8 sending;
+  /* are we currently performing a recv operation */
+  gpr_uint8 receiving;
   /* are we currently completing requests */
   gpr_uint8 completing;
   /* pairs with completed_requests */
   gpr_uint8 num_completed_requests;
-  /* flag that we need to request more data */
-  gpr_uint8 need_more_data;
+  /* are we currently reading a message? */
+  gpr_uint8 reading_message;
+  /* flags with bits corresponding to write states allowing us to determine
+     what was sent */
+  gpr_uint16 last_send_contains;
 
   /* Active ioreqs.
      request_set and request_data contain one element per active ioreq
@@ -214,6 +219,13 @@ struct grpc_call {
   size_t send_initial_metadata_count;
   gpr_timespec send_deadline;
 
+  grpc_stream_op_buffer send_ops;
+  grpc_stream_op_buffer recv_ops;
+  grpc_stream_state recv_state;
+
+  gpr_slice_buffer incoming_message;
+  gpr_uint32 incoming_message_length;
+
   /* Data that the legacy api needs to track. To be deleted at some point
      soon */
   legacy_state *legacy_state;
@@ -234,9 +246,13 @@ struct grpc_call {
   } while (0)
 
 static void do_nothing(void *ignored, grpc_op_error also_ignored) {}
-static send_action choose_send_action(grpc_call *call);
-static void enact_send_action(grpc_call *call, send_action sa);
 static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline);
+static void call_on_done_recv(void *call, int success);
+static void call_on_done_send(void *call, int success);
+static int fill_send_ops(grpc_call *call, grpc_transport_op *op);
+static void execute_op(grpc_call *call, grpc_transport_op *op);
+static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata);
+static void finish_read_ops(grpc_call *call);
 
 grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
                             const void *server_transport_data,
@@ -359,20 +375,6 @@ static grpc_call_error bind_cq(grpc_call *call, grpc_completion_queue *cq) {
   return GRPC_CALL_OK;
 }
 
-static void request_more_data(grpc_call *call) {
-  grpc_call_op op;
-
-  /* call down */
-  op.type = GRPC_REQUEST_DATA;
-  op.dir = GRPC_CALL_DOWN;
-  op.flags = 0;
-  op.done_cb = do_nothing;
-  op.user_data = NULL;
-  op.bind_pollset = NULL;
-
-  grpc_call_execute_op(call, &op);
-}
-
 static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
   gpr_uint8 set = call->request_set[op];
   reqinfo_master *master;
@@ -383,17 +385,42 @@ static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
 
 static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
 
+static int need_more_data(grpc_call *call) {
+  return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) ||
+         is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) ||
+         is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) ||
+         is_op_live(call, GRPC_IOREQ_RECV_STATUS) ||
+         is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) ||
+         is_op_live(call, GRPC_IOREQ_RECV_CLOSE);
+}
+
 static void unlock(grpc_call *call) {
-  send_action sa = SEND_NOTHING;
+  grpc_transport_op op;
   completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
   int completing_requests = 0;
-  int need_more_data =
-      call->need_more_data &&
-      (call->write_state >= WRITE_STATE_STARTED || !call->is_client);
+  int start_op = 0;
   int i;
 
-  if (need_more_data) {
-    call->need_more_data = 0;
+  memset(&op, 0, sizeof(op));
+
+  if (!call->receiving &&
+      (call->write_state >= WRITE_STATE_STARTED || !call->is_client) &&
+      need_more_data(call)) {
+    op.recv_ops = &call->recv_ops;
+    op.recv_state = &call->recv_state;
+    op.on_done_recv = call_on_done_recv;
+    op.recv_user_data = call;
+    call->receiving = 1;
+    grpc_call_internal_ref(call);
+    start_op = 1;
+  }
+
+  if (!call->sending) {
+    if (fill_send_ops(call, &op)) {
+      call->sending = 1;
+      grpc_call_internal_ref(call);
+      start_op = 1;
+    }
   }
 
   if (!call->completing && call->num_completed_requests != 0) {
@@ -405,22 +432,10 @@ static void unlock(grpc_call *call) {
     grpc_call_internal_ref(call);
   }
 
-  if (!call->sending) {
-    sa = choose_send_action(call);
-    if (sa != SEND_NOTHING) {
-      call->sending = 1;
-      grpc_call_internal_ref(call);
-    }
-  }
-
   gpr_mu_unlock(&call->mu);
 
-  if (need_more_data) {
-    request_more_data(call);
-  }
-
-  if (sa != SEND_NOTHING) {
-    enact_send_action(call, sa);
+  if (start_op) {
+    execute_op(call, &op);
   }
 
   if (completing_requests > 0) {
@@ -554,64 +569,137 @@ static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op,
   }
 }
 
-static void finish_send_op(grpc_call *call, grpc_ioreq_op op, write_state ws,
-                           grpc_op_error error) {
+static void call_on_done_send(void *pc, int success) {
+  grpc_call *call = pc;
+  grpc_op_error error = success ? GRPC_OP_OK : GRPC_OP_ERROR;
   lock(call);
-  finish_ioreq_op(call, op, error);
+  if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) {
+    finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, error);
+  }
+  if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_MESSAGE)) {
+    finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, error);
+  }
+  if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_CLOSE)) {
+    finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, error);
+  }
   call->sending = 0;
-  call->write_state = ws;
   unlock(call);
   grpc_call_internal_unref(call, 0);
 }
 
-static void finish_write_step(void *pc, grpc_op_error error) {
-  finish_send_op(pc, GRPC_IOREQ_SEND_MESSAGE, WRITE_STATE_STARTED, error);
+static void finish_message(grpc_call *call) {
+  /* TODO(ctiller): this could be a lot faster if coded directly */
+  grpc_byte_buffer *byte_buffer = grpc_byte_buffer_create(
+      call->incoming_message.slices, call->incoming_message.count);
+  gpr_slice_buffer_reset_and_unref(&call->incoming_message);
+
+  grpc_bbq_push(&call->incoming_queue, byte_buffer);
+
+  GPR_ASSERT(call->incoming_message.count == 0);
+  call->reading_message = 0;
 }
 
-static void finish_finish_step(void *pc, grpc_op_error error) {
-  finish_send_op(pc, GRPC_IOREQ_SEND_CLOSE, WRITE_STATE_WRITE_CLOSED, error);
+static int begin_message(grpc_call *call, grpc_begin_message msg) {
+  /* can't begin a message when we're still reading a message */
+  if (call->reading_message) {
+    char *message = NULL;
+    gpr_asprintf(
+        &message, "Message terminated early; read %d bytes, expected %d",
+        (int)call->incoming_message.length, (int)call->incoming_message_length);
+    grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
+    gpr_free(message);
+    return 0;
+  }
+  /* stash away parameters, and prepare for incoming slices */
+  if (msg.length > grpc_channel_get_max_message_length(call->channel)) {
+    char *message = NULL;
+    gpr_asprintf(
+        &message,
+        "Maximum message length of %d exceeded by a message of length %d",
+        grpc_channel_get_max_message_length(call->channel), msg.length);
+    grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
+    gpr_free(message);
+    return 0;
+  } else if (msg.length > 0) {
+    call->reading_message = 1;
+    call->incoming_message_length = msg.length;
+    return 1;
+  } else {
+    finish_message(call);
+    return 1;
+  }
 }
 
-static void finish_start_step(void *pc, grpc_op_error error) {
-  finish_send_op(pc, GRPC_IOREQ_SEND_INITIAL_METADATA, WRITE_STATE_STARTED,
-                 error);
+static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
+  if (GPR_SLICE_LENGTH(slice) == 0) {
+    gpr_slice_unref(slice);
+    return 1;
+  }
+  /* we have to be reading a message to know what to do here */
+  if (!call->reading_message) {
+    grpc_call_cancel_with_status(
+        call, GRPC_STATUS_INVALID_ARGUMENT,
+        "Received payload data while not reading a message");
+    return 0;
+  }
+  /* append the slice to the incoming buffer */
+  gpr_slice_buffer_add(&call->incoming_message, slice);
+  if (call->incoming_message.length > call->incoming_message_length) {
+    /* if we got too many bytes, complain */
+    char *message = NULL;
+    gpr_asprintf(
+        &message, "Receiving message overflow; read %d bytes, expected %d",
+        (int)call->incoming_message.length, (int)call->incoming_message_length);
+    grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
+    gpr_free(message);
+    return 0;
+  } else if (call->incoming_message.length == call->incoming_message_length) {
+    finish_message(call);
+    return 1;
+  } else {
+    return 1;
+  }
 }
 
-static send_action choose_send_action(grpc_call *call) {
-  switch (call->write_state) {
-    case WRITE_STATE_INITIAL:
-      if (is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA)) {
-        if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE) ||
-            is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) {
-          return SEND_BUFFERED_INITIAL_METADATA;
-        } else {
-          return SEND_INITIAL_METADATA;
-        }
-      }
-      return SEND_NOTHING;
-    case WRITE_STATE_STARTED:
-      if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) {
-        if (is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) {
-          return SEND_BUFFERED_MESSAGE;
-        } else {
-          return SEND_MESSAGE;
-        }
-      } else if (is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) {
-        finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_OK);
-        finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, GRPC_OP_OK);
-        if (call->is_client) {
-          return SEND_FINISH;
-        } else {
-          return SEND_TRAILING_METADATA_AND_FINISH;
-        }
-      }
-      return SEND_NOTHING;
-    case WRITE_STATE_WRITE_CLOSED:
-      return SEND_NOTHING;
+static void call_on_done_recv(void *pc, int success) {
+  grpc_call *call = pc;
+  size_t i;
+  int unref = 0;
+  lock(call);
+  for (i = 0; success && i < call->recv_ops.nops; i++) {
+    grpc_stream_op *op = &call->recv_ops.ops[i];
+    switch (op->type) {
+      case GRPC_NO_OP:
+        break;
+      case GRPC_OP_METADATA:
+        recv_metadata(call, &op->data.metadata);
+        break;
+      case GRPC_OP_BEGIN_MESSAGE:
+        success = begin_message(call, op->data.begin_message);
+        break;
+      case GRPC_OP_SLICE:
+        success = add_slice_to_message(call, op->data.slice);
+        break;
+    }
+  }
+  if (call->recv_state == GRPC_STREAM_RECV_CLOSED) {
+    GPR_ASSERT(call->read_state <= READ_STATE_READ_CLOSED);
+    call->read_state = READ_STATE_READ_CLOSED;
+  }
+  if (call->recv_state == GRPC_STREAM_CLOSED) {
+    GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED);
+    call->read_state = READ_STATE_STREAM_CLOSED;
+    unref = 1;
+  }
+  if (!success) {
+    abort();
+  }
+  finish_read_ops(call);
+  unlock(call);
+
+  if (unref) {
+    grpc_call_internal_unref(call, 0);
   }
-  gpr_log(GPR_ERROR, "should never reach here");
-  abort();
-  return SEND_NOTHING;
 }
 
 static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
@@ -639,97 +727,100 @@ static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
   return out;
 }
 
-static void enact_send_action(grpc_call *call, send_action sa) {
+/* Copy the contents of a byte buffer into stream ops */
+static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
+                                           grpc_stream_op_buffer *sopb) {
+  size_t i;
+
+  switch (byte_buffer->type) {
+    case GRPC_BB_SLICE_BUFFER:
+      for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) {
+        gpr_slice slice = byte_buffer->data.slice_buffer.slices[i];
+        gpr_slice_ref(slice);
+        grpc_sopb_add_slice(sopb, slice);
+      }
+      break;
+  }
+}
+
+static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
   grpc_ioreq_data data;
-  grpc_call_op op;
+  grpc_metadata_batch mdb;
   size_t i;
-  gpr_uint32 flags = 0;
   char status_str[GPR_LTOA_MIN_BUFSIZE];
+  GPR_ASSERT(op->send_ops == NULL);
 
-  switch (sa) {
-    case SEND_NOTHING:
-      abort();
-      break;
-    case SEND_BUFFERED_INITIAL_METADATA:
-      flags |= GRPC_WRITE_BUFFER_HINT;
-    /* fallthrough */
-    case SEND_INITIAL_METADATA:
+  switch (call->write_state) {
+    case WRITE_STATE_INITIAL:
+      if (!is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA)) {
+        break;
+      }
       data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA];
-      op.type = GRPC_SEND_METADATA;
-      op.dir = GRPC_CALL_DOWN;
-      op.flags = flags;
-      op.data.metadata.list = chain_metadata_from_app(
-          call, data.send_metadata.count, data.send_metadata.metadata);
-      op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL;
-      op.data.metadata.deadline = call->send_deadline;
+      mdb.list = chain_metadata_from_app(call, data.send_metadata.count,
                                         data.send_metadata.metadata);
+      mdb.garbage.head = mdb.garbage.tail = NULL;
+      mdb.deadline = call->send_deadline;
      for (i = 0; i < call->send_initial_metadata_count; i++) {
-        grpc_metadata_batch_link_head(&op.data.metadata,
-                                      &call->send_initial_metadata[i]);
+        grpc_metadata_batch_link_head(&mdb, &call->send_initial_metadata[i]);
      }
-      call->send_initial_metadata_count = 0;
-      op.done_cb = finish_start_step;
-      op.user_data = call;
-      op.bind_pollset = grpc_cq_pollset(call->cq);
-      grpc_call_execute_op(call, &op);
-      break;
-    case SEND_BUFFERED_MESSAGE:
-      flags |= GRPC_WRITE_BUFFER_HINT;
-    /* fallthrough */
-    case SEND_MESSAGE:
-      data = call->request_data[GRPC_IOREQ_SEND_MESSAGE];
-      op.type = GRPC_SEND_MESSAGE;
-      op.dir = GRPC_CALL_DOWN;
-      op.flags = flags;
-      op.data.message = data.send_message;
-      op.done_cb = finish_write_step;
-      op.user_data = call;
-      op.bind_pollset = NULL;
-      grpc_call_execute_op(call, &op);
-      break;
-    case SEND_TRAILING_METADATA_AND_FINISH:
-      /* send trailing metadata */
-      data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA];
-      op.type = GRPC_SEND_METADATA;
-      op.dir = GRPC_CALL_DOWN;
-      op.flags = flags;
-      op.data.metadata.list = chain_metadata_from_app(
-          call, data.send_metadata.count, data.send_metadata.metadata);
-      op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL;
-      op.data.metadata.deadline = call->send_deadline;
-      op.bind_pollset = NULL;
-      /* send status */
-      /* TODO(ctiller): cache common status values */
-      data = call->request_data[GRPC_IOREQ_SEND_STATUS];
-      gpr_ltoa(data.send_status.code, status_str);
-      grpc_metadata_batch_add_tail(
-          &op.data.metadata, &call->status_link,
-          grpc_mdelem_from_metadata_strings(
-              call->metadata_context,
-              grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
-              grpc_mdstr_from_string(call->metadata_context, status_str)));
-      if (data.send_status.details) {
-        grpc_metadata_batch_add_tail(
-            &op.data.metadata, &call->details_link,
-            grpc_mdelem_from_metadata_strings(
-                call->metadata_context,
-                grpc_mdstr_ref(grpc_channel_get_message_string(call->channel)),
-                grpc_mdstr_from_string(call->metadata_context,
-                                       data.send_status.details)));
+      grpc_sopb_add_metadata(&call->send_ops, mdb);
+      op->send_ops = &call->send_ops;
+      op->bind_pollset = grpc_cq_pollset(call->cq);
+      call->last_send_contains |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA;
+      call->write_state = WRITE_STATE_STARTED;
+    /* fall through intended */
+    case WRITE_STATE_STARTED:
+      if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) {
+        data = call->request_data[GRPC_IOREQ_SEND_MESSAGE];
+        grpc_sopb_add_begin_message(
+            &call->send_ops, grpc_byte_buffer_length(data.send_message), 0);
+        copy_byte_buffer_to_stream_ops(data.send_message, &call->send_ops);
+        op->send_ops = &call->send_ops;
+        call->last_send_contains |= 1 << GRPC_IOREQ_SEND_MESSAGE;
      }
-      op.done_cb = do_nothing;
-      op.user_data = NULL;
-      grpc_call_execute_op(call, &op);
-    /* fallthrough: see choose_send_action for details */
-    case SEND_FINISH:
-      op.type = GRPC_SEND_FINISH;
-      op.dir = GRPC_CALL_DOWN;
-      op.flags = 0;
-      op.done_cb = finish_finish_step;
-      op.user_data = call;
-      op.bind_pollset = NULL;
-      grpc_call_execute_op(call, &op);
+      if (is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) {
+        op->is_last_send = 1;
+        op->send_ops = &call->send_ops;
+        call->last_send_contains |= 1 << GRPC_IOREQ_SEND_CLOSE;
+        call->write_state = WRITE_STATE_WRITE_CLOSED;
+        if (!call->is_client) {
+          /* send trailing metadata */
+          data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA];
+          mdb.list = chain_metadata_from_app(call, data.send_metadata.count,
+                                             data.send_metadata.metadata);
+          mdb.garbage.head = mdb.garbage.tail = NULL;
+          mdb.deadline = call->send_deadline;
+          /* send status */
+          /* TODO(ctiller): cache common status values */
+          data = call->request_data[GRPC_IOREQ_SEND_STATUS];
+          gpr_ltoa(data.send_status.code, status_str);
+          grpc_metadata_batch_add_tail(
+              &mdb, &call->status_link,
+              grpc_mdelem_from_metadata_strings(
+                  call->metadata_context,
+                  grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
+                  grpc_mdstr_from_string(call->metadata_context, status_str)));
+          if (data.send_status.details) {
+            grpc_metadata_batch_add_tail(
+                &mdb, &call->details_link,
+                grpc_mdelem_from_metadata_strings(
+                    call->metadata_context,
+                    grpc_mdstr_ref(
+                        grpc_channel_get_message_string(call->channel)),
+                    grpc_mdstr_from_string(call->metadata_context,
+                                           data.send_status.details)));
+          }
+        }
+      }
+      break;
+    case WRITE_STATE_WRITE_CLOSED:
       break;
   }
+  if (op->send_ops) {
+    op->on_done_send = call_on_done_send;
+    op->send_user_data = call;
+  }
+  return op->send_ops != NULL;
 }
 
 static grpc_call_error start_ioreq_error(grpc_call *call,
@@ -838,10 +929,6 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
   master->on_complete = completion;
   master->user_data = user_data;
 
-  if (have_ops & (1u << GRPC_IOREQ_RECV_MESSAGE)) {
-    call->need_more_data = 1;
-  }
-
   finish_read_ops(call);
   early_out_write_ops(call);
@@ -871,19 +958,12 @@ void grpc_call_destroy(grpc_call *c) {
   grpc_call_internal_unref(c, 1);
 }
 
-grpc_call_error grpc_call_cancel(grpc_call *c) {
-  grpc_call_element *elem;
-  grpc_call_op op;
-
-  op.type = GRPC_CANCEL_OP;
-  op.dir = GRPC_CALL_DOWN;
-  op.flags = 0;
-  op.done_cb = do_nothing;
-  op.user_data = NULL;
-  op.bind_pollset = NULL;
+grpc_call_error grpc_call_cancel(grpc_call *call) {
+  grpc_transport_op op;
+  memset(&op, 0, sizeof(op));
+  op.cancel_with_status = GRPC_STATUS_CANCELLED;
 
-  elem = CALL_ELEM_FROM_CALL(c, 0);
-  elem->filter->call_op(elem, NULL, &op);
+  execute_op(call, &op);
 
   return GRPC_CALL_OK;
 }
@@ -901,11 +981,10 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
   return grpc_call_cancel(c);
 }
 
-void grpc_call_execute_op(grpc_call *call, grpc_call_op *op) {
+static void execute_op(grpc_call *call, grpc_transport_op *op) {
   grpc_call_element *elem;
-  GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
   elem = CALL_ELEM_FROM_CALL(call, 0);
-  elem->filter->call_op(elem, NULL, op);
+  elem->filter->start_transport_op(elem, op);
 }
 
 grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
@@ -934,28 +1013,6 @@ static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) {
   grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now());
 }
 
-static void set_read_state_locked(grpc_call *call, read_state state) {
-  GPR_ASSERT(call->read_state < state);
-  call->read_state = state;
-  finish_read_ops(call);
-}
-
-static void set_read_state(grpc_call *call, read_state state) {
-  lock(call);
-  set_read_state_locked(call, state);
-  unlock(call);
-}
-
-void grpc_call_read_closed(grpc_call_element *elem) {
-  set_read_state(CALL_FROM_TOP_ELEM(elem), READ_STATE_READ_CLOSED);
-}
-
-void grpc_call_stream_closed(grpc_call_element *elem) {
-  grpc_call *call = CALL_FROM_TOP_ELEM(elem);
-  set_read_state(call, READ_STATE_STREAM_CLOSED);
-  grpc_call_internal_unref(call, 0);
-}
-
 /* we offset status by a small amount when storing it into transport metadata
    as metadata cannot store a 0 value (which is used as OK for
    grpc_status_codes */
@@ -979,35 +1036,13 @@ static gpr_uint32 decode_status(grpc_mdelem *md) {
   return status;
 }
 
-void grpc_call_recv_message(grpc_call_element *elem,
-                            grpc_byte_buffer *byte_buffer) {
-  grpc_call *call = CALL_FROM_TOP_ELEM(elem);
-  lock(call);
-  grpc_bbq_push(&call->incoming_queue, byte_buffer);
-  finish_read_ops(call);
-  unlock(call);
-}
-
-void grpc_call_recv_synthetic_status(grpc_call_element *elem,
-                                     grpc_status_code status,
-                                     const char *message) {
-  grpc_call *call = CALL_FROM_TOP_ELEM(elem);
-  lock(call);
-  set_status_code(call, STATUS_FROM_CORE, status);
-  set_status_details(call, STATUS_FROM_CORE,
-                     grpc_mdstr_from_string(call->metadata_context, message));
-  unlock(call);
-}
-
-int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) {
-  grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
   grpc_linked_mdelem *l;
   grpc_metadata_array *dest;
   grpc_metadata *mdusr;
   int is_trailing;
   grpc_mdctx *mdctx = call->metadata_context;
 
-  lock(call);
   is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA;
   for (l = md->list.head; l != NULL; l = l->next) {
     grpc_mdelem *md = l->md;
@@ -1043,9 +1078,8 @@ int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) {
     set_deadline_alarm(call, md->deadline);
   }
   if (!is_trailing) {
-    set_read_state_locked(call, READ_STATE_GOT_INITIAL_METADATA);
+    call->read_state = READ_STATE_GOT_INITIAL_METADATA;
  }
-  unlock(call);
 
   grpc_mdctx_lock(mdctx);
   for (l = md->list.head; l; l = l->next) {
@@ -1055,13 +1089,43 @@ int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) {
     grpc_mdctx_locked_mdelem_unref(mdctx, l->md);
   }
   grpc_mdctx_unlock(mdctx);
+}
+
+#if 0
+void grpc_call_read_closed(grpc_call_element *elem) {
+  set_read_state(CALL_FROM_TOP_ELEM(elem), READ_STATE_READ_CLOSED);
+}
+
+void grpc_call_stream_closed(grpc_call_element *elem) {
+  grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+  set_read_state(call, READ_STATE_STREAM_CLOSED);
+  grpc_call_internal_unref(call, 0);
+}
+
+void grpc_call_recv_message(grpc_call_element *elem,
+                            grpc_byte_buffer *byte_buffer) {
+  grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+  lock(call);
+  grpc_bbq_push(&call->incoming_queue, byte_buffer);
+  finish_read_ops(call);
+  unlock(call);
+}
 
-  return !is_trailing;
+void grpc_call_recv_synthetic_status(grpc_call_element *elem,
+                                     grpc_status_code status,
+                                     const char *message) {
+  grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+  lock(call);
+  set_status_code(call, STATUS_FROM_CORE, status);
+  set_status_details(call, STATUS_FROM_CORE,
+                     grpc_mdstr_from_string(call->metadata_context, message));
+  unlock(call);
 }
 
 grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
   return CALL_STACK_FROM_CALL(call);
 }
+#endif
 
 /*
  * BATCH API IMPLEMENTATION