Diffstat (limited to 'src/core/surface/call.c')
-rw-r--r--   src/core/surface/call.c   1936
1 file changed, 718 insertions(+), 1218 deletions(-)
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 81ff215c0c..a162d99193 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -31,6 +31,7 @@
  *
  */
 
 #include <assert.h>
+#include <limits.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
@@ -42,16 +43,17 @@
 #include <grpc/support/useful.h>
 
 #include "src/core/channel/channel_stack.h"
+#include "src/core/compression/algorithm_metadata.h"
 #include "src/core/iomgr/timer.h"
 #include "src/core/profiling/timers.h"
 #include "src/core/support/string.h"
 #include "src/core/surface/api_trace.h"
-#include "src/core/surface/byte_buffer_queue.h"
 #include "src/core/surface/call.h"
 #include "src/core/surface/channel.h"
 #include "src/core/surface/completion_queue.h"
+#include "src/core/transport/static_metadata.h"
 
-/** The maximum number of completions possible.
+/** The maximum number of concurrent batches possible.
     Based upon the maximum number of individually queueable ops in the batch
     api:
       - initial metadata send
@@ -60,7 +62,7 @@
       - initial metadata recv
       - message recv
      - status/close recv (depending on client/server) */
-#define MAX_CONCURRENT_COMPLETIONS 6
+#define MAX_CONCURRENT_BATCHES 6
 
 typedef struct {
   grpc_ioreq_completion_func on_complete;
@@ -68,24 +70,7 @@ typedef struct {
   int success;
 } completed_request;
 
-/* See request_set in grpc_call below for a description */
-#define REQSET_EMPTY 'X'
-#define REQSET_DONE 'Y'
-
-#define MAX_SEND_INITIAL_METADATA_COUNT 3
-
-typedef struct {
-  /* Overall status of the operation: starts OK, may degrade to
-     non-OK */
-  gpr_uint8 success;
-  /* a bit mask of which request ops are needed (1u << opid) */
-  gpr_uint16 need_mask;
-  /* a bit mask of which request ops are now completed */
-  gpr_uint16 complete_mask;
-  /* Completion function to call at the end of the operation */
-  grpc_ioreq_completion_func on_complete;
-  void *user_data;
-} reqinfo_master;
+#define MAX_SEND_EXTRA_METADATA_COUNT 3
 
 /* Status data for a request can come from several sources; this
    enumerates them all, and acts as a priority sorting for which
@@ -130,106 +115,63 @@ typedef enum {
   WRITE_STATE_WRITE_CLOSED
 } write_state;
 
+typedef struct batch_control {
+  grpc_call *call;
+  grpc_cq_completion cq_completion;
+  grpc_closure finish_batch;
+  void *notify_tag;
+  gpr_refcount steps_to_complete;
+
+  gpr_uint8 send_initial_metadata;
+  gpr_uint8 send_message;
+  gpr_uint8 send_final_op;
+  gpr_uint8 recv_initial_metadata;
+  gpr_uint8 recv_message;
+  gpr_uint8 recv_final_op;
+  gpr_uint8 is_notify_tag_closure;
+  gpr_uint8 success;
+} batch_control;
+
 struct grpc_call {
   grpc_completion_queue *cq;
   grpc_channel *channel;
   grpc_call *parent;
   grpc_call *first_child;
-  grpc_mdctx *metadata_context;
   /* TODO(ctiller): share with cq if possible? */
   gpr_mu mu;
-  gpr_mu completion_mu;
 
-  /* how far through the stream have we read? */
-  read_state read_state;
-  /* how far through the stream have we written? */
-  write_state write_state;
   /* client or server call */
   gpr_uint8 is_client;
   /* is the alarm set */
   gpr_uint8 have_alarm;
-  /* are we currently performing a send operation */
-  gpr_uint8 sending;
-  /* are we currently performing a recv operation */
-  gpr_uint8 receiving;
-  /* are we currently completing requests */
-  gpr_uint8 completing;
   /** has grpc_call_destroy been called */
   gpr_uint8 destroy_called;
-  /* pairs with completed_requests */
-  gpr_uint8 num_completed_requests;
-  /* are we currently reading a message? */
-  gpr_uint8 reading_message;
-  /* have we bound a pollset yet? */
-  gpr_uint8 bound_pollset;
-  /* is an error status set */
-  gpr_uint8 error_status_set;
-  /** bitmask of allocated completion events in completions */
-  gpr_uint8 allocated_completions;
   /** flag indicating that cancellation is inherited */
   gpr_uint8 cancellation_is_inherited;
+  /** bitmask of live batches */
+  gpr_uint8 used_batches;
+  /** which ops are in-flight */
+  gpr_uint8 sent_initial_metadata;
+  gpr_uint8 sending_message;
+  gpr_uint8 sent_final_op;
+  gpr_uint8 received_initial_metadata;
+  gpr_uint8 receiving_message;
+  gpr_uint8 received_final_op;
+
+  batch_control active_batches[MAX_CONCURRENT_BATCHES];
+
+  /* first idx: is_receiving, second idx: is_trailing */
+  grpc_metadata_batch metadata_batch[2][2];
 
-  /* flags with bits corresponding to write states allowing us to determine
-     what was sent */
-  gpr_uint16 last_send_contains;
-  /* cancel with this status on the next outgoing transport op */
-  grpc_status_code cancel_with_status;
-
-  /* Active ioreqs.
-     request_set and request_data contain one element per active ioreq
-     operation.
-
-     request_set[op] is an integer specifying a set of operations to which
-     the request belongs:
-       - if it is < GRPC_IOREQ_OP_COUNT, then this operation is pending
-         completion, and the integer represents to which group of operations
-         the ioreq belongs. Each group is represented by one master, and the
-         integer in request_set is an index into masters to find the master
-         data.
-       - if it is REQSET_EMPTY, the ioreq op is inactive and available to be
-         started
-       - finally, if request_set[op] is REQSET_DONE, then the operation is
-         complete and unavailable to be started again
-
-     request_data[op] is the request data as supplied by the initiator of
-     a request, and is valid iff request_set[op] <= GRPC_IOREQ_OP_COUNT.
-     The set fields are as per the request type specified by op.
-
-     Finally, one element of masters is set per active _set_ of ioreq
-     operations. It describes work left outstanding, result status, and
-     what work to perform upon operation completion. As one ioreq of each
-     op type can be active at once, by convention we choose the first element
-     of the group to be the master -- ie the master of in-progress operation
-     op is masters[request_set[op]]. This allows constant time allocation
-     and a strong upper bound of a count of masters to be calculated. */
-  gpr_uint8 request_set[GRPC_IOREQ_OP_COUNT];
-  grpc_ioreq_data request_data[GRPC_IOREQ_OP_COUNT];
-  gpr_uint32 request_flags[GRPC_IOREQ_OP_COUNT];
-  reqinfo_master masters[GRPC_IOREQ_OP_COUNT];
-
-  /* Dynamic array of ioreq's that have completed: the count of
-     elements is queued in num_completed_requests.
-     This list is built up under lock(), and flushed entirely during
-     unlock().
-     We know the upper bound of the number of elements as we can only
-     have one ioreq of each type active at once. */
-  completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
-  /* Incoming buffer of messages */
-  grpc_byte_buffer_queue incoming_queue;
 
   /* Buffered read metadata waiting to be returned to the application.
      Element 0 is initial metadata, element 1 is trailing metadata. */
-  grpc_metadata_array buffered_metadata[2];
-  /* All metadata received - unreffed at once at the end of the call */
-  grpc_mdelem **owned_metadata;
-  size_t owned_metadata_count;
-  size_t owned_metadata_capacity;
+  grpc_metadata_array *buffered_metadata[2];
 
   /* Received call statuses from various sources */
   received_status status[STATUS_SOURCE_COUNT];
 
   /* Compression algorithm for the call */
   grpc_compression_algorithm compression_algorithm;
-
   /* Supported encodings (compression algorithms), a bitset */
   gpr_uint32 encodings_accepted_by_peer;
 
@@ -239,35 +181,36 @@ struct grpc_call {
   /* Deadline alarm - if have_alarm is non-zero */
   grpc_timer alarm;
 
-  /* Call refcount - to keep the call alive during asynchronous operations */
-  gpr_refcount internal_refcount;
-
-  grpc_linked_mdelem send_initial_metadata[MAX_SEND_INITIAL_METADATA_COUNT];
-  grpc_linked_mdelem status_link;
-  grpc_linked_mdelem details_link;
-  size_t send_initial_metadata_count;
+  /* for the client, extra metadata is initial metadata; for the
+     server, it's trailing metadata */
+  grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT];
+  int send_extra_metadata_count;
   gpr_timespec send_deadline;
 
-  grpc_stream_op_buffer send_ops;
-  grpc_stream_op_buffer recv_ops;
-  grpc_stream_state recv_state;
-
-  gpr_slice_buffer incoming_message;
-  gpr_uint32 incoming_message_length;
-  gpr_uint32 incoming_message_flags;
-  grpc_closure destroy_closure;
-  grpc_closure on_done_recv;
-  grpc_closure on_done_send;
-  grpc_closure on_done_bind;
-
-  /** completion events - for completion queue use */
-  grpc_cq_completion completions[MAX_CONCURRENT_COMPLETIONS];
-
   /** siblings: children of the same parent form a list, and this list is
       protected under parent->mu */
   grpc_call *sibling_next;
   grpc_call *sibling_prev;
+
+  grpc_slice_buffer_stream sending_stream;
+  grpc_byte_stream *receiving_stream;
+  grpc_byte_buffer **receiving_buffer;
+  gpr_slice receiving_slice;
+  grpc_closure receiving_slice_ready;
+  grpc_closure receiving_stream_ready;
+  gpr_uint32 test_only_last_message_flags;
+
+  union {
+    struct {
+      grpc_status_code *status;
+      char **status_details;
+      size_t *status_details_capacity;
+    } client;
+    struct {
+      int *cancelled;
+    } server;
+  } final_op;
 };
 
 #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
@@ -279,20 +222,15 @@ struct grpc_call {
 
 static void set_deadline_alarm(grpc_exec_ctx *exec_ctx, grpc_call *call,
                                gpr_timespec deadline);
-static void call_on_done_recv(grpc_exec_ctx *exec_ctx, void *call, int success);
-static void call_on_done_send(grpc_exec_ctx *exec_ctx, void *call, int success);
-static int fill_send_ops(grpc_call *call, grpc_transport_stream_op *op);
 static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
                        grpc_transport_stream_op *op);
-static void recv_metadata(grpc_exec_ctx *exec_ctx, grpc_call *call,
-                          grpc_metadata_batch *metadata);
-static void finish_read_ops(grpc_call *call);
-static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
+static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
+                                          grpc_status_code status,
                                           const char *description);
-static void finished_loose_op(grpc_exec_ctx *exec_ctx, void *call, int success);
-
-static void lock(grpc_call *call);
-static void unlock(grpc_exec_ctx *exec_ctx, grpc_call *call);
+static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack,
+                         int success);
+static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
+                                  int success);
 
 grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
                             gpr_uint32 propagation_mask,
@@ -301,9 +239,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
                             grpc_mdelem **add_initial_metadata,
                             size_t add_initial_metadata_count,
                             gpr_timespec send_deadline) {
-  size_t i;
-  grpc_transport_stream_op initial_op;
-  grpc_transport_stream_op *initial_op_ptr = NULL;
+  size_t i, j;
   grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   grpc_call *call;
@@ -311,51 +247,36 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
   call = gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
   memset(call, 0, sizeof(grpc_call));
   gpr_mu_init(&call->mu);
-  gpr_mu_init(&call->completion_mu);
   call->channel = channel;
   call->cq = cq;
-  if (cq != NULL) {
-    GRPC_CQ_INTERNAL_REF(cq, "bind");
-  }
   call->parent = parent_call;
   call->is_client = server_transport_data == NULL;
-  for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
-    call->request_set[i] = REQSET_EMPTY;
-  }
   if (call->is_client) {
-    call->request_set[GRPC_IOREQ_SEND_TRAILING_METADATA] = REQSET_DONE;
-    call->request_set[GRPC_IOREQ_SEND_STATUS] = REQSET_DONE;
+    GPR_ASSERT(add_initial_metadata_count < MAX_SEND_EXTRA_METADATA_COUNT);
+    for (i = 0; i < add_initial_metadata_count; i++) {
+      call->send_extra_metadata[i].md = add_initial_metadata[i];
+    }
+    call->send_extra_metadata_count = (int)add_initial_metadata_count;
+  } else {
+    GPR_ASSERT(add_initial_metadata_count == 0);
+    call->send_extra_metadata_count = 0;
   }
-  GPR_ASSERT(add_initial_metadata_count < MAX_SEND_INITIAL_METADATA_COUNT);
-  for (i = 0; i < add_initial_metadata_count; i++) {
-    call->send_initial_metadata[i].md = add_initial_metadata[i];
+  for (i = 0; i < 2; i++) {
+    for (j = 0; j < 2; j++) {
+      call->metadata_batch[i][j].deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+    }
   }
-  call->send_initial_metadata_count = add_initial_metadata_count;
   call->send_deadline = send_deadline;
   GRPC_CHANNEL_INTERNAL_REF(channel, "call");
-  call->metadata_context = grpc_channel_get_metadata_context(channel);
-  grpc_sopb_init(&call->send_ops);
-  grpc_sopb_init(&call->recv_ops);
-  gpr_slice_buffer_init(&call->incoming_message);
-  grpc_closure_init(&call->on_done_recv, call_on_done_recv, call);
-  grpc_closure_init(&call->on_done_send, call_on_done_send, call);
-  grpc_closure_init(&call->on_done_bind, finished_loose_op, call);
-  /* dropped in destroy and when READ_STATE_STREAM_CLOSED received */
-  gpr_ref_init(&call->internal_refcount, 2);
-  /* server hack: start reads immediately so we can get initial metadata.
-     TODO(ctiller): figure out a cleaner solution */
-  if (!call->is_client) {
-    memset(&initial_op, 0, sizeof(initial_op));
-    initial_op.recv_ops = &call->recv_ops;
-    initial_op.recv_state = &call->recv_state;
-    initial_op.on_done_recv = &call->on_done_recv;
-    initial_op.context = call->context;
-    call->receiving = 1;
-    GRPC_CALL_INTERNAL_REF(call, "receiving");
-    initial_op_ptr = &initial_op;
+  /* initial refcount dropped by grpc_call_destroy */
+  grpc_call_stack_init(&exec_ctx, channel_stack, 1, destroy_call, call,
+                       call->context, server_transport_data,
+                       CALL_STACK_FROM_CALL(call));
+  if (cq != NULL) {
+    GRPC_CQ_INTERNAL_REF(cq, "bind");
+    grpc_call_stack_set_pollset(&exec_ctx, CALL_STACK_FROM_CALL(call),
+                                grpc_cq_pollset(cq));
   }
-  grpc_call_stack_init(&exec_ctx, channel_stack, server_transport_data,
-                       initial_op_ptr, CALL_STACK_FROM_CALL(call));
   if (parent_call != NULL) {
     GRPC_CALL_INTERNAL_REF(parent_call, "child");
     GPR_ASSERT(call->is_client);
@@ -408,90 +329,55 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
 
 void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call,
                                     grpc_completion_queue *cq) {
-  lock(call);
+  GPR_ASSERT(cq);
   call->cq = cq;
-  if (cq) {
-    GRPC_CQ_INTERNAL_REF(cq, "bind");
-  }
-  unlock(exec_ctx, call);
+  GRPC_CQ_INTERNAL_REF(cq, "bind");
+  grpc_call_stack_set_pollset(exec_ctx, CALL_STACK_FROM_CALL(call),
+                              grpc_cq_pollset(cq));
 }
 
-grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) {
-  return call->cq;
-}
-
-static grpc_cq_completion *allocate_completion(grpc_call *call) {
-  gpr_uint8 i;
-  gpr_mu_lock(&call->completion_mu);
-  for (i = 0; i < GPR_ARRAY_SIZE(call->completions); i++) {
-    if (call->allocated_completions & (1u << i)) {
-      continue;
-    }
-    /* NB: the following integer arithmetic operation needs to be in its
-     * expanded form due to the "integral promotion" performed (see section
-     * 3.2.1.1 of the C89 draft standard). A cast to the smaller container type
-     * is then required to avoid the compiler warning */
-    call->allocated_completions =
-        (gpr_uint8)(call->allocated_completions | (1u << i));
-    gpr_mu_unlock(&call->completion_mu);
-    return &call->completions[i];
-  }
-  GPR_UNREACHABLE_CODE(return NULL);
-  return NULL;
-}
-
-static void done_completion(grpc_exec_ctx *exec_ctx, void *call,
-                            grpc_cq_completion *completion) {
-  grpc_call *c = call;
-  gpr_mu_lock(&c->completion_mu);
-  c->allocated_completions &=
-      (gpr_uint8) ~(1u << (completion - c->completions));
-  gpr_mu_unlock(&c->completion_mu);
-  GRPC_CALL_INTERNAL_UNREF(exec_ctx, c, "completion");
-}
-
-#ifdef GRPC_CALL_REF_COUNT_DEBUG
-void grpc_call_internal_ref(grpc_call *c, const char *reason) {
-  gpr_log(GPR_DEBUG, "CALL: ref %p %d -> %d [%s]", c,
-          c->internal_refcount.count, c->internal_refcount.count + 1, reason);
+#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+#define REF_REASON reason
+#define REF_ARG , const char *reason
 #else
-void grpc_call_internal_ref(grpc_call *c) {
+#define REF_REASON ""
+#define REF_ARG
 #endif
-  gpr_ref(&c->internal_refcount);
+void grpc_call_internal_ref(grpc_call *c REF_ARG) {
+  GRPC_CALL_STACK_REF(CALL_STACK_FROM_CALL(c), REF_REASON);
+}
+void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) {
+  GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON);
 }
 
-static void destroy_call(grpc_exec_ctx *exec_ctx, grpc_call *call) {
+static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, int success) {
   size_t i;
+  int ii;
   grpc_call *c = call;
   GPR_TIMER_BEGIN("destroy_call", 0);
+  for (i = 0; i < 2; i++) {
+    grpc_metadata_batch_destroy(
+        &c->metadata_batch[1 /* is_receiving */][i /* is_initial */]);
+  }
+  if (c->receiving_stream != NULL) {
+    grpc_byte_stream_destroy(c->receiving_stream);
+  }
   grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c));
   GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call");
   gpr_mu_destroy(&c->mu);
-  gpr_mu_destroy(&c->completion_mu);
   for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
     if (c->status[i].details) {
      GRPC_MDSTR_UNREF(c->status[i].details);
    }
   }
-  for (i = 0; i < c->owned_metadata_count; i++) {
-    GRPC_MDELEM_UNREF(c->owned_metadata[i]);
-  }
-  gpr_free(c->owned_metadata);
-  for (i = 0; i < GPR_ARRAY_SIZE(c->buffered_metadata); i++) {
-    gpr_free(c->buffered_metadata[i].metadata);
-  }
-  for (i = 0; i < c->send_initial_metadata_count; i++) {
-    GRPC_MDELEM_UNREF(c->send_initial_metadata[i].md);
+  for (ii = 0; ii < c->send_extra_metadata_count; ii++) {
+    GRPC_MDELEM_UNREF(c->send_extra_metadata[ii].md);
   }
   for (i = 0; i < GRPC_CONTEXT_COUNT; i++) {
     if (c->context[i].destroy) {
       c->context[i].destroy(c->context[i].value);
     }
   }
-  grpc_sopb_destroy(&c->send_ops);
-  grpc_sopb_destroy(&c->recv_ops);
-  grpc_bbq_destroy(&c->incoming_queue);
-  gpr_slice_buffer_destroy(&c->incoming_message);
   if (c->cq) {
     GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
   }
@@ -499,30 +385,14 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, grpc_call *call) {
   GPR_TIMER_END("destroy_call", 0);
 }
 
-#ifdef GRPC_CALL_REF_COUNT_DEBUG
-void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c,
-                              const char *reason) {
-  gpr_log(GPR_DEBUG, "CALL: unref %p %d -> %d [%s]", c,
-          c->internal_refcount.count, c->internal_refcount.count - 1, reason);
-#else
-void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c) {
-#endif
-  if (gpr_unref(&c->internal_refcount)) {
-    destroy_call(exec_ctx, c);
-  }
-}
-
 static void set_status_code(grpc_call *call, status_source source,
                             gpr_uint32 status) {
   if (call->status[source].is_set) return;
 
   call->status[source].is_set = 1;
   call->status[source].code = (grpc_status_code)status;
-  call->error_status_set = status != GRPC_STATUS_OK;
 
-  if (status != GRPC_STATUS_OK && !grpc_bbq_empty(&call->incoming_queue)) {
-    grpc_bbq_flush(&call->incoming_queue);
-  }
+  /* TODO(ctiller): what to do about the flush that was previously here */
 }
 
 static void set_compression_algorithm(grpc_call *call,
@@ -539,6 +409,14 @@ grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
   return algorithm;
 }
 
+gpr_uint32 grpc_call_test_only_get_message_flags(grpc_call *call) {
+  gpr_uint32 flags;
+  gpr_mu_lock(&call->mu);
+  flags = call->test_only_last_message_flags;
+  gpr_mu_unlock(&call->mu);
+  return flags;
+}
+
 static void destroy_encodings_accepted_by_peer(void *p) { return; }
 
 static void set_encodings_accepted_by_peer(grpc_call *call, grpc_mdelem *mdel) {
@@ -596,14 +474,6 @@ gpr_uint32 grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) {
   return encodings_accepted_by_peer;
 }
 
-gpr_uint32 grpc_call_test_only_get_message_flags(grpc_call *call) {
-  gpr_uint32 flags;
-  gpr_mu_lock(&call->mu);
-  flags = call->incoming_message_flags;
-  gpr_mu_unlock(&call->mu);
-  return flags;
-}
-
 static void set_status_details(grpc_call *call, status_source source,
                                grpc_mdstr *status) {
   if (call->status[source].details != NULL) {
@@ -612,149 +482,39 @@ static void set_status_details(grpc_call *call, status_source source,
   call->status[source].details = status;
 }
 
-static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
-  gpr_uint8 set = call->request_set[op];
-  reqinfo_master *master;
-  if (set >= GRPC_IOREQ_OP_COUNT) return 0;
-  master = &call->masters[set];
-  return (master->complete_mask & (1u << op)) == 0;
-}
-
-static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
-
-static int need_more_data(grpc_call *call) {
-  if (call->read_state == READ_STATE_STREAM_CLOSED) return 0;
-  /* TODO(ctiller): this needs some serious cleanup */
-  return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) ||
-         (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) &&
-          grpc_bbq_empty(&call->incoming_queue)) ||
-         is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) ||
-         is_op_live(call, GRPC_IOREQ_RECV_STATUS) ||
-         is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) ||
-         (is_op_live(call, GRPC_IOREQ_RECV_CLOSE) &&
-          grpc_bbq_empty(&call->incoming_queue)) ||
-         (call->write_state == WRITE_STATE_INITIAL && !call->is_client) ||
-         (call->cancel_with_status != GRPC_STATUS_OK) || call->destroy_called;
-}
-
-static void unlock(grpc_exec_ctx *exec_ctx, grpc_call *call) {
-  grpc_transport_stream_op op;
-  completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
-  int completing_requests = 0;
-  int start_op = 0;
-  int i;
-  const size_t MAX_RECV_PEEK_AHEAD = 65536;
-  size_t buffered_bytes;
-
-  GPR_TIMER_BEGIN("unlock", 0);
-
-  memset(&op, 0, sizeof(op));
-
-  op.cancel_with_status = call->cancel_with_status;
-  start_op = op.cancel_with_status != GRPC_STATUS_OK;
-  call->cancel_with_status = GRPC_STATUS_OK; /* reset */
-
-  if (!call->receiving && need_more_data(call)) {
-    if (grpc_bbq_empty(&call->incoming_queue) && call->reading_message) {
-      op.max_recv_bytes = call->incoming_message_length -
-                          call->incoming_message.length + MAX_RECV_PEEK_AHEAD;
-    } else {
-      buffered_bytes = grpc_bbq_bytes(&call->incoming_queue);
-      if (buffered_bytes > MAX_RECV_PEEK_AHEAD) {
-        op.max_recv_bytes = 0;
-      } else {
-        op.max_recv_bytes = MAX_RECV_PEEK_AHEAD - buffered_bytes;
-      }
-    }
-    /* TODO(ctiller): 1024 is basically to cover a bug
-       I don't understand yet */
-    if (op.max_recv_bytes > 1024) {
-      op.recv_ops = &call->recv_ops;
-      op.recv_state = &call->recv_state;
-      op.on_done_recv = &call->on_done_recv;
-      call->receiving = 1;
-      GRPC_CALL_INTERNAL_REF(call, "receiving");
-      start_op = 1;
-    }
-  }
-
-  if (!call->sending) {
-    if (fill_send_ops(call, &op)) {
-      call->sending = 1;
-      GRPC_CALL_INTERNAL_REF(call, "sending");
-      start_op = 1;
-    }
-  }
-
-  if (!call->bound_pollset && call->cq && (!call->is_client || start_op)) {
-    call->bound_pollset = 1;
-    op.bind_pollset = grpc_cq_pollset(call->cq);
-    start_op = 1;
-  }
-
-  if (!call->completing && call->num_completed_requests != 0) {
-    completing_requests = call->num_completed_requests;
-    memcpy(completed_requests, call->completed_requests,
-           sizeof(completed_requests));
-    call->num_completed_requests = 0;
-    call->completing = 1;
-    GRPC_CALL_INTERNAL_REF(call, "completing");
-  }
-
-  gpr_mu_unlock(&call->mu);
-
-  if (start_op) {
-    execute_op(exec_ctx, call, &op);
-  }
-
-  if (completing_requests > 0) {
-    for (i = 0; i < completing_requests; i++) {
-      completed_requests[i].on_complete(exec_ctx, call,
-                                        completed_requests[i].success,
-                                        completed_requests[i].user_data);
-    }
-    lock(call);
-    call->completing = 0;
-    unlock(exec_ctx, call);
-    GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completing");
-  }
-
-  GPR_TIMER_END("unlock", 0);
-}
-
-static void get_final_status(grpc_call *call, grpc_ioreq_data out) {
+static void get_final_status(grpc_call *call,
+                             void (*set_value)(grpc_status_code code,
                                                void *user_data),
+                             void *set_value_user_data) {
   int i;
   for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
     if (call->status[i].is_set) {
-      out.recv_status.set_value(call->status[i].code,
-                                out.recv_status.user_data);
+      set_value(call->status[i].code, set_value_user_data);
       return;
     }
   }
   if (call->is_client) {
-    out.recv_status.set_value(GRPC_STATUS_UNKNOWN, out.recv_status.user_data);
+    set_value(GRPC_STATUS_UNKNOWN, set_value_user_data);
   } else {
-    out.recv_status.set_value(GRPC_STATUS_OK, out.recv_status.user_data);
+    set_value(GRPC_STATUS_OK, set_value_user_data);
  }
 }
 
-static void get_final_details(grpc_call *call, grpc_ioreq_data out) {
+static void get_final_details(grpc_call *call, char **out_details,
+                              size_t *out_details_capacity) {
   int i;
   for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
     if (call->status[i].is_set) {
       if (call->status[i].details) {
         gpr_slice details = call->status[i].details->slice;
         size_t len = GPR_SLICE_LENGTH(details);
-        if (len + 1 > *out.recv_status_details.details_capacity) {
-          *out.recv_status_details.details_capacity = GPR_MAX(
-              len + 1, *out.recv_status_details.details_capacity * 3 / 2);
-          *out.recv_status_details.details =
-              gpr_realloc(*out.recv_status_details.details,
-                          *out.recv_status_details.details_capacity);
+        if (len + 1 > *out_details_capacity) {
+          *out_details_capacity =
+              GPR_MAX(len + 1, *out_details_capacity * 3 / 2);
+          *out_details = gpr_realloc(*out_details, *out_details_capacity);
         }
-        memcpy(*out.recv_status_details.details, GPR_SLICE_START_PTR(details),
-               len);
-        (*out.recv_status_details.details)[len] = 0;
+        memcpy(*out_details, GPR_SLICE_START_PTR(details), len);
+        (*out_details)[len] = 0;
       } else {
        goto no_details;
      }
@@ -763,333 +523,45 @@ static void get_final_details(grpc_call *call, grpc_ioreq_data out) {
   }
 
 no_details:
-  if (0 == *out.recv_status_details.details_capacity) {
-    *out.recv_status_details.details_capacity = 8;
-    *out.recv_status_details.details =
-        gpr_malloc(*out.recv_status_details.details_capacity);
-  }
-  **out.recv_status_details.details = 0;
-}
-
-static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
-                                 int success) {
-  completed_request *cr;
-  gpr_uint8 master_set = call->request_set[op];
-  reqinfo_master *master;
-  size_t i;
-  /* ioreq is live: we need to do something */
-  master = &call->masters[master_set];
-  /* NB: the following integer arithmetic operation needs to be in its
-   * expanded form due to the "integral promotion" performed (see section
-   * 3.2.1.1 of the C89 draft standard). A cast to the smaller container type
-   * is then required to avoid the compiler warning */
-  master->complete_mask = (gpr_uint16)(master->complete_mask | (1u << op));
-  if (!success) {
-    master->success = 0;
-  }
-  if (master->complete_mask == master->need_mask) {
-    for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
-      if (call->request_set[i] != master_set) {
-        continue;
-      }
-      call->request_set[i] = REQSET_DONE;
-      switch ((grpc_ioreq_op)i) {
-        case GRPC_IOREQ_RECV_MESSAGE:
-        case GRPC_IOREQ_SEND_MESSAGE:
-          call->request_set[i] = REQSET_EMPTY;
-          if (!master->success) {
-            call->write_state = WRITE_STATE_WRITE_CLOSED;
-          }
-          break;
-        case GRPC_IOREQ_SEND_STATUS:
-          if (call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details !=
-              NULL) {
-            GRPC_MDSTR_UNREF(
-                call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details);
-            call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details =
-                NULL;
-          }
-          break;
-        case GRPC_IOREQ_RECV_CLOSE:
-        case GRPC_IOREQ_SEND_INITIAL_METADATA:
-        case GRPC_IOREQ_SEND_TRAILING_METADATA:
-        case GRPC_IOREQ_SEND_CLOSE:
-          break;
-        case GRPC_IOREQ_RECV_STATUS:
-          get_final_status(call, call->request_data[GRPC_IOREQ_RECV_STATUS]);
-          break;
-        case GRPC_IOREQ_RECV_STATUS_DETAILS:
-          get_final_details(call,
-                            call->request_data[GRPC_IOREQ_RECV_STATUS_DETAILS]);
-          break;
-        case GRPC_IOREQ_RECV_INITIAL_METADATA:
-          GPR_SWAP(grpc_metadata_array, call->buffered_metadata[0],
-                   *call->request_data[GRPC_IOREQ_RECV_INITIAL_METADATA]
-                        .recv_metadata);
-          break;
-        case GRPC_IOREQ_RECV_TRAILING_METADATA:
-          GPR_SWAP(grpc_metadata_array, call->buffered_metadata[1],
-                   *call->request_data[GRPC_IOREQ_RECV_TRAILING_METADATA]
-                        .recv_metadata);
-          break;
-        case GRPC_IOREQ_OP_COUNT:
-          abort();
-          break;
-      }
-    }
-    cr = &call->completed_requests[call->num_completed_requests++];
-    cr->success = master->success;
-    cr->on_complete = master->on_complete;
-    cr->user_data = master->user_data;
-  }
-}
-
-static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, int success) {
-  if (is_op_live(call, op)) {
-    finish_live_ioreq_op(call, op, success);
+  if (0 == *out_details_capacity) {
+    *out_details_capacity = 8;
+    *out_details = gpr_malloc(*out_details_capacity);
   }
+  **out_details = 0;
 }
 
-static void early_out_write_ops(grpc_call *call) {
-  switch (call->write_state) {
-    case WRITE_STATE_WRITE_CLOSED:
-      finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, 0);
-      finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, 0);
-      finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, 0);
-      finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1);
-    /* fallthrough */
-    case WRITE_STATE_STARTED:
-      finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, 0);
-    /* fallthrough */
-    case WRITE_STATE_INITIAL:
-      /* do nothing */
-      break;
-  }
+static grpc_linked_mdelem *linked_from_md(grpc_metadata *md) {
+  return (grpc_linked_mdelem *)&md->internal_data;
 }
 
-static void call_on_done_send(grpc_exec_ctx *exec_ctx, void *pc, int success) {
-  grpc_call *call = pc;
-  GPR_TIMER_BEGIN("call_on_done_send", 0);
-  lock(call);
-  if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) {
-    finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, success);
-    call->write_state = WRITE_STATE_STARTED;
-  }
-  if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_MESSAGE)) {
-    finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, success);
-  }
-  if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_CLOSE)) {
-    finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, success);
-    finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, success);
-    finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1);
-    call->write_state = WRITE_STATE_WRITE_CLOSED;
-  }
-  if (!success) {
-    call->write_state = WRITE_STATE_WRITE_CLOSED;
-    early_out_write_ops(call);
-  }
-  call->send_ops.nops = 0;
-  call->last_send_contains = 0;
-  call->sending = 0;
-  unlock(exec_ctx, call);
-  GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "sending");
-  GPR_TIMER_END("call_on_done_send", 0);
-}
-
-static void finish_message(grpc_call *call) {
-  GPR_TIMER_BEGIN("finish_message", 0);
-  if (call->error_status_set == 0) {
-    /* TODO(ctiller): this could be a lot faster if coded directly */
-    grpc_byte_buffer *byte_buffer;
-    /* some aliases for readability */
-    gpr_slice *slices = call->incoming_message.slices;
-    const size_t nslices = call->incoming_message.count;
-
-    if ((call->incoming_message_flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
-        (call->compression_algorithm > GRPC_COMPRESS_NONE)) {
-      byte_buffer = grpc_raw_compressed_byte_buffer_create(
-          slices, nslices, call->compression_algorithm);
+static int prepare_application_metadata(grpc_call *call, int count,
+                                        grpc_metadata *metadata,
+                                        int is_trailing,
+                                        int prepend_extra_metadata) {
+  int i;
+  grpc_metadata_batch *batch =
+      &call->metadata_batch[0 /* is_receiving */][is_trailing];
+  if (prepend_extra_metadata) {
+    if (call->send_extra_metadata_count == 0) {
+      prepend_extra_metadata = 0;
     } else {
-      byte_buffer = grpc_raw_byte_buffer_create(slices, nslices);
-    }
-    grpc_bbq_push(&call->incoming_queue, byte_buffer);
-  }
-  gpr_slice_buffer_reset_and_unref(&call->incoming_message);
-  GPR_ASSERT(call->incoming_message.count == 0);
-  call->reading_message = 0;
-  GPR_TIMER_END("finish_message", 0);
-}
-
-static int begin_message(grpc_call *call, grpc_begin_message msg) {
-  /* can't begin a message when we're still reading a message */
-  if (call->reading_message) {
-    char *message = NULL;
-    gpr_asprintf(
-        &message, "Message terminated early; read %d bytes, expected %d",
-        (int)call->incoming_message.length, (int)call->incoming_message_length);
-    cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
-    gpr_free(message);
-    return 0;
-  }
-  /* sanity check: if message flags indicate a compressed message, the
-   * compression level should already be present in the call, as parsed off its
-   * corresponding metadata. */
-  if ((msg.flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
-      (call->compression_algorithm == GRPC_COMPRESS_NONE)) {
-    char *message = NULL;
-    char *alg_name;
-    if (!grpc_compression_algorithm_name(call->compression_algorithm,
-                                         &alg_name)) {
-      /* This shouldn't happen, other than due to data corruption */
-      alg_name = "<unknown>";
-    }
-    gpr_asprintf(&message,
-                 "Invalid compression algorithm (%s) for compressed message.",
-                 alg_name);
-    cancel_with_status(call, GRPC_STATUS_INTERNAL, message);
-    gpr_free(message);
-    return 0;
-  }
-  /* stash away parameters, and prepare for incoming slices */
-  if (msg.length > grpc_channel_get_max_message_length(call->channel)) {
-    char *message = NULL;
-    gpr_asprintf(
-        &message,
-        "Maximum message length of %d exceeded by a message of length %d",
-        grpc_channel_get_max_message_length(call->channel), msg.length);
-    cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
-    gpr_free(message);
-    return 0;
-  } else if (msg.length > 0) {
-    call->reading_message = 1;
-    call->incoming_message_length = msg.length;
-    call->incoming_message_flags = msg.flags;
-    return 1;
-  } else {
-    finish_message(call);
-    return 1;
-  }
-}
-
-static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
-  if (GPR_SLICE_LENGTH(slice) == 0) {
-    gpr_slice_unref(slice);
-    return 1;
-  }
-  /* we have to be reading a message to know what to do here */
-  if (!call->reading_message) {
-    gpr_slice_unref(slice);
-    cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT,
-                       "Received payload data while not reading a message");
-    return 0;
-  }
-  /* append the slice to the incoming buffer */
-  gpr_slice_buffer_add(&call->incoming_message, slice);
-  if (call->incoming_message.length > call->incoming_message_length) {
-    /* if we got too many bytes, complain */
-    char *message = NULL;
-    gpr_asprintf(
-        &message, "Receiving message overflow; read %d bytes, expected %d",
-        (int)call->incoming_message.length, (int)call->incoming_message_length);
-    cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
-    gpr_free(message);
-    return 0;
-  } else if (call->incoming_message.length == call->incoming_message_length) {
-    finish_message(call);
-    return 1;
-  } else {
-    return 1;
-  }
-}
-
-static void call_on_done_recv(grpc_exec_ctx *exec_ctx, void *pc, int success) {
-  grpc_call *call = pc;
-  grpc_call *child_call;
-  grpc_call *next_child_call;
-  size_t i;
-  GPR_TIMER_BEGIN("call_on_done_recv", 0);
-  lock(call);
-  call->receiving = 0;
-  if (success) {
-    for (i = 0; success && i < call->recv_ops.nops; i++) {
-      grpc_stream_op *op = &call->recv_ops.ops[i];
-      switch (op->type) {
-        case GRPC_NO_OP:
-          break;
-        case GRPC_OP_METADATA:
-          GPR_TIMER_BEGIN("recv_metadata", 0);
-          recv_metadata(exec_ctx, call, &op->data.metadata);
-          GPR_TIMER_END("recv_metadata", 0);
-          break;
-        case GRPC_OP_BEGIN_MESSAGE:
-          GPR_TIMER_BEGIN("begin_message", 0);
-          success = begin_message(call, op->data.begin_message);
-          GPR_TIMER_END("begin_message", 0);
-          break;
-        case GRPC_OP_SLICE:
-          GPR_TIMER_BEGIN("add_slice_to_message", 0);
-          success = add_slice_to_message(call, op->data.slice);
-          GPR_TIMER_END("add_slice_to_message", 0);
-          break;
+      for (i = 0; i < call->send_extra_metadata_count; i++) {
+        GRPC_MDELEM_REF(call->send_extra_metadata[i].md);
       }
-    }
-    if (!success) {
-      grpc_stream_ops_unref_owned_objects(&call->recv_ops.ops[i],
-                                          call->recv_ops.nops - i);
-    }
-    if (call->recv_state == GRPC_STREAM_RECV_CLOSED) {
-      GPR_ASSERT(call->read_state <= READ_STATE_READ_CLOSED);
-      call->read_state = READ_STATE_READ_CLOSED;
-    }
-    if (call->recv_state == GRPC_STREAM_CLOSED) {
-      GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED);
-      call->read_state = READ_STATE_STREAM_CLOSED;
-      if (call->have_alarm) {
-        grpc_timer_cancel(exec_ctx, &call->alarm);
+      for (i = 1; i < call->send_extra_metadata_count; i++) {
+        call->send_extra_metadata[i].prev = &call->send_extra_metadata[i - 1];
       }
-      /* propagate cancellation to any interested children */
-      child_call = call->first_child;
-      if (child_call != NULL) {
-        do {
-          next_child_call = child_call->sibling_next;
-          if (child_call->cancellation_is_inherited) {
-            GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel");
-            grpc_call_cancel(child_call, NULL);
-            GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel");
-          }
-          child_call = next_child_call;
-        } while (child_call != call->first_child);
+      for (i = 0; i < call->send_extra_metadata_count - 1; i++) {
+        call->send_extra_metadata[i].next = &call->send_extra_metadata[i + 1];
       }
-      GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "closed");
     }
-    finish_read_ops(call);
-  } else {
-    finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, 0);
-    finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, 0);
-    finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, 0);
-    finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, 0);
-    finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, 0);
-    finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, 0);
   }
-  call->recv_ops.nops = 0;
-  unlock(exec_ctx, call);
-
-  GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "receiving");
-  GPR_TIMER_END("call_on_done_recv", 0);
-}
-
-static int prepare_application_metadata(grpc_call *call, size_t count,
-                                        grpc_metadata *metadata) {
-  size_t i;
   for (i = 0; i < count; i++) {
     grpc_metadata *md = &metadata[i];
-    grpc_metadata *next_md = (i == count - 1) ? NULL : &metadata[i + 1];
-    grpc_metadata *prev_md = (i == 0) ? NULL : &metadata[i - 1];
     grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
     GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
-    l->md = grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key,
-                                               (const gpr_uint8 *)md->value,
-                                               md->value_length);
+    l->md = grpc_mdelem_from_string_and_buffer(
+        md->key, (const gpr_uint8 *)md->value, md->value_length);
     if (!grpc_mdstr_is_legal_header(l->md->key)) {
       gpr_log(GPR_ERROR, "attempt to send invalid metadata key: %s",
               grpc_mdstr_as_c_string(l->md->key));
@@ -1099,243 +571,49 @@ static int prepare_application_metadata(grpc_call *call, size_t count,
       gpr_log(GPR_ERROR, "attempt to send invalid metadata value");
       return 0;
     }
-    l->next = next_md ? (grpc_linked_mdelem *)&next_md->internal_data : NULL;
-    l->prev = prev_md ? (grpc_linked_mdelem *)&prev_md->internal_data : NULL;
   }
-  return 1;
-}
-
-static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
-                                                grpc_metadata *metadata) {
-  grpc_mdelem_list out;
-  if (count == 0) {
-    out.head = out.tail = NULL;
-    return out;
+  for (i = 1; i < count; i++) {
+    linked_from_md(&metadata[i])->prev = linked_from_md(&metadata[i - 1]);
   }
-  out.head = (grpc_linked_mdelem *)&(metadata[0].internal_data);
-  out.tail = (grpc_linked_mdelem *)&(metadata[count - 1].internal_data);
-  return out;
-}
-
-/* Copy the contents of a byte buffer into stream ops */
-static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
-                                           grpc_stream_op_buffer *sopb) {
-  size_t i;
-
-  switch (byte_buffer->type) {
-    case GRPC_BB_RAW:
-      for (i = 0; i < byte_buffer->data.raw.slice_buffer.count; i++) {
-        gpr_slice slice = byte_buffer->data.raw.slice_buffer.slices[i];
-        gpr_slice_ref(slice);
-        grpc_sopb_add_slice(sopb, slice);
-      }
-      break;
+  for (i = 0; i < count - 1; i++) {
+    linked_from_md(&metadata[i])->next = linked_from_md(&metadata[i + 1]);
   }
-}
-
-static int fill_send_ops(grpc_call *call, grpc_transport_stream_op *op) {
-  grpc_ioreq_data data;
-  gpr_uint32 flags;
-  grpc_metadata_batch mdb;
-  size_t i;
-  GPR_ASSERT(op->send_ops == NULL);
-
-  switch (call->write_state) {
-    case WRITE_STATE_INITIAL:
-      if (!is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA)) {
-        break;
-      }
-      data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA];
-      mdb.list = chain_metadata_from_app(call, data.send_metadata.count,
-                                         data.send_metadata.metadata);
-      mdb.garbage.head = mdb.garbage.tail = NULL;
-      mdb.deadline = call->send_deadline;
-      for (i = 0; i < call->send_initial_metadata_count; i++) {
-        grpc_metadata_batch_link_head(&mdb, &call->send_initial_metadata[i]);
-      }
-      grpc_sopb_add_metadata(&call->send_ops, mdb);
-      op->send_ops = &call->send_ops;
-      call->last_send_contains |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA;
-      call->send_initial_metadata_count = 0;
-    /* fall through intended */
-    case WRITE_STATE_STARTED:
-      if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) {
-        size_t length;
-        data = call->request_data[GRPC_IOREQ_SEND_MESSAGE];
-        flags = call->request_flags[GRPC_IOREQ_SEND_MESSAGE];
-        length = grpc_byte_buffer_length(data.send_message);
-        GPR_ASSERT(length <= GPR_UINT32_MAX);
-        grpc_sopb_add_begin_message(&call->send_ops, (gpr_uint32)length, flags);
-        copy_byte_buffer_to_stream_ops(data.send_message, &call->send_ops);
-        op->send_ops = &call->send_ops;
-        call->last_send_contains |= 1 << GRPC_IOREQ_SEND_MESSAGE;
-      }
-      if (is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) {
-        op->is_last_send = 1;
-        op->send_ops = &call->send_ops;
-        call->last_send_contains |= 1 << GRPC_IOREQ_SEND_CLOSE;
-        if (!call->is_client) {
-          /* send trailing metadata */
-          data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA];
-          mdb.list = chain_metadata_from_app(call, data.send_metadata.count,
-                                             data.send_metadata.metadata);
-          mdb.garbage.head = mdb.garbage.tail = NULL;
-          mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
-          /* send status */
-          /* TODO(ctiller): cache common status values */
-          data = call->request_data[GRPC_IOREQ_SEND_STATUS];
-          grpc_metadata_batch_add_tail(
-              &mdb, &call->status_link,
-              grpc_channel_get_reffed_status_elem(call->channel,
-                                                  data.send_status.code));
-          if (data.send_status.details) {
-            grpc_metadata_batch_add_tail(
-                &mdb, &call->details_link,
-                grpc_mdelem_from_metadata_strings(
-                    call->metadata_context,
-                    GRPC_MDSTR_REF(
-                        grpc_channel_get_message_string(call->channel)),
-                    data.send_status.details));
-            call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details =
-                NULL;
-          }
-          grpc_sopb_add_metadata(&call->send_ops, mdb);
-        }
-      }
+  switch (prepend_extra_metadata * 2 + (count != 0)) {
+    case 0:
+      /* no prepend, no metadata => nothing to do */
+      batch->list.head = batch->list.tail = NULL;
       break;
-    case WRITE_STATE_WRITE_CLOSED:
+    case 1:
+      /* metadata, but no prepend */
+      batch->list.head = linked_from_md(&metadata[0]);
+      batch->list.tail = linked_from_md(&metadata[count - 1]);
+      batch->list.head->prev = NULL;
+      batch->list.tail->next = NULL;
       break;
-  }
-  if (op->send_ops) {
-    op->on_done_send = &call->on_done_send;
-  }
-  return op->send_ops != NULL;
-}
-
-static grpc_call_error start_ioreq_error(grpc_call *call,
-                                         gpr_uint32 mutated_ops,
-                                         grpc_call_error ret) {
-  size_t i;
-  for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
-    if (mutated_ops & (1u << i)) {
-      call->request_set[i] = REQSET_EMPTY;
-    }
-  }
-  return ret;
-}
-
-static void finish_read_ops(grpc_call *call) {
-  int empty;
-
-  if (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE)) {
-    empty =
-        (NULL == (*call->request_data[GRPC_IOREQ_RECV_MESSAGE].recv_message =
                      grpc_bbq_pop(&call->incoming_queue)));
-    if (!empty) {
-      finish_live_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, 1);
-      empty = grpc_bbq_empty(&call->incoming_queue);
-    }
-  } else {
-    empty = grpc_bbq_empty(&call->incoming_queue);
-  }
-
-  switch (call->read_state) {
-    case READ_STATE_STREAM_CLOSED:
-      if (empty && !call->have_alarm) {
-        finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, 1);
-      }
-    /* fallthrough */
-    case READ_STATE_READ_CLOSED:
-      if (empty) {
-        finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, 1);
-      }
-      finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, 1);
-      finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, 1);
-      finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, 1);
-    /* fallthrough */
-    case READ_STATE_GOT_INITIAL_METADATA:
-      finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, 1);
-    /* fallthrough */
-    case READ_STATE_INITIAL:
-      /* do nothing */
+    case 2:
+      /* prepend, but no md */
+      batch->list.head = &call->send_extra_metadata[0];
+      batch->list.tail =
+          &call->send_extra_metadata[call->send_extra_metadata_count - 1];
+      batch->list.head->prev = NULL;
+      batch->list.tail->next = NULL;
       break;
+    case 3:
+      /* prepend AND md */
+      batch->list.head = &call->send_extra_metadata[0];
+      call->send_extra_metadata[call->send_extra_metadata_count - 1].next =
+          linked_from_md(&metadata[0]);
+      linked_from_md(&metadata[0])->prev =
+          &call->send_extra_metadata[call->send_extra_metadata_count - 1];
+      batch->list.tail = linked_from_md(&metadata[count - 1]);
+      batch->list.head->prev = NULL;
+      batch->list.tail->next = NULL;
      break;
+    default:
+      GPR_UNREACHABLE_CODE(return 0);
  }
-}
-
-static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
-                                   size_t nreqs,
-                                   grpc_ioreq_completion_func completion,
-                                   void *user_data) {
-  size_t i;
-  gpr_uint16 have_ops = 0;
-  grpc_ioreq_op op;
-  reqinfo_master *master;
-  grpc_ioreq_data data;
-  gpr_uint8 set;
-
-  if (nreqs == 0) {
-    return GRPC_CALL_OK;
-  }
-
-  set = reqs[0].op;
-
-  for (i = 0; i < nreqs; i++) {
-    op = reqs[i].op;
-    if (call->request_set[op] < GRPC_IOREQ_OP_COUNT) {
-      return start_ioreq_error(call, have_ops,
-                               GRPC_CALL_ERROR_TOO_MANY_OPERATIONS);
-    } else if (call->request_set[op] == REQSET_DONE) {
-      return start_ioreq_error(call, have_ops, GRPC_CALL_ERROR_ALREADY_INVOKED);
-    }
-    data = reqs[i].data;
-    if (op == GRPC_IOREQ_SEND_INITIAL_METADATA ||
-        op == GRPC_IOREQ_SEND_TRAILING_METADATA) {
-      if (!prepare_application_metadata(call, data.send_metadata.count,
-                                        data.send_metadata.metadata)) {
-        return start_ioreq_error(call, have_ops,
-                                 GRPC_CALL_ERROR_INVALID_METADATA);
-      }
-    }
-    if (op == GRPC_IOREQ_SEND_STATUS) {
-      set_status_code(call, STATUS_FROM_SERVER_STATUS,
-                      (gpr_uint32)reqs[i].data.send_status.code);
-      if (reqs[i].data.send_status.details) {
-        set_status_details(call, STATUS_FROM_SERVER_STATUS,
-                           GRPC_MDSTR_REF(reqs[i].data.send_status.details));
-      }
-    }
-    /* NB: the following integer arithmetic operation needs to be in its
-     * expanded form due to the "integral promotion" performed (see section
-     * 3.2.1.1 of the C89 draft standard). A cast to the smaller container type
-     * is then required to avoid the compiler warning */
-    have_ops = (gpr_uint16)(have_ops | (1u << op));
-
-    call->request_data[op] = data;
-    call->request_flags[op] = reqs[i].flags;
-    call->request_set[op] = set;
-  }
-
-  master = &call->masters[set];
-  master->success = 1;
-  master->need_mask = have_ops;
-  master->complete_mask = 0;
-  master->on_complete = completion;
-  master->user_data = user_data;
-
-  finish_read_ops(call);
-  early_out_write_ops(call);
-
-  return GRPC_CALL_OK;
-}
 
-grpc_call_error grpc_call_start_ioreq_and_call_back(
-    grpc_exec_ctx *exec_ctx, grpc_call *call, const grpc_ioreq *reqs,
-    size_t nreqs, grpc_ioreq_completion_func on_complete, void *user_data) {
-  grpc_call_error err;
-  lock(call);
-  err = start_ioreq(call, reqs, nreqs, on_complete, user_data);
-  unlock(exec_ctx, call);
-  return err;
+  return 1;
 }
 
 void grpc_call_destroy(grpc_call *c) {
@@ -1343,6 +621,7 @@ void grpc_call_destroy(grpc_call *c) {
   grpc_call *parent = c->parent;
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
 
+  GPR_TIMER_BEGIN("grpc_call_destroy", 0);
   GRPC_API_TRACE("grpc_call_destroy(c=%p)", 1, (c));
 
   if (parent) {
@@ -1359,17 +638,18 @@ void grpc_call_destroy(grpc_call *c) {
     GRPC_CALL_INTERNAL_UNREF(&exec_ctx, parent, "child");
   }
 
-  lock(c);
+  gpr_mu_lock(&c->mu);
   GPR_ASSERT(!c->destroy_called);
   c->destroy_called = 1;
   if (c->have_alarm) {
     grpc_timer_cancel(&exec_ctx, &c->alarm);
   }
-  cancel = c->read_state != READ_STATE_STREAM_CLOSED;
-  unlock(&exec_ctx, c);
+  cancel = !c->received_final_op;
+  gpr_mu_unlock(&c->mu);
   if (cancel) grpc_call_cancel(c, NULL);
   GRPC_CALL_INTERNAL_UNREF(&exec_ctx, c, "destroy");
   grpc_exec_ctx_finish(&exec_ctx);
+  GPR_TIMER_END("grpc_call_destroy", 0);
 }
 
 grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) {
@@ -1390,73 +670,80 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
                  "c=%p, status=%d, description=%s, reserved=%p)",
                  4, (c, (int)status, description, reserved));
   GPR_ASSERT(reserved == NULL);
-  lock(c);
-  r = cancel_with_status(c, status, description);
-  unlock(&exec_ctx, c);
+  gpr_mu_lock(&c->mu);
+  r = cancel_with_status(&exec_ctx, c, status, description);
+  gpr_mu_unlock(&c->mu);
  grpc_exec_ctx_finish(&exec_ctx);
   return r;
 }
 
-static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
+typedef struct cancel_closure {
+  grpc_closure closure;
+  grpc_call *call;
+  grpc_status_code status;
+} cancel_closure;
+
+static void done_cancel(grpc_exec_ctx *exec_ctx, void *ccp, int success) {
+  cancel_closure *cc = ccp;
+  GRPC_CALL_INTERNAL_UNREF(exec_ctx, cc->call, "cancel");
+  gpr_free(cc);
+}
+
+static void send_cancel(grpc_exec_ctx *exec_ctx, void *ccp, int success) {
+  grpc_transport_stream_op op;
+  cancel_closure *cc = ccp;
+  memset(&op, 0, sizeof(op));
+  op.cancel_with_status = cc->status;
+  /* reuse closure to catch completion */
+  grpc_closure_init(&cc->closure, done_cancel, cc);
+  op.on_complete = &cc->closure;
+  execute_op(exec_ctx, cc->call, &op);
+}
+
+static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
+                                          grpc_status_code status,
                                           const char *description) {
   grpc_mdstr *details =
-      description ? grpc_mdstr_from_string(c->metadata_context, description)
-                  : NULL;
+      description ? grpc_mdstr_from_string(description) : NULL;
+  cancel_closure *cc = gpr_malloc(sizeof(*cc));
 
   GPR_ASSERT(status != GRPC_STATUS_OK);
 
   set_status_code(c, STATUS_FROM_API_OVERRIDE, (gpr_uint32)status);
   set_status_details(c, STATUS_FROM_API_OVERRIDE, details);
 
-  c->cancel_with_status = status;
+  grpc_closure_init(&cc->closure, send_cancel, cc);
+  cc->call = c;
+  cc->status = status;
+  GRPC_CALL_INTERNAL_REF(c, "cancel");
+  grpc_exec_ctx_enqueue(exec_ctx, &cc->closure, 1);
 
   return GRPC_CALL_OK;
 }
 
-static void finished_loose_op(grpc_exec_ctx *exec_ctx, void *call,
-                              int success_ignored) {
-  GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "loose-op");
-}
-
-typedef struct {
-  grpc_call *call;
-  grpc_closure closure;
-} finished_loose_op_allocated_args;
-
-static void finished_loose_op_allocated(grpc_exec_ctx *exec_ctx, void *alloc,
-                                        int success) {
-  finished_loose_op_allocated_args *args = alloc;
-  finished_loose_op(exec_ctx, args->call, success);
-  gpr_free(args);
-}
-
 static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
                        grpc_transport_stream_op *op) {
   grpc_call_element *elem;
-  GPR_ASSERT(op->on_consumed == NULL);
-  if (op->cancel_with_status != GRPC_STATUS_OK || op->bind_pollset) {
-    GRPC_CALL_INTERNAL_REF(call, "loose-op");
-    if (op->bind_pollset) {
-      op->on_consumed = &call->on_done_bind;
-    } else {
-      finished_loose_op_allocated_args *args = gpr_malloc(sizeof(*args));
-      args->call = call;
-      grpc_closure_init(&args->closure, finished_loose_op_allocated, args);
-      op->on_consumed = &args->closure;
-    }
-  }
-
+  GPR_TIMER_BEGIN("execute_op", 0);
   elem = CALL_ELEM_FROM_CALL(call, 0);
   op->context = call->context;
   elem->filter->start_transport_stream_op(exec_ctx, elem, op);
+  GPR_TIMER_END("execute_op", 0);
 }
 
 char *grpc_call_get_peer(grpc_call *call) {
   grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0);
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-  char *result = elem->filter->get_peer(&exec_ctx, elem);
+  char *result;
   GRPC_API_TRACE("grpc_call_get_peer(%p)", 1, (call));
+  result = elem->filter->get_peer(&exec_ctx, elem);
+  if (result == NULL) {
+    result = grpc_channel_get_target(call->channel);
+  }
+  if (result == NULL) {
+    result = gpr_strdup("unknown");
+  }
   grpc_exec_ctx_finish(&exec_ctx);
   return result;
 }
@@ -1467,14 +754,13 @@ grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
 
 static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, int success) {
   grpc_call *call = arg;
-  lock(call);
+  gpr_mu_lock(&call->mu);
   call->have_alarm = 0;
   if (success) {
-    cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED,
+    cancel_with_status(exec_ctx, call, GRPC_STATUS_DEADLINE_EXCEEDED,
                        "Deadline Exceeded");
   }
-  finish_read_ops(call);
-  unlock(exec_ctx, call);
+  gpr_mu_unlock(&call->mu);
   GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "alarm");
 }
@@ -1500,8 +786,12 @@ static void destroy_status(void *ignored) {}
 
 static gpr_uint32 decode_status(grpc_mdelem *md) {
   gpr_uint32 status;
-  void *user_data = grpc_mdelem_get_user_data(md, destroy_status);
-  if (user_data) {
+  void *user_data;
+  if (md == GRPC_MDELEM_GRPC_STATUS_0) return 0;
+  if (md == GRPC_MDELEM_GRPC_STATUS_1) return 1;
+  if (md == GRPC_MDELEM_GRPC_STATUS_2) return 2;
+  user_data = grpc_mdelem_get_user_data(md, destroy_status);
+  if (user_data != NULL) {
    status = ((gpr_uint32)(gpr_intptr)user_data) - STATUS_OFFSET;
   } else {
     if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
@@ -1515,101 +805,77 @@ static gpr_uint32 decode_status(grpc_mdelem *md) {
   return status;
 }
 
-/* just as for status above, we need to offset: metadata userdata can't hold a
- * zero (null), which in this case is used to signal no compression */
-#define COMPRESS_OFFSET 1
-static void destroy_compression(void *ignored) {}
-
 static gpr_uint32 decode_compression(grpc_mdelem *md) {
-  grpc_compression_algorithm algorithm;
-  void *user_data = grpc_mdelem_get_user_data(md, destroy_compression);
-  if (user_data) {
-    algorithm =
-        ((grpc_compression_algorithm)(gpr_intptr)user_data) - COMPRESS_OFFSET;
-  } else {
+  grpc_compression_algorithm algorithm =
+      grpc_compression_algorithm_from_mdstr(md->value);
+  if (algorithm == GRPC_COMPRESS_ALGORITHMS_COUNT) {
     const char *md_c_str = grpc_mdstr_as_c_string(md->value);
-    if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str),
-                                          &algorithm)) {
-      gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str);
-      assert(0);
-    }
-    grpc_mdelem_set_user_data(
-        md, destroy_compression,
-        (void *)(gpr_intptr)(algorithm + COMPRESS_OFFSET));
+    gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str);
   }
   return algorithm;
 }
 
-static void recv_metadata(grpc_exec_ctx *exec_ctx, grpc_call *call,
-                          grpc_metadata_batch *md) {
-  grpc_linked_mdelem *l;
+static grpc_mdelem *recv_common_filter(grpc_call *call, grpc_mdelem *elem) {
+  if (elem->key == GRPC_MDSTR_GRPC_STATUS) {
+    GPR_TIMER_BEGIN("status", 0);
+    set_status_code(call, STATUS_FROM_WIRE, decode_status(elem));
+    GPR_TIMER_END("status", 0);
+    return NULL;
+  } else if (elem->key == GRPC_MDSTR_GRPC_MESSAGE) {
+    GPR_TIMER_BEGIN("status-details", 0);
+    set_status_details(call, STATUS_FROM_WIRE, GRPC_MDSTR_REF(elem->value));
+    GPR_TIMER_END("status-details", 0);
+    return NULL;
+  }
+  return elem;
+}
+
+static grpc_mdelem *publish_app_metadata(grpc_call *call, grpc_mdelem *elem,
+                                         int is_trailing) {
   grpc_metadata_array *dest;
   grpc_metadata *mdusr;
-  int is_trailing;
-
-  is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA;
-  for (l = md->list.head; l != NULL; l = l->next) {
-    grpc_mdelem *mdel = l->md;
-    grpc_mdstr *key = mdel->key;
-    if (key == grpc_channel_get_status_string(call->channel)) {
-      GPR_TIMER_BEGIN("status", 0);
-      set_status_code(call, STATUS_FROM_WIRE, decode_status(mdel));
-      GPR_TIMER_END("status", 0);
-    } else if (key == grpc_channel_get_message_string(call->channel)) {
-      GPR_TIMER_BEGIN("status-details", 0);
-      set_status_details(call, STATUS_FROM_WIRE, GRPC_MDSTR_REF(mdel->value));
-      GPR_TIMER_END("status-details", 0);
-    } else if (key ==
-               grpc_channel_get_compression_algorithm_string(call->channel)) {
-      GPR_TIMER_BEGIN("compression_algorithm", 0);
-      set_compression_algorithm(call, decode_compression(mdel));
-      GPR_TIMER_END("compression_algorithm", 0);
-    } else if (key == grpc_channel_get_encodings_accepted_by_peer_string(
-                          call->channel)) {
-      GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0);
-      set_encodings_accepted_by_peer(call, mdel);
-      GPR_TIMER_END("encodings_accepted_by_peer", 0);
-    } else {
-      GPR_TIMER_BEGIN("report_up", 0);
-      dest = &call->buffered_metadata[is_trailing];
-      if (dest->count == dest->capacity) {
-        dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2);
-        dest->metadata =
-            gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity);
-      }
-      mdusr = &dest->metadata[dest->count++];
-      mdusr->key = grpc_mdstr_as_c_string(mdel->key);
-      mdusr->value = grpc_mdstr_as_c_string(mdel->value);
-      mdusr->value_length = GPR_SLICE_LENGTH(mdel->value->slice);
-      if (call->owned_metadata_count == call->owned_metadata_capacity) {
-        call->owned_metadata_capacity =
-            GPR_MAX(call->owned_metadata_capacity + 8,
-                    call->owned_metadata_capacity * 2);
-        call->owned_metadata =
-            gpr_realloc(call->owned_metadata,
-                        sizeof(grpc_mdelem *) * call->owned_metadata_capacity);
-      }
-      call->owned_metadata[call->owned_metadata_count++] = mdel;
-      l->md = NULL;
-      GPR_TIMER_END("report_up", 0);
-    }
-  }
-  if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
-          0 &&
-      !call->is_client) {
-    GPR_TIMER_BEGIN("set_deadline_alarm", 0);
-    set_deadline_alarm(exec_ctx, call, md->deadline);
-    GPR_TIMER_END("set_deadline_alarm", 0);
-  }
-  if (!is_trailing) {
-    call->read_state = READ_STATE_GOT_INITIAL_METADATA;
-  }
+  GPR_TIMER_BEGIN("publish_app_metadata", 0);
+  dest = call->buffered_metadata[is_trailing];
+  if (dest->count == dest->capacity) {
+    dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2);
+    dest->metadata =
+        gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity);
+  }
+  mdusr = &dest->metadata[dest->count++];
+  mdusr->key = grpc_mdstr_as_c_string(elem->key);
+  mdusr->value = grpc_mdstr_as_c_string(elem->value);
+  mdusr->value_length = GPR_SLICE_LENGTH(elem->value->slice);
+  GPR_TIMER_END("publish_app_metadata", 0);
+  return elem;
+}
 
-  for (l = md->list.head; l; l = l->next) {
-    if (l->md) GRPC_MDELEM_UNREF(l->md);
+static grpc_mdelem *recv_initial_filter(void *callp, grpc_mdelem *elem) {
+  grpc_call *call = callp;
+  elem = recv_common_filter(call, elem);
+  if (elem == NULL) {
+    return NULL;
+  } else if (elem->key == GRPC_MDSTR_GRPC_ENCODING) {
+    GPR_TIMER_BEGIN("compression_algorithm", 0);
+    set_compression_algorithm(call, decode_compression(elem));
+    GPR_TIMER_END("compression_algorithm", 0);
+    return NULL;
+  } else if (elem->key == GRPC_MDSTR_GRPC_ACCEPT_ENCODING) {
+    GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0);
+    set_encodings_accepted_by_peer(call, elem);
+    GPR_TIMER_END("encodings_accepted_by_peer", 0);
+    return NULL;
+  } else {
+    return publish_app_metadata(call, elem, 0);
  }
-  for (l = md->garbage.head; l; l = l->next) {
-    GRPC_MDELEM_UNREF(l->md);
+}
+
+static grpc_mdelem *recv_trailing_filter(void *callp, grpc_mdelem *elem) {
+  grpc_call *call = callp;
+  elem = recv_common_filter(call, elem);
+  if (elem == NULL) {
+    return NULL;
+  } else {
+    return publish_app_metadata(call, elem, 1);
  }
 }
 
@@ -1626,19 +892,7 @@ static void set_status_value_directly(grpc_status_code status, void *dest) {
 }
 
 static void set_cancelled_value(grpc_status_code status, void *dest) {
-  *(grpc_status_code *)dest = (status != GRPC_STATUS_OK);
-}
-
-static void finish_batch(grpc_exec_ctx *exec_ctx, grpc_call *call, int success,
-                         void *tag) {
-  grpc_cq_end_op(exec_ctx, call->cq, tag, success, done_completion, call,
-                 allocate_completion(call));
-}
-
-static void finish_batch_with_close(grpc_exec_ctx *exec_ctx, grpc_call *call,
-                                    int success, void *tag) {
-  grpc_cq_end_op(exec_ctx, call->cq, tag, 1, done_completion, call,
-                 allocate_completion(call));
+  *(int *)dest = (status != GRPC_STATUS_OK);
 }
 
 static int are_write_flags_valid(gpr_uint32 flags) {
@@ -1649,258 +903,504 @@ static int are_write_flags_valid(gpr_uint32 flags) {
   return !(flags & invalid_positions);
 }
 
-grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
-                                      size_t nops, void *tag, void *reserved) {
-  grpc_ioreq reqs[GRPC_IOREQ_OP_COUNT];
-  size_t in;
-  size_t out;
+static batch_control *allocate_batch_control(grpc_call *call) {
+  size_t i;
+  for (i = 0; i < MAX_CONCURRENT_BATCHES; i++) {
+    if ((call->used_batches & (1 << i)) == 0) {
+      call->used_batches =
+          (gpr_uint8)(call->used_batches | (gpr_uint8)(1 << i));
+      return &call->active_batches[i];
+    }
+  }
+  return NULL;
+}
+
+static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data,
+                                    grpc_cq_completion *storage) {
+  batch_control *bctl = user_data;
+  grpc_call *call = bctl->call;
+  gpr_mu_lock(&call->mu);
+  call->used_batches = (gpr_uint8)(
+      call->used_batches & ~(gpr_uint8)(1 << (bctl - call->active_batches)));
+  gpr_mu_unlock(&call->mu);
+  GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
+}
+
+static void post_batch_completion(grpc_exec_ctx *exec_ctx,
+                                  batch_control *bctl) {
+  grpc_call *call = bctl->call;
+  if (bctl->is_notify_tag_closure) {
+    grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success);
+    gpr_mu_lock(&call->mu);
+    bctl->call->used_batches =
+        (gpr_uint8)(bctl->call->used_batches &
+                    ~(gpr_uint8)(1 << (bctl - bctl->call->active_batches)));
+    gpr_mu_unlock(&call->mu);
+    GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
+  } else {
+    grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->notify_tag, bctl->success,
+                   finish_batch_completion, bctl, &bctl->cq_completion);
+  }
+}
+
+static void continue_receiving_slices(grpc_exec_ctx *exec_ctx,
+                                      batch_control *bctl) {
+  grpc_call *call = bctl->call;
+  for (;;) {
+    size_t remaining = call->receiving_stream->length -
+                       (*call->receiving_buffer)->data.raw.slice_buffer.length;
+    if (remaining == 0) {
+      call->receiving_message = 0;
+      grpc_byte_stream_destroy(call->receiving_stream);
+      call->receiving_stream = NULL;
+      if (gpr_unref(&bctl->steps_to_complete)) {
+        post_batch_completion(exec_ctx, bctl);
+      }
+      return;
+    }
+    if (grpc_byte_stream_next(exec_ctx, call->receiving_stream,
+                              &call->receiving_slice, remaining,
+                              &call->receiving_slice_ready)) {
+      gpr_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
+                           call->receiving_slice);
+    } else {
+      return;
+    }
+  }
+}
+
+static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
+                                  int success) {
+  batch_control *bctl = bctlp;
+  grpc_call *call = bctl->call;
+
+  GPR_ASSERT(success);
+  gpr_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
+                       call->receiving_slice);
+
+  continue_receiving_slices(exec_ctx, bctl);
+}
+
+static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, int success) {
+  batch_control *bctl = bctlp;
+  grpc_call *call = bctl->call;
+  grpc_call *child_call;
+  grpc_call *next_child_call;
+
+  gpr_mu_lock(&call->mu);
+  if (bctl->send_initial_metadata) {
+    grpc_metadata_batch_destroy(
+        &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
+  }
+  if (bctl->send_message) {
+    call->sending_message = 0;
+  }
+  if (bctl->send_final_op) {
+    grpc_metadata_batch_destroy(
+        &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
+  }
+  if (bctl->recv_initial_metadata) {
+    grpc_metadata_batch *md =
+        &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
+    grpc_metadata_batch_filter(md, recv_initial_filter, call);
+
+    if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
+            0 &&
+        !call->is_client) {
+      GPR_TIMER_BEGIN("set_deadline_alarm", 0);
+      set_deadline_alarm(exec_ctx, call, md->deadline);
GPR_TIMER_END("set_deadline_alarm", 0); + } + } + if (bctl->recv_final_op) { + grpc_metadata_batch *md = + &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; + grpc_metadata_batch_filter(md, recv_trailing_filter, call); + + if (call->have_alarm) { + grpc_timer_cancel(exec_ctx, &call->alarm); + } + /* propagate cancellation to any interested children */ + child_call = call->first_child; + if (child_call != NULL) { + do { + next_child_call = child_call->sibling_next; + if (child_call->cancellation_is_inherited) { + GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel"); + grpc_call_cancel(child_call, NULL); + GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel"); + } + child_call = next_child_call; + } while (child_call != call->first_child); + } + + if (call->is_client) { + get_final_status(call, set_status_value_directly, + call->final_op.client.status); + get_final_details(call, call->final_op.client.status_details, + call->final_op.client.status_details_capacity); + } else { + get_final_status(call, set_cancelled_value, + call->final_op.server.cancelled); + } + + success = 1; + } + bctl->success = success != 0; + gpr_mu_unlock(&call->mu); + if (gpr_unref(&bctl->steps_to_complete)) { + post_batch_completion(exec_ctx, bctl); + } +} + +static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, + int success) { + batch_control *bctl = bctlp; + grpc_call *call = bctl->call; + + if (call->receiving_stream == NULL) { + *call->receiving_buffer = NULL; + if (gpr_unref(&bctl->steps_to_complete)) { + post_batch_completion(exec_ctx, bctl); + } + } else if (call->receiving_stream->length > + grpc_channel_get_max_message_length(call->channel)) { + cancel_with_status(exec_ctx, call, GRPC_STATUS_INTERNAL, + "Max message size exceeded"); + grpc_byte_stream_destroy(call->receiving_stream); + call->receiving_stream = NULL; + *call->receiving_buffer = NULL; + if (gpr_unref(&bctl->steps_to_complete)) { + post_batch_completion(exec_ctx, bctl); + } + } else { + call->test_only_last_message_flags = call->receiving_stream->flags; + if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) && + (call->compression_algorithm > GRPC_COMPRESS_NONE)) { + *call->receiving_buffer = grpc_raw_compressed_byte_buffer_create( + NULL, 0, call->compression_algorithm); + } else { + *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0); + } + grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready, + bctl); + continue_receiving_slices(exec_ctx, bctl); + /* early out */ + return; + } +} + +static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, + grpc_call *call, const grpc_op *ops, + size_t nops, void *notify_tag, + int is_notify_tag_closure) { + grpc_transport_stream_op stream_op; + size_t i; const grpc_op *op; - grpc_ioreq *req; - void (*finish_func)(grpc_exec_ctx *, grpc_call *, int, void *) = finish_batch; - grpc_call_error error; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + batch_control *bctl; + int num_completion_callbacks_needed = 1; + grpc_call_error error = GRPC_CALL_OK; GPR_TIMER_BEGIN("grpc_call_start_batch", 0); - GRPC_API_TRACE( - "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, reserved=%p)", - 5, (call, ops, (unsigned long)nops, tag, reserved)); + GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag); - if (reserved != NULL) { - error = GRPC_CALL_ERROR; - goto done; - } + memset(&stream_op, 0, sizeof(stream_op)); - GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag); + /* TODO(ctiller): this feels like it could be 
made lock-free */ + gpr_mu_lock(&call->mu); + bctl = allocate_batch_control(call); + memset(bctl, 0, sizeof(*bctl)); + bctl->call = call; + bctl->notify_tag = notify_tag; + bctl->is_notify_tag_closure = (gpr_uint8)(is_notify_tag_closure != 0); if (nops == 0) { - grpc_cq_begin_op(call->cq); GRPC_CALL_INTERNAL_REF(call, "completion"); - grpc_cq_end_op(&exec_ctx, call->cq, tag, 1, done_completion, call, - allocate_completion(call)); + bctl->success = 1; + if (!is_notify_tag_closure) { + grpc_cq_begin_op(call->cq, notify_tag); + } + gpr_mu_unlock(&call->mu); + post_batch_completion(exec_ctx, bctl); error = GRPC_CALL_OK; goto done; } - /* rewrite batch ops into ioreq ops */ - for (in = 0, out = 0; in < nops; in++) { - op = &ops[in]; + /* rewrite batch ops into a transport op */ + for (i = 0; i < nops; i++) { + op = &ops[i]; if (op->reserved != NULL) { error = GRPC_CALL_ERROR; - goto done; + goto done_with_error; } switch (op->op) { case GRPC_OP_SEND_INITIAL_METADATA: /* Flag validation: currently allow no flags */ if (op->flags != 0) { error = GRPC_CALL_ERROR_INVALID_FLAGS; - goto done; + goto done_with_error; + } + if (call->sent_initial_metadata) { + error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; + goto done_with_error; } - req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) { - error = GRPC_CALL_ERROR_BATCH_TOO_BIG; - goto done; + if (op->data.send_initial_metadata.count > INT_MAX) { + error = GRPC_CALL_ERROR_INVALID_METADATA; + goto done_with_error; } - req->op = GRPC_IOREQ_SEND_INITIAL_METADATA; - req->data.send_metadata.count = op->data.send_initial_metadata.count; - req->data.send_metadata.metadata = - op->data.send_initial_metadata.metadata; - req->flags = op->flags; + bctl->send_initial_metadata = 1; + call->sent_initial_metadata = 1; + if (!prepare_application_metadata( + call, (int)op->data.send_initial_metadata.count, + op->data.send_initial_metadata.metadata, 0, call->is_client)) { + error = GRPC_CALL_ERROR_INVALID_METADATA; + goto done_with_error; + } + /* TODO(ctiller): just make these the same variable? 
*/ + call->metadata_batch[0][0].deadline = call->send_deadline; + stream_op.send_initial_metadata = + &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]; break; case GRPC_OP_SEND_MESSAGE: if (!are_write_flags_valid(op->flags)) { error = GRPC_CALL_ERROR_INVALID_FLAGS; - goto done; + goto done_with_error; } if (op->data.send_message == NULL) { error = GRPC_CALL_ERROR_INVALID_MESSAGE; - goto done; + goto done_with_error; } - req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) { - error = GRPC_CALL_ERROR_BATCH_TOO_BIG; - goto done; + if (call->sending_message) { + error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; + goto done_with_error; } - req->op = GRPC_IOREQ_SEND_MESSAGE; - req->data.send_message = op->data.send_message; - req->flags = op->flags; + bctl->send_message = 1; + call->sending_message = 1; + grpc_slice_buffer_stream_init( + &call->sending_stream, + &op->data.send_message->data.raw.slice_buffer, op->flags); + stream_op.send_message = &call->sending_stream.base; break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: /* Flag validation: currently allow no flags */ if (op->flags != 0) { error = GRPC_CALL_ERROR_INVALID_FLAGS; - goto done; + goto done_with_error; } if (!call->is_client) { error = GRPC_CALL_ERROR_NOT_ON_SERVER; - goto done; + goto done_with_error; } - req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) { - error = GRPC_CALL_ERROR_BATCH_TOO_BIG; - goto done; + if (call->sent_final_op) { + error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; + goto done_with_error; } - req->op = GRPC_IOREQ_SEND_CLOSE; - req->flags = op->flags; + bctl->send_final_op = 1; + call->sent_final_op = 1; + stream_op.send_trailing_metadata = + &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; break; case GRPC_OP_SEND_STATUS_FROM_SERVER: /* Flag validation: currently allow no flags */ if (op->flags != 0) { error = GRPC_CALL_ERROR_INVALID_FLAGS; - goto done; + goto done_with_error; } if (call->is_client) { error = GRPC_CALL_ERROR_NOT_ON_CLIENT; - goto done; + goto done_with_error; + } + if (call->sent_final_op) { + error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; + goto done_with_error; } - req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) { - error = GRPC_CALL_ERROR_BATCH_TOO_BIG; - goto done; + if (op->data.send_status_from_server.trailing_metadata_count > + INT_MAX) { + error = GRPC_CALL_ERROR_INVALID_METADATA; + goto done_with_error; } - req->op = GRPC_IOREQ_SEND_TRAILING_METADATA; - req->flags = op->flags; - req->data.send_metadata.count = - op->data.send_status_from_server.trailing_metadata_count; - req->data.send_metadata.metadata = - op->data.send_status_from_server.trailing_metadata; - req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) { - error = GRPC_CALL_ERROR_BATCH_TOO_BIG; - goto done; + bctl->send_final_op = 1; + call->sent_final_op = 1; + call->send_extra_metadata_count = 1; + call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem( + call->channel, op->data.send_status_from_server.status); + if (op->data.send_status_from_server.status_details != NULL) { + call->send_extra_metadata[1].md = grpc_mdelem_from_metadata_strings( + GRPC_MDSTR_GRPC_MESSAGE, + grpc_mdstr_from_string( + op->data.send_status_from_server.status_details)); + call->send_extra_metadata_count++; + set_status_details( + call, STATUS_FROM_API_OVERRIDE, + GRPC_MDSTR_REF(call->send_extra_metadata[1].md->value)); } - req->op = GRPC_IOREQ_SEND_STATUS; - req->data.send_status.code = op->data.send_status_from_server.status; - req->data.send_status.details = - 
op->data.send_status_from_server.status_details != NULL - ? grpc_mdstr_from_string( - call->metadata_context, - op->data.send_status_from_server.status_details) - : NULL; - req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) { - error = GRPC_CALL_ERROR_BATCH_TOO_BIG; - goto done; + set_status_code(call, STATUS_FROM_API_OVERRIDE, + (gpr_uint32)op->data.send_status_from_server.status); + if (!prepare_application_metadata( + call, + (int)op->data.send_status_from_server.trailing_metadata_count, + op->data.send_status_from_server.trailing_metadata, 1, 1)) { + error = GRPC_CALL_ERROR_INVALID_METADATA; + goto done_with_error; } - req->op = GRPC_IOREQ_SEND_CLOSE; + stream_op.send_trailing_metadata = + &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; break; case GRPC_OP_RECV_INITIAL_METADATA: /* Flag validation: currently allow no flags */ if (op->flags != 0) { error = GRPC_CALL_ERROR_INVALID_FLAGS; - goto done; - } - if (!call->is_client) { - error = GRPC_CALL_ERROR_NOT_ON_SERVER; - goto done; + goto done_with_error; } - req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) { - error = GRPC_CALL_ERROR_BATCH_TOO_BIG; - goto done; + if (call->received_initial_metadata) { + error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; + goto done_with_error; } - req->op = GRPC_IOREQ_RECV_INITIAL_METADATA; - req->data.recv_metadata = op->data.recv_initial_metadata; - req->data.recv_metadata->count = 0; - req->flags = op->flags; + call->received_initial_metadata = 1; + call->buffered_metadata[0] = op->data.recv_initial_metadata; + bctl->recv_initial_metadata = 1; + stream_op.recv_initial_metadata = + &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; break; case GRPC_OP_RECV_MESSAGE: /* Flag validation: currently allow no flags */ if (op->flags != 0) { error = GRPC_CALL_ERROR_INVALID_FLAGS; - goto done; + goto done_with_error; } - req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) { - error = GRPC_CALL_ERROR_BATCH_TOO_BIG; - goto done; + if (call->receiving_message) { + error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; + goto done_with_error; } - req->op = GRPC_IOREQ_RECV_MESSAGE; - req->data.recv_message = op->data.recv_message; - req->flags = op->flags; + call->receiving_message = 1; + bctl->recv_message = 1; + call->receiving_buffer = op->data.recv_message; + stream_op.recv_message = &call->receiving_stream; + grpc_closure_init(&call->receiving_stream_ready, receiving_stream_ready, + bctl); + stream_op.recv_message_ready = &call->receiving_stream_ready; + num_completion_callbacks_needed++; break; case GRPC_OP_RECV_STATUS_ON_CLIENT: /* Flag validation: currently allow no flags */ if (op->flags != 0) { error = GRPC_CALL_ERROR_INVALID_FLAGS; - goto done; + goto done_with_error; } if (!call->is_client) { error = GRPC_CALL_ERROR_NOT_ON_SERVER; - goto done; - } - req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) { - error = GRPC_CALL_ERROR_BATCH_TOO_BIG; - goto done; + goto done_with_error; } - req->op = GRPC_IOREQ_RECV_STATUS; - req->flags = op->flags; - req->data.recv_status.set_value = set_status_value_directly; - req->data.recv_status.user_data = op->data.recv_status_on_client.status; - req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) { - error = GRPC_CALL_ERROR_BATCH_TOO_BIG; - goto done; + if (call->received_final_op) { + error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; + goto done_with_error; } - req->op = GRPC_IOREQ_RECV_STATUS_DETAILS; - req->data.recv_status_details.details = + call->received_final_op = 1; + call->buffered_metadata[1] = + 
op->data.recv_status_on_client.trailing_metadata;
+        call->final_op.client.status = op->data.recv_status_on_client.status;
+        call->final_op.client.status_details =
             op->data.recv_status_on_client.status_details;
-        req->data.recv_status_details.details_capacity =
+        call->final_op.client.status_details_capacity =
             op->data.recv_status_on_client.status_details_capacity;
-        req = &reqs[out++];
-        if (out > GRPC_IOREQ_OP_COUNT) {
-          error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
-          goto done;
-        }
-        req->op = GRPC_IOREQ_RECV_TRAILING_METADATA;
-        req->data.recv_metadata =
-            op->data.recv_status_on_client.trailing_metadata;
-        req->data.recv_metadata->count = 0;
-        req = &reqs[out++];
-        if (out > GRPC_IOREQ_OP_COUNT) {
-          error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
-          goto done;
-        }
-        req->op = GRPC_IOREQ_RECV_CLOSE;
-        finish_func = finish_batch_with_close;
+        bctl->recv_final_op = 1;
+        stream_op.recv_trailing_metadata =
+            &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
         break;
       case GRPC_OP_RECV_CLOSE_ON_SERVER:
         /* Flag validation: currently allow no flags */
         if (op->flags != 0) {
           error = GRPC_CALL_ERROR_INVALID_FLAGS;
-          goto done;
+          goto done_with_error;
         }
-        req = &reqs[out++];
-        if (out > GRPC_IOREQ_OP_COUNT) {
-          error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
-          goto done;
+        if (call->is_client) {
+          error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
+          goto done_with_error;
         }
-        req->op = GRPC_IOREQ_RECV_STATUS;
-        req->flags = op->flags;
-        req->data.recv_status.set_value = set_cancelled_value;
-        req->data.recv_status.user_data =
-            op->data.recv_close_on_server.cancelled;
-        req = &reqs[out++];
-        if (out > GRPC_IOREQ_OP_COUNT) {
-          error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
-          goto done;
+        if (call->received_final_op) {
+          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
+          goto done_with_error;
         }
-        req->op = GRPC_IOREQ_RECV_CLOSE;
-        finish_func = finish_batch_with_close;
+        call->received_final_op = 1;
+        call->final_op.server.cancelled =
+            op->data.recv_close_on_server.cancelled;
+        bctl->recv_final_op = 1;
+        stream_op.recv_trailing_metadata =
+            &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
         break;
     }
   }

   GRPC_CALL_INTERNAL_REF(call, "completion");
-  grpc_cq_begin_op(call->cq);
+  if (!is_notify_tag_closure) {
+    grpc_cq_begin_op(call->cq, notify_tag);
+  }
+  gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
+
+  stream_op.context = call->context;
+  grpc_closure_init(&bctl->finish_batch, finish_batch, bctl);
+  stream_op.on_complete = &bctl->finish_batch;
+  gpr_mu_unlock(&call->mu);
+
+  execute_op(exec_ctx, call, &stream_op);

-  error = grpc_call_start_ioreq_and_call_back(&exec_ctx, call, reqs, out,
-                                              finish_func, tag);
 done:
-  grpc_exec_ctx_finish(&exec_ctx);
   GPR_TIMER_END("grpc_call_start_batch", 0);
   return error;
+
+done_with_error:
+  /* reverse any mutations that occurred */
+  if (bctl->send_initial_metadata) {
+    call->sent_initial_metadata = 0;
+    grpc_metadata_batch_clear(&call->metadata_batch[0][0]);
+  }
+  if (bctl->send_message) {
+    call->sending_message = 0;
+    grpc_byte_stream_destroy(&call->sending_stream.base);
+  }
+  if (bctl->send_final_op) {
+    call->sent_final_op = 0;
+    grpc_metadata_batch_clear(&call->metadata_batch[0][1]);
+  }
+  if (bctl->recv_initial_metadata) {
+    call->received_initial_metadata = 0;
+  }
+  if (bctl->recv_message) {
+    call->receiving_message = 0;
+  }
+  if (bctl->recv_final_op) {
+    call->received_final_op = 0;
+  }
+  gpr_mu_unlock(&call->mu);
+  goto done;
+}
+
+grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
+                                      size_t nops, void *tag, void *reserved) {
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  grpc_call_error err;
+
+  GRPC_API_TRACE(
+      "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, reserved=%p)",
+      5, (call, ops, (unsigned long)nops, tag, reserved));
+
+  if (reserved != NULL) {
+    err = GRPC_CALL_ERROR;
+  } else {
+    err = call_start_batch(&exec_ctx, call, ops, nops, tag, 0);
+  }
+
+  grpc_exec_ctx_finish(&exec_ctx);
+  return err;
+}
+
+grpc_call_error grpc_call_start_batch_and_execute(grpc_exec_ctx *exec_ctx,
+                                                  grpc_call *call,
+                                                  const grpc_op *ops,
+                                                  size_t nops,
+                                                  grpc_closure *closure) {
+  return call_start_batch(exec_ctx, call, ops, nops, closure, 1);
+}

 void grpc_call_context_set(grpc_call *call, grpc_context_index elem,
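
The recv_initial_filter / recv_trailing_filter pair introduced above relies on the grpc_metadata_batch_filter convention: a filter returns the element to keep it in the batch and NULL to consume it, so transport-level keys (grpc-status, grpc-message, grpc-encoding) never reach the application. A standalone sketch of that convention, under invented names (elem, filter_list, status_filter are illustrative stand-ins, not types from the gRPC tree):

#include <stdlib.h>
#include <string.h>

/* Illustrative stand-in for grpc_mdelem in a singly linked batch. */
typedef struct elem {
  const char *key;
  const char *value;
  struct elem *next;
} elem;

typedef elem *(*filter_fn)(void *user_data, elem *e);

/* Keep elements the filter returns; unlink the ones it consumes
   (the real code additionally unrefs a consumed mdelem). */
static void filter_list(elem **head, filter_fn f, void *user_data) {
  elem **prev = head;
  while (*prev != NULL) {
    elem *kept = f(user_data, *prev);
    if (kept == NULL) {
      *prev = (*prev)->next; /* consumed: drop from the batch */
    } else {
      *prev = kept;
      prev = &kept->next;
    }
  }
}

/* Like recv_common_filter: swallow grpc-status, pass everything else
   through so it can be published as application metadata. */
static elem *status_filter(void *user_data, elem *e) {
  if (strcmp(e->key, "grpc-status") == 0) {
    *(int *)user_data = atoi(e->value);
    return NULL;
  }
  return e;
}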
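
allocate_batch_control and finish_batch_completion manage the six batch_control slots with a one-byte bitmask: a clear bit marks a free slot, and release recovers the slot index by pointer arithmetic (bctl - call->active_batches), so no per-slot bookkeeping is needed. A minimal standalone sketch of the same pattern, under invented names (slot_pool, pool_acquire, pool_release):

#include <stddef.h>
#include <stdint.h>

#define POOL_SLOTS 6 /* mirrors MAX_CONCURRENT_BATCHES */

typedef struct {
  uint8_t used; /* bit i set => slots[i] is in use */
  int slots[POOL_SLOTS];
} slot_pool;

static int *pool_acquire(slot_pool *p) {
  size_t i;
  for (i = 0; i < POOL_SLOTS; i++) {
    if ((p->used & (uint8_t)(1u << i)) == 0) {
      p->used = (uint8_t)(p->used | (uint8_t)(1u << i));
      return &p->slots[i];
    }
  }
  return NULL; /* all slots busy: no more concurrent batches allowed */
}

static void pool_release(slot_pool *p, int *slot) {
  /* recover the index by pointer arithmetic, as finish_batch_completion
     does with bctl - call->active_batches */
  p->used = (uint8_t)(p->used & (uint8_t)~(1u << (size_t)(slot - p->slots)));
}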
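
continue_receiving_slices loops for as long as grpc_byte_stream_next returns a slice synchronously, and parks on receiving_slice_ready otherwise; the callback appends the slice and re-enters the loop. A toy model of that sync-or-async pull loop (stream, stream_next, pump, on_ready are all hypothetical; the toy stream always answers synchronously, where a real byte stream may return 0 and fire the callback later):

#include <stddef.h>
#include <stdio.h>

typedef void (*ready_cb)(void *arg, size_t len);

typedef struct {
  size_t remaining; /* chunks left in the "message" */
} stream;

/* Returns 1 and fills *len when data is available now; a real transport
   would sometimes return 0 and invoke cb(arg, len) once data arrives. */
static int stream_next(stream *s, size_t *len, ready_cb cb, void *arg) {
  (void)s;
  (void)cb;
  (void)arg;
  *len = 1;
  return 1;
}

static void pump(stream *s);

static void on_ready(void *arg, size_t len) {
  printf("async chunk of %zu\n", len);
  pump(arg); /* resume the synchronous loop */
}

static void pump(stream *s) {
  size_t len;
  while (s->remaining > 0) {
    if (!stream_next(s, &len, on_ready, s)) {
      return; /* parked: on_ready re-enters when data arrives */
    }
    printf("sync chunk of %zu\n", len);
    s->remaining -= len;
  }
  printf("message complete\n");
}

int main(void) {
  stream s = {3};
  pump(&s);
  return 0;
}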
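
A batch completes only after finish_batch and, when GRPC_OP_RECV_MESSAGE is present, receiving_stream_ready have both run; steps_to_complete counts them down and the last gpr_unref that returns true posts the completion. The same countdown in a self-contained sketch that substitutes C11 atomics for gpr_refcount (names here are illustrative):

#include <stdatomic.h>
#include <stdio.h>

typedef struct {
  atomic_int steps_to_complete;
} batch;

static void step_done(batch *b) {
  /* fetch_sub returns the prior value; 1 means this was the last step */
  if (atomic_fetch_sub_explicit(&b->steps_to_complete, 1,
                                memory_order_acq_rel) == 1) {
    printf("post_batch_completion\n");
  }
}

int main(void) {
  batch b;
  /* one count for finish_batch, one for the recv_message callback */
  atomic_init(&b.steps_to_complete, 2);
  step_done(&b); /* e.g. receiving_stream_ready finished first */
  step_done(&b); /* finish_batch finished -> posts the completion */
  return 0;
}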
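
For orientation, the public entry point grpc_call_start_batch above is typically driven like the unary client round trip below. The helper start_unary_batch and its parameters are illustrative, but the grpc_op fields match the API version in this diff; note how each op touches a distinct sent_*/received_* flag, so none of the GRPC_CALL_ERROR_TOO_MANY_OPERATIONS checks fire:

#include <string.h>
#include <grpc/grpc.h>

/* Hypothetical helper: issue a full unary client batch on an existing call.
   The caller owns `call` and `send_buf` and has run grpc_metadata_array_init
   on both metadata arrays; completion is later reported against `tag` on the
   call's completion queue. */
static grpc_call_error start_unary_batch(
    grpc_call *call, grpc_byte_buffer *send_buf,
    grpc_metadata_array *initial_md, grpc_metadata_array *trailing_md,
    grpc_byte_buffer **recv_buf, grpc_status_code *status, char **details,
    size_t *details_capacity, void *tag) {
  grpc_op ops[6];
  memset(ops, 0, sizeof(ops)); /* zeroes flags and reserved on every op */
  ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
  ops[0].data.send_initial_metadata.count = 0;
  ops[1].op = GRPC_OP_SEND_MESSAGE;
  ops[1].data.send_message = send_buf;
  ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
  ops[3].op = GRPC_OP_RECV_INITIAL_METADATA;
  ops[3].data.recv_initial_metadata = initial_md;
  ops[4].op = GRPC_OP_RECV_MESSAGE;
  ops[4].data.recv_message = recv_buf;
  ops[5].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
  ops[5].data.recv_status_on_client.trailing_metadata = trailing_md;
  ops[5].data.recv_status_on_client.status = status;
  ops[5].data.recv_status_on_client.status_details = details;
  ops[5].data.recv_status_on_client.status_details_capacity = details_capacity;
  /* reserved must be NULL, or the call fails with GRPC_CALL_ERROR */
  return grpc_call_start_batch(call, ops, 6, tag, NULL);
}

Core-internal callers that want a closure instead of a completion-queue tag use the new grpc_call_start_batch_and_execute, which routes through the same call_start_batch with is_notify_tag_closure set.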