aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/channel/channel_stack.c16
-rw-r--r--src/core/channel/connected_channel.c8
-rw-r--r--src/core/iomgr/pollset_posix.c4
-rw-r--r--src/core/surface/byte_buffer_queue.c79
-rw-r--r--src/core/surface/byte_buffer_queue.h59
-rw-r--r--src/core/surface/call.c1664
-rw-r--r--src/core/surface/call.h72
-rw-r--r--src/core/surface/channel.c18
-rw-r--r--src/core/surface/client.c12
-rw-r--r--src/core/surface/completion_queue.c25
-rw-r--r--src/core/surface/completion_queue.h4
-rw-r--r--src/core/surface/event_string.c6
-rw-r--r--src/core/surface/lame_client.c16
-rw-r--r--src/core/surface/server.c261
-rw-r--r--src/core/transport/chttp2/stream_encoder.c2
15 files changed, 1374 insertions, 872 deletions
diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c
index e28bbd798d..d9e722c4f1 100644
--- a/src/core/channel/channel_stack.c
+++ b/src/core/channel/channel_stack.c
@@ -210,6 +210,7 @@ void grpc_call_element_recv_metadata(grpc_call_element *cur_elem,
metadata_op.dir = GRPC_CALL_UP;
metadata_op.done_cb = do_nothing;
metadata_op.user_data = NULL;
+ metadata_op.flags = 0;
metadata_op.data.metadata = mdelem;
grpc_call_next_op(cur_elem, &metadata_op);
}
@@ -221,6 +222,7 @@ void grpc_call_element_send_metadata(grpc_call_element *cur_elem,
metadata_op.dir = GRPC_CALL_DOWN;
metadata_op.done_cb = do_nothing;
metadata_op.user_data = NULL;
+ metadata_op.flags = 0;
metadata_op.data.metadata = mdelem;
grpc_call_next_op(cur_elem, &metadata_op);
}
@@ -231,14 +233,16 @@ void grpc_call_element_send_cancel(grpc_call_element *cur_elem) {
cancel_op.dir = GRPC_CALL_DOWN;
cancel_op.done_cb = do_nothing;
cancel_op.user_data = NULL;
+ cancel_op.flags = 0;
grpc_call_next_op(cur_elem, &cancel_op);
}
void grpc_call_element_send_finish(grpc_call_element *cur_elem) {
- grpc_call_op cancel_op;
- cancel_op.type = GRPC_SEND_FINISH;
- cancel_op.dir = GRPC_CALL_DOWN;
- cancel_op.done_cb = do_nothing;
- cancel_op.user_data = NULL;
- grpc_call_next_op(cur_elem, &cancel_op);
+ grpc_call_op finish_op;
+ finish_op.type = GRPC_SEND_FINISH;
+ finish_op.dir = GRPC_CALL_DOWN;
+ finish_op.done_cb = do_nothing;
+ finish_op.user_data = NULL;
+ finish_op.flags = 0;
+ grpc_call_next_op(cur_elem, &finish_op);
}
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index d35cede97b..61a6caf032 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -298,10 +298,6 @@ static void recv_error(channel_data *chand, call_data *calld, int line,
static void do_nothing(void *calldata, grpc_op_error error) {}
-static void done_message(void *user_data, grpc_op_error error) {
- grpc_byte_buffer_destroy(user_data);
-}
-
static void finish_message(channel_data *chand, call_data *calld) {
grpc_call_element *elem = calld->elem;
grpc_call_op call_op;
@@ -309,9 +305,9 @@ static void finish_message(channel_data *chand, call_data *calld) {
call_op.flags = 0;
/* if we got all the bytes for this message, call up the stack */
call_op.type = GRPC_RECV_MESSAGE;
- call_op.done_cb = done_message;
+ call_op.done_cb = do_nothing;
/* TODO(ctiller): this could be a lot faster if coded directly */
- call_op.user_data = call_op.data.message = grpc_byte_buffer_create(
+ call_op.data.message = grpc_byte_buffer_create(
calld->incoming_message.slices, calld->incoming_message.count);
gpr_slice_buffer_reset_and_unref(&calld->incoming_message);
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index 994dbe495d..b1c2c64a18 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -80,9 +80,7 @@ void grpc_pollset_kick(grpc_pollset *p) {
}
}
-void grpc_pollset_force_kick(grpc_pollset *p) {
- grpc_pollset_kick_kick(&p->kick_state);
-}
+void grpc_pollset_force_kick(grpc_pollset *p) { grpc_pollset_kick_kick(&p->kick_state); }
/* global state management */
diff --git a/src/core/surface/byte_buffer_queue.c b/src/core/surface/byte_buffer_queue.c
new file mode 100644
index 0000000000..36c082f484
--- /dev/null
+++ b/src/core/surface/byte_buffer_queue.c
@@ -0,0 +1,79 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/surface/byte_buffer_queue.h"
+#include <grpc/support/alloc.h>
+
+static void bba_destroy(grpc_bbq_array *array) {
+ gpr_free(array->data);
+}
+
+/* Append an operation to an array, expanding as needed */
+static void bba_push(grpc_bbq_array *a, grpc_byte_buffer *buffer) {
+ if (a->count == a->capacity) {
+ a->capacity *= 2;
+ a->data = gpr_realloc(a->data, sizeof(grpc_byte_buffer*) * a->capacity);
+ }
+ a->data[a->count++] = buffer;
+}
+
+void grpc_bbq_destroy(grpc_byte_buffer_queue *q) {
+ bba_destroy(&q->filling);
+ bba_destroy(&q->draining);
+}
+
+int grpc_bbq_empty(grpc_byte_buffer_queue *q) {
+ return (q->drain_pos == q->draining.count && q->filling.count == 0);
+}
+
+void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *buffer) {
+ bba_push(&q->filling, buffer);
+}
+
+grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) {
+ grpc_bbq_array temp_array;
+
+ if (q->drain_pos == q->draining.count) {
+ if (q->filling.count == 0) {
+ return NULL;
+ }
+ q->draining.count = 0;
+ q->drain_pos = 0;
+ /* swap arrays */
+ temp_array = q->filling;
+ q->filling = q->draining;
+ q->draining = temp_array;
+ }
+
+ return q->draining.data[q->drain_pos++];
+}
diff --git a/src/core/surface/byte_buffer_queue.h b/src/core/surface/byte_buffer_queue.h
new file mode 100644
index 0000000000..358a42d5af
--- /dev/null
+++ b/src/core/surface/byte_buffer_queue.h
@@ -0,0 +1,59 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_SURFACE_BYTE_BUFFER_QUEUE_H__
+#define __GRPC_INTERNAL_SURFACE_BYTE_BUFFER_QUEUE_H__
+
+#include <grpc/byte_buffer.h>
+
+/* TODO(ctiller): inline an element or two into this struct to avoid per-call
+ allocations */
+typedef struct {
+ grpc_byte_buffer **data;
+ size_t count;
+ size_t capacity;
+} grpc_bbq_array;
+
+/* should be initialized by zeroing memory */
+typedef struct {
+ size_t drain_pos;
+ grpc_bbq_array filling;
+ grpc_bbq_array draining;
+} grpc_byte_buffer_queue;
+
+void grpc_bbq_destroy(grpc_byte_buffer_queue *q);
+grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q);
+int grpc_bbq_empty(grpc_byte_buffer_queue *q);
+void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *bb);
+
+#endif /* __GRPC_INTERNAL_SURFACE_BYTE_BUFFER_QUEUE_H__ */
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 5a24264cce..da966c874a 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -36,6 +36,7 @@
#include "src/core/channel/metadata_buffer.h"
#include "src/core/iomgr/alarm.h"
#include "src/core/support/string.h"
+#include "src/core/surface/byte_buffer_queue.h"
#include "src/core/surface/channel.h"
#include "src/core/surface/completion_queue.h"
#include <grpc/support/alloc.h>
@@ -45,814 +46,763 @@
#include <stdlib.h>
#include <string.h>
-#define INVALID_TAG ((void *)0xdeadbeef)
+#define OP_IN_MASK(op, mask) (((1 << (op)) & (mask)) != 0)
-/* Pending read queue
+typedef struct legacy_state legacy_state;
+static void destroy_legacy_state(legacy_state *ls);
- This data structure tracks reads that need to be presented to the completion
- queue but are waiting for the application to ask for them. */
+typedef enum { REQ_INITIAL = 0, REQ_READY, REQ_DONE } req_state;
-#define INITIAL_PENDING_READ_COUNT 4
+typedef enum {
+ SEND_NOTHING,
+ SEND_INITIAL_METADATA,
+ SEND_MESSAGE,
+ SEND_TRAILING_METADATA_AND_FINISH,
+ SEND_FINISH
+} send_action;
typedef struct {
- grpc_byte_buffer *byte_buffer;
+ grpc_ioreq_completion_func on_complete;
void *user_data;
- void (*on_finish)(void *user_data, grpc_op_error error);
-} pending_read;
-
-/* TODO(ctiller): inline an element or two into this struct to avoid per-call
- allocations */
-typedef struct {
- pending_read *data;
- size_t count;
- size_t capacity;
-} pending_read_array;
+ grpc_op_error status;
+} completed_request;
-typedef struct {
- size_t drain_pos;
- pending_read_array filling;
- pending_read_array draining;
-} pending_read_queue;
+/* See reqinfo.set below for a description */
+#define REQSET_EMPTY 255
+#define REQSET_DONE 254
-static void pra_init(pending_read_array *array) {
- array->data = gpr_malloc(sizeof(pending_read) * INITIAL_PENDING_READ_COUNT);
- array->count = 0;
- array->capacity = INITIAL_PENDING_READ_COUNT;
-}
+/* The state of an ioreq - we keep one of these on the call for each
+ grpc_ioreq_op type.
-static void pra_destroy(pending_read_array *array,
- size_t finish_starting_from) {
- size_t i;
- for (i = finish_starting_from; i < array->count; i++) {
- array->data[i].on_finish(array->data[i].user_data, GRPC_OP_ERROR);
- }
- gpr_free(array->data);
-}
-
-/* Append an operation to an array, expanding as needed */
-static void pra_push(pending_read_array *a, grpc_byte_buffer *buffer,
- void (*on_finish)(void *user_data, grpc_op_error error),
- void *user_data) {
- if (a->count == a->capacity) {
- a->capacity *= 2;
- a->data = gpr_realloc(a->data, sizeof(pending_read) * a->capacity);
- }
- a->data[a->count].byte_buffer = buffer;
- a->data[a->count].user_data = user_data;
- a->data[a->count].on_finish = on_finish;
- a->count++;
-}
-
-static void prq_init(pending_read_queue *q) {
- q->drain_pos = 0;
- pra_init(&q->filling);
- pra_init(&q->draining);
-}
-
-static void prq_destroy(pending_read_queue *q) {
- pra_destroy(&q->filling, 0);
- pra_destroy(&q->draining, q->drain_pos);
-}
-
-static int prq_is_empty(pending_read_queue *q) {
- return (q->drain_pos == q->draining.count && q->filling.count == 0);
-}
-
-static void prq_push(pending_read_queue *q, grpc_byte_buffer *buffer,
- void (*on_finish)(void *user_data, grpc_op_error error),
- void *user_data) {
- pra_push(&q->filling, buffer, on_finish, user_data);
-}
-
-/* Take the first queue element and move it to the completion queue. Do nothing
- if q is empty */
-static int prq_pop_to_cq(pending_read_queue *q, void *tag, grpc_call *call,
- grpc_completion_queue *cq) {
- pending_read_array temp_array;
- pending_read *pr;
-
- if (q->drain_pos == q->draining.count) {
- if (q->filling.count == 0) {
- return 0;
- }
- q->draining.count = 0;
- q->drain_pos = 0;
- /* swap arrays */
- temp_array = q->filling;
- q->filling = q->draining;
- q->draining = temp_array;
- }
+ These structures are manipulated in sets, where a set is a set of
+ operations begin with the same call to start_ioreq and the various
+ public and private api's that call it. Each set has a master reqinfo
+ in which we set a few additional fields - see reqinfo_master. */
+typedef struct {
+ /* User supplied parameters */
+ grpc_ioreq_data data;
+ /* In which set is this ioreq?
+ This value could be:
+ - an element of grpc_ioreq_op enumeration, in which case
+ it designates the master ioreq in a set of requests
+ - REQSET_EMPTY, in which case this reqinfo type has no application
+ request against it
+ - REQSET_DONE, in which case this reqinfo has been satisfied for
+ all time for this call, and no further use will be made of it */
+ gpr_uint8 set;
+} reqinfo;
- pr = q->draining.data + q->drain_pos;
- q->drain_pos++;
- grpc_cq_end_read(cq, tag, call, pr->on_finish, pr->user_data,
- pr->byte_buffer);
- return 1;
-}
+typedef struct {
+ /* Overall status of the operation: starts OK, may degrade to
+ non-OK */
+ grpc_op_error status;
+ /* Completion function to call at the end of the operation */
+ grpc_ioreq_completion_func on_complete;
+ void *user_data;
+ /* a bit mask of which request ops are needed (1 << opid) */
+ gpr_uint32 need_mask;
+ /* a bit mask of which request ops are now completed */
+ gpr_uint32 complete_mask;
+} reqinfo_master;
+
+/* Status data for a request can come from several sources; this
+ enumerates them all, and acts as a priority sorting for which
+ status to return to the application - earlier entries override
+ later ones */
+typedef enum {
+ /* Status came from the application layer overriding whatever
+ the wire says */
+ STATUS_FROM_API_OVERRIDE = 0,
+ /* Status came from 'the wire' - or somewhere below the surface
+ layer */
+ STATUS_FROM_WIRE,
+ STATUS_SOURCE_COUNT
+} status_source;
-/* grpc_call proper */
+typedef struct {
+ gpr_uint8 is_set;
+ grpc_status_code code;
+ grpc_mdstr *details;
+} received_status;
-/* the state of a call, based upon which functions have been called against
- said call */
+/* How far through the GRPC stream have we read? */
typedef enum {
- CALL_CREATED,
- CALL_BOUNDCQ,
- CALL_STARTED,
- CALL_FINISHED
-} call_state;
+ /* We are still waiting for initial metadata to complete */
+ READ_STATE_INITIAL,
+ /* We have gotten initial metadata, and are reading either
+ messages or trailing metadata */
+ READ_STATE_GOT_INITIAL_METADATA,
+ /* The stream is closed for reading */
+ READ_STATE_READ_CLOSED,
+ /* The stream is closed for reading & writing */
+ READ_STATE_STREAM_CLOSED
+} read_state;
struct grpc_call {
grpc_completion_queue *cq;
grpc_channel *channel;
grpc_mdctx *metadata_context;
+ /* TODO(ctiller): share with cq if possible? */
+ gpr_mu mu;
- call_state state;
gpr_uint8 is_client;
- gpr_uint8 have_write;
- grpc_metadata_buffer incoming_metadata;
-
- /* protects variables in this section */
- gpr_mu read_mu;
- gpr_uint8 received_start;
- gpr_uint8 start_ok;
- gpr_uint8 reads_done;
- gpr_uint8 received_finish;
- gpr_uint8 received_metadata;
- gpr_uint8 have_read;
+ read_state read_state;
gpr_uint8 have_alarm;
- gpr_uint8 pending_writes_done;
- gpr_uint8 got_status_code;
- /* The current outstanding read message tag (only valid if have_read == 1) */
- void *read_tag;
- void *metadata_tag;
- void *finished_tag;
- pending_read_queue prq;
+ gpr_uint8 sending;
+ gpr_uint8 num_completed_requests;
+ gpr_uint8 need_more_data;
+
+ reqinfo requests[GRPC_IOREQ_OP_COUNT];
+ reqinfo_master masters[GRPC_IOREQ_OP_COUNT];
+ completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
+ grpc_byte_buffer_queue incoming_queue;
+ grpc_metadata_array buffered_initial_metadata;
+ grpc_metadata_array buffered_trailing_metadata;
+ grpc_mdelem **owned_metadata;
+ size_t owned_metadata_count;
+ size_t owned_metadata_capacity;
+
+ received_status status[STATUS_SOURCE_COUNT];
grpc_alarm alarm;
- /* The current outstanding send message/context/invoke/end tag (only valid if
- have_write == 1) */
- void *write_tag;
- grpc_byte_buffer *pending_write;
- gpr_uint32 pending_write_flags;
-
- /* The final status of the call */
- grpc_status_code status_code;
- grpc_mdstr *status_details;
-
gpr_refcount internal_refcount;
+
+ legacy_state *legacy_state;
};
-#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
+#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call)+1))
#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1)
#define CALL_ELEM_FROM_CALL(call, idx) \
grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
#define CALL_FROM_TOP_ELEM(top_elem) \
CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
+#define SWAP(type, x, y) \
+ do { \
+ type temp = x; \
+ x = y; \
+ y = temp; \
+ } while (0)
+
static void do_nothing(void *ignored, grpc_op_error also_ignored) {}
+static send_action choose_send_action(grpc_call *call);
+static void enact_send_action(grpc_call *call, send_action sa);
grpc_call *grpc_call_create(grpc_channel *channel,
const void *server_transport_data) {
+ size_t i;
grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
grpc_call *call =
gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
- call->cq = NULL;
+ memset(call, 0, sizeof(grpc_call));
+ gpr_mu_init(&call->mu);
call->channel = channel;
+ call->is_client = server_transport_data == NULL;
+ for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
+ call->requests[i].set = REQSET_EMPTY;
+ }
+ if (call->is_client) {
+ call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].set = REQSET_DONE;
+ call->requests[GRPC_IOREQ_SEND_STATUS].set = REQSET_DONE;
+ }
grpc_channel_internal_ref(channel);
call->metadata_context = grpc_channel_get_metadata_context(channel);
- call->state = CALL_CREATED;
- call->is_client = (server_transport_data == NULL);
- call->write_tag = INVALID_TAG;
- call->read_tag = INVALID_TAG;
- call->metadata_tag = INVALID_TAG;
- call->finished_tag = INVALID_TAG;
- call->have_read = 0;
- call->have_write = 0;
- call->have_alarm = 0;
- call->received_metadata = 0;
- call->got_status_code = 0;
- call->start_ok = 0;
- call->status_code =
- server_transport_data != NULL ? GRPC_STATUS_OK : GRPC_STATUS_UNKNOWN;
- call->status_details = NULL;
- call->received_finish = 0;
- call->reads_done = 0;
- call->received_start = 0;
- call->pending_write = NULL;
- call->pending_writes_done = 0;
- grpc_metadata_buffer_init(&call->incoming_metadata);
- gpr_ref_init(&call->internal_refcount, 1);
+ /* one ref is dropped in response to destroy, the other in
+ stream_closed */
+ gpr_ref_init(&call->internal_refcount, 2);
grpc_call_stack_init(channel_stack, server_transport_data,
CALL_STACK_FROM_CALL(call));
- prq_init(&call->prq);
- gpr_mu_init(&call->read_mu);
return call;
}
void grpc_call_internal_ref(grpc_call *c) { gpr_ref(&c->internal_refcount); }
-void grpc_call_internal_unref(grpc_call *c) {
- if (gpr_unref(&c->internal_refcount)) {
- grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c));
- grpc_metadata_buffer_destroy(&c->incoming_metadata, GRPC_OP_OK);
- if (c->status_details) {
- grpc_mdstr_unref(c->status_details);
+static void destroy_call(void *call, int ignored_success) {
+ size_t i;
+ grpc_call *c = call;
+ grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c));
+ grpc_channel_internal_unref(c->channel);
+ gpr_mu_destroy(&c->mu);
+ for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
+ if (c->status[i].details) {
+ grpc_mdstr_unref(c->status[i].details);
}
- prq_destroy(&c->prq);
- gpr_mu_destroy(&c->read_mu);
- grpc_channel_internal_unref(c->channel);
- gpr_free(c);
}
+ for (i = 0; i < c->owned_metadata_count; i++) {
+ grpc_mdelem_unref(c->owned_metadata[i]);
+ }
+ gpr_free(c->owned_metadata);
+ gpr_free(c->buffered_initial_metadata.metadata);
+ gpr_free(c->buffered_trailing_metadata.metadata);
+ if (c->legacy_state) {
+ destroy_legacy_state(c->legacy_state);
+ }
+ gpr_free(c);
}
-void grpc_call_destroy(grpc_call *c) {
- int cancel;
- gpr_mu_lock(&c->read_mu);
- if (c->have_alarm) {
- grpc_alarm_cancel(&c->alarm);
- c->have_alarm = 0;
+void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) {
+ if (gpr_unref(&c->internal_refcount)) {
+ if (allow_immediate_deletion) {
+ destroy_call(c, 1);
+ } else {
+ grpc_iomgr_add_callback(destroy_call, c);
+ }
}
- cancel = !c->received_finish;
- gpr_mu_unlock(&c->read_mu);
- if (cancel) grpc_call_cancel(c);
- grpc_call_internal_unref(c);
}
-static void maybe_set_status_code(grpc_call *call, gpr_uint32 status) {
- if (!call->got_status_code) {
- call->status_code = status;
- call->got_status_code = 1;
- }
+static void set_status_code(grpc_call *call, status_source source,
+ gpr_uint32 status) {
+ call->status[source].is_set = 1;
+ call->status[source].code = status;
}
-static void maybe_set_status_details(grpc_call *call, grpc_mdstr *status) {
- if (!call->status_details) {
- call->status_details = grpc_mdstr_ref(status);
+static void set_status_details(grpc_call *call, status_source source,
+ grpc_mdstr *status) {
+ if (call->status[source].details != NULL) {
+ grpc_mdstr_unref(call->status[source].details);
}
+ call->status[source].details = status;
}
-grpc_call_error grpc_call_cancel(grpc_call *c) {
- grpc_call_element *elem;
+static grpc_call_error bind_cq(grpc_call *call, grpc_completion_queue *cq) {
+ if (call->cq) return GRPC_CALL_ERROR_ALREADY_INVOKED;
+ call->cq = cq;
+ return GRPC_CALL_OK;
+}
+
+static void request_more_data(grpc_call *call) {
grpc_call_op op;
- op.type = GRPC_CANCEL_OP;
+ /* call down */
+ op.type = GRPC_REQUEST_DATA;
op.dir = GRPC_CALL_DOWN;
op.flags = 0;
op.done_cb = do_nothing;
op.user_data = NULL;
- elem = CALL_ELEM_FROM_CALL(c, 0);
- elem->filter->call_op(elem, NULL, &op);
-
- return GRPC_CALL_OK;
+ grpc_call_execute_op(call, &op);
}
-grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
- grpc_status_code status,
- const char *description) {
- grpc_mdstr *details =
- description ? grpc_mdstr_from_string(c->metadata_context, description)
- : NULL;
- gpr_mu_lock(&c->read_mu);
- maybe_set_status_code(c, status);
- if (details) {
- maybe_set_status_details(c, details);
+static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
+
+static void unlock(grpc_call *call) {
+ send_action sa = SEND_NOTHING;
+ completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
+ int num_completed_requests = call->num_completed_requests;
+ int need_more_data =
+ call->need_more_data &&
+ call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].set == REQSET_DONE;
+ int i;
+
+ if (need_more_data) {
+ call->need_more_data = 0;
}
- gpr_mu_unlock(&c->read_mu);
- return grpc_call_cancel(c);
-}
-void grpc_call_execute_op(grpc_call *call, grpc_call_op *op) {
- grpc_call_element *elem;
- GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
- elem = CALL_ELEM_FROM_CALL(call, 0);
- elem->filter->call_op(elem, NULL, op);
-}
+ if (num_completed_requests != 0) {
+ memcpy(completed_requests, call->completed_requests,
+ sizeof(completed_requests));
+ call->num_completed_requests = 0;
+ }
-void grpc_call_add_mdelem(grpc_call *call, grpc_mdelem *mdelem,
- gpr_uint32 flags) {
- grpc_call_element *elem;
- grpc_call_op op;
+ if (!call->sending) {
+ sa = choose_send_action(call);
+ if (sa != SEND_NOTHING) {
+ call->sending = 1;
+ grpc_call_internal_ref(call);
+ }
+ }
- GPR_ASSERT(call->state < CALL_FINISHED);
+ gpr_mu_unlock(&call->mu);
- op.type = GRPC_SEND_METADATA;
- op.dir = GRPC_CALL_DOWN;
- op.flags = flags;
- op.done_cb = do_nothing;
- op.user_data = NULL;
- op.data.metadata = mdelem;
+ if (need_more_data) {
+ request_more_data(call);
+ }
- elem = CALL_ELEM_FROM_CALL(call, 0);
- elem->filter->call_op(elem, NULL, &op);
+ if (sa != SEND_NOTHING) {
+ enact_send_action(call, sa);
+ }
+
+ for (i = 0; i < num_completed_requests; i++) {
+ completed_requests[i].on_complete(call, completed_requests[i].status,
+ completed_requests[i].user_data);
+ }
}
-grpc_call_error grpc_call_add_metadata_old(grpc_call *call,
- grpc_metadata *metadata,
- gpr_uint32 flags) {
- grpc_mdelem *mdelem;
+static void get_final_status(grpc_call *call, grpc_recv_status_args args) {
+ int i;
+ for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
+ if (call->status[i].is_set) {
+ *args.code = call->status[i].code;
+ if (!args.details) return;
+ if (call->status[i].details) {
+ gpr_slice details = call->status[i].details->slice;
+ size_t len = GPR_SLICE_LENGTH(details);
+ if (len + 1 > *args.details_capacity) {
+ *args.details_capacity =
+ GPR_MAX(len + 1, *args.details_capacity * 3 / 2);
+ *args.details = gpr_realloc(*args.details, *args.details_capacity);
+ }
+ memcpy(*args.details, GPR_SLICE_START_PTR(details), len);
+ (*args.details)[len] = 0;
+ } else {
+ goto no_details;
+ }
+ return;
+ }
+ }
+ *args.code = GRPC_STATUS_UNKNOWN;
+ if (!args.details) return;
- if (call->is_client) {
- if (call->state >= CALL_STARTED) {
- return GRPC_CALL_ERROR_ALREADY_INVOKED;
+no_details:
+ if (0 == *args.details_capacity) {
+ *args.details_capacity = 8;
+ *args.details = gpr_malloc(*args.details_capacity);
+ }
+ **args.details = 0;
+}
+
+static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op,
+ grpc_op_error status) {
+ completed_request *cr;
+ size_t i;
+ if (call->requests[op].set < GRPC_IOREQ_OP_COUNT) {
+ reqinfo_master *master = &call->masters[call->requests[op].set];
+ /* ioreq is live: we need to do something */
+ master->complete_mask |= 1 << op;
+ if (status != GRPC_OP_OK) {
+ master->status = status;
}
- } else {
- if (call->state >= CALL_FINISHED) {
- return GRPC_CALL_ERROR_ALREADY_FINISHED;
+ call->requests[op].set =
+ (op == GRPC_IOREQ_SEND_MESSAGE || op == GRPC_IOREQ_RECV_MESSAGE)
+ ? REQSET_EMPTY
+ : REQSET_DONE;
+ if (master->complete_mask == master->need_mask || status == GRPC_OP_ERROR) {
+ if (OP_IN_MASK(GRPC_IOREQ_RECV_STATUS, master->need_mask)) {
+ get_final_status(
+ call, call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status);
+ }
+ for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
+ if (call->requests[i].set == op) {
+ call->requests[i].set = REQSET_EMPTY;
+ }
+ }
+ cr = &call->completed_requests[call->num_completed_requests++];
+ cr->status = master->status;
+ cr->on_complete = master->on_complete;
+ cr->user_data = master->user_data;
}
}
+}
- mdelem = grpc_mdelem_from_string_and_buffer(
- call->metadata_context, metadata->key, (gpr_uint8 *)metadata->value,
- metadata->value_length);
- grpc_call_add_mdelem(call, mdelem, flags);
- return GRPC_CALL_OK;
+static void finish_send_op(grpc_call *call, grpc_ioreq_op op,
+ grpc_op_error error) {
+ lock(call);
+ finish_ioreq_op(call, op, error);
+ call->sending = 0;
+ unlock(call);
+ grpc_call_internal_unref(call, 0);
}
-static void finish_call(grpc_call *call) {
- size_t count;
- grpc_metadata *elements;
- count = grpc_metadata_buffer_count(&call->incoming_metadata);
- elements = grpc_metadata_buffer_extract_elements(&call->incoming_metadata);
- grpc_cq_end_finished(
- call->cq, call->finished_tag, call, grpc_metadata_buffer_cleanup_elements,
- elements, call->status_code,
- call->status_details
- ? (char *)grpc_mdstr_as_c_string(call->status_details)
- : NULL,
- elements, count);
+static void finish_write_step(void *pc, grpc_op_error error) {
+ finish_send_op(pc, GRPC_IOREQ_SEND_MESSAGE, error);
}
-static void done_write(void *user_data, grpc_op_error error) {
- grpc_call *call = user_data;
- void *tag = call->write_tag;
+static void finish_finish_step(void *pc, grpc_op_error error) {
+ finish_send_op(pc, GRPC_IOREQ_SEND_CLOSE, error);
+}
- GPR_ASSERT(call->have_write);
- call->have_write = 0;
- call->write_tag = INVALID_TAG;
- grpc_cq_end_write_accepted(call->cq, tag, call, NULL, NULL, error);
+static void finish_start_step(void *pc, grpc_op_error error) {
+ finish_send_op(pc, GRPC_IOREQ_SEND_INITIAL_METADATA, error);
}
-static void done_writes_done(void *user_data, grpc_op_error error) {
- grpc_call *call = user_data;
- void *tag = call->write_tag;
+static send_action choose_send_action(grpc_call *call) {
+ switch (call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].set) {
+ case REQSET_EMPTY:
+ return SEND_NOTHING;
+ default:
+ return SEND_INITIAL_METADATA;
+ case REQSET_DONE:
+ break;
+ }
+ switch (call->requests[GRPC_IOREQ_SEND_MESSAGE].set) {
+ case REQSET_EMPTY:
+ return SEND_NOTHING;
+ default:
+ return SEND_MESSAGE;
+ case REQSET_DONE:
+ break;
+ }
+ switch (call->requests[GRPC_IOREQ_SEND_CLOSE].set) {
+ case REQSET_EMPTY:
+ case REQSET_DONE:
+ return SEND_NOTHING;
+ default:
+ if (call->is_client) {
+ return SEND_FINISH;
+ } else if (call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].set !=
+ REQSET_EMPTY &&
+ call->requests[GRPC_IOREQ_SEND_STATUS].set != REQSET_EMPTY) {
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_OK);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, GRPC_OP_OK);
+ return SEND_TRAILING_METADATA_AND_FINISH;
+ } else {
+ return SEND_NOTHING;
+ }
+ }
+}
- GPR_ASSERT(call->have_write);
- call->have_write = 0;
- call->write_tag = INVALID_TAG;
- grpc_cq_end_finish_accepted(call->cq, tag, call, NULL, NULL, error);
+static void send_metadata(grpc_call *call, grpc_mdelem *elem) {
+ grpc_call_op op;
+ op.type = GRPC_SEND_METADATA;
+ op.dir = GRPC_CALL_DOWN;
+ op.flags = 0;
+ op.data.metadata = elem;
+ op.done_cb = do_nothing;
+ op.user_data = NULL;
+ grpc_call_execute_op(call, &op);
}
-static void call_started(void *user_data, grpc_op_error error) {
- grpc_call *call = user_data;
- grpc_call_element *elem;
- grpc_byte_buffer *pending_write = NULL;
- gpr_uint32 pending_write_flags = 0;
- gpr_uint8 pending_writes_done = 0;
- int ok;
+static void enact_send_action(grpc_call *call, send_action sa) {
+ grpc_ioreq_data data;
grpc_call_op op;
+ size_t i;
+ char status_str[GPR_LTOA_MIN_BUFSIZE];
- gpr_mu_lock(&call->read_mu);
- GPR_ASSERT(!call->received_start);
- call->received_start = 1;
- ok = call->start_ok = (error == GRPC_OP_OK);
- pending_write = call->pending_write;
- pending_write_flags = call->pending_write_flags;
- pending_writes_done = call->pending_writes_done;
- gpr_mu_unlock(&call->read_mu);
-
- if (pending_write) {
- if (ok) {
+ switch (sa) {
+ case SEND_NOTHING:
+ abort();
+ break;
+ case SEND_INITIAL_METADATA:
+ data = call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].data;
+ for (i = 0; i < data.send_metadata.count; i++) {
+ const grpc_metadata *md = &data.send_metadata.metadata[i];
+ send_metadata(call,
+ grpc_mdelem_from_string_and_buffer(
+ call->metadata_context, md->key,
+ (const gpr_uint8 *)md->value, md->value_length));
+ }
+ op.type = GRPC_SEND_START;
+ op.dir = GRPC_CALL_DOWN;
+ op.flags = 0;
+ op.data.start.pollset = grpc_cq_pollset(call->cq);
+ op.done_cb = finish_start_step;
+ op.user_data = call;
+ grpc_call_execute_op(call, &op);
+ break;
+ case SEND_MESSAGE:
+ data = call->requests[GRPC_IOREQ_SEND_MESSAGE].data;
op.type = GRPC_SEND_MESSAGE;
op.dir = GRPC_CALL_DOWN;
- op.flags = pending_write_flags;
- op.done_cb = done_write;
+ op.flags = 0;
+ op.data.message = data.send_message;
+ op.done_cb = finish_write_step;
op.user_data = call;
- op.data.message = pending_write;
-
- elem = CALL_ELEM_FROM_CALL(call, 0);
- elem->filter->call_op(elem, NULL, &op);
- } else {
- done_write(call, error);
- }
- grpc_byte_buffer_destroy(pending_write);
- }
- if (pending_writes_done) {
- if (ok) {
+ grpc_call_execute_op(call, &op);
+ break;
+ case SEND_TRAILING_METADATA_AND_FINISH:
+ /* send trailing metadata */
+ data = call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].data;
+ for (i = 0; i < data.send_metadata.count; i++) {
+ const grpc_metadata *md = &data.send_metadata.metadata[i];
+ send_metadata(call,
+ grpc_mdelem_from_string_and_buffer(
+ call->metadata_context, md->key,
+ (const gpr_uint8 *)md->value, md->value_length));
+ }
+ /* send status */
+ /* TODO(ctiller): cache common status values */
+ data = call->requests[GRPC_IOREQ_SEND_STATUS].data;
+ gpr_ltoa(data.send_status.code, status_str);
+ send_metadata(
+ call,
+ grpc_mdelem_from_metadata_strings(
+ call->metadata_context,
+ grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
+ grpc_mdstr_from_string(call->metadata_context, status_str)));
+ if (data.send_status.details) {
+ send_metadata(
+ call,
+ grpc_mdelem_from_metadata_strings(
+ call->metadata_context,
+ grpc_mdstr_ref(grpc_channel_get_message_string(call->channel)),
+ grpc_mdstr_from_string(call->metadata_context,
+ data.send_status.details)));
+ }
+ /* fallthrough: see choose_send_action for details */
+ case SEND_FINISH:
op.type = GRPC_SEND_FINISH;
op.dir = GRPC_CALL_DOWN;
op.flags = 0;
- op.done_cb = done_writes_done;
+ op.done_cb = finish_finish_step;
op.user_data = call;
-
- elem = CALL_ELEM_FROM_CALL(call, 0);
- elem->filter->call_op(elem, NULL, &op);
- } else {
- done_writes_done(call, error);
- }
+ grpc_call_execute_op(call, &op);
+ break;
}
-
- grpc_call_internal_unref(call);
}
-grpc_call_error grpc_call_invoke_old(grpc_call *call, grpc_completion_queue *cq,
- void *metadata_read_tag,
- void *finished_tag, gpr_uint32 flags) {
- grpc_call_element *elem;
- grpc_call_op op;
-
- /* validate preconditions */
- if (!call->is_client) {
- gpr_log(GPR_ERROR, "can only call %s on clients", __FUNCTION__);
- return GRPC_CALL_ERROR_NOT_ON_SERVER;
+static grpc_call_error start_ioreq_error(grpc_call *call,
+ gpr_uint32 mutated_ops,
+ grpc_call_error ret) {
+ size_t i;
+ for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
+ if (mutated_ops & (1 << i)) {
+ call->requests[i].set = REQSET_EMPTY;
+ }
}
+ return ret;
+}
- if (call->state >= CALL_STARTED || call->cq) {
- gpr_log(GPR_ERROR, "call is already invoked");
- return GRPC_CALL_ERROR_ALREADY_INVOKED;
+static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
+ size_t nreqs,
+ grpc_ioreq_completion_func completion,
+ void *user_data) {
+ size_t i;
+ gpr_uint32 have_ops = 0;
+ grpc_ioreq_op op;
+ reqinfo *requests = call->requests;
+ reqinfo_master *master;
+ grpc_ioreq_data data;
+ gpr_uint8 set;
+
+ if (nreqs == 0) {
+ return GRPC_CALL_OK;
}
- if (call->have_write) {
- gpr_log(GPR_ERROR, "can only have one pending write operation at a time");
- return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
- }
+ set = reqs[0].op;
- if (call->have_read) {
- gpr_log(GPR_ERROR, "can only have one pending read operation at a time");
- return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
- }
+ for (i = 0; i < nreqs; i++) {
+ op = reqs[i].op;
+ if (requests[op].set < GRPC_IOREQ_OP_COUNT) {
+ return start_ioreq_error(call, have_ops,
+ GRPC_CALL_ERROR_TOO_MANY_OPERATIONS);
+ } else if (requests[op].set == REQSET_DONE) {
+ return start_ioreq_error(call, have_ops, GRPC_CALL_ERROR_ALREADY_INVOKED);
+ }
+ have_ops |= 1 << op;
+ data = reqs[i].data;
- if (flags & GRPC_WRITE_NO_COMPRESS) {
- return GRPC_CALL_ERROR_INVALID_FLAGS;
+ requests[op].data = data;
+ requests[op].set = set;
}
- /* inform the completion queue of an incoming operation */
- grpc_cq_begin_op(cq, call, GRPC_FINISHED);
- grpc_cq_begin_op(cq, call, GRPC_CLIENT_METADATA_READ);
-
- gpr_mu_lock(&call->read_mu);
-
- /* update state */
- call->cq = cq;
- call->state = CALL_STARTED;
- call->finished_tag = finished_tag;
-
- if (call->received_finish) {
- /* handle early cancellation */
- grpc_cq_end_client_metadata_read(call->cq, metadata_read_tag, call, NULL,
- NULL, 0, NULL);
- finish_call(call);
-
- /* early out.. unlock & return */
- gpr_mu_unlock(&call->read_mu);
- return GRPC_CALL_OK;
+ master = &call->masters[set];
+ master->status = GRPC_OP_OK;
+ master->need_mask = have_ops;
+ master->complete_mask = 0;
+ master->on_complete = completion;
+ master->user_data = user_data;
+
+ for (i = 0; i < nreqs; i++) {
+ op = reqs[i].op;
+ data = reqs[i].data;
+ switch (op) {
+ case GRPC_IOREQ_OP_COUNT:
+ gpr_log(GPR_ERROR, "should never reach here");
+ abort();
+ break;
+ case GRPC_IOREQ_RECV_MESSAGE:
+ *data.recv_message = grpc_bbq_pop(&call->incoming_queue);
+ if (*data.recv_message) {
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
+ if (call->read_state == READ_STATE_STREAM_CLOSED && grpc_bbq_empty(&call->incoming_queue)) {
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK);
+ }
+ } else {
+ /* no message: either end of stream or we need more bytes */
+ if (call->read_state >= READ_STATE_READ_CLOSED) {
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
+ if (call->read_state == READ_STATE_STREAM_CLOSED) {
+ /* stream closed AND we've drained all messages: signal to the application */
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK);
+ }
+ } else {
+ call->need_more_data = 1;
+ }
+ }
+ break;
+ case GRPC_IOREQ_RECV_STATUS:
+ if (call->read_state >= READ_STATE_READ_CLOSED) {
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK);
+ }
+ break;
+ case GRPC_IOREQ_RECV_CLOSE:
+ if (call->read_state == READ_STATE_STREAM_CLOSED) {
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK);
+ }
+ break;
+ case GRPC_IOREQ_SEND_CLOSE:
+ if (requests[GRPC_IOREQ_SEND_MESSAGE].set == REQSET_EMPTY) {
+ requests[GRPC_IOREQ_SEND_MESSAGE].set = REQSET_DONE;
+ }
+ if (call->read_state == READ_STATE_STREAM_CLOSED) {
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, GRPC_OP_ERROR);
+ }
+ break;
+ case GRPC_IOREQ_SEND_MESSAGE:
+ case GRPC_IOREQ_SEND_INITIAL_METADATA:
+ case GRPC_IOREQ_SEND_TRAILING_METADATA:
+ case GRPC_IOREQ_SEND_STATUS:
+ if (call->read_state == READ_STATE_STREAM_CLOSED) {
+ finish_ioreq_op(call, op, GRPC_OP_ERROR);
+ }
+ break;
+ case GRPC_IOREQ_RECV_INITIAL_METADATA:
+ data.recv_metadata->count = 0;
+ if (call->buffered_initial_metadata.count > 0) {
+ SWAP(grpc_metadata_array, *data.recv_metadata,
+ call->buffered_initial_metadata);
+ }
+ if (call->read_state >= READ_STATE_GOT_INITIAL_METADATA) {
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
+ }
+ break;
+ case GRPC_IOREQ_RECV_TRAILING_METADATA:
+ data.recv_metadata->count = 0;
+ if (call->buffered_trailing_metadata.count > 0) {
+ SWAP(grpc_metadata_array, *data.recv_metadata,
+ call->buffered_trailing_metadata);
+ }
+ if (call->read_state >= READ_STATE_READ_CLOSED) {
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK);
+ }
+ break;
+ }
}
- call->metadata_tag = metadata_read_tag;
-
- gpr_mu_unlock(&call->read_mu);
-
- /* call down the filter stack */
- op.type = GRPC_SEND_START;
- op.dir = GRPC_CALL_DOWN;
- op.flags = flags;
- op.done_cb = call_started;
- op.data.start.pollset = grpc_cq_pollset(cq);
- op.user_data = call;
- grpc_call_internal_ref(call);
-
- elem = CALL_ELEM_FROM_CALL(call, 0);
- elem->filter->call_op(elem, NULL, &op);
-
return GRPC_CALL_OK;
}
-grpc_call_error grpc_call_server_accept_old(grpc_call *call,
- grpc_completion_queue *cq,
- void *finished_tag) {
- /* validate preconditions */
- if (call->is_client) {
- gpr_log(GPR_ERROR, "can only call %s on servers", __FUNCTION__);
- return GRPC_CALL_ERROR_NOT_ON_CLIENT;
- }
-
- if (call->state >= CALL_BOUNDCQ) {
- gpr_log(GPR_ERROR, "call is already accepted");
- return GRPC_CALL_ERROR_ALREADY_ACCEPTED;
- }
+static void call_start_ioreq_done(grpc_call *call, grpc_op_error status,
+ void *user_data) {
+ grpc_cq_end_ioreq(call->cq, user_data, call, do_nothing, NULL, status);
+}
- /* inform the completion queue of an incoming operation (corresponding to
- finished_tag) */
- grpc_cq_begin_op(cq, call, GRPC_FINISHED);
+grpc_call_error grpc_call_start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
+ size_t nreqs, void *tag) {
+ grpc_call_error err;
+ lock(call);
+ err = start_ioreq(call, reqs, nreqs, call_start_ioreq_done, tag);
+ unlock(call);
+ return err;
+}
- /* update state */
- gpr_mu_lock(&call->read_mu);
- call->state = CALL_BOUNDCQ;
- call->cq = cq;
- call->finished_tag = finished_tag;
- call->received_start = 1;
- if (prq_is_empty(&call->prq) && call->received_finish) {
- finish_call(call);
+grpc_call_error grpc_call_start_ioreq_and_call_back(
+ grpc_call *call, const grpc_ioreq *reqs, size_t nreqs,
+ grpc_ioreq_completion_func on_complete, void *user_data) {
+ grpc_call_error err;
+ lock(call);
+ err = start_ioreq(call, reqs, nreqs, on_complete, user_data);
+ unlock(call);
+ return err;
+}
- /* early out.. unlock & return */
- gpr_mu_unlock(&call->read_mu);
- return GRPC_CALL_OK;
+void grpc_call_destroy(grpc_call *c) {
+ int cancel;
+ lock(c);
+ if (c->have_alarm) {
+ grpc_alarm_cancel(&c->alarm);
+ c->have_alarm = 0;
}
- gpr_mu_unlock(&call->read_mu);
-
- return GRPC_CALL_OK;
+ cancel = c->read_state != READ_STATE_STREAM_CLOSED;
+ unlock(c);
+ if (cancel) grpc_call_cancel(c);
+ grpc_call_internal_unref(c, 1);
}
-grpc_call_error grpc_call_server_end_initial_metadata_old(grpc_call *call,
- gpr_uint32 flags) {
+grpc_call_error grpc_call_cancel(grpc_call *c) {
grpc_call_element *elem;
grpc_call_op op;
- /* validate preconditions */
- if (call->is_client) {
- gpr_log(GPR_ERROR, "can only call %s on servers", __FUNCTION__);
- return GRPC_CALL_ERROR_NOT_ON_CLIENT;
- }
-
- if (call->state >= CALL_STARTED) {
- gpr_log(GPR_ERROR, "call is already started");
- return GRPC_CALL_ERROR_ALREADY_INVOKED;
- }
-
- if (flags & GRPC_WRITE_NO_COMPRESS) {
- return GRPC_CALL_ERROR_INVALID_FLAGS;
- }
-
- /* update state */
- call->state = CALL_STARTED;
-
- /* call down */
- op.type = GRPC_SEND_START;
+ op.type = GRPC_CANCEL_OP;
op.dir = GRPC_CALL_DOWN;
- op.flags = flags;
+ op.flags = 0;
op.done_cb = do_nothing;
- op.data.start.pollset = grpc_cq_pollset(call->cq);
op.user_data = NULL;
- elem = CALL_ELEM_FROM_CALL(call, 0);
+ elem = CALL_ELEM_FROM_CALL(c, 0);
elem->filter->call_op(elem, NULL, &op);
return GRPC_CALL_OK;
}
-void grpc_call_client_initial_metadata_complete(
- grpc_call_element *surface_element) {
- grpc_call *call = grpc_call_from_top_element(surface_element);
- size_t count;
- grpc_metadata *elements;
-
- gpr_mu_lock(&call->read_mu);
- count = grpc_metadata_buffer_count(&call->incoming_metadata);
- elements = grpc_metadata_buffer_extract_elements(&call->incoming_metadata);
-
- GPR_ASSERT(!call->received_metadata);
- grpc_cq_end_client_metadata_read(call->cq, call->metadata_tag, call,
- grpc_metadata_buffer_cleanup_elements,
- elements, count, elements);
- call->received_metadata = 1;
- call->metadata_tag = INVALID_TAG;
- gpr_mu_unlock(&call->read_mu);
+grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
+ grpc_status_code status,
+ const char *description) {
+ grpc_mdstr *details =
+ description ? grpc_mdstr_from_string(c->metadata_context, description)
+ : NULL;
+ lock(c);
+ set_status_code(c, STATUS_FROM_API_OVERRIDE, status);
+ set_status_details(c, STATUS_FROM_API_OVERRIDE, details);
+ unlock(c);
+ return grpc_call_cancel(c);
}
-static void request_more_data(grpc_call *call) {
+void grpc_call_execute_op(grpc_call *call, grpc_call_op *op) {
grpc_call_element *elem;
- grpc_call_op op;
-
- /* call down */
- op.type = GRPC_REQUEST_DATA;
- op.dir = GRPC_CALL_DOWN;
- op.flags = 0;
- op.done_cb = do_nothing;
- op.user_data = NULL;
-
+ GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
elem = CALL_ELEM_FROM_CALL(call, 0);
- elem->filter->call_op(elem, NULL, &op);
+ elem->filter->call_op(elem, NULL, op);
}
-grpc_call_error grpc_call_start_read_old(grpc_call *call, void *tag) {
- gpr_uint8 request_more = 0;
-
- switch (call->state) {
- case CALL_CREATED:
- return GRPC_CALL_ERROR_NOT_INVOKED;
- case CALL_BOUNDCQ:
- case CALL_STARTED:
- break;
- case CALL_FINISHED:
- return GRPC_CALL_ERROR_ALREADY_FINISHED;
- }
-
- gpr_mu_lock(&call->read_mu);
-
- if (call->have_read) {
- gpr_mu_unlock(&call->read_mu);
- return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
- }
-
- grpc_cq_begin_op(call->cq, call, GRPC_READ);
+grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
+ return CALL_FROM_TOP_ELEM(elem);
+}
- if (!prq_pop_to_cq(&call->prq, tag, call, call->cq)) {
- if (call->reads_done) {
- grpc_cq_end_read(call->cq, tag, call, do_nothing, NULL, NULL);
+static void call_alarm(void *arg, int success) {
+ grpc_call *call = arg;
+ if (success) {
+ if (call->is_client) {
+ grpc_call_cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED,
+ "Deadline Exceeded");
} else {
- call->read_tag = tag;
- call->have_read = 1;
- request_more = call->received_start;
+ grpc_call_cancel(call);
}
- } else if (prq_is_empty(&call->prq) && call->received_finish) {
- finish_call(call);
- }
-
- gpr_mu_unlock(&call->read_mu);
-
- if (request_more) {
- request_more_data(call);
}
-
- return GRPC_CALL_OK;
+ grpc_call_internal_unref(call, 1);
}
-grpc_call_error grpc_call_start_write_old(grpc_call *call,
- grpc_byte_buffer *byte_buffer,
- void *tag, gpr_uint32 flags) {
- grpc_call_element *elem;
- grpc_call_op op;
-
- switch (call->state) {
- case CALL_CREATED:
- case CALL_BOUNDCQ:
- return GRPC_CALL_ERROR_NOT_INVOKED;
- case CALL_STARTED:
- break;
- case CALL_FINISHED:
- return GRPC_CALL_ERROR_ALREADY_FINISHED;
- }
-
- if (call->have_write) {
- return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
- }
-
- grpc_cq_begin_op(call->cq, call, GRPC_WRITE_ACCEPTED);
-
- /* TODO(ctiller): if flags & GRPC_WRITE_BUFFER_HINT == 0, this indicates a
- flush, and that flush should be propogated down from here */
- if (byte_buffer == NULL) {
- grpc_cq_end_write_accepted(call->cq, tag, call, NULL, NULL, GRPC_OP_OK);
- return GRPC_CALL_OK;
- }
-
- call->write_tag = tag;
- call->have_write = 1;
-
- gpr_mu_lock(&call->read_mu);
- if (!call->received_start) {
- call->pending_write = grpc_byte_buffer_copy(byte_buffer);
- call->pending_write_flags = flags;
-
- gpr_mu_unlock(&call->read_mu);
- } else {
- gpr_mu_unlock(&call->read_mu);
-
- op.type = GRPC_SEND_MESSAGE;
- op.dir = GRPC_CALL_DOWN;
- op.flags = flags;
- op.done_cb = done_write;
- op.user_data = call;
- op.data.message = byte_buffer;
+void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) {
+ grpc_call *call = CALL_FROM_TOP_ELEM(elem);
- elem = CALL_ELEM_FROM_CALL(call, 0);
- elem->filter->call_op(elem, NULL, &op);
+ if (call->have_alarm) {
+ gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice");
}
-
- return GRPC_CALL_OK;
+ grpc_call_internal_ref(call);
+ call->have_alarm = 1;
+ grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now());
}
-grpc_call_error grpc_call_writes_done_old(grpc_call *call, void *tag) {
- grpc_call_element *elem;
- grpc_call_op op;
-
- if (!call->is_client) {
- return GRPC_CALL_ERROR_NOT_ON_SERVER;
- }
-
- switch (call->state) {
- case CALL_CREATED:
- case CALL_BOUNDCQ:
- return GRPC_CALL_ERROR_NOT_INVOKED;
- case CALL_FINISHED:
- return GRPC_CALL_ERROR_ALREADY_FINISHED;
- case CALL_STARTED:
- break;
- }
-
- if (call->have_write) {
- return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
- }
-
- grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED);
-
- call->write_tag = tag;
- call->have_write = 1;
-
- gpr_mu_lock(&call->read_mu);
- if (!call->received_start) {
- call->pending_writes_done = 1;
-
- gpr_mu_unlock(&call->read_mu);
- } else {
- gpr_mu_unlock(&call->read_mu);
-
- op.type = GRPC_SEND_FINISH;
- op.dir = GRPC_CALL_DOWN;
- op.flags = 0;
- op.done_cb = done_writes_done;
- op.user_data = call;
-
- elem = CALL_ELEM_FROM_CALL(call, 0);
- elem->filter->call_op(elem, NULL, &op);
- }
-
- return GRPC_CALL_OK;
+static void mark_read_closed(grpc_call *call) {
+ call->read_state = READ_STATE_READ_CLOSED;
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK);
}
-grpc_call_error grpc_call_start_write_status_old(grpc_call *call,
- grpc_status_code status,
- const char *details,
- void *tag) {
- grpc_call_element *elem;
- grpc_call_op op;
-
- if (call->is_client) {
- return GRPC_CALL_ERROR_NOT_ON_CLIENT;
- }
-
- switch (call->state) {
- case CALL_CREATED:
- case CALL_BOUNDCQ:
- return GRPC_CALL_ERROR_NOT_INVOKED;
- case CALL_FINISHED:
- return GRPC_CALL_ERROR_ALREADY_FINISHED;
- case CALL_STARTED:
- break;
- }
-
- if (call->have_write) {
- return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
- }
-
- elem = CALL_ELEM_FROM_CALL(call, 0);
-
- if (details && details[0]) {
- grpc_mdelem *md = grpc_mdelem_from_strings(call->metadata_context,
- "grpc-message", details);
+void grpc_call_read_closed(grpc_call_element *elem) {
+ grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+ lock(call);
+ GPR_ASSERT(call->read_state < READ_STATE_READ_CLOSED);
+ mark_read_closed(call);
+ unlock(call);
+}
- op.type = GRPC_SEND_METADATA;
- op.dir = GRPC_CALL_DOWN;
- op.flags = 0;
- op.done_cb = do_nothing;
- op.user_data = NULL;
- op.data.metadata = md;
- elem->filter->call_op(elem, NULL, &op);
+void grpc_call_stream_closed(grpc_call_element *elem) {
+ grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+ lock(call);
+ GPR_ASSERT(call->read_state < READ_STATE_STREAM_CLOSED);
+ if (call->read_state < READ_STATE_READ_CLOSED) {
+ mark_read_closed(call);
}
-
- /* always send status */
- {
- grpc_mdelem *md;
- char buffer[GPR_LTOA_MIN_BUFSIZE];
- gpr_ltoa(status, buffer);
- md =
- grpc_mdelem_from_strings(call->metadata_context, "grpc-status", buffer);
-
- op.type = GRPC_SEND_METADATA;
- op.dir = GRPC_CALL_DOWN;
- op.flags = 0;
- op.done_cb = do_nothing;
- op.user_data = NULL;
- op.data.metadata = md;
- elem->filter->call_op(elem, NULL, &op);
+ call->read_state = READ_STATE_STREAM_CLOSED;
+ if (grpc_bbq_empty(&call->incoming_queue)) {
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK);
}
-
- grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED);
-
- call->state = CALL_FINISHED;
- call->write_tag = tag;
- call->have_write = 1;
-
- op.type = GRPC_SEND_FINISH;
- op.dir = GRPC_CALL_DOWN;
- op.flags = 0;
- op.done_cb = done_writes_done;
- op.user_data = call;
-
- elem->filter->call_op(elem, NULL, &op);
-
- return GRPC_CALL_OK;
+ unlock(call);
+ grpc_call_internal_unref(call, 0);
}
/* we offset status by a small amount when storing it into transport metadata
@@ -865,7 +815,7 @@ static gpr_uint32 decode_status(grpc_mdelem *md) {
gpr_uint32 status;
void *user_data = grpc_mdelem_get_user_data(md, destroy_status);
if (user_data) {
- status = ((gpr_uint32)(gpr_intptr)user_data) - STATUS_OFFSET;
+ status = ((gpr_uint32)(gpr_intptr) user_data) - STATUS_OFFSET;
} else {
if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
GPR_SLICE_LENGTH(md->value->slice),
@@ -878,112 +828,380 @@ static gpr_uint32 decode_status(grpc_mdelem *md) {
return status;
}
-void grpc_call_recv_metadata(grpc_call_element *elem, grpc_call_op *op) {
+void grpc_call_recv_message(grpc_call_element *elem,
+ grpc_byte_buffer *byte_buffer) {
+ grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+ lock(call);
+ if (call->requests[GRPC_IOREQ_RECV_MESSAGE].set < GRPC_IOREQ_OP_COUNT) {
+ /* there's an outstanding read */
+ *call->requests[GRPC_IOREQ_RECV_MESSAGE].data.recv_message = byte_buffer;
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
+ } else {
+ grpc_bbq_push(&call->incoming_queue, byte_buffer);
+ }
+ unlock(call);
+}
+
+void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) {
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
- grpc_mdelem *md = op->data.metadata;
grpc_mdstr *key = md->key;
+ grpc_metadata_array *dest;
+ grpc_metadata *mdusr;
+ lock(call);
if (key == grpc_channel_get_status_string(call->channel)) {
- maybe_set_status_code(call, decode_status(md));
+ set_status_code(call, STATUS_FROM_WIRE, decode_status(md));
grpc_mdelem_unref(md);
- op->done_cb(op->user_data, GRPC_OP_OK);
} else if (key == grpc_channel_get_message_string(call->channel)) {
- maybe_set_status_details(call, md->value);
+ set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
grpc_mdelem_unref(md);
- op->done_cb(op->user_data, GRPC_OP_OK);
} else {
- grpc_metadata_buffer_queue(&call->incoming_metadata, op);
+ if (call->read_state < READ_STATE_GOT_INITIAL_METADATA) {
+ dest = call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA].set <
+ GRPC_IOREQ_OP_COUNT
+ ? call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA]
+ .data.recv_metadata
+ : &call->buffered_initial_metadata;
+ } else {
+ dest = call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA].set <
+ GRPC_IOREQ_OP_COUNT
+ ? call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA]
+ .data.recv_metadata
+ : &call->buffered_trailing_metadata;
+ }
+ if (dest->count == dest->capacity) {
+ dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2);
+ dest->metadata =
+ gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity);
+ }
+ mdusr = &dest->metadata[dest->count++];
+ mdusr->key = (char *)grpc_mdstr_as_c_string(md->key);
+ mdusr->value = (char *)grpc_mdstr_as_c_string(md->value);
+ mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice);
+ if (call->owned_metadata_count == call->owned_metadata_capacity) {
+ call->owned_metadata_capacity = GPR_MAX(
+ call->owned_metadata_capacity + 8, call->owned_metadata_capacity * 2);
+ call->owned_metadata =
+ gpr_realloc(call->owned_metadata,
+ sizeof(grpc_mdelem *) * call->owned_metadata_capacity);
+ }
+ call->owned_metadata[call->owned_metadata_count++] = md;
}
+ unlock(call);
}
-void grpc_call_recv_finish(grpc_call_element *elem, int is_full_close) {
- grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
+ return CALL_STACK_FROM_CALL(call);
+}
- gpr_mu_lock(&call->read_mu);
+/*
+ * LEGACY API IMPLEMENTATION
+ * All this code will disappear as soon as wrappings are updated
+ */
- if (call->have_read) {
- grpc_cq_end_read(call->cq, call->read_tag, call, do_nothing, NULL, NULL);
- call->read_tag = INVALID_TAG;
- call->have_read = 0;
- }
- if (call->is_client && !call->received_metadata && call->cq) {
- size_t count;
- grpc_metadata *elements;
+struct legacy_state {
+ gpr_uint8 md_out_buffer;
+ size_t md_out_count[2];
+ size_t md_out_capacity[2];
+ grpc_metadata *md_out[2];
+ grpc_byte_buffer *msg_out;
+
+ /* input buffers */
+ grpc_metadata_array initial_md_in;
+ grpc_metadata_array trailing_md_in;
- call->received_metadata = 1;
+ size_t details_capacity;
+ char *details;
+ grpc_status_code status;
- count = grpc_metadata_buffer_count(&call->incoming_metadata);
- elements = grpc_metadata_buffer_extract_elements(&call->incoming_metadata);
- grpc_cq_end_client_metadata_read(call->cq, call->metadata_tag, call,
- grpc_metadata_buffer_cleanup_elements,
- elements, count, elements);
+ size_t msg_in_read_idx;
+ grpc_byte_buffer *msg_in;
+
+ void *finished_tag;
+};
+
+static legacy_state *get_legacy_state(grpc_call *call) {
+ if (call->legacy_state == NULL) {
+ call->legacy_state = gpr_malloc(sizeof(legacy_state));
+ memset(call->legacy_state, 0, sizeof(legacy_state));
}
- if (is_full_close) {
- if (call->have_alarm) {
- grpc_alarm_cancel(&call->alarm);
- call->have_alarm = 0;
- }
- call->received_finish = 1;
- if (prq_is_empty(&call->prq) && call->cq != NULL) {
- finish_call(call);
+ return call->legacy_state;
+}
+
+static void destroy_legacy_state(legacy_state *ls) {
+ size_t i, j;
+ for (i = 0; i < 2; i++) {
+ for (j = 0; j < ls->md_out_count[i]; j++) {
+ gpr_free(ls->md_out[i][j].key);
+ gpr_free(ls->md_out[i][j].value);
}
- } else {
- call->reads_done = 1;
+ gpr_free(ls->md_out[i]);
}
- gpr_mu_unlock(&call->read_mu);
+ gpr_free(ls->initial_md_in.metadata);
+ gpr_free(ls->trailing_md_in.metadata);
+ gpr_free(ls);
}
-void grpc_call_recv_message(grpc_call_element *elem, grpc_byte_buffer *message,
- void (*on_finish)(void *user_data,
- grpc_op_error error),
- void *user_data) {
- grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+grpc_call_error grpc_call_add_metadata_old(grpc_call *call,
+ grpc_metadata *metadata,
+ gpr_uint32 flags) {
+ legacy_state *ls;
+ grpc_metadata *mdout;
+
+ lock(call);
+ ls = get_legacy_state(call);
+
+ if (ls->md_out_count[ls->md_out_buffer] ==
+ ls->md_out_capacity[ls->md_out_buffer]) {
+ ls->md_out_capacity[ls->md_out_buffer] =
+ GPR_MAX(ls->md_out_capacity[ls->md_out_buffer] * 3 / 2,
+ ls->md_out_capacity[ls->md_out_buffer] + 8);
+ ls->md_out[ls->md_out_buffer] = gpr_realloc(
+ ls->md_out[ls->md_out_buffer],
+ sizeof(grpc_metadata) * ls->md_out_capacity[ls->md_out_buffer]);
+ }
+ mdout = &ls->md_out[ls->md_out_buffer][ls->md_out_count[ls->md_out_buffer]++];
+ mdout->key = gpr_strdup(metadata->key);
+ mdout->value = gpr_malloc(metadata->value_length);
+ mdout->value_length = metadata->value_length;
+ memcpy(mdout->value, metadata->value, metadata->value_length);
+
+ unlock(call);
+
+ return GRPC_CALL_OK;
+}
+
+static void finish_status(grpc_call *call, grpc_op_error status,
+ void *ignored) {
+ legacy_state *ls;
+
+ lock(call);
+ ls = get_legacy_state(call);
+ grpc_cq_end_finished(call->cq, ls->finished_tag, call, do_nothing, NULL,
+ ls->status, ls->details, ls->trailing_md_in.metadata,
+ ls->trailing_md_in.count);
+ unlock(call);
+}
+
+static void finish_recv_metadata(grpc_call *call, grpc_op_error status,
+ void *tag) {
+ legacy_state *ls;
+
+ lock(call);
+ ls = get_legacy_state(call);
+ if (status == GRPC_OP_OK) {
+ grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL,
+ ls->initial_md_in.count,
+ ls->initial_md_in.metadata);
- gpr_mu_lock(&call->read_mu);
- if (call->have_read) {
- grpc_cq_end_read(call->cq, call->read_tag, call, on_finish, user_data,
- message);
- call->read_tag = INVALID_TAG;
- call->have_read = 0;
} else {
- prq_push(&call->prq, message, on_finish, user_data);
+ grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL, 0,
+ NULL);
}
- gpr_mu_unlock(&call->read_mu);
+ unlock(call);
}
-grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
- return CALL_FROM_TOP_ELEM(elem);
+static void finish_send_metadata(grpc_call *call, grpc_op_error status,
+ void *tag) {}
+
+grpc_call_error grpc_call_invoke_old(grpc_call *call, grpc_completion_queue *cq,
+ void *metadata_read_tag,
+ void *finished_tag, gpr_uint32 flags) {
+ grpc_ioreq reqs[3];
+ legacy_state *ls;
+ grpc_call_error err;
+
+ grpc_cq_begin_op(cq, call, GRPC_CLIENT_METADATA_READ);
+ grpc_cq_begin_op(cq, call, GRPC_FINISHED);
+
+ lock(call);
+ ls = get_legacy_state(call);
+ err = bind_cq(call, cq);
+ if (err != GRPC_CALL_OK) goto done;
+
+ ls->finished_tag = finished_tag;
+
+ reqs[0].op = GRPC_IOREQ_SEND_INITIAL_METADATA;
+ reqs[0].data.send_metadata.count = ls->md_out_count[ls->md_out_buffer];
+ reqs[0].data.send_metadata.metadata = ls->md_out[ls->md_out_buffer];
+ ls->md_out_buffer++;
+ err = start_ioreq(call, reqs, 1, finish_send_metadata, NULL);
+ if (err != GRPC_CALL_OK) goto done;
+
+ reqs[0].op = GRPC_IOREQ_RECV_INITIAL_METADATA;
+ reqs[0].data.recv_metadata = &ls->initial_md_in;
+ err = start_ioreq(call, reqs, 1, finish_recv_metadata, metadata_read_tag);
+ if (err != GRPC_CALL_OK) goto done;
+
+ reqs[0].op = GRPC_IOREQ_RECV_TRAILING_METADATA;
+ reqs[0].data.recv_metadata = &ls->trailing_md_in;
+ reqs[1].op = GRPC_IOREQ_RECV_STATUS;
+ reqs[1].data.recv_status.details = &ls->details;
+ reqs[1].data.recv_status.details_capacity = &ls->details_capacity;
+ reqs[1].data.recv_status.code = &ls->status;
+ reqs[2].op = GRPC_IOREQ_RECV_CLOSE;
+ err = start_ioreq(call, reqs, 3, finish_status, NULL);
+ if (err != GRPC_CALL_OK) goto done;
+
+done:
+ unlock(call);
+ return err;
+}
+
+grpc_call_error grpc_call_server_accept_old(grpc_call *call,
+ grpc_completion_queue *cq,
+ void *finished_tag) {
+ grpc_ioreq reqs[2];
+ grpc_call_error err;
+ legacy_state *ls;
+
+ /* inform the completion queue of an incoming operation (corresponding to
+ finished_tag) */
+ grpc_cq_begin_op(cq, call, GRPC_FINISHED);
+
+ lock(call);
+ ls = get_legacy_state(call);
+
+ err = bind_cq(call, cq);
+ if (err != GRPC_CALL_OK) return err;
+
+ ls->finished_tag = finished_tag;
+
+ reqs[0].op = GRPC_IOREQ_RECV_STATUS;
+ reqs[0].data.recv_status.details = NULL;
+ reqs[0].data.recv_status.details_capacity = 0;
+ reqs[0].data.recv_status.code = &ls->status;
+ reqs[1].op = GRPC_IOREQ_RECV_CLOSE;
+ err = start_ioreq(call, reqs, 2, finish_status, NULL);
+ unlock(call);
+ return err;
}
-grpc_metadata_buffer *grpc_call_get_metadata_buffer(grpc_call *call) {
- return &call->incoming_metadata;
+static void finish_send_initial_metadata(grpc_call *call, grpc_op_error status,
+ void *tag) {}
+
+grpc_call_error grpc_call_server_end_initial_metadata_old(grpc_call *call,
+ gpr_uint32 flags) {
+ grpc_ioreq req;
+ grpc_call_error err;
+ legacy_state *ls;
+
+ lock(call);
+ ls = get_legacy_state(call);
+ req.op = GRPC_IOREQ_SEND_INITIAL_METADATA;
+ req.data.send_metadata.count = ls->md_out_count[ls->md_out_buffer];
+ req.data.send_metadata.metadata = ls->md_out[ls->md_out_buffer];
+ err = start_ioreq(call, &req, 1, finish_send_initial_metadata, NULL);
+ unlock(call);
+
+ return err;
}
-static void call_alarm(void *arg, int success) {
- grpc_call *call = arg;
- if (success) {
- if (call->is_client) {
- grpc_call_cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED,
- "Deadline Exceeded");
- } else {
- grpc_call_cancel(call);
- }
+void grpc_call_initial_metadata_complete(grpc_call_element *surface_element) {
+ grpc_call *call = grpc_call_from_top_element(surface_element);
+ lock(call);
+ if (call->read_state < READ_STATE_GOT_INITIAL_METADATA) {
+ call->read_state = READ_STATE_GOT_INITIAL_METADATA;
}
- grpc_call_internal_unref(call);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
+ unlock(call);
}
-void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) {
- grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+static void finish_read_event(void *p, grpc_op_error error) {
+ if (p) grpc_byte_buffer_destroy(p);
+}
- if (call->have_alarm) {
- gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice");
- }
- grpc_call_internal_ref(call);
- call->have_alarm = 1;
- grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now());
+static void finish_read(grpc_call *call, grpc_op_error error, void *tag) {
+ legacy_state *ls;
+ grpc_byte_buffer *msg;
+ lock(call);
+ ls = get_legacy_state(call);
+ msg = ls->msg_in;
+ grpc_cq_end_read(call->cq, tag, call, finish_read_event, msg, msg);
+ unlock(call);
}
-grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
- return CALL_STACK_FROM_CALL(call);
+grpc_call_error grpc_call_start_read_old(grpc_call *call, void *tag) {
+ legacy_state *ls;
+ grpc_ioreq req;
+ grpc_call_error err;
+
+ grpc_cq_begin_op(call->cq, call, GRPC_READ);
+
+ lock(call);
+ ls = get_legacy_state(call);
+ req.op = GRPC_IOREQ_RECV_MESSAGE;
+ req.data.recv_message = &ls->msg_in;
+ err = start_ioreq(call, &req, 1, finish_read, tag);
+ unlock(call);
+ return err;
+}
+
+static void finish_write(grpc_call *call, grpc_op_error status, void *tag) {
+ lock(call);
+ grpc_byte_buffer_destroy(get_legacy_state(call)->msg_out);
+ unlock(call);
+ grpc_cq_end_write_accepted(call->cq, tag, call, do_nothing, NULL, status);
+}
+
+grpc_call_error grpc_call_start_write_old(grpc_call *call,
+ grpc_byte_buffer *byte_buffer,
+ void *tag, gpr_uint32 flags) {
+ grpc_ioreq req;
+ legacy_state *ls;
+ grpc_call_error err;
+
+ grpc_cq_begin_op(call->cq, call, GRPC_WRITE_ACCEPTED);
+
+ lock(call);
+ ls = get_legacy_state(call);
+ ls->msg_out = grpc_byte_buffer_copy(byte_buffer);
+ req.op = GRPC_IOREQ_SEND_MESSAGE;
+ req.data.send_message = ls->msg_out;
+ err = start_ioreq(call, &req, 1, finish_write, tag);
+ unlock(call);
+
+ return err;
+}
+
+static void finish_finish(grpc_call *call, grpc_op_error status, void *tag) {
+ grpc_cq_end_finish_accepted(call->cq, tag, call, do_nothing, NULL, status);
+}
+
+grpc_call_error grpc_call_writes_done_old(grpc_call *call, void *tag) {
+ grpc_ioreq req;
+ grpc_call_error err;
+ grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED);
+
+ lock(call);
+ req.op = GRPC_IOREQ_SEND_CLOSE;
+ err = start_ioreq(call, &req, 1, finish_finish, tag);
+ unlock(call);
+
+ return err;
}
+grpc_call_error grpc_call_start_write_status_old(grpc_call *call,
+ grpc_status_code status,
+ const char *details,
+ void *tag) {
+ grpc_ioreq reqs[3];
+ grpc_call_error err;
+ legacy_state *ls;
+ grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED);
+
+ lock(call);
+ ls = get_legacy_state(call);
+ reqs[0].op = GRPC_IOREQ_SEND_TRAILING_METADATA;
+ reqs[0].data.send_metadata.count = ls->md_out_count[ls->md_out_buffer];
+ reqs[0].data.send_metadata.metadata = ls->md_out[ls->md_out_buffer];
+ reqs[1].op = GRPC_IOREQ_SEND_STATUS;
+ reqs[1].data.send_status.code = status;
+ /* MEMLEAK */
+ reqs[1].data.send_status.details = gpr_strdup(details);
+ reqs[2].op = GRPC_IOREQ_SEND_CLOSE;
+ err = start_ioreq(call, reqs, 3, finish_finish, tag);
+ unlock(call);
+
+ return err;
+}
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index 804b387cb1..936fb29f2e 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -38,27 +38,73 @@
#include "src/core/channel/metadata_buffer.h"
#include <grpc/grpc.h>
+/* Primitive operation types - grpc_op's get rewritten into these */
+typedef enum {
+ GRPC_IOREQ_RECV_INITIAL_METADATA,
+ GRPC_IOREQ_RECV_MESSAGE,
+ GRPC_IOREQ_RECV_TRAILING_METADATA,
+ GRPC_IOREQ_RECV_STATUS,
+ GRPC_IOREQ_RECV_CLOSE,
+ GRPC_IOREQ_SEND_INITIAL_METADATA,
+ GRPC_IOREQ_SEND_MESSAGE,
+ GRPC_IOREQ_SEND_TRAILING_METADATA,
+ GRPC_IOREQ_SEND_STATUS,
+ GRPC_IOREQ_SEND_CLOSE,
+ GRPC_IOREQ_OP_COUNT
+} grpc_ioreq_op;
+
+typedef struct {
+ grpc_status_code *code;
+ char **details;
+ size_t *details_capacity;
+} grpc_recv_status_args;
+
+typedef union {
+ grpc_metadata_array *recv_metadata;
+ grpc_byte_buffer **recv_message;
+ grpc_recv_status_args recv_status;
+ struct {
+ size_t count;
+ grpc_metadata *metadata;
+ } send_metadata;
+ grpc_byte_buffer *send_message;
+ struct {
+ grpc_status_code code;
+ char *details;
+ } send_status;
+} grpc_ioreq_data;
+
+typedef struct {
+ grpc_ioreq_op op;
+ grpc_ioreq_data data;
+} grpc_ioreq;
+
+typedef void (*grpc_ioreq_completion_func)(grpc_call *call,
+ grpc_op_error status,
+ void *user_data);
+
grpc_call *grpc_call_create(grpc_channel *channel,
const void *server_transport_data);
void grpc_call_internal_ref(grpc_call *call);
-void grpc_call_internal_unref(grpc_call *call);
+void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion);
/* Helpers for grpc_client, grpc_server filters to publish received data to
the completion queue/surface layer */
void grpc_call_recv_metadata(grpc_call_element *surface_element,
- grpc_call_op *op);
-void grpc_call_recv_message(
- grpc_call_element *surface_element, grpc_byte_buffer *message,
- void (*on_finish)(void *user_data, grpc_op_error error), void *user_data);
-void grpc_call_recv_finish(grpc_call_element *surface_element,
- int is_full_close);
+ grpc_mdelem *md);
+void grpc_call_recv_message(grpc_call_element *surface_element,
+ grpc_byte_buffer *message);
+void grpc_call_read_closed(grpc_call_element *surface_element);
+void grpc_call_stream_closed(grpc_call_element *surface_element);
void grpc_call_execute_op(grpc_call *call, grpc_call_op *op);
+grpc_call_error grpc_call_start_ioreq_and_call_back(
+ grpc_call *call, const grpc_ioreq *reqs, size_t nreqs,
+ grpc_ioreq_completion_func on_complete, void *user_data);
-/* Called when it's known that the initial batch of metadata is complete on the
- client side (must not be called on the server) */
-void grpc_call_client_initial_metadata_complete(
+/* Called when it's known that the initial batch of metadata is complete */
+void grpc_call_initial_metadata_complete(
grpc_call_element *surface_element);
void grpc_call_set_deadline(grpc_call_element *surface_element,
@@ -69,10 +115,4 @@ grpc_call_stack *grpc_call_get_call_stack(grpc_call *call);
/* Given the top call_element, get the call object. */
grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element);
-/* Get the metadata buffer. */
-grpc_metadata_buffer *grpc_call_get_metadata_buffer(grpc_call *call);
-
-void grpc_call_add_mdelem(grpc_call *call, grpc_mdelem *mdelem,
- gpr_uint32 flags);
-
#endif /* __GRPC_INTERNAL_SURFACE_CALL_H__ */
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 93a2c0609d..c33ea923e8 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -51,7 +51,7 @@ struct grpc_channel {
grpc_mdstr *authority_string;
};
-#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1))
+#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c)+1))
grpc_channel *grpc_channel_create_from_filters(
const grpc_channel_filter **filters, size_t num_filters,
@@ -80,6 +80,7 @@ grpc_call *grpc_channel_create_call_old(grpc_channel *channel,
grpc_call *call;
grpc_mdelem *path_mdelem;
grpc_mdelem *authority_mdelem;
+ grpc_call_op op;
if (!channel->is_client) {
gpr_log(GPR_ERROR, "Cannot create a call on the server.");
@@ -91,20 +92,25 @@ grpc_call *grpc_channel_create_call_old(grpc_channel *channel,
/* Add :path and :authority headers. */
/* TODO(klempner): Consider optimizing this by stashing mdelems for common
values of method and host. */
- grpc_mdstr_ref(channel->path_string);
path_mdelem = grpc_mdelem_from_metadata_strings(
- channel->metadata_context, channel->path_string,
+ channel->metadata_context, grpc_mdstr_ref(channel->path_string),
grpc_mdstr_from_string(channel->metadata_context, method));
- grpc_call_add_mdelem(call, path_mdelem, 0);
+ op.type = GRPC_SEND_METADATA;
+ op.dir = GRPC_CALL_DOWN;
+ op.flags = 0;
+ op.data.metadata = path_mdelem;
+ op.done_cb = do_nothing;
+ op.user_data = NULL;
+ grpc_call_execute_op(call, &op);
grpc_mdstr_ref(channel->authority_string);
authority_mdelem = grpc_mdelem_from_metadata_strings(
channel->metadata_context, channel->authority_string,
grpc_mdstr_from_string(channel->metadata_context, host));
- grpc_call_add_mdelem(call, authority_mdelem, 0);
+ op.data.metadata = authority_mdelem;
+ grpc_call_execute_op(call, &op);
if (0 != gpr_time_cmp(absolute_deadline, gpr_inf_future)) {
- grpc_call_op op;
op.type = GRPC_SEND_DEADLINE;
op.dir = GRPC_CALL_DOWN;
op.flags = 0;
diff --git a/src/core/surface/client.c b/src/core/surface/client.c
index a7c9b902ed..fa63e855cc 100644
--- a/src/core/surface/client.c
+++ b/src/core/surface/client.c
@@ -56,23 +56,23 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
grpc_call_next_op(elem, op);
break;
case GRPC_RECV_METADATA:
- grpc_call_recv_metadata(elem, op);
+ grpc_call_recv_metadata(elem, op->data.metadata);
break;
case GRPC_RECV_DEADLINE:
gpr_log(GPR_ERROR, "Deadline received by client (ignored)");
break;
case GRPC_RECV_MESSAGE:
- grpc_call_recv_message(elem, op->data.message, op->done_cb,
- op->user_data);
+ grpc_call_recv_message(elem, op->data.message);
+ op->done_cb(op->user_data, GRPC_OP_OK);
break;
case GRPC_RECV_HALF_CLOSE:
- grpc_call_recv_finish(elem, 0);
+ grpc_call_read_closed(elem);
break;
case GRPC_RECV_FINISH:
- grpc_call_recv_finish(elem, 1);
+ grpc_call_stream_closed(elem);
break;
case GRPC_RECV_END_OF_INITIAL_METADATA:
- grpc_call_client_initial_metadata_complete(elem);
+ grpc_call_initial_metadata_complete(elem);
break;
default:
GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 2bf31c50a8..ae3b96035c 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -173,18 +173,6 @@ void grpc_cq_end_read(grpc_completion_queue *cc, void *tag, grpc_call *call,
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
-void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag,
- grpc_call *call,
- grpc_event_finish_func on_finish,
- void *user_data, grpc_op_error error) {
- event *ev;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
- ev = add_locked(cc, GRPC_INVOKE_ACCEPTED, tag, call, on_finish, user_data);
- ev->base.data.invoke_accepted = error;
- end_op_locked(cc, GRPC_INVOKE_ACCEPTED);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
-}
-
void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag,
grpc_call *call,
grpc_event_finish_func on_finish,
@@ -197,6 +185,17 @@ void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag,
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
+void grpc_cq_end_ioreq(grpc_completion_queue *cc, void *tag, grpc_call *call,
+ grpc_event_finish_func on_finish, void *user_data,
+ grpc_op_error error) {
+ event *ev;
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ ev = add_locked(cc, GRPC_IOREQ, tag, call, on_finish, user_data);
+ ev->base.data.write_accepted = error;
+ end_op_locked(cc, GRPC_IOREQ);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+}
+
void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag,
grpc_call *call,
grpc_event_finish_func on_finish,
@@ -389,7 +388,7 @@ void grpc_event_finish(grpc_event *base) {
event *ev = (event *)base;
ev->on_finish(ev->on_finish_user_data, GRPC_OP_OK);
if (ev->base.call) {
- grpc_call_internal_unref(ev->base.call);
+ grpc_call_internal_unref(ev->base.call, 1);
}
gpr_free(ev);
}
diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h
index 85984075f7..fea8336b63 100644
--- a/src/core/surface/completion_queue.h
+++ b/src/core/surface/completion_queue.h
@@ -97,6 +97,10 @@ void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call,
gpr_timespec deadline, size_t metadata_count,
grpc_metadata *metadata_elements);
+void grpc_cq_end_ioreq(grpc_completion_queue *cc, void *tag, grpc_call *call,
+ grpc_event_finish_func on_finish, void *user_data,
+ grpc_op_error error);
+
void grpc_cq_end_server_shutdown(grpc_completion_queue *cc, void *tag);
/* disable polling for some tests */
diff --git a/src/core/surface/event_string.c b/src/core/surface/event_string.c
index 8975d312ee..7c76bf93d7 100644
--- a/src/core/surface/event_string.c
+++ b/src/core/surface/event_string.c
@@ -87,10 +87,10 @@ char *grpc_event_string(grpc_event *ev) {
gpr_strvec_add(&buf, gpr_strdup(" end-of-stream"));
}
break;
- case GRPC_INVOKE_ACCEPTED:
- gpr_strvec_add(&buf, gpr_strdup("INVOKE_ACCEPTED: "));
+ case GRPC_IOREQ:
+ gpr_strvec_add(&buf, gpr_strdup("IOREQ: "));
addhdr(&buf, ev);
- adderr(&buf, ev->data.invoke_accepted);
+ adderr(&buf, ev->data.ioreq);
break;
case GRPC_WRITE_ACCEPTED:
gpr_strvec_add(&buf, gpr_strdup("WRITE_ACCEPTED: "));
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index 6098ac78de..2f5eff5584 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -50,26 +50,16 @@ typedef struct {
grpc_mdelem *message;
} channel_data;
-static void do_nothing(void *data, grpc_op_error error) {}
-
static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
grpc_call_op *op) {
channel_data *channeld = elem->channel_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
switch (op->type) {
- case GRPC_SEND_START: {
- grpc_call_op set_status_op;
- grpc_mdelem_ref(channeld->message);
- memset(&set_status_op, 0, sizeof(grpc_call_op));
- set_status_op.dir = GRPC_CALL_UP;
- set_status_op.type = GRPC_RECV_METADATA;
- set_status_op.done_cb = do_nothing;
- set_status_op.data.metadata = channeld->message;
- grpc_call_recv_metadata(elem, &set_status_op);
- grpc_call_recv_finish(elem, 1);
+ case GRPC_SEND_START:
+ grpc_call_recv_metadata(elem, grpc_mdelem_ref(channeld->message));
+ grpc_call_stream_closed(elem);
break;
- }
case GRPC_SEND_METADATA:
grpc_mdelem_unref(op->data.metadata);
break;
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 9e2e4d5478..a057694f13 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -44,6 +44,7 @@
#include "src/core/surface/call.h"
#include "src/core/surface/channel.h"
#include "src/core/surface/completion_queue.h"
+#include "src/core/transport/metadata.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
@@ -63,11 +64,24 @@ typedef struct channel_data channel_data;
struct channel_data {
grpc_server *server;
grpc_channel *channel;
+ grpc_mdstr *path_key;
+ grpc_mdstr *authority_key;
/* linked list of all channels on a server */
channel_data *next;
channel_data *prev;
};
+typedef void (*new_call_cb)(grpc_server *server, grpc_completion_queue *cq,
+ grpc_metadata_array *initial_metadata,
+ call_data *calld, void *user_data);
+
+typedef struct {
+ void *user_data;
+ grpc_completion_queue *cq;
+ grpc_metadata_array *initial_metadata;
+ new_call_cb cb;
+} requested_call;
+
struct grpc_server {
size_t channel_filter_count;
const grpc_channel_filter **channel_filters;
@@ -76,9 +90,9 @@ struct grpc_server {
gpr_mu mu;
- void **tags;
- size_t ntags;
- size_t tag_cap;
+ requested_call *requested_calls;
+ size_t requested_call_count;
+ size_t requested_call_capacity;
gpr_uint8 shutdown;
gpr_uint8 have_shutdown_tag;
@@ -107,11 +121,17 @@ typedef enum {
ZOMBIED
} call_state;
+typedef struct legacy_data { grpc_metadata_array *initial_metadata; } legacy_data;
+
struct call_data {
grpc_call *call;
call_state state;
gpr_timespec deadline;
+ grpc_mdstr *path;
+ grpc_mdstr *host;
+
+ legacy_data *legacy;
gpr_uint8 included[CALL_LIST_COUNT];
call_link links[CALL_LIST_COUNT];
@@ -179,7 +199,7 @@ static void server_unref(grpc_server *server) {
grpc_channel_args_destroy(server->channel_args);
gpr_mu_destroy(&server->mu);
gpr_free(server->channel_filters);
- gpr_free(server->tags);
+ gpr_free(server->requested_calls);
gpr_free(server);
}
}
@@ -210,62 +230,37 @@ static void destroy_channel(channel_data *chand) {
grpc_iomgr_add_callback(finish_destroy_channel, chand);
}
-static void queue_new_rpc(grpc_server *server, call_data *calld, void *tag) {
- grpc_call *call = calld->call;
- grpc_metadata_buffer *mdbuf = grpc_call_get_metadata_buffer(call);
- size_t count = grpc_metadata_buffer_count(mdbuf);
- grpc_metadata *elements = grpc_metadata_buffer_extract_elements(mdbuf);
- const char *host = NULL;
- const char *method = NULL;
- size_t i;
-
- for (i = 0; i < count; i++) {
- if (0 == strcmp(elements[i].key, ":authority")) {
- host = elements[i].value;
- } else if (0 == strcmp(elements[i].key, ":path")) {
- method = elements[i].value;
- }
- }
-
- grpc_call_internal_ref(call);
- grpc_cq_end_new_rpc(server->cq, tag, call,
- grpc_metadata_buffer_cleanup_elements, elements, method,
- host, calld->deadline, count, elements);
-}
-
static void start_new_rpc(grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
grpc_server *server = chand->server;
gpr_mu_lock(&server->mu);
- if (server->ntags) {
+ if (server->requested_call_count > 0) {
+ requested_call rc = server->requested_calls[--server->requested_call_count];
calld->state = ACTIVATED;
- queue_new_rpc(server, calld, server->tags[--server->ntags]);
+ gpr_mu_unlock(&server->mu);
+ rc.cb(server, rc.cq, rc.initial_metadata, calld, rc.user_data);
} else {
calld->state = PENDING;
call_list_join(server, calld, PENDING_START);
+ gpr_mu_unlock(&server->mu);
}
- gpr_mu_unlock(&server->mu);
}
static void kill_zombie(void *elem, int success) {
grpc_call_destroy(grpc_call_from_top_element(elem));
}
-static void finish_rpc(grpc_call_element *elem, int is_full_close) {
+static void stream_closed(grpc_call_element *elem) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
gpr_mu_lock(&chand->server->mu);
switch (calld->state) {
case ACTIVATED:
- grpc_call_recv_finish(elem, is_full_close);
+ grpc_call_stream_closed(elem);
break;
case PENDING:
- if (!is_full_close) {
- grpc_call_recv_finish(elem, is_full_close);
- break;
- }
call_list_remove(chand->server, calld, PENDING_START);
/* fallthrough intended */
case NOT_STARTED:
@@ -278,25 +273,57 @@ static void finish_rpc(grpc_call_element *elem, int is_full_close) {
gpr_mu_unlock(&chand->server->mu);
}
+static void read_closed(grpc_call_element *elem) {
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ gpr_mu_lock(&chand->server->mu);
+ switch (calld->state) {
+ case ACTIVATED:
+ case PENDING:
+ grpc_call_read_closed(elem);
+ break;
+ case NOT_STARTED:
+ calld->state = ZOMBIED;
+ grpc_iomgr_add_callback(kill_zombie, elem);
+ break;
+ case ZOMBIED:
+ break;
+ }
+ gpr_mu_unlock(&chand->server->mu);
+}
+
static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn,
grpc_call_op *op) {
+ channel_data *chand = elem->channel_data;
+ call_data *calld = elem->call_data;
+ grpc_mdelem *md;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
switch (op->type) {
case GRPC_RECV_METADATA:
- grpc_call_recv_metadata(elem, op);
+ md = op->data.metadata;
+ if (md->key == chand->path_key) {
+ calld->path = grpc_mdstr_ref(md->value);
+ grpc_mdelem_unref(md);
+ } else if (md->key == chand->authority_key) {
+ calld->host = grpc_mdstr_ref(md->value);
+ grpc_mdelem_unref(md);
+ } else {
+ grpc_call_recv_metadata(elem, md);
+ }
break;
case GRPC_RECV_END_OF_INITIAL_METADATA:
start_new_rpc(elem);
+ grpc_call_initial_metadata_complete(elem);
break;
case GRPC_RECV_MESSAGE:
- grpc_call_recv_message(elem, op->data.message, op->done_cb,
- op->user_data);
+ grpc_call_recv_message(elem, op->data.message);
+ op->done_cb(op->user_data, GRPC_OP_OK);
break;
case GRPC_RECV_HALF_CLOSE:
- finish_rpc(elem, 0);
+ read_closed(elem);
break;
case GRPC_RECV_FINISH:
- finish_rpc(elem, 1);
+ stream_closed(elem);
break;
case GRPC_RECV_DEADLINE:
grpc_call_set_deadline(elem, op->data.deadline);
@@ -371,6 +398,7 @@ static void init_call_elem(grpc_call_element *elem,
static void destroy_call_elem(grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
+ call_data *calld = elem->call_data;
int i;
gpr_mu_lock(&chand->server->mu);
@@ -383,6 +411,19 @@ static void destroy_call_elem(grpc_call_element *elem) {
}
gpr_mu_unlock(&chand->server->mu);
+ if (calld->host) {
+ grpc_mdstr_unref(calld->host);
+ }
+ if (calld->path) {
+ grpc_mdstr_unref(calld->path);
+ }
+
+ if (calld->legacy) {
+ gpr_free(calld->legacy->initial_metadata->metadata);
+ gpr_free(calld->legacy->initial_metadata);
+ gpr_free(calld->legacy);
+ }
+
server_unref(chand->server);
}
@@ -395,6 +436,8 @@ static void init_channel_elem(grpc_channel_element *elem,
GPR_ASSERT(!is_last);
chand->server = NULL;
chand->channel = NULL;
+ chand->path_key = grpc_mdstr_from_string(metadata_context, ":path");
+ chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority");
chand->next = chand->prev = chand;
}
@@ -406,6 +449,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
chand->prev->next = chand->next;
chand->next = chand->prev = chand;
gpr_mu_unlock(&chand->server->mu);
+ grpc_mdstr_unref(chand->path_key);
+ grpc_mdstr_unref(chand->authority_key);
server_unref(chand->server);
}
}
@@ -413,17 +458,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
static const grpc_channel_filter server_surface_filter = {
call_op, channel_op, sizeof(call_data),
init_call_elem, destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "server", };
-
-static void early_terminate_requested_calls(grpc_completion_queue *cq,
- void **tags, size_t ntags) {
- size_t i;
-
- for (i = 0; i < ntags; i++) {
- grpc_cq_end_new_rpc(cq, tags[i], NULL, do_nothing, NULL, NULL, NULL,
- gpr_inf_past, 0, NULL);
- }
-}
+ init_channel_elem, destroy_channel_elem, "server",
+};
grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
grpc_channel_filter **filters,
@@ -517,8 +553,8 @@ grpc_transport_setup_result grpc_server_setup_transport(
void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
void *shutdown_tag) {
listener *l;
- void **tags;
- size_t ntags;
+ requested_call *requested_calls;
+ size_t requested_call_count;
channel_data **channels;
channel_data *c;
size_t nchannels;
@@ -547,10 +583,10 @@ void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
i++;
}
- tags = server->tags;
- ntags = server->ntags;
- server->tags = NULL;
- server->ntags = 0;
+ requested_calls = server->requested_calls;
+ requested_call_count = server->requested_call_count;
+ server->requested_calls = NULL;
+ server->requested_call_count = 0;
server->shutdown = 1;
server->have_shutdown_tag = have_shutdown_tag;
@@ -579,8 +615,12 @@ void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
gpr_free(channels);
/* terminate all the requested calls */
- early_terminate_requested_calls(server->cq, tags, ntags);
- gpr_free(tags);
+ for (i = 0; i < requested_call_count; i++) {
+ requested_calls[i].cb(server, requested_calls[i].cq,
+ requested_calls[i].initial_metadata, NULL,
+ requested_calls[i].user_data);
+ }
+ gpr_free(requested_calls);
/* Shutdown listeners */
for (l = server->listeners; l; l = l->next) {
@@ -625,36 +665,105 @@ void grpc_server_add_listener(grpc_server *server, void *arg,
server->listeners = l;
}
-grpc_call_error grpc_server_request_call_old(grpc_server *server,
- void *tag_new) {
+static grpc_call_error queue_call_request(grpc_server *server,
+ grpc_completion_queue *cq,
+ grpc_metadata_array *initial_metadata,
+ new_call_cb cb, void *user_data) {
call_data *calld;
-
- grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_RPC_NEW);
-
+ requested_call *rc;
gpr_mu_lock(&server->mu);
-
if (server->shutdown) {
gpr_mu_unlock(&server->mu);
- early_terminate_requested_calls(server->cq, &tag_new, 1);
+ cb(server, cq, initial_metadata, NULL, user_data);
return GRPC_CALL_OK;
}
-
calld = call_list_remove_head(server, PENDING_START);
if (calld) {
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;
- queue_new_rpc(server, calld, tag_new);
+ gpr_mu_unlock(&server->mu);
+ cb(server, cq, initial_metadata, calld, user_data);
+ return GRPC_CALL_OK;
} else {
- if (server->tag_cap == server->ntags) {
- server->tag_cap = GPR_MAX(3 * server->tag_cap / 2, server->tag_cap + 1);
- server->tags =
- gpr_realloc(server->tags, sizeof(void *) * server->tag_cap);
+ if (server->requested_call_count == server->requested_call_capacity) {
+ server->requested_call_capacity =
+ GPR_MAX(server->requested_call_capacity + 8,
+ server->requested_call_capacity * 2);
+ server->requested_calls =
+ gpr_realloc(server->requested_calls,
+ sizeof(requested_call) * server->requested_call_capacity);
}
- server->tags[server->ntags++] = tag_new;
+ rc = &server->requested_calls[server->requested_call_count++];
+ rc->cb = cb;
+ rc->cq = cq;
+ rc->user_data = user_data;
+ rc->initial_metadata = initial_metadata;
+ gpr_mu_unlock(&server->mu);
+ return GRPC_CALL_OK;
}
- gpr_mu_unlock(&server->mu);
+}
+
+static void begin_request(grpc_server *server, grpc_completion_queue *cq,
+ grpc_metadata_array *initial_metadata,
+ call_data *call_data, void *tag) {
+ abort();
+}
- return GRPC_CALL_OK;
+grpc_call_error grpc_server_request_call(
+ grpc_server *server, grpc_call_details *details,
+ grpc_metadata_array *initial_metadata, grpc_completion_queue *cq,
+ void *tag) {
+ grpc_cq_begin_op(cq, NULL, GRPC_IOREQ);
+ return queue_call_request(server, cq, initial_metadata, begin_request, tag);
+}
+
+static void publish_legacy_request(grpc_call *call, grpc_op_error status,
+ void *tag) {
+ grpc_call_element *elem =
+ grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ grpc_server *server = chand->server;
+
+ if (status == GRPC_OP_OK) {
+ grpc_cq_end_new_rpc(server->cq, tag, call, do_nothing, NULL,
+ grpc_mdstr_as_c_string(calld->path),
+ grpc_mdstr_as_c_string(calld->host), calld->deadline,
+ calld->legacy->initial_metadata->count,
+ calld->legacy->initial_metadata->metadata);
+ } else {
+ abort();
+ }
+}
+
+static void begin_legacy_request(grpc_server *server, grpc_completion_queue *cq,
+ grpc_metadata_array *initial_metadata,
+ call_data *calld, void *tag) {
+ grpc_ioreq req;
+ if (!calld) {
+ gpr_free(initial_metadata);
+ grpc_cq_end_new_rpc(cq, tag, NULL, do_nothing, NULL, NULL, NULL,
+ gpr_inf_past, 0, NULL);
+ return;
+ }
+ req.op = GRPC_IOREQ_RECV_INITIAL_METADATA;
+ req.data.recv_metadata = initial_metadata;
+ calld->legacy = gpr_malloc(sizeof(legacy_data));
+ memset(calld->legacy, 0, sizeof(legacy_data));
+ calld->legacy->initial_metadata = initial_metadata;
+ grpc_call_internal_ref(calld->call);
+ grpc_call_start_ioreq_and_call_back(calld->call, &req, 1,
+ publish_legacy_request, tag);
+}
+
+grpc_call_error grpc_server_request_call_old(grpc_server *server,
+ void *tag_new) {
+ grpc_metadata_array *client_metadata =
+ gpr_malloc(sizeof(grpc_metadata_array));
+ memset(client_metadata, 0, sizeof(*client_metadata));
+ grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_RPC_NEW);
+ return queue_call_request(server, server->cq, client_metadata,
+ begin_legacy_request, tag_new);
}
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c
index c4e3ca516d..2af18c3035 100644
--- a/src/core/transport/chttp2/stream_encoder.c
+++ b/src/core/transport/chttp2/stream_encoder.c
@@ -432,7 +432,7 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem,
static void deadline_enc(grpc_chttp2_hpack_compressor *c, gpr_timespec deadline,
framer_state *st) {
- char timeout_str[32];
+ char timeout_str[GRPC_CHTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE];
grpc_chttp2_encode_timeout(gpr_time_sub(deadline, gpr_now()), timeout_str);
hpack_enc(c, grpc_mdelem_from_metadata_strings(
c->mdctx, grpc_mdstr_ref(c->timeout_key_str),