diff options
Diffstat (limited to 'src/core/surface')
-rw-r--r-- | src/core/surface/byte_buffer_queue.c | 97 | ||||
-rw-r--r-- | src/core/surface/byte_buffer_queue.h | 62 | ||||
-rw-r--r-- | src/core/surface/call.c | 1884 | ||||
-rw-r--r-- | src/core/surface/call.h | 67 | ||||
-rw-r--r-- | src/core/surface/call_log_batch.c | 3 | ||||
-rw-r--r-- | src/core/surface/call_test_only.h | 1 | ||||
-rw-r--r-- | src/core/surface/completion_queue.c | 82 | ||||
-rw-r--r-- | src/core/surface/completion_queue.h | 3 | ||||
-rw-r--r-- | src/core/surface/init.c | 5 | ||||
-rw-r--r-- | src/core/surface/lame_client.c | 74 | ||||
-rw-r--r-- | src/core/surface/server.c | 230 |
11 files changed, 907 insertions, 1601 deletions
diff --git a/src/core/surface/byte_buffer_queue.c b/src/core/surface/byte_buffer_queue.c deleted file mode 100644 index e47dc4f4ce..0000000000 --- a/src/core/surface/byte_buffer_queue.c +++ /dev/null @@ -1,97 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "src/core/surface/byte_buffer_queue.h" -#include <grpc/support/alloc.h> -#include <grpc/support/useful.h> - -static void bba_destroy(grpc_bbq_array *array, size_t start_pos) { - size_t i; - for (i = start_pos; i < array->count; i++) { - grpc_byte_buffer_destroy(array->data[i]); - } - gpr_free(array->data); -} - -/* Append an operation to an array, expanding as needed */ -static void bba_push(grpc_bbq_array *a, grpc_byte_buffer *buffer) { - if (a->count == a->capacity) { - a->capacity = GPR_MAX(a->capacity * 2, 8); - a->data = gpr_realloc(a->data, sizeof(grpc_byte_buffer *) * a->capacity); - } - a->data[a->count++] = buffer; -} - -void grpc_bbq_destroy(grpc_byte_buffer_queue *q) { - bba_destroy(&q->filling, 0); - bba_destroy(&q->draining, q->drain_pos); -} - -int grpc_bbq_empty(grpc_byte_buffer_queue *q) { - return (q->drain_pos == q->draining.count && q->filling.count == 0); -} - -void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *buffer) { - q->bytes += grpc_byte_buffer_length(buffer); - bba_push(&q->filling, buffer); -} - -void grpc_bbq_flush(grpc_byte_buffer_queue *q) { - grpc_byte_buffer *bb; - while ((bb = grpc_bbq_pop(q))) { - grpc_byte_buffer_destroy(bb); - } -} - -size_t grpc_bbq_bytes(grpc_byte_buffer_queue *q) { return q->bytes; } - -grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) { - grpc_bbq_array temp_array; - grpc_byte_buffer *out; - - if (q->drain_pos == q->draining.count) { - if (q->filling.count == 0) { - return NULL; - } - q->draining.count = 0; - q->drain_pos = 0; - /* swap arrays */ - temp_array = q->filling; - q->filling = q->draining; - q->draining = temp_array; - } - - out = q->draining.data[q->drain_pos++]; - q->bytes -= grpc_byte_buffer_length(out); - return out; -} diff --git a/src/core/surface/byte_buffer_queue.h b/src/core/surface/byte_buffer_queue.h deleted file mode 100644 index 2c3b22d24e..0000000000 --- a/src/core/surface/byte_buffer_queue.h +++ /dev/null @@ -1,62 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_INTERNAL_CORE_SURFACE_BYTE_BUFFER_QUEUE_H -#define GRPC_INTERNAL_CORE_SURFACE_BYTE_BUFFER_QUEUE_H - -#include <grpc/byte_buffer.h> - -/* TODO(ctiller): inline an element or two into this struct to avoid per-call - allocations */ -typedef struct { - grpc_byte_buffer **data; - size_t count; - size_t capacity; -} grpc_bbq_array; - -/* should be initialized by zeroing memory */ -typedef struct { - size_t drain_pos; - grpc_bbq_array filling; - grpc_bbq_array draining; - size_t bytes; -} grpc_byte_buffer_queue; - -void grpc_bbq_destroy(grpc_byte_buffer_queue *q); -grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q); -void grpc_bbq_flush(grpc_byte_buffer_queue *q); -int grpc_bbq_empty(grpc_byte_buffer_queue *q); -void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *bb); -size_t grpc_bbq_bytes(grpc_byte_buffer_queue *q); - -#endif /* GRPC_INTERNAL_CORE_SURFACE_BYTE_BUFFER_QUEUE_H */ diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 81ff215c0c..056d49064e 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -31,6 +31,7 @@ * */ #include <assert.h> +#include <limits.h> #include <stdio.h> #include <stdlib.h> #include <string.h> @@ -46,12 +47,11 @@ #include "src/core/profiling/timers.h" #include "src/core/support/string.h" #include "src/core/surface/api_trace.h" -#include "src/core/surface/byte_buffer_queue.h" #include "src/core/surface/call.h" #include "src/core/surface/channel.h" #include "src/core/surface/completion_queue.h" -/** The maximum number of completions possible. +/** The maximum number of concurrent batches possible. Based upon the maximum number of individually queueable ops in the batch api: - initial metadata send @@ -60,7 +60,7 @@ - initial metadata recv - message recv - status/close recv (depending on client/server) */ -#define MAX_CONCURRENT_COMPLETIONS 6 +#define MAX_CONCURRENT_BATCHES 6 typedef struct { grpc_ioreq_completion_func on_complete; @@ -68,24 +68,7 @@ typedef struct { int success; } completed_request; -/* See request_set in grpc_call below for a description */ -#define REQSET_EMPTY 'X' -#define REQSET_DONE 'Y' - -#define MAX_SEND_INITIAL_METADATA_COUNT 3 - -typedef struct { - /* Overall status of the operation: starts OK, may degrade to - non-OK */ - gpr_uint8 success; - /* a bit mask of which request ops are needed (1u << opid) */ - gpr_uint16 need_mask; - /* a bit mask of which request ops are now completed */ - gpr_uint16 complete_mask; - /* Completion function to call at the end of the operation */ - grpc_ioreq_completion_func on_complete; - void *user_data; -} reqinfo_master; +#define MAX_SEND_EXTRA_METADATA_COUNT 3 /* Status data for a request can come from several sources; this enumerates them all, and acts as a priority sorting for which @@ -130,6 +113,23 @@ typedef enum { WRITE_STATE_WRITE_CLOSED } write_state; +typedef struct batch_control { + grpc_call *call; + grpc_cq_completion cq_completion; + grpc_closure finish_batch; + void *notify_tag; + gpr_refcount steps_to_complete; + + gpr_uint8 send_initial_metadata; + gpr_uint8 send_message; + gpr_uint8 send_final_op; + gpr_uint8 recv_initial_metadata; + gpr_uint8 recv_message; + gpr_uint8 recv_final_op; + gpr_uint8 is_notify_tag_closure; + gpr_uint8 success; +} batch_control; + struct grpc_call { grpc_completion_queue *cq; grpc_channel *channel; @@ -138,98 +138,39 @@ struct grpc_call { grpc_mdctx *metadata_context; /* TODO(ctiller): share with cq if possible? */ gpr_mu mu; - gpr_mu completion_mu; - /* how far through the stream have we read? */ - read_state read_state; - /* how far through the stream have we written? */ - write_state write_state; /* client or server call */ gpr_uint8 is_client; /* is the alarm set */ gpr_uint8 have_alarm; - /* are we currently performing a send operation */ - gpr_uint8 sending; - /* are we currently performing a recv operation */ - gpr_uint8 receiving; - /* are we currently completing requests */ - gpr_uint8 completing; /** has grpc_call_destroy been called */ gpr_uint8 destroy_called; - /* pairs with completed_requests */ - gpr_uint8 num_completed_requests; - /* are we currently reading a message? */ - gpr_uint8 reading_message; - /* have we bound a pollset yet? */ - gpr_uint8 bound_pollset; - /* is an error status set */ - gpr_uint8 error_status_set; - /** bitmask of allocated completion events in completions */ - gpr_uint8 allocated_completions; /** flag indicating that cancellation is inherited */ gpr_uint8 cancellation_is_inherited; + /** bitmask of live batches */ + gpr_uint8 used_batches; + /** which ops are in-flight */ + gpr_uint8 sent_initial_metadata; + gpr_uint8 sending_message; + gpr_uint8 sent_final_op; + gpr_uint8 received_initial_metadata; + gpr_uint8 receiving_message; + gpr_uint8 received_final_op; + + batch_control active_batches[MAX_CONCURRENT_BATCHES]; + + /* first idx: is_receiving, second idx: is_trailing */ + grpc_metadata_batch metadata_batch[2][2]; - /* flags with bits corresponding to write states allowing us to determine - what was sent */ - gpr_uint16 last_send_contains; - /* cancel with this status on the next outgoing transport op */ - grpc_status_code cancel_with_status; - - /* Active ioreqs. - request_set and request_data contain one element per active ioreq - operation. - - request_set[op] is an integer specifying a set of operations to which - the request belongs: - - if it is < GRPC_IOREQ_OP_COUNT, then this operation is pending - completion, and the integer represents to which group of operations - the ioreq belongs. Each group is represented by one master, and the - integer in request_set is an index into masters to find the master - data. - - if it is REQSET_EMPTY, the ioreq op is inactive and available to be - started - - finally, if request_set[op] is REQSET_DONE, then the operation is - complete and unavailable to be started again - - request_data[op] is the request data as supplied by the initiator of - a request, and is valid iff request_set[op] <= GRPC_IOREQ_OP_COUNT. - The set fields are as per the request type specified by op. - - Finally, one element of masters is set per active _set_ of ioreq - operations. It describes work left outstanding, result status, and - what work to perform upon operation completion. As one ioreq of each - op type can be active at once, by convention we choose the first element - of the group to be the master -- ie the master of in-progress operation - op is masters[request_set[op]]. This allows constant time allocation - and a strong upper bound of a count of masters to be calculated. */ - gpr_uint8 request_set[GRPC_IOREQ_OP_COUNT]; - grpc_ioreq_data request_data[GRPC_IOREQ_OP_COUNT]; - gpr_uint32 request_flags[GRPC_IOREQ_OP_COUNT]; - reqinfo_master masters[GRPC_IOREQ_OP_COUNT]; - - /* Dynamic array of ioreq's that have completed: the count of - elements is queued in num_completed_requests. - This list is built up under lock(), and flushed entirely during - unlock(). - We know the upper bound of the number of elements as we can only - have one ioreq of each type active at once. */ - completed_request completed_requests[GRPC_IOREQ_OP_COUNT]; - /* Incoming buffer of messages */ - grpc_byte_buffer_queue incoming_queue; /* Buffered read metadata waiting to be returned to the application. Element 0 is initial metadata, element 1 is trailing metadata. */ - grpc_metadata_array buffered_metadata[2]; - /* All metadata received - unreffed at once at the end of the call */ - grpc_mdelem **owned_metadata; - size_t owned_metadata_count; - size_t owned_metadata_capacity; + grpc_metadata_array *buffered_metadata[2]; /* Received call statuses from various sources */ received_status status[STATUS_SOURCE_COUNT]; /* Compression algorithm for the call */ grpc_compression_algorithm compression_algorithm; - /* Supported encodings (compression algorithms), a bitset */ gpr_uint32 encodings_accepted_by_peer; @@ -239,35 +180,36 @@ struct grpc_call { /* Deadline alarm - if have_alarm is non-zero */ grpc_timer alarm; - /* Call refcount - to keep the call alive during asynchronous operations */ - gpr_refcount internal_refcount; - - grpc_linked_mdelem send_initial_metadata[MAX_SEND_INITIAL_METADATA_COUNT]; - grpc_linked_mdelem status_link; - grpc_linked_mdelem details_link; - size_t send_initial_metadata_count; + /* for the client, extra metadata is initial metadata; for the + server, it's trailing metadata */ + grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT]; + int send_extra_metadata_count; gpr_timespec send_deadline; - grpc_stream_op_buffer send_ops; - grpc_stream_op_buffer recv_ops; - grpc_stream_state recv_state; - - gpr_slice_buffer incoming_message; - gpr_uint32 incoming_message_length; - gpr_uint32 incoming_message_flags; - grpc_closure destroy_closure; - grpc_closure on_done_recv; - grpc_closure on_done_send; - grpc_closure on_done_bind; - - /** completion events - for completion queue use */ - grpc_cq_completion completions[MAX_CONCURRENT_COMPLETIONS]; - /** siblings: children of the same parent form a list, and this list is protected under parent->mu */ grpc_call *sibling_next; grpc_call *sibling_prev; + + grpc_slice_buffer_stream sending_stream; + grpc_byte_stream *receiving_stream; + grpc_byte_buffer **receiving_buffer; + gpr_slice receiving_slice; + grpc_closure receiving_slice_ready; + grpc_closure receiving_stream_ready; + gpr_uint32 test_only_last_message_flags; + + union { + struct { + grpc_status_code *status; + char **status_details; + size_t *status_details_capacity; + } client; + struct { + int *cancelled; + } server; + } final_op; }; #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) @@ -279,20 +221,15 @@ struct grpc_call { static void set_deadline_alarm(grpc_exec_ctx *exec_ctx, grpc_call *call, gpr_timespec deadline); -static void call_on_done_recv(grpc_exec_ctx *exec_ctx, void *call, int success); -static void call_on_done_send(grpc_exec_ctx *exec_ctx, void *call, int success); -static int fill_send_ops(grpc_call *call, grpc_transport_stream_op *op); static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, grpc_transport_stream_op *op); -static void recv_metadata(grpc_exec_ctx *exec_ctx, grpc_call *call, - grpc_metadata_batch *metadata); -static void finish_read_ops(grpc_call *call); -static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status, +static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, + grpc_status_code status, const char *description); -static void finished_loose_op(grpc_exec_ctx *exec_ctx, void *call, int success); - -static void lock(grpc_call *call); -static void unlock(grpc_exec_ctx *exec_ctx, grpc_call *call); +static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack, + int success); +static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, + int success); grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, gpr_uint32 propagation_mask, @@ -301,9 +238,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, grpc_mdelem **add_initial_metadata, size_t add_initial_metadata_count, gpr_timespec send_deadline) { - size_t i; - grpc_transport_stream_op initial_op; - grpc_transport_stream_op *initial_op_ptr = NULL; + size_t i, j; grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_call *call; @@ -311,51 +246,37 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, call = gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size); memset(call, 0, sizeof(grpc_call)); gpr_mu_init(&call->mu); - gpr_mu_init(&call->completion_mu); call->channel = channel; call->cq = cq; - if (cq != NULL) { - GRPC_CQ_INTERNAL_REF(cq, "bind"); - } call->parent = parent_call; call->is_client = server_transport_data == NULL; - for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { - call->request_set[i] = REQSET_EMPTY; - } if (call->is_client) { - call->request_set[GRPC_IOREQ_SEND_TRAILING_METADATA] = REQSET_DONE; - call->request_set[GRPC_IOREQ_SEND_STATUS] = REQSET_DONE; + GPR_ASSERT(add_initial_metadata_count < MAX_SEND_EXTRA_METADATA_COUNT); + for (i = 0; i < add_initial_metadata_count; i++) { + call->send_extra_metadata[i].md = add_initial_metadata[i]; + } + call->send_extra_metadata_count = (int)add_initial_metadata_count; + } else { + GPR_ASSERT(add_initial_metadata_count == 0); + call->send_extra_metadata_count = 0; } - GPR_ASSERT(add_initial_metadata_count < MAX_SEND_INITIAL_METADATA_COUNT); - for (i = 0; i < add_initial_metadata_count; i++) { - call->send_initial_metadata[i].md = add_initial_metadata[i]; + for (i = 0; i < 2; i++) { + for (j = 0; j < 2; j++) { + call->metadata_batch[i][j].deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + } } - call->send_initial_metadata_count = add_initial_metadata_count; call->send_deadline = send_deadline; GRPC_CHANNEL_INTERNAL_REF(channel, "call"); call->metadata_context = grpc_channel_get_metadata_context(channel); - grpc_sopb_init(&call->send_ops); - grpc_sopb_init(&call->recv_ops); - gpr_slice_buffer_init(&call->incoming_message); - grpc_closure_init(&call->on_done_recv, call_on_done_recv, call); - grpc_closure_init(&call->on_done_send, call_on_done_send, call); - grpc_closure_init(&call->on_done_bind, finished_loose_op, call); - /* dropped in destroy and when READ_STATE_STREAM_CLOSED received */ - gpr_ref_init(&call->internal_refcount, 2); - /* server hack: start reads immediately so we can get initial metadata. - TODO(ctiller): figure out a cleaner solution */ - if (!call->is_client) { - memset(&initial_op, 0, sizeof(initial_op)); - initial_op.recv_ops = &call->recv_ops; - initial_op.recv_state = &call->recv_state; - initial_op.on_done_recv = &call->on_done_recv; - initial_op.context = call->context; - call->receiving = 1; - GRPC_CALL_INTERNAL_REF(call, "receiving"); - initial_op_ptr = &initial_op; + /* initial refcount dropped by grpc_call_destroy */ + grpc_call_stack_init(&exec_ctx, channel_stack, 1, destroy_call, call, + call->context, server_transport_data, + CALL_STACK_FROM_CALL(call)); + if (cq != NULL) { + GRPC_CQ_INTERNAL_REF(cq, "bind"); + grpc_call_stack_set_pollset(&exec_ctx, CALL_STACK_FROM_CALL(call), + grpc_cq_pollset(cq)); } - grpc_call_stack_init(&exec_ctx, channel_stack, server_transport_data, - initial_op_ptr, CALL_STACK_FROM_CALL(call)); if (parent_call != NULL) { GRPC_CALL_INTERNAL_REF(parent_call, "child"); GPR_ASSERT(call->is_client); @@ -408,90 +329,62 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call, grpc_completion_queue *cq) { - lock(call); + GPR_ASSERT(cq); call->cq = cq; - if (cq) { - GRPC_CQ_INTERNAL_REF(cq, "bind"); - } - unlock(exec_ctx, call); + GRPC_CQ_INTERNAL_REF(cq, "bind"); + grpc_call_stack_set_pollset(exec_ctx, CALL_STACK_FROM_CALL(call), + grpc_cq_pollset(cq)); } grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) { return call->cq; } -static grpc_cq_completion *allocate_completion(grpc_call *call) { - gpr_uint8 i; - gpr_mu_lock(&call->completion_mu); - for (i = 0; i < GPR_ARRAY_SIZE(call->completions); i++) { - if (call->allocated_completions & (1u << i)) { - continue; - } - /* NB: the following integer arithmetic operation needs to be in its - * expanded form due to the "integral promotion" performed (see section - * 3.2.1.1 of the C89 draft standard). A cast to the smaller container type - * is then required to avoid the compiler warning */ - call->allocated_completions = - (gpr_uint8)(call->allocated_completions | (1u << i)); - gpr_mu_unlock(&call->completion_mu); - return &call->completions[i]; - } - GPR_UNREACHABLE_CODE(return NULL); - return NULL; +#ifdef GRPC_STREAM_REFCOUNT_DEBUG +void grpc_call_internal_ref(grpc_call *c, const char *reason) { + grpc_call_stack_ref(CALL_STACK_FROM_CALL(c), reason); } - -static void done_completion(grpc_exec_ctx *exec_ctx, void *call, - grpc_cq_completion *completion) { - grpc_call *c = call; - gpr_mu_lock(&c->completion_mu); - c->allocated_completions &= - (gpr_uint8) ~(1u << (completion - c->completions)); - gpr_mu_unlock(&c->completion_mu); - GRPC_CALL_INTERNAL_UNREF(exec_ctx, c, "completion"); +void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c, + const char *reason) { + grpc_call_stack_unref(exec_ctx, CALL_STACK_FROM_CALL(c), reason); } - -#ifdef GRPC_CALL_REF_COUNT_DEBUG -void grpc_call_internal_ref(grpc_call *c, const char *reason) { - gpr_log(GPR_DEBUG, "CALL: ref %p %d -> %d [%s]", c, - c->internal_refcount.count, c->internal_refcount.count + 1, reason); #else void grpc_call_internal_ref(grpc_call *c) { -#endif - gpr_ref(&c->internal_refcount); + grpc_call_stack_ref(CALL_STACK_FROM_CALL(c)); } +void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c) { + grpc_call_stack_unref(exec_ctx, CALL_STACK_FROM_CALL(c)); +} +#endif -static void destroy_call(grpc_exec_ctx *exec_ctx, grpc_call *call) { +static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, int success) { size_t i; + int ii; grpc_call *c = call; GPR_TIMER_BEGIN("destroy_call", 0); + for (i = 0; i < 2; i++) { + grpc_metadata_batch_destroy( + &c->metadata_batch[1 /* is_receiving */][i /* is_initial */]); + } + if (c->receiving_stream != NULL) { + grpc_byte_stream_destroy(c->receiving_stream); + } grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c)); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call"); gpr_mu_destroy(&c->mu); - gpr_mu_destroy(&c->completion_mu); for (i = 0; i < STATUS_SOURCE_COUNT; i++) { if (c->status[i].details) { GRPC_MDSTR_UNREF(c->status[i].details); } } - for (i = 0; i < c->owned_metadata_count; i++) { - GRPC_MDELEM_UNREF(c->owned_metadata[i]); - } - gpr_free(c->owned_metadata); - for (i = 0; i < GPR_ARRAY_SIZE(c->buffered_metadata); i++) { - gpr_free(c->buffered_metadata[i].metadata); - } - for (i = 0; i < c->send_initial_metadata_count; i++) { - GRPC_MDELEM_UNREF(c->send_initial_metadata[i].md); + for (ii = 0; ii < c->send_extra_metadata_count; ii++) { + GRPC_MDELEM_UNREF(c->send_extra_metadata[ii].md); } for (i = 0; i < GRPC_CONTEXT_COUNT; i++) { if (c->context[i].destroy) { c->context[i].destroy(c->context[i].value); } } - grpc_sopb_destroy(&c->send_ops); - grpc_sopb_destroy(&c->recv_ops); - grpc_bbq_destroy(&c->incoming_queue); - gpr_slice_buffer_destroy(&c->incoming_message); if (c->cq) { GRPC_CQ_INTERNAL_UNREF(c->cq, "bind"); } @@ -499,30 +392,14 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, grpc_call *call) { GPR_TIMER_END("destroy_call", 0); } -#ifdef GRPC_CALL_REF_COUNT_DEBUG -void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c, - const char *reason) { - gpr_log(GPR_DEBUG, "CALL: unref %p %d -> %d [%s]", c, - c->internal_refcount.count, c->internal_refcount.count - 1, reason); -#else -void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c) { -#endif - if (gpr_unref(&c->internal_refcount)) { - destroy_call(exec_ctx, c); - } -} - static void set_status_code(grpc_call *call, status_source source, gpr_uint32 status) { if (call->status[source].is_set) return; call->status[source].is_set = 1; call->status[source].code = (grpc_status_code)status; - call->error_status_set = status != GRPC_STATUS_OK; - if (status != GRPC_STATUS_OK && !grpc_bbq_empty(&call->incoming_queue)) { - grpc_bbq_flush(&call->incoming_queue); - } + /* TODO(ctiller): what to do about the flush that was previously here */ } static void set_compression_algorithm(grpc_call *call, @@ -539,6 +416,14 @@ grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm( return algorithm; } +gpr_uint32 grpc_call_test_only_get_message_flags(grpc_call *call) { + gpr_uint32 flags; + gpr_mu_lock(&call->mu); + flags = call->test_only_last_message_flags; + gpr_mu_unlock(&call->mu); + return flags; +} + static void destroy_encodings_accepted_by_peer(void *p) { return; } static void set_encodings_accepted_by_peer(grpc_call *call, grpc_mdelem *mdel) { @@ -596,14 +481,6 @@ gpr_uint32 grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) { return encodings_accepted_by_peer; } -gpr_uint32 grpc_call_test_only_get_message_flags(grpc_call *call) { - gpr_uint32 flags; - gpr_mu_lock(&call->mu); - flags = call->incoming_message_flags; - gpr_mu_unlock(&call->mu); - return flags; -} - static void set_status_details(grpc_call *call, status_source source, grpc_mdstr *status) { if (call->status[source].details != NULL) { @@ -612,149 +489,39 @@ static void set_status_details(grpc_call *call, status_source source, call->status[source].details = status; } -static int is_op_live(grpc_call *call, grpc_ioreq_op op) { - gpr_uint8 set = call->request_set[op]; - reqinfo_master *master; - if (set >= GRPC_IOREQ_OP_COUNT) return 0; - master = &call->masters[set]; - return (master->complete_mask & (1u << op)) == 0; -} - -static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); } - -static int need_more_data(grpc_call *call) { - if (call->read_state == READ_STATE_STREAM_CLOSED) return 0; - /* TODO(ctiller): this needs some serious cleanup */ - return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) || - (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) && - grpc_bbq_empty(&call->incoming_queue)) || - is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) || - is_op_live(call, GRPC_IOREQ_RECV_STATUS) || - is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) || - (is_op_live(call, GRPC_IOREQ_RECV_CLOSE) && - grpc_bbq_empty(&call->incoming_queue)) || - (call->write_state == WRITE_STATE_INITIAL && !call->is_client) || - (call->cancel_with_status != GRPC_STATUS_OK) || call->destroy_called; -} - -static void unlock(grpc_exec_ctx *exec_ctx, grpc_call *call) { - grpc_transport_stream_op op; - completed_request completed_requests[GRPC_IOREQ_OP_COUNT]; - int completing_requests = 0; - int start_op = 0; - int i; - const size_t MAX_RECV_PEEK_AHEAD = 65536; - size_t buffered_bytes; - - GPR_TIMER_BEGIN("unlock", 0); - - memset(&op, 0, sizeof(op)); - - op.cancel_with_status = call->cancel_with_status; - start_op = op.cancel_with_status != GRPC_STATUS_OK; - call->cancel_with_status = GRPC_STATUS_OK; /* reset */ - - if (!call->receiving && need_more_data(call)) { - if (grpc_bbq_empty(&call->incoming_queue) && call->reading_message) { - op.max_recv_bytes = call->incoming_message_length - - call->incoming_message.length + MAX_RECV_PEEK_AHEAD; - } else { - buffered_bytes = grpc_bbq_bytes(&call->incoming_queue); - if (buffered_bytes > MAX_RECV_PEEK_AHEAD) { - op.max_recv_bytes = 0; - } else { - op.max_recv_bytes = MAX_RECV_PEEK_AHEAD - buffered_bytes; - } - } - /* TODO(ctiller): 1024 is basically to cover a bug - I don't understand yet */ - if (op.max_recv_bytes > 1024) { - op.recv_ops = &call->recv_ops; - op.recv_state = &call->recv_state; - op.on_done_recv = &call->on_done_recv; - call->receiving = 1; - GRPC_CALL_INTERNAL_REF(call, "receiving"); - start_op = 1; - } - } - - if (!call->sending) { - if (fill_send_ops(call, &op)) { - call->sending = 1; - GRPC_CALL_INTERNAL_REF(call, "sending"); - start_op = 1; - } - } - - if (!call->bound_pollset && call->cq && (!call->is_client || start_op)) { - call->bound_pollset = 1; - op.bind_pollset = grpc_cq_pollset(call->cq); - start_op = 1; - } - - if (!call->completing && call->num_completed_requests != 0) { - completing_requests = call->num_completed_requests; - memcpy(completed_requests, call->completed_requests, - sizeof(completed_requests)); - call->num_completed_requests = 0; - call->completing = 1; - GRPC_CALL_INTERNAL_REF(call, "completing"); - } - - gpr_mu_unlock(&call->mu); - - if (start_op) { - execute_op(exec_ctx, call, &op); - } - - if (completing_requests > 0) { - for (i = 0; i < completing_requests; i++) { - completed_requests[i].on_complete(exec_ctx, call, - completed_requests[i].success, - completed_requests[i].user_data); - } - lock(call); - call->completing = 0; - unlock(exec_ctx, call); - GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completing"); - } - - GPR_TIMER_END("unlock", 0); -} - -static void get_final_status(grpc_call *call, grpc_ioreq_data out) { +static void get_final_status(grpc_call *call, + void (*set_value)(grpc_status_code code, + void *user_data), + void *set_value_user_data) { int i; for (i = 0; i < STATUS_SOURCE_COUNT; i++) { if (call->status[i].is_set) { - out.recv_status.set_value(call->status[i].code, - out.recv_status.user_data); + set_value(call->status[i].code, set_value_user_data); return; } } if (call->is_client) { - out.recv_status.set_value(GRPC_STATUS_UNKNOWN, out.recv_status.user_data); + set_value(GRPC_STATUS_UNKNOWN, set_value_user_data); } else { - out.recv_status.set_value(GRPC_STATUS_OK, out.recv_status.user_data); + set_value(GRPC_STATUS_OK, set_value_user_data); } } -static void get_final_details(grpc_call *call, grpc_ioreq_data out) { +static void get_final_details(grpc_call *call, char **out_details, + size_t *out_details_capacity) { int i; for (i = 0; i < STATUS_SOURCE_COUNT; i++) { if (call->status[i].is_set) { if (call->status[i].details) { gpr_slice details = call->status[i].details->slice; size_t len = GPR_SLICE_LENGTH(details); - if (len + 1 > *out.recv_status_details.details_capacity) { - *out.recv_status_details.details_capacity = GPR_MAX( - len + 1, *out.recv_status_details.details_capacity * 3 / 2); - *out.recv_status_details.details = - gpr_realloc(*out.recv_status_details.details, - *out.recv_status_details.details_capacity); + if (len + 1 > *out_details_capacity) { + *out_details_capacity = + GPR_MAX(len + 1, *out_details_capacity * 3 / 2); + *out_details = gpr_realloc(*out_details, *out_details_capacity); } - memcpy(*out.recv_status_details.details, GPR_SLICE_START_PTR(details), - len); - (*out.recv_status_details.details)[len] = 0; + memcpy(*out_details, GPR_SLICE_START_PTR(details), len); + (*out_details)[len] = 0; } else { goto no_details; } @@ -763,328 +530,41 @@ static void get_final_details(grpc_call *call, grpc_ioreq_data out) { } no_details: - if (0 == *out.recv_status_details.details_capacity) { - *out.recv_status_details.details_capacity = 8; - *out.recv_status_details.details = - gpr_malloc(*out.recv_status_details.details_capacity); - } - **out.recv_status_details.details = 0; -} - -static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op, - int success) { - completed_request *cr; - gpr_uint8 master_set = call->request_set[op]; - reqinfo_master *master; - size_t i; - /* ioreq is live: we need to do something */ - master = &call->masters[master_set]; - /* NB: the following integer arithmetic operation needs to be in its - * expanded form due to the "integral promotion" performed (see section - * 3.2.1.1 of the C89 draft standard). A cast to the smaller container type - * is then required to avoid the compiler warning */ - master->complete_mask = (gpr_uint16)(master->complete_mask | (1u << op)); - if (!success) { - master->success = 0; - } - if (master->complete_mask == master->need_mask) { - for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { - if (call->request_set[i] != master_set) { - continue; - } - call->request_set[i] = REQSET_DONE; - switch ((grpc_ioreq_op)i) { - case GRPC_IOREQ_RECV_MESSAGE: - case GRPC_IOREQ_SEND_MESSAGE: - call->request_set[i] = REQSET_EMPTY; - if (!master->success) { - call->write_state = WRITE_STATE_WRITE_CLOSED; - } - break; - case GRPC_IOREQ_SEND_STATUS: - if (call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details != - NULL) { - GRPC_MDSTR_UNREF( - call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details); - call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details = - NULL; - } - break; - case GRPC_IOREQ_RECV_CLOSE: - case GRPC_IOREQ_SEND_INITIAL_METADATA: - case GRPC_IOREQ_SEND_TRAILING_METADATA: - case GRPC_IOREQ_SEND_CLOSE: - break; - case GRPC_IOREQ_RECV_STATUS: - get_final_status(call, call->request_data[GRPC_IOREQ_RECV_STATUS]); - break; - case GRPC_IOREQ_RECV_STATUS_DETAILS: - get_final_details(call, - call->request_data[GRPC_IOREQ_RECV_STATUS_DETAILS]); - break; - case GRPC_IOREQ_RECV_INITIAL_METADATA: - GPR_SWAP(grpc_metadata_array, call->buffered_metadata[0], - *call->request_data[GRPC_IOREQ_RECV_INITIAL_METADATA] - .recv_metadata); - break; - case GRPC_IOREQ_RECV_TRAILING_METADATA: - GPR_SWAP(grpc_metadata_array, call->buffered_metadata[1], - *call->request_data[GRPC_IOREQ_RECV_TRAILING_METADATA] - .recv_metadata); - break; - case GRPC_IOREQ_OP_COUNT: - abort(); - break; - } - } - cr = &call->completed_requests[call->num_completed_requests++]; - cr->success = master->success; - cr->on_complete = master->on_complete; - cr->user_data = master->user_data; - } -} - -static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, int success) { - if (is_op_live(call, op)) { - finish_live_ioreq_op(call, op, success); + if (0 == *out_details_capacity) { + *out_details_capacity = 8; + *out_details = gpr_malloc(*out_details_capacity); } + **out_details = 0; } -static void early_out_write_ops(grpc_call *call) { - switch (call->write_state) { - case WRITE_STATE_WRITE_CLOSED: - finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, 0); - finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, 0); - finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, 0); - finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1); - /* fallthrough */ - case WRITE_STATE_STARTED: - finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, 0); - /* fallthrough */ - case WRITE_STATE_INITIAL: - /* do nothing */ - break; - } +static grpc_linked_mdelem *linked_from_md(grpc_metadata *md) { + return (grpc_linked_mdelem *)&md->internal_data; } -static void call_on_done_send(grpc_exec_ctx *exec_ctx, void *pc, int success) { - grpc_call *call = pc; - GPR_TIMER_BEGIN("call_on_done_send", 0); - lock(call); - if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) { - finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, success); - call->write_state = WRITE_STATE_STARTED; - } - if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_MESSAGE)) { - finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, success); - } - if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_CLOSE)) { - finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, success); - finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, success); - finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1); - call->write_state = WRITE_STATE_WRITE_CLOSED; - } - if (!success) { - call->write_state = WRITE_STATE_WRITE_CLOSED; - early_out_write_ops(call); - } - call->send_ops.nops = 0; - call->last_send_contains = 0; - call->sending = 0; - unlock(exec_ctx, call); - GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "sending"); - GPR_TIMER_END("call_on_done_send", 0); -} - -static void finish_message(grpc_call *call) { - GPR_TIMER_BEGIN("finish_message", 0); - if (call->error_status_set == 0) { - /* TODO(ctiller): this could be a lot faster if coded directly */ - grpc_byte_buffer *byte_buffer; - /* some aliases for readability */ - gpr_slice *slices = call->incoming_message.slices; - const size_t nslices = call->incoming_message.count; - - if ((call->incoming_message_flags & GRPC_WRITE_INTERNAL_COMPRESS) && - (call->compression_algorithm > GRPC_COMPRESS_NONE)) { - byte_buffer = grpc_raw_compressed_byte_buffer_create( - slices, nslices, call->compression_algorithm); +static int prepare_application_metadata(grpc_call *call, int count, + grpc_metadata *metadata, + int is_trailing, + int prepend_extra_metadata) { + int i; + grpc_metadata_batch *batch = + &call->metadata_batch[0 /* is_receiving */][is_trailing]; + if (prepend_extra_metadata) { + if (call->send_extra_metadata_count == 0) { + prepend_extra_metadata = 0; } else { - byte_buffer = grpc_raw_byte_buffer_create(slices, nslices); - } - grpc_bbq_push(&call->incoming_queue, byte_buffer); - } - gpr_slice_buffer_reset_and_unref(&call->incoming_message); - GPR_ASSERT(call->incoming_message.count == 0); - call->reading_message = 0; - GPR_TIMER_END("finish_message", 0); -} - -static int begin_message(grpc_call *call, grpc_begin_message msg) { - /* can't begin a message when we're still reading a message */ - if (call->reading_message) { - char *message = NULL; - gpr_asprintf( - &message, "Message terminated early; read %d bytes, expected %d", - (int)call->incoming_message.length, (int)call->incoming_message_length); - cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message); - gpr_free(message); - return 0; - } - /* sanity check: if message flags indicate a compressed message, the - * compression level should already be present in the call, as parsed off its - * corresponding metadata. */ - if ((msg.flags & GRPC_WRITE_INTERNAL_COMPRESS) && - (call->compression_algorithm == GRPC_COMPRESS_NONE)) { - char *message = NULL; - char *alg_name; - if (!grpc_compression_algorithm_name(call->compression_algorithm, - &alg_name)) { - /* This shouldn't happen, other than due to data corruption */ - alg_name = "<unknown>"; - } - gpr_asprintf(&message, - "Invalid compression algorithm (%s) for compressed message.", - alg_name); - cancel_with_status(call, GRPC_STATUS_INTERNAL, message); - gpr_free(message); - return 0; - } - /* stash away parameters, and prepare for incoming slices */ - if (msg.length > grpc_channel_get_max_message_length(call->channel)) { - char *message = NULL; - gpr_asprintf( - &message, - "Maximum message length of %d exceeded by a message of length %d", - grpc_channel_get_max_message_length(call->channel), msg.length); - cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message); - gpr_free(message); - return 0; - } else if (msg.length > 0) { - call->reading_message = 1; - call->incoming_message_length = msg.length; - call->incoming_message_flags = msg.flags; - return 1; - } else { - finish_message(call); - return 1; - } -} - -static int add_slice_to_message(grpc_call *call, gpr_slice slice) { - if (GPR_SLICE_LENGTH(slice) == 0) { - gpr_slice_unref(slice); - return 1; - } - /* we have to be reading a message to know what to do here */ - if (!call->reading_message) { - gpr_slice_unref(slice); - cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, - "Received payload data while not reading a message"); - return 0; - } - /* append the slice to the incoming buffer */ - gpr_slice_buffer_add(&call->incoming_message, slice); - if (call->incoming_message.length > call->incoming_message_length) { - /* if we got too many bytes, complain */ - char *message = NULL; - gpr_asprintf( - &message, "Receiving message overflow; read %d bytes, expected %d", - (int)call->incoming_message.length, (int)call->incoming_message_length); - cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message); - gpr_free(message); - return 0; - } else if (call->incoming_message.length == call->incoming_message_length) { - finish_message(call); - return 1; - } else { - return 1; - } -} - -static void call_on_done_recv(grpc_exec_ctx *exec_ctx, void *pc, int success) { - grpc_call *call = pc; - grpc_call *child_call; - grpc_call *next_child_call; - size_t i; - GPR_TIMER_BEGIN("call_on_done_recv", 0); - lock(call); - call->receiving = 0; - if (success) { - for (i = 0; success && i < call->recv_ops.nops; i++) { - grpc_stream_op *op = &call->recv_ops.ops[i]; - switch (op->type) { - case GRPC_NO_OP: - break; - case GRPC_OP_METADATA: - GPR_TIMER_BEGIN("recv_metadata", 0); - recv_metadata(exec_ctx, call, &op->data.metadata); - GPR_TIMER_END("recv_metadata", 0); - break; - case GRPC_OP_BEGIN_MESSAGE: - GPR_TIMER_BEGIN("begin_message", 0); - success = begin_message(call, op->data.begin_message); - GPR_TIMER_END("begin_message", 0); - break; - case GRPC_OP_SLICE: - GPR_TIMER_BEGIN("add_slice_to_message", 0); - success = add_slice_to_message(call, op->data.slice); - GPR_TIMER_END("add_slice_to_message", 0); - break; + for (i = 0; i < call->send_extra_metadata_count; i++) { + GRPC_MDELEM_REF(call->send_extra_metadata[i].md); } - } - if (!success) { - grpc_stream_ops_unref_owned_objects(&call->recv_ops.ops[i], - call->recv_ops.nops - i); - } - if (call->recv_state == GRPC_STREAM_RECV_CLOSED) { - GPR_ASSERT(call->read_state <= READ_STATE_READ_CLOSED); - call->read_state = READ_STATE_READ_CLOSED; - } - if (call->recv_state == GRPC_STREAM_CLOSED) { - GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED); - call->read_state = READ_STATE_STREAM_CLOSED; - if (call->have_alarm) { - grpc_timer_cancel(exec_ctx, &call->alarm); + for (i = 1; i < call->send_extra_metadata_count; i++) { + call->send_extra_metadata[i].prev = &call->send_extra_metadata[i - 1]; } - /* propagate cancellation to any interested children */ - child_call = call->first_child; - if (child_call != NULL) { - do { - next_child_call = child_call->sibling_next; - if (child_call->cancellation_is_inherited) { - GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel"); - grpc_call_cancel(child_call, NULL); - GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel"); - } - child_call = next_child_call; - } while (child_call != call->first_child); + for (i = 0; i < call->send_extra_metadata_count - 1; i++) { + call->send_extra_metadata[i].next = &call->send_extra_metadata[i + 1]; } - GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "closed"); } - finish_read_ops(call); - } else { - finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, 0); - finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, 0); - finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, 0); - finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, 0); - finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, 0); - finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, 0); } - call->recv_ops.nops = 0; - unlock(exec_ctx, call); - - GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "receiving"); - GPR_TIMER_END("call_on_done_recv", 0); -} - -static int prepare_application_metadata(grpc_call *call, size_t count, - grpc_metadata *metadata) { - size_t i; for (i = 0; i < count; i++) { grpc_metadata *md = &metadata[i]; - grpc_metadata *next_md = (i == count - 1) ? NULL : &metadata[i + 1]; - grpc_metadata *prev_md = (i == 0) ? NULL : &metadata[i - 1]; grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data; GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data)); l->md = grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key, @@ -1099,243 +579,49 @@ static int prepare_application_metadata(grpc_call *call, size_t count, gpr_log(GPR_ERROR, "attempt to send invalid metadata value"); return 0; } - l->next = next_md ? (grpc_linked_mdelem *)&next_md->internal_data : NULL; - l->prev = prev_md ? (grpc_linked_mdelem *)&prev_md->internal_data : NULL; } - return 1; -} - -static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count, - grpc_metadata *metadata) { - grpc_mdelem_list out; - if (count == 0) { - out.head = out.tail = NULL; - return out; + for (i = 1; i < count; i++) { + linked_from_md(&metadata[i])->prev = linked_from_md(&metadata[i - 1]); } - out.head = (grpc_linked_mdelem *)&(metadata[0].internal_data); - out.tail = (grpc_linked_mdelem *)&(metadata[count - 1].internal_data); - return out; -} - -/* Copy the contents of a byte buffer into stream ops */ -static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer, - grpc_stream_op_buffer *sopb) { - size_t i; - - switch (byte_buffer->type) { - case GRPC_BB_RAW: - for (i = 0; i < byte_buffer->data.raw.slice_buffer.count; i++) { - gpr_slice slice = byte_buffer->data.raw.slice_buffer.slices[i]; - gpr_slice_ref(slice); - grpc_sopb_add_slice(sopb, slice); - } - break; + for (i = 0; i < count - 1; i++) { + linked_from_md(&metadata[i])->next = linked_from_md(&metadata[i + 1]); } -} - -static int fill_send_ops(grpc_call *call, grpc_transport_stream_op *op) { - grpc_ioreq_data data; - gpr_uint32 flags; - grpc_metadata_batch mdb; - size_t i; - GPR_ASSERT(op->send_ops == NULL); - - switch (call->write_state) { - case WRITE_STATE_INITIAL: - if (!is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA)) { - break; - } - data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA]; - mdb.list = chain_metadata_from_app(call, data.send_metadata.count, - data.send_metadata.metadata); - mdb.garbage.head = mdb.garbage.tail = NULL; - mdb.deadline = call->send_deadline; - for (i = 0; i < call->send_initial_metadata_count; i++) { - grpc_metadata_batch_link_head(&mdb, &call->send_initial_metadata[i]); - } - grpc_sopb_add_metadata(&call->send_ops, mdb); - op->send_ops = &call->send_ops; - call->last_send_contains |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA; - call->send_initial_metadata_count = 0; - /* fall through intended */ - case WRITE_STATE_STARTED: - if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) { - size_t length; - data = call->request_data[GRPC_IOREQ_SEND_MESSAGE]; - flags = call->request_flags[GRPC_IOREQ_SEND_MESSAGE]; - length = grpc_byte_buffer_length(data.send_message); - GPR_ASSERT(length <= GPR_UINT32_MAX); - grpc_sopb_add_begin_message(&call->send_ops, (gpr_uint32)length, flags); - copy_byte_buffer_to_stream_ops(data.send_message, &call->send_ops); - op->send_ops = &call->send_ops; - call->last_send_contains |= 1 << GRPC_IOREQ_SEND_MESSAGE; - } - if (is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) { - op->is_last_send = 1; - op->send_ops = &call->send_ops; - call->last_send_contains |= 1 << GRPC_IOREQ_SEND_CLOSE; - if (!call->is_client) { - /* send trailing metadata */ - data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA]; - mdb.list = chain_metadata_from_app(call, data.send_metadata.count, - data.send_metadata.metadata); - mdb.garbage.head = mdb.garbage.tail = NULL; - mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME); - /* send status */ - /* TODO(ctiller): cache common status values */ - data = call->request_data[GRPC_IOREQ_SEND_STATUS]; - grpc_metadata_batch_add_tail( - &mdb, &call->status_link, - grpc_channel_get_reffed_status_elem(call->channel, - data.send_status.code)); - if (data.send_status.details) { - grpc_metadata_batch_add_tail( - &mdb, &call->details_link, - grpc_mdelem_from_metadata_strings( - call->metadata_context, - GRPC_MDSTR_REF( - grpc_channel_get_message_string(call->channel)), - data.send_status.details)); - call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details = - NULL; - } - grpc_sopb_add_metadata(&call->send_ops, mdb); - } - } + switch (prepend_extra_metadata * 2 + (count != 0)) { + case 0: + /* no prepend, no metadata => nothing to do */ + batch->list.head = batch->list.tail = NULL; break; - case WRITE_STATE_WRITE_CLOSED: + case 1: + /* metadata, but no prepend */ + batch->list.head = linked_from_md(&metadata[0]); + batch->list.tail = linked_from_md(&metadata[count - 1]); + batch->list.head->prev = NULL; + batch->list.tail->next = NULL; break; - } - if (op->send_ops) { - op->on_done_send = &call->on_done_send; - } - return op->send_ops != NULL; -} - -static grpc_call_error start_ioreq_error(grpc_call *call, - gpr_uint32 mutated_ops, - grpc_call_error ret) { - size_t i; - for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { - if (mutated_ops & (1u << i)) { - call->request_set[i] = REQSET_EMPTY; - } - } - return ret; -} - -static void finish_read_ops(grpc_call *call) { - int empty; - - if (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE)) { - empty = - (NULL == (*call->request_data[GRPC_IOREQ_RECV_MESSAGE].recv_message = - grpc_bbq_pop(&call->incoming_queue))); - if (!empty) { - finish_live_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, 1); - empty = grpc_bbq_empty(&call->incoming_queue); - } - } else { - empty = grpc_bbq_empty(&call->incoming_queue); - } - - switch (call->read_state) { - case READ_STATE_STREAM_CLOSED: - if (empty && !call->have_alarm) { - finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, 1); - } - /* fallthrough */ - case READ_STATE_READ_CLOSED: - if (empty) { - finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, 1); - } - finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, 1); - finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, 1); - finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, 1); - /* fallthrough */ - case READ_STATE_GOT_INITIAL_METADATA: - finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, 1); - /* fallthrough */ - case READ_STATE_INITIAL: - /* do nothing */ + case 2: + /* prepend, but no md */ + batch->list.head = &call->send_extra_metadata[0]; + batch->list.tail = + &call->send_extra_metadata[call->send_extra_metadata_count - 1]; + batch->list.head->prev = NULL; + batch->list.tail->next = NULL; break; + case 3: + /* prepend AND md */ + batch->list.head = &call->send_extra_metadata[0]; + call->send_extra_metadata[call->send_extra_metadata_count - 1].next = + linked_from_md(&metadata[0]); + linked_from_md(&metadata[0])->prev = + &call->send_extra_metadata[call->send_extra_metadata_count - 1]; + batch->list.tail = linked_from_md(&metadata[count - 1]); + batch->list.head->prev = NULL; + batch->list.tail->next = NULL; + break; + default: + GPR_UNREACHABLE_CODE(return 0); } -} - -static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, - size_t nreqs, - grpc_ioreq_completion_func completion, - void *user_data) { - size_t i; - gpr_uint16 have_ops = 0; - grpc_ioreq_op op; - reqinfo_master *master; - grpc_ioreq_data data; - gpr_uint8 set; - - if (nreqs == 0) { - return GRPC_CALL_OK; - } - - set = reqs[0].op; - - for (i = 0; i < nreqs; i++) { - op = reqs[i].op; - if (call->request_set[op] < GRPC_IOREQ_OP_COUNT) { - return start_ioreq_error(call, have_ops, - GRPC_CALL_ERROR_TOO_MANY_OPERATIONS); - } else if (call->request_set[op] == REQSET_DONE) { - return start_ioreq_error(call, have_ops, GRPC_CALL_ERROR_ALREADY_INVOKED); - } - data = reqs[i].data; - if (op == GRPC_IOREQ_SEND_INITIAL_METADATA || - op == GRPC_IOREQ_SEND_TRAILING_METADATA) { - if (!prepare_application_metadata(call, data.send_metadata.count, - data.send_metadata.metadata)) { - return start_ioreq_error(call, have_ops, - GRPC_CALL_ERROR_INVALID_METADATA); - } - } - if (op == GRPC_IOREQ_SEND_STATUS) { - set_status_code(call, STATUS_FROM_SERVER_STATUS, - (gpr_uint32)reqs[i].data.send_status.code); - if (reqs[i].data.send_status.details) { - set_status_details(call, STATUS_FROM_SERVER_STATUS, - GRPC_MDSTR_REF(reqs[i].data.send_status.details)); - } - } - /* NB: the following integer arithmetic operation needs to be in its - * expanded form due to the "integral promotion" performed (see section - * 3.2.1.1 of the C89 draft standard). A cast to the smaller container type - * is then required to avoid the compiler warning */ - have_ops = (gpr_uint16)(have_ops | (1u << op)); - - call->request_data[op] = data; - call->request_flags[op] = reqs[i].flags; - call->request_set[op] = set; - } - - master = &call->masters[set]; - master->success = 1; - master->need_mask = have_ops; - master->complete_mask = 0; - master->on_complete = completion; - master->user_data = user_data; - finish_read_ops(call); - early_out_write_ops(call); - - return GRPC_CALL_OK; -} - -grpc_call_error grpc_call_start_ioreq_and_call_back( - grpc_exec_ctx *exec_ctx, grpc_call *call, const grpc_ioreq *reqs, - size_t nreqs, grpc_ioreq_completion_func on_complete, void *user_data) { - grpc_call_error err; - lock(call); - err = start_ioreq(call, reqs, nreqs, on_complete, user_data); - unlock(exec_ctx, call); - return err; + return 1; } void grpc_call_destroy(grpc_call *c) { @@ -1343,6 +629,7 @@ void grpc_call_destroy(grpc_call *c) { grpc_call *parent = c->parent; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GPR_TIMER_BEGIN("grpc_call_destroy", 0); GRPC_API_TRACE("grpc_call_destroy(c=%p)", 1, (c)); if (parent) { @@ -1359,17 +646,18 @@ void grpc_call_destroy(grpc_call *c) { GRPC_CALL_INTERNAL_UNREF(&exec_ctx, parent, "child"); } - lock(c); + gpr_mu_lock(&c->mu); GPR_ASSERT(!c->destroy_called); c->destroy_called = 1; if (c->have_alarm) { grpc_timer_cancel(&exec_ctx, &c->alarm); } - cancel = c->read_state != READ_STATE_STREAM_CLOSED; - unlock(&exec_ctx, c); + cancel = !c->received_final_op; + gpr_mu_unlock(&c->mu); if (cancel) grpc_call_cancel(c, NULL); GRPC_CALL_INTERNAL_UNREF(&exec_ctx, c, "destroy"); grpc_exec_ctx_finish(&exec_ctx); + GPR_TIMER_END("grpc_call_destroy", 0); } grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) { @@ -1390,66 +678,67 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c, "c=%p, status=%d, description=%s, reserved=%p)", 4, (c, (int)status, description, reserved)); GPR_ASSERT(reserved == NULL); - lock(c); - r = cancel_with_status(c, status, description); - unlock(&exec_ctx, c); + gpr_mu_lock(&c->mu); + r = cancel_with_status(&exec_ctx, c, status, description); + gpr_mu_unlock(&c->mu); grpc_exec_ctx_finish(&exec_ctx); return r; } -static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status, +typedef struct cancel_closure { + grpc_closure closure; + grpc_call *call; + grpc_status_code status; +} cancel_closure; + +static void done_cancel(grpc_exec_ctx *exec_ctx, void *ccp, int success) { + cancel_closure *cc = ccp; + GRPC_CALL_INTERNAL_UNREF(exec_ctx, cc->call, "cancel"); + gpr_free(cc); +} + +static void send_cancel(grpc_exec_ctx *exec_ctx, void *ccp, int success) { + grpc_transport_stream_op op; + cancel_closure *cc = ccp; + memset(&op, 0, sizeof(op)); + op.cancel_with_status = cc->status; + /* reuse closure to catch completion */ + grpc_closure_init(&cc->closure, done_cancel, cc); + op.on_complete = &cc->closure; + execute_op(exec_ctx, cc->call, &op); +} + +static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, + grpc_status_code status, const char *description) { grpc_mdstr *details = description ? grpc_mdstr_from_string(c->metadata_context, description) : NULL; + cancel_closure *cc = gpr_malloc(sizeof(*cc)); GPR_ASSERT(status != GRPC_STATUS_OK); set_status_code(c, STATUS_FROM_API_OVERRIDE, (gpr_uint32)status); set_status_details(c, STATUS_FROM_API_OVERRIDE, details); - c->cancel_with_status = status; + grpc_closure_init(&cc->closure, send_cancel, cc); + cc->call = c; + cc->status = status; + GRPC_CALL_INTERNAL_REF(c, "cancel"); + grpc_exec_ctx_enqueue(exec_ctx, &cc->closure, 1); return GRPC_CALL_OK; } -static void finished_loose_op(grpc_exec_ctx *exec_ctx, void *call, - int success_ignored) { - GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "loose-op"); -} - -typedef struct { - grpc_call *call; - grpc_closure closure; -} finished_loose_op_allocated_args; - -static void finished_loose_op_allocated(grpc_exec_ctx *exec_ctx, void *alloc, - int success) { - finished_loose_op_allocated_args *args = alloc; - finished_loose_op(exec_ctx, args->call, success); - gpr_free(args); -} - static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, grpc_transport_stream_op *op) { grpc_call_element *elem; - GPR_ASSERT(op->on_consumed == NULL); - if (op->cancel_with_status != GRPC_STATUS_OK || op->bind_pollset) { - GRPC_CALL_INTERNAL_REF(call, "loose-op"); - if (op->bind_pollset) { - op->on_consumed = &call->on_done_bind; - } else { - finished_loose_op_allocated_args *args = gpr_malloc(sizeof(*args)); - args->call = call; - grpc_closure_init(&args->closure, finished_loose_op_allocated, args); - op->on_consumed = &args->closure; - } - } - + GPR_TIMER_BEGIN("execute_op", 0); elem = CALL_ELEM_FROM_CALL(call, 0); op->context = call->context; elem->filter->start_transport_stream_op(exec_ctx, elem, op); + GPR_TIMER_END("execute_op", 0); } char *grpc_call_get_peer(grpc_call *call) { @@ -1467,14 +756,13 @@ grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, int success) { grpc_call *call = arg; - lock(call); + gpr_mu_lock(&call->mu); call->have_alarm = 0; if (success) { - cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED, + cancel_with_status(exec_ctx, call, GRPC_STATUS_DEADLINE_EXCEEDED, "Deadline Exceeded"); } - finish_read_ops(call); - unlock(exec_ctx, call); + gpr_mu_unlock(&call->mu); GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "alarm"); } @@ -1540,76 +828,69 @@ static gpr_uint32 decode_compression(grpc_mdelem *md) { return algorithm; } -static void recv_metadata(grpc_exec_ctx *exec_ctx, grpc_call *call, - grpc_metadata_batch *md) { - grpc_linked_mdelem *l; +static grpc_mdelem *recv_common_filter(grpc_call *call, grpc_mdelem *elem) { + if (elem->key == grpc_channel_get_status_string(call->channel)) { + GPR_TIMER_BEGIN("status", 0); + set_status_code(call, STATUS_FROM_WIRE, decode_status(elem)); + GPR_TIMER_END("status", 0); + return NULL; + } else if (elem->key == grpc_channel_get_message_string(call->channel)) { + GPR_TIMER_BEGIN("status-details", 0); + set_status_details(call, STATUS_FROM_WIRE, GRPC_MDSTR_REF(elem->value)); + GPR_TIMER_END("status-details", 0); + return NULL; + } + return elem; +} + +static grpc_mdelem *publish_app_metadata(grpc_call *call, grpc_mdelem *elem, + int is_trailing) { grpc_metadata_array *dest; grpc_metadata *mdusr; - int is_trailing; - - is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA; - for (l = md->list.head; l != NULL; l = l->next) { - grpc_mdelem *mdel = l->md; - grpc_mdstr *key = mdel->key; - if (key == grpc_channel_get_status_string(call->channel)) { - GPR_TIMER_BEGIN("status", 0); - set_status_code(call, STATUS_FROM_WIRE, decode_status(mdel)); - GPR_TIMER_END("status", 0); - } else if (key == grpc_channel_get_message_string(call->channel)) { - GPR_TIMER_BEGIN("status-details", 0); - set_status_details(call, STATUS_FROM_WIRE, GRPC_MDSTR_REF(mdel->value)); - GPR_TIMER_END("status-details", 0); - } else if (key == - grpc_channel_get_compression_algorithm_string(call->channel)) { - GPR_TIMER_BEGIN("compression_algorithm", 0); - set_compression_algorithm(call, decode_compression(mdel)); - GPR_TIMER_END("compression_algorithm", 0); - } else if (key == grpc_channel_get_encodings_accepted_by_peer_string( - call->channel)) { - GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0); - set_encodings_accepted_by_peer(call, mdel); - GPR_TIMER_END("encodings_accepted_by_peer", 0); - } else { - GPR_TIMER_BEGIN("report_up", 0); - dest = &call->buffered_metadata[is_trailing]; - if (dest->count == dest->capacity) { - dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2); - dest->metadata = - gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity); - } - mdusr = &dest->metadata[dest->count++]; - mdusr->key = grpc_mdstr_as_c_string(mdel->key); - mdusr->value = grpc_mdstr_as_c_string(mdel->value); - mdusr->value_length = GPR_SLICE_LENGTH(mdel->value->slice); - if (call->owned_metadata_count == call->owned_metadata_capacity) { - call->owned_metadata_capacity = - GPR_MAX(call->owned_metadata_capacity + 8, - call->owned_metadata_capacity * 2); - call->owned_metadata = - gpr_realloc(call->owned_metadata, - sizeof(grpc_mdelem *) * call->owned_metadata_capacity); - } - call->owned_metadata[call->owned_metadata_count++] = mdel; - l->md = NULL; - GPR_TIMER_END("report_up", 0); - } - } - if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) != - 0 && - !call->is_client) { - GPR_TIMER_BEGIN("set_deadline_alarm", 0); - set_deadline_alarm(exec_ctx, call, md->deadline); - GPR_TIMER_END("set_deadline_alarm", 0); - } - if (!is_trailing) { - call->read_state = READ_STATE_GOT_INITIAL_METADATA; - } + GPR_TIMER_BEGIN("publish_app_metadata", 0); + dest = call->buffered_metadata[is_trailing]; + if (dest->count == dest->capacity) { + dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2); + dest->metadata = + gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity); + } + mdusr = &dest->metadata[dest->count++]; + mdusr->key = grpc_mdstr_as_c_string(elem->key); + mdusr->value = grpc_mdstr_as_c_string(elem->value); + mdusr->value_length = GPR_SLICE_LENGTH(elem->value->slice); + GPR_TIMER_END("publish_app_metadata", 0); + return elem; +} - for (l = md->list.head; l; l = l->next) { - if (l->md) GRPC_MDELEM_UNREF(l->md); +static grpc_mdelem *recv_initial_filter(void *callp, grpc_mdelem *elem) { + grpc_call *call = callp; + elem = recv_common_filter(call, elem); + if (elem == NULL) { + return NULL; + } else if (elem->key == + grpc_channel_get_compression_algorithm_string(call->channel)) { + GPR_TIMER_BEGIN("compression_algorithm", 0); + set_compression_algorithm(call, decode_compression(elem)); + GPR_TIMER_END("compression_algorithm", 0); + return NULL; + } else if (elem->key == grpc_channel_get_encodings_accepted_by_peer_string( + call->channel)) { + GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0); + set_encodings_accepted_by_peer(call, elem); + GPR_TIMER_END("encodings_accepted_by_peer", 0); + return NULL; + } else { + return publish_app_metadata(call, elem, 0); } - for (l = md->garbage.head; l; l = l->next) { - GRPC_MDELEM_UNREF(l->md); +} + +static grpc_mdelem *recv_trailing_filter(void *callp, grpc_mdelem *elem) { + grpc_call *call = callp; + elem = recv_common_filter(call, elem); + if (elem == NULL) { + return NULL; + } else { + return publish_app_metadata(call, elem, 1); } } @@ -1626,19 +907,7 @@ static void set_status_value_directly(grpc_status_code status, void *dest) { } static void set_cancelled_value(grpc_status_code status, void *dest) { - *(grpc_status_code *)dest = (status != GRPC_STATUS_OK); -} - -static void finish_batch(grpc_exec_ctx *exec_ctx, grpc_call *call, int success, - void *tag) { - grpc_cq_end_op(exec_ctx, call->cq, tag, success, done_completion, call, - allocate_completion(call)); -} - -static void finish_batch_with_close(grpc_exec_ctx *exec_ctx, grpc_call *call, - int success, void *tag) { - grpc_cq_end_op(exec_ctx, call->cq, tag, 1, done_completion, call, - allocate_completion(call)); + *(int *)dest = (status != GRPC_STATUS_OK); } static int are_write_flags_valid(gpr_uint32 flags) { @@ -1649,258 +918,503 @@ static int are_write_flags_valid(gpr_uint32 flags) { return !(flags & invalid_positions); } -grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, - size_t nops, void *tag, void *reserved) { - grpc_ioreq reqs[GRPC_IOREQ_OP_COUNT]; - size_t in; - size_t out; +static batch_control *allocate_batch_control(grpc_call *call) { + size_t i; + for (i = 0; i < MAX_CONCURRENT_BATCHES; i++) { + if ((call->used_batches & (1 << i)) == 0) { + call->used_batches |= (gpr_uint8)(1 << i); + return &call->active_batches[i]; + } + } + GPR_UNREACHABLE_CODE(return NULL); +} + +static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data, + grpc_cq_completion *storage) { + batch_control *bctl = user_data; + grpc_call *call = bctl->call; + gpr_mu_lock(&call->mu); + call->used_batches = (gpr_uint8)( + call->used_batches & ~(gpr_uint8)(1 << (bctl - call->active_batches))); + gpr_mu_unlock(&call->mu); + GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion"); +} + +static void post_batch_completion(grpc_exec_ctx *exec_ctx, + batch_control *bctl) { + grpc_call *call = bctl->call; + if (bctl->is_notify_tag_closure) { + gpr_mu_lock(&call->mu); + bctl->call->used_batches = + (gpr_uint8)(bctl->call->used_batches & + ~(gpr_uint8)(1 << (bctl - bctl->call->active_batches))); + gpr_mu_unlock(&call->mu); + grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success); + GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion"); + } else { + grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->notify_tag, bctl->success, + finish_batch_completion, bctl, &bctl->cq_completion); + } +} + +static void continue_receiving_slices(grpc_exec_ctx *exec_ctx, + batch_control *bctl) { + grpc_call *call = bctl->call; + for (;;) { + size_t remaining = call->receiving_stream->length - + (*call->receiving_buffer)->data.raw.slice_buffer.length; + if (remaining == 0) { + call->receiving_message = 0; + grpc_byte_stream_destroy(call->receiving_stream); + call->receiving_stream = NULL; + if (gpr_unref(&bctl->steps_to_complete)) { + post_batch_completion(exec_ctx, bctl); + } + return; + } + if (grpc_byte_stream_next(exec_ctx, call->receiving_stream, + &call->receiving_slice, remaining, + &call->receiving_slice_ready)) { + gpr_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, + call->receiving_slice); + } else { + return; + } + } +} + +static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, + int success) { + batch_control *bctl = bctlp; + grpc_call *call = bctl->call; + + GPR_ASSERT(success); + gpr_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, + call->receiving_slice); + + continue_receiving_slices(exec_ctx, bctl); +} + +static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, int success) { + batch_control *bctl = bctlp; + grpc_call *call = bctl->call; + grpc_call *child_call; + grpc_call *next_child_call; + + gpr_mu_lock(&call->mu); + if (bctl->send_initial_metadata) { + grpc_metadata_batch_destroy( + &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]); + } + if (bctl->send_message) { + call->sending_message = 0; + } + if (bctl->send_final_op) { + grpc_metadata_batch_destroy( + &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]); + } + if (bctl->recv_initial_metadata) { + grpc_metadata_batch *md = + &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; + grpc_metadata_batch_filter(md, recv_initial_filter, call); + + if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) != + 0 && + !call->is_client) { + GPR_TIMER_BEGIN("set_deadline_alarm", 0); + set_deadline_alarm(exec_ctx, call, md->deadline); + GPR_TIMER_END("set_deadline_alarm", 0); + } + } + if (bctl->recv_final_op) { + grpc_metadata_batch *md = + &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; + grpc_metadata_batch_filter(md, recv_trailing_filter, call); + + if (call->have_alarm) { + grpc_timer_cancel(exec_ctx, &call->alarm); + } + /* propagate cancellation to any interested children */ + child_call = call->first_child; + if (child_call != NULL) { + do { + next_child_call = child_call->sibling_next; + if (child_call->cancellation_is_inherited) { + GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel"); + grpc_call_cancel(child_call, NULL); + GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel"); + } + child_call = next_child_call; + } while (child_call != call->first_child); + } + + if (call->is_client) { + get_final_status(call, set_status_value_directly, + call->final_op.client.status); + get_final_details(call, call->final_op.client.status_details, + call->final_op.client.status_details_capacity); + } else { + get_final_status(call, set_cancelled_value, + call->final_op.server.cancelled); + } + + success = 1; + } + bctl->success = success != 0; + gpr_mu_unlock(&call->mu); + if (gpr_unref(&bctl->steps_to_complete)) { + post_batch_completion(exec_ctx, bctl); + } +} + +static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, + int success) { + batch_control *bctl = bctlp; + grpc_call *call = bctl->call; + + if (call->receiving_stream == NULL) { + *call->receiving_buffer = NULL; + if (gpr_unref(&bctl->steps_to_complete)) { + post_batch_completion(exec_ctx, bctl); + } + } else if (call->receiving_stream->length > + grpc_channel_get_max_message_length(call->channel)) { + cancel_with_status(exec_ctx, call, GRPC_STATUS_INTERNAL, + "Max message size exceeded"); + grpc_byte_stream_destroy(call->receiving_stream); + call->receiving_stream = NULL; + *call->receiving_buffer = NULL; + if (gpr_unref(&bctl->steps_to_complete)) { + post_batch_completion(exec_ctx, bctl); + } + } else { + call->test_only_last_message_flags = call->receiving_stream->flags; + if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) && + (call->compression_algorithm > GRPC_COMPRESS_NONE)) { + *call->receiving_buffer = grpc_raw_compressed_byte_buffer_create( + NULL, 0, call->compression_algorithm); + } else { + *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0); + } + grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready, + bctl); + continue_receiving_slices(exec_ctx, bctl); + /* early out */ + return; + } +} + +static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, + grpc_call *call, const grpc_op *ops, + size_t nops, void *notify_tag, + int is_notify_tag_closure) { + grpc_transport_stream_op stream_op; + size_t i; const grpc_op *op; - grpc_ioreq *req; - void (*finish_func)(grpc_exec_ctx *, grpc_call *, int, void *) = finish_batch; - grpc_call_error error; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + batch_control *bctl; + int num_completion_callbacks_needed = 1; + grpc_call_error error = GRPC_CALL_OK; GPR_TIMER_BEGIN("grpc_call_start_batch", 0); - GRPC_API_TRACE( - "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, reserved=%p)", - 5, (call, ops, (unsigned long)nops, tag, reserved)); + GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag); - if (reserved != NULL) { - error = GRPC_CALL_ERROR; - goto done; - } + memset(&stream_op, 0, sizeof(stream_op)); - GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag); + /* TODO(ctiller): this feels like it could be made lock-free */ + gpr_mu_lock(&call->mu); + bctl = allocate_batch_control(call); + memset(bctl, 0, sizeof(*bctl)); + bctl->call = call; + bctl->notify_tag = notify_tag; + bctl->is_notify_tag_closure = (gpr_uint8)(is_notify_tag_closure != 0); if (nops == 0) { - grpc_cq_begin_op(call->cq); GRPC_CALL_INTERNAL_REF(call, "completion"); - grpc_cq_end_op(&exec_ctx, call->cq, tag, 1, done_completion, call, - allocate_completion(call)); - error = GRPC_CALL_OK; - goto done; + bctl->success = 1; + if (!is_notify_tag_closure) { + grpc_cq_begin_op(call->cq); + } + gpr_mu_unlock(&call->mu); + post_batch_completion(exec_ctx, bctl); + return GRPC_CALL_OK; } - /* rewrite batch ops into ioreq ops */ - for (in = 0, out = 0; in < nops; in++) { - op = &ops[in]; + /* rewrite batch ops into a transport op */ + for (i = 0; i < nops; i++) { + op = &ops[i]; if (op->reserved != NULL) { error = GRPC_CALL_ERROR; - goto done; + goto done_with_error; } switch (op->op) { case GRPC_OP_SEND_INITIAL_METADATA: /* Flag validation: currently allow no flags */ if (op->flags != 0) { error = GRPC_CALL_ERROR_INVALID_FLAGS; - goto done; + goto done_with_error; + } + if (call->sent_initial_metadata) { + error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; + goto done_with_error; + } + if (op->data.send_initial_metadata.count > INT_MAX) { + error = GRPC_CALL_ERROR_INVALID_METADATA; + goto done_with_error; } - req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) { - error = GRPC_CALL_ERROR_BATCH_TOO_BIG; - goto done; + bctl->send_initial_metadata = 1; + call->sent_initial_metadata = 1; + if (!prepare_application_metadata( + call, (int)op->data.send_initial_metadata.count, + op->data.send_initial_metadata.metadata, 0, call->is_client)) { + error = GRPC_CALL_ERROR_INVALID_METADATA; + goto done_with_error; } - req->op = GRPC_IOREQ_SEND_INITIAL_METADATA; - req->data.send_metadata.count = op->data.send_initial_metadata.count; - req->data.send_metadata.metadata = - op->data.send_initial_metadata.metadata; - req->flags = op->flags; + /* TODO(ctiller): just make these the same variable? */ + call->metadata_batch[0][0].deadline = call->send_deadline; + stream_op.send_initial_metadata = + &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]; break; case GRPC_OP_SEND_MESSAGE: if (!are_write_flags_valid(op->flags)) { error = GRPC_CALL_ERROR_INVALID_FLAGS; - goto done; + goto done_with_error; } if (op->data.send_message == NULL) { error = GRPC_CALL_ERROR_INVALID_MESSAGE; - goto done; + goto done_with_error; } - req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) { - error = GRPC_CALL_ERROR_BATCH_TOO_BIG; - goto done; + if (call->sending_message) { + error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; + goto done_with_error; } - req->op = GRPC_IOREQ_SEND_MESSAGE; - req->data.send_message = op->data.send_message; - req->flags = op->flags; + bctl->send_message = 1; + call->sending_message = 1; + grpc_slice_buffer_stream_init( + &call->sending_stream, + &op->data.send_message->data.raw.slice_buffer, op->flags); + stream_op.send_message = &call->sending_stream.base; break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: /* Flag validation: currently allow no flags */ if (op->flags != 0) { error = GRPC_CALL_ERROR_INVALID_FLAGS; - goto done; + goto done_with_error; } if (!call->is_client) { error = GRPC_CALL_ERROR_NOT_ON_SERVER; - goto done; + goto done_with_error; } - req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) { - error = GRPC_CALL_ERROR_BATCH_TOO_BIG; - goto done; + if (call->sent_final_op) { + error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; + goto done_with_error; } - req->op = GRPC_IOREQ_SEND_CLOSE; - req->flags = op->flags; + bctl->send_final_op = 1; + call->sent_final_op = 1; + stream_op.send_trailing_metadata = + &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; break; case GRPC_OP_SEND_STATUS_FROM_SERVER: /* Flag validation: currently allow no flags */ if (op->flags != 0) { error = GRPC_CALL_ERROR_INVALID_FLAGS; - goto done; + goto done_with_error; } if (call->is_client) { error = GRPC_CALL_ERROR_NOT_ON_CLIENT; - goto done; + goto done_with_error; } - req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) { - error = GRPC_CALL_ERROR_BATCH_TOO_BIG; - goto done; + if (call->sent_final_op) { + error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; + goto done_with_error; } - req->op = GRPC_IOREQ_SEND_TRAILING_METADATA; - req->flags = op->flags; - req->data.send_metadata.count = - op->data.send_status_from_server.trailing_metadata_count; - req->data.send_metadata.metadata = - op->data.send_status_from_server.trailing_metadata; - req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) { - error = GRPC_CALL_ERROR_BATCH_TOO_BIG; - goto done; + if (op->data.send_status_from_server.trailing_metadata_count > + INT_MAX) { + error = GRPC_CALL_ERROR_INVALID_METADATA; + goto done_with_error; } - req->op = GRPC_IOREQ_SEND_STATUS; - req->data.send_status.code = op->data.send_status_from_server.status; - req->data.send_status.details = - op->data.send_status_from_server.status_details != NULL - ? grpc_mdstr_from_string( - call->metadata_context, - op->data.send_status_from_server.status_details) - : NULL; - req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) { - error = GRPC_CALL_ERROR_BATCH_TOO_BIG; - goto done; + bctl->send_final_op = 1; + call->sent_final_op = 1; + call->send_extra_metadata_count = 1; + call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem( + call->channel, op->data.send_status_from_server.status); + if (op->data.send_status_from_server.status_details != NULL) { + call->send_extra_metadata[1].md = grpc_mdelem_from_metadata_strings( + call->metadata_context, + GRPC_MDSTR_REF(grpc_channel_get_message_string(call->channel)), + grpc_mdstr_from_string( + call->metadata_context, + op->data.send_status_from_server.status_details)); + call->send_extra_metadata_count++; + set_status_details( + call, STATUS_FROM_API_OVERRIDE, + GRPC_MDSTR_REF(call->send_extra_metadata[1].md->value)); } - req->op = GRPC_IOREQ_SEND_CLOSE; + set_status_code(call, STATUS_FROM_API_OVERRIDE, + (gpr_uint32)op->data.send_status_from_server.status); + if (!prepare_application_metadata( + call, + (int)op->data.send_status_from_server.trailing_metadata_count, + op->data.send_status_from_server.trailing_metadata, 1, 1)) { + error = GRPC_CALL_ERROR_INVALID_METADATA; + goto done_with_error; + } + stream_op.send_trailing_metadata = + &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; break; case GRPC_OP_RECV_INITIAL_METADATA: /* Flag validation: currently allow no flags */ if (op->flags != 0) { error = GRPC_CALL_ERROR_INVALID_FLAGS; - goto done; - } - if (!call->is_client) { - error = GRPC_CALL_ERROR_NOT_ON_SERVER; - goto done; + goto done_with_error; } - req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) { - error = GRPC_CALL_ERROR_BATCH_TOO_BIG; - goto done; + if (call->received_initial_metadata) { + error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; + goto done_with_error; } - req->op = GRPC_IOREQ_RECV_INITIAL_METADATA; - req->data.recv_metadata = op->data.recv_initial_metadata; - req->data.recv_metadata->count = 0; - req->flags = op->flags; + call->received_initial_metadata = 1; + call->buffered_metadata[0] = op->data.recv_initial_metadata; + bctl->recv_initial_metadata = 1; + stream_op.recv_initial_metadata = + &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; break; case GRPC_OP_RECV_MESSAGE: /* Flag validation: currently allow no flags */ if (op->flags != 0) { error = GRPC_CALL_ERROR_INVALID_FLAGS; - goto done; + goto done_with_error; } - req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) { - error = GRPC_CALL_ERROR_BATCH_TOO_BIG; - goto done; + if (call->receiving_message) { + error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; } - req->op = GRPC_IOREQ_RECV_MESSAGE; - req->data.recv_message = op->data.recv_message; - req->flags = op->flags; + call->receiving_message = 1; + bctl->recv_message = 1; + call->receiving_buffer = op->data.recv_message; + stream_op.recv_message = &call->receiving_stream; + grpc_closure_init(&call->receiving_stream_ready, receiving_stream_ready, + bctl); + stream_op.recv_message_ready = &call->receiving_stream_ready; + num_completion_callbacks_needed++; break; case GRPC_OP_RECV_STATUS_ON_CLIENT: /* Flag validation: currently allow no flags */ if (op->flags != 0) { error = GRPC_CALL_ERROR_INVALID_FLAGS; - goto done; + goto done_with_error; } if (!call->is_client) { error = GRPC_CALL_ERROR_NOT_ON_SERVER; - goto done; - } - req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) { - error = GRPC_CALL_ERROR_BATCH_TOO_BIG; - goto done; + goto done_with_error; } - req->op = GRPC_IOREQ_RECV_STATUS; - req->flags = op->flags; - req->data.recv_status.set_value = set_status_value_directly; - req->data.recv_status.user_data = op->data.recv_status_on_client.status; - req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) { - error = GRPC_CALL_ERROR_BATCH_TOO_BIG; - goto done; + if (call->received_final_op) { + error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; + goto done_with_error; } - req->op = GRPC_IOREQ_RECV_STATUS_DETAILS; - req->data.recv_status_details.details = + call->received_final_op = 1; + call->buffered_metadata[1] = + op->data.recv_status_on_client.trailing_metadata; + call->final_op.client.status = op->data.recv_status_on_client.status; + call->final_op.client.status_details = op->data.recv_status_on_client.status_details; - req->data.recv_status_details.details_capacity = + call->final_op.client.status_details_capacity = op->data.recv_status_on_client.status_details_capacity; - req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) { - error = GRPC_CALL_ERROR_BATCH_TOO_BIG; - goto done; - } - req->op = GRPC_IOREQ_RECV_TRAILING_METADATA; - req->data.recv_metadata = - op->data.recv_status_on_client.trailing_metadata; - req->data.recv_metadata->count = 0; - req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) { - error = GRPC_CALL_ERROR_BATCH_TOO_BIG; - goto done; - } - req->op = GRPC_IOREQ_RECV_CLOSE; - finish_func = finish_batch_with_close; + bctl->recv_final_op = 1; + stream_op.recv_trailing_metadata = + &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; break; case GRPC_OP_RECV_CLOSE_ON_SERVER: /* Flag validation: currently allow no flags */ if (op->flags != 0) { error = GRPC_CALL_ERROR_INVALID_FLAGS; - goto done; + goto done_with_error; } - req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) { - error = GRPC_CALL_ERROR_BATCH_TOO_BIG; - goto done; + if (call->is_client) { + error = GRPC_CALL_ERROR_NOT_ON_CLIENT; + goto done_with_error; } - req->op = GRPC_IOREQ_RECV_STATUS; - req->flags = op->flags; - req->data.recv_status.set_value = set_cancelled_value; - req->data.recv_status.user_data = - op->data.recv_close_on_server.cancelled; - req = &reqs[out++]; - if (out > GRPC_IOREQ_OP_COUNT) { - error = GRPC_CALL_ERROR_BATCH_TOO_BIG; - goto done; + if (call->received_final_op) { + error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; + goto done_with_error; } - req->op = GRPC_IOREQ_RECV_CLOSE; - finish_func = finish_batch_with_close; + call->received_final_op = 1; + call->final_op.server.cancelled = + op->data.recv_close_on_server.cancelled; + bctl->recv_final_op = 1; + stream_op.recv_trailing_metadata = + &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; break; } } GRPC_CALL_INTERNAL_REF(call, "completion"); - grpc_cq_begin_op(call->cq); + if (!is_notify_tag_closure) { + grpc_cq_begin_op(call->cq); + } + gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed); + + stream_op.context = call->context; + grpc_closure_init(&bctl->finish_batch, finish_batch, bctl); + stream_op.on_complete = &bctl->finish_batch; + gpr_mu_unlock(&call->mu); + + execute_op(exec_ctx, call, &stream_op); - error = grpc_call_start_ioreq_and_call_back(&exec_ctx, call, reqs, out, - finish_func, tag); done: - grpc_exec_ctx_finish(&exec_ctx); GPR_TIMER_END("grpc_call_start_batch", 0); return error; + +done_with_error: + /* reverse any mutations that occured */ + if (bctl->send_initial_metadata) { + call->sent_initial_metadata = 0; + grpc_metadata_batch_clear(&call->metadata_batch[0][0]); + } + if (bctl->send_message) { + call->sending_message = 0; + grpc_byte_stream_destroy(&call->sending_stream.base); + } + if (bctl->send_final_op) { + call->sent_final_op = 0; + grpc_metadata_batch_clear(&call->metadata_batch[0][1]); + } + if (bctl->recv_initial_metadata) { + call->received_initial_metadata = 0; + } + if (bctl->recv_message) { + call->receiving_message = 0; + } + if (bctl->recv_final_op) { + call->received_final_op = 0; + } + gpr_mu_unlock(&call->mu); + goto done; +} + +grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, + size_t nops, void *tag, void *reserved) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_call_error err; + + GRPC_API_TRACE( + "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, reserved=%p)", + 5, (call, ops, (unsigned long)nops, tag, reserved)); + + if (reserved != NULL) { + err = GRPC_CALL_ERROR; + } else { + err = call_start_batch(&exec_ctx, call, ops, nops, tag, 0); + } + + grpc_exec_ctx_finish(&exec_ctx); + return err; +} + +grpc_call_error grpc_call_start_batch_and_execute(grpc_exec_ctx *exec_ctx, + grpc_call *call, + const grpc_op *ops, + size_t nops, + grpc_closure *closure) { + return call_start_batch(exec_ctx, call, ops, nops, closure, 1); } void grpc_call_context_set(grpc_call *call, grpc_context_index elem, diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 9b7c6f9bfb..24ab9c33bb 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -44,51 +44,6 @@ extern "C" { #endif -/* Primitive operation types - grpc_op's get rewritten into these */ -typedef enum { - GRPC_IOREQ_RECV_INITIAL_METADATA, - GRPC_IOREQ_RECV_MESSAGE, - GRPC_IOREQ_RECV_TRAILING_METADATA, - GRPC_IOREQ_RECV_STATUS, - GRPC_IOREQ_RECV_STATUS_DETAILS, - GRPC_IOREQ_RECV_CLOSE, - GRPC_IOREQ_SEND_INITIAL_METADATA, - GRPC_IOREQ_SEND_MESSAGE, - GRPC_IOREQ_SEND_TRAILING_METADATA, - GRPC_IOREQ_SEND_STATUS, - GRPC_IOREQ_SEND_CLOSE, - GRPC_IOREQ_OP_COUNT -} grpc_ioreq_op; - -typedef union { - grpc_metadata_array *recv_metadata; - grpc_byte_buffer **recv_message; - struct { - void (*set_value)(grpc_status_code status, void *user_data); - void *user_data; - } recv_status; - struct { - char **details; - size_t *details_capacity; - } recv_status_details; - struct { - size_t count; - grpc_metadata *metadata; - } send_metadata; - grpc_byte_buffer *send_message; - struct { - grpc_status_code code; - grpc_mdstr *details; - } send_status; -} grpc_ioreq_data; - -typedef struct { - grpc_ioreq_op op; - gpr_uint32 flags; - /**< A copy of the write flags from grpc_op */ - grpc_ioreq_data data; -} grpc_ioreq; - typedef void (*grpc_ioreq_completion_func)(grpc_exec_ctx *exec_ctx, grpc_call *call, int success, void *user_data); @@ -105,7 +60,7 @@ void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call, grpc_completion_queue *cq); grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call); -#ifdef GRPC_CALL_REF_COUNT_DEBUG +#ifdef GRPC_STREAM_REFCOUNT_DEBUG void grpc_call_internal_ref(grpc_call *call, const char *reason); void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *call, const char *reason); @@ -121,12 +76,14 @@ void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *call); grpc_call_internal_unref(exec_ctx, call) #endif -grpc_call_error grpc_call_start_ioreq_and_call_back( - grpc_exec_ctx *exec_ctx, grpc_call *call, const grpc_ioreq *reqs, - size_t nreqs, grpc_ioreq_completion_func on_complete, void *user_data); - grpc_call_stack *grpc_call_get_call_stack(grpc_call *call); +grpc_call_error grpc_call_start_batch_and_execute(grpc_exec_ctx *exec_ctx, + grpc_call *call, + const grpc_op *ops, + size_t nops, + grpc_closure *closure); + /* Given the top call_element, get the call object. */ grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element); @@ -157,16 +114,6 @@ void *grpc_call_context_get(grpc_call *call, grpc_context_index elem); #define GRPC_CALL_LOG_BATCH(sev, call, ops, nops, tag) \ if (grpc_api_trace) grpc_call_log_batch(sev, call, ops, nops, tag) -#define GRPC_SERVER_LOG_REQUEST_CALL(sev, server, call, details, \ - initial_metadata, cq_bound_to_call, \ - cq_for_notifications, tag) \ - if (grpc_api_trace) \ - grpc_server_log_request_call(sev, server, call, details, initial_metadata, \ - cq_bound_to_call, cq_for_notifications, tag) - -#define GRPC_SERVER_LOG_SHUTDOWN(sev, server, cq, tag) \ - if (grpc_api_trace) grpc_server_log_shutdown(sev, server, cq, tag) - gpr_uint8 grpc_call_is_client(grpc_call *call); #ifdef __cplusplus diff --git a/src/core/surface/call_log_batch.c b/src/core/surface/call_log_batch.c index 2dd9737cf8..16212f6386 100644 --- a/src/core/surface/call_log_batch.c +++ b/src/core/surface/call_log_batch.c @@ -110,9 +110,6 @@ void grpc_call_log_batch(char *file, int line, gpr_log_severity severity, void *tag) { char *tmp; size_t i; - gpr_log(file, line, severity, - "grpc_call_start_batch(call=%p, ops=%p, nops=%d, tag=%p)", call, ops, - nops, tag); for (i = 0; i < nops; i++) { tmp = grpc_op_string(&ops[i]); gpr_log(file, line, severity, "ops[%d]: %s", i, tmp); diff --git a/src/core/surface/call_test_only.h b/src/core/surface/call_test_only.h index df4be3248b..b3a2bbd6f1 100644 --- a/src/core/surface/call_test_only.h +++ b/src/core/surface/call_test_only.h @@ -57,7 +57,6 @@ gpr_uint32 grpc_call_test_only_get_message_flags(grpc_call *call); * To be indexed by grpc_compression_algorithm enum values. */ gpr_uint32 grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call); - #ifdef __cplusplus } #endif diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index aa90b3f7f5..d56e5cbe84 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -71,9 +71,29 @@ struct grpc_completion_queue { int is_server_cq; int num_pluckers; plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS]; - grpc_closure pollset_destroy_done; + grpc_closure pollset_shutdown_done; + + grpc_completion_queue *next_free; }; +static gpr_mu g_freelist_mu; +grpc_completion_queue *g_freelist; + +static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc, + int success); + +void grpc_cq_global_init(void) { gpr_mu_init(&g_freelist_mu); } + +void grpc_cq_global_shutdown(void) { + gpr_mu_destroy(&g_freelist_mu); + while (g_freelist) { + grpc_completion_queue *next = g_freelist->next_free; + grpc_pollset_destroy(&g_freelist->pollset); + gpr_free(g_freelist); + g_freelist = next; + } +} + struct grpc_cq_alarm { grpc_timer alarm; grpc_cq_completion completion; @@ -83,22 +103,41 @@ struct grpc_cq_alarm { void *tag; }; -static void on_pollset_destroy_done(grpc_exec_ctx *exec_ctx, void *cc, - int success); - grpc_completion_queue *grpc_completion_queue_create(void *reserved) { - grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue)); - GRPC_API_TRACE("grpc_completion_queue_create(reserved=%p)", 1, (reserved)); + grpc_completion_queue *cc; GPR_ASSERT(!reserved); - memset(cc, 0, sizeof(*cc)); + + GPR_TIMER_BEGIN("grpc_completion_queue_create", 0); + + GRPC_API_TRACE("grpc_completion_queue_create(reserved=%p)", 1, (reserved)); + + gpr_mu_lock(&g_freelist_mu); + if (g_freelist == NULL) { + gpr_mu_unlock(&g_freelist_mu); + + cc = gpr_malloc(sizeof(grpc_completion_queue)); + grpc_pollset_init(&cc->pollset); + } else { + cc = g_freelist; + g_freelist = g_freelist->next_free; + gpr_mu_unlock(&g_freelist_mu); + /* pollset already initialized */ + } + /* Initial ref is dropped by grpc_completion_queue_shutdown */ gpr_ref_init(&cc->pending_events, 1); /* One for destroy(), one for pollset_shutdown */ gpr_ref_init(&cc->owning_refs, 2); - grpc_pollset_init(&cc->pollset); cc->completed_tail = &cc->completed_head; cc->completed_head.next = (gpr_uintptr)cc->completed_tail; - grpc_closure_init(&cc->pollset_destroy_done, on_pollset_destroy_done, cc); + cc->shutdown = 0; + cc->shutdown_called = 0; + cc->is_server_cq = 0; + cc->num_pluckers = 0; + grpc_closure_init(&cc->pollset_shutdown_done, on_pollset_shutdown_done, cc); + + GPR_TIMER_END("grpc_completion_queue_create", 0); + return cc; } @@ -113,8 +152,8 @@ void grpc_cq_internal_ref(grpc_completion_queue *cc) { gpr_ref(&cc->owning_refs); } -static void on_pollset_destroy_done(grpc_exec_ctx *exec_ctx, void *arg, - int success) { +static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg, + int success) { grpc_completion_queue *cc = arg; GRPC_CQ_INTERNAL_UNREF(cc, "pollset_destroy"); } @@ -129,8 +168,11 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) { #endif if (gpr_unref(&cc->owning_refs)) { GPR_ASSERT(cc->completed_head.next == (gpr_uintptr)&cc->completed_head); - grpc_pollset_destroy(&cc->pollset); - gpr_free(cc); + grpc_pollset_reset(&cc->pollset); + gpr_mu_lock(&g_freelist_mu); + cc->next_free = g_freelist; + g_freelist = cc; + gpr_mu_unlock(&g_freelist_mu); } } @@ -185,8 +227,8 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, GPR_ASSERT(!cc->shutdown); GPR_ASSERT(cc->shutdown_called); cc->shutdown = 1; + grpc_pollset_shutdown(exec_ctx, &cc->pollset, &cc->pollset_shutdown_done); gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); - grpc_pollset_shutdown(exec_ctx, &cc->pollset, &cc->pollset_destroy_done); } GPR_TIMER_END("grpc_cq_end_op", 0); @@ -365,29 +407,31 @@ done: to zero here, then enter shutdown mode and wake up any waiters */ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0); GRPC_API_TRACE("grpc_completion_queue_shutdown(cc=%p)", 1, (cc)); gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); if (cc->shutdown_called) { gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + GPR_TIMER_END("grpc_completion_queue_shutdown", 0); return; } cc->shutdown_called = 1; - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); - if (gpr_unref(&cc->pending_events)) { - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); GPR_ASSERT(!cc->shutdown); cc->shutdown = 1; - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); - grpc_pollset_shutdown(&exec_ctx, &cc->pollset, &cc->pollset_destroy_done); + grpc_pollset_shutdown(&exec_ctx, &cc->pollset, &cc->pollset_shutdown_done); } + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); grpc_exec_ctx_finish(&exec_ctx); + GPR_TIMER_END("grpc_completion_queue_shutdown", 0); } void grpc_completion_queue_destroy(grpc_completion_queue *cc) { GRPC_API_TRACE("grpc_completion_queue_destroy(cc=%p)", 1, (cc)); + GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0); grpc_completion_queue_shutdown(cc); GRPC_CQ_INTERNAL_UNREF(cc, "destroy"); + GPR_TIMER_END("grpc_completion_queue_destroy", 0); } grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) { diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h index 5f8282e542..a40bb048ac 100644 --- a/src/core/surface/completion_queue.h +++ b/src/core/surface/completion_queue.h @@ -83,4 +83,7 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc); void grpc_cq_mark_server_cq(grpc_completion_queue *cc); int grpc_cq_is_server_cq(grpc_completion_queue *cc); +void grpc_cq_global_init(void); +void grpc_cq_global_shutdown(void); + #endif /* GRPC_INTERNAL_CORE_SURFACE_COMPLETION_QUEUE_H */ diff --git a/src/core/surface/init.c b/src/core/surface/init.c index b2e66a830e..f8cba01cad 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -52,6 +52,7 @@ #include "src/core/profiling/timers.h" #include "src/core/surface/api_trace.h" #include "src/core/surface/call.h" +#include "src/core/surface/completion_queue.h" #include "src/core/surface/init.h" #include "src/core/surface/surface_trace.h" #include "src/core/transport/chttp2_transport.h" @@ -118,6 +119,7 @@ void grpc_init(void) { } } gpr_timers_global_init(); + grpc_cq_global_init(); for (i = 0; i < g_number_of_plugins; i++) { if (g_all_of_the_plugins[i].init != NULL) { g_all_of_the_plugins[i].init(); @@ -133,8 +135,9 @@ void grpc_shutdown(void) { GRPC_API_TRACE("grpc_shutdown(void)", 0, ()); gpr_mu_lock(&g_init_mu); if (--g_initializations == 0) { - grpc_iomgr_shutdown(); grpc_executor_shutdown(); + grpc_cq_global_shutdown(); + grpc_iomgr_shutdown(); census_shutdown(); gpr_timers_global_destroy(); grpc_tracer_shutdown(); diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index e72264fbcd..fc458a603d 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -55,38 +55,33 @@ typedef struct { const char *error_message; } channel_data; +static void fill_metadata(grpc_call_element *elem, grpc_metadata_batch *mdb) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + char tmp[GPR_LTOA_MIN_BUFSIZE]; + gpr_ltoa(chand->error_code, tmp); + calld->status.md = grpc_mdelem_from_strings(chand->mdctx, "grpc-status", tmp); + calld->details.md = grpc_mdelem_from_strings(chand->mdctx, "grpc-message", + chand->error_message); + calld->status.prev = calld->details.next = NULL; + calld->status.next = &calld->details; + calld->details.prev = &calld->status; + mdb->list.head = &calld->status; + mdb->list.tail = &calld->details; + mdb->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); +} + static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op *op) { - call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - if (op->send_ops != NULL) { - grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops); - op->on_done_send->cb(exec_ctx, op->on_done_send->cb_arg, 0); - } - if (op->recv_ops != NULL) { - char tmp[GPR_LTOA_MIN_BUFSIZE]; - grpc_metadata_batch mdb; - gpr_ltoa(chand->error_code, tmp); - calld->status.md = - grpc_mdelem_from_strings(chand->mdctx, "grpc-status", tmp); - calld->details.md = grpc_mdelem_from_strings(chand->mdctx, "grpc-message", - chand->error_message); - calld->status.prev = calld->details.next = NULL; - calld->status.next = &calld->details; - calld->details.prev = &calld->status; - mdb.list.head = &calld->status; - mdb.list.tail = &calld->details; - mdb.garbage.head = mdb.garbage.tail = NULL; - mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME); - grpc_sopb_add_metadata(op->recv_ops, mdb); - *op->recv_state = GRPC_STREAM_CLOSED; - op->on_done_recv->cb(exec_ctx, op->on_done_recv->cb_arg, 1); - } - if (op->on_consumed != NULL) { - op->on_consumed->cb(exec_ctx, op->on_consumed->cb_arg, 0); + if (op->recv_initial_metadata != NULL) { + fill_metadata(elem, op->recv_initial_metadata); + } else if (op->recv_trailing_metadata != NULL) { + fill_metadata(elem, op->recv_trailing_metadata); } + grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, 0); + grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, 0); } static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { @@ -109,25 +104,19 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, } static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - const void *transport_server_data, - grpc_transport_stream_op *initial_op) { - if (initial_op) { - grpc_transport_stream_op_finish_with_failure(exec_ctx, initial_op); - } -} + grpc_call_element_args *args) {} static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {} static void init_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, grpc_channel *master, - const grpc_channel_args *args, grpc_mdctx *mdctx, - int is_first, int is_last) { + grpc_channel_element *elem, + grpc_channel_element_args *args) { channel_data *chand = elem->channel_data; - GPR_ASSERT(is_first); - GPR_ASSERT(is_last); - chand->mdctx = mdctx; - chand->master = master; + GPR_ASSERT(args->is_first); + GPR_ASSERT(args->is_last); + chand->mdctx = args->metadata_context; + chand->master = args->master; } static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, @@ -135,8 +124,9 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, static const grpc_channel_filter lame_filter = { lame_start_transport_stream_op, lame_start_transport_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem, - destroy_channel_elem, lame_get_peer, "lame-client", + init_call_elem, grpc_call_stack_ignore_set_pollset, destroy_call_elem, + sizeof(channel_data), init_channel_elem, destroy_channel_elem, + lame_get_peer, "lame-client", }; #define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1)) diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 819226278d..e9f380083f 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -84,18 +84,18 @@ typedef struct requested_call { grpc_completion_queue *cq_for_notification; grpc_call **call; grpc_cq_completion completion; + grpc_metadata_array *initial_metadata; union { struct { grpc_call_details *details; - grpc_metadata_array *initial_metadata; } batch; struct { registered_method *registered_method; gpr_timespec *deadline; - grpc_metadata_array *initial_metadata; grpc_byte_buffer **optional_payload; } registered; } data; + grpc_closure publish; } requested_call; typedef struct channel_registered_method { @@ -150,16 +150,16 @@ struct call_data { grpc_mdstr *path; grpc_mdstr *host; gpr_timespec deadline; - int got_initial_metadata; grpc_completion_queue *cq_new; - grpc_stream_op_buffer *recv_ops; - grpc_stream_state *recv_state; - grpc_closure *on_done_recv; + grpc_metadata_batch *recv_initial_metadata; + grpc_metadata_array initial_metadata; - grpc_closure server_on_recv; + grpc_closure got_initial_metadata; + grpc_closure server_on_recv_initial_metadata; grpc_closure kill_zombie_closure; + grpc_closure *on_done_recv_initial_metadata; call_data *pending_next; }; @@ -396,7 +396,6 @@ static void finish_destroy_channel(grpc_exec_ctx *exec_ctx, void *cd, int success) { channel_data *chand = cd; grpc_server *server = chand->server; - gpr_log(GPR_DEBUG, "finish_destroy_channel: %p", chand->channel); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->channel, "server"); server_unref(exec_ctx, server); } @@ -571,79 +570,35 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { return md; } -static void server_on_recv(grpc_exec_ctx *exec_ctx, void *ptr, int success) { +static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, + int success) { grpc_call_element *elem = ptr; call_data *calld = elem->call_data; gpr_timespec op_deadline; - if (success && !calld->got_initial_metadata) { - size_t i; - size_t nops = calld->recv_ops->nops; - grpc_stream_op *ops = calld->recv_ops->ops; - for (i = 0; i < nops; i++) { - grpc_stream_op *op = &ops[i]; - if (op->type != GRPC_OP_METADATA) continue; - grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem); - op_deadline = op->data.metadata.deadline; - if (0 != - gpr_time_cmp(op_deadline, gpr_inf_future(op_deadline.clock_type))) { - calld->deadline = op->data.metadata.deadline; - } - if (calld->host && calld->path) { - calld->got_initial_metadata = 1; - start_new_rpc(exec_ctx, elem); - } - break; - } + grpc_metadata_batch_filter(calld->recv_initial_metadata, server_filter, elem); + op_deadline = calld->recv_initial_metadata->deadline; + if (0 != gpr_time_cmp(op_deadline, gpr_inf_future(op_deadline.clock_type))) { + calld->deadline = op_deadline; } - - switch (*calld->recv_state) { - case GRPC_STREAM_OPEN: - break; - case GRPC_STREAM_SEND_CLOSED: - break; - case GRPC_STREAM_RECV_CLOSED: - gpr_mu_lock(&calld->mu_state); - if (calld->state == NOT_STARTED) { - calld->state = ZOMBIED; - gpr_mu_unlock(&calld->mu_state); - grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1); - } else { - gpr_mu_unlock(&calld->mu_state); - } - break; - case GRPC_STREAM_CLOSED: - gpr_mu_lock(&calld->mu_state); - if (calld->state == NOT_STARTED) { - calld->state = ZOMBIED; - gpr_mu_unlock(&calld->mu_state); - grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1); - } else if (calld->state == PENDING) { - calld->state = ZOMBIED; - gpr_mu_unlock(&calld->mu_state); - /* zombied call will be destroyed when it's removed from the pending - queue... later */ - } else { - gpr_mu_unlock(&calld->mu_state); - } - break; + if (calld->host && calld->path) { + /* do nothing */ + } else { + success = 0; } - calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success); + calld->on_done_recv_initial_metadata->cb( + exec_ctx, calld->on_done_recv_initial_metadata->cb_arg, success); } static void server_mutate_op(grpc_call_element *elem, grpc_transport_stream_op *op) { call_data *calld = elem->call_data; - if (op->recv_ops) { - /* substitute our callback for the higher callback */ - calld->recv_ops = op->recv_ops; - calld->recv_state = op->recv_state; - calld->on_done_recv = op->on_done_recv; - op->on_done_recv = &calld->server_on_recv; + if (op->recv_initial_metadata != NULL) { + calld->recv_initial_metadata = op->recv_initial_metadata; + calld->on_done_recv_initial_metadata = op->on_complete; + op->on_complete = &calld->server_on_recv_initial_metadata; } } @@ -655,12 +610,48 @@ static void server_start_transport_stream_op(grpc_exec_ctx *exec_ctx, grpc_call_next_op(exec_ctx, elem, op); } -static void accept_stream(void *cd, grpc_transport *transport, +static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, + int success) { + grpc_call_element *elem = ptr; + call_data *calld = elem->call_data; + if (success) { + start_new_rpc(exec_ctx, elem); + } else { + gpr_mu_lock(&calld->mu_state); + if (calld->state == NOT_STARTED) { + calld->state = ZOMBIED; + gpr_mu_unlock(&calld->mu_state); + grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); + grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1); + } else if (calld->state == PENDING) { + calld->state = ZOMBIED; + gpr_mu_unlock(&calld->mu_state); + /* zombied call will be destroyed when it's removed from the pending + queue... later */ + } else { + gpr_mu_unlock(&calld->mu_state); + } + } +} + +static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd, + grpc_transport *transport, const void *transport_server_data) { channel_data *chand = cd; /* create a call */ - grpc_call_create(chand->channel, NULL, 0, NULL, transport_server_data, NULL, - 0, gpr_inf_future(GPR_CLOCK_MONOTONIC)); + grpc_call *call = + grpc_call_create(chand->channel, NULL, 0, NULL, transport_server_data, + NULL, 0, gpr_inf_future(GPR_CLOCK_MONOTONIC)); + grpc_call_element *elem = + grpc_call_stack_element(grpc_call_get_call_stack(call), 0); + call_data *calld = elem->call_data; + grpc_op op; + memset(&op, 0, sizeof(op)); + op.op = GRPC_OP_RECV_INITIAL_METADATA; + op.data.recv_initial_metadata = &calld->initial_metadata; + grpc_closure_init(&calld->got_initial_metadata, got_initial_metadata, elem); + grpc_call_start_batch_and_execute(exec_ctx, call, &op, 1, + &calld->got_initial_metadata); } static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd, @@ -685,8 +676,7 @@ static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd, } static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - const void *server_transport_data, - grpc_transport_stream_op *initial_op) { + grpc_call_element_args *args) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; memset(calld, 0, sizeof(call_data)); @@ -694,11 +684,10 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, calld->call = grpc_call_from_top_element(elem); gpr_mu_init(&calld->mu_state); - grpc_closure_init(&calld->server_on_recv, server_on_recv, elem); + grpc_closure_init(&calld->server_on_recv_initial_metadata, + server_on_recv_initial_metadata, elem); server_ref(chand->server); - - if (initial_op) server_mutate_op(elem, initial_op); } static void destroy_call_elem(grpc_exec_ctx *exec_ctx, @@ -714,6 +703,7 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, if (calld->path) { GRPC_MDSTR_UNREF(calld->path); } + grpc_metadata_array_destroy(&calld->initial_metadata); gpr_mu_destroy(&calld->mu_state); @@ -721,17 +711,16 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, } static void init_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, grpc_channel *master, - const grpc_channel_args *args, - grpc_mdctx *metadata_context, int is_first, - int is_last) { + grpc_channel_element *elem, + grpc_channel_element_args *args) { channel_data *chand = elem->channel_data; - GPR_ASSERT(is_first); - GPR_ASSERT(!is_last); + GPR_ASSERT(args->is_first); + GPR_ASSERT(!args->is_last); chand->server = NULL; chand->channel = NULL; - chand->path_key = grpc_mdstr_from_string(metadata_context, ":path"); - chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority"); + chand->path_key = grpc_mdstr_from_string(args->metadata_context, ":path"); + chand->authority_key = + grpc_mdstr_from_string(args->metadata_context, ":authority"); chand->next = chand->prev = chand; chand->registered_methods = NULL; chand->connectivity_state = GRPC_CHANNEL_IDLE; @@ -769,8 +758,9 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, static const grpc_channel_filter server_surface_filter = { server_start_transport_stream_op, grpc_channel_next_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem, - destroy_channel_elem, grpc_call_next_get_peer, "server", + init_call_elem, grpc_call_stack_ignore_set_pollset, destroy_call_elem, + sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, "server", }; void grpc_server_register_completion_queue(grpc_server *server, @@ -1022,8 +1012,6 @@ void grpc_server_shutdown_and_notify(grpc_server *server, GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3, (server, cq, tag)); - GRPC_SERVER_LOG_SHUTDOWN(GPR_INFO, server, cq, tag); - /* lock, and gather up some stuff to do */ gpr_mu_lock(&server->mu_global); grpc_cq_begin_op(cq); @@ -1187,12 +1175,9 @@ grpc_call_error grpc_server_request_call( GRPC_API_TRACE( "grpc_server_request_call(" "server=%p, call=%p, details=%p, initial_metadata=%p, " - "cq_bound_to_call=%p, cq_for_notification=%p, tag%p)", + "cq_bound_to_call=%p, cq_for_notification=%p, tag=%p)", 7, (server, call, details, initial_metadata, cq_bound_to_call, cq_for_notification, tag)); - GRPC_SERVER_LOG_REQUEST_CALL(GPR_INFO, server, call, details, - initial_metadata, cq_bound_to_call, - cq_for_notification, tag); if (!grpc_cq_is_server_cq(cq_for_notification)) { gpr_free(rc); error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; @@ -1207,7 +1192,7 @@ grpc_call_error grpc_server_request_call( rc->cq_for_notification = cq_for_notification; rc->call = call; rc->data.batch.details = details; - rc->data.batch.initial_metadata = initial_metadata; + rc->initial_metadata = initial_metadata; error = queue_call_request(&exec_ctx, server, rc); done: grpc_exec_ctx_finish(&exec_ctx); @@ -1244,7 +1229,7 @@ grpc_call_error grpc_server_request_registered_call( rc->call = call; rc->data.registered.registered_method = rm; rc->data.registered.deadline = deadline; - rc->data.registered.initial_metadata = initial_metadata; + rc->initial_metadata = initial_metadata; rc->data.registered.optional_payload = optional_payload; error = queue_call_request(&exec_ctx, server, rc); done: @@ -1253,12 +1238,7 @@ done: } static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx, - grpc_call *call, int success, - void *tag); -static void publish_was_not_set(grpc_exec_ctx *exec_ctx, grpc_call *call, - int success, void *tag) { - abort(); -} + void *user_data, int success); static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) { gpr_slice slice = value->slice; @@ -1273,9 +1253,10 @@ static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) { static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server, call_data *calld, requested_call *rc) { - grpc_ioreq_completion_func publish = publish_was_not_set; - grpc_ioreq req[2]; - grpc_ioreq *r = req; + grpc_op ops[1]; + grpc_op *op = ops; + + memset(ops, 0, sizeof(ops)); /* called once initial metadata has been read by the call, but BEFORE the ioreq to fetch it out of the call has been executed. @@ -1284,8 +1265,10 @@ static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server, an ioreq op, that should complete immediately. */ grpc_call_set_completion_queue(exec_ctx, calld->call, rc->cq_bound_to_call); + grpc_closure_init(&rc->publish, publish_registered_or_batch, rc); *rc->call = calld->call; calld->cq_new = rc->cq_for_notification; + GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata); switch (rc->type) { case BATCH_CALL: GPR_ASSERT(calld->host != NULL); @@ -1295,31 +1278,22 @@ static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server, cpstr(&rc->data.batch.details->method, &rc->data.batch.details->method_capacity, calld->path); rc->data.batch.details->deadline = calld->deadline; - r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; - r->data.recv_metadata = rc->data.batch.initial_metadata; - r->flags = 0; - r++; - publish = publish_registered_or_batch; break; case REGISTERED_CALL: *rc->data.registered.deadline = calld->deadline; - r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; - r->data.recv_metadata = rc->data.registered.initial_metadata; - r->flags = 0; - r++; if (rc->data.registered.optional_payload) { - r->op = GRPC_IOREQ_RECV_MESSAGE; - r->data.recv_message = rc->data.registered.optional_payload; - r->flags = 0; - r++; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message = rc->data.registered.optional_payload; + op++; } - publish = publish_registered_or_batch; break; + default: + GPR_UNREACHABLE_CODE(return ); } GRPC_CALL_INTERNAL_REF(calld->call, "server"); - grpc_call_start_ioreq_and_call_back(exec_ctx, calld->call, req, - (size_t)(r - req), publish, rc); + grpc_call_start_batch_and_execute(exec_ctx, calld->call, ops, + (size_t)(op - ops), &rc->publish); } static void done_request_event(grpc_exec_ctx *exec_ctx, void *req, @@ -1342,25 +1316,19 @@ static void done_request_event(grpc_exec_ctx *exec_ctx, void *req, static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server, requested_call *rc) { *rc->call = NULL; - switch (rc->type) { - case BATCH_CALL: - rc->data.batch.initial_metadata->count = 0; - break; - case REGISTERED_CALL: - rc->data.registered.initial_metadata->count = 0; - break; - } + rc->initial_metadata->count = 0; + server_ref(server); grpc_cq_end_op(exec_ctx, rc->cq_for_notification, rc->tag, 0, done_request_event, rc, &rc->completion); } -static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx, - grpc_call *call, int success, - void *prc) { +static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx, void *prc, + int success) { + requested_call *rc = prc; + grpc_call *call = *rc->call; grpc_call_element *elem = grpc_call_stack_element(grpc_call_get_call_stack(call), 0); - requested_call *rc = prc; call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; server_ref(chand->server); |