diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/channel/channel_stack.c | 16 | ||||
-rw-r--r-- | src/core/channel/connected_channel.c | 8 | ||||
-rw-r--r-- | src/core/channel/http_client_filter.c | 16 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 4 | ||||
-rw-r--r-- | src/core/surface/byte_buffer_queue.c | 78 | ||||
-rw-r--r-- | src/core/surface/byte_buffer_queue.h | 59 | ||||
-rw-r--r-- | src/core/surface/call.c | 1796 | ||||
-rw-r--r-- | src/core/surface/call.h | 79 | ||||
-rw-r--r-- | src/core/surface/call_details.c | 13 | ||||
-rw-r--r-- | src/core/surface/channel.c | 34 | ||||
-rw-r--r-- | src/core/surface/client.c | 12 | ||||
-rw-r--r-- | src/core/surface/completion_queue.c | 36 | ||||
-rw-r--r-- | src/core/surface/completion_queue.h | 8 | ||||
-rw-r--r-- | src/core/surface/event_string.c | 6 | ||||
-rw-r--r-- | src/core/surface/lame_client.c | 16 | ||||
-rw-r--r-- | src/core/surface/metadata_array.c | 17 | ||||
-rw-r--r-- | src/core/surface/server.c | 324 | ||||
-rw-r--r-- | src/core/transport/chttp2/stream_encoder.c | 2 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 2 | ||||
-rw-r--r-- | src/cpp/client/channel.cc | 1 |
20 files changed, 1651 insertions, 876 deletions
diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index e28bbd798d..d9e722c4f1 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -210,6 +210,7 @@ void grpc_call_element_recv_metadata(grpc_call_element *cur_elem, metadata_op.dir = GRPC_CALL_UP; metadata_op.done_cb = do_nothing; metadata_op.user_data = NULL; + metadata_op.flags = 0; metadata_op.data.metadata = mdelem; grpc_call_next_op(cur_elem, &metadata_op); } @@ -221,6 +222,7 @@ void grpc_call_element_send_metadata(grpc_call_element *cur_elem, metadata_op.dir = GRPC_CALL_DOWN; metadata_op.done_cb = do_nothing; metadata_op.user_data = NULL; + metadata_op.flags = 0; metadata_op.data.metadata = mdelem; grpc_call_next_op(cur_elem, &metadata_op); } @@ -231,14 +233,16 @@ void grpc_call_element_send_cancel(grpc_call_element *cur_elem) { cancel_op.dir = GRPC_CALL_DOWN; cancel_op.done_cb = do_nothing; cancel_op.user_data = NULL; + cancel_op.flags = 0; grpc_call_next_op(cur_elem, &cancel_op); } void grpc_call_element_send_finish(grpc_call_element *cur_elem) { - grpc_call_op cancel_op; - cancel_op.type = GRPC_SEND_FINISH; - cancel_op.dir = GRPC_CALL_DOWN; - cancel_op.done_cb = do_nothing; - cancel_op.user_data = NULL; - grpc_call_next_op(cur_elem, &cancel_op); + grpc_call_op finish_op; + finish_op.type = GRPC_SEND_FINISH; + finish_op.dir = GRPC_CALL_DOWN; + finish_op.done_cb = do_nothing; + finish_op.user_data = NULL; + finish_op.flags = 0; + grpc_call_next_op(cur_elem, &finish_op); } diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index d35cede97b..61a6caf032 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -298,10 +298,6 @@ static void recv_error(channel_data *chand, call_data *calld, int line, static void do_nothing(void *calldata, grpc_op_error error) {} -static void done_message(void *user_data, grpc_op_error error) { - grpc_byte_buffer_destroy(user_data); -} - static void finish_message(channel_data *chand, call_data *calld) { grpc_call_element *elem = calld->elem; grpc_call_op call_op; @@ -309,9 +305,9 @@ static void finish_message(channel_data *chand, call_data *calld) { call_op.flags = 0; /* if we got all the bytes for this message, call up the stack */ call_op.type = GRPC_RECV_MESSAGE; - call_op.done_cb = done_message; + call_op.done_cb = do_nothing; /* TODO(ctiller): this could be a lot faster if coded directly */ - call_op.user_data = call_op.data.message = grpc_byte_buffer_create( + call_op.data.message = grpc_byte_buffer_create( calld->incoming_message.slices, calld->incoming_message.count); gpr_slice_buffer_reset_and_unref(&calld->incoming_message); diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index 96acb385bf..a2b5f48f60 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -1,5 +1,4 @@ /* - * * Copyright 2014, Google Inc. * All rights reserved. * @@ -44,6 +43,7 @@ typedef struct channel_data { grpc_mdelem *method; grpc_mdelem *scheme; grpc_mdelem *content_type; + grpc_mdelem *status; } channel_data; /* used to silence 'variable not used' warnings */ @@ -86,6 +86,18 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, grpc_call_element_send_metadata(elem, grpc_mdelem_ref(channeld->content_type)); grpc_call_next_op(elem, op); break; + case GRPC_RECV_METADATA: + if (op->data.metadata == channeld->status) { + grpc_mdelem_unref(op->data.metadata); + op->done_cb(op->user_data, GRPC_OP_OK); + } else if (op->data.metadata->key == channeld->status->key) { + grpc_mdelem_unref(op->data.metadata); + op->done_cb(op->user_data, GRPC_OP_OK); + grpc_call_element_send_cancel(elem); + } else { + grpc_call_next_op(elem, op); + } + break; default: /* pass control up or down the stack depending on op->dir */ grpc_call_next_op(elem, op); @@ -166,6 +178,7 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_mdelem_from_strings(mdctx, ":scheme", scheme_from_args(args)); channeld->content_type = grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc"); + channeld->status = grpc_mdelem_from_strings(mdctx, ":status", "200"); } /* Destructor for channel data */ @@ -177,6 +190,7 @@ static void destroy_channel_elem(grpc_channel_element *elem) { grpc_mdelem_unref(channeld->method); grpc_mdelem_unref(channeld->scheme); grpc_mdelem_unref(channeld->content_type); + grpc_mdelem_unref(channeld->status); } const grpc_channel_filter grpc_http_client_filter = { diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 994dbe495d..b1c2c64a18 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -80,9 +80,7 @@ void grpc_pollset_kick(grpc_pollset *p) { } } -void grpc_pollset_force_kick(grpc_pollset *p) { - grpc_pollset_kick_kick(&p->kick_state); -} +void grpc_pollset_force_kick(grpc_pollset *p) { grpc_pollset_kick_kick(&p->kick_state); } /* global state management */ diff --git a/src/core/surface/byte_buffer_queue.c b/src/core/surface/byte_buffer_queue.c new file mode 100644 index 0000000000..dc280a60c5 --- /dev/null +++ b/src/core/surface/byte_buffer_queue.c @@ -0,0 +1,78 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/surface/byte_buffer_queue.h" +#include <grpc/support/alloc.h> +#include <grpc/support/useful.h> + +static void bba_destroy(grpc_bbq_array *array) { gpr_free(array->data); } + +/* Append an operation to an array, expanding as needed */ +static void bba_push(grpc_bbq_array *a, grpc_byte_buffer *buffer) { + if (a->count == a->capacity) { + a->capacity = GPR_MAX(a->capacity * 2, 8); + a->data = gpr_realloc(a->data, sizeof(grpc_byte_buffer *) * a->capacity); + } + a->data[a->count++] = buffer; +} + +void grpc_bbq_destroy(grpc_byte_buffer_queue *q) { + bba_destroy(&q->filling); + bba_destroy(&q->draining); +} + +int grpc_bbq_empty(grpc_byte_buffer_queue *q) { + return (q->drain_pos == q->draining.count && q->filling.count == 0); +} + +void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *buffer) { + bba_push(&q->filling, buffer); +} + +grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) { + grpc_bbq_array temp_array; + + if (q->drain_pos == q->draining.count) { + if (q->filling.count == 0) { + return NULL; + } + q->draining.count = 0; + q->drain_pos = 0; + /* swap arrays */ + temp_array = q->filling; + q->filling = q->draining; + q->draining = temp_array; + } + + return q->draining.data[q->drain_pos++]; +} diff --git a/src/core/surface/byte_buffer_queue.h b/src/core/surface/byte_buffer_queue.h new file mode 100644 index 0000000000..358a42d5af --- /dev/null +++ b/src/core/surface/byte_buffer_queue.h @@ -0,0 +1,59 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_SURFACE_BYTE_BUFFER_QUEUE_H__ +#define __GRPC_INTERNAL_SURFACE_BYTE_BUFFER_QUEUE_H__ + +#include <grpc/byte_buffer.h> + +/* TODO(ctiller): inline an element or two into this struct to avoid per-call + allocations */ +typedef struct { + grpc_byte_buffer **data; + size_t count; + size_t capacity; +} grpc_bbq_array; + +/* should be initialized by zeroing memory */ +typedef struct { + size_t drain_pos; + grpc_bbq_array filling; + grpc_bbq_array draining; +} grpc_byte_buffer_queue; + +void grpc_bbq_destroy(grpc_byte_buffer_queue *q); +grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q); +int grpc_bbq_empty(grpc_byte_buffer_queue *q); +void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *bb); + +#endif /* __GRPC_INTERNAL_SURFACE_BYTE_BUFFER_QUEUE_H__ */ diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 5a24264cce..b9a2aa5434 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -36,6 +36,7 @@ #include "src/core/channel/metadata_buffer.h" #include "src/core/iomgr/alarm.h" #include "src/core/support/string.h" +#include "src/core/surface/byte_buffer_queue.h" #include "src/core/surface/channel.h" #include "src/core/surface/completion_queue.h" #include <grpc/support/alloc.h> @@ -45,814 +46,808 @@ #include <stdlib.h> #include <string.h> -#define INVALID_TAG ((void *)0xdeadbeef) +typedef struct legacy_state legacy_state; +static void destroy_legacy_state(legacy_state *ls); -/* Pending read queue +typedef enum { REQ_INITIAL = 0, REQ_READY, REQ_DONE } req_state; - This data structure tracks reads that need to be presented to the completion - queue but are waiting for the application to ask for them. */ - -#define INITIAL_PENDING_READ_COUNT 4 +typedef enum { + SEND_NOTHING, + SEND_INITIAL_METADATA, + SEND_MESSAGE, + SEND_TRAILING_METADATA_AND_FINISH, + SEND_FINISH +} send_action; typedef struct { - grpc_byte_buffer *byte_buffer; + grpc_ioreq_completion_func on_complete; void *user_data; - void (*on_finish)(void *user_data, grpc_op_error error); -} pending_read; + grpc_op_error status; +} completed_request; -/* TODO(ctiller): inline an element or two into this struct to avoid per-call - allocations */ -typedef struct { - pending_read *data; - size_t count; - size_t capacity; -} pending_read_array; +/* See request_set in grpc_call below for a description */ +#define REQSET_EMPTY 255 +#define REQSET_DONE 254 typedef struct { - size_t drain_pos; - pending_read_array filling; - pending_read_array draining; -} pending_read_queue; - -static void pra_init(pending_read_array *array) { - array->data = gpr_malloc(sizeof(pending_read) * INITIAL_PENDING_READ_COUNT); - array->count = 0; - array->capacity = INITIAL_PENDING_READ_COUNT; -} - -static void pra_destroy(pending_read_array *array, - size_t finish_starting_from) { - size_t i; - for (i = finish_starting_from; i < array->count; i++) { - array->data[i].on_finish(array->data[i].user_data, GRPC_OP_ERROR); - } - gpr_free(array->data); -} - -/* Append an operation to an array, expanding as needed */ -static void pra_push(pending_read_array *a, grpc_byte_buffer *buffer, - void (*on_finish)(void *user_data, grpc_op_error error), - void *user_data) { - if (a->count == a->capacity) { - a->capacity *= 2; - a->data = gpr_realloc(a->data, sizeof(pending_read) * a->capacity); - } - a->data[a->count].byte_buffer = buffer; - a->data[a->count].user_data = user_data; - a->data[a->count].on_finish = on_finish; - a->count++; -} - -static void prq_init(pending_read_queue *q) { - q->drain_pos = 0; - pra_init(&q->filling); - pra_init(&q->draining); -} - -static void prq_destroy(pending_read_queue *q) { - pra_destroy(&q->filling, 0); - pra_destroy(&q->draining, q->drain_pos); -} - -static int prq_is_empty(pending_read_queue *q) { - return (q->drain_pos == q->draining.count && q->filling.count == 0); -} - -static void prq_push(pending_read_queue *q, grpc_byte_buffer *buffer, - void (*on_finish)(void *user_data, grpc_op_error error), - void *user_data) { - pra_push(&q->filling, buffer, on_finish, user_data); -} - -/* Take the first queue element and move it to the completion queue. Do nothing - if q is empty */ -static int prq_pop_to_cq(pending_read_queue *q, void *tag, grpc_call *call, - grpc_completion_queue *cq) { - pending_read_array temp_array; - pending_read *pr; - - if (q->drain_pos == q->draining.count) { - if (q->filling.count == 0) { - return 0; - } - q->draining.count = 0; - q->drain_pos = 0; - /* swap arrays */ - temp_array = q->filling; - q->filling = q->draining; - q->draining = temp_array; - } + /* Overall status of the operation: starts OK, may degrade to + non-OK */ + grpc_op_error status; + /* Completion function to call at the end of the operation */ + grpc_ioreq_completion_func on_complete; + void *user_data; + /* a bit mask of which request ops are needed (1u << opid) */ + gpr_uint32 need_mask; + /* a bit mask of which request ops are now completed */ + gpr_uint32 complete_mask; +} reqinfo_master; + +/* Status data for a request can come from several sources; this + enumerates them all, and acts as a priority sorting for which + status to return to the application - earlier entries override + later ones */ +typedef enum { + /* Status came from the application layer overriding whatever + the wire says */ + STATUS_FROM_API_OVERRIDE = 0, + /* Status came from 'the wire' - or somewhere below the surface + layer */ + STATUS_FROM_WIRE, + STATUS_SOURCE_COUNT +} status_source; - pr = q->draining.data + q->drain_pos; - q->drain_pos++; - grpc_cq_end_read(cq, tag, call, pr->on_finish, pr->user_data, - pr->byte_buffer); - return 1; -} +typedef struct { + gpr_uint8 is_set; + grpc_status_code code; + grpc_mdstr *details; +} received_status; -/* grpc_call proper */ +/* How far through the GRPC stream have we read? */ +typedef enum { + /* We are still waiting for initial metadata to complete */ + READ_STATE_INITIAL = 0, + /* We have gotten initial metadata, and are reading either + messages or trailing metadata */ + READ_STATE_GOT_INITIAL_METADATA, + /* The stream is closed for reading */ + READ_STATE_READ_CLOSED, + /* The stream is closed for reading & writing */ + READ_STATE_STREAM_CLOSED +} read_state; -/* the state of a call, based upon which functions have been called against - said call */ typedef enum { - CALL_CREATED, - CALL_BOUNDCQ, - CALL_STARTED, - CALL_FINISHED -} call_state; + WRITE_STATE_INITIAL = 0, + WRITE_STATE_STARTED, + WRITE_STATE_WRITE_CLOSED +} write_state; struct grpc_call { grpc_completion_queue *cq; grpc_channel *channel; grpc_mdctx *metadata_context; - - call_state state; + /* TODO(ctiller): share with cq if possible? */ + gpr_mu mu; + + /* how far through the stream have we read? */ + read_state read_state; + /* how far through the stream have we written? */ + write_state write_state; + /* client or server call */ gpr_uint8 is_client; - gpr_uint8 have_write; - grpc_metadata_buffer incoming_metadata; - - /* protects variables in this section */ - gpr_mu read_mu; - gpr_uint8 received_start; - gpr_uint8 start_ok; - gpr_uint8 reads_done; - gpr_uint8 received_finish; - gpr_uint8 received_metadata; - gpr_uint8 have_read; + /* is the alarm set */ gpr_uint8 have_alarm; - gpr_uint8 pending_writes_done; - gpr_uint8 got_status_code; - /* The current outstanding read message tag (only valid if have_read == 1) */ - void *read_tag; - void *metadata_tag; - void *finished_tag; - pending_read_queue prq; - + /* are we currently performing a send operation */ + gpr_uint8 sending; + /* pairs with completed_requests */ + gpr_uint8 num_completed_requests; + /* flag that we need to request more data */ + gpr_uint8 need_more_data; + + /* Active ioreqs. + request_set and request_data contain one element per active ioreq + operation. + + request_set[op] is an integer specifying a set of operations to which + the request belongs: + - if it is < GRPC_IOREQ_OP_COUNT, then this operation is pending + completion, and the integer represents to which group of operations + the ioreq belongs. Each group is represented by one master, and the + integer in request_set is an index into masters to find the master + data. + - if it is REQSET_EMPTY, the ioreq op is inactive and available to be + started + - finally, if request_set[op] is REQSET_DONE, then the operation is + complete and unavailable to be started again + + request_data[op] is the request data as supplied by the initiator of + a request, and is valid iff request_set[op] <= GRPC_IOREQ_OP_COUNT. + The set fields are as per the request type specified by op. + + Finally, one element of masters[op] is set per active _group_ of ioreq + operations. It describes work left outstanding, result status, and + what work to perform upon operation completion. As one ioreq of each + op type can be active at once, by convention we choose the first element + of a the group to be the master. This allows constant time allocation + and a strong upper bound of a count of masters to be calculated. */ + gpr_uint8 request_set[GRPC_IOREQ_OP_COUNT]; + grpc_ioreq_data request_data[GRPC_IOREQ_OP_COUNT]; + reqinfo_master masters[GRPC_IOREQ_OP_COUNT]; + + /* Dynamic array of ioreq's that have completed: the count of + elements is queued in num_completed_requests. + This list is built up under lock(), and flushed entirely during + unlock(). + We know the upper bound of the number of elements as we can only + have one ioreq of each type active at once. */ + completed_request completed_requests[GRPC_IOREQ_OP_COUNT]; + /* Incoming buffer of messages */ + grpc_byte_buffer_queue incoming_queue; + /* Buffered read metadata waiting to be returned to the application. + Element 0 is initial metadata, element 1 is trailing metadata. */ + grpc_metadata_array buffered_metadata[2]; + /* All metadata received - unreffed at once at the end of the call */ + grpc_mdelem **owned_metadata; + size_t owned_metadata_count; + size_t owned_metadata_capacity; + + /* Received call statuses from various sources */ + received_status status[STATUS_SOURCE_COUNT]; + + /* Deadline alarm - if have_alarm is non-zero */ grpc_alarm alarm; - /* The current outstanding send message/context/invoke/end tag (only valid if - have_write == 1) */ - void *write_tag; - grpc_byte_buffer *pending_write; - gpr_uint32 pending_write_flags; - - /* The final status of the call */ - grpc_status_code status_code; - grpc_mdstr *status_details; - + /* Call refcount - to keep the call alive during asynchronous operations */ gpr_refcount internal_refcount; + + /* Data that the legacy api needs to track. To be deleted at some point + soon */ + legacy_state *legacy_state; }; -#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) +#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call)+1)) #define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1) #define CALL_ELEM_FROM_CALL(call, idx) \ grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx) #define CALL_FROM_TOP_ELEM(top_elem) \ CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem)) +#define SWAP(type, x, y) \ + do { \ + type temp = x; \ + x = y; \ + y = temp; \ + } while (0) + static void do_nothing(void *ignored, grpc_op_error also_ignored) {} +static send_action choose_send_action(grpc_call *call); +static void enact_send_action(grpc_call *call, send_action sa); -grpc_call *grpc_call_create(grpc_channel *channel, +grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, const void *server_transport_data) { + size_t i; grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel); grpc_call *call = gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size); - call->cq = NULL; + memset(call, 0, sizeof(grpc_call)); + gpr_mu_init(&call->mu); call->channel = channel; + call->cq = cq; + call->is_client = server_transport_data == NULL; + for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { + call->request_set[i] = REQSET_EMPTY; + } + if (call->is_client) { + call->request_set[GRPC_IOREQ_SEND_TRAILING_METADATA] = REQSET_DONE; + call->request_set[GRPC_IOREQ_SEND_STATUS] = REQSET_DONE; + } grpc_channel_internal_ref(channel); call->metadata_context = grpc_channel_get_metadata_context(channel); - call->state = CALL_CREATED; - call->is_client = (server_transport_data == NULL); - call->write_tag = INVALID_TAG; - call->read_tag = INVALID_TAG; - call->metadata_tag = INVALID_TAG; - call->finished_tag = INVALID_TAG; - call->have_read = 0; - call->have_write = 0; - call->have_alarm = 0; - call->received_metadata = 0; - call->got_status_code = 0; - call->start_ok = 0; - call->status_code = - server_transport_data != NULL ? GRPC_STATUS_OK : GRPC_STATUS_UNKNOWN; - call->status_details = NULL; - call->received_finish = 0; - call->reads_done = 0; - call->received_start = 0; - call->pending_write = NULL; - call->pending_writes_done = 0; - grpc_metadata_buffer_init(&call->incoming_metadata); - gpr_ref_init(&call->internal_refcount, 1); + /* one ref is dropped in response to destroy, the other in + stream_closed */ + gpr_ref_init(&call->internal_refcount, 2); grpc_call_stack_init(channel_stack, server_transport_data, CALL_STACK_FROM_CALL(call)); - prq_init(&call->prq); - gpr_mu_init(&call->read_mu); return call; } +void grpc_call_set_completion_queue(grpc_call *call, + grpc_completion_queue *cq) { + call->cq = cq; +} + void grpc_call_internal_ref(grpc_call *c) { gpr_ref(&c->internal_refcount); } -void grpc_call_internal_unref(grpc_call *c) { - if (gpr_unref(&c->internal_refcount)) { - grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c)); - grpc_metadata_buffer_destroy(&c->incoming_metadata, GRPC_OP_OK); - if (c->status_details) { - grpc_mdstr_unref(c->status_details); +static void destroy_call(void *call, int ignored_success) { + size_t i; + grpc_call *c = call; + grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c)); + grpc_channel_internal_unref(c->channel); + gpr_mu_destroy(&c->mu); + for (i = 0; i < STATUS_SOURCE_COUNT; i++) { + if (c->status[i].details) { + grpc_mdstr_unref(c->status[i].details); } - prq_destroy(&c->prq); - gpr_mu_destroy(&c->read_mu); - grpc_channel_internal_unref(c->channel); - gpr_free(c); } -} - -void grpc_call_destroy(grpc_call *c) { - int cancel; - gpr_mu_lock(&c->read_mu); - if (c->have_alarm) { - grpc_alarm_cancel(&c->alarm); - c->have_alarm = 0; + for (i = 0; i < c->owned_metadata_count; i++) { + grpc_mdelem_unref(c->owned_metadata[i]); } - cancel = !c->received_finish; - gpr_mu_unlock(&c->read_mu); - if (cancel) grpc_call_cancel(c); - grpc_call_internal_unref(c); -} - -static void maybe_set_status_code(grpc_call *call, gpr_uint32 status) { - if (!call->got_status_code) { - call->status_code = status; - call->got_status_code = 1; + gpr_free(c->owned_metadata); + for (i = 0; i < GPR_ARRAY_SIZE(c->buffered_metadata); i++) { + gpr_free(c->buffered_metadata[i].metadata); } + if (c->legacy_state) { + destroy_legacy_state(c->legacy_state); + } + gpr_free(c); } -static void maybe_set_status_details(grpc_call *call, grpc_mdstr *status) { - if (!call->status_details) { - call->status_details = grpc_mdstr_ref(status); +void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) { + if (gpr_unref(&c->internal_refcount)) { + if (allow_immediate_deletion) { + destroy_call(c, 1); + } else { + grpc_iomgr_add_callback(destroy_call, c); + } } } -grpc_call_error grpc_call_cancel(grpc_call *c) { - grpc_call_element *elem; - grpc_call_op op; - - op.type = GRPC_CANCEL_OP; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.done_cb = do_nothing; - op.user_data = NULL; - - elem = CALL_ELEM_FROM_CALL(c, 0); - elem->filter->call_op(elem, NULL, &op); - - return GRPC_CALL_OK; +static void set_status_code(grpc_call *call, status_source source, + gpr_uint32 status) { + call->status[source].is_set = 1; + call->status[source].code = status; } -grpc_call_error grpc_call_cancel_with_status(grpc_call *c, - grpc_status_code status, - const char *description) { - grpc_mdstr *details = - description ? grpc_mdstr_from_string(c->metadata_context, description) - : NULL; - gpr_mu_lock(&c->read_mu); - maybe_set_status_code(c, status); - if (details) { - maybe_set_status_details(c, details); +static void set_status_details(grpc_call *call, status_source source, + grpc_mdstr *status) { + if (call->status[source].details != NULL) { + grpc_mdstr_unref(call->status[source].details); } - gpr_mu_unlock(&c->read_mu); - return grpc_call_cancel(c); + call->status[source].details = status; } -void grpc_call_execute_op(grpc_call *call, grpc_call_op *op) { - grpc_call_element *elem; - GPR_ASSERT(op->dir == GRPC_CALL_DOWN); - elem = CALL_ELEM_FROM_CALL(call, 0); - elem->filter->call_op(elem, NULL, op); +static grpc_call_error bind_cq(grpc_call *call, grpc_completion_queue *cq) { + if (call->cq) return GRPC_CALL_ERROR_ALREADY_INVOKED; + call->cq = cq; + return GRPC_CALL_OK; } -void grpc_call_add_mdelem(grpc_call *call, grpc_mdelem *mdelem, - gpr_uint32 flags) { - grpc_call_element *elem; +static void request_more_data(grpc_call *call) { grpc_call_op op; - GPR_ASSERT(call->state < CALL_FINISHED); - - op.type = GRPC_SEND_METADATA; + /* call down */ + op.type = GRPC_REQUEST_DATA; op.dir = GRPC_CALL_DOWN; - op.flags = flags; + op.flags = 0; op.done_cb = do_nothing; op.user_data = NULL; - op.data.metadata = mdelem; - elem = CALL_ELEM_FROM_CALL(call, 0); - elem->filter->call_op(elem, NULL, &op); + grpc_call_execute_op(call, &op); } -grpc_call_error grpc_call_add_metadata_old(grpc_call *call, - grpc_metadata *metadata, - gpr_uint32 flags) { - grpc_mdelem *mdelem; - - if (call->is_client) { - if (call->state >= CALL_STARTED) { - return GRPC_CALL_ERROR_ALREADY_INVOKED; - } - } else { - if (call->state >= CALL_FINISHED) { - return GRPC_CALL_ERROR_ALREADY_FINISHED; - } - } - - mdelem = grpc_mdelem_from_string_and_buffer( - call->metadata_context, metadata->key, (gpr_uint8 *)metadata->value, - metadata->value_length); - grpc_call_add_mdelem(call, mdelem, flags); - return GRPC_CALL_OK; -} - -static void finish_call(grpc_call *call) { - size_t count; - grpc_metadata *elements; - count = grpc_metadata_buffer_count(&call->incoming_metadata); - elements = grpc_metadata_buffer_extract_elements(&call->incoming_metadata); - grpc_cq_end_finished( - call->cq, call->finished_tag, call, grpc_metadata_buffer_cleanup_elements, - elements, call->status_code, - call->status_details - ? (char *)grpc_mdstr_as_c_string(call->status_details) - : NULL, - elements, count); -} - -static void done_write(void *user_data, grpc_op_error error) { - grpc_call *call = user_data; - void *tag = call->write_tag; - - GPR_ASSERT(call->have_write); - call->have_write = 0; - call->write_tag = INVALID_TAG; - grpc_cq_end_write_accepted(call->cq, tag, call, NULL, NULL, error); +static int is_op_live(grpc_call *call, grpc_ioreq_op op) { + gpr_uint8 set = call->request_set[op]; + reqinfo_master *master; + if (set >= GRPC_IOREQ_OP_COUNT) return 0; + master = &call->masters[set]; + return (master->complete_mask & (1u << op)) == 0; } -static void done_writes_done(void *user_data, grpc_op_error error) { - grpc_call *call = user_data; - void *tag = call->write_tag; - - GPR_ASSERT(call->have_write); - call->have_write = 0; - call->write_tag = INVALID_TAG; - grpc_cq_end_finish_accepted(call->cq, tag, call, NULL, NULL, error); -} - -static void call_started(void *user_data, grpc_op_error error) { - grpc_call *call = user_data; - grpc_call_element *elem; - grpc_byte_buffer *pending_write = NULL; - gpr_uint32 pending_write_flags = 0; - gpr_uint8 pending_writes_done = 0; - int ok; - grpc_call_op op; - - gpr_mu_lock(&call->read_mu); - GPR_ASSERT(!call->received_start); - call->received_start = 1; - ok = call->start_ok = (error == GRPC_OP_OK); - pending_write = call->pending_write; - pending_write_flags = call->pending_write_flags; - pending_writes_done = call->pending_writes_done; - gpr_mu_unlock(&call->read_mu); - - if (pending_write) { - if (ok) { - op.type = GRPC_SEND_MESSAGE; - op.dir = GRPC_CALL_DOWN; - op.flags = pending_write_flags; - op.done_cb = done_write; - op.user_data = call; - op.data.message = pending_write; +static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); } - elem = CALL_ELEM_FROM_CALL(call, 0); - elem->filter->call_op(elem, NULL, &op); - } else { - done_write(call, error); - } - grpc_byte_buffer_destroy(pending_write); - } - if (pending_writes_done) { - if (ok) { - op.type = GRPC_SEND_FINISH; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.done_cb = done_writes_done; - op.user_data = call; +static void unlock(grpc_call *call) { + send_action sa = SEND_NOTHING; + completed_request completed_requests[GRPC_IOREQ_OP_COUNT]; + int num_completed_requests = call->num_completed_requests; + int need_more_data = call->need_more_data && + !is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA); + int i; - elem = CALL_ELEM_FROM_CALL(call, 0); - elem->filter->call_op(elem, NULL, &op); - } else { - done_writes_done(call, error); - } + if (need_more_data) { + call->need_more_data = 0; } - grpc_call_internal_unref(call); -} - -grpc_call_error grpc_call_invoke_old(grpc_call *call, grpc_completion_queue *cq, - void *metadata_read_tag, - void *finished_tag, gpr_uint32 flags) { - grpc_call_element *elem; - grpc_call_op op; - - /* validate preconditions */ - if (!call->is_client) { - gpr_log(GPR_ERROR, "can only call %s on clients", __FUNCTION__); - return GRPC_CALL_ERROR_NOT_ON_SERVER; + if (num_completed_requests != 0) { + memcpy(completed_requests, call->completed_requests, + sizeof(completed_requests)); + call->num_completed_requests = 0; } - if (call->state >= CALL_STARTED || call->cq) { - gpr_log(GPR_ERROR, "call is already invoked"); - return GRPC_CALL_ERROR_ALREADY_INVOKED; + if (!call->sending) { + sa = choose_send_action(call); + if (sa != SEND_NOTHING) { + call->sending = 1; + grpc_call_internal_ref(call); + } } - if (call->have_write) { - gpr_log(GPR_ERROR, "can only have one pending write operation at a time"); - return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; - } + gpr_mu_unlock(&call->mu); - if (call->have_read) { - gpr_log(GPR_ERROR, "can only have one pending read operation at a time"); - return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; + if (need_more_data) { + request_more_data(call); } - if (flags & GRPC_WRITE_NO_COMPRESS) { - return GRPC_CALL_ERROR_INVALID_FLAGS; + if (sa != SEND_NOTHING) { + enact_send_action(call, sa); } - /* inform the completion queue of an incoming operation */ - grpc_cq_begin_op(cq, call, GRPC_FINISHED); - grpc_cq_begin_op(cq, call, GRPC_CLIENT_METADATA_READ); - - gpr_mu_lock(&call->read_mu); - - /* update state */ - call->cq = cq; - call->state = CALL_STARTED; - call->finished_tag = finished_tag; - - if (call->received_finish) { - /* handle early cancellation */ - grpc_cq_end_client_metadata_read(call->cq, metadata_read_tag, call, NULL, - NULL, 0, NULL); - finish_call(call); - - /* early out.. unlock & return */ - gpr_mu_unlock(&call->read_mu); - return GRPC_CALL_OK; + for (i = 0; i < num_completed_requests; i++) { + completed_requests[i].on_complete(call, completed_requests[i].status, + completed_requests[i].user_data); } - - call->metadata_tag = metadata_read_tag; - - gpr_mu_unlock(&call->read_mu); - - /* call down the filter stack */ - op.type = GRPC_SEND_START; - op.dir = GRPC_CALL_DOWN; - op.flags = flags; - op.done_cb = call_started; - op.data.start.pollset = grpc_cq_pollset(cq); - op.user_data = call; - grpc_call_internal_ref(call); - - elem = CALL_ELEM_FROM_CALL(call, 0); - elem->filter->call_op(elem, NULL, &op); - - return GRPC_CALL_OK; } -grpc_call_error grpc_call_server_accept_old(grpc_call *call, - grpc_completion_queue *cq, - void *finished_tag) { - /* validate preconditions */ - if (call->is_client) { - gpr_log(GPR_ERROR, "can only call %s on servers", __FUNCTION__); - return GRPC_CALL_ERROR_NOT_ON_CLIENT; +static void get_final_status(grpc_call *call, grpc_ioreq_data out) { + int i; + for (i = 0; i < STATUS_SOURCE_COUNT; i++) { + if (call->status[i].is_set) { + out.recv_status.set_value(call->status[i].code, + out.recv_status.user_data); + return; + } } + out.recv_status.set_value(GRPC_STATUS_UNKNOWN, out.recv_status.user_data); +} - if (call->state >= CALL_BOUNDCQ) { - gpr_log(GPR_ERROR, "call is already accepted"); - return GRPC_CALL_ERROR_ALREADY_ACCEPTED; +static void get_final_details(grpc_call *call, grpc_ioreq_data out) { + int i; + for (i = 0; i < STATUS_SOURCE_COUNT; i++) { + if (call->status[i].is_set) { + if (call->status[i].details) { + gpr_slice details = call->status[i].details->slice; + size_t len = GPR_SLICE_LENGTH(details); + if (len + 1 > *out.recv_status_details.details_capacity) { + *out.recv_status_details.details_capacity = GPR_MAX( + len + 1, *out.recv_status_details.details_capacity * 3 / 2); + *out.recv_status_details.details = + gpr_realloc(*out.recv_status_details.details, + *out.recv_status_details.details_capacity); + } + memcpy(*out.recv_status_details.details, GPR_SLICE_START_PTR(details), + len); + (*out.recv_status_details.details)[len] = 0; + } else { + goto no_details; + } + return; + } } - /* inform the completion queue of an incoming operation (corresponding to - finished_tag) */ - grpc_cq_begin_op(cq, call, GRPC_FINISHED); - - /* update state */ - gpr_mu_lock(&call->read_mu); - call->state = CALL_BOUNDCQ; - call->cq = cq; - call->finished_tag = finished_tag; - call->received_start = 1; - if (prq_is_empty(&call->prq) && call->received_finish) { - finish_call(call); - - /* early out.. unlock & return */ - gpr_mu_unlock(&call->read_mu); - return GRPC_CALL_OK; +no_details: + if (0 == *out.recv_status_details.details_capacity) { + *out.recv_status_details.details_capacity = 8; + *out.recv_status_details.details = + gpr_malloc(*out.recv_status_details.details_capacity); } - gpr_mu_unlock(&call->read_mu); - - return GRPC_CALL_OK; + **out.recv_status_details.details = 0; } -grpc_call_error grpc_call_server_end_initial_metadata_old(grpc_call *call, - gpr_uint32 flags) { - grpc_call_element *elem; - grpc_call_op op; - - /* validate preconditions */ - if (call->is_client) { - gpr_log(GPR_ERROR, "can only call %s on servers", __FUNCTION__); - return GRPC_CALL_ERROR_NOT_ON_CLIENT; +static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op, + grpc_op_error status) { + completed_request *cr; + gpr_uint8 master_set = call->request_set[op]; + reqinfo_master *master; + size_t i; + /* ioreq is live: we need to do something */ + master = &call->masters[master_set]; + master->complete_mask |= 1u << op; + if (status != GRPC_OP_OK) { + master->status = status; + master->complete_mask = master->need_mask; } - - if (call->state >= CALL_STARTED) { - gpr_log(GPR_ERROR, "call is already started"); - return GRPC_CALL_ERROR_ALREADY_INVOKED; + if (master->complete_mask == master->need_mask) { + for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { + if (call->request_set[i] != master_set) { + continue; + } + call->request_set[i] = REQSET_DONE; + switch ((grpc_ioreq_op)i) { + case GRPC_IOREQ_RECV_MESSAGE: + case GRPC_IOREQ_SEND_MESSAGE: + if (master->status == GRPC_OP_OK) { + call->request_set[i] = REQSET_EMPTY; + } else { + call->write_state = WRITE_STATE_WRITE_CLOSED; + } + break; + case GRPC_IOREQ_RECV_CLOSE: + case GRPC_IOREQ_SEND_INITIAL_METADATA: + case GRPC_IOREQ_SEND_TRAILING_METADATA: + case GRPC_IOREQ_SEND_STATUS: + case GRPC_IOREQ_SEND_CLOSE: + break; + case GRPC_IOREQ_RECV_STATUS: + get_final_status(call, call->request_data[GRPC_IOREQ_RECV_STATUS]); + break; + case GRPC_IOREQ_RECV_STATUS_DETAILS: + get_final_details(call, + call->request_data[GRPC_IOREQ_RECV_STATUS_DETAILS]); + break; + case GRPC_IOREQ_RECV_INITIAL_METADATA: + SWAP(grpc_metadata_array, call->buffered_metadata[0], + *call->request_data[GRPC_IOREQ_RECV_INITIAL_METADATA] + .recv_metadata); + break; + case GRPC_IOREQ_RECV_TRAILING_METADATA: + SWAP(grpc_metadata_array, call->buffered_metadata[1], + *call->request_data[GRPC_IOREQ_RECV_TRAILING_METADATA] + .recv_metadata); + break; + case GRPC_IOREQ_OP_COUNT: + abort(); + break; + } + } + cr = &call->completed_requests[call->num_completed_requests++]; + cr->status = master->status; + cr->on_complete = master->on_complete; + cr->user_data = master->user_data; } +} - if (flags & GRPC_WRITE_NO_COMPRESS) { - return GRPC_CALL_ERROR_INVALID_FLAGS; +static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, + grpc_op_error status) { + if (is_op_live(call, op)) { + finish_live_ioreq_op(call, op, status); } +} - /* update state */ - call->state = CALL_STARTED; - - /* call down */ - op.type = GRPC_SEND_START; - op.dir = GRPC_CALL_DOWN; - op.flags = flags; - op.done_cb = do_nothing; - op.data.start.pollset = grpc_cq_pollset(call->cq); - op.user_data = NULL; - - elem = CALL_ELEM_FROM_CALL(call, 0); - elem->filter->call_op(elem, NULL, &op); +static void finish_send_op(grpc_call *call, grpc_ioreq_op op, + grpc_op_error error) { + lock(call); + finish_ioreq_op(call, op, error); + call->sending = 0; + unlock(call); + grpc_call_internal_unref(call, 0); +} - return GRPC_CALL_OK; +static void finish_write_step(void *pc, grpc_op_error error) { + finish_send_op(pc, GRPC_IOREQ_SEND_MESSAGE, error); } -void grpc_call_client_initial_metadata_complete( - grpc_call_element *surface_element) { - grpc_call *call = grpc_call_from_top_element(surface_element); - size_t count; - grpc_metadata *elements; +static void finish_finish_step(void *pc, grpc_op_error error) { + finish_send_op(pc, GRPC_IOREQ_SEND_CLOSE, error); +} - gpr_mu_lock(&call->read_mu); - count = grpc_metadata_buffer_count(&call->incoming_metadata); - elements = grpc_metadata_buffer_extract_elements(&call->incoming_metadata); +static void finish_start_step(void *pc, grpc_op_error error) { + finish_send_op(pc, GRPC_IOREQ_SEND_INITIAL_METADATA, error); +} - GPR_ASSERT(!call->received_metadata); - grpc_cq_end_client_metadata_read(call->cq, call->metadata_tag, call, - grpc_metadata_buffer_cleanup_elements, - elements, count, elements); - call->received_metadata = 1; - call->metadata_tag = INVALID_TAG; - gpr_mu_unlock(&call->read_mu); +static send_action choose_send_action(grpc_call *call) { + switch (call->write_state) { + case WRITE_STATE_INITIAL: + if (call->request_set[GRPC_IOREQ_SEND_INITIAL_METADATA] != REQSET_EMPTY) { + call->write_state = WRITE_STATE_STARTED; + return SEND_INITIAL_METADATA; + } + return SEND_NOTHING; + case WRITE_STATE_STARTED: + if (call->request_set[GRPC_IOREQ_SEND_MESSAGE] != REQSET_EMPTY) { + return SEND_MESSAGE; + } + if (call->request_set[GRPC_IOREQ_SEND_CLOSE] != REQSET_EMPTY) { + call->write_state = WRITE_STATE_WRITE_CLOSED; + finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_OK); + finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, GRPC_OP_OK); + return call->is_client ? SEND_FINISH + : SEND_TRAILING_METADATA_AND_FINISH; + } + return SEND_NOTHING; + case WRITE_STATE_WRITE_CLOSED: + return SEND_NOTHING; + } + gpr_log(GPR_ERROR, "should never reach here"); + abort(); + return SEND_NOTHING; } -static void request_more_data(grpc_call *call) { - grpc_call_element *elem; +static void send_metadata(grpc_call *call, grpc_mdelem *elem) { grpc_call_op op; - - /* call down */ - op.type = GRPC_REQUEST_DATA; + op.type = GRPC_SEND_METADATA; op.dir = GRPC_CALL_DOWN; op.flags = 0; + op.data.metadata = elem; op.done_cb = do_nothing; op.user_data = NULL; - - elem = CALL_ELEM_FROM_CALL(call, 0); - elem->filter->call_op(elem, NULL, &op); + grpc_call_execute_op(call, &op); } -grpc_call_error grpc_call_start_read_old(grpc_call *call, void *tag) { - gpr_uint8 request_more = 0; +static void enact_send_action(grpc_call *call, send_action sa) { + grpc_ioreq_data data; + grpc_call_op op; + size_t i; + char status_str[GPR_LTOA_MIN_BUFSIZE]; - switch (call->state) { - case CALL_CREATED: - return GRPC_CALL_ERROR_NOT_INVOKED; - case CALL_BOUNDCQ: - case CALL_STARTED: + switch (sa) { + case SEND_NOTHING: + abort(); + break; + case SEND_INITIAL_METADATA: + data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA]; + for (i = 0; i < data.send_metadata.count; i++) { + const grpc_metadata *md = &data.send_metadata.metadata[i]; + send_metadata(call, + grpc_mdelem_from_string_and_buffer( + call->metadata_context, md->key, + (const gpr_uint8 *)md->value, md->value_length)); + } + op.type = GRPC_SEND_START; + op.dir = GRPC_CALL_DOWN; + op.flags = 0; + op.data.start.pollset = grpc_cq_pollset(call->cq); + op.done_cb = finish_start_step; + op.user_data = call; + grpc_call_execute_op(call, &op); + break; + case SEND_MESSAGE: + data = call->request_data[GRPC_IOREQ_SEND_MESSAGE]; + op.type = GRPC_SEND_MESSAGE; + op.dir = GRPC_CALL_DOWN; + op.flags = 0; + op.data.message = data.send_message; + op.done_cb = finish_write_step; + op.user_data = call; + grpc_call_execute_op(call, &op); + break; + case SEND_TRAILING_METADATA_AND_FINISH: + /* send trailing metadata */ + data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA]; + for (i = 0; i < data.send_metadata.count; i++) { + const grpc_metadata *md = &data.send_metadata.metadata[i]; + send_metadata(call, + grpc_mdelem_from_string_and_buffer( + call->metadata_context, md->key, + (const gpr_uint8 *)md->value, md->value_length)); + } + /* send status */ + /* TODO(ctiller): cache common status values */ + data = call->request_data[GRPC_IOREQ_SEND_STATUS]; + gpr_ltoa(data.send_status.code, status_str); + send_metadata( + call, + grpc_mdelem_from_metadata_strings( + call->metadata_context, + grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)), + grpc_mdstr_from_string(call->metadata_context, status_str))); + if (data.send_status.details) { + send_metadata( + call, + grpc_mdelem_from_metadata_strings( + call->metadata_context, + grpc_mdstr_ref(grpc_channel_get_message_string(call->channel)), + grpc_mdstr_from_string(call->metadata_context, + data.send_status.details))); + } + /* fallthrough: see choose_send_action for details */ + case SEND_FINISH: + op.type = GRPC_SEND_FINISH; + op.dir = GRPC_CALL_DOWN; + op.flags = 0; + op.done_cb = finish_finish_step; + op.user_data = call; + grpc_call_execute_op(call, &op); break; - case CALL_FINISHED: - return GRPC_CALL_ERROR_ALREADY_FINISHED; } +} - gpr_mu_lock(&call->read_mu); - - if (call->have_read) { - gpr_mu_unlock(&call->read_mu); - return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; +static grpc_call_error start_ioreq_error(grpc_call *call, + gpr_uint32 mutated_ops, + grpc_call_error ret) { + size_t i; + for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { + if (mutated_ops & (1u << i)) { + call->request_set[i] = REQSET_EMPTY; + } } + return ret; +} - grpc_cq_begin_op(call->cq, call, GRPC_READ); +static void finish_read_ops(grpc_call *call) { + int empty; - if (!prq_pop_to_cq(&call->prq, tag, call, call->cq)) { - if (call->reads_done) { - grpc_cq_end_read(call->cq, tag, call, do_nothing, NULL, NULL); - } else { - call->read_tag = tag; - call->have_read = 1; - request_more = call->received_start; + if (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE)) { + empty = + (NULL == (*call->request_data[GRPC_IOREQ_RECV_MESSAGE].recv_message = + grpc_bbq_pop(&call->incoming_queue))); + if (!empty) { + finish_live_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK); + empty = grpc_bbq_empty(&call->incoming_queue); } - } else if (prq_is_empty(&call->prq) && call->received_finish) { - finish_call(call); + } else { + empty = grpc_bbq_empty(&call->incoming_queue); } - gpr_mu_unlock(&call->read_mu); - - if (request_more) { - request_more_data(call); + switch (call->read_state) { + case READ_STATE_STREAM_CLOSED: + if (empty) { + finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK); + } + /* fallthrough */ + case READ_STATE_READ_CLOSED: + if (empty) { + finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK); + } + finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK); + finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, GRPC_OP_OK); + finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK); + /* fallthrough */ + case READ_STATE_GOT_INITIAL_METADATA: + finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK); + /* fallthrough */ + case READ_STATE_INITIAL: + /* do nothing */ + break; } - - return GRPC_CALL_OK; } -grpc_call_error grpc_call_start_write_old(grpc_call *call, - grpc_byte_buffer *byte_buffer, - void *tag, gpr_uint32 flags) { - grpc_call_element *elem; - grpc_call_op op; - - switch (call->state) { - case CALL_CREATED: - case CALL_BOUNDCQ: - return GRPC_CALL_ERROR_NOT_INVOKED; - case CALL_STARTED: +static void early_out_write_ops(grpc_call *call) { + switch (call->write_state) { + case WRITE_STATE_WRITE_CLOSED: + finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, GRPC_OP_ERROR); + finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, GRPC_OP_ERROR); + finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_ERROR); + finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, GRPC_OP_OK); + /* fallthrough */ + case WRITE_STATE_STARTED: + finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, GRPC_OP_ERROR); + /* fallthrough */ + case WRITE_STATE_INITIAL: + /* do nothing */ break; - case CALL_FINISHED: - return GRPC_CALL_ERROR_ALREADY_FINISHED; - } - - if (call->have_write) { - return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; } +} - grpc_cq_begin_op(call->cq, call, GRPC_WRITE_ACCEPTED); +static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, + size_t nreqs, + grpc_ioreq_completion_func completion, + void *user_data) { + size_t i; + gpr_uint32 have_ops = 0; + grpc_ioreq_op op; + reqinfo_master *master; + grpc_ioreq_data data; + gpr_uint8 set; - /* TODO(ctiller): if flags & GRPC_WRITE_BUFFER_HINT == 0, this indicates a - flush, and that flush should be propogated down from here */ - if (byte_buffer == NULL) { - grpc_cq_end_write_accepted(call->cq, tag, call, NULL, NULL, GRPC_OP_OK); + if (nreqs == 0) { return GRPC_CALL_OK; } - call->write_tag = tag; - call->have_write = 1; + set = reqs[0].op; - gpr_mu_lock(&call->read_mu); - if (!call->received_start) { - call->pending_write = grpc_byte_buffer_copy(byte_buffer); - call->pending_write_flags = flags; - - gpr_mu_unlock(&call->read_mu); - } else { - gpr_mu_unlock(&call->read_mu); - - op.type = GRPC_SEND_MESSAGE; - op.dir = GRPC_CALL_DOWN; - op.flags = flags; - op.done_cb = done_write; - op.user_data = call; - op.data.message = byte_buffer; + for (i = 0; i < nreqs; i++) { + op = reqs[i].op; + if (call->request_set[op] < GRPC_IOREQ_OP_COUNT) { + return start_ioreq_error(call, have_ops, + GRPC_CALL_ERROR_TOO_MANY_OPERATIONS); + } else if (call->request_set[op] == REQSET_DONE) { + return start_ioreq_error(call, have_ops, GRPC_CALL_ERROR_ALREADY_INVOKED); + } + have_ops |= 1u << op; + data = reqs[i].data; - elem = CALL_ELEM_FROM_CALL(call, 0); - elem->filter->call_op(elem, NULL, &op); + call->request_data[op] = data; + call->request_set[op] = set; } - return GRPC_CALL_OK; -} + master = &call->masters[set]; + master->status = GRPC_OP_OK; + master->need_mask = have_ops; + master->complete_mask = 0; + master->on_complete = completion; + master->user_data = user_data; -grpc_call_error grpc_call_writes_done_old(grpc_call *call, void *tag) { - grpc_call_element *elem; - grpc_call_op op; - - if (!call->is_client) { - return GRPC_CALL_ERROR_NOT_ON_SERVER; + if (have_ops & (1u << GRPC_IOREQ_RECV_MESSAGE)) { + call->need_more_data = 1; } - switch (call->state) { - case CALL_CREATED: - case CALL_BOUNDCQ: - return GRPC_CALL_ERROR_NOT_INVOKED; - case CALL_FINISHED: - return GRPC_CALL_ERROR_ALREADY_FINISHED; - case CALL_STARTED: - break; - } + finish_read_ops(call); + early_out_write_ops(call); - if (call->have_write) { - return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; - } - - grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED); - - call->write_tag = tag; - call->have_write = 1; - - gpr_mu_lock(&call->read_mu); - if (!call->received_start) { - call->pending_writes_done = 1; - - gpr_mu_unlock(&call->read_mu); - } else { - gpr_mu_unlock(&call->read_mu); + return GRPC_CALL_OK; +} - op.type = GRPC_SEND_FINISH; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.done_cb = done_writes_done; - op.user_data = call; +grpc_call_error grpc_call_start_ioreq_and_call_back( + grpc_call *call, const grpc_ioreq *reqs, size_t nreqs, + grpc_ioreq_completion_func on_complete, void *user_data) { + grpc_call_error err; + lock(call); + err = start_ioreq(call, reqs, nreqs, on_complete, user_data); + unlock(call); + return err; +} - elem = CALL_ELEM_FROM_CALL(call, 0); - elem->filter->call_op(elem, NULL, &op); +void grpc_call_destroy(grpc_call *c) { + int cancel; + lock(c); + if (c->have_alarm) { + grpc_alarm_cancel(&c->alarm); + c->have_alarm = 0; } - - return GRPC_CALL_OK; + cancel = c->read_state != READ_STATE_STREAM_CLOSED; + unlock(c); + if (cancel) grpc_call_cancel(c); + grpc_call_internal_unref(c, 1); } -grpc_call_error grpc_call_start_write_status_old(grpc_call *call, - grpc_status_code status, - const char *details, - void *tag) { +grpc_call_error grpc_call_cancel(grpc_call *c) { grpc_call_element *elem; grpc_call_op op; - if (call->is_client) { - return GRPC_CALL_ERROR_NOT_ON_CLIENT; - } + op.type = GRPC_CANCEL_OP; + op.dir = GRPC_CALL_DOWN; + op.flags = 0; + op.done_cb = do_nothing; + op.user_data = NULL; - switch (call->state) { - case CALL_CREATED: - case CALL_BOUNDCQ: - return GRPC_CALL_ERROR_NOT_INVOKED; - case CALL_FINISHED: - return GRPC_CALL_ERROR_ALREADY_FINISHED; - case CALL_STARTED: - break; - } + elem = CALL_ELEM_FROM_CALL(c, 0); + elem->filter->call_op(elem, NULL, &op); - if (call->have_write) { - return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; - } + return GRPC_CALL_OK; +} +grpc_call_error grpc_call_cancel_with_status(grpc_call *c, + grpc_status_code status, + const char *description) { + grpc_mdstr *details = + description ? grpc_mdstr_from_string(c->metadata_context, description) + : NULL; + lock(c); + set_status_code(c, STATUS_FROM_API_OVERRIDE, status); + set_status_details(c, STATUS_FROM_API_OVERRIDE, details); + unlock(c); + return grpc_call_cancel(c); +} + +void grpc_call_execute_op(grpc_call *call, grpc_call_op *op) { + grpc_call_element *elem; + GPR_ASSERT(op->dir == GRPC_CALL_DOWN); elem = CALL_ELEM_FROM_CALL(call, 0); + elem->filter->call_op(elem, NULL, op); +} - if (details && details[0]) { - grpc_mdelem *md = grpc_mdelem_from_strings(call->metadata_context, - "grpc-message", details); +grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { + return CALL_FROM_TOP_ELEM(elem); +} - op.type = GRPC_SEND_METADATA; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.done_cb = do_nothing; - op.user_data = NULL; - op.data.metadata = md; - elem->filter->call_op(elem, NULL, &op); +static void call_alarm(void *arg, int success) { + grpc_call *call = arg; + if (success) { + if (call->is_client) { + grpc_call_cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED, + "Deadline Exceeded"); + } else { + grpc_call_cancel(call); + } } + grpc_call_internal_unref(call, 1); +} - /* always send status */ - { - grpc_mdelem *md; - char buffer[GPR_LTOA_MIN_BUFSIZE]; - gpr_ltoa(status, buffer); - md = - grpc_mdelem_from_strings(call->metadata_context, "grpc-status", buffer); +void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) { + grpc_call *call = CALL_FROM_TOP_ELEM(elem); - op.type = GRPC_SEND_METADATA; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.done_cb = do_nothing; - op.user_data = NULL; - op.data.metadata = md; - elem->filter->call_op(elem, NULL, &op); + if (call->have_alarm) { + gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice"); } + grpc_call_internal_ref(call); + call->have_alarm = 1; + grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now()); +} - grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED); - - call->state = CALL_FINISHED; - call->write_tag = tag; - call->have_write = 1; - - op.type = GRPC_SEND_FINISH; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.done_cb = done_writes_done; - op.user_data = call; +static void set_read_state(grpc_call *call, read_state state) { + lock(call); + GPR_ASSERT(call->read_state < state); + call->read_state = state; + finish_read_ops(call); + unlock(call); +} - elem->filter->call_op(elem, NULL, &op); +void grpc_call_read_closed(grpc_call_element *elem) { + set_read_state(CALL_FROM_TOP_ELEM(elem), READ_STATE_READ_CLOSED); +} - return GRPC_CALL_OK; +void grpc_call_stream_closed(grpc_call_element *elem) { + grpc_call *call = CALL_FROM_TOP_ELEM(elem); + set_read_state(call, READ_STATE_STREAM_CLOSED); + grpc_call_internal_unref(call, 0); } /* we offset status by a small amount when storing it into transport metadata @@ -865,7 +860,7 @@ static gpr_uint32 decode_status(grpc_mdelem *md) { gpr_uint32 status; void *user_data = grpc_mdelem_get_user_data(md, destroy_status); if (user_data) { - status = ((gpr_uint32)(gpr_intptr)user_data) - STATUS_OFFSET; + status = ((gpr_uint32)(gpr_intptr) user_data) - STATUS_OFFSET; } else { if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value), GPR_SLICE_LENGTH(md->value->slice), @@ -878,112 +873,477 @@ static gpr_uint32 decode_status(grpc_mdelem *md) { return status; } -void grpc_call_recv_metadata(grpc_call_element *elem, grpc_call_op *op) { +void grpc_call_recv_message(grpc_call_element *elem, + grpc_byte_buffer *byte_buffer) { + grpc_call *call = CALL_FROM_TOP_ELEM(elem); + lock(call); + grpc_bbq_push(&call->incoming_queue, byte_buffer); + finish_read_ops(call); + unlock(call); +} + +void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) { grpc_call *call = CALL_FROM_TOP_ELEM(elem); - grpc_mdelem *md = op->data.metadata; grpc_mdstr *key = md->key; + grpc_metadata_array *dest; + grpc_metadata *mdusr; + lock(call); if (key == grpc_channel_get_status_string(call->channel)) { - maybe_set_status_code(call, decode_status(md)); + set_status_code(call, STATUS_FROM_WIRE, decode_status(md)); grpc_mdelem_unref(md); - op->done_cb(op->user_data, GRPC_OP_OK); } else if (key == grpc_channel_get_message_string(call->channel)) { - maybe_set_status_details(call, md->value); + set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); grpc_mdelem_unref(md); - op->done_cb(op->user_data, GRPC_OP_OK); } else { - grpc_metadata_buffer_queue(&call->incoming_metadata, op); + dest = &call->buffered_metadata[call->read_state >= + READ_STATE_GOT_INITIAL_METADATA]; + if (dest->count == dest->capacity) { + dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2); + dest->metadata = + gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity); + } + mdusr = &dest->metadata[dest->count++]; + mdusr->key = (char *)grpc_mdstr_as_c_string(md->key); + mdusr->value = (char *)grpc_mdstr_as_c_string(md->value); + mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice); + if (call->owned_metadata_count == call->owned_metadata_capacity) { + call->owned_metadata_capacity = GPR_MAX( + call->owned_metadata_capacity + 8, call->owned_metadata_capacity * 2); + call->owned_metadata = + gpr_realloc(call->owned_metadata, + sizeof(grpc_mdelem *) * call->owned_metadata_capacity); + } + call->owned_metadata[call->owned_metadata_count++] = md; } + unlock(call); } -void grpc_call_recv_finish(grpc_call_element *elem, int is_full_close) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); +grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) { + return CALL_STACK_FROM_CALL(call); +} - gpr_mu_lock(&call->read_mu); +void grpc_call_initial_metadata_complete(grpc_call_element *surface_element) { + grpc_call *call = grpc_call_from_top_element(surface_element); + set_read_state(call, READ_STATE_GOT_INITIAL_METADATA); +} + +/* + * BATCH API IMPLEMENTATION + */ + +static void set_status_value_directly(grpc_status_code status, void *dest) { + *(grpc_status_code *)dest = status; +} - if (call->have_read) { - grpc_cq_end_read(call->cq, call->read_tag, call, do_nothing, NULL, NULL); - call->read_tag = INVALID_TAG; - call->have_read = 0; +static void set_cancelled_value(grpc_status_code status, void *dest) { + *(grpc_status_code *)dest = (status != GRPC_STATUS_OK); +} + +static void finish_batch(grpc_call *call, grpc_op_error result, void *tag) { + grpc_cq_end_op_complete(call->cq, tag, call, do_nothing, NULL, GRPC_OP_OK); +} + +grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, + size_t nops, void *tag) { + grpc_ioreq reqs[GRPC_IOREQ_OP_COUNT]; + size_t in; + size_t out; + const grpc_op *op; + grpc_ioreq *req; + + /* rewrite batch ops into ioreq ops */ + for (in = 0, out = 0; in < nops; in++) { + op = &ops[in]; + switch (op->op) { + case GRPC_OP_SEND_INITIAL_METADATA: + req = &reqs[out++]; + req->op = GRPC_IOREQ_SEND_INITIAL_METADATA; + req->data.send_metadata.count = op->data.send_initial_metadata.count; + req->data.send_metadata.metadata = + op->data.send_initial_metadata.metadata; + break; + case GRPC_OP_SEND_MESSAGE: + req = &reqs[out++]; + req->op = GRPC_IOREQ_SEND_MESSAGE; + req->data.send_message = op->data.send_message; + break; + case GRPC_OP_SEND_CLOSE_FROM_CLIENT: + if (!call->is_client) { + return GRPC_CALL_ERROR_NOT_ON_SERVER; + } + req = &reqs[out++]; + req->op = GRPC_IOREQ_SEND_CLOSE; + break; + case GRPC_OP_SEND_STATUS_FROM_SERVER: + if (call->is_client) { + return GRPC_CALL_ERROR_NOT_ON_CLIENT; + } + req = &reqs[out++]; + req->op = GRPC_IOREQ_SEND_TRAILING_METADATA; + req->data.send_metadata.count = + op->data.send_status_from_server.trailing_metadata_count; + req->data.send_metadata.metadata = + op->data.send_status_from_server.trailing_metadata; + req = &reqs[out++]; + req->op = GRPC_IOREQ_SEND_STATUS; + req->data.send_status.code = op->data.send_status_from_server.status; + req->data.send_status.details = + op->data.send_status_from_server.status_details; + req = &reqs[out++]; + req->op = GRPC_IOREQ_SEND_CLOSE; + break; + case GRPC_OP_RECV_INITIAL_METADATA: + if (!call->is_client) { + return GRPC_CALL_ERROR_NOT_ON_SERVER; + } + req = &reqs[out++]; + req->op = GRPC_IOREQ_RECV_INITIAL_METADATA; + req->data.recv_metadata = op->data.recv_initial_metadata; + break; + case GRPC_OP_RECV_MESSAGE: + req = &reqs[out++]; + req->op = GRPC_IOREQ_RECV_MESSAGE; + req->data.recv_message = op->data.recv_message; + break; + case GRPC_OP_RECV_STATUS_ON_CLIENT: + if (!call->is_client) { + return GRPC_CALL_ERROR_NOT_ON_SERVER; + } + req = &reqs[out++]; + req->op = GRPC_IOREQ_RECV_STATUS; + req->data.recv_status.set_value = set_status_value_directly; + req->data.recv_status.user_data = op->data.recv_status_on_client.status; + req = &reqs[out++]; + req->op = GRPC_IOREQ_RECV_STATUS_DETAILS; + req->data.recv_status_details.details = + op->data.recv_status_on_client.status_details; + req->data.recv_status_details.details_capacity = + op->data.recv_status_on_client.status_details_capacity; + req = &reqs[out++]; + req->op = GRPC_IOREQ_RECV_TRAILING_METADATA; + req->data.recv_metadata = + op->data.recv_status_on_client.trailing_metadata; + req = &reqs[out++]; + req->op = GRPC_IOREQ_RECV_CLOSE; + break; + case GRPC_OP_RECV_CLOSE_ON_SERVER: + req = &reqs[out++]; + req->op = GRPC_IOREQ_RECV_STATUS; + req->data.recv_status.set_value = set_cancelled_value; + req->data.recv_status.user_data = + op->data.recv_close_on_server.cancelled; + req = &reqs[out++]; + req->op = GRPC_IOREQ_RECV_CLOSE; + break; + } } - if (call->is_client && !call->received_metadata && call->cq) { - size_t count; - grpc_metadata *elements; - call->received_metadata = 1; + grpc_cq_begin_op(call->cq, call, GRPC_OP_COMPLETE); + + return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_batch, + tag); +} + +/* + * LEGACY API IMPLEMENTATION + * All this code will disappear as soon as wrappings are updated + */ + +struct legacy_state { + gpr_uint8 md_out_buffer; + size_t md_out_count[2]; + size_t md_out_capacity[2]; + grpc_metadata *md_out[2]; + grpc_byte_buffer *msg_out; + + /* input buffers */ + grpc_metadata_array initial_md_in; + grpc_metadata_array trailing_md_in; - count = grpc_metadata_buffer_count(&call->incoming_metadata); - elements = grpc_metadata_buffer_extract_elements(&call->incoming_metadata); - grpc_cq_end_client_metadata_read(call->cq, call->metadata_tag, call, - grpc_metadata_buffer_cleanup_elements, - elements, count, elements); + size_t details_capacity; + char *details; + grpc_status_code status; + + size_t msg_in_read_idx; + grpc_byte_buffer *msg_in; + + void *finished_tag; +}; + +static legacy_state *get_legacy_state(grpc_call *call) { + if (call->legacy_state == NULL) { + call->legacy_state = gpr_malloc(sizeof(legacy_state)); + memset(call->legacy_state, 0, sizeof(legacy_state)); } - if (is_full_close) { - if (call->have_alarm) { - grpc_alarm_cancel(&call->alarm); - call->have_alarm = 0; - } - call->received_finish = 1; - if (prq_is_empty(&call->prq) && call->cq != NULL) { - finish_call(call); + return call->legacy_state; +} + +static void destroy_legacy_state(legacy_state *ls) { + size_t i, j; + for (i = 0; i < 2; i++) { + for (j = 0; j < ls->md_out_count[i]; j++) { + gpr_free(ls->md_out[i][j].key); + gpr_free(ls->md_out[i][j].value); } - } else { - call->reads_done = 1; + gpr_free(ls->md_out[i]); } - gpr_mu_unlock(&call->read_mu); + gpr_free(ls->initial_md_in.metadata); + gpr_free(ls->trailing_md_in.metadata); + gpr_free(ls); } -void grpc_call_recv_message(grpc_call_element *elem, grpc_byte_buffer *message, - void (*on_finish)(void *user_data, - grpc_op_error error), - void *user_data) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); +grpc_call_error grpc_call_add_metadata_old(grpc_call *call, + grpc_metadata *metadata, + gpr_uint32 flags) { + legacy_state *ls; + grpc_metadata *mdout; + + lock(call); + ls = get_legacy_state(call); + + if (ls->md_out_count[ls->md_out_buffer] == + ls->md_out_capacity[ls->md_out_buffer]) { + ls->md_out_capacity[ls->md_out_buffer] = + GPR_MAX(ls->md_out_capacity[ls->md_out_buffer] * 3 / 2, + ls->md_out_capacity[ls->md_out_buffer] + 8); + ls->md_out[ls->md_out_buffer] = gpr_realloc( + ls->md_out[ls->md_out_buffer], + sizeof(grpc_metadata) * ls->md_out_capacity[ls->md_out_buffer]); + } + mdout = &ls->md_out[ls->md_out_buffer][ls->md_out_count[ls->md_out_buffer]++]; + mdout->key = gpr_strdup(metadata->key); + mdout->value = gpr_malloc(metadata->value_length); + mdout->value_length = metadata->value_length; + memcpy(mdout->value, metadata->value, metadata->value_length); + + unlock(call); + + return GRPC_CALL_OK; +} + +static void finish_status(grpc_call *call, grpc_op_error status, + void *ignored) { + legacy_state *ls; + + lock(call); + ls = get_legacy_state(call); + grpc_cq_end_finished(call->cq, ls->finished_tag, call, do_nothing, NULL, + ls->status, ls->details, ls->trailing_md_in.metadata, + ls->trailing_md_in.count); + unlock(call); +} + +static void finish_recv_metadata(grpc_call *call, grpc_op_error status, + void *tag) { + legacy_state *ls; + + lock(call); + ls = get_legacy_state(call); + if (status == GRPC_OP_OK) { + grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL, + ls->initial_md_in.count, + ls->initial_md_in.metadata); - gpr_mu_lock(&call->read_mu); - if (call->have_read) { - grpc_cq_end_read(call->cq, call->read_tag, call, on_finish, user_data, - message); - call->read_tag = INVALID_TAG; - call->have_read = 0; } else { - prq_push(&call->prq, message, on_finish, user_data); + grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL, 0, + NULL); } - gpr_mu_unlock(&call->read_mu); + unlock(call); } -grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { - return CALL_FROM_TOP_ELEM(elem); +static void finish_send_metadata(grpc_call *call, grpc_op_error status, + void *tag) {} + +grpc_call_error grpc_call_invoke_old(grpc_call *call, grpc_completion_queue *cq, + void *metadata_read_tag, + void *finished_tag, gpr_uint32 flags) { + grpc_ioreq reqs[4]; + legacy_state *ls; + grpc_call_error err; + + grpc_cq_begin_op(cq, call, GRPC_CLIENT_METADATA_READ); + grpc_cq_begin_op(cq, call, GRPC_FINISHED); + + lock(call); + ls = get_legacy_state(call); + err = bind_cq(call, cq); + if (err != GRPC_CALL_OK) goto done; + + ls->finished_tag = finished_tag; + + reqs[0].op = GRPC_IOREQ_SEND_INITIAL_METADATA; + reqs[0].data.send_metadata.count = ls->md_out_count[ls->md_out_buffer]; + reqs[0].data.send_metadata.metadata = ls->md_out[ls->md_out_buffer]; + ls->md_out_buffer++; + err = start_ioreq(call, reqs, 1, finish_send_metadata, NULL); + if (err != GRPC_CALL_OK) goto done; + + reqs[0].op = GRPC_IOREQ_RECV_INITIAL_METADATA; + reqs[0].data.recv_metadata = &ls->initial_md_in; + err = start_ioreq(call, reqs, 1, finish_recv_metadata, metadata_read_tag); + if (err != GRPC_CALL_OK) goto done; + + reqs[0].op = GRPC_IOREQ_RECV_TRAILING_METADATA; + reqs[0].data.recv_metadata = &ls->trailing_md_in; + reqs[1].op = GRPC_IOREQ_RECV_STATUS; + reqs[1].data.recv_status.user_data = &ls->status; + reqs[1].data.recv_status.set_value = set_status_value_directly; + reqs[2].op = GRPC_IOREQ_RECV_STATUS_DETAILS; + reqs[2].data.recv_status_details.details = &ls->details; + reqs[2].data.recv_status_details.details_capacity = &ls->details_capacity; + reqs[3].op = GRPC_IOREQ_RECV_CLOSE; + err = start_ioreq(call, reqs, 4, finish_status, NULL); + if (err != GRPC_CALL_OK) goto done; + +done: + unlock(call); + return err; } -grpc_metadata_buffer *grpc_call_get_metadata_buffer(grpc_call *call) { - return &call->incoming_metadata; +grpc_call_error grpc_call_server_accept_old(grpc_call *call, + grpc_completion_queue *cq, + void *finished_tag) { + grpc_ioreq reqs[2]; + grpc_call_error err; + legacy_state *ls; + + /* inform the completion queue of an incoming operation (corresponding to + finished_tag) */ + grpc_cq_begin_op(cq, call, GRPC_FINISHED); + + lock(call); + ls = get_legacy_state(call); + + err = bind_cq(call, cq); + if (err != GRPC_CALL_OK) return err; + + ls->finished_tag = finished_tag; + + reqs[0].op = GRPC_IOREQ_RECV_STATUS; + reqs[0].data.recv_status.user_data = &ls->status; + reqs[0].data.recv_status.set_value = set_status_value_directly; + reqs[1].op = GRPC_IOREQ_RECV_CLOSE; + err = start_ioreq(call, reqs, 2, finish_status, NULL); + unlock(call); + return err; } -static void call_alarm(void *arg, int success) { - grpc_call *call = arg; - if (success) { - if (call->is_client) { - grpc_call_cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED, - "Deadline Exceeded"); - } else { - grpc_call_cancel(call); - } - } - grpc_call_internal_unref(call); +static void finish_send_initial_metadata(grpc_call *call, grpc_op_error status, + void *tag) {} + +grpc_call_error grpc_call_server_end_initial_metadata_old(grpc_call *call, + gpr_uint32 flags) { + grpc_ioreq req; + grpc_call_error err; + legacy_state *ls; + + lock(call); + ls = get_legacy_state(call); + req.op = GRPC_IOREQ_SEND_INITIAL_METADATA; + req.data.send_metadata.count = ls->md_out_count[ls->md_out_buffer]; + req.data.send_metadata.metadata = ls->md_out[ls->md_out_buffer]; + err = start_ioreq(call, &req, 1, finish_send_initial_metadata, NULL); + unlock(call); + + return err; } -void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); +static void finish_read_event(void *p, grpc_op_error error) { + if (p) grpc_byte_buffer_destroy(p); +} - if (call->have_alarm) { - gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice"); - } - grpc_call_internal_ref(call); - call->have_alarm = 1; - grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now()); +static void finish_read(grpc_call *call, grpc_op_error error, void *tag) { + legacy_state *ls; + grpc_byte_buffer *msg; + lock(call); + ls = get_legacy_state(call); + msg = ls->msg_in; + grpc_cq_end_read(call->cq, tag, call, finish_read_event, msg, msg); + unlock(call); } -grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) { - return CALL_STACK_FROM_CALL(call); +grpc_call_error grpc_call_start_read_old(grpc_call *call, void *tag) { + legacy_state *ls; + grpc_ioreq req; + grpc_call_error err; + + grpc_cq_begin_op(call->cq, call, GRPC_READ); + + lock(call); + ls = get_legacy_state(call); + req.op = GRPC_IOREQ_RECV_MESSAGE; + req.data.recv_message = &ls->msg_in; + err = start_ioreq(call, &req, 1, finish_read, tag); + unlock(call); + return err; +} + +static void finish_write(grpc_call *call, grpc_op_error status, void *tag) { + lock(call); + grpc_byte_buffer_destroy(get_legacy_state(call)->msg_out); + unlock(call); + grpc_cq_end_write_accepted(call->cq, tag, call, do_nothing, NULL, status); } +grpc_call_error grpc_call_start_write_old(grpc_call *call, + grpc_byte_buffer *byte_buffer, + void *tag, gpr_uint32 flags) { + grpc_ioreq req; + legacy_state *ls; + grpc_call_error err; + + grpc_cq_begin_op(call->cq, call, GRPC_WRITE_ACCEPTED); + + lock(call); + ls = get_legacy_state(call); + ls->msg_out = grpc_byte_buffer_copy(byte_buffer); + req.op = GRPC_IOREQ_SEND_MESSAGE; + req.data.send_message = ls->msg_out; + err = start_ioreq(call, &req, 1, finish_write, tag); + unlock(call); + + return err; +} + +static void finish_finish(grpc_call *call, grpc_op_error status, void *tag) { + grpc_cq_end_finish_accepted(call->cq, tag, call, do_nothing, NULL, status); +} + +grpc_call_error grpc_call_writes_done_old(grpc_call *call, void *tag) { + grpc_ioreq req; + grpc_call_error err; + grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED); + + lock(call); + req.op = GRPC_IOREQ_SEND_CLOSE; + err = start_ioreq(call, &req, 1, finish_finish, tag); + unlock(call); + + return err; +} + +grpc_call_error grpc_call_start_write_status_old(grpc_call *call, + grpc_status_code status, + const char *details, + void *tag) { + grpc_ioreq reqs[3]; + grpc_call_error err; + legacy_state *ls; + grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED); + + lock(call); + ls = get_legacy_state(call); + reqs[0].op = GRPC_IOREQ_SEND_TRAILING_METADATA; + reqs[0].data.send_metadata.count = ls->md_out_count[ls->md_out_buffer]; + reqs[0].data.send_metadata.metadata = ls->md_out[ls->md_out_buffer]; + reqs[1].op = GRPC_IOREQ_SEND_STATUS; + reqs[1].data.send_status.code = status; + /* MEMLEAK */ + reqs[1].data.send_status.details = gpr_strdup(details); + reqs[2].op = GRPC_IOREQ_SEND_CLOSE; + err = start_ioreq(call, reqs, 3, finish_finish, tag); + unlock(call); + + return err; +} diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 804b387cb1..05014c631c 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -38,28 +38,77 @@ #include "src/core/channel/metadata_buffer.h" #include <grpc/grpc.h> -grpc_call *grpc_call_create(grpc_channel *channel, +/* Primitive operation types - grpc_op's get rewritten into these */ +typedef enum { + GRPC_IOREQ_RECV_INITIAL_METADATA, + GRPC_IOREQ_RECV_MESSAGE, + GRPC_IOREQ_RECV_TRAILING_METADATA, + GRPC_IOREQ_RECV_STATUS, + GRPC_IOREQ_RECV_STATUS_DETAILS, + GRPC_IOREQ_RECV_CLOSE, + GRPC_IOREQ_SEND_INITIAL_METADATA, + GRPC_IOREQ_SEND_MESSAGE, + GRPC_IOREQ_SEND_TRAILING_METADATA, + GRPC_IOREQ_SEND_STATUS, + GRPC_IOREQ_SEND_CLOSE, + GRPC_IOREQ_OP_COUNT +} grpc_ioreq_op; + +typedef union { + grpc_metadata_array *recv_metadata; + grpc_byte_buffer **recv_message; + struct { + void (*set_value)(grpc_status_code status, void *user_data); + void *user_data; + } recv_status; + struct { + char **details; + size_t *details_capacity; + } recv_status_details; + struct { + size_t count; + const grpc_metadata *metadata; + } send_metadata; + grpc_byte_buffer *send_message; + struct { + grpc_status_code code; + const char *details; + } send_status; +} grpc_ioreq_data; + +typedef struct { + grpc_ioreq_op op; + grpc_ioreq_data data; +} grpc_ioreq; + +typedef void (*grpc_ioreq_completion_func)(grpc_call *call, + grpc_op_error status, + void *user_data); + +grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, const void *server_transport_data); +void grpc_call_set_completion_queue(grpc_call *call, grpc_completion_queue *cq); + void grpc_call_internal_ref(grpc_call *call); -void grpc_call_internal_unref(grpc_call *call); +void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion); /* Helpers for grpc_client, grpc_server filters to publish received data to the completion queue/surface layer */ void grpc_call_recv_metadata(grpc_call_element *surface_element, - grpc_call_op *op); -void grpc_call_recv_message( - grpc_call_element *surface_element, grpc_byte_buffer *message, - void (*on_finish)(void *user_data, grpc_op_error error), void *user_data); -void grpc_call_recv_finish(grpc_call_element *surface_element, - int is_full_close); + grpc_mdelem *md); +void grpc_call_recv_message(grpc_call_element *surface_element, + grpc_byte_buffer *message); +void grpc_call_read_closed(grpc_call_element *surface_element); +void grpc_call_stream_closed(grpc_call_element *surface_element); void grpc_call_execute_op(grpc_call *call, grpc_call_op *op); +grpc_call_error grpc_call_start_ioreq_and_call_back( + grpc_call *call, const grpc_ioreq *reqs, size_t nreqs, + grpc_ioreq_completion_func on_complete, void *user_data); -/* Called when it's known that the initial batch of metadata is complete on the - client side (must not be called on the server) */ -void grpc_call_client_initial_metadata_complete( - grpc_call_element *surface_element); +/* Called when it's known that the initial batch of metadata is complete */ +void grpc_call_initial_metadata_complete(grpc_call_element *surface_element); void grpc_call_set_deadline(grpc_call_element *surface_element, gpr_timespec deadline); @@ -69,10 +118,4 @@ grpc_call_stack *grpc_call_get_call_stack(grpc_call *call); /* Given the top call_element, get the call object. */ grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element); -/* Get the metadata buffer. */ -grpc_metadata_buffer *grpc_call_get_metadata_buffer(grpc_call *call); - -void grpc_call_add_mdelem(grpc_call *call, grpc_mdelem *mdelem, - gpr_uint32 flags); - #endif /* __GRPC_INTERNAL_SURFACE_CALL_H__ */ diff --git a/src/core/surface/call_details.c b/src/core/surface/call_details.c new file mode 100644 index 0000000000..51c05da640 --- /dev/null +++ b/src/core/surface/call_details.c @@ -0,0 +1,13 @@ +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> + +#include <string.h> + +void grpc_call_details_init(grpc_call_details *cd) { + memset(cd, 0, sizeof(*cd)); +} + +void grpc_call_details_destroy(grpc_call_details *cd) { + gpr_free(cd->method); + gpr_free(cd->host); +} diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 93a2c0609d..6d47787b7c 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -51,7 +51,7 @@ struct grpc_channel { grpc_mdstr *authority_string; }; -#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1)) +#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c)+1)) grpc_channel *grpc_channel_create_from_filters( const grpc_channel_filter **filters, size_t num_filters, @@ -74,37 +74,44 @@ grpc_channel *grpc_channel_create_from_filters( static void do_nothing(void *ignored, grpc_op_error error) {} -grpc_call *grpc_channel_create_call_old(grpc_channel *channel, - const char *method, const char *host, - gpr_timespec absolute_deadline) { +grpc_call *grpc_channel_create_call(grpc_channel *channel, + grpc_completion_queue *cq, + const char *method, const char *host, + gpr_timespec absolute_deadline) { grpc_call *call; grpc_mdelem *path_mdelem; grpc_mdelem *authority_mdelem; + grpc_call_op op; if (!channel->is_client) { gpr_log(GPR_ERROR, "Cannot create a call on the server."); return NULL; } - call = grpc_call_create(channel, NULL); + call = grpc_call_create(channel, cq, NULL); /* Add :path and :authority headers. */ /* TODO(klempner): Consider optimizing this by stashing mdelems for common values of method and host. */ - grpc_mdstr_ref(channel->path_string); path_mdelem = grpc_mdelem_from_metadata_strings( - channel->metadata_context, channel->path_string, + channel->metadata_context, grpc_mdstr_ref(channel->path_string), grpc_mdstr_from_string(channel->metadata_context, method)); - grpc_call_add_mdelem(call, path_mdelem, 0); + op.type = GRPC_SEND_METADATA; + op.dir = GRPC_CALL_DOWN; + op.flags = 0; + op.data.metadata = path_mdelem; + op.done_cb = do_nothing; + op.user_data = NULL; + grpc_call_execute_op(call, &op); grpc_mdstr_ref(channel->authority_string); authority_mdelem = grpc_mdelem_from_metadata_strings( channel->metadata_context, channel->authority_string, grpc_mdstr_from_string(channel->metadata_context, host)); - grpc_call_add_mdelem(call, authority_mdelem, 0); + op.data.metadata = authority_mdelem; + grpc_call_execute_op(call, &op); if (0 != gpr_time_cmp(absolute_deadline, gpr_inf_future)) { - grpc_call_op op; op.type = GRPC_SEND_DEADLINE; op.dir = GRPC_CALL_DOWN; op.flags = 0; @@ -117,6 +124,13 @@ grpc_call *grpc_channel_create_call_old(grpc_channel *channel, return call; } +grpc_call *grpc_channel_create_call_old(grpc_channel *channel, + const char *method, const char *host, + gpr_timespec absolute_deadline) { + return grpc_channel_create_call(channel, NULL, method, host, + absolute_deadline); +} + void grpc_channel_internal_ref(grpc_channel *channel) { gpr_ref(&channel->refs); } diff --git a/src/core/surface/client.c b/src/core/surface/client.c index a7c9b902ed..fa63e855cc 100644 --- a/src/core/surface/client.c +++ b/src/core/surface/client.c @@ -56,23 +56,23 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, grpc_call_next_op(elem, op); break; case GRPC_RECV_METADATA: - grpc_call_recv_metadata(elem, op); + grpc_call_recv_metadata(elem, op->data.metadata); break; case GRPC_RECV_DEADLINE: gpr_log(GPR_ERROR, "Deadline received by client (ignored)"); break; case GRPC_RECV_MESSAGE: - grpc_call_recv_message(elem, op->data.message, op->done_cb, - op->user_data); + grpc_call_recv_message(elem, op->data.message); + op->done_cb(op->user_data, GRPC_OP_OK); break; case GRPC_RECV_HALF_CLOSE: - grpc_call_recv_finish(elem, 0); + grpc_call_read_closed(elem); break; case GRPC_RECV_FINISH: - grpc_call_recv_finish(elem, 1); + grpc_call_stream_closed(elem); break; case GRPC_RECV_END_OF_INITIAL_METADATA: - grpc_call_client_initial_metadata_complete(elem); + grpc_call_initial_metadata_complete(elem); break; default: GPR_ASSERT(op->dir == GRPC_CALL_DOWN); diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 2bf31c50a8..8b94aa920a 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -173,18 +173,6 @@ void grpc_cq_end_read(grpc_completion_queue *cc, void *tag, grpc_call *call, gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } -void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag, - grpc_call *call, - grpc_event_finish_func on_finish, - void *user_data, grpc_op_error error) { - event *ev; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); - ev = add_locked(cc, GRPC_INVOKE_ACCEPTED, tag, call, on_finish, user_data); - ev->base.data.invoke_accepted = error; - end_op_locked(cc, GRPC_INVOKE_ACCEPTED); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); -} - void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag, grpc_call *call, grpc_event_finish_func on_finish, @@ -197,6 +185,28 @@ void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag, gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } +void grpc_cq_end_op_complete(grpc_completion_queue *cc, void *tag, + grpc_call *call, grpc_event_finish_func on_finish, + void *user_data, grpc_op_error error) { + event *ev; + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call, on_finish, user_data); + ev->base.data.write_accepted = error; + end_op_locked(cc, GRPC_OP_COMPLETE); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); +} + +void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call, + grpc_event_finish_func on_finish, void *user_data, + grpc_op_error error) { + event *ev; + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call, on_finish, user_data); + ev->base.data.write_accepted = error; + end_op_locked(cc, GRPC_OP_COMPLETE); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); +} + void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag, grpc_call *call, grpc_event_finish_func on_finish, @@ -389,7 +399,7 @@ void grpc_event_finish(grpc_event *base) { event *ev = (event *)base; ev->on_finish(ev->on_finish_user_data, GRPC_OP_OK); if (ev->base.call) { - grpc_call_internal_unref(ev->base.call); + grpc_call_internal_unref(ev->base.call, 1); } gpr_free(ev); } diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h index 85984075f7..205cb76cee 100644 --- a/src/core/surface/completion_queue.h +++ b/src/core/surface/completion_queue.h @@ -78,6 +78,10 @@ void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag, grpc_call *call, grpc_event_finish_func on_finish, void *user_data, grpc_op_error error); +/* Queue a GRPC_OP_COMPLETED operation */ +void grpc_cq_end_op_complete(grpc_completion_queue *cc, void *tag, + grpc_call *call, grpc_event_finish_func on_finish, + void *user_data, grpc_op_error error); /* Queue a GRPC_CLIENT_METADATA_READ operation */ void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag, grpc_call *call, @@ -97,6 +101,10 @@ void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call, gpr_timespec deadline, size_t metadata_count, grpc_metadata *metadata_elements); +void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call, + grpc_event_finish_func on_finish, void *user_data, + grpc_op_error error); + void grpc_cq_end_server_shutdown(grpc_completion_queue *cc, void *tag); /* disable polling for some tests */ diff --git a/src/core/surface/event_string.c b/src/core/surface/event_string.c index 8975d312ee..ab9435351e 100644 --- a/src/core/surface/event_string.c +++ b/src/core/surface/event_string.c @@ -87,10 +87,10 @@ char *grpc_event_string(grpc_event *ev) { gpr_strvec_add(&buf, gpr_strdup(" end-of-stream")); } break; - case GRPC_INVOKE_ACCEPTED: - gpr_strvec_add(&buf, gpr_strdup("INVOKE_ACCEPTED: ")); + case GRPC_OP_COMPLETE: + gpr_strvec_add(&buf, gpr_strdup("OP_COMPLETE: ")); addhdr(&buf, ev); - adderr(&buf, ev->data.invoke_accepted); + adderr(&buf, ev->data.op_complete); break; case GRPC_WRITE_ACCEPTED: gpr_strvec_add(&buf, gpr_strdup("WRITE_ACCEPTED: ")); diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 6098ac78de..2f5eff5584 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -50,26 +50,16 @@ typedef struct { grpc_mdelem *message; } channel_data; -static void do_nothing(void *data, grpc_op_error error) {} - static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, grpc_call_op *op) { channel_data *channeld = elem->channel_data; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); switch (op->type) { - case GRPC_SEND_START: { - grpc_call_op set_status_op; - grpc_mdelem_ref(channeld->message); - memset(&set_status_op, 0, sizeof(grpc_call_op)); - set_status_op.dir = GRPC_CALL_UP; - set_status_op.type = GRPC_RECV_METADATA; - set_status_op.done_cb = do_nothing; - set_status_op.data.metadata = channeld->message; - grpc_call_recv_metadata(elem, &set_status_op); - grpc_call_recv_finish(elem, 1); + case GRPC_SEND_START: + grpc_call_recv_metadata(elem, grpc_mdelem_ref(channeld->message)); + grpc_call_stream_closed(elem); break; - } case GRPC_SEND_METADATA: grpc_mdelem_unref(op->data.metadata); break; diff --git a/src/core/surface/metadata_array.c b/src/core/surface/metadata_array.c new file mode 100644 index 0000000000..257ff1f820 --- /dev/null +++ b/src/core/surface/metadata_array.c @@ -0,0 +1,17 @@ +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> + +#include <string.h> + +void grpc_metadata_array_init(grpc_metadata_array *array) { + memset(array, 0, sizeof(*array)); +} + +void grpc_metadata_array_destroy(grpc_metadata_array *array) { + size_t i; + for (i = 0; i < array->count; i++) { + gpr_free(array->metadata[i].key); + gpr_free(array->metadata[i].value); + } + gpr_free(array->metadata); +} diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 9e2e4d5478..d7e1dcd800 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -44,6 +44,7 @@ #include "src/core/surface/call.h" #include "src/core/surface/channel.h" #include "src/core/surface/completion_queue.h" +#include "src/core/transport/metadata.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/useful.h> @@ -63,11 +64,27 @@ typedef struct channel_data channel_data; struct channel_data { grpc_server *server; grpc_channel *channel; + grpc_mdstr *path_key; + grpc_mdstr *authority_key; /* linked list of all channels on a server */ channel_data *next; channel_data *prev; }; +typedef void (*new_call_cb)(grpc_server *server, grpc_completion_queue *cq, + grpc_call **call, grpc_call_details *details, + grpc_metadata_array *initial_metadata, + call_data *calld, void *user_data); + +typedef struct { + void *user_data; + grpc_completion_queue *cq; + grpc_call **call; + grpc_call_details *details; + grpc_metadata_array *initial_metadata; + new_call_cb cb; +} requested_call; + struct grpc_server { size_t channel_filter_count; const grpc_channel_filter **channel_filters; @@ -76,9 +93,9 @@ struct grpc_server { gpr_mu mu; - void **tags; - size_t ntags; - size_t tag_cap; + requested_call *requested_calls; + size_t requested_call_count; + size_t requested_call_capacity; gpr_uint8 shutdown; gpr_uint8 have_shutdown_tag; @@ -107,11 +124,20 @@ typedef enum { ZOMBIED } call_state; +typedef struct legacy_data { + grpc_metadata_array *initial_metadata; +} legacy_data; + struct call_data { grpc_call *call; call_state state; gpr_timespec deadline; + grpc_mdstr *path; + grpc_mdstr *host; + + legacy_data *legacy; + grpc_call_details *details; gpr_uint8 included[CALL_LIST_COUNT]; call_link links[CALL_LIST_COUNT]; @@ -179,7 +205,7 @@ static void server_unref(grpc_server *server) { grpc_channel_args_destroy(server->channel_args); gpr_mu_destroy(&server->mu); gpr_free(server->channel_filters); - gpr_free(server->tags); + gpr_free(server->requested_calls); gpr_free(server); } } @@ -210,62 +236,38 @@ static void destroy_channel(channel_data *chand) { grpc_iomgr_add_callback(finish_destroy_channel, chand); } -static void queue_new_rpc(grpc_server *server, call_data *calld, void *tag) { - grpc_call *call = calld->call; - grpc_metadata_buffer *mdbuf = grpc_call_get_metadata_buffer(call); - size_t count = grpc_metadata_buffer_count(mdbuf); - grpc_metadata *elements = grpc_metadata_buffer_extract_elements(mdbuf); - const char *host = NULL; - const char *method = NULL; - size_t i; - - for (i = 0; i < count; i++) { - if (0 == strcmp(elements[i].key, ":authority")) { - host = elements[i].value; - } else if (0 == strcmp(elements[i].key, ":path")) { - method = elements[i].value; - } - } - - grpc_call_internal_ref(call); - grpc_cq_end_new_rpc(server->cq, tag, call, - grpc_metadata_buffer_cleanup_elements, elements, method, - host, calld->deadline, count, elements); -} - static void start_new_rpc(grpc_call_element *elem) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; grpc_server *server = chand->server; gpr_mu_lock(&server->mu); - if (server->ntags) { + if (server->requested_call_count > 0) { + requested_call rc = server->requested_calls[--server->requested_call_count]; calld->state = ACTIVATED; - queue_new_rpc(server, calld, server->tags[--server->ntags]); + gpr_mu_unlock(&server->mu); + rc.cb(server, rc.cq, rc.call, rc.details, rc.initial_metadata, calld, + rc.user_data); } else { calld->state = PENDING; call_list_join(server, calld, PENDING_START); + gpr_mu_unlock(&server->mu); } - gpr_mu_unlock(&server->mu); } static void kill_zombie(void *elem, int success) { grpc_call_destroy(grpc_call_from_top_element(elem)); } -static void finish_rpc(grpc_call_element *elem, int is_full_close) { +static void stream_closed(grpc_call_element *elem) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; gpr_mu_lock(&chand->server->mu); switch (calld->state) { case ACTIVATED: - grpc_call_recv_finish(elem, is_full_close); + grpc_call_stream_closed(elem); break; case PENDING: - if (!is_full_close) { - grpc_call_recv_finish(elem, is_full_close); - break; - } call_list_remove(chand->server, calld, PENDING_START); /* fallthrough intended */ case NOT_STARTED: @@ -278,25 +280,57 @@ static void finish_rpc(grpc_call_element *elem, int is_full_close) { gpr_mu_unlock(&chand->server->mu); } +static void read_closed(grpc_call_element *elem) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + gpr_mu_lock(&chand->server->mu); + switch (calld->state) { + case ACTIVATED: + case PENDING: + grpc_call_read_closed(elem); + break; + case NOT_STARTED: + calld->state = ZOMBIED; + grpc_iomgr_add_callback(kill_zombie, elem); + break; + case ZOMBIED: + break; + } + gpr_mu_unlock(&chand->server->mu); +} + static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn, grpc_call_op *op) { + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + grpc_mdelem *md; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); switch (op->type) { case GRPC_RECV_METADATA: - grpc_call_recv_metadata(elem, op); + md = op->data.metadata; + if (md->key == chand->path_key) { + calld->path = grpc_mdstr_ref(md->value); + grpc_mdelem_unref(md); + } else if (md->key == chand->authority_key) { + calld->host = grpc_mdstr_ref(md->value); + grpc_mdelem_unref(md); + } else { + grpc_call_recv_metadata(elem, md); + } break; case GRPC_RECV_END_OF_INITIAL_METADATA: start_new_rpc(elem); + grpc_call_initial_metadata_complete(elem); break; case GRPC_RECV_MESSAGE: - grpc_call_recv_message(elem, op->data.message, op->done_cb, - op->user_data); + grpc_call_recv_message(elem, op->data.message); + op->done_cb(op->user_data, GRPC_OP_OK); break; case GRPC_RECV_HALF_CLOSE: - finish_rpc(elem, 0); + read_closed(elem); break; case GRPC_RECV_FINISH: - finish_rpc(elem, 1); + stream_closed(elem); break; case GRPC_RECV_DEADLINE: grpc_call_set_deadline(elem, op->data.deadline); @@ -316,7 +350,7 @@ static void channel_op(grpc_channel_element *elem, switch (op->type) { case GRPC_ACCEPT_CALL: /* create a call */ - grpc_call_create(chand->channel, + grpc_call_create(chand->channel, NULL, op->data.accept_call.transport_server_data); break; case GRPC_TRANSPORT_CLOSED: @@ -371,6 +405,7 @@ static void init_call_elem(grpc_call_element *elem, static void destroy_call_elem(grpc_call_element *elem) { channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; int i; gpr_mu_lock(&chand->server->mu); @@ -383,6 +418,19 @@ static void destroy_call_elem(grpc_call_element *elem) { } gpr_mu_unlock(&chand->server->mu); + if (calld->host) { + grpc_mdstr_unref(calld->host); + } + if (calld->path) { + grpc_mdstr_unref(calld->path); + } + + if (calld->legacy) { + gpr_free(calld->legacy->initial_metadata->metadata); + gpr_free(calld->legacy->initial_metadata); + gpr_free(calld->legacy); + } + server_unref(chand->server); } @@ -395,6 +443,8 @@ static void init_channel_elem(grpc_channel_element *elem, GPR_ASSERT(!is_last); chand->server = NULL; chand->channel = NULL; + chand->path_key = grpc_mdstr_from_string(metadata_context, ":path"); + chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority"); chand->next = chand->prev = chand; } @@ -406,6 +456,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) { chand->prev->next = chand->next; chand->next = chand->prev = chand; gpr_mu_unlock(&chand->server->mu); + grpc_mdstr_unref(chand->path_key); + grpc_mdstr_unref(chand->authority_key); server_unref(chand->server); } } @@ -413,17 +465,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) { static const grpc_channel_filter server_surface_filter = { call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "server", }; - -static void early_terminate_requested_calls(grpc_completion_queue *cq, - void **tags, size_t ntags) { - size_t i; - - for (i = 0; i < ntags; i++) { - grpc_cq_end_new_rpc(cq, tags[i], NULL, do_nothing, NULL, NULL, NULL, - gpr_inf_past, 0, NULL); - } -} + init_channel_elem, destroy_channel_elem, "server", +}; grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, grpc_channel_filter **filters, @@ -517,8 +560,8 @@ grpc_transport_setup_result grpc_server_setup_transport( void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag, void *shutdown_tag) { listener *l; - void **tags; - size_t ntags; + requested_call *requested_calls; + size_t requested_call_count; channel_data **channels; channel_data *c; size_t nchannels; @@ -547,10 +590,10 @@ void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag, i++; } - tags = server->tags; - ntags = server->ntags; - server->tags = NULL; - server->ntags = 0; + requested_calls = server->requested_calls; + requested_call_count = server->requested_call_count; + server->requested_calls = NULL; + server->requested_call_count = 0; server->shutdown = 1; server->have_shutdown_tag = have_shutdown_tag; @@ -579,8 +622,13 @@ void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag, gpr_free(channels); /* terminate all the requested calls */ - early_terminate_requested_calls(server->cq, tags, ntags); - gpr_free(tags); + for (i = 0; i < requested_call_count; i++) { + requested_calls[i].cb(server, requested_calls[i].cq, + requested_calls[i].call, requested_calls[i].details, + requested_calls[i].initial_metadata, NULL, + requested_calls[i].user_data); + } + gpr_free(requested_calls); /* Shutdown listeners */ for (l = server->listeners; l; l = l->next) { @@ -625,36 +673,158 @@ void grpc_server_add_listener(grpc_server *server, void *arg, server->listeners = l; } -grpc_call_error grpc_server_request_call_old(grpc_server *server, - void *tag_new) { +static grpc_call_error queue_call_request(grpc_server *server, + grpc_completion_queue *cq, + grpc_call **call, + grpc_call_details *details, + grpc_metadata_array *initial_metadata, + new_call_cb cb, void *user_data) { call_data *calld; - - grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_RPC_NEW); - + requested_call *rc; gpr_mu_lock(&server->mu); - if (server->shutdown) { gpr_mu_unlock(&server->mu); - early_terminate_requested_calls(server->cq, &tag_new, 1); + cb(server, cq, call, details, initial_metadata, NULL, user_data); return GRPC_CALL_OK; } - calld = call_list_remove_head(server, PENDING_START); if (calld) { GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; - queue_new_rpc(server, calld, tag_new); + gpr_mu_unlock(&server->mu); + cb(server, cq, call, details, initial_metadata, calld, user_data); + return GRPC_CALL_OK; } else { - if (server->tag_cap == server->ntags) { - server->tag_cap = GPR_MAX(3 * server->tag_cap / 2, server->tag_cap + 1); - server->tags = - gpr_realloc(server->tags, sizeof(void *) * server->tag_cap); + if (server->requested_call_count == server->requested_call_capacity) { + server->requested_call_capacity = + GPR_MAX(server->requested_call_capacity + 8, + server->requested_call_capacity * 2); + server->requested_calls = + gpr_realloc(server->requested_calls, + sizeof(requested_call) * server->requested_call_capacity); } - server->tags[server->ntags++] = tag_new; + rc = &server->requested_calls[server->requested_call_count++]; + rc->cb = cb; + rc->cq = cq; + rc->call = call; + rc->details = details; + rc->user_data = user_data; + rc->initial_metadata = initial_metadata; + gpr_mu_unlock(&server->mu); + return GRPC_CALL_OK; } - gpr_mu_unlock(&server->mu); +} + +static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) { + gpr_slice slice = value->slice; + size_t len = GPR_SLICE_LENGTH(slice); - return GRPC_CALL_OK; + if (len + 1 > *capacity) { + *capacity = GPR_MAX(len + 1, *capacity * 2); + *dest = gpr_realloc(*dest, *capacity); + } + memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1); +} + +static void publish_request(grpc_call *call, grpc_op_error status, void *tag) { + grpc_call_element *elem = + grpc_call_stack_element(grpc_call_get_call_stack(call), 0); + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + grpc_server *server = chand->server; + + if (status == GRPC_OP_OK) { + cpstr(&calld->details->host, &calld->details->host_capacity, calld->host); + cpstr(&calld->details->method, &calld->details->method_capacity, + calld->path); + calld->details->deadline = calld->deadline; + grpc_cq_end_op_complete(server->cq, tag, call, do_nothing, NULL, + GRPC_OP_OK); + } else { + abort(); + } +} + +static void begin_request(grpc_server *server, grpc_completion_queue *cq, + grpc_call **call, grpc_call_details *details, + grpc_metadata_array *initial_metadata, + call_data *calld, void *tag) { + grpc_ioreq req; + if (!calld) { + *call = NULL; + initial_metadata->count = 0; + grpc_cq_end_op_complete(cq, tag, NULL, do_nothing, NULL, GRPC_OP_ERROR); + return; + } + calld->details = details; + grpc_call_set_completion_queue(calld->call, cq); + *call = calld->call; + req.op = GRPC_IOREQ_RECV_INITIAL_METADATA; + req.data.recv_metadata = initial_metadata; + grpc_call_internal_ref(calld->call); + grpc_call_start_ioreq_and_call_back(calld->call, &req, 1, publish_request, + tag); +} + +grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, + grpc_call_details *details, + grpc_metadata_array *initial_metadata, + grpc_completion_queue *cq, void *tag) { + grpc_cq_begin_op(cq, NULL, GRPC_OP_COMPLETE); + return queue_call_request(server, cq, call, details, initial_metadata, + begin_request, tag); +} + +static void publish_legacy_request(grpc_call *call, grpc_op_error status, + void *tag) { + grpc_call_element *elem = + grpc_call_stack_element(grpc_call_get_call_stack(call), 0); + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + grpc_server *server = chand->server; + + if (status == GRPC_OP_OK) { + grpc_cq_end_new_rpc(server->cq, tag, call, do_nothing, NULL, + grpc_mdstr_as_c_string(calld->path), + grpc_mdstr_as_c_string(calld->host), calld->deadline, + calld->legacy->initial_metadata->count, + calld->legacy->initial_metadata->metadata); + } else { + abort(); + } +} + +static void begin_legacy_request(grpc_server *server, grpc_completion_queue *cq, + grpc_call **call, grpc_call_details *details, + grpc_metadata_array *initial_metadata, + call_data *calld, void *tag) { + grpc_ioreq req; + GPR_ASSERT(call == NULL); + GPR_ASSERT(details == NULL); + if (!calld) { + gpr_free(initial_metadata); + grpc_cq_end_new_rpc(cq, tag, NULL, do_nothing, NULL, NULL, NULL, + gpr_inf_past, 0, NULL); + return; + } + req.op = GRPC_IOREQ_RECV_INITIAL_METADATA; + req.data.recv_metadata = initial_metadata; + calld->legacy = gpr_malloc(sizeof(legacy_data)); + memset(calld->legacy, 0, sizeof(legacy_data)); + calld->legacy->initial_metadata = initial_metadata; + grpc_call_internal_ref(calld->call); + grpc_call_start_ioreq_and_call_back(calld->call, &req, 1, + publish_legacy_request, tag); +} + +grpc_call_error grpc_server_request_call_old(grpc_server *server, + void *tag_new) { + grpc_metadata_array *client_metadata = + gpr_malloc(sizeof(grpc_metadata_array)); + memset(client_metadata, 0, sizeof(*client_metadata)); + grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_RPC_NEW); + return queue_call_request(server, server->cq, NULL, NULL, client_metadata, + begin_legacy_request, tag_new); } const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) { diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c index c4e3ca516d..2af18c3035 100644 --- a/src/core/transport/chttp2/stream_encoder.c +++ b/src/core/transport/chttp2/stream_encoder.c @@ -432,7 +432,7 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem, static void deadline_enc(grpc_chttp2_hpack_compressor *c, gpr_timespec deadline, framer_state *st) { - char timeout_str[32]; + char timeout_str[GRPC_CHTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE]; grpc_chttp2_encode_timeout(gpr_time_sub(deadline, gpr_now()), timeout_str); hpack_enc(c, grpc_mdelem_from_metadata_strings( c->mdctx, grpc_mdstr_ref(c->timeout_key_str), diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 48a1005833..f560417617 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -957,7 +957,7 @@ static void send_batch(grpc_transport *gt, grpc_stream *gs, grpc_stream_op *ops, stream_list_join(t, s, WRITABLE); } } else { - grpc_stream_ops_unref_owned_objects(ops, ops_count); + grpc_sopb_append(&t->nuke_later_sopb, ops, ops_count); } if (is_last && s->outgoing_sopb.nops == 0 && s->read_closed) { stream_list_join(t, s, PENDING_CALLBACKS); diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc index 5822e3015f..3f39364bda 100644 --- a/src/cpp/client/channel.cc +++ b/src/cpp/client/channel.cc @@ -102,6 +102,7 @@ Status Channel::StartBlockingRpc(const RpcMethod &method, grpc_call *call = grpc_channel_create_call_old( c_channel_, method.name(), target_.c_str(), context->RawDeadline()); context->set_call(call); + grpc_event *ev; void *finished_tag = reinterpret_cast<char *>(call); void *metadata_read_tag = reinterpret_cast<char *>(call) + 2; |