aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
authorGravatar Nicolas Noble <nnoble@google.com>2014-11-26 16:33:03 -0800
committerGravatar Nicolas Noble <nnoble@google.com>2014-11-26 16:33:03 -0800
commitb7ebd3b8c6fe39f99c40b10c1b563e4adb607b6c (patch)
treec1decf819492d455ec81cd471942c5516138f825 /src/core/surface
parent0e905e63db21bcdd85d3d1af051fcdc5bb5caa38 (diff)
Initial import.
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/byte_buffer.c68
-rw-r--r--src/core/surface/byte_buffer_reader.c74
-rw-r--r--src/core/surface/call.c835
-rw-r--r--src/core/surface/call.h73
-rw-r--r--src/core/surface/channel.c152
-rw-r--r--src/core/surface/channel.h51
-rw-r--r--src/core/surface/channel_create.c213
-rw-r--r--src/core/surface/client.c115
-rw-r--r--src/core/surface/client.h41
-rw-r--r--src/core/surface/completion_queue.c392
-rw-r--r--src/core/surface/completion_queue.h102
-rw-r--r--src/core/surface/event_string.c119
-rw-r--r--src/core/surface/event_string.h42
-rw-r--r--src/core/surface/init.c46
-rw-r--r--src/core/surface/lame_client.c94
-rw-r--r--src/core/surface/lame_client.h42
-rw-r--r--src/core/surface/secure_channel_create.c243
-rw-r--r--src/core/surface/secure_server_create.c57
-rw-r--r--src/core/surface/server.c609
-rw-r--r--src/core/surface/server.h62
-rw-r--r--src/core/surface/server_chttp2.c123
-rw-r--r--src/core/surface/server_create.c41
-rw-r--r--src/core/surface/surface_em.c55
-rw-r--r--src/core/surface/surface_em.h47
-rw-r--r--src/core/surface/surface_trace.h54
25 files changed, 3750 insertions, 0 deletions
diff --git a/src/core/surface/byte_buffer.c b/src/core/surface/byte_buffer.c
new file mode 100644
index 0000000000..27a6c6e33d
--- /dev/null
+++ b/src/core/surface/byte_buffer.c
@@ -0,0 +1,68 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/byte_buffer.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+grpc_byte_buffer *grpc_byte_buffer_create(gpr_slice *slices, size_t nslices) {
+ size_t i;
+ grpc_byte_buffer *bb = malloc(sizeof(grpc_byte_buffer));
+
+ bb->type = GRPC_BB_SLICE_BUFFER;
+ gpr_slice_buffer_init(&bb->data.slice_buffer);
+ for (i = 0; i < nslices; i++) {
+ gpr_slice_ref(slices[i]);
+ gpr_slice_buffer_add(&bb->data.slice_buffer, slices[i]);
+ }
+
+ return bb;
+}
+
+void grpc_byte_buffer_destroy(grpc_byte_buffer *bb) {
+ switch (bb->type) {
+ case GRPC_BB_SLICE_BUFFER:
+ gpr_slice_buffer_destroy(&bb->data.slice_buffer);
+ break;
+ }
+ free(bb);
+}
+
+size_t grpc_byte_buffer_length(grpc_byte_buffer *bb) {
+ switch (bb->type) {
+ case GRPC_BB_SLICE_BUFFER:
+ return bb->data.slice_buffer.length;
+ }
+ gpr_log(GPR_ERROR, "should never reach here");
+ abort();
+}
diff --git a/src/core/surface/byte_buffer_reader.c b/src/core/surface/byte_buffer_reader.c
new file mode 100644
index 0000000000..18500b83e8
--- /dev/null
+++ b/src/core/surface/byte_buffer_reader.c
@@ -0,0 +1,74 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/byte_buffer_reader.h>
+
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/slice_buffer.h>
+#include <grpc/byte_buffer.h>
+
+grpc_byte_buffer_reader *grpc_byte_buffer_reader_create(
+ grpc_byte_buffer *buffer) {
+ grpc_byte_buffer_reader *reader = malloc(sizeof(grpc_byte_buffer_reader));
+ reader->buffer = buffer;
+ switch (buffer->type) {
+ case GRPC_BB_SLICE_BUFFER:
+ reader->current.index = 0;
+ }
+ return reader;
+}
+
+int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader,
+ gpr_slice *slice) {
+ grpc_byte_buffer *buffer = reader->buffer;
+ gpr_slice_buffer *slice_buffer;
+ switch (buffer->type) {
+ case GRPC_BB_SLICE_BUFFER:
+ slice_buffer = &buffer->data.slice_buffer;
+ if (reader->current.index < slice_buffer->count) {
+ *slice = gpr_slice_ref(slice_buffer->slices[reader->current.index]);
+ reader->current.index += 1;
+ return 1;
+ } else {
+ return 0;
+ }
+ break;
+ }
+ return 0;
+}
+
+void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) {
+ free(reader);
+}
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
new file mode 100644
index 0000000000..63d408d2d5
--- /dev/null
+++ b/src/core/surface/call.c
@@ -0,0 +1,835 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/surface/call.h"
+#include "src/core/channel/channel_stack.h"
+#include "src/core/channel/metadata_buffer.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string.h>
+#include "src/core/surface/channel.h"
+#include "src/core/surface/completion_queue.h"
+#include "src/core/surface/surface_em.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#define INVALID_TAG ((void *)0xdeadbeef)
+
+/* Pending read queue
+
+ This data structure tracks reads that need to be presented to the completion
+ queue but are waiting for the application to ask for them. */
+
+#define INITIAL_PENDING_READ_COUNT 4
+
+typedef struct {
+ grpc_byte_buffer *byte_buffer;
+ void *user_data;
+ void (*on_finish)(void *user_data, grpc_op_error error);
+} pending_read;
+
+/* TODO(ctiller): inline an element or two into this struct to avoid per-call
+ allocations */
+typedef struct {
+ pending_read *data;
+ size_t count;
+ size_t capacity;
+} pending_read_array;
+
+typedef struct {
+ size_t drain_pos;
+ pending_read_array filling;
+ pending_read_array draining;
+} pending_read_queue;
+
+static void pra_init(pending_read_array *array) {
+ array->data = gpr_malloc(sizeof(pending_read) * INITIAL_PENDING_READ_COUNT);
+ array->count = 0;
+ array->capacity = INITIAL_PENDING_READ_COUNT;
+}
+
+static void pra_destroy(pending_read_array *array,
+ size_t finish_starting_from) {
+ size_t i;
+ for (i = finish_starting_from; i < array->count; i++) {
+ array->data[i].on_finish(array->data[i].user_data, GRPC_OP_ERROR);
+ }
+ gpr_free(array->data);
+}
+
+/* Append an operation to an array, expanding as needed */
+static void pra_push(pending_read_array *a, grpc_byte_buffer *buffer,
+ void (*on_finish)(void *user_data, grpc_op_error error),
+ void *user_data) {
+ if (a->count == a->capacity) {
+ a->capacity *= 2;
+ a->data = gpr_realloc(a->data, sizeof(pending_read) * a->capacity);
+ }
+ a->data[a->count].byte_buffer = buffer;
+ a->data[a->count].user_data = user_data;
+ a->data[a->count].on_finish = on_finish;
+ a->count++;
+}
+
+static void prq_init(pending_read_queue *q) {
+ q->drain_pos = 0;
+ pra_init(&q->filling);
+ pra_init(&q->draining);
+}
+
+static void prq_destroy(pending_read_queue *q) {
+ pra_destroy(&q->filling, 0);
+ pra_destroy(&q->draining, q->drain_pos);
+}
+
+static int prq_is_empty(pending_read_queue *q) {
+ return (q->drain_pos == q->draining.count && q->filling.count == 0);
+}
+
+static void prq_push(pending_read_queue *q, grpc_byte_buffer *buffer,
+ void (*on_finish)(void *user_data, grpc_op_error error),
+ void *user_data) {
+ pra_push(&q->filling, buffer, on_finish, user_data);
+}
+
+/* Take the first queue element and move it to the completion queue. Do nothing
+ if q is empty */
+static int prq_pop_to_cq(pending_read_queue *q, void *tag, grpc_call *call,
+ grpc_completion_queue *cq) {
+ pending_read_array temp_array;
+ pending_read *pr;
+
+ if (q->drain_pos == q->draining.count) {
+ if (q->filling.count == 0) {
+ return 0;
+ }
+ q->draining.count = 0;
+ q->drain_pos = 0;
+ /* swap arrays */
+ temp_array = q->filling;
+ q->filling = q->draining;
+ q->draining = temp_array;
+ }
+
+ pr = q->draining.data + q->drain_pos;
+ q->drain_pos++;
+ grpc_cq_end_read(cq, tag, call, pr->on_finish, pr->user_data,
+ pr->byte_buffer);
+ return 1;
+}
+
+/* grpc_call proper */
+
+/* the state of a call, based upon which functions have been called against
+ said call */
+typedef enum { CALL_CREATED, CALL_STARTED, CALL_FINISHED } call_state;
+
+struct grpc_call {
+ grpc_completion_queue *cq;
+ grpc_channel *channel;
+ grpc_mdctx *metadata_context;
+
+ call_state state;
+ gpr_uint8 is_client;
+ gpr_uint8 have_write;
+ grpc_metadata_buffer incoming_metadata;
+
+ /* protects variables in this section */
+ gpr_mu read_mu;
+ gpr_uint8 reads_done;
+ gpr_uint8 received_finish;
+ gpr_uint8 received_metadata;
+ gpr_uint8 have_read;
+ gpr_uint8 have_alarm;
+ /* The current outstanding read message tag (only valid if have_read == 1) */
+ void *read_tag;
+ void *metadata_tag;
+ void *finished_tag;
+ pending_read_queue prq;
+
+ grpc_em_alarm alarm;
+
+ /* The current outstanding send message/context/invoke/end tag (only valid if
+ have_write == 1) */
+ void *write_tag;
+
+ /* The final status of the call */
+ grpc_status_code status_code;
+ grpc_mdstr *status_details;
+
+ gpr_refcount internal_refcount;
+};
+
+#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call)+1))
+#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1)
+#define CALL_ELEM_FROM_CALL(call, idx) \
+ grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
+#define CALL_FROM_TOP_ELEM(top_elem) \
+ CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
+
+static void do_nothing(void *ignored, grpc_op_error also_ignored) {}
+
+grpc_call *grpc_call_create(grpc_channel *channel,
+ const void *server_transport_data) {
+ grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
+ grpc_call *call =
+ gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
+ call->cq = NULL;
+ call->channel = channel;
+ grpc_channel_internal_ref(channel);
+ call->metadata_context = grpc_channel_get_metadata_context(channel);
+ call->state = CALL_CREATED;
+ call->is_client = (server_transport_data == NULL);
+ call->write_tag = INVALID_TAG;
+ call->read_tag = INVALID_TAG;
+ call->metadata_tag = INVALID_TAG;
+ call->finished_tag = INVALID_TAG;
+ call->have_read = 0;
+ call->have_write = 0;
+ call->have_alarm = 0;
+ call->received_metadata = 0;
+ call->status_code =
+ server_transport_data != NULL ? GRPC_STATUS_OK : GRPC_STATUS_UNKNOWN;
+ call->status_details = NULL;
+ call->received_finish = 0;
+ call->reads_done = 0;
+ grpc_metadata_buffer_init(&call->incoming_metadata);
+ gpr_ref_init(&call->internal_refcount, 1);
+ grpc_call_stack_init(channel_stack, server_transport_data,
+ CALL_STACK_FROM_CALL(call));
+ prq_init(&call->prq);
+ gpr_mu_init(&call->read_mu);
+ return call;
+}
+
+void grpc_call_internal_ref(grpc_call *c) { gpr_ref(&c->internal_refcount); }
+
+void grpc_call_internal_unref(grpc_call *c) {
+ if (gpr_unref(&c->internal_refcount)) {
+ grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c));
+ grpc_metadata_buffer_destroy(&c->incoming_metadata, GRPC_OP_OK);
+ if (c->status_details) {
+ grpc_mdstr_unref(c->status_details);
+ }
+ prq_destroy(&c->prq);
+ gpr_mu_destroy(&c->read_mu);
+ grpc_channel_internal_unref(c->channel);
+ gpr_free(c);
+ }
+}
+
+void grpc_call_destroy(grpc_call *c) {
+ gpr_mu_lock(&c->read_mu);
+ if (c->have_alarm) {
+ void *arg_was;
+ grpc_em_alarm_cancel(&c->alarm, &arg_was);
+ c->have_alarm = 0;
+ }
+ gpr_mu_unlock(&c->read_mu);
+ grpc_call_internal_unref(c);
+}
+
+grpc_call_error grpc_call_cancel(grpc_call *c) {
+ grpc_call_element *elem;
+ grpc_call_op op;
+
+ op.type = GRPC_CANCEL_OP;
+ op.dir = GRPC_CALL_DOWN;
+ op.flags = 0;
+ op.done_cb = do_nothing;
+ op.user_data = NULL;
+
+ elem = CALL_ELEM_FROM_CALL(c, 0);
+ elem->filter->call_op(elem, &op);
+
+ return GRPC_CALL_OK;
+}
+
+void grpc_call_execute_op(grpc_call *call, grpc_call_op *op) {
+ grpc_call_element *elem;
+ GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
+ elem = CALL_ELEM_FROM_CALL(call, 0);
+ elem->filter->call_op(elem, op);
+}
+
+grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata,
+ gpr_uint32 flags) {
+ grpc_call_element *elem;
+ grpc_call_op op;
+
+ if (call->state >= CALL_STARTED) {
+ return GRPC_CALL_ERROR_ALREADY_INVOKED;
+ }
+
+ op.type = GRPC_SEND_METADATA;
+ op.dir = GRPC_CALL_DOWN;
+ op.flags = flags;
+ op.done_cb = do_nothing;
+ op.user_data = NULL;
+ op.data.metadata = grpc_mdelem_from_string_and_buffer(
+ call->metadata_context, metadata->key, (gpr_uint8 *)metadata->value,
+ metadata->value_length);
+
+ elem = CALL_ELEM_FROM_CALL(call, 0);
+ elem->filter->call_op(elem, &op);
+
+ return GRPC_CALL_OK;
+}
+
+static void done_invoke(void *user_data, grpc_op_error error) {
+ grpc_call *call = user_data;
+ void *tag = call->write_tag;
+
+ GPR_ASSERT(call->have_write);
+ call->have_write = 0;
+ call->write_tag = INVALID_TAG;
+ grpc_cq_end_invoke_accepted(call->cq, tag, call, NULL, NULL, error);
+}
+
+static void finish_call(grpc_call *call) {
+ grpc_status status;
+ status.code = call->status_code;
+ status.details = call->status_details
+ ? (char *)grpc_mdstr_as_c_string(call->status_details)
+ : NULL;
+ grpc_cq_end_finished(call->cq, call->finished_tag, call, NULL, NULL, status);
+}
+
+grpc_call_error grpc_call_start_invoke(grpc_call *call,
+ grpc_completion_queue *cq,
+ void *invoke_accepted_tag,
+ void *metadata_read_tag,
+ void *finished_tag, gpr_uint32 flags) {
+ grpc_call_element *elem;
+ grpc_call_op op;
+
+ /* validate preconditions */
+ if (!call->is_client) {
+ gpr_log(GPR_ERROR, "can only call %s on clients", __FUNCTION__);
+ return GRPC_CALL_ERROR_NOT_ON_SERVER;
+ }
+
+ if (call->state >= CALL_STARTED || call->cq) {
+ gpr_log(GPR_ERROR, "call is already invoked");
+ return GRPC_CALL_ERROR_ALREADY_INVOKED;
+ }
+
+ if (call->have_write) {
+ gpr_log(GPR_ERROR, "can only have one pending write operation at a time");
+ return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
+ }
+
+ if (call->have_read) {
+ gpr_log(GPR_ERROR, "can only have one pending read operation at a time");
+ return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
+ }
+
+ if (flags & GRPC_WRITE_NO_COMPRESS) {
+ return GRPC_CALL_ERROR_INVALID_FLAGS;
+ }
+
+ /* inform the completion queue of an incoming operation */
+ grpc_cq_begin_op(cq, call, GRPC_FINISHED);
+ grpc_cq_begin_op(cq, call, GRPC_CLIENT_METADATA_READ);
+ grpc_cq_begin_op(cq, call, GRPC_INVOKE_ACCEPTED);
+
+ gpr_mu_lock(&call->read_mu);
+
+ /* update state */
+ call->cq = cq;
+ call->state = CALL_STARTED;
+ call->finished_tag = finished_tag;
+
+ if (call->received_finish) {
+ /* handle early cancellation */
+ grpc_cq_end_invoke_accepted(call->cq, invoke_accepted_tag, call, NULL, NULL,
+ GRPC_OP_ERROR);
+ grpc_cq_end_client_metadata_read(call->cq, metadata_read_tag, call, NULL,
+ NULL, 0, NULL);
+ finish_call(call);
+
+ /* early out.. unlock & return */
+ gpr_mu_unlock(&call->read_mu);
+ return GRPC_CALL_OK;
+ }
+
+ call->write_tag = invoke_accepted_tag;
+ call->metadata_tag = metadata_read_tag;
+
+ call->have_write = 1;
+
+ gpr_mu_unlock(&call->read_mu);
+
+ /* call down the filter stack */
+ op.type = GRPC_SEND_START;
+ op.dir = GRPC_CALL_DOWN;
+ op.flags = flags;
+ op.done_cb = done_invoke;
+ op.user_data = call;
+
+ elem = CALL_ELEM_FROM_CALL(call, 0);
+ elem->filter->call_op(elem, &op);
+
+ return GRPC_CALL_OK;
+}
+
+grpc_call_error grpc_call_accept(grpc_call *call, grpc_completion_queue *cq,
+ void *finished_tag, gpr_uint32 flags) {
+ grpc_call_element *elem;
+ grpc_call_op op;
+
+ /* validate preconditions */
+ if (call->is_client) {
+ gpr_log(GPR_ERROR, "can only call %s on servers", __FUNCTION__);
+ return GRPC_CALL_ERROR_NOT_ON_CLIENT;
+ }
+
+ if (call->state >= CALL_STARTED) {
+ gpr_log(GPR_ERROR, "call is already invoked");
+ return GRPC_CALL_ERROR_ALREADY_INVOKED;
+ }
+
+ if (flags & GRPC_WRITE_NO_COMPRESS) {
+ return GRPC_CALL_ERROR_INVALID_FLAGS;
+ }
+
+ /* inform the completion queue of an incoming operation (corresponding to
+ finished_tag) */
+ grpc_cq_begin_op(cq, call, GRPC_FINISHED);
+
+ /* update state */
+ gpr_mu_lock(&call->read_mu);
+ call->state = CALL_STARTED;
+ call->cq = cq;
+ call->finished_tag = finished_tag;
+ if (prq_is_empty(&call->prq) && call->received_finish) {
+ finish_call(call);
+
+ /* early out.. unlock & return */
+ gpr_mu_unlock(&call->read_mu);
+ return GRPC_CALL_OK;
+ }
+ gpr_mu_unlock(&call->read_mu);
+
+ /* call down */
+ op.type = GRPC_SEND_START;
+ op.dir = GRPC_CALL_DOWN;
+ op.flags = flags;
+ op.done_cb = do_nothing;
+ op.user_data = NULL;
+
+ elem = CALL_ELEM_FROM_CALL(call, 0);
+ elem->filter->call_op(elem, &op);
+
+ return GRPC_CALL_OK;
+}
+
+static void done_writes_done(void *user_data, grpc_op_error error) {
+ grpc_call *call = user_data;
+ void *tag = call->write_tag;
+
+ GPR_ASSERT(call->have_write);
+ call->have_write = 0;
+ call->write_tag = INVALID_TAG;
+ grpc_cq_end_finish_accepted(call->cq, tag, call, NULL, NULL, error);
+}
+
+static void done_write(void *user_data, grpc_op_error error) {
+ grpc_call *call = user_data;
+ void *tag = call->write_tag;
+
+ GPR_ASSERT(call->have_write);
+ call->have_write = 0;
+ call->write_tag = INVALID_TAG;
+ grpc_cq_end_write_accepted(call->cq, tag, call, NULL, NULL, error);
+}
+
+void grpc_call_client_initial_metadata_complete(
+ grpc_call_element *surface_element) {
+ grpc_call *call = grpc_call_from_top_element(surface_element);
+ size_t count;
+ grpc_metadata *elements;
+
+ gpr_mu_lock(&call->read_mu);
+ count = grpc_metadata_buffer_count(&call->incoming_metadata);
+ elements = grpc_metadata_buffer_extract_elements(&call->incoming_metadata);
+
+ GPR_ASSERT(!call->received_metadata);
+ grpc_cq_end_client_metadata_read(call->cq, call->metadata_tag, call,
+ grpc_metadata_buffer_cleanup_elements,
+ elements, count, elements);
+ call->received_metadata = 1;
+ call->metadata_tag = INVALID_TAG;
+ gpr_mu_unlock(&call->read_mu);
+}
+
+static void request_more_data(grpc_call *call) {
+ grpc_call_element *elem;
+ grpc_call_op op;
+
+ /* call down */
+ op.type = GRPC_REQUEST_DATA;
+ op.dir = GRPC_CALL_DOWN;
+ op.flags = 0;
+ op.done_cb = do_nothing;
+ op.user_data = NULL;
+
+ elem = CALL_ELEM_FROM_CALL(call, 0);
+ elem->filter->call_op(elem, &op);
+}
+
+grpc_call_error grpc_call_start_read(grpc_call *call, void *tag) {
+ gpr_uint8 request_more = 0;
+
+ switch (call->state) {
+ case CALL_CREATED:
+ return GRPC_CALL_ERROR_NOT_INVOKED;
+ case CALL_STARTED:
+ break;
+ case CALL_FINISHED:
+ return GRPC_CALL_ERROR_ALREADY_FINISHED;
+ }
+
+ gpr_mu_lock(&call->read_mu);
+
+ if (call->have_read) {
+ gpr_mu_unlock(&call->read_mu);
+ return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
+ }
+
+ grpc_cq_begin_op(call->cq, call, GRPC_READ);
+
+ if (!prq_pop_to_cq(&call->prq, tag, call, call->cq)) {
+ if (call->reads_done) {
+ grpc_cq_end_read(call->cq, tag, call, do_nothing, NULL, NULL);
+ } else {
+ call->read_tag = tag;
+ call->have_read = 1;
+ request_more = 1;
+ }
+ } else if (prq_is_empty(&call->prq) && call->received_finish) {
+ finish_call(call);
+ }
+
+ gpr_mu_unlock(&call->read_mu);
+
+ if (request_more) {
+ request_more_data(call);
+ }
+
+ return GRPC_CALL_OK;
+}
+
+grpc_call_error grpc_call_start_write(grpc_call *call,
+ grpc_byte_buffer *byte_buffer, void *tag,
+ gpr_uint32 flags) {
+ grpc_call_element *elem;
+ grpc_call_op op;
+
+ switch (call->state) {
+ case CALL_CREATED:
+ return GRPC_CALL_ERROR_NOT_INVOKED;
+ case CALL_STARTED:
+ break;
+ case CALL_FINISHED:
+ return GRPC_CALL_ERROR_ALREADY_FINISHED;
+ }
+
+ if (call->have_write) {
+ return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
+ }
+
+ grpc_cq_begin_op(call->cq, call, GRPC_WRITE_ACCEPTED);
+
+ /* for now we do no buffering, so a NULL byte_buffer can have no impact
+ on our behavior -- succeed immediately */
+ /* TODO(ctiller): if flags & GRPC_WRITE_BUFFER_HINT == 0, this indicates a
+ flush, and that flush should be propogated down from here */
+ if (byte_buffer == NULL) {
+ grpc_cq_end_write_accepted(call->cq, tag, call, NULL, NULL, GRPC_OP_OK);
+ return GRPC_CALL_OK;
+ }
+
+ call->write_tag = tag;
+ call->have_write = 1;
+
+ op.type = GRPC_SEND_MESSAGE;
+ op.dir = GRPC_CALL_DOWN;
+ op.flags = flags;
+ op.done_cb = done_write;
+ op.user_data = call;
+ op.data.message = byte_buffer;
+
+ elem = CALL_ELEM_FROM_CALL(call, 0);
+ elem->filter->call_op(elem, &op);
+
+ return GRPC_CALL_OK;
+}
+
+grpc_call_error grpc_call_writes_done(grpc_call *call, void *tag) {
+ grpc_call_element *elem;
+ grpc_call_op op;
+
+ if (!call->is_client) {
+ return GRPC_CALL_ERROR_NOT_ON_SERVER;
+ }
+
+ switch (call->state) {
+ case CALL_CREATED:
+ return GRPC_CALL_ERROR_NOT_INVOKED;
+ case CALL_FINISHED:
+ return GRPC_CALL_ERROR_ALREADY_FINISHED;
+ case CALL_STARTED:
+ break;
+ }
+
+ if (call->have_write) {
+ return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
+ }
+
+ grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED);
+
+ call->write_tag = tag;
+ call->have_write = 1;
+
+ op.type = GRPC_SEND_FINISH;
+ op.dir = GRPC_CALL_DOWN;
+ op.flags = 0;
+ op.done_cb = done_writes_done;
+ op.user_data = call;
+
+ elem = CALL_ELEM_FROM_CALL(call, 0);
+ elem->filter->call_op(elem, &op);
+
+ return GRPC_CALL_OK;
+}
+
+grpc_call_error grpc_call_start_write_status(grpc_call *call,
+ grpc_status status, void *tag) {
+ grpc_call_element *elem;
+ grpc_call_op op;
+
+ if (call->is_client) {
+ return GRPC_CALL_ERROR_NOT_ON_CLIENT;
+ }
+
+ switch (call->state) {
+ case CALL_CREATED:
+ return GRPC_CALL_ERROR_NOT_INVOKED;
+ case CALL_FINISHED:
+ return GRPC_CALL_ERROR_ALREADY_FINISHED;
+ case CALL_STARTED:
+ break;
+ }
+
+ if (call->have_write) {
+ return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
+ }
+
+ elem = CALL_ELEM_FROM_CALL(call, 0);
+
+ if (status.details && status.details[0]) {
+ grpc_mdelem *md = grpc_mdelem_from_strings(call->metadata_context,
+ "grpc-message", status.details);
+
+ op.type = GRPC_SEND_METADATA;
+ op.dir = GRPC_CALL_DOWN;
+ op.flags = 0;
+ op.done_cb = do_nothing;
+ op.user_data = NULL;
+ op.data.metadata = md;
+ elem->filter->call_op(elem, &op);
+ }
+
+ /* always send status */
+ {
+ grpc_mdelem *md;
+ char buffer[32];
+ sprintf(buffer, "%d", status.code);
+ md =
+ grpc_mdelem_from_strings(call->metadata_context, "grpc-status", buffer);
+
+ op.type = GRPC_SEND_METADATA;
+ op.dir = GRPC_CALL_DOWN;
+ op.flags = 0;
+ op.done_cb = do_nothing;
+ op.user_data = NULL;
+ op.data.metadata = md;
+ elem->filter->call_op(elem, &op);
+ }
+
+ grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED);
+
+ call->state = CALL_FINISHED;
+ call->write_tag = tag;
+ call->have_write = 1;
+
+ op.type = GRPC_SEND_FINISH;
+ op.dir = GRPC_CALL_DOWN;
+ op.flags = 0;
+ op.done_cb = done_writes_done;
+ op.user_data = call;
+
+ elem->filter->call_op(elem, &op);
+
+ return GRPC_CALL_OK;
+}
+
+/* we offset status by a small amount when storing it into transport metadata
+ as metadata cannot store a 0 value (which is used as OK for grpc_status_codes
+ */
+#define STATUS_OFFSET 1
+static void destroy_status(void *ignored) {}
+
+static gpr_uint32 decode_status(grpc_mdelem *md) {
+ gpr_uint32 status;
+ void *user_data = grpc_mdelem_get_user_data(md, destroy_status);
+ if (user_data) {
+ status = ((gpr_uint32)(gpr_intptr)user_data) - STATUS_OFFSET;
+ } else {
+ if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
+ GPR_SLICE_LENGTH(md->value->slice),
+ &status)) {
+ status = GRPC_STATUS_UNKNOWN; /* could not parse status code */
+ }
+ grpc_mdelem_set_user_data(md, destroy_status,
+ (void *)(gpr_intptr)(status + STATUS_OFFSET));
+ }
+ return status;
+}
+
+void grpc_call_recv_metadata(grpc_call_element *elem, grpc_call_op *op) {
+ grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+ grpc_mdelem *md = op->data.metadata;
+ grpc_mdstr *key = md->key;
+ if (key == grpc_channel_get_status_string(call->channel)) {
+ call->status_code = decode_status(md);
+ grpc_mdelem_unref(md);
+ op->done_cb(op->user_data, GRPC_OP_OK);
+ } else if (key == grpc_channel_get_message_string(call->channel)) {
+ if (call->status_details) {
+ grpc_mdstr_unref(call->status_details);
+ }
+ call->status_details = grpc_mdstr_ref(md->value);
+ grpc_mdelem_unref(md);
+ op->done_cb(op->user_data, GRPC_OP_OK);
+ } else {
+ grpc_metadata_buffer_queue(&call->incoming_metadata, op);
+ }
+}
+
+void grpc_call_recv_finish(grpc_call_element *elem, int is_full_close) {
+ grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+
+ gpr_mu_lock(&call->read_mu);
+
+ if (call->have_read) {
+ grpc_cq_end_read(call->cq, call->read_tag, call, do_nothing, NULL, NULL);
+ call->read_tag = INVALID_TAG;
+ call->have_read = 0;
+ }
+ if (call->is_client && !call->received_metadata && call->cq) {
+ size_t count;
+ grpc_metadata *elements;
+
+ call->received_metadata = 1;
+
+ count = grpc_metadata_buffer_count(&call->incoming_metadata);
+ elements = grpc_metadata_buffer_extract_elements(&call->incoming_metadata);
+ grpc_cq_end_client_metadata_read(call->cq, call->metadata_tag, call,
+ grpc_metadata_buffer_cleanup_elements,
+ elements, count, elements);
+ }
+ if (is_full_close) {
+ if (call->have_alarm) {
+ void *arg_was;
+ grpc_em_alarm_cancel(&call->alarm, &arg_was);
+ call->have_alarm = 0;
+ }
+ call->received_finish = 1;
+ if (prq_is_empty(&call->prq) && call->cq != NULL) {
+ finish_call(call);
+ }
+ } else {
+ call->reads_done = 1;
+ }
+ gpr_mu_unlock(&call->read_mu);
+}
+
+void grpc_call_recv_message(grpc_call_element *elem, grpc_byte_buffer *message,
+ void (*on_finish)(void *user_data,
+ grpc_op_error error),
+ void *user_data) {
+ grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+
+ gpr_mu_lock(&call->read_mu);
+ if (call->have_read) {
+ grpc_cq_end_read(call->cq, call->read_tag, call, on_finish, user_data,
+ message);
+ call->read_tag = INVALID_TAG;
+ call->have_read = 0;
+ } else {
+ prq_push(&call->prq, message, on_finish, user_data);
+ }
+ gpr_mu_unlock(&call->read_mu);
+}
+
+grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
+ return CALL_FROM_TOP_ELEM(elem);
+}
+
+grpc_metadata_buffer *grpc_call_get_metadata_buffer(grpc_call *call) {
+ return &call->incoming_metadata;
+}
+
+static void call_alarm(void *arg, grpc_em_cb_status status) {
+ grpc_call *call = arg;
+ if (status == GRPC_CALLBACK_SUCCESS) {
+ grpc_call_cancel(call);
+ }
+ grpc_call_internal_unref(call);
+}
+
+void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) {
+ grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+
+ if (call->have_alarm) {
+ gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice");
+ }
+ grpc_call_internal_ref(call);
+ call->have_alarm = 1;
+ grpc_em_alarm_init(&call->alarm, grpc_surface_em(), call_alarm, call);
+ grpc_em_alarm_add(&call->alarm, deadline);
+}
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
new file mode 100644
index 0000000000..2c785a59fc
--- /dev/null
+++ b/src/core/surface/call.h
@@ -0,0 +1,73 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_SURFACE_CALL_H__
+#define __GRPC_INTERNAL_SURFACE_CALL_H__
+
+#include "src/core/channel/channel_stack.h"
+#include "src/core/channel/metadata_buffer.h"
+#include <grpc/grpc.h>
+
+grpc_call *grpc_call_create(grpc_channel *channel,
+ const void *server_transport_data);
+
+void grpc_call_internal_ref(grpc_call *call);
+void grpc_call_internal_unref(grpc_call *call);
+
+/* Helpers for grpc_client, grpc_server filters to publish received data to
+ the completion queue/surface layer */
+void grpc_call_recv_metadata(grpc_call_element *surface_element,
+ grpc_call_op *op);
+void grpc_call_recv_message(
+ grpc_call_element *surface_element, grpc_byte_buffer *message,
+ void (*on_finish)(void *user_data, grpc_op_error error), void *user_data);
+void grpc_call_recv_finish(grpc_call_element *surface_element,
+ int is_full_close);
+
+void grpc_call_execute_op(grpc_call *call, grpc_call_op *op);
+
+/* Called when it's known that the initial batch of metadata is complete on the
+ client side (must not be called on the server) */
+void grpc_call_client_initial_metadata_complete(
+ grpc_call_element *surface_element);
+
+void grpc_call_set_deadline(grpc_call_element *surface_element,
+ gpr_timespec deadline);
+
+/* Given the top call_element, get the call object. */
+grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element);
+
+/* Get the metadata buffer. */
+grpc_metadata_buffer *grpc_call_get_metadata_buffer(grpc_call *call);
+
+#endif /* __GRPC_INTERNAL_SURFACE_CALL_H__ */
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
new file mode 100644
index 0000000000..ff994257f4
--- /dev/null
+++ b/src/core/surface/channel.c
@@ -0,0 +1,152 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/surface/channel.h"
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "src/core/surface/call.h"
+#include "src/core/surface/client.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+struct grpc_channel {
+ int is_client;
+ gpr_refcount refs;
+ grpc_mdctx *metadata_context;
+ grpc_mdstr *grpc_status_string;
+ grpc_mdstr *grpc_message_string;
+};
+
+#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c)+1))
+
+grpc_channel *grpc_channel_create_from_filters(
+ const grpc_channel_filter **filters, size_t num_filters,
+ const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client) {
+ size_t size =
+ sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters);
+ grpc_channel *channel = gpr_malloc(size);
+ channel->is_client = is_client;
+ /* decremented by grpc_channel_destroy */
+ gpr_ref_init(&channel->refs, 1);
+ channel->metadata_context = mdctx;
+ channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status");
+ channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message");
+ grpc_channel_stack_init(filters, num_filters, args, channel->metadata_context,
+ CHANNEL_STACK_FROM_CHANNEL(channel));
+ return channel;
+}
+
+static void do_nothing(void *ignored, grpc_op_error error) {}
+
+grpc_call *grpc_channel_create_call(grpc_channel *channel, const char *method,
+ const char *host,
+ gpr_timespec absolute_deadline) {
+ grpc_call *call;
+ grpc_metadata md;
+
+ if (!channel->is_client) {
+ gpr_log(GPR_ERROR, "Cannot create a call on the server.");
+ return NULL;
+ }
+
+ call = grpc_call_create(channel, NULL);
+
+#define ADDMD(k, v) \
+ do { \
+ md.key = (k); \
+ md.value = (char *)(v); \
+ md.value_length = strlen((v)); \
+ grpc_call_add_metadata(call, &md, 0); \
+ } while (0)
+ ADDMD(":method", "POST");
+ ADDMD(":scheme", "grpc");
+ ADDMD(":path", method);
+ ADDMD(":authority", host);
+ ADDMD("content-type", "application/grpc");
+ if (0 != gpr_time_cmp(absolute_deadline, gpr_inf_future)) {
+ grpc_call_op op;
+ op.type = GRPC_SEND_DEADLINE;
+ op.dir = GRPC_CALL_DOWN;
+ op.flags = 0;
+ op.data.deadline = absolute_deadline;
+ op.done_cb = do_nothing;
+ op.user_data = NULL;
+ grpc_call_execute_op(call, &op);
+ }
+
+ return call;
+}
+
+void grpc_channel_internal_ref(grpc_channel *channel) {
+ gpr_ref(&channel->refs);
+}
+
+void grpc_channel_internal_unref(grpc_channel *channel) {
+ if (gpr_unref(&channel->refs)) {
+ grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel));
+ grpc_mdstr_unref(channel->grpc_status_string);
+ grpc_mdstr_unref(channel->grpc_message_string);
+ grpc_mdctx_orphan(channel->metadata_context);
+ gpr_free(channel);
+ }
+}
+
+void grpc_channel_destroy(grpc_channel *channel) {
+ grpc_channel_op op;
+ grpc_channel_element *elem;
+
+ op.type = GRPC_CHANNEL_SHUTDOWN;
+ op.dir = GRPC_CALL_DOWN;
+ elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0);
+ elem->filter->channel_op(elem, &op);
+
+ grpc_channel_internal_unref(channel);
+}
+
+grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel) {
+ return CHANNEL_STACK_FROM_CHANNEL(channel);
+}
+
+grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel) {
+ return channel->metadata_context;
+}
+
+grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) {
+ return channel->grpc_status_string;
+}
+
+grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel) {
+ return channel->grpc_message_string;
+}
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
new file mode 100644
index 0000000000..11d4939916
--- /dev/null
+++ b/src/core/surface/channel.h
@@ -0,0 +1,51 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_SURFACE_CHANNEL_H__
+#define __GRPC_INTERNAL_SURFACE_CHANNEL_H__
+
+#include "src/core/channel/channel_stack.h"
+
+grpc_channel *grpc_channel_create_from_filters(
+ const grpc_channel_filter **filters, size_t count,
+ const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client);
+
+grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel);
+grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel);
+grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel);
+grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
+
+void grpc_channel_internal_ref(grpc_channel *channel);
+void grpc_channel_internal_unref(grpc_channel *channel);
+
+#endif /* __GRPC_INTERNAL_SURFACE_CHANNEL_H__ */
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
new file mode 100644
index 0000000000..ec1c8477fa
--- /dev/null
+++ b/src/core/surface/channel_create.c
@@ -0,0 +1,213 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/grpc.h>
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "src/core/channel/census_filter.h"
+#include "src/core/channel/channel_args.h"
+#include "src/core/channel/client_channel.h"
+#include "src/core/channel/client_setup.h"
+#include "src/core/channel/connected_channel.h"
+#include "src/core/channel/http_client_filter.h"
+#include "src/core/channel/http_filter.h"
+#include "src/core/endpoint/resolve_address.h"
+#include "src/core/endpoint/tcp.h"
+#include "src/core/endpoint/tcp_client.h"
+#include "src/core/surface/channel.h"
+#include "src/core/surface/client.h"
+#include "src/core/surface/surface_em.h"
+#include "src/core/transport/chttp2_transport.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/useful.h>
+
+typedef struct setup setup;
+
+/* A single setup request (started via initiate) */
+typedef struct {
+ grpc_client_setup_request *cs_request;
+ setup *setup;
+ /* Resolved addresses, or null if resolution not yet completed */
+ grpc_resolved_addresses *resolved;
+ /* which address in resolved should we pick for the next connection attempt */
+ size_t resolved_index;
+} request;
+
+/* Global setup logic (may be running many simultaneous setup requests, but
+ with only one 'active' */
+struct setup {
+ const char *target;
+ grpc_transport_setup_callback setup_callback;
+ void *setup_user_data;
+ grpc_em *em;
+};
+
+static int maybe_try_next_resolved(request *r);
+
+static void done(request *r, int was_successful) {
+ grpc_client_setup_request_finish(r->cs_request, was_successful);
+ if (r->resolved) {
+ grpc_resolved_addresses_destroy(r->resolved);
+ }
+ gpr_free(r);
+}
+
+/* connection callback: tcp is either valid, or null on error */
+static void on_connect(void *rp, grpc_endpoint *tcp) {
+ request *r = rp;
+
+ if (!grpc_client_setup_request_should_continue(r->cs_request)) {
+ if (tcp) {
+ grpc_endpoint_shutdown(tcp);
+ grpc_endpoint_destroy(tcp);
+ }
+ done(r, 0);
+ return;
+ }
+
+ if (!tcp) {
+ if (!maybe_try_next_resolved(r)) {
+ done(r, 0);
+ return;
+ } else {
+ return;
+ }
+ } else {
+ grpc_create_chttp2_transport(
+ r->setup->setup_callback, r->setup->setup_user_data,
+ grpc_client_setup_get_channel_args(r->cs_request), tcp, NULL, 0,
+ grpc_client_setup_get_mdctx(r->cs_request), 1);
+ done(r, 1);
+ return;
+ }
+}
+
+/* attempt to connect to the next available resolved address */
+static int maybe_try_next_resolved(request *r) {
+ grpc_resolved_address *addr;
+ if (!r->resolved) return 0;
+ if (r->resolved_index == r->resolved->naddrs) return 0;
+ addr = &r->resolved->addrs[r->resolved_index++];
+ grpc_tcp_client_connect(on_connect, r, r->setup->em,
+ (struct sockaddr *)&addr->addr, addr->len,
+ grpc_client_setup_request_deadline(r->cs_request));
+ return 1;
+}
+
+/* callback for when our target address has been resolved */
+static void on_resolved(void *rp, grpc_resolved_addresses *resolved) {
+ request *r = rp;
+
+ /* if we're not still the active request, abort */
+ if (!grpc_client_setup_request_should_continue(r->cs_request)) {
+ if (resolved) {
+ grpc_resolved_addresses_destroy(resolved);
+ }
+ done(r, 0);
+ return;
+ }
+
+ if (!resolved) {
+ done(r, 0);
+ return;
+ } else {
+ r->resolved = resolved;
+ r->resolved_index = 0;
+ if (!maybe_try_next_resolved(r)) {
+ done(r, 0);
+ }
+ }
+}
+
+static void initiate_setup(void *sp, grpc_client_setup_request *cs_request) {
+ request *r = gpr_malloc(sizeof(request));
+ r->setup = sp;
+ r->cs_request = cs_request;
+ r->resolved = NULL;
+ r->resolved_index = 0;
+ /* TODO(klempner): Make grpc_resolve_address respect deadline */
+ grpc_resolve_address(r->setup->target, "http", on_resolved, r);
+}
+
+static void done_setup(void *sp) {
+ setup *s = sp;
+ gpr_free((void *)s->target);
+ gpr_free(s);
+}
+
+static grpc_transport_setup_result complete_setup(void *channel_stack,
+ grpc_transport *transport,
+ grpc_mdctx *mdctx) {
+ static grpc_channel_filter const *extra_filters[] = {&grpc_http_client_filter,
+ &grpc_http_filter};
+ return grpc_client_channel_transport_setup_complete(
+ channel_stack, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters),
+ mdctx);
+}
+
+/* Create a client channel:
+ Asynchronously: - resolve target
+ - connect to it (trying alternatives as presented)
+ - perform handshakes */
+grpc_channel *grpc_channel_create(const char *target,
+ const grpc_channel_args *args) {
+ setup *s = gpr_malloc(sizeof(setup));
+ grpc_mdctx *mdctx = grpc_mdctx_create();
+ grpc_channel *channel = NULL;
+#define MAX_FILTERS 3
+ const grpc_channel_filter *filters[MAX_FILTERS];
+ int n = 0;
+ filters[n++] = &grpc_client_surface_filter;
+ if (grpc_channel_args_is_census_enabled(args)) {
+ filters[n++] = &grpc_client_census_filter;
+ }
+ filters[n++] = &grpc_client_channel_filter;
+ GPR_ASSERT(n <= MAX_FILTERS);
+ channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1);
+
+ s->target = gpr_strdup(target);
+ s->em = grpc_surface_em();
+ s->setup_callback = complete_setup;
+ s->setup_user_data = grpc_channel_get_channel_stack(channel);
+
+ grpc_client_setup_create_and_attach(grpc_channel_get_channel_stack(channel),
+ args, mdctx, initiate_setup, done_setup,
+ s, s->em);
+
+ return channel;
+}
diff --git a/src/core/surface/client.c b/src/core/surface/client.c
new file mode 100644
index 0000000000..26abffa817
--- /dev/null
+++ b/src/core/surface/client.c
@@ -0,0 +1,115 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/surface/client.h"
+
+#include "src/core/surface/call.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string.h>
+
+typedef struct { void *unused; } call_data;
+
+typedef struct { void *unused; } channel_data;
+
+static void call_op(grpc_call_element *elem, grpc_call_op *op) {
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+
+ switch (op->type) {
+ case GRPC_SEND_DEADLINE:
+ grpc_call_set_deadline(elem, op->data.deadline);
+ grpc_call_next_op(elem, op);
+ break;
+ case GRPC_RECV_METADATA:
+ grpc_call_recv_metadata(elem, op);
+ break;
+ case GRPC_RECV_DEADLINE:
+ gpr_log(GPR_ERROR, "Deadline received by client (ignored)");
+ break;
+ case GRPC_RECV_MESSAGE:
+ grpc_call_recv_message(elem, op->data.message, op->done_cb,
+ op->user_data);
+ break;
+ case GRPC_RECV_HALF_CLOSE:
+ grpc_call_recv_finish(elem, 0);
+ break;
+ case GRPC_RECV_FINISH:
+ grpc_call_recv_finish(elem, 1);
+ break;
+ case GRPC_RECV_END_OF_INITIAL_METADATA:
+ grpc_call_client_initial_metadata_complete(elem);
+ break;
+ default:
+ GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
+ grpc_call_next_op(elem, op);
+ }
+}
+
+static void channel_op(grpc_channel_element *elem, grpc_channel_op *op) {
+ switch (op->type) {
+ case GRPC_ACCEPT_CALL:
+ gpr_log(GPR_ERROR, "Client cannot accept new calls");
+ break;
+ case GRPC_TRANSPORT_CLOSED:
+ gpr_log(GPR_ERROR, "Transport closed");
+ break;
+ default:
+ GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
+ grpc_channel_next_op(elem, op);
+ }
+}
+
+static void init_call_elem(grpc_call_element *elem,
+ const void *transport_server_data) {}
+
+static void destroy_call_elem(grpc_call_element *elem) {}
+
+static void init_channel_elem(grpc_channel_element *elem,
+ const grpc_channel_args *args, grpc_mdctx *mdctx,
+ int is_first, int is_last) {
+ GPR_ASSERT(is_first);
+ GPR_ASSERT(!is_last);
+}
+
+static void destroy_channel_elem(grpc_channel_element *elem) {
+}
+
+const grpc_channel_filter grpc_client_surface_filter = {
+ call_op, channel_op,
+
+ sizeof(call_data), init_call_elem, destroy_call_elem,
+
+ sizeof(channel_data), init_channel_elem, destroy_channel_elem,
+
+ "client",
+};
diff --git a/src/core/surface/client.h b/src/core/surface/client.h
new file mode 100644
index 0000000000..eb567276e2
--- /dev/null
+++ b/src/core/surface/client.h
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_SURFACE_CLIENT_H__
+#define __GRPC_INTERNAL_SURFACE_CLIENT_H__
+
+#include "src/core/channel/channel_stack.h"
+
+extern const grpc_channel_filter grpc_client_surface_filter;
+
+#endif /* __GRPC_INTERNAL_SURFACE_CLIENT_H__ */
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
new file mode 100644
index 0000000000..a7d611579f
--- /dev/null
+++ b/src/core/surface/completion_queue.c
@@ -0,0 +1,392 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/surface/completion_queue.h"
+
+#include <stdio.h>
+#include <string.h>
+
+#include "src/core/eventmanager/em.h"
+#include "src/core/surface/call.h"
+#include "src/core/surface/event_string.h"
+#include "src/core/surface/surface_em.h"
+#include "src/core/surface/surface_trace.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/atm.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string.h>
+
+#define NUM_TAG_BUCKETS 31
+
+/* A single event: extends grpc_event to form a linked list with a destruction
+ function (on_finish) that is hidden from outside this module */
+typedef struct event {
+ grpc_event base;
+ grpc_event_finish_func on_finish;
+ void *on_finish_user_data;
+ struct event *queue_next;
+ struct event *queue_prev;
+ struct event *bucket_next;
+ struct event *bucket_prev;
+} event;
+
+/* Completion queue structure */
+struct grpc_completion_queue {
+ grpc_em *em;
+ int allow_polling;
+
+ /* When refs drops to zero, we are in shutdown mode, and will be destroyable
+ once all queued events are drained */
+ gpr_refcount refs;
+ /* 0 initially, 1 once we've begun shutting down */
+ int shutdown;
+ /* Head of a linked list of queued events (prev points to the last element) */
+ event *queue;
+ /* Fixed size chained hash table of events for pluck() */
+ event *buckets[NUM_TAG_BUCKETS];
+
+#ifndef NDEBUG
+ /* Debug support: track which operations are in flight at any given time */
+ gpr_atm pending_op_count[GRPC_COMPLETION_DO_NOT_USE];
+#endif
+};
+
+/* Default do-nothing on_finish function */
+static void null_on_finish(void *user_data, grpc_op_error error) {}
+
+grpc_completion_queue *grpc_completion_queue_create() {
+ grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue));
+ memset(cc, 0, sizeof(*cc));
+ /* Initial ref is dropped by grpc_completion_queue_shutdown */
+ gpr_ref_init(&cc->refs, 1);
+ cc->em = grpc_surface_em();
+ cc->allow_polling = 1;
+ return cc;
+}
+
+void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc) {
+ cc->allow_polling = 0;
+}
+
+/* Create and append an event to the queue. Returns the event so that its data
+ members can be filled in.
+ Requires cc->em->mu locked. */
+static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
+ void *tag, grpc_call *call,
+ grpc_event_finish_func on_finish, void *user_data) {
+ event *ev = gpr_malloc(sizeof(event));
+ gpr_intptr bucket = ((gpr_intptr)tag) % NUM_TAG_BUCKETS;
+ GPR_ASSERT(!cc->shutdown);
+ ev->base.type = type;
+ ev->base.tag = tag;
+ ev->base.call = call;
+ ev->on_finish = on_finish ? on_finish : null_on_finish;
+ ev->on_finish_user_data = user_data;
+ if (cc->queue == NULL) {
+ cc->queue = ev->queue_next = ev->queue_prev = ev;
+ } else {
+ ev->queue_next = cc->queue;
+ ev->queue_prev = cc->queue->queue_prev;
+ ev->queue_next->queue_prev = ev->queue_prev->queue_next = ev;
+ }
+ if (cc->buckets[bucket] == NULL) {
+ cc->buckets[bucket] = ev->bucket_next = ev->bucket_prev = ev;
+ } else {
+ ev->bucket_next = cc->buckets[bucket];
+ ev->bucket_prev = cc->buckets[bucket]->bucket_prev;
+ ev->bucket_next->bucket_prev = ev->bucket_prev->bucket_next = ev;
+ }
+ gpr_cv_broadcast(&cc->em->cv);
+ return ev;
+}
+
+void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call,
+ grpc_completion_type type) {
+ gpr_ref(&cc->refs);
+ if (call) grpc_call_internal_ref(call);
+#ifndef NDEBUG
+ gpr_atm_no_barrier_fetch_add(&cc->pending_op_count[type], 1);
+#endif
+}
+
+/* Signal the end of an operation - if this is the last waiting-to-be-queued
+ event, then enter shutdown mode */
+static void end_op_locked(grpc_completion_queue *cc,
+ grpc_completion_type type) {
+#ifndef NDEBUG
+ GPR_ASSERT(gpr_atm_full_fetch_add(&cc->pending_op_count[type], -1) > 0);
+#endif
+ if (gpr_unref(&cc->refs)) {
+ GPR_ASSERT(!cc->shutdown);
+ cc->shutdown = 1;
+ gpr_cv_broadcast(&cc->em->cv);
+ }
+}
+
+void grpc_cq_end_read(grpc_completion_queue *cc, void *tag, grpc_call *call,
+ grpc_event_finish_func on_finish, void *user_data,
+ grpc_byte_buffer *read) {
+ event *ev;
+ gpr_mu_lock(&cc->em->mu);
+ ev = add_locked(cc, GRPC_READ, tag, call, on_finish, user_data);
+ ev->base.data.read = read;
+ end_op_locked(cc, GRPC_READ);
+ gpr_mu_unlock(&cc->em->mu);
+}
+
+void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag,
+ grpc_call *call,
+ grpc_event_finish_func on_finish,
+ void *user_data, grpc_op_error error) {
+ event *ev;
+ gpr_mu_lock(&cc->em->mu);
+ ev = add_locked(cc, GRPC_INVOKE_ACCEPTED, tag, call, on_finish, user_data);
+ ev->base.data.invoke_accepted = error;
+ end_op_locked(cc, GRPC_INVOKE_ACCEPTED);
+ gpr_mu_unlock(&cc->em->mu);
+}
+
+void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag,
+ grpc_call *call,
+ grpc_event_finish_func on_finish,
+ void *user_data, grpc_op_error error) {
+ event *ev;
+ gpr_mu_lock(&cc->em->mu);
+ ev = add_locked(cc, GRPC_WRITE_ACCEPTED, tag, call, on_finish, user_data);
+ ev->base.data.write_accepted = error;
+ end_op_locked(cc, GRPC_WRITE_ACCEPTED);
+ gpr_mu_unlock(&cc->em->mu);
+}
+
+void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag,
+ grpc_call *call,
+ grpc_event_finish_func on_finish,
+ void *user_data, grpc_op_error error) {
+ event *ev;
+ gpr_mu_lock(&cc->em->mu);
+ ev = add_locked(cc, GRPC_FINISH_ACCEPTED, tag, call, on_finish, user_data);
+ ev->base.data.finish_accepted = error;
+ end_op_locked(cc, GRPC_FINISH_ACCEPTED);
+ gpr_mu_unlock(&cc->em->mu);
+}
+
+void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag,
+ grpc_call *call,
+ grpc_event_finish_func on_finish,
+ void *user_data, size_t count,
+ grpc_metadata *elements) {
+ event *ev;
+ gpr_mu_lock(&cc->em->mu);
+ ev = add_locked(cc, GRPC_CLIENT_METADATA_READ, tag, call, on_finish,
+ user_data);
+ ev->base.data.client_metadata_read.count = count;
+ ev->base.data.client_metadata_read.elements = elements;
+ end_op_locked(cc, GRPC_CLIENT_METADATA_READ);
+ gpr_mu_unlock(&cc->em->mu);
+}
+
+void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call,
+ grpc_event_finish_func on_finish, void *user_data,
+ grpc_status status) {
+ event *ev;
+ gpr_mu_lock(&cc->em->mu);
+ ev = add_locked(cc, GRPC_FINISHED, tag, call, on_finish, user_data);
+ ev->base.data.finished = status;
+ end_op_locked(cc, GRPC_FINISHED);
+ gpr_mu_unlock(&cc->em->mu);
+}
+
+void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call,
+ grpc_event_finish_func on_finish, void *user_data,
+ const char *method, const char *host,
+ gpr_timespec deadline, size_t metadata_count,
+ grpc_metadata *metadata_elements) {
+ event *ev;
+ gpr_mu_lock(&cc->em->mu);
+ ev = add_locked(cc, GRPC_SERVER_RPC_NEW, tag, call, on_finish, user_data);
+ ev->base.data.server_rpc_new.method = method;
+ ev->base.data.server_rpc_new.host = host;
+ ev->base.data.server_rpc_new.deadline = deadline;
+ ev->base.data.server_rpc_new.metadata_count = metadata_count;
+ ev->base.data.server_rpc_new.metadata_elements = metadata_elements;
+ end_op_locked(cc, GRPC_SERVER_RPC_NEW);
+ gpr_mu_unlock(&cc->em->mu);
+}
+
+/* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */
+static event *create_shutdown_event() {
+ event *ev = gpr_malloc(sizeof(event));
+ ev->base.type = GRPC_QUEUE_SHUTDOWN;
+ ev->base.call = NULL;
+ ev->base.tag = NULL;
+ ev->on_finish = null_on_finish;
+ return ev;
+}
+
+grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc,
+ gpr_timespec deadline) {
+ event *ev = NULL;
+
+ gpr_mu_lock(&cc->em->mu);
+ for (;;) {
+ if (cc->queue != NULL) {
+ gpr_intptr bucket;
+ ev = cc->queue;
+ bucket = ((gpr_intptr)ev->base.tag) % NUM_TAG_BUCKETS;
+ cc->queue = ev->queue_next;
+ ev->queue_next->queue_prev = ev->queue_prev;
+ ev->queue_prev->queue_next = ev->queue_next;
+ ev->bucket_next->bucket_prev = ev->bucket_prev;
+ ev->bucket_prev->bucket_next = ev->bucket_next;
+ if (ev == cc->buckets[bucket]) {
+ cc->buckets[bucket] = ev->bucket_next;
+ if (ev == cc->buckets[bucket]) {
+ cc->buckets[bucket] = NULL;
+ }
+ }
+ if (cc->queue == ev) {
+ cc->queue = NULL;
+ }
+ break;
+ }
+ if (cc->shutdown) {
+ ev = create_shutdown_event();
+ break;
+ }
+ if (cc->allow_polling && grpc_em_work(cc->em, deadline)) {
+ continue;
+ }
+ if (gpr_cv_wait(&cc->em->cv, &cc->em->mu, deadline)) {
+ gpr_mu_unlock(&cc->em->mu);
+ return NULL;
+ }
+ }
+ gpr_mu_unlock(&cc->em->mu);
+ GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base);
+ return &ev->base;
+}
+
+static event *pluck_event(grpc_completion_queue *cc, void *tag) {
+ gpr_intptr bucket = ((gpr_intptr)tag) % NUM_TAG_BUCKETS;
+ event *ev = cc->buckets[bucket];
+ if (ev == NULL) return NULL;
+ do {
+ if (ev->base.tag == tag) {
+ ev->queue_next->queue_prev = ev->queue_prev;
+ ev->queue_prev->queue_next = ev->queue_next;
+ ev->bucket_next->bucket_prev = ev->bucket_prev;
+ ev->bucket_prev->bucket_next = ev->bucket_next;
+ if (ev == cc->buckets[bucket]) {
+ cc->buckets[bucket] = ev->bucket_next;
+ if (ev == cc->buckets[bucket]) {
+ cc->buckets[bucket] = NULL;
+ }
+ }
+ if (cc->queue == ev) {
+ cc->queue = ev->queue_next;
+ if (cc->queue == ev) {
+ cc->queue = NULL;
+ }
+ }
+ return ev;
+ }
+ ev = ev->bucket_next;
+ } while (ev != cc->buckets[bucket]);
+ return NULL;
+}
+
+grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
+ gpr_timespec deadline) {
+ event *ev = NULL;
+
+ gpr_mu_lock(&cc->em->mu);
+ for (;;) {
+ if ((ev = pluck_event(cc, tag))) {
+ break;
+ }
+ if (cc->shutdown) {
+ ev = create_shutdown_event();
+ break;
+ }
+ if (cc->allow_polling && grpc_em_work(cc->em, deadline)) {
+ continue;
+ }
+ if (gpr_cv_wait(&cc->em->cv, &cc->em->mu, deadline)) {
+ gpr_mu_unlock(&cc->em->mu);
+ return NULL;
+ }
+ }
+ gpr_mu_unlock(&cc->em->mu);
+ GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base);
+ return &ev->base;
+}
+
+/* Shutdown simply drops a ref that we reserved at creation time; if we drop
+ to zero here, then enter shutdown mode and wake up any waiters */
+void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
+ if (gpr_unref(&cc->refs)) {
+ gpr_mu_lock(&cc->em->mu);
+ GPR_ASSERT(!cc->shutdown);
+ cc->shutdown = 1;
+ gpr_cv_broadcast(&cc->em->cv);
+ gpr_mu_unlock(&cc->em->mu);
+ }
+}
+
+void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
+ GPR_ASSERT(cc->queue == NULL);
+ gpr_free(cc);
+}
+
+void grpc_event_finish(grpc_event *base) {
+ event *ev = (event *)base;
+ ev->on_finish(ev->on_finish_user_data, GRPC_OP_OK);
+ if (ev->base.call) {
+ grpc_call_internal_unref(ev->base.call);
+ }
+ gpr_free(ev);
+}
+
+void grpc_cq_dump_pending_ops(grpc_completion_queue *cc) {
+#ifndef NDEBUG
+ char tmp[256];
+ char *p = tmp;
+ int i;
+
+ for (i = 0; i < GRPC_COMPLETION_DO_NOT_USE; i++) {
+ p += sprintf(p, " %d", (int)cc->pending_op_count[i]);
+ }
+
+ gpr_log(GPR_INFO, "pending ops:%s", tmp);
+#endif
+}
diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h
new file mode 100644
index 0000000000..0fe576588a
--- /dev/null
+++ b/src/core/surface/completion_queue.h
@@ -0,0 +1,102 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_SURFACE_COMPLETION_QUEUE_H__
+#define __GRPC_INTERNAL_SURFACE_COMPLETION_QUEUE_H__
+
+/* Internal API for completion channels */
+
+#include <grpc/grpc.h>
+
+/* A finish func is executed whenever the event consumer calls
+ grpc_event_finish */
+typedef void (*grpc_event_finish_func)(void *user_data, grpc_op_error error);
+
+/* Flag that an operation is beginning: the completion channel will not finish
+ shutdown until a corrensponding grpc_cq_end_* call is made */
+void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call,
+ grpc_completion_type type);
+
+/* grpc_cq_end_* functions pair with a grpc_cq_begin_op
+
+ grpc_cq_end_* common arguments:
+ cc - the completion channel to queue on
+ tag - the user supplied operation tag
+ on_finish - grpc_event_finish_func that is called during grpc_event_finish
+ can be NULL to not get a callback
+ user_data - user_data parameter to be passed to on_finish
+
+ Other parameters match the data member of grpc_event */
+
+/* Queue a GRPC_READ operation */
+void grpc_cq_end_read(grpc_completion_queue *cc, void *tag, grpc_call *call,
+ grpc_event_finish_func on_finish, void *user_data,
+ grpc_byte_buffer *read);
+/* Queue a GRPC_INVOKE_ACCEPTED operation */
+void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag,
+ grpc_call *call,
+ grpc_event_finish_func on_finish,
+ void *user_data, grpc_op_error error);
+/* Queue a GRPC_WRITE_ACCEPTED operation */
+void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag,
+ grpc_call *call,
+ grpc_event_finish_func on_finish,
+ void *user_data, grpc_op_error error);
+/* Queue a GRPC_FINISH_ACCEPTED operation */
+void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag,
+ grpc_call *call,
+ grpc_event_finish_func on_finish,
+ void *user_data, grpc_op_error error);
+/* Queue a GRPC_CLIENT_METADATA_READ operation */
+void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag,
+ grpc_call *call,
+ grpc_event_finish_func on_finish,
+ void *user_data, size_t count,
+ grpc_metadata *elements);
+
+void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call,
+ grpc_event_finish_func on_finish, void *user_data,
+ grpc_status status);
+
+void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call,
+ grpc_event_finish_func on_finish, void *user_data,
+ const char *method, const char *host,
+ gpr_timespec deadline, size_t metadata_count,
+ grpc_metadata *metadata_elements);
+
+/* disable polling for some tests */
+void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc);
+
+void grpc_cq_dump_pending_ops(grpc_completion_queue *cc);
+
+#endif /* __GRPC_INTERNAL_SURFACE_COMPLETION_QUEUE_H__ */
diff --git a/src/core/surface/event_string.c b/src/core/surface/event_string.c
new file mode 100644
index 0000000000..0a6a81d18e
--- /dev/null
+++ b/src/core/surface/event_string.c
@@ -0,0 +1,119 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/surface/event_string.h"
+
+#include <stdio.h>
+
+#include <grpc/support/string.h>
+#include <grpc/byte_buffer.h>
+
+static size_t addhdr(char *p, grpc_event *ev) {
+ return sprintf(p, "tag:%p call:%p", ev->tag, (void *)ev->call);
+}
+
+static const char *errstr(grpc_op_error err) {
+ switch (err) {
+ case GRPC_OP_OK:
+ return "OK";
+ case GRPC_OP_ERROR:
+ return "ERROR";
+ }
+ return "UNKNOWN_UNKNOWN";
+}
+
+static size_t adderr(char *p, grpc_op_error err) {
+ return sprintf(p, " err=%s", errstr(err));
+}
+
+char *grpc_event_string(grpc_event *ev) {
+ char buffer[1024];
+ char *p = buffer;
+
+ if (ev == NULL) return gpr_strdup("null");
+
+ switch (ev->type) {
+ case GRPC_QUEUE_SHUTDOWN:
+ p += sprintf(p, "QUEUE_SHUTDOWN");
+ break;
+ case GRPC_READ:
+ p += sprintf(p, "READ: ");
+ p += addhdr(p, ev);
+ if (ev->data.read) {
+ p += sprintf(p, " %d bytes",
+ (int)grpc_byte_buffer_length(ev->data.read));
+ } else {
+ p += sprintf(p, " end-of-stream");
+ }
+ break;
+ case GRPC_INVOKE_ACCEPTED:
+ p += sprintf(p, "INVOKE_ACCEPTED: ");
+ p += addhdr(p, ev);
+ p += adderr(p, ev->data.invoke_accepted);
+ break;
+ case GRPC_WRITE_ACCEPTED:
+ p += sprintf(p, "WRITE_ACCEPTED: ");
+ p += addhdr(p, ev);
+ p += adderr(p, ev->data.write_accepted);
+ break;
+ case GRPC_FINISH_ACCEPTED:
+ p += sprintf(p, "FINISH_ACCEPTED: ");
+ p += addhdr(p, ev);
+ p += adderr(p, ev->data.write_accepted);
+ break;
+ case GRPC_CLIENT_METADATA_READ:
+ p += sprintf(p, "CLIENT_METADATA_READ: ");
+ p += addhdr(p, ev);
+ p += sprintf(p, " %d elements", (int)ev->data.client_metadata_read.count);
+ break;
+ case GRPC_FINISHED:
+ p += sprintf(p, "FINISHED: ");
+ p += addhdr(p, ev);
+ p += sprintf(p, " status_code=%d details='%s'", ev->data.finished.code,
+ ev->data.finished.details);
+ break;
+ case GRPC_SERVER_RPC_NEW:
+ p += sprintf(p, "SERVER_RPC_NEW: ");
+ p += addhdr(p, ev);
+ p += sprintf(p, " method='%s' host='%s' %d metadata elements",
+ ev->data.server_rpc_new.method, ev->data.server_rpc_new.host,
+ (int)ev->data.server_rpc_new.metadata_count);
+ break;
+ case GRPC_COMPLETION_DO_NOT_USE:
+ p += sprintf(p, "DO_NOT_USE (this is a bug)");
+ p += addhdr(p, ev);
+ break;
+ }
+
+ return gpr_strdup(buffer);
+}
diff --git a/src/core/surface/event_string.h b/src/core/surface/event_string.h
new file mode 100644
index 0000000000..30b693e95c
--- /dev/null
+++ b/src/core/surface/event_string.h
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_SURFACE_EVENT_STRING_H__
+#define __GRPC_INTERNAL_SURFACE_EVENT_STRING_H__
+
+#include <grpc/grpc.h>
+
+/* Returns a string describing an event. Must be later freed with gpr_free() */
+char *grpc_event_string(grpc_event *ev);
+
+#endif /* __GRPC_INTERNAL_SURFACE_EVENT_STRING_H__ */
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
new file mode 100644
index 0000000000..92c0ac880d
--- /dev/null
+++ b/src/core/surface/init.c
@@ -0,0 +1,46 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/grpc.h>
+#include "src/core/statistics/census_interface.h"
+#include "src/core/surface/surface_em.h"
+
+void grpc_init() {
+ grpc_surface_em_init();
+ census_init();
+}
+
+void grpc_shutdown() {
+ grpc_surface_em_shutdown();
+ census_shutdown();
+}
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
new file mode 100644
index 0000000000..18921c44dd
--- /dev/null
+++ b/src/core/surface/lame_client.c
@@ -0,0 +1,94 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/surface/lame_client.h"
+
+#include "src/core/channel/channel_stack.h"
+#include "src/core/surface/channel.h"
+#include "src/core/surface/call.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string.h>
+
+typedef struct { void *unused; } call_data;
+
+typedef struct { void *unused; } channel_data;
+
+static void call_op(grpc_call_element *elem, grpc_call_op *op) {
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+
+ switch (op->type) {
+ case GRPC_SEND_START:
+ grpc_call_recv_finish(elem, 1);
+ break;
+ case GRPC_SEND_METADATA:
+ grpc_mdelem_unref(op->data.metadata);
+ break;
+ default:
+ break;
+ }
+
+ op->done_cb(op->user_data, GRPC_OP_ERROR);
+}
+
+static void channel_op(grpc_channel_element *elem, grpc_channel_op *op) {}
+
+static void init_call_elem(grpc_call_element *elem,
+ const void *transport_server_data) {}
+
+static void destroy_call_elem(grpc_call_element *elem) {}
+
+static void init_channel_elem(grpc_channel_element *elem,
+ const grpc_channel_args *args, grpc_mdctx *mdctx,
+ int is_first, int is_last) {
+ GPR_ASSERT(is_first);
+ GPR_ASSERT(is_last);
+}
+
+static void destroy_channel_elem(grpc_channel_element *elem) {}
+
+static const grpc_channel_filter lame_filter = {
+ call_op, channel_op,
+
+ sizeof(call_data), init_call_elem, destroy_call_elem,
+
+ sizeof(channel_data), init_channel_elem, destroy_channel_elem,
+
+ "lame-client",
+};
+
+grpc_channel *grpc_lame_client_channel_create() {
+ static const grpc_channel_filter *filters[] = {&lame_filter};
+ return grpc_channel_create_from_filters(filters, 1, NULL, grpc_mdctx_create(),
+ 1);
+}
diff --git a/src/core/surface/lame_client.h b/src/core/surface/lame_client.h
new file mode 100644
index 0000000000..74b9707202
--- /dev/null
+++ b/src/core/surface/lame_client.h
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_SURFACE_LAME_CLIENT_H_
+#define __GRPC_INTERNAL_SURFACE_LAME_CLIENT_H_
+
+#include <grpc/grpc.h>
+
+/* Create a lame client: this client fails every operation attempted on it. */
+grpc_channel *grpc_lame_client_channel_create();
+
+#endif /* __GRPC_INTERNAL_SURFACE_LAME_CLIENT_H_ */
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
new file mode 100644
index 0000000000..f330b83521
--- /dev/null
+++ b/src/core/surface/secure_channel_create.c
@@ -0,0 +1,243 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/grpc.h>
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "src/core/channel/census_filter.h"
+#include "src/core/channel/channel_args.h"
+#include "src/core/channel/client_channel.h"
+#include "src/core/channel/client_setup.h"
+#include "src/core/channel/connected_channel.h"
+#include "src/core/channel/http_client_filter.h"
+#include "src/core/channel/http_filter.h"
+#include "src/core/endpoint/resolve_address.h"
+#include "src/core/endpoint/tcp.h"
+#include "src/core/endpoint/tcp_client.h"
+#include "src/core/security/auth.h"
+#include "src/core/security/security_context.h"
+#include "src/core/security/secure_transport_setup.h"
+#include "src/core/surface/channel.h"
+#include "src/core/surface/client.h"
+#include "src/core/surface/surface_em.h"
+#include "src/core/transport/chttp2_transport.h"
+#include <grpc/grpc_security.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/useful.h>
+#include "src/core/tsi/transport_security_interface.h"
+
+typedef struct setup setup;
+
+/* A single setup request (started via initiate) */
+typedef struct {
+ grpc_client_setup_request *cs_request;
+ setup *setup;
+ /* Resolved addresses, or null if resolution not yet completed. */
+ grpc_resolved_addresses *resolved;
+ /* which address in resolved should we pick for the next connection attempt */
+ size_t resolved_index;
+} request;
+
+struct setup {
+ grpc_channel_security_context *security_context;
+ const char *target;
+ grpc_transport_setup_callback setup_callback;
+ void *setup_user_data;
+ grpc_em *em;
+};
+
+static int maybe_try_next_resolved(request *r);
+
+static void done(request *r, int was_successful) {
+ grpc_client_setup_request_finish(r->cs_request, was_successful);
+ if (r->resolved) {
+ grpc_resolved_addresses_destroy(r->resolved);
+ }
+ gpr_free(r);
+}
+
+static void on_secure_transport_setup_done(void *rp,
+ grpc_security_status status,
+ grpc_endpoint *secure_endpoint) {
+ request *r = rp;
+ if (status != GRPC_SECURITY_OK) {
+ gpr_log(GPR_ERROR, "Secure transport setup failed with error %d.", status);
+ done(r, 0);
+ } else {
+ grpc_create_chttp2_transport(
+ r->setup->setup_callback, r->setup->setup_user_data,
+ grpc_client_setup_get_channel_args(r->cs_request), secure_endpoint,
+ NULL, 0, grpc_client_setup_get_mdctx(r->cs_request), 1);
+ done(r, 1);
+ }
+}
+
+/* connection callback: tcp is either valid, or null on error */
+static void on_connect(void *rp, grpc_endpoint *tcp) {
+ request *r = rp;
+
+ if (!grpc_client_setup_request_should_continue(r->cs_request)) {
+ if (tcp) {
+ grpc_endpoint_shutdown(tcp);
+ grpc_endpoint_destroy(tcp);
+ }
+ done(r, 0);
+ return;
+ }
+
+ if (!tcp) {
+ if (!maybe_try_next_resolved(r)) {
+ done(r, 0);
+ return;
+ } else {
+ return;
+ }
+ } else {
+ grpc_setup_secure_transport(&r->setup->security_context->base, tcp,
+ on_secure_transport_setup_done, r);
+ }
+}
+
+/* attempt to connect to the next available resolved address */
+static int maybe_try_next_resolved(request *r) {
+ grpc_resolved_address *addr;
+ if (!r->resolved) return 0;
+ if (r->resolved_index == r->resolved->naddrs) return 0;
+ addr = &r->resolved->addrs[r->resolved_index++];
+ grpc_tcp_client_connect(on_connect, r, r->setup->em,
+ (struct sockaddr *)&addr->addr, addr->len,
+ grpc_client_setup_request_deadline(r->cs_request));
+ return 1;
+}
+
+/* callback for when our target address has been resolved */
+static void on_resolved(void *rp, grpc_resolved_addresses *resolved) {
+ request *r = rp;
+
+ /* if we're not still the active request, abort */
+ if (!grpc_client_setup_request_should_continue(r->cs_request)) {
+ if (resolved) {
+ grpc_resolved_addresses_destroy(resolved);
+ }
+ done(r, 0);
+ return;
+ }
+
+ if (!resolved) {
+ done(r, 0);
+ return;
+ } else {
+ r->resolved = resolved;
+ r->resolved_index = 0;
+ if (!maybe_try_next_resolved(r)) {
+ done(r, 0);
+ }
+ }
+}
+
+static void initiate_setup(void *sp, grpc_client_setup_request *cs_request) {
+ request *r = gpr_malloc(sizeof(request));
+ r->setup = sp;
+ r->cs_request = cs_request;
+ r->resolved = NULL;
+ r->resolved_index = 0;
+ /* TODO(klempner): Make grpc_resolve_address respect deadline */
+ grpc_resolve_address(r->setup->target, "https", on_resolved, r);
+}
+
+static void done_setup(void *sp) {
+ setup *s = sp;
+ gpr_free((void *)s->target);
+ grpc_security_context_unref(&s->security_context->base);
+ gpr_free(s);
+}
+
+static grpc_transport_setup_result complete_setup(void *channel_stack,
+ grpc_transport *transport,
+ grpc_mdctx *mdctx) {
+ static grpc_channel_filter const *extra_filters[] = {&grpc_http_client_filter,
+ &grpc_http_filter};
+ return grpc_client_channel_transport_setup_complete(
+ channel_stack, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters),
+ mdctx);
+}
+
+/* Create a secure client channel:
+ Asynchronously: - resolve target
+ - connect to it (trying alternatives as presented)
+ - perform handshakes */
+grpc_channel *grpc_secure_channel_create_internal(
+ const char *target, const grpc_channel_args *args,
+ grpc_channel_security_context *context) {
+ setup *s;
+ grpc_channel *channel;
+ grpc_arg context_arg;
+ grpc_channel_args *args_copy;
+ grpc_mdctx *mdctx = grpc_mdctx_create();
+#define MAX_FILTERS 4
+ const grpc_channel_filter *filters[MAX_FILTERS];
+ int n = 0;
+ if (grpc_find_security_context_in_args(args) != NULL) {
+ gpr_log(GPR_ERROR, "Cannot set security context in channel args.");
+ }
+
+ s = gpr_malloc(sizeof(setup));
+ context_arg = grpc_security_context_to_arg(&context->base);
+ args_copy = grpc_channel_args_copy_and_add(args, &context_arg);
+ filters[n++] = &grpc_client_surface_filter;
+ if (grpc_channel_args_is_census_enabled(args)) {
+ filters[n++] = &grpc_client_census_filter;
+ }
+ filters[n++] = &grpc_client_auth_filter;
+ filters[n++] = &grpc_client_channel_filter;
+ GPR_ASSERT(n <= MAX_FILTERS);
+ channel = grpc_channel_create_from_filters(filters, n, args_copy, mdctx, 1);
+ grpc_channel_args_destroy(args_copy);
+
+ s->target = gpr_strdup(target);
+ s->em = grpc_surface_em();
+ s->setup_callback = complete_setup;
+ s->setup_user_data = grpc_channel_get_channel_stack(channel);
+ s->security_context =
+ (grpc_channel_security_context *)grpc_security_context_ref(
+ &context->base);
+ grpc_client_setup_create_and_attach(grpc_channel_get_channel_stack(channel),
+ args, mdctx, initiate_setup, done_setup,
+ s, s->em);
+ return channel;
+}
diff --git a/src/core/surface/secure_server_create.c b/src/core/surface/secure_server_create.c
new file mode 100644
index 0000000000..bf0f62367f
--- /dev/null
+++ b/src/core/surface/secure_server_create.c
@@ -0,0 +1,57 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/grpc.h>
+
+#include "src/core/channel/channel_args.h"
+#include "src/core/security/security_context.h"
+#include "src/core/surface/completion_queue.h"
+#include "src/core/surface/server.h"
+#include <grpc/support/log.h>
+
+grpc_server *grpc_secure_server_create_internal(
+ grpc_completion_queue *cq, const grpc_channel_args *args,
+ grpc_security_context *context) {
+ grpc_arg context_arg;
+ grpc_channel_args *args_copy;
+ grpc_server *server;
+ if (grpc_find_security_context_in_args(args) != NULL) {
+ gpr_log(GPR_ERROR, "Cannot set security context in channel args.");
+ }
+
+ context_arg = grpc_security_context_to_arg(context);
+ args_copy = grpc_channel_args_copy_and_add(args, &context_arg);
+ server = grpc_server_create_from_filters(cq, NULL, 0, args_copy);
+ grpc_channel_args_destroy(args_copy);
+ return server;
+}
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
new file mode 100644
index 0000000000..99d66ffb2d
--- /dev/null
+++ b/src/core/surface/server.c
@@ -0,0 +1,609 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/surface/server.h"
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "src/core/channel/census_filter.h"
+#include "src/core/channel/channel_args.h"
+#include "src/core/channel/connected_channel.h"
+#include "src/core/surface/call.h"
+#include "src/core/surface/channel.h"
+#include "src/core/surface/completion_queue.h"
+#include "src/core/surface/surface_em.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string.h>
+#include <grpc/support/useful.h>
+
+typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list;
+
+typedef struct listener {
+ void *arg;
+ void (*start)(grpc_server *server, void *arg);
+ void (*destroy)(grpc_server *server, void *arg);
+ struct listener *next;
+} listener;
+
+typedef struct call_data call_data;
+typedef struct channel_data channel_data;
+
+struct channel_data {
+ grpc_server *server;
+ grpc_channel *channel;
+ /* linked list of all channels on a server */
+ channel_data *next;
+ channel_data *prev;
+};
+
+struct grpc_server {
+ size_t channel_filter_count;
+ const grpc_channel_filter **channel_filters;
+ grpc_channel_args *channel_args;
+ grpc_completion_queue *cq;
+ grpc_em *em;
+
+ gpr_mu mu;
+
+ void **tags;
+ size_t ntags;
+ size_t tag_cap;
+
+ gpr_uint8 shutdown;
+
+ call_data *lists[CALL_LIST_COUNT];
+ channel_data root_channel_data;
+
+ listener *listeners;
+ gpr_refcount internal_refcount;
+};
+
+typedef struct {
+ call_data *next;
+ call_data *prev;
+} call_link;
+
+typedef enum {
+ /* waiting for metadata */
+ NOT_STARTED,
+ /* inital metadata read, not flow controlled in yet */
+ PENDING,
+ /* flow controlled in, on completion queue */
+ ACTIVATED,
+ /* cancelled before being queued */
+ ZOMBIED
+} call_state;
+
+struct call_data {
+ grpc_call *call;
+
+ call_state state;
+ gpr_timespec deadline;
+
+ gpr_uint8 included[CALL_LIST_COUNT];
+ call_link links[CALL_LIST_COUNT];
+};
+
+#define SERVER_FROM_CALL_ELEM(elem) \
+ (((channel_data *)(elem)->channel_data)->server)
+
+static void do_nothing(void *unused, grpc_op_error ignored) {}
+
+static int call_list_join(grpc_server *server, call_data *call,
+ call_list list) {
+ if (call->included[list]) return 0;
+ call->included[list] = 1;
+ if (!server->lists[list]) {
+ server->lists[list] = call;
+ call->links[list].next = call->links[list].prev = call;
+ } else {
+ call->links[list].next = server->lists[list];
+ call->links[list].prev = server->lists[list]->links[list].prev;
+ call->links[list].next->links[list].prev =
+ call->links[list].prev->links[list].next = call;
+ }
+ return 1;
+}
+
+static call_data *call_list_remove_head(grpc_server *server, call_list list) {
+ call_data *out = server->lists[list];
+ if (out) {
+ out->included[list] = 0;
+ if (out->links[list].next == out) {
+ server->lists[list] = NULL;
+ } else {
+ server->lists[list] = out->links[list].next;
+ out->links[list].next->links[list].prev = out->links[list].prev;
+ out->links[list].prev->links[list].next = out->links[list].next;
+ }
+ }
+ return out;
+}
+
+static int call_list_remove(grpc_server *server, call_data *call,
+ call_list list) {
+ if (!call->included[list]) return 0;
+ call->included[list] = 0;
+ if (server->lists[list] == call) {
+ server->lists[list] = call->links[list].next;
+ if (server->lists[list] == call) {
+ server->lists[list] = NULL;
+ return 1;
+ }
+ }
+ GPR_ASSERT(server->lists[list] != call);
+ call->links[list].next->links[list].prev = call->links[list].prev;
+ call->links[list].prev->links[list].next = call->links[list].next;
+ return 1;
+}
+
+static void server_ref(grpc_server *server) {
+ gpr_ref(&server->internal_refcount);
+}
+
+static void server_unref(grpc_server *server) {
+ if (gpr_unref(&server->internal_refcount)) {
+ grpc_channel_args_destroy(server->channel_args);
+ gpr_mu_destroy(&server->mu);
+ gpr_free(server->channel_filters);
+ gpr_free(server->tags);
+ gpr_free(server);
+ }
+}
+
+static int is_channel_orphaned(channel_data *chand) {
+ return chand->next == chand;
+}
+
+static void orphan_channel(channel_data *chand) {
+ chand->next->prev = chand->prev;
+ chand->prev->next = chand->next;
+ chand->next = chand->prev = chand;
+}
+
+static void finish_destroy_channel(void *cd, grpc_em_cb_status status) {
+ channel_data *chand = cd;
+ grpc_server *server = chand->server;
+ /*gpr_log(GPR_INFO, "destroy channel %p", chand->channel);*/
+ grpc_channel_destroy(chand->channel);
+ server_unref(server);
+}
+
+static void destroy_channel(channel_data *chand) {
+ if (is_channel_orphaned(chand)) return;
+ GPR_ASSERT(chand->server != NULL);
+ orphan_channel(chand);
+ server_ref(chand->server);
+ grpc_em_add_callback(chand->server->em, finish_destroy_channel, chand);
+}
+
+static void queue_new_rpc(grpc_server *server, call_data *calld, void *tag) {
+ grpc_call *call = calld->call;
+ grpc_metadata_buffer *mdbuf = grpc_call_get_metadata_buffer(call);
+ size_t count = grpc_metadata_buffer_count(mdbuf);
+ grpc_metadata *elements = grpc_metadata_buffer_extract_elements(mdbuf);
+ const char *host = NULL;
+ const char *method = NULL;
+ size_t i;
+ grpc_metadata status_md;
+
+ for (i = 0; i < count; i++) {
+ if (0 == strcmp(elements[i].key, ":authority")) {
+ host = elements[i].value;
+ } else if (0 == strcmp(elements[i].key, ":path")) {
+ method = elements[i].value;
+ }
+ }
+
+ status_md.key = ":status";
+ status_md.value = "200";
+ status_md.value_length = 3;
+ grpc_call_add_metadata(call, &status_md, GRPC_WRITE_BUFFER_HINT);
+
+ grpc_call_internal_ref(call);
+ grpc_cq_end_new_rpc(server->cq, tag, call,
+ grpc_metadata_buffer_cleanup_elements, elements, method,
+ host, calld->deadline, count, elements);
+}
+
+static void start_new_rpc(grpc_call_element *elem) {
+ channel_data *chand = elem->channel_data;
+ call_data *calld = elem->call_data;
+ grpc_server *server = chand->server;
+
+ gpr_mu_lock(&server->mu);
+ if (server->ntags) {
+ calld->state = ACTIVATED;
+ queue_new_rpc(server, calld, server->tags[--server->ntags]);
+ } else {
+ calld->state = PENDING;
+ call_list_join(server, calld, PENDING_START);
+ }
+ gpr_mu_unlock(&server->mu);
+}
+
+static void kill_zombie(void *elem, grpc_em_cb_status status) {
+ grpc_call_destroy(grpc_call_from_top_element(elem));
+}
+
+static void finish_rpc(grpc_call_element *elem, int is_full_close) {
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ gpr_mu_lock(&chand->server->mu);
+ switch (calld->state) {
+ case ACTIVATED:
+ grpc_call_recv_finish(elem, is_full_close);
+ break;
+ case PENDING:
+ if (!is_full_close) {
+ grpc_call_recv_finish(elem, is_full_close);
+ break;
+ }
+ call_list_remove(chand->server, calld, PENDING_START);
+ /* fallthrough intended */
+ case NOT_STARTED:
+ calld->state = ZOMBIED;
+ grpc_em_add_callback(chand->server->em, kill_zombie, elem);
+ break;
+ case ZOMBIED:
+ break;
+ }
+ gpr_mu_unlock(&chand->server->mu);
+}
+
+static void call_op(grpc_call_element *elem, grpc_call_op *op) {
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ switch (op->type) {
+ case GRPC_RECV_METADATA:
+ grpc_call_recv_metadata(elem, op);
+ break;
+ case GRPC_RECV_END_OF_INITIAL_METADATA:
+ start_new_rpc(elem);
+ break;
+ case GRPC_RECV_MESSAGE:
+ grpc_call_recv_message(elem, op->data.message, op->done_cb,
+ op->user_data);
+ break;
+ case GRPC_RECV_HALF_CLOSE:
+ finish_rpc(elem, 0);
+ break;
+ case GRPC_RECV_FINISH:
+ finish_rpc(elem, 1);
+ break;
+ case GRPC_RECV_DEADLINE:
+ grpc_call_set_deadline(elem, op->data.deadline);
+ ((call_data *)elem->call_data)->deadline = op->data.deadline;
+ break;
+ default:
+ GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
+ grpc_call_next_op(elem, op);
+ break;
+ }
+}
+
+static void channel_op(grpc_channel_element *elem, grpc_channel_op *op) {
+ channel_data *chand = elem->channel_data;
+
+ switch (op->type) {
+ case GRPC_ACCEPT_CALL:
+ /* create a call */
+ grpc_call_create(chand->channel,
+ op->data.accept_call.transport_server_data);
+ break;
+ case GRPC_TRANSPORT_CLOSED:
+ /* if the transport is closed for a server channel, we destroy the
+ channel */
+ gpr_mu_lock(&chand->server->mu);
+ server_ref(chand->server);
+ destroy_channel(chand);
+ gpr_mu_unlock(&chand->server->mu);
+ server_unref(chand->server);
+ break;
+ default:
+ GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
+ grpc_channel_next_op(elem, op);
+ break;
+ }
+}
+
+static void finish_shutdown_channel(void *cd, grpc_em_cb_status status) {
+ channel_data *chand = cd;
+ grpc_channel_op op;
+ op.type = GRPC_CHANNEL_SHUTDOWN;
+ op.dir = GRPC_CALL_DOWN;
+ channel_op(grpc_channel_stack_element(
+ grpc_channel_get_channel_stack(chand->channel), 0),
+ &op);
+ grpc_channel_internal_unref(chand->channel);
+}
+
+static void shutdown_channel(channel_data *chand) {
+ grpc_channel_internal_ref(chand->channel);
+ grpc_em_add_callback(chand->server->em, finish_shutdown_channel, chand);
+}
+
+static void init_call_elem(grpc_call_element *elem,
+ const void *server_transport_data) {
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ memset(calld, 0, sizeof(call_data));
+ calld->deadline = gpr_inf_future;
+ calld->call = grpc_call_from_top_element(elem);
+
+ gpr_mu_lock(&chand->server->mu);
+ call_list_join(chand->server, calld, ALL_CALLS);
+ gpr_mu_unlock(&chand->server->mu);
+
+ server_ref(chand->server);
+}
+
+static void destroy_call_elem(grpc_call_element *elem) {
+ channel_data *chand = elem->channel_data;
+ int i;
+
+ gpr_mu_lock(&chand->server->mu);
+ for (i = 0; i < CALL_LIST_COUNT; i++) {
+ call_list_remove(chand->server, elem->call_data, i);
+ }
+ gpr_mu_unlock(&chand->server->mu);
+
+ server_unref(chand->server);
+}
+
+static void init_channel_elem(grpc_channel_element *elem,
+ const grpc_channel_args *args,
+ grpc_mdctx *metadata_context, int is_first,
+ int is_last) {
+ channel_data *chand = elem->channel_data;
+ GPR_ASSERT(is_first);
+ GPR_ASSERT(!is_last);
+ chand->server = NULL;
+ chand->channel = NULL;
+ chand->next = chand->prev = chand;
+}
+
+static void destroy_channel_elem(grpc_channel_element *elem) {
+ channel_data *chand = elem->channel_data;
+ if (chand->server) {
+ gpr_mu_lock(&chand->server->mu);
+ chand->next->prev = chand->prev;
+ chand->prev->next = chand->next;
+ chand->next = chand->prev = chand;
+ gpr_mu_unlock(&chand->server->mu);
+ server_unref(chand->server);
+ }
+}
+
+static const grpc_channel_filter server_surface_filter = {
+ call_op, channel_op,
+
+ sizeof(call_data), init_call_elem, destroy_call_elem,
+
+ sizeof(channel_data), init_channel_elem, destroy_channel_elem,
+
+ "server",
+};
+
+static void early_terminate_requested_calls(grpc_completion_queue *cq,
+ void **tags, size_t ntags) {
+ size_t i;
+
+ for (i = 0; i < ntags; i++) {
+ grpc_cq_end_new_rpc(cq, tags[i], NULL, do_nothing, NULL, NULL, NULL,
+ gpr_inf_past, 0, NULL);
+ }
+}
+
+grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
+ grpc_channel_filter **filters,
+ size_t filter_count,
+ const grpc_channel_args *args) {
+ size_t i;
+ int census_enabled = grpc_channel_args_is_census_enabled(args);
+
+ grpc_server *server = gpr_malloc(sizeof(grpc_server));
+ memset(server, 0, sizeof(grpc_server));
+
+ gpr_mu_init(&server->mu);
+
+ server->cq = cq;
+ server->em = grpc_surface_em();
+ /* decremented by grpc_server_destroy */
+ gpr_ref_init(&server->internal_refcount, 1);
+ server->root_channel_data.next = server->root_channel_data.prev =
+ &server->root_channel_data;
+
+ /* Server filter stack is:
+
+ server_surface_filter - for making surface API calls
+ grpc_server_census_filter (optional) - for stats collection and tracing
+ {passed in filter stack}
+ grpc_connected_channel_filter - for interfacing with transports */
+ server->channel_filter_count = filter_count + 1 + census_enabled;
+ server->channel_filters =
+ gpr_malloc(server->channel_filter_count * sizeof(grpc_channel_filter *));
+ server->channel_filters[0] = &server_surface_filter;
+ if (census_enabled) {
+ server->channel_filters[1] = &grpc_server_census_filter;
+ }
+ for (i = 0; i < filter_count; i++) {
+ server->channel_filters[i + 1 + census_enabled] = filters[i];
+ }
+
+ server->channel_args = grpc_channel_args_copy(args);
+
+ return server;
+}
+
+void grpc_server_start(grpc_server *server) {
+ listener *l;
+
+ for (l = server->listeners; l; l = l->next) {
+ l->start(server, l->arg);
+ }
+}
+
+grpc_transport_setup_result grpc_server_setup_transport(
+ grpc_server *s, grpc_transport *transport,
+ grpc_channel_filter const **extra_filters, size_t num_extra_filters,
+ grpc_mdctx *mdctx) {
+ size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
+ grpc_channel_filter const **filters =
+ gpr_malloc(sizeof(grpc_channel_filter *) * num_filters);
+ size_t i;
+ grpc_channel *channel;
+ channel_data *chand;
+
+ for (i = 0; i < s->channel_filter_count; i++) {
+ filters[i] = s->channel_filters[i];
+ }
+ for (; i < s->channel_filter_count + num_extra_filters; i++) {
+ filters[i] = extra_filters[i - s->channel_filter_count];
+ }
+ filters[i] = &grpc_connected_channel_filter;
+
+ channel = grpc_channel_create_from_filters(filters, num_filters,
+ s->channel_args, mdctx, 0);
+ chand = (channel_data *)grpc_channel_stack_element(
+ grpc_channel_get_channel_stack(channel), 0)->channel_data;
+ chand->server = s;
+ server_ref(s);
+ chand->channel = channel;
+
+ gpr_mu_lock(&s->mu);
+ chand->next = &s->root_channel_data;
+ chand->prev = chand->next->prev;
+ chand->next->prev = chand->prev->next = chand;
+ gpr_mu_unlock(&s->mu);
+
+ gpr_free(filters);
+
+ return grpc_connected_channel_bind_transport(
+ grpc_channel_get_channel_stack(channel), transport);
+}
+
+void grpc_server_shutdown(grpc_server *server) {
+ /* TODO(ctiller): send goaway, etc */
+ listener *l;
+ void **tags;
+ size_t ntags;
+
+ /* lock, and gather up some stuff to do */
+ gpr_mu_lock(&server->mu);
+ if (server->shutdown) {
+ gpr_mu_unlock(&server->mu);
+ return;
+ }
+
+ tags = server->tags;
+ ntags = server->ntags;
+ server->tags = NULL;
+ server->ntags = 0;
+
+ server->shutdown = 1;
+ gpr_mu_unlock(&server->mu);
+
+ /* terminate all the requested calls */
+ early_terminate_requested_calls(server->cq, tags, ntags);
+ gpr_free(tags);
+
+ /* Shutdown listeners */
+ for (l = server->listeners; l; l = l->next) {
+ l->destroy(server, l->arg);
+ }
+ while (server->listeners) {
+ l = server->listeners;
+ server->listeners = l->next;
+ gpr_free(l);
+ }
+}
+
+void grpc_server_destroy(grpc_server *server) {
+ channel_data *c;
+ gpr_mu_lock(&server->mu);
+ for (c = server->root_channel_data.next; c != &server->root_channel_data;
+ c = c->next) {
+ shutdown_channel(c);
+ }
+ gpr_mu_unlock(&server->mu);
+
+ server_unref(server);
+}
+
+void grpc_server_add_listener(grpc_server *server, void *arg,
+ void (*start)(grpc_server *server, void *arg),
+ void (*destroy)(grpc_server *server, void *arg)) {
+ listener *l = gpr_malloc(sizeof(listener));
+ l->arg = arg;
+ l->start = start;
+ l->destroy = destroy;
+ l->next = server->listeners;
+ server->listeners = l;
+}
+
+grpc_call_error grpc_server_request_call(grpc_server *server, void *tag_new) {
+ call_data *calld;
+
+ grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_RPC_NEW);
+
+ gpr_mu_lock(&server->mu);
+
+ if (server->shutdown) {
+ gpr_mu_unlock(&server->mu);
+ early_terminate_requested_calls(server->cq, &tag_new, 1);
+ return GRPC_CALL_OK;
+ }
+
+ calld = call_list_remove_head(server, PENDING_START);
+ if (calld) {
+ GPR_ASSERT(calld->state == PENDING);
+ calld->state = ACTIVATED;
+ queue_new_rpc(server, calld, tag_new);
+ } else {
+ if (server->tag_cap == server->ntags) {
+ server->tag_cap = GPR_MAX(3 * server->tag_cap / 2, server->tag_cap + 1);
+ server->tags =
+ gpr_realloc(server->tags, sizeof(void *) * server->tag_cap);
+ }
+ server->tags[server->ntags++] = tag_new;
+ }
+ gpr_mu_unlock(&server->mu);
+
+ return GRPC_CALL_OK;
+}
+
+const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
+ return server->channel_args;
+}
diff --git a/src/core/surface/server.h b/src/core/surface/server.h
new file mode 100644
index 0000000000..f0773ab9d5
--- /dev/null
+++ b/src/core/surface/server.h
@@ -0,0 +1,62 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_SURFACE_SERVER_H__
+#define __GRPC_INTERNAL_SURFACE_SERVER_H__
+
+#include "src/core/channel/channel_stack.h"
+#include <grpc/grpc.h>
+#include "src/core/transport/transport.h"
+
+/* Create a server */
+grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
+ grpc_channel_filter **filters,
+ size_t filter_count,
+ const grpc_channel_args *args);
+
+/* Add a listener to the server: when the server starts, it will call start,
+ and when it shuts down, it will call destroy */
+void grpc_server_add_listener(grpc_server *server, void *listener,
+ void (*start)(grpc_server *server, void *arg),
+ void (*destroy)(grpc_server *server, void *arg));
+
+/* Setup a transport - creates a channel stack, binds the transport to the
+ server */
+grpc_transport_setup_result grpc_server_setup_transport(
+ grpc_server *server, grpc_transport *transport,
+ grpc_channel_filter const **extra_filters, size_t num_extra_filters,
+ grpc_mdctx *mdctx);
+
+const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server);
+
+#endif /* __GRPC_INTERNAL_SURFACE_SERVER_H__ */
diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c
new file mode 100644
index 0000000000..24c5757166
--- /dev/null
+++ b/src/core/surface/server_chttp2.c
@@ -0,0 +1,123 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/grpc.h>
+
+#include "src/core/channel/http_filter.h"
+#include "src/core/channel/http_server_filter.h"
+#include "src/core/endpoint/resolve_address.h"
+#include "src/core/endpoint/tcp_server.h"
+#include "src/core/surface/server.h"
+#include "src/core/surface/surface_em.h"
+#include "src/core/transport/chttp2_transport.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/useful.h>
+
+static grpc_transport_setup_result setup_transport(void *server,
+ grpc_transport *transport,
+ grpc_mdctx *mdctx) {
+ static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter,
+ &grpc_http_filter};
+ return grpc_server_setup_transport(server, transport, extra_filters,
+ GPR_ARRAY_SIZE(extra_filters), mdctx);
+}
+
+static void new_transport(void *server, grpc_endpoint *tcp) {
+ grpc_create_chttp2_transport(setup_transport, server,
+ grpc_server_get_channel_args(server), tcp, NULL,
+ 0, grpc_mdctx_create(), 0);
+}
+
+/* Server callback: start listening on our ports */
+static void start(grpc_server *server, void *tcpp) {
+ grpc_tcp_server *tcp = tcpp;
+ grpc_tcp_server_start(tcp, new_transport, server);
+}
+
+/* Server callback: destroy the tcp listener (so we don't generate further
+ callbacks) */
+static void destroy(grpc_server *server, void *tcpp) {
+ grpc_tcp_server *tcp = tcpp;
+ grpc_tcp_server_destroy(tcp);
+}
+
+int grpc_server_add_http2_port(grpc_server *server, const char *addr) {
+ grpc_resolved_addresses *resolved = NULL;
+ grpc_tcp_server *tcp = NULL;
+ size_t i;
+ int count = 0;
+
+ resolved = grpc_blocking_resolve_address(addr, "http");
+ if (!resolved) {
+ goto error;
+ }
+
+ tcp = grpc_tcp_server_create(grpc_surface_em());
+ if (!tcp) {
+ goto error;
+ }
+
+ for (i = 0; i < resolved->naddrs; i++) {
+ if (grpc_tcp_server_add_port(tcp,
+ (struct sockaddr *)&resolved->addrs[i].addr,
+ resolved->addrs[i].len) >= 0) {
+ count++;
+ }
+ }
+ if (count == 0) {
+ gpr_log(GPR_ERROR, "No address added out of total %d resolved",
+ resolved->naddrs);
+ goto error;
+ }
+ if (count != resolved->naddrs) {
+ gpr_log(GPR_ERROR, "Only %d addresses added out of total %d resolved",
+ count, resolved->naddrs);
+ }
+ grpc_resolved_addresses_destroy(resolved);
+
+ /* Register with the server only upon success */
+ grpc_server_add_listener(server, tcp, start, destroy);
+
+ return 1;
+
+/* Error path: cleanup and return */
+error:
+ if (resolved) {
+ grpc_resolved_addresses_destroy(resolved);
+ }
+ if (tcp) {
+ grpc_tcp_server_destroy(tcp);
+ }
+ return 0;
+}
diff --git a/src/core/surface/server_create.c b/src/core/surface/server_create.c
new file mode 100644
index 0000000000..dcc6ce1ccc
--- /dev/null
+++ b/src/core/surface/server_create.c
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/grpc.h>
+#include "src/core/surface/completion_queue.h"
+#include "src/core/surface/server.h"
+
+grpc_server *grpc_server_create(grpc_completion_queue *cq,
+ const grpc_channel_args *args) {
+ return grpc_server_create_from_filters(cq, NULL, 0, args);
+}
diff --git a/src/core/surface/surface_em.c b/src/core/surface/surface_em.c
new file mode 100644
index 0000000000..e1785d1a44
--- /dev/null
+++ b/src/core/surface/surface_em.c
@@ -0,0 +1,55 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/surface/surface_em.h"
+#include <grpc/support/log.h>
+
+static int initialized = 0;
+static grpc_em em;
+
+grpc_em *grpc_surface_em() {
+ GPR_ASSERT(initialized && "call grpc_init()");
+ return &em;
+}
+
+void grpc_surface_em_init() {
+ GPR_ASSERT(!initialized);
+ initialized = 1;
+ grpc_em_init(&em);
+}
+
+void grpc_surface_em_shutdown() {
+ GPR_ASSERT(initialized);
+ grpc_em_destroy(&em);
+ initialized = 0;
+}
diff --git a/src/core/surface/surface_em.h b/src/core/surface/surface_em.h
new file mode 100644
index 0000000000..165f42f868
--- /dev/null
+++ b/src/core/surface/surface_em.h
@@ -0,0 +1,47 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_SURFACE_SURFACE_EM_H__
+#define __GRPC_INTERNAL_SURFACE_SURFACE_EM_H__
+
+#include "src/core/eventmanager/em.h"
+
+/* Returns a global singleton event manager for
+ the surface apis, and is passed down to channels and
+ transports as needed. */
+grpc_em *grpc_surface_em();
+
+void grpc_surface_em_init();
+void grpc_surface_em_shutdown();
+
+#endif /* __GRPC_INTERNAL_SURFACE_SURFACE_EM_H__ */
diff --git a/src/core/surface/surface_trace.h b/src/core/surface/surface_trace.h
new file mode 100644
index 0000000000..f6f9acfd9c
--- /dev/null
+++ b/src/core/surface/surface_trace.h
@@ -0,0 +1,54 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_SURFACE_SURFACE_TRACE_H__
+#define __GRPC_INTERNAL_SURFACE_SURFACE_TRACE_H__
+
+#include <grpc/support/log.h>
+
+/* #define GRPC_ENABLE_SURFACE_TRACE 1 */
+
+#ifdef GRPC_ENABLE_SURFACE_TRACE
+#define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event) \
+ do { \
+ char *_ev = grpc_event_string(event); \
+ gpr_log(GPR_INFO, "RETURN_EVENT[%p]: %s", cq, _ev); \
+ gpr_free(_ev); \
+ } while (0)
+#else
+#define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event) \
+ do { \
+ } while (0)
+#endif
+
+#endif /* __GRPC_INTERNAL_SURFACE_SURFACE_TRACE_H__ */