diff options
author | Nicolas Noble <nnoble@google.com> | 2014-11-26 16:33:03 -0800 |
---|---|---|
committer | Nicolas Noble <nnoble@google.com> | 2014-11-26 16:33:03 -0800 |
commit | b7ebd3b8c6fe39f99c40b10c1b563e4adb607b6c (patch) | |
tree | c1decf819492d455ec81cd471942c5516138f825 /src/core/surface | |
parent | 0e905e63db21bcdd85d3d1af051fcdc5bb5caa38 (diff) |
Initial import.
Diffstat (limited to 'src/core/surface')
25 files changed, 3750 insertions, 0 deletions
diff --git a/src/core/surface/byte_buffer.c b/src/core/surface/byte_buffer.c new file mode 100644 index 0000000000..27a6c6e33d --- /dev/null +++ b/src/core/surface/byte_buffer.c @@ -0,0 +1,68 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/byte_buffer.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +grpc_byte_buffer *grpc_byte_buffer_create(gpr_slice *slices, size_t nslices) { + size_t i; + grpc_byte_buffer *bb = malloc(sizeof(grpc_byte_buffer)); + + bb->type = GRPC_BB_SLICE_BUFFER; + gpr_slice_buffer_init(&bb->data.slice_buffer); + for (i = 0; i < nslices; i++) { + gpr_slice_ref(slices[i]); + gpr_slice_buffer_add(&bb->data.slice_buffer, slices[i]); + } + + return bb; +} + +void grpc_byte_buffer_destroy(grpc_byte_buffer *bb) { + switch (bb->type) { + case GRPC_BB_SLICE_BUFFER: + gpr_slice_buffer_destroy(&bb->data.slice_buffer); + break; + } + free(bb); +} + +size_t grpc_byte_buffer_length(grpc_byte_buffer *bb) { + switch (bb->type) { + case GRPC_BB_SLICE_BUFFER: + return bb->data.slice_buffer.length; + } + gpr_log(GPR_ERROR, "should never reach here"); + abort(); +} diff --git a/src/core/surface/byte_buffer_reader.c b/src/core/surface/byte_buffer_reader.c new file mode 100644 index 0000000000..18500b83e8 --- /dev/null +++ b/src/core/surface/byte_buffer_reader.c @@ -0,0 +1,74 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/byte_buffer_reader.h> + +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/slice_buffer.h> +#include <grpc/byte_buffer.h> + +grpc_byte_buffer_reader *grpc_byte_buffer_reader_create( + grpc_byte_buffer *buffer) { + grpc_byte_buffer_reader *reader = malloc(sizeof(grpc_byte_buffer_reader)); + reader->buffer = buffer; + switch (buffer->type) { + case GRPC_BB_SLICE_BUFFER: + reader->current.index = 0; + } + return reader; +} + +int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader, + gpr_slice *slice) { + grpc_byte_buffer *buffer = reader->buffer; + gpr_slice_buffer *slice_buffer; + switch (buffer->type) { + case GRPC_BB_SLICE_BUFFER: + slice_buffer = &buffer->data.slice_buffer; + if (reader->current.index < slice_buffer->count) { + *slice = gpr_slice_ref(slice_buffer->slices[reader->current.index]); + reader->current.index += 1; + return 1; + } else { + return 0; + } + break; + } + return 0; +} + +void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) { + free(reader); +} diff --git a/src/core/surface/call.c b/src/core/surface/call.c new file mode 100644 index 0000000000..63d408d2d5 --- /dev/null +++ b/src/core/surface/call.c @@ -0,0 +1,835 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/surface/call.h" +#include "src/core/channel/channel_stack.h" +#include "src/core/channel/metadata_buffer.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string.h> +#include "src/core/surface/channel.h" +#include "src/core/surface/completion_queue.h" +#include "src/core/surface/surface_em.h" + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#define INVALID_TAG ((void *)0xdeadbeef) + +/* Pending read queue + + This data structure tracks reads that need to be presented to the completion + queue but are waiting for the application to ask for them. */ + +#define INITIAL_PENDING_READ_COUNT 4 + +typedef struct { + grpc_byte_buffer *byte_buffer; + void *user_data; + void (*on_finish)(void *user_data, grpc_op_error error); +} pending_read; + +/* TODO(ctiller): inline an element or two into this struct to avoid per-call + allocations */ +typedef struct { + pending_read *data; + size_t count; + size_t capacity; +} pending_read_array; + +typedef struct { + size_t drain_pos; + pending_read_array filling; + pending_read_array draining; +} pending_read_queue; + +static void pra_init(pending_read_array *array) { + array->data = gpr_malloc(sizeof(pending_read) * INITIAL_PENDING_READ_COUNT); + array->count = 0; + array->capacity = INITIAL_PENDING_READ_COUNT; +} + +static void pra_destroy(pending_read_array *array, + size_t finish_starting_from) { + size_t i; + for (i = finish_starting_from; i < array->count; i++) { + array->data[i].on_finish(array->data[i].user_data, GRPC_OP_ERROR); + } + gpr_free(array->data); +} + +/* Append an operation to an array, expanding as needed */ +static void pra_push(pending_read_array *a, grpc_byte_buffer *buffer, + void (*on_finish)(void *user_data, grpc_op_error error), + void *user_data) { + if (a->count == a->capacity) { + a->capacity *= 2; + a->data = gpr_realloc(a->data, sizeof(pending_read) * a->capacity); + } + a->data[a->count].byte_buffer = buffer; + a->data[a->count].user_data = user_data; + a->data[a->count].on_finish = on_finish; + a->count++; +} + +static void prq_init(pending_read_queue *q) { + q->drain_pos = 0; + pra_init(&q->filling); + pra_init(&q->draining); +} + +static void prq_destroy(pending_read_queue *q) { + pra_destroy(&q->filling, 0); + pra_destroy(&q->draining, q->drain_pos); +} + +static int prq_is_empty(pending_read_queue *q) { + return (q->drain_pos == q->draining.count && q->filling.count == 0); +} + +static void prq_push(pending_read_queue *q, grpc_byte_buffer *buffer, + void (*on_finish)(void *user_data, grpc_op_error error), + void *user_data) { + pra_push(&q->filling, buffer, on_finish, user_data); +} + +/* Take the first queue element and move it to the completion queue. Do nothing + if q is empty */ +static int prq_pop_to_cq(pending_read_queue *q, void *tag, grpc_call *call, + grpc_completion_queue *cq) { + pending_read_array temp_array; + pending_read *pr; + + if (q->drain_pos == q->draining.count) { + if (q->filling.count == 0) { + return 0; + } + q->draining.count = 0; + q->drain_pos = 0; + /* swap arrays */ + temp_array = q->filling; + q->filling = q->draining; + q->draining = temp_array; + } + + pr = q->draining.data + q->drain_pos; + q->drain_pos++; + grpc_cq_end_read(cq, tag, call, pr->on_finish, pr->user_data, + pr->byte_buffer); + return 1; +} + +/* grpc_call proper */ + +/* the state of a call, based upon which functions have been called against + said call */ +typedef enum { CALL_CREATED, CALL_STARTED, CALL_FINISHED } call_state; + +struct grpc_call { + grpc_completion_queue *cq; + grpc_channel *channel; + grpc_mdctx *metadata_context; + + call_state state; + gpr_uint8 is_client; + gpr_uint8 have_write; + grpc_metadata_buffer incoming_metadata; + + /* protects variables in this section */ + gpr_mu read_mu; + gpr_uint8 reads_done; + gpr_uint8 received_finish; + gpr_uint8 received_metadata; + gpr_uint8 have_read; + gpr_uint8 have_alarm; + /* The current outstanding read message tag (only valid if have_read == 1) */ + void *read_tag; + void *metadata_tag; + void *finished_tag; + pending_read_queue prq; + + grpc_em_alarm alarm; + + /* The current outstanding send message/context/invoke/end tag (only valid if + have_write == 1) */ + void *write_tag; + + /* The final status of the call */ + grpc_status_code status_code; + grpc_mdstr *status_details; + + gpr_refcount internal_refcount; +}; + +#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call)+1)) +#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1) +#define CALL_ELEM_FROM_CALL(call, idx) \ + grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx) +#define CALL_FROM_TOP_ELEM(top_elem) \ + CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem)) + +static void do_nothing(void *ignored, grpc_op_error also_ignored) {} + +grpc_call *grpc_call_create(grpc_channel *channel, + const void *server_transport_data) { + grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel); + grpc_call *call = + gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size); + call->cq = NULL; + call->channel = channel; + grpc_channel_internal_ref(channel); + call->metadata_context = grpc_channel_get_metadata_context(channel); + call->state = CALL_CREATED; + call->is_client = (server_transport_data == NULL); + call->write_tag = INVALID_TAG; + call->read_tag = INVALID_TAG; + call->metadata_tag = INVALID_TAG; + call->finished_tag = INVALID_TAG; + call->have_read = 0; + call->have_write = 0; + call->have_alarm = 0; + call->received_metadata = 0; + call->status_code = + server_transport_data != NULL ? GRPC_STATUS_OK : GRPC_STATUS_UNKNOWN; + call->status_details = NULL; + call->received_finish = 0; + call->reads_done = 0; + grpc_metadata_buffer_init(&call->incoming_metadata); + gpr_ref_init(&call->internal_refcount, 1); + grpc_call_stack_init(channel_stack, server_transport_data, + CALL_STACK_FROM_CALL(call)); + prq_init(&call->prq); + gpr_mu_init(&call->read_mu); + return call; +} + +void grpc_call_internal_ref(grpc_call *c) { gpr_ref(&c->internal_refcount); } + +void grpc_call_internal_unref(grpc_call *c) { + if (gpr_unref(&c->internal_refcount)) { + grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c)); + grpc_metadata_buffer_destroy(&c->incoming_metadata, GRPC_OP_OK); + if (c->status_details) { + grpc_mdstr_unref(c->status_details); + } + prq_destroy(&c->prq); + gpr_mu_destroy(&c->read_mu); + grpc_channel_internal_unref(c->channel); + gpr_free(c); + } +} + +void grpc_call_destroy(grpc_call *c) { + gpr_mu_lock(&c->read_mu); + if (c->have_alarm) { + void *arg_was; + grpc_em_alarm_cancel(&c->alarm, &arg_was); + c->have_alarm = 0; + } + gpr_mu_unlock(&c->read_mu); + grpc_call_internal_unref(c); +} + +grpc_call_error grpc_call_cancel(grpc_call *c) { + grpc_call_element *elem; + grpc_call_op op; + + op.type = GRPC_CANCEL_OP; + op.dir = GRPC_CALL_DOWN; + op.flags = 0; + op.done_cb = do_nothing; + op.user_data = NULL; + + elem = CALL_ELEM_FROM_CALL(c, 0); + elem->filter->call_op(elem, &op); + + return GRPC_CALL_OK; +} + +void grpc_call_execute_op(grpc_call *call, grpc_call_op *op) { + grpc_call_element *elem; + GPR_ASSERT(op->dir == GRPC_CALL_DOWN); + elem = CALL_ELEM_FROM_CALL(call, 0); + elem->filter->call_op(elem, op); +} + +grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata, + gpr_uint32 flags) { + grpc_call_element *elem; + grpc_call_op op; + + if (call->state >= CALL_STARTED) { + return GRPC_CALL_ERROR_ALREADY_INVOKED; + } + + op.type = GRPC_SEND_METADATA; + op.dir = GRPC_CALL_DOWN; + op.flags = flags; + op.done_cb = do_nothing; + op.user_data = NULL; + op.data.metadata = grpc_mdelem_from_string_and_buffer( + call->metadata_context, metadata->key, (gpr_uint8 *)metadata->value, + metadata->value_length); + + elem = CALL_ELEM_FROM_CALL(call, 0); + elem->filter->call_op(elem, &op); + + return GRPC_CALL_OK; +} + +static void done_invoke(void *user_data, grpc_op_error error) { + grpc_call *call = user_data; + void *tag = call->write_tag; + + GPR_ASSERT(call->have_write); + call->have_write = 0; + call->write_tag = INVALID_TAG; + grpc_cq_end_invoke_accepted(call->cq, tag, call, NULL, NULL, error); +} + +static void finish_call(grpc_call *call) { + grpc_status status; + status.code = call->status_code; + status.details = call->status_details + ? (char *)grpc_mdstr_as_c_string(call->status_details) + : NULL; + grpc_cq_end_finished(call->cq, call->finished_tag, call, NULL, NULL, status); +} + +grpc_call_error grpc_call_start_invoke(grpc_call *call, + grpc_completion_queue *cq, + void *invoke_accepted_tag, + void *metadata_read_tag, + void *finished_tag, gpr_uint32 flags) { + grpc_call_element *elem; + grpc_call_op op; + + /* validate preconditions */ + if (!call->is_client) { + gpr_log(GPR_ERROR, "can only call %s on clients", __FUNCTION__); + return GRPC_CALL_ERROR_NOT_ON_SERVER; + } + + if (call->state >= CALL_STARTED || call->cq) { + gpr_log(GPR_ERROR, "call is already invoked"); + return GRPC_CALL_ERROR_ALREADY_INVOKED; + } + + if (call->have_write) { + gpr_log(GPR_ERROR, "can only have one pending write operation at a time"); + return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; + } + + if (call->have_read) { + gpr_log(GPR_ERROR, "can only have one pending read operation at a time"); + return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; + } + + if (flags & GRPC_WRITE_NO_COMPRESS) { + return GRPC_CALL_ERROR_INVALID_FLAGS; + } + + /* inform the completion queue of an incoming operation */ + grpc_cq_begin_op(cq, call, GRPC_FINISHED); + grpc_cq_begin_op(cq, call, GRPC_CLIENT_METADATA_READ); + grpc_cq_begin_op(cq, call, GRPC_INVOKE_ACCEPTED); + + gpr_mu_lock(&call->read_mu); + + /* update state */ + call->cq = cq; + call->state = CALL_STARTED; + call->finished_tag = finished_tag; + + if (call->received_finish) { + /* handle early cancellation */ + grpc_cq_end_invoke_accepted(call->cq, invoke_accepted_tag, call, NULL, NULL, + GRPC_OP_ERROR); + grpc_cq_end_client_metadata_read(call->cq, metadata_read_tag, call, NULL, + NULL, 0, NULL); + finish_call(call); + + /* early out.. unlock & return */ + gpr_mu_unlock(&call->read_mu); + return GRPC_CALL_OK; + } + + call->write_tag = invoke_accepted_tag; + call->metadata_tag = metadata_read_tag; + + call->have_write = 1; + + gpr_mu_unlock(&call->read_mu); + + /* call down the filter stack */ + op.type = GRPC_SEND_START; + op.dir = GRPC_CALL_DOWN; + op.flags = flags; + op.done_cb = done_invoke; + op.user_data = call; + + elem = CALL_ELEM_FROM_CALL(call, 0); + elem->filter->call_op(elem, &op); + + return GRPC_CALL_OK; +} + +grpc_call_error grpc_call_accept(grpc_call *call, grpc_completion_queue *cq, + void *finished_tag, gpr_uint32 flags) { + grpc_call_element *elem; + grpc_call_op op; + + /* validate preconditions */ + if (call->is_client) { + gpr_log(GPR_ERROR, "can only call %s on servers", __FUNCTION__); + return GRPC_CALL_ERROR_NOT_ON_CLIENT; + } + + if (call->state >= CALL_STARTED) { + gpr_log(GPR_ERROR, "call is already invoked"); + return GRPC_CALL_ERROR_ALREADY_INVOKED; + } + + if (flags & GRPC_WRITE_NO_COMPRESS) { + return GRPC_CALL_ERROR_INVALID_FLAGS; + } + + /* inform the completion queue of an incoming operation (corresponding to + finished_tag) */ + grpc_cq_begin_op(cq, call, GRPC_FINISHED); + + /* update state */ + gpr_mu_lock(&call->read_mu); + call->state = CALL_STARTED; + call->cq = cq; + call->finished_tag = finished_tag; + if (prq_is_empty(&call->prq) && call->received_finish) { + finish_call(call); + + /* early out.. unlock & return */ + gpr_mu_unlock(&call->read_mu); + return GRPC_CALL_OK; + } + gpr_mu_unlock(&call->read_mu); + + /* call down */ + op.type = GRPC_SEND_START; + op.dir = GRPC_CALL_DOWN; + op.flags = flags; + op.done_cb = do_nothing; + op.user_data = NULL; + + elem = CALL_ELEM_FROM_CALL(call, 0); + elem->filter->call_op(elem, &op); + + return GRPC_CALL_OK; +} + +static void done_writes_done(void *user_data, grpc_op_error error) { + grpc_call *call = user_data; + void *tag = call->write_tag; + + GPR_ASSERT(call->have_write); + call->have_write = 0; + call->write_tag = INVALID_TAG; + grpc_cq_end_finish_accepted(call->cq, tag, call, NULL, NULL, error); +} + +static void done_write(void *user_data, grpc_op_error error) { + grpc_call *call = user_data; + void *tag = call->write_tag; + + GPR_ASSERT(call->have_write); + call->have_write = 0; + call->write_tag = INVALID_TAG; + grpc_cq_end_write_accepted(call->cq, tag, call, NULL, NULL, error); +} + +void grpc_call_client_initial_metadata_complete( + grpc_call_element *surface_element) { + grpc_call *call = grpc_call_from_top_element(surface_element); + size_t count; + grpc_metadata *elements; + + gpr_mu_lock(&call->read_mu); + count = grpc_metadata_buffer_count(&call->incoming_metadata); + elements = grpc_metadata_buffer_extract_elements(&call->incoming_metadata); + + GPR_ASSERT(!call->received_metadata); + grpc_cq_end_client_metadata_read(call->cq, call->metadata_tag, call, + grpc_metadata_buffer_cleanup_elements, + elements, count, elements); + call->received_metadata = 1; + call->metadata_tag = INVALID_TAG; + gpr_mu_unlock(&call->read_mu); +} + +static void request_more_data(grpc_call *call) { + grpc_call_element *elem; + grpc_call_op op; + + /* call down */ + op.type = GRPC_REQUEST_DATA; + op.dir = GRPC_CALL_DOWN; + op.flags = 0; + op.done_cb = do_nothing; + op.user_data = NULL; + + elem = CALL_ELEM_FROM_CALL(call, 0); + elem->filter->call_op(elem, &op); +} + +grpc_call_error grpc_call_start_read(grpc_call *call, void *tag) { + gpr_uint8 request_more = 0; + + switch (call->state) { + case CALL_CREATED: + return GRPC_CALL_ERROR_NOT_INVOKED; + case CALL_STARTED: + break; + case CALL_FINISHED: + return GRPC_CALL_ERROR_ALREADY_FINISHED; + } + + gpr_mu_lock(&call->read_mu); + + if (call->have_read) { + gpr_mu_unlock(&call->read_mu); + return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; + } + + grpc_cq_begin_op(call->cq, call, GRPC_READ); + + if (!prq_pop_to_cq(&call->prq, tag, call, call->cq)) { + if (call->reads_done) { + grpc_cq_end_read(call->cq, tag, call, do_nothing, NULL, NULL); + } else { + call->read_tag = tag; + call->have_read = 1; + request_more = 1; + } + } else if (prq_is_empty(&call->prq) && call->received_finish) { + finish_call(call); + } + + gpr_mu_unlock(&call->read_mu); + + if (request_more) { + request_more_data(call); + } + + return GRPC_CALL_OK; +} + +grpc_call_error grpc_call_start_write(grpc_call *call, + grpc_byte_buffer *byte_buffer, void *tag, + gpr_uint32 flags) { + grpc_call_element *elem; + grpc_call_op op; + + switch (call->state) { + case CALL_CREATED: + return GRPC_CALL_ERROR_NOT_INVOKED; + case CALL_STARTED: + break; + case CALL_FINISHED: + return GRPC_CALL_ERROR_ALREADY_FINISHED; + } + + if (call->have_write) { + return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; + } + + grpc_cq_begin_op(call->cq, call, GRPC_WRITE_ACCEPTED); + + /* for now we do no buffering, so a NULL byte_buffer can have no impact + on our behavior -- succeed immediately */ + /* TODO(ctiller): if flags & GRPC_WRITE_BUFFER_HINT == 0, this indicates a + flush, and that flush should be propogated down from here */ + if (byte_buffer == NULL) { + grpc_cq_end_write_accepted(call->cq, tag, call, NULL, NULL, GRPC_OP_OK); + return GRPC_CALL_OK; + } + + call->write_tag = tag; + call->have_write = 1; + + op.type = GRPC_SEND_MESSAGE; + op.dir = GRPC_CALL_DOWN; + op.flags = flags; + op.done_cb = done_write; + op.user_data = call; + op.data.message = byte_buffer; + + elem = CALL_ELEM_FROM_CALL(call, 0); + elem->filter->call_op(elem, &op); + + return GRPC_CALL_OK; +} + +grpc_call_error grpc_call_writes_done(grpc_call *call, void *tag) { + grpc_call_element *elem; + grpc_call_op op; + + if (!call->is_client) { + return GRPC_CALL_ERROR_NOT_ON_SERVER; + } + + switch (call->state) { + case CALL_CREATED: + return GRPC_CALL_ERROR_NOT_INVOKED; + case CALL_FINISHED: + return GRPC_CALL_ERROR_ALREADY_FINISHED; + case CALL_STARTED: + break; + } + + if (call->have_write) { + return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; + } + + grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED); + + call->write_tag = tag; + call->have_write = 1; + + op.type = GRPC_SEND_FINISH; + op.dir = GRPC_CALL_DOWN; + op.flags = 0; + op.done_cb = done_writes_done; + op.user_data = call; + + elem = CALL_ELEM_FROM_CALL(call, 0); + elem->filter->call_op(elem, &op); + + return GRPC_CALL_OK; +} + +grpc_call_error grpc_call_start_write_status(grpc_call *call, + grpc_status status, void *tag) { + grpc_call_element *elem; + grpc_call_op op; + + if (call->is_client) { + return GRPC_CALL_ERROR_NOT_ON_CLIENT; + } + + switch (call->state) { + case CALL_CREATED: + return GRPC_CALL_ERROR_NOT_INVOKED; + case CALL_FINISHED: + return GRPC_CALL_ERROR_ALREADY_FINISHED; + case CALL_STARTED: + break; + } + + if (call->have_write) { + return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; + } + + elem = CALL_ELEM_FROM_CALL(call, 0); + + if (status.details && status.details[0]) { + grpc_mdelem *md = grpc_mdelem_from_strings(call->metadata_context, + "grpc-message", status.details); + + op.type = GRPC_SEND_METADATA; + op.dir = GRPC_CALL_DOWN; + op.flags = 0; + op.done_cb = do_nothing; + op.user_data = NULL; + op.data.metadata = md; + elem->filter->call_op(elem, &op); + } + + /* always send status */ + { + grpc_mdelem *md; + char buffer[32]; + sprintf(buffer, "%d", status.code); + md = + grpc_mdelem_from_strings(call->metadata_context, "grpc-status", buffer); + + op.type = GRPC_SEND_METADATA; + op.dir = GRPC_CALL_DOWN; + op.flags = 0; + op.done_cb = do_nothing; + op.user_data = NULL; + op.data.metadata = md; + elem->filter->call_op(elem, &op); + } + + grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED); + + call->state = CALL_FINISHED; + call->write_tag = tag; + call->have_write = 1; + + op.type = GRPC_SEND_FINISH; + op.dir = GRPC_CALL_DOWN; + op.flags = 0; + op.done_cb = done_writes_done; + op.user_data = call; + + elem->filter->call_op(elem, &op); + + return GRPC_CALL_OK; +} + +/* we offset status by a small amount when storing it into transport metadata + as metadata cannot store a 0 value (which is used as OK for grpc_status_codes + */ +#define STATUS_OFFSET 1 +static void destroy_status(void *ignored) {} + +static gpr_uint32 decode_status(grpc_mdelem *md) { + gpr_uint32 status; + void *user_data = grpc_mdelem_get_user_data(md, destroy_status); + if (user_data) { + status = ((gpr_uint32)(gpr_intptr)user_data) - STATUS_OFFSET; + } else { + if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value), + GPR_SLICE_LENGTH(md->value->slice), + &status)) { + status = GRPC_STATUS_UNKNOWN; /* could not parse status code */ + } + grpc_mdelem_set_user_data(md, destroy_status, + (void *)(gpr_intptr)(status + STATUS_OFFSET)); + } + return status; +} + +void grpc_call_recv_metadata(grpc_call_element *elem, grpc_call_op *op) { + grpc_call *call = CALL_FROM_TOP_ELEM(elem); + grpc_mdelem *md = op->data.metadata; + grpc_mdstr *key = md->key; + if (key == grpc_channel_get_status_string(call->channel)) { + call->status_code = decode_status(md); + grpc_mdelem_unref(md); + op->done_cb(op->user_data, GRPC_OP_OK); + } else if (key == grpc_channel_get_message_string(call->channel)) { + if (call->status_details) { + grpc_mdstr_unref(call->status_details); + } + call->status_details = grpc_mdstr_ref(md->value); + grpc_mdelem_unref(md); + op->done_cb(op->user_data, GRPC_OP_OK); + } else { + grpc_metadata_buffer_queue(&call->incoming_metadata, op); + } +} + +void grpc_call_recv_finish(grpc_call_element *elem, int is_full_close) { + grpc_call *call = CALL_FROM_TOP_ELEM(elem); + + gpr_mu_lock(&call->read_mu); + + if (call->have_read) { + grpc_cq_end_read(call->cq, call->read_tag, call, do_nothing, NULL, NULL); + call->read_tag = INVALID_TAG; + call->have_read = 0; + } + if (call->is_client && !call->received_metadata && call->cq) { + size_t count; + grpc_metadata *elements; + + call->received_metadata = 1; + + count = grpc_metadata_buffer_count(&call->incoming_metadata); + elements = grpc_metadata_buffer_extract_elements(&call->incoming_metadata); + grpc_cq_end_client_metadata_read(call->cq, call->metadata_tag, call, + grpc_metadata_buffer_cleanup_elements, + elements, count, elements); + } + if (is_full_close) { + if (call->have_alarm) { + void *arg_was; + grpc_em_alarm_cancel(&call->alarm, &arg_was); + call->have_alarm = 0; + } + call->received_finish = 1; + if (prq_is_empty(&call->prq) && call->cq != NULL) { + finish_call(call); + } + } else { + call->reads_done = 1; + } + gpr_mu_unlock(&call->read_mu); +} + +void grpc_call_recv_message(grpc_call_element *elem, grpc_byte_buffer *message, + void (*on_finish)(void *user_data, + grpc_op_error error), + void *user_data) { + grpc_call *call = CALL_FROM_TOP_ELEM(elem); + + gpr_mu_lock(&call->read_mu); + if (call->have_read) { + grpc_cq_end_read(call->cq, call->read_tag, call, on_finish, user_data, + message); + call->read_tag = INVALID_TAG; + call->have_read = 0; + } else { + prq_push(&call->prq, message, on_finish, user_data); + } + gpr_mu_unlock(&call->read_mu); +} + +grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { + return CALL_FROM_TOP_ELEM(elem); +} + +grpc_metadata_buffer *grpc_call_get_metadata_buffer(grpc_call *call) { + return &call->incoming_metadata; +} + +static void call_alarm(void *arg, grpc_em_cb_status status) { + grpc_call *call = arg; + if (status == GRPC_CALLBACK_SUCCESS) { + grpc_call_cancel(call); + } + grpc_call_internal_unref(call); +} + +void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) { + grpc_call *call = CALL_FROM_TOP_ELEM(elem); + + if (call->have_alarm) { + gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice"); + } + grpc_call_internal_ref(call); + call->have_alarm = 1; + grpc_em_alarm_init(&call->alarm, grpc_surface_em(), call_alarm, call); + grpc_em_alarm_add(&call->alarm, deadline); +} diff --git a/src/core/surface/call.h b/src/core/surface/call.h new file mode 100644 index 0000000000..2c785a59fc --- /dev/null +++ b/src/core/surface/call.h @@ -0,0 +1,73 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_SURFACE_CALL_H__ +#define __GRPC_INTERNAL_SURFACE_CALL_H__ + +#include "src/core/channel/channel_stack.h" +#include "src/core/channel/metadata_buffer.h" +#include <grpc/grpc.h> + +grpc_call *grpc_call_create(grpc_channel *channel, + const void *server_transport_data); + +void grpc_call_internal_ref(grpc_call *call); +void grpc_call_internal_unref(grpc_call *call); + +/* Helpers for grpc_client, grpc_server filters to publish received data to + the completion queue/surface layer */ +void grpc_call_recv_metadata(grpc_call_element *surface_element, + grpc_call_op *op); +void grpc_call_recv_message( + grpc_call_element *surface_element, grpc_byte_buffer *message, + void (*on_finish)(void *user_data, grpc_op_error error), void *user_data); +void grpc_call_recv_finish(grpc_call_element *surface_element, + int is_full_close); + +void grpc_call_execute_op(grpc_call *call, grpc_call_op *op); + +/* Called when it's known that the initial batch of metadata is complete on the + client side (must not be called on the server) */ +void grpc_call_client_initial_metadata_complete( + grpc_call_element *surface_element); + +void grpc_call_set_deadline(grpc_call_element *surface_element, + gpr_timespec deadline); + +/* Given the top call_element, get the call object. */ +grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element); + +/* Get the metadata buffer. */ +grpc_metadata_buffer *grpc_call_get_metadata_buffer(grpc_call *call); + +#endif /* __GRPC_INTERNAL_SURFACE_CALL_H__ */ diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c new file mode 100644 index 0000000000..ff994257f4 --- /dev/null +++ b/src/core/surface/channel.c @@ -0,0 +1,152 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/surface/channel.h" + +#include <stdlib.h> +#include <string.h> + +#include "src/core/surface/call.h" +#include "src/core/surface/client.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +struct grpc_channel { + int is_client; + gpr_refcount refs; + grpc_mdctx *metadata_context; + grpc_mdstr *grpc_status_string; + grpc_mdstr *grpc_message_string; +}; + +#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c)+1)) + +grpc_channel *grpc_channel_create_from_filters( + const grpc_channel_filter **filters, size_t num_filters, + const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client) { + size_t size = + sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters); + grpc_channel *channel = gpr_malloc(size); + channel->is_client = is_client; + /* decremented by grpc_channel_destroy */ + gpr_ref_init(&channel->refs, 1); + channel->metadata_context = mdctx; + channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status"); + channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message"); + grpc_channel_stack_init(filters, num_filters, args, channel->metadata_context, + CHANNEL_STACK_FROM_CHANNEL(channel)); + return channel; +} + +static void do_nothing(void *ignored, grpc_op_error error) {} + +grpc_call *grpc_channel_create_call(grpc_channel *channel, const char *method, + const char *host, + gpr_timespec absolute_deadline) { + grpc_call *call; + grpc_metadata md; + + if (!channel->is_client) { + gpr_log(GPR_ERROR, "Cannot create a call on the server."); + return NULL; + } + + call = grpc_call_create(channel, NULL); + +#define ADDMD(k, v) \ + do { \ + md.key = (k); \ + md.value = (char *)(v); \ + md.value_length = strlen((v)); \ + grpc_call_add_metadata(call, &md, 0); \ + } while (0) + ADDMD(":method", "POST"); + ADDMD(":scheme", "grpc"); + ADDMD(":path", method); + ADDMD(":authority", host); + ADDMD("content-type", "application/grpc"); + if (0 != gpr_time_cmp(absolute_deadline, gpr_inf_future)) { + grpc_call_op op; + op.type = GRPC_SEND_DEADLINE; + op.dir = GRPC_CALL_DOWN; + op.flags = 0; + op.data.deadline = absolute_deadline; + op.done_cb = do_nothing; + op.user_data = NULL; + grpc_call_execute_op(call, &op); + } + + return call; +} + +void grpc_channel_internal_ref(grpc_channel *channel) { + gpr_ref(&channel->refs); +} + +void grpc_channel_internal_unref(grpc_channel *channel) { + if (gpr_unref(&channel->refs)) { + grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel)); + grpc_mdstr_unref(channel->grpc_status_string); + grpc_mdstr_unref(channel->grpc_message_string); + grpc_mdctx_orphan(channel->metadata_context); + gpr_free(channel); + } +} + +void grpc_channel_destroy(grpc_channel *channel) { + grpc_channel_op op; + grpc_channel_element *elem; + + op.type = GRPC_CHANNEL_SHUTDOWN; + op.dir = GRPC_CALL_DOWN; + elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0); + elem->filter->channel_op(elem, &op); + + grpc_channel_internal_unref(channel); +} + +grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel) { + return CHANNEL_STACK_FROM_CHANNEL(channel); +} + +grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel) { + return channel->metadata_context; +} + +grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) { + return channel->grpc_status_string; +} + +grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel) { + return channel->grpc_message_string; +} diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h new file mode 100644 index 0000000000..11d4939916 --- /dev/null +++ b/src/core/surface/channel.h @@ -0,0 +1,51 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_SURFACE_CHANNEL_H__ +#define __GRPC_INTERNAL_SURFACE_CHANNEL_H__ + +#include "src/core/channel/channel_stack.h" + +grpc_channel *grpc_channel_create_from_filters( + const grpc_channel_filter **filters, size_t count, + const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client); + +grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel); +grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel); +grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel); +grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel); + +void grpc_channel_internal_ref(grpc_channel *channel); +void grpc_channel_internal_unref(grpc_channel *channel); + +#endif /* __GRPC_INTERNAL_SURFACE_CHANNEL_H__ */ diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c new file mode 100644 index 0000000000..ec1c8477fa --- /dev/null +++ b/src/core/surface/channel_create.c @@ -0,0 +1,213 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/grpc.h> + +#include <stdlib.h> +#include <string.h> + +#include "src/core/channel/census_filter.h" +#include "src/core/channel/channel_args.h" +#include "src/core/channel/client_channel.h" +#include "src/core/channel/client_setup.h" +#include "src/core/channel/connected_channel.h" +#include "src/core/channel/http_client_filter.h" +#include "src/core/channel/http_filter.h" +#include "src/core/endpoint/resolve_address.h" +#include "src/core/endpoint/tcp.h" +#include "src/core/endpoint/tcp_client.h" +#include "src/core/surface/channel.h" +#include "src/core/surface/client.h" +#include "src/core/surface/surface_em.h" +#include "src/core/transport/chttp2_transport.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string.h> +#include <grpc/support/sync.h> +#include <grpc/support/useful.h> + +typedef struct setup setup; + +/* A single setup request (started via initiate) */ +typedef struct { + grpc_client_setup_request *cs_request; + setup *setup; + /* Resolved addresses, or null if resolution not yet completed */ + grpc_resolved_addresses *resolved; + /* which address in resolved should we pick for the next connection attempt */ + size_t resolved_index; +} request; + +/* Global setup logic (may be running many simultaneous setup requests, but + with only one 'active' */ +struct setup { + const char *target; + grpc_transport_setup_callback setup_callback; + void *setup_user_data; + grpc_em *em; +}; + +static int maybe_try_next_resolved(request *r); + +static void done(request *r, int was_successful) { + grpc_client_setup_request_finish(r->cs_request, was_successful); + if (r->resolved) { + grpc_resolved_addresses_destroy(r->resolved); + } + gpr_free(r); +} + +/* connection callback: tcp is either valid, or null on error */ +static void on_connect(void *rp, grpc_endpoint *tcp) { + request *r = rp; + + if (!grpc_client_setup_request_should_continue(r->cs_request)) { + if (tcp) { + grpc_endpoint_shutdown(tcp); + grpc_endpoint_destroy(tcp); + } + done(r, 0); + return; + } + + if (!tcp) { + if (!maybe_try_next_resolved(r)) { + done(r, 0); + return; + } else { + return; + } + } else { + grpc_create_chttp2_transport( + r->setup->setup_callback, r->setup->setup_user_data, + grpc_client_setup_get_channel_args(r->cs_request), tcp, NULL, 0, + grpc_client_setup_get_mdctx(r->cs_request), 1); + done(r, 1); + return; + } +} + +/* attempt to connect to the next available resolved address */ +static int maybe_try_next_resolved(request *r) { + grpc_resolved_address *addr; + if (!r->resolved) return 0; + if (r->resolved_index == r->resolved->naddrs) return 0; + addr = &r->resolved->addrs[r->resolved_index++]; + grpc_tcp_client_connect(on_connect, r, r->setup->em, + (struct sockaddr *)&addr->addr, addr->len, + grpc_client_setup_request_deadline(r->cs_request)); + return 1; +} + +/* callback for when our target address has been resolved */ +static void on_resolved(void *rp, grpc_resolved_addresses *resolved) { + request *r = rp; + + /* if we're not still the active request, abort */ + if (!grpc_client_setup_request_should_continue(r->cs_request)) { + if (resolved) { + grpc_resolved_addresses_destroy(resolved); + } + done(r, 0); + return; + } + + if (!resolved) { + done(r, 0); + return; + } else { + r->resolved = resolved; + r->resolved_index = 0; + if (!maybe_try_next_resolved(r)) { + done(r, 0); + } + } +} + +static void initiate_setup(void *sp, grpc_client_setup_request *cs_request) { + request *r = gpr_malloc(sizeof(request)); + r->setup = sp; + r->cs_request = cs_request; + r->resolved = NULL; + r->resolved_index = 0; + /* TODO(klempner): Make grpc_resolve_address respect deadline */ + grpc_resolve_address(r->setup->target, "http", on_resolved, r); +} + +static void done_setup(void *sp) { + setup *s = sp; + gpr_free((void *)s->target); + gpr_free(s); +} + +static grpc_transport_setup_result complete_setup(void *channel_stack, + grpc_transport *transport, + grpc_mdctx *mdctx) { + static grpc_channel_filter const *extra_filters[] = {&grpc_http_client_filter, + &grpc_http_filter}; + return grpc_client_channel_transport_setup_complete( + channel_stack, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters), + mdctx); +} + +/* Create a client channel: + Asynchronously: - resolve target + - connect to it (trying alternatives as presented) + - perform handshakes */ +grpc_channel *grpc_channel_create(const char *target, + const grpc_channel_args *args) { + setup *s = gpr_malloc(sizeof(setup)); + grpc_mdctx *mdctx = grpc_mdctx_create(); + grpc_channel *channel = NULL; +#define MAX_FILTERS 3 + const grpc_channel_filter *filters[MAX_FILTERS]; + int n = 0; + filters[n++] = &grpc_client_surface_filter; + if (grpc_channel_args_is_census_enabled(args)) { + filters[n++] = &grpc_client_census_filter; + } + filters[n++] = &grpc_client_channel_filter; + GPR_ASSERT(n <= MAX_FILTERS); + channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1); + + s->target = gpr_strdup(target); + s->em = grpc_surface_em(); + s->setup_callback = complete_setup; + s->setup_user_data = grpc_channel_get_channel_stack(channel); + + grpc_client_setup_create_and_attach(grpc_channel_get_channel_stack(channel), + args, mdctx, initiate_setup, done_setup, + s, s->em); + + return channel; +} diff --git a/src/core/surface/client.c b/src/core/surface/client.c new file mode 100644 index 0000000000..26abffa817 --- /dev/null +++ b/src/core/surface/client.c @@ -0,0 +1,115 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/surface/client.h" + +#include "src/core/surface/call.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string.h> + +typedef struct { void *unused; } call_data; + +typedef struct { void *unused; } channel_data; + +static void call_op(grpc_call_element *elem, grpc_call_op *op) { + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + + switch (op->type) { + case GRPC_SEND_DEADLINE: + grpc_call_set_deadline(elem, op->data.deadline); + grpc_call_next_op(elem, op); + break; + case GRPC_RECV_METADATA: + grpc_call_recv_metadata(elem, op); + break; + case GRPC_RECV_DEADLINE: + gpr_log(GPR_ERROR, "Deadline received by client (ignored)"); + break; + case GRPC_RECV_MESSAGE: + grpc_call_recv_message(elem, op->data.message, op->done_cb, + op->user_data); + break; + case GRPC_RECV_HALF_CLOSE: + grpc_call_recv_finish(elem, 0); + break; + case GRPC_RECV_FINISH: + grpc_call_recv_finish(elem, 1); + break; + case GRPC_RECV_END_OF_INITIAL_METADATA: + grpc_call_client_initial_metadata_complete(elem); + break; + default: + GPR_ASSERT(op->dir == GRPC_CALL_DOWN); + grpc_call_next_op(elem, op); + } +} + +static void channel_op(grpc_channel_element *elem, grpc_channel_op *op) { + switch (op->type) { + case GRPC_ACCEPT_CALL: + gpr_log(GPR_ERROR, "Client cannot accept new calls"); + break; + case GRPC_TRANSPORT_CLOSED: + gpr_log(GPR_ERROR, "Transport closed"); + break; + default: + GPR_ASSERT(op->dir == GRPC_CALL_DOWN); + grpc_channel_next_op(elem, op); + } +} + +static void init_call_elem(grpc_call_element *elem, + const void *transport_server_data) {} + +static void destroy_call_elem(grpc_call_element *elem) {} + +static void init_channel_elem(grpc_channel_element *elem, + const grpc_channel_args *args, grpc_mdctx *mdctx, + int is_first, int is_last) { + GPR_ASSERT(is_first); + GPR_ASSERT(!is_last); +} + +static void destroy_channel_elem(grpc_channel_element *elem) { +} + +const grpc_channel_filter grpc_client_surface_filter = { + call_op, channel_op, + + sizeof(call_data), init_call_elem, destroy_call_elem, + + sizeof(channel_data), init_channel_elem, destroy_channel_elem, + + "client", +}; diff --git a/src/core/surface/client.h b/src/core/surface/client.h new file mode 100644 index 0000000000..eb567276e2 --- /dev/null +++ b/src/core/surface/client.h @@ -0,0 +1,41 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_SURFACE_CLIENT_H__ +#define __GRPC_INTERNAL_SURFACE_CLIENT_H__ + +#include "src/core/channel/channel_stack.h" + +extern const grpc_channel_filter grpc_client_surface_filter; + +#endif /* __GRPC_INTERNAL_SURFACE_CLIENT_H__ */ diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c new file mode 100644 index 0000000000..a7d611579f --- /dev/null +++ b/src/core/surface/completion_queue.c @@ -0,0 +1,392 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/surface/completion_queue.h" + +#include <stdio.h> +#include <string.h> + +#include "src/core/eventmanager/em.h" +#include "src/core/surface/call.h" +#include "src/core/surface/event_string.h" +#include "src/core/surface/surface_em.h" +#include "src/core/surface/surface_trace.h" +#include <grpc/support/alloc.h> +#include <grpc/support/atm.h> +#include <grpc/support/log.h> +#include <grpc/support/string.h> + +#define NUM_TAG_BUCKETS 31 + +/* A single event: extends grpc_event to form a linked list with a destruction + function (on_finish) that is hidden from outside this module */ +typedef struct event { + grpc_event base; + grpc_event_finish_func on_finish; + void *on_finish_user_data; + struct event *queue_next; + struct event *queue_prev; + struct event *bucket_next; + struct event *bucket_prev; +} event; + +/* Completion queue structure */ +struct grpc_completion_queue { + grpc_em *em; + int allow_polling; + + /* When refs drops to zero, we are in shutdown mode, and will be destroyable + once all queued events are drained */ + gpr_refcount refs; + /* 0 initially, 1 once we've begun shutting down */ + int shutdown; + /* Head of a linked list of queued events (prev points to the last element) */ + event *queue; + /* Fixed size chained hash table of events for pluck() */ + event *buckets[NUM_TAG_BUCKETS]; + +#ifndef NDEBUG + /* Debug support: track which operations are in flight at any given time */ + gpr_atm pending_op_count[GRPC_COMPLETION_DO_NOT_USE]; +#endif +}; + +/* Default do-nothing on_finish function */ +static void null_on_finish(void *user_data, grpc_op_error error) {} + +grpc_completion_queue *grpc_completion_queue_create() { + grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue)); + memset(cc, 0, sizeof(*cc)); + /* Initial ref is dropped by grpc_completion_queue_shutdown */ + gpr_ref_init(&cc->refs, 1); + cc->em = grpc_surface_em(); + cc->allow_polling = 1; + return cc; +} + +void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc) { + cc->allow_polling = 0; +} + +/* Create and append an event to the queue. Returns the event so that its data + members can be filled in. + Requires cc->em->mu locked. */ +static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type, + void *tag, grpc_call *call, + grpc_event_finish_func on_finish, void *user_data) { + event *ev = gpr_malloc(sizeof(event)); + gpr_intptr bucket = ((gpr_intptr)tag) % NUM_TAG_BUCKETS; + GPR_ASSERT(!cc->shutdown); + ev->base.type = type; + ev->base.tag = tag; + ev->base.call = call; + ev->on_finish = on_finish ? on_finish : null_on_finish; + ev->on_finish_user_data = user_data; + if (cc->queue == NULL) { + cc->queue = ev->queue_next = ev->queue_prev = ev; + } else { + ev->queue_next = cc->queue; + ev->queue_prev = cc->queue->queue_prev; + ev->queue_next->queue_prev = ev->queue_prev->queue_next = ev; + } + if (cc->buckets[bucket] == NULL) { + cc->buckets[bucket] = ev->bucket_next = ev->bucket_prev = ev; + } else { + ev->bucket_next = cc->buckets[bucket]; + ev->bucket_prev = cc->buckets[bucket]->bucket_prev; + ev->bucket_next->bucket_prev = ev->bucket_prev->bucket_next = ev; + } + gpr_cv_broadcast(&cc->em->cv); + return ev; +} + +void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call, + grpc_completion_type type) { + gpr_ref(&cc->refs); + if (call) grpc_call_internal_ref(call); +#ifndef NDEBUG + gpr_atm_no_barrier_fetch_add(&cc->pending_op_count[type], 1); +#endif +} + +/* Signal the end of an operation - if this is the last waiting-to-be-queued + event, then enter shutdown mode */ +static void end_op_locked(grpc_completion_queue *cc, + grpc_completion_type type) { +#ifndef NDEBUG + GPR_ASSERT(gpr_atm_full_fetch_add(&cc->pending_op_count[type], -1) > 0); +#endif + if (gpr_unref(&cc->refs)) { + GPR_ASSERT(!cc->shutdown); + cc->shutdown = 1; + gpr_cv_broadcast(&cc->em->cv); + } +} + +void grpc_cq_end_read(grpc_completion_queue *cc, void *tag, grpc_call *call, + grpc_event_finish_func on_finish, void *user_data, + grpc_byte_buffer *read) { + event *ev; + gpr_mu_lock(&cc->em->mu); + ev = add_locked(cc, GRPC_READ, tag, call, on_finish, user_data); + ev->base.data.read = read; + end_op_locked(cc, GRPC_READ); + gpr_mu_unlock(&cc->em->mu); +} + +void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag, + grpc_call *call, + grpc_event_finish_func on_finish, + void *user_data, grpc_op_error error) { + event *ev; + gpr_mu_lock(&cc->em->mu); + ev = add_locked(cc, GRPC_INVOKE_ACCEPTED, tag, call, on_finish, user_data); + ev->base.data.invoke_accepted = error; + end_op_locked(cc, GRPC_INVOKE_ACCEPTED); + gpr_mu_unlock(&cc->em->mu); +} + +void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag, + grpc_call *call, + grpc_event_finish_func on_finish, + void *user_data, grpc_op_error error) { + event *ev; + gpr_mu_lock(&cc->em->mu); + ev = add_locked(cc, GRPC_WRITE_ACCEPTED, tag, call, on_finish, user_data); + ev->base.data.write_accepted = error; + end_op_locked(cc, GRPC_WRITE_ACCEPTED); + gpr_mu_unlock(&cc->em->mu); +} + +void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag, + grpc_call *call, + grpc_event_finish_func on_finish, + void *user_data, grpc_op_error error) { + event *ev; + gpr_mu_lock(&cc->em->mu); + ev = add_locked(cc, GRPC_FINISH_ACCEPTED, tag, call, on_finish, user_data); + ev->base.data.finish_accepted = error; + end_op_locked(cc, GRPC_FINISH_ACCEPTED); + gpr_mu_unlock(&cc->em->mu); +} + +void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag, + grpc_call *call, + grpc_event_finish_func on_finish, + void *user_data, size_t count, + grpc_metadata *elements) { + event *ev; + gpr_mu_lock(&cc->em->mu); + ev = add_locked(cc, GRPC_CLIENT_METADATA_READ, tag, call, on_finish, + user_data); + ev->base.data.client_metadata_read.count = count; + ev->base.data.client_metadata_read.elements = elements; + end_op_locked(cc, GRPC_CLIENT_METADATA_READ); + gpr_mu_unlock(&cc->em->mu); +} + +void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call, + grpc_event_finish_func on_finish, void *user_data, + grpc_status status) { + event *ev; + gpr_mu_lock(&cc->em->mu); + ev = add_locked(cc, GRPC_FINISHED, tag, call, on_finish, user_data); + ev->base.data.finished = status; + end_op_locked(cc, GRPC_FINISHED); + gpr_mu_unlock(&cc->em->mu); +} + +void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call, + grpc_event_finish_func on_finish, void *user_data, + const char *method, const char *host, + gpr_timespec deadline, size_t metadata_count, + grpc_metadata *metadata_elements) { + event *ev; + gpr_mu_lock(&cc->em->mu); + ev = add_locked(cc, GRPC_SERVER_RPC_NEW, tag, call, on_finish, user_data); + ev->base.data.server_rpc_new.method = method; + ev->base.data.server_rpc_new.host = host; + ev->base.data.server_rpc_new.deadline = deadline; + ev->base.data.server_rpc_new.metadata_count = metadata_count; + ev->base.data.server_rpc_new.metadata_elements = metadata_elements; + end_op_locked(cc, GRPC_SERVER_RPC_NEW); + gpr_mu_unlock(&cc->em->mu); +} + +/* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */ +static event *create_shutdown_event() { + event *ev = gpr_malloc(sizeof(event)); + ev->base.type = GRPC_QUEUE_SHUTDOWN; + ev->base.call = NULL; + ev->base.tag = NULL; + ev->on_finish = null_on_finish; + return ev; +} + +grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc, + gpr_timespec deadline) { + event *ev = NULL; + + gpr_mu_lock(&cc->em->mu); + for (;;) { + if (cc->queue != NULL) { + gpr_intptr bucket; + ev = cc->queue; + bucket = ((gpr_intptr)ev->base.tag) % NUM_TAG_BUCKETS; + cc->queue = ev->queue_next; + ev->queue_next->queue_prev = ev->queue_prev; + ev->queue_prev->queue_next = ev->queue_next; + ev->bucket_next->bucket_prev = ev->bucket_prev; + ev->bucket_prev->bucket_next = ev->bucket_next; + if (ev == cc->buckets[bucket]) { + cc->buckets[bucket] = ev->bucket_next; + if (ev == cc->buckets[bucket]) { + cc->buckets[bucket] = NULL; + } + } + if (cc->queue == ev) { + cc->queue = NULL; + } + break; + } + if (cc->shutdown) { + ev = create_shutdown_event(); + break; + } + if (cc->allow_polling && grpc_em_work(cc->em, deadline)) { + continue; + } + if (gpr_cv_wait(&cc->em->cv, &cc->em->mu, deadline)) { + gpr_mu_unlock(&cc->em->mu); + return NULL; + } + } + gpr_mu_unlock(&cc->em->mu); + GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base); + return &ev->base; +} + +static event *pluck_event(grpc_completion_queue *cc, void *tag) { + gpr_intptr bucket = ((gpr_intptr)tag) % NUM_TAG_BUCKETS; + event *ev = cc->buckets[bucket]; + if (ev == NULL) return NULL; + do { + if (ev->base.tag == tag) { + ev->queue_next->queue_prev = ev->queue_prev; + ev->queue_prev->queue_next = ev->queue_next; + ev->bucket_next->bucket_prev = ev->bucket_prev; + ev->bucket_prev->bucket_next = ev->bucket_next; + if (ev == cc->buckets[bucket]) { + cc->buckets[bucket] = ev->bucket_next; + if (ev == cc->buckets[bucket]) { + cc->buckets[bucket] = NULL; + } + } + if (cc->queue == ev) { + cc->queue = ev->queue_next; + if (cc->queue == ev) { + cc->queue = NULL; + } + } + return ev; + } + ev = ev->bucket_next; + } while (ev != cc->buckets[bucket]); + return NULL; +} + +grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, + gpr_timespec deadline) { + event *ev = NULL; + + gpr_mu_lock(&cc->em->mu); + for (;;) { + if ((ev = pluck_event(cc, tag))) { + break; + } + if (cc->shutdown) { + ev = create_shutdown_event(); + break; + } + if (cc->allow_polling && grpc_em_work(cc->em, deadline)) { + continue; + } + if (gpr_cv_wait(&cc->em->cv, &cc->em->mu, deadline)) { + gpr_mu_unlock(&cc->em->mu); + return NULL; + } + } + gpr_mu_unlock(&cc->em->mu); + GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base); + return &ev->base; +} + +/* Shutdown simply drops a ref that we reserved at creation time; if we drop + to zero here, then enter shutdown mode and wake up any waiters */ +void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { + if (gpr_unref(&cc->refs)) { + gpr_mu_lock(&cc->em->mu); + GPR_ASSERT(!cc->shutdown); + cc->shutdown = 1; + gpr_cv_broadcast(&cc->em->cv); + gpr_mu_unlock(&cc->em->mu); + } +} + +void grpc_completion_queue_destroy(grpc_completion_queue *cc) { + GPR_ASSERT(cc->queue == NULL); + gpr_free(cc); +} + +void grpc_event_finish(grpc_event *base) { + event *ev = (event *)base; + ev->on_finish(ev->on_finish_user_data, GRPC_OP_OK); + if (ev->base.call) { + grpc_call_internal_unref(ev->base.call); + } + gpr_free(ev); +} + +void grpc_cq_dump_pending_ops(grpc_completion_queue *cc) { +#ifndef NDEBUG + char tmp[256]; + char *p = tmp; + int i; + + for (i = 0; i < GRPC_COMPLETION_DO_NOT_USE; i++) { + p += sprintf(p, " %d", (int)cc->pending_op_count[i]); + } + + gpr_log(GPR_INFO, "pending ops:%s", tmp); +#endif +} diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h new file mode 100644 index 0000000000..0fe576588a --- /dev/null +++ b/src/core/surface/completion_queue.h @@ -0,0 +1,102 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_SURFACE_COMPLETION_QUEUE_H__ +#define __GRPC_INTERNAL_SURFACE_COMPLETION_QUEUE_H__ + +/* Internal API for completion channels */ + +#include <grpc/grpc.h> + +/* A finish func is executed whenever the event consumer calls + grpc_event_finish */ +typedef void (*grpc_event_finish_func)(void *user_data, grpc_op_error error); + +/* Flag that an operation is beginning: the completion channel will not finish + shutdown until a corrensponding grpc_cq_end_* call is made */ +void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call, + grpc_completion_type type); + +/* grpc_cq_end_* functions pair with a grpc_cq_begin_op + + grpc_cq_end_* common arguments: + cc - the completion channel to queue on + tag - the user supplied operation tag + on_finish - grpc_event_finish_func that is called during grpc_event_finish + can be NULL to not get a callback + user_data - user_data parameter to be passed to on_finish + + Other parameters match the data member of grpc_event */ + +/* Queue a GRPC_READ operation */ +void grpc_cq_end_read(grpc_completion_queue *cc, void *tag, grpc_call *call, + grpc_event_finish_func on_finish, void *user_data, + grpc_byte_buffer *read); +/* Queue a GRPC_INVOKE_ACCEPTED operation */ +void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag, + grpc_call *call, + grpc_event_finish_func on_finish, + void *user_data, grpc_op_error error); +/* Queue a GRPC_WRITE_ACCEPTED operation */ +void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag, + grpc_call *call, + grpc_event_finish_func on_finish, + void *user_data, grpc_op_error error); +/* Queue a GRPC_FINISH_ACCEPTED operation */ +void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag, + grpc_call *call, + grpc_event_finish_func on_finish, + void *user_data, grpc_op_error error); +/* Queue a GRPC_CLIENT_METADATA_READ operation */ +void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag, + grpc_call *call, + grpc_event_finish_func on_finish, + void *user_data, size_t count, + grpc_metadata *elements); + +void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call, + grpc_event_finish_func on_finish, void *user_data, + grpc_status status); + +void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call, + grpc_event_finish_func on_finish, void *user_data, + const char *method, const char *host, + gpr_timespec deadline, size_t metadata_count, + grpc_metadata *metadata_elements); + +/* disable polling for some tests */ +void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc); + +void grpc_cq_dump_pending_ops(grpc_completion_queue *cc); + +#endif /* __GRPC_INTERNAL_SURFACE_COMPLETION_QUEUE_H__ */ diff --git a/src/core/surface/event_string.c b/src/core/surface/event_string.c new file mode 100644 index 0000000000..0a6a81d18e --- /dev/null +++ b/src/core/surface/event_string.c @@ -0,0 +1,119 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/surface/event_string.h" + +#include <stdio.h> + +#include <grpc/support/string.h> +#include <grpc/byte_buffer.h> + +static size_t addhdr(char *p, grpc_event *ev) { + return sprintf(p, "tag:%p call:%p", ev->tag, (void *)ev->call); +} + +static const char *errstr(grpc_op_error err) { + switch (err) { + case GRPC_OP_OK: + return "OK"; + case GRPC_OP_ERROR: + return "ERROR"; + } + return "UNKNOWN_UNKNOWN"; +} + +static size_t adderr(char *p, grpc_op_error err) { + return sprintf(p, " err=%s", errstr(err)); +} + +char *grpc_event_string(grpc_event *ev) { + char buffer[1024]; + char *p = buffer; + + if (ev == NULL) return gpr_strdup("null"); + + switch (ev->type) { + case GRPC_QUEUE_SHUTDOWN: + p += sprintf(p, "QUEUE_SHUTDOWN"); + break; + case GRPC_READ: + p += sprintf(p, "READ: "); + p += addhdr(p, ev); + if (ev->data.read) { + p += sprintf(p, " %d bytes", + (int)grpc_byte_buffer_length(ev->data.read)); + } else { + p += sprintf(p, " end-of-stream"); + } + break; + case GRPC_INVOKE_ACCEPTED: + p += sprintf(p, "INVOKE_ACCEPTED: "); + p += addhdr(p, ev); + p += adderr(p, ev->data.invoke_accepted); + break; + case GRPC_WRITE_ACCEPTED: + p += sprintf(p, "WRITE_ACCEPTED: "); + p += addhdr(p, ev); + p += adderr(p, ev->data.write_accepted); + break; + case GRPC_FINISH_ACCEPTED: + p += sprintf(p, "FINISH_ACCEPTED: "); + p += addhdr(p, ev); + p += adderr(p, ev->data.write_accepted); + break; + case GRPC_CLIENT_METADATA_READ: + p += sprintf(p, "CLIENT_METADATA_READ: "); + p += addhdr(p, ev); + p += sprintf(p, " %d elements", (int)ev->data.client_metadata_read.count); + break; + case GRPC_FINISHED: + p += sprintf(p, "FINISHED: "); + p += addhdr(p, ev); + p += sprintf(p, " status_code=%d details='%s'", ev->data.finished.code, + ev->data.finished.details); + break; + case GRPC_SERVER_RPC_NEW: + p += sprintf(p, "SERVER_RPC_NEW: "); + p += addhdr(p, ev); + p += sprintf(p, " method='%s' host='%s' %d metadata elements", + ev->data.server_rpc_new.method, ev->data.server_rpc_new.host, + (int)ev->data.server_rpc_new.metadata_count); + break; + case GRPC_COMPLETION_DO_NOT_USE: + p += sprintf(p, "DO_NOT_USE (this is a bug)"); + p += addhdr(p, ev); + break; + } + + return gpr_strdup(buffer); +} diff --git a/src/core/surface/event_string.h b/src/core/surface/event_string.h new file mode 100644 index 0000000000..30b693e95c --- /dev/null +++ b/src/core/surface/event_string.h @@ -0,0 +1,42 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_SURFACE_EVENT_STRING_H__ +#define __GRPC_INTERNAL_SURFACE_EVENT_STRING_H__ + +#include <grpc/grpc.h> + +/* Returns a string describing an event. Must be later freed with gpr_free() */ +char *grpc_event_string(grpc_event *ev); + +#endif /* __GRPC_INTERNAL_SURFACE_EVENT_STRING_H__ */ diff --git a/src/core/surface/init.c b/src/core/surface/init.c new file mode 100644 index 0000000000..92c0ac880d --- /dev/null +++ b/src/core/surface/init.c @@ -0,0 +1,46 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/grpc.h> +#include "src/core/statistics/census_interface.h" +#include "src/core/surface/surface_em.h" + +void grpc_init() { + grpc_surface_em_init(); + census_init(); +} + +void grpc_shutdown() { + grpc_surface_em_shutdown(); + census_shutdown(); +} diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c new file mode 100644 index 0000000000..18921c44dd --- /dev/null +++ b/src/core/surface/lame_client.c @@ -0,0 +1,94 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/surface/lame_client.h" + +#include "src/core/channel/channel_stack.h" +#include "src/core/surface/channel.h" +#include "src/core/surface/call.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string.h> + +typedef struct { void *unused; } call_data; + +typedef struct { void *unused; } channel_data; + +static void call_op(grpc_call_element *elem, grpc_call_op *op) { + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + + switch (op->type) { + case GRPC_SEND_START: + grpc_call_recv_finish(elem, 1); + break; + case GRPC_SEND_METADATA: + grpc_mdelem_unref(op->data.metadata); + break; + default: + break; + } + + op->done_cb(op->user_data, GRPC_OP_ERROR); +} + +static void channel_op(grpc_channel_element *elem, grpc_channel_op *op) {} + +static void init_call_elem(grpc_call_element *elem, + const void *transport_server_data) {} + +static void destroy_call_elem(grpc_call_element *elem) {} + +static void init_channel_elem(grpc_channel_element *elem, + const grpc_channel_args *args, grpc_mdctx *mdctx, + int is_first, int is_last) { + GPR_ASSERT(is_first); + GPR_ASSERT(is_last); +} + +static void destroy_channel_elem(grpc_channel_element *elem) {} + +static const grpc_channel_filter lame_filter = { + call_op, channel_op, + + sizeof(call_data), init_call_elem, destroy_call_elem, + + sizeof(channel_data), init_channel_elem, destroy_channel_elem, + + "lame-client", +}; + +grpc_channel *grpc_lame_client_channel_create() { + static const grpc_channel_filter *filters[] = {&lame_filter}; + return grpc_channel_create_from_filters(filters, 1, NULL, grpc_mdctx_create(), + 1); +} diff --git a/src/core/surface/lame_client.h b/src/core/surface/lame_client.h new file mode 100644 index 0000000000..74b9707202 --- /dev/null +++ b/src/core/surface/lame_client.h @@ -0,0 +1,42 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_SURFACE_LAME_CLIENT_H_ +#define __GRPC_INTERNAL_SURFACE_LAME_CLIENT_H_ + +#include <grpc/grpc.h> + +/* Create a lame client: this client fails every operation attempted on it. */ +grpc_channel *grpc_lame_client_channel_create(); + +#endif /* __GRPC_INTERNAL_SURFACE_LAME_CLIENT_H_ */ diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c new file mode 100644 index 0000000000..f330b83521 --- /dev/null +++ b/src/core/surface/secure_channel_create.c @@ -0,0 +1,243 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/grpc.h> + +#include <stdlib.h> +#include <string.h> + +#include "src/core/channel/census_filter.h" +#include "src/core/channel/channel_args.h" +#include "src/core/channel/client_channel.h" +#include "src/core/channel/client_setup.h" +#include "src/core/channel/connected_channel.h" +#include "src/core/channel/http_client_filter.h" +#include "src/core/channel/http_filter.h" +#include "src/core/endpoint/resolve_address.h" +#include "src/core/endpoint/tcp.h" +#include "src/core/endpoint/tcp_client.h" +#include "src/core/security/auth.h" +#include "src/core/security/security_context.h" +#include "src/core/security/secure_transport_setup.h" +#include "src/core/surface/channel.h" +#include "src/core/surface/client.h" +#include "src/core/surface/surface_em.h" +#include "src/core/transport/chttp2_transport.h" +#include <grpc/grpc_security.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string.h> +#include <grpc/support/sync.h> +#include <grpc/support/useful.h> +#include "src/core/tsi/transport_security_interface.h" + +typedef struct setup setup; + +/* A single setup request (started via initiate) */ +typedef struct { + grpc_client_setup_request *cs_request; + setup *setup; + /* Resolved addresses, or null if resolution not yet completed. */ + grpc_resolved_addresses *resolved; + /* which address in resolved should we pick for the next connection attempt */ + size_t resolved_index; +} request; + +struct setup { + grpc_channel_security_context *security_context; + const char *target; + grpc_transport_setup_callback setup_callback; + void *setup_user_data; + grpc_em *em; +}; + +static int maybe_try_next_resolved(request *r); + +static void done(request *r, int was_successful) { + grpc_client_setup_request_finish(r->cs_request, was_successful); + if (r->resolved) { + grpc_resolved_addresses_destroy(r->resolved); + } + gpr_free(r); +} + +static void on_secure_transport_setup_done(void *rp, + grpc_security_status status, + grpc_endpoint *secure_endpoint) { + request *r = rp; + if (status != GRPC_SECURITY_OK) { + gpr_log(GPR_ERROR, "Secure transport setup failed with error %d.", status); + done(r, 0); + } else { + grpc_create_chttp2_transport( + r->setup->setup_callback, r->setup->setup_user_data, + grpc_client_setup_get_channel_args(r->cs_request), secure_endpoint, + NULL, 0, grpc_client_setup_get_mdctx(r->cs_request), 1); + done(r, 1); + } +} + +/* connection callback: tcp is either valid, or null on error */ +static void on_connect(void *rp, grpc_endpoint *tcp) { + request *r = rp; + + if (!grpc_client_setup_request_should_continue(r->cs_request)) { + if (tcp) { + grpc_endpoint_shutdown(tcp); + grpc_endpoint_destroy(tcp); + } + done(r, 0); + return; + } + + if (!tcp) { + if (!maybe_try_next_resolved(r)) { + done(r, 0); + return; + } else { + return; + } + } else { + grpc_setup_secure_transport(&r->setup->security_context->base, tcp, + on_secure_transport_setup_done, r); + } +} + +/* attempt to connect to the next available resolved address */ +static int maybe_try_next_resolved(request *r) { + grpc_resolved_address *addr; + if (!r->resolved) return 0; + if (r->resolved_index == r->resolved->naddrs) return 0; + addr = &r->resolved->addrs[r->resolved_index++]; + grpc_tcp_client_connect(on_connect, r, r->setup->em, + (struct sockaddr *)&addr->addr, addr->len, + grpc_client_setup_request_deadline(r->cs_request)); + return 1; +} + +/* callback for when our target address has been resolved */ +static void on_resolved(void *rp, grpc_resolved_addresses *resolved) { + request *r = rp; + + /* if we're not still the active request, abort */ + if (!grpc_client_setup_request_should_continue(r->cs_request)) { + if (resolved) { + grpc_resolved_addresses_destroy(resolved); + } + done(r, 0); + return; + } + + if (!resolved) { + done(r, 0); + return; + } else { + r->resolved = resolved; + r->resolved_index = 0; + if (!maybe_try_next_resolved(r)) { + done(r, 0); + } + } +} + +static void initiate_setup(void *sp, grpc_client_setup_request *cs_request) { + request *r = gpr_malloc(sizeof(request)); + r->setup = sp; + r->cs_request = cs_request; + r->resolved = NULL; + r->resolved_index = 0; + /* TODO(klempner): Make grpc_resolve_address respect deadline */ + grpc_resolve_address(r->setup->target, "https", on_resolved, r); +} + +static void done_setup(void *sp) { + setup *s = sp; + gpr_free((void *)s->target); + grpc_security_context_unref(&s->security_context->base); + gpr_free(s); +} + +static grpc_transport_setup_result complete_setup(void *channel_stack, + grpc_transport *transport, + grpc_mdctx *mdctx) { + static grpc_channel_filter const *extra_filters[] = {&grpc_http_client_filter, + &grpc_http_filter}; + return grpc_client_channel_transport_setup_complete( + channel_stack, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters), + mdctx); +} + +/* Create a secure client channel: + Asynchronously: - resolve target + - connect to it (trying alternatives as presented) + - perform handshakes */ +grpc_channel *grpc_secure_channel_create_internal( + const char *target, const grpc_channel_args *args, + grpc_channel_security_context *context) { + setup *s; + grpc_channel *channel; + grpc_arg context_arg; + grpc_channel_args *args_copy; + grpc_mdctx *mdctx = grpc_mdctx_create(); +#define MAX_FILTERS 4 + const grpc_channel_filter *filters[MAX_FILTERS]; + int n = 0; + if (grpc_find_security_context_in_args(args) != NULL) { + gpr_log(GPR_ERROR, "Cannot set security context in channel args."); + } + + s = gpr_malloc(sizeof(setup)); + context_arg = grpc_security_context_to_arg(&context->base); + args_copy = grpc_channel_args_copy_and_add(args, &context_arg); + filters[n++] = &grpc_client_surface_filter; + if (grpc_channel_args_is_census_enabled(args)) { + filters[n++] = &grpc_client_census_filter; + } + filters[n++] = &grpc_client_auth_filter; + filters[n++] = &grpc_client_channel_filter; + GPR_ASSERT(n <= MAX_FILTERS); + channel = grpc_channel_create_from_filters(filters, n, args_copy, mdctx, 1); + grpc_channel_args_destroy(args_copy); + + s->target = gpr_strdup(target); + s->em = grpc_surface_em(); + s->setup_callback = complete_setup; + s->setup_user_data = grpc_channel_get_channel_stack(channel); + s->security_context = + (grpc_channel_security_context *)grpc_security_context_ref( + &context->base); + grpc_client_setup_create_and_attach(grpc_channel_get_channel_stack(channel), + args, mdctx, initiate_setup, done_setup, + s, s->em); + return channel; +} diff --git a/src/core/surface/secure_server_create.c b/src/core/surface/secure_server_create.c new file mode 100644 index 0000000000..bf0f62367f --- /dev/null +++ b/src/core/surface/secure_server_create.c @@ -0,0 +1,57 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/grpc.h> + +#include "src/core/channel/channel_args.h" +#include "src/core/security/security_context.h" +#include "src/core/surface/completion_queue.h" +#include "src/core/surface/server.h" +#include <grpc/support/log.h> + +grpc_server *grpc_secure_server_create_internal( + grpc_completion_queue *cq, const grpc_channel_args *args, + grpc_security_context *context) { + grpc_arg context_arg; + grpc_channel_args *args_copy; + grpc_server *server; + if (grpc_find_security_context_in_args(args) != NULL) { + gpr_log(GPR_ERROR, "Cannot set security context in channel args."); + } + + context_arg = grpc_security_context_to_arg(context); + args_copy = grpc_channel_args_copy_and_add(args, &context_arg); + server = grpc_server_create_from_filters(cq, NULL, 0, args_copy); + grpc_channel_args_destroy(args_copy); + return server; +} diff --git a/src/core/surface/server.c b/src/core/surface/server.c new file mode 100644 index 0000000000..99d66ffb2d --- /dev/null +++ b/src/core/surface/server.c @@ -0,0 +1,609 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/surface/server.h" + +#include <stdlib.h> +#include <string.h> + +#include "src/core/channel/census_filter.h" +#include "src/core/channel/channel_args.h" +#include "src/core/channel/connected_channel.h" +#include "src/core/surface/call.h" +#include "src/core/surface/channel.h" +#include "src/core/surface/completion_queue.h" +#include "src/core/surface/surface_em.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string.h> +#include <grpc/support/useful.h> + +typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list; + +typedef struct listener { + void *arg; + void (*start)(grpc_server *server, void *arg); + void (*destroy)(grpc_server *server, void *arg); + struct listener *next; +} listener; + +typedef struct call_data call_data; +typedef struct channel_data channel_data; + +struct channel_data { + grpc_server *server; + grpc_channel *channel; + /* linked list of all channels on a server */ + channel_data *next; + channel_data *prev; +}; + +struct grpc_server { + size_t channel_filter_count; + const grpc_channel_filter **channel_filters; + grpc_channel_args *channel_args; + grpc_completion_queue *cq; + grpc_em *em; + + gpr_mu mu; + + void **tags; + size_t ntags; + size_t tag_cap; + + gpr_uint8 shutdown; + + call_data *lists[CALL_LIST_COUNT]; + channel_data root_channel_data; + + listener *listeners; + gpr_refcount internal_refcount; +}; + +typedef struct { + call_data *next; + call_data *prev; +} call_link; + +typedef enum { + /* waiting for metadata */ + NOT_STARTED, + /* inital metadata read, not flow controlled in yet */ + PENDING, + /* flow controlled in, on completion queue */ + ACTIVATED, + /* cancelled before being queued */ + ZOMBIED +} call_state; + +struct call_data { + grpc_call *call; + + call_state state; + gpr_timespec deadline; + + gpr_uint8 included[CALL_LIST_COUNT]; + call_link links[CALL_LIST_COUNT]; +}; + +#define SERVER_FROM_CALL_ELEM(elem) \ + (((channel_data *)(elem)->channel_data)->server) + +static void do_nothing(void *unused, grpc_op_error ignored) {} + +static int call_list_join(grpc_server *server, call_data *call, + call_list list) { + if (call->included[list]) return 0; + call->included[list] = 1; + if (!server->lists[list]) { + server->lists[list] = call; + call->links[list].next = call->links[list].prev = call; + } else { + call->links[list].next = server->lists[list]; + call->links[list].prev = server->lists[list]->links[list].prev; + call->links[list].next->links[list].prev = + call->links[list].prev->links[list].next = call; + } + return 1; +} + +static call_data *call_list_remove_head(grpc_server *server, call_list list) { + call_data *out = server->lists[list]; + if (out) { + out->included[list] = 0; + if (out->links[list].next == out) { + server->lists[list] = NULL; + } else { + server->lists[list] = out->links[list].next; + out->links[list].next->links[list].prev = out->links[list].prev; + out->links[list].prev->links[list].next = out->links[list].next; + } + } + return out; +} + +static int call_list_remove(grpc_server *server, call_data *call, + call_list list) { + if (!call->included[list]) return 0; + call->included[list] = 0; + if (server->lists[list] == call) { + server->lists[list] = call->links[list].next; + if (server->lists[list] == call) { + server->lists[list] = NULL; + return 1; + } + } + GPR_ASSERT(server->lists[list] != call); + call->links[list].next->links[list].prev = call->links[list].prev; + call->links[list].prev->links[list].next = call->links[list].next; + return 1; +} + +static void server_ref(grpc_server *server) { + gpr_ref(&server->internal_refcount); +} + +static void server_unref(grpc_server *server) { + if (gpr_unref(&server->internal_refcount)) { + grpc_channel_args_destroy(server->channel_args); + gpr_mu_destroy(&server->mu); + gpr_free(server->channel_filters); + gpr_free(server->tags); + gpr_free(server); + } +} + +static int is_channel_orphaned(channel_data *chand) { + return chand->next == chand; +} + +static void orphan_channel(channel_data *chand) { + chand->next->prev = chand->prev; + chand->prev->next = chand->next; + chand->next = chand->prev = chand; +} + +static void finish_destroy_channel(void *cd, grpc_em_cb_status status) { + channel_data *chand = cd; + grpc_server *server = chand->server; + /*gpr_log(GPR_INFO, "destroy channel %p", chand->channel);*/ + grpc_channel_destroy(chand->channel); + server_unref(server); +} + +static void destroy_channel(channel_data *chand) { + if (is_channel_orphaned(chand)) return; + GPR_ASSERT(chand->server != NULL); + orphan_channel(chand); + server_ref(chand->server); + grpc_em_add_callback(chand->server->em, finish_destroy_channel, chand); +} + +static void queue_new_rpc(grpc_server *server, call_data *calld, void *tag) { + grpc_call *call = calld->call; + grpc_metadata_buffer *mdbuf = grpc_call_get_metadata_buffer(call); + size_t count = grpc_metadata_buffer_count(mdbuf); + grpc_metadata *elements = grpc_metadata_buffer_extract_elements(mdbuf); + const char *host = NULL; + const char *method = NULL; + size_t i; + grpc_metadata status_md; + + for (i = 0; i < count; i++) { + if (0 == strcmp(elements[i].key, ":authority")) { + host = elements[i].value; + } else if (0 == strcmp(elements[i].key, ":path")) { + method = elements[i].value; + } + } + + status_md.key = ":status"; + status_md.value = "200"; + status_md.value_length = 3; + grpc_call_add_metadata(call, &status_md, GRPC_WRITE_BUFFER_HINT); + + grpc_call_internal_ref(call); + grpc_cq_end_new_rpc(server->cq, tag, call, + grpc_metadata_buffer_cleanup_elements, elements, method, + host, calld->deadline, count, elements); +} + +static void start_new_rpc(grpc_call_element *elem) { + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + grpc_server *server = chand->server; + + gpr_mu_lock(&server->mu); + if (server->ntags) { + calld->state = ACTIVATED; + queue_new_rpc(server, calld, server->tags[--server->ntags]); + } else { + calld->state = PENDING; + call_list_join(server, calld, PENDING_START); + } + gpr_mu_unlock(&server->mu); +} + +static void kill_zombie(void *elem, grpc_em_cb_status status) { + grpc_call_destroy(grpc_call_from_top_element(elem)); +} + +static void finish_rpc(grpc_call_element *elem, int is_full_close) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + gpr_mu_lock(&chand->server->mu); + switch (calld->state) { + case ACTIVATED: + grpc_call_recv_finish(elem, is_full_close); + break; + case PENDING: + if (!is_full_close) { + grpc_call_recv_finish(elem, is_full_close); + break; + } + call_list_remove(chand->server, calld, PENDING_START); + /* fallthrough intended */ + case NOT_STARTED: + calld->state = ZOMBIED; + grpc_em_add_callback(chand->server->em, kill_zombie, elem); + break; + case ZOMBIED: + break; + } + gpr_mu_unlock(&chand->server->mu); +} + +static void call_op(grpc_call_element *elem, grpc_call_op *op) { + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + switch (op->type) { + case GRPC_RECV_METADATA: + grpc_call_recv_metadata(elem, op); + break; + case GRPC_RECV_END_OF_INITIAL_METADATA: + start_new_rpc(elem); + break; + case GRPC_RECV_MESSAGE: + grpc_call_recv_message(elem, op->data.message, op->done_cb, + op->user_data); + break; + case GRPC_RECV_HALF_CLOSE: + finish_rpc(elem, 0); + break; + case GRPC_RECV_FINISH: + finish_rpc(elem, 1); + break; + case GRPC_RECV_DEADLINE: + grpc_call_set_deadline(elem, op->data.deadline); + ((call_data *)elem->call_data)->deadline = op->data.deadline; + break; + default: + GPR_ASSERT(op->dir == GRPC_CALL_DOWN); + grpc_call_next_op(elem, op); + break; + } +} + +static void channel_op(grpc_channel_element *elem, grpc_channel_op *op) { + channel_data *chand = elem->channel_data; + + switch (op->type) { + case GRPC_ACCEPT_CALL: + /* create a call */ + grpc_call_create(chand->channel, + op->data.accept_call.transport_server_data); + break; + case GRPC_TRANSPORT_CLOSED: + /* if the transport is closed for a server channel, we destroy the + channel */ + gpr_mu_lock(&chand->server->mu); + server_ref(chand->server); + destroy_channel(chand); + gpr_mu_unlock(&chand->server->mu); + server_unref(chand->server); + break; + default: + GPR_ASSERT(op->dir == GRPC_CALL_DOWN); + grpc_channel_next_op(elem, op); + break; + } +} + +static void finish_shutdown_channel(void *cd, grpc_em_cb_status status) { + channel_data *chand = cd; + grpc_channel_op op; + op.type = GRPC_CHANNEL_SHUTDOWN; + op.dir = GRPC_CALL_DOWN; + channel_op(grpc_channel_stack_element( + grpc_channel_get_channel_stack(chand->channel), 0), + &op); + grpc_channel_internal_unref(chand->channel); +} + +static void shutdown_channel(channel_data *chand) { + grpc_channel_internal_ref(chand->channel); + grpc_em_add_callback(chand->server->em, finish_shutdown_channel, chand); +} + +static void init_call_elem(grpc_call_element *elem, + const void *server_transport_data) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + memset(calld, 0, sizeof(call_data)); + calld->deadline = gpr_inf_future; + calld->call = grpc_call_from_top_element(elem); + + gpr_mu_lock(&chand->server->mu); + call_list_join(chand->server, calld, ALL_CALLS); + gpr_mu_unlock(&chand->server->mu); + + server_ref(chand->server); +} + +static void destroy_call_elem(grpc_call_element *elem) { + channel_data *chand = elem->channel_data; + int i; + + gpr_mu_lock(&chand->server->mu); + for (i = 0; i < CALL_LIST_COUNT; i++) { + call_list_remove(chand->server, elem->call_data, i); + } + gpr_mu_unlock(&chand->server->mu); + + server_unref(chand->server); +} + +static void init_channel_elem(grpc_channel_element *elem, + const grpc_channel_args *args, + grpc_mdctx *metadata_context, int is_first, + int is_last) { + channel_data *chand = elem->channel_data; + GPR_ASSERT(is_first); + GPR_ASSERT(!is_last); + chand->server = NULL; + chand->channel = NULL; + chand->next = chand->prev = chand; +} + +static void destroy_channel_elem(grpc_channel_element *elem) { + channel_data *chand = elem->channel_data; + if (chand->server) { + gpr_mu_lock(&chand->server->mu); + chand->next->prev = chand->prev; + chand->prev->next = chand->next; + chand->next = chand->prev = chand; + gpr_mu_unlock(&chand->server->mu); + server_unref(chand->server); + } +} + +static const grpc_channel_filter server_surface_filter = { + call_op, channel_op, + + sizeof(call_data), init_call_elem, destroy_call_elem, + + sizeof(channel_data), init_channel_elem, destroy_channel_elem, + + "server", +}; + +static void early_terminate_requested_calls(grpc_completion_queue *cq, + void **tags, size_t ntags) { + size_t i; + + for (i = 0; i < ntags; i++) { + grpc_cq_end_new_rpc(cq, tags[i], NULL, do_nothing, NULL, NULL, NULL, + gpr_inf_past, 0, NULL); + } +} + +grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, + grpc_channel_filter **filters, + size_t filter_count, + const grpc_channel_args *args) { + size_t i; + int census_enabled = grpc_channel_args_is_census_enabled(args); + + grpc_server *server = gpr_malloc(sizeof(grpc_server)); + memset(server, 0, sizeof(grpc_server)); + + gpr_mu_init(&server->mu); + + server->cq = cq; + server->em = grpc_surface_em(); + /* decremented by grpc_server_destroy */ + gpr_ref_init(&server->internal_refcount, 1); + server->root_channel_data.next = server->root_channel_data.prev = + &server->root_channel_data; + + /* Server filter stack is: + + server_surface_filter - for making surface API calls + grpc_server_census_filter (optional) - for stats collection and tracing + {passed in filter stack} + grpc_connected_channel_filter - for interfacing with transports */ + server->channel_filter_count = filter_count + 1 + census_enabled; + server->channel_filters = + gpr_malloc(server->channel_filter_count * sizeof(grpc_channel_filter *)); + server->channel_filters[0] = &server_surface_filter; + if (census_enabled) { + server->channel_filters[1] = &grpc_server_census_filter; + } + for (i = 0; i < filter_count; i++) { + server->channel_filters[i + 1 + census_enabled] = filters[i]; + } + + server->channel_args = grpc_channel_args_copy(args); + + return server; +} + +void grpc_server_start(grpc_server *server) { + listener *l; + + for (l = server->listeners; l; l = l->next) { + l->start(server, l->arg); + } +} + +grpc_transport_setup_result grpc_server_setup_transport( + grpc_server *s, grpc_transport *transport, + grpc_channel_filter const **extra_filters, size_t num_extra_filters, + grpc_mdctx *mdctx) { + size_t num_filters = s->channel_filter_count + num_extra_filters + 1; + grpc_channel_filter const **filters = + gpr_malloc(sizeof(grpc_channel_filter *) * num_filters); + size_t i; + grpc_channel *channel; + channel_data *chand; + + for (i = 0; i < s->channel_filter_count; i++) { + filters[i] = s->channel_filters[i]; + } + for (; i < s->channel_filter_count + num_extra_filters; i++) { + filters[i] = extra_filters[i - s->channel_filter_count]; + } + filters[i] = &grpc_connected_channel_filter; + + channel = grpc_channel_create_from_filters(filters, num_filters, + s->channel_args, mdctx, 0); + chand = (channel_data *)grpc_channel_stack_element( + grpc_channel_get_channel_stack(channel), 0)->channel_data; + chand->server = s; + server_ref(s); + chand->channel = channel; + + gpr_mu_lock(&s->mu); + chand->next = &s->root_channel_data; + chand->prev = chand->next->prev; + chand->next->prev = chand->prev->next = chand; + gpr_mu_unlock(&s->mu); + + gpr_free(filters); + + return grpc_connected_channel_bind_transport( + grpc_channel_get_channel_stack(channel), transport); +} + +void grpc_server_shutdown(grpc_server *server) { + /* TODO(ctiller): send goaway, etc */ + listener *l; + void **tags; + size_t ntags; + + /* lock, and gather up some stuff to do */ + gpr_mu_lock(&server->mu); + if (server->shutdown) { + gpr_mu_unlock(&server->mu); + return; + } + + tags = server->tags; + ntags = server->ntags; + server->tags = NULL; + server->ntags = 0; + + server->shutdown = 1; + gpr_mu_unlock(&server->mu); + + /* terminate all the requested calls */ + early_terminate_requested_calls(server->cq, tags, ntags); + gpr_free(tags); + + /* Shutdown listeners */ + for (l = server->listeners; l; l = l->next) { + l->destroy(server, l->arg); + } + while (server->listeners) { + l = server->listeners; + server->listeners = l->next; + gpr_free(l); + } +} + +void grpc_server_destroy(grpc_server *server) { + channel_data *c; + gpr_mu_lock(&server->mu); + for (c = server->root_channel_data.next; c != &server->root_channel_data; + c = c->next) { + shutdown_channel(c); + } + gpr_mu_unlock(&server->mu); + + server_unref(server); +} + +void grpc_server_add_listener(grpc_server *server, void *arg, + void (*start)(grpc_server *server, void *arg), + void (*destroy)(grpc_server *server, void *arg)) { + listener *l = gpr_malloc(sizeof(listener)); + l->arg = arg; + l->start = start; + l->destroy = destroy; + l->next = server->listeners; + server->listeners = l; +} + +grpc_call_error grpc_server_request_call(grpc_server *server, void *tag_new) { + call_data *calld; + + grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_RPC_NEW); + + gpr_mu_lock(&server->mu); + + if (server->shutdown) { + gpr_mu_unlock(&server->mu); + early_terminate_requested_calls(server->cq, &tag_new, 1); + return GRPC_CALL_OK; + } + + calld = call_list_remove_head(server, PENDING_START); + if (calld) { + GPR_ASSERT(calld->state == PENDING); + calld->state = ACTIVATED; + queue_new_rpc(server, calld, tag_new); + } else { + if (server->tag_cap == server->ntags) { + server->tag_cap = GPR_MAX(3 * server->tag_cap / 2, server->tag_cap + 1); + server->tags = + gpr_realloc(server->tags, sizeof(void *) * server->tag_cap); + } + server->tags[server->ntags++] = tag_new; + } + gpr_mu_unlock(&server->mu); + + return GRPC_CALL_OK; +} + +const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) { + return server->channel_args; +} diff --git a/src/core/surface/server.h b/src/core/surface/server.h new file mode 100644 index 0000000000..f0773ab9d5 --- /dev/null +++ b/src/core/surface/server.h @@ -0,0 +1,62 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_SURFACE_SERVER_H__ +#define __GRPC_INTERNAL_SURFACE_SERVER_H__ + +#include "src/core/channel/channel_stack.h" +#include <grpc/grpc.h> +#include "src/core/transport/transport.h" + +/* Create a server */ +grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, + grpc_channel_filter **filters, + size_t filter_count, + const grpc_channel_args *args); + +/* Add a listener to the server: when the server starts, it will call start, + and when it shuts down, it will call destroy */ +void grpc_server_add_listener(grpc_server *server, void *listener, + void (*start)(grpc_server *server, void *arg), + void (*destroy)(grpc_server *server, void *arg)); + +/* Setup a transport - creates a channel stack, binds the transport to the + server */ +grpc_transport_setup_result grpc_server_setup_transport( + grpc_server *server, grpc_transport *transport, + grpc_channel_filter const **extra_filters, size_t num_extra_filters, + grpc_mdctx *mdctx); + +const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server); + +#endif /* __GRPC_INTERNAL_SURFACE_SERVER_H__ */ diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c new file mode 100644 index 0000000000..24c5757166 --- /dev/null +++ b/src/core/surface/server_chttp2.c @@ -0,0 +1,123 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/grpc.h> + +#include "src/core/channel/http_filter.h" +#include "src/core/channel/http_server_filter.h" +#include "src/core/endpoint/resolve_address.h" +#include "src/core/endpoint/tcp_server.h" +#include "src/core/surface/server.h" +#include "src/core/surface/surface_em.h" +#include "src/core/transport/chttp2_transport.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/useful.h> + +static grpc_transport_setup_result setup_transport(void *server, + grpc_transport *transport, + grpc_mdctx *mdctx) { + static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter, + &grpc_http_filter}; + return grpc_server_setup_transport(server, transport, extra_filters, + GPR_ARRAY_SIZE(extra_filters), mdctx); +} + +static void new_transport(void *server, grpc_endpoint *tcp) { + grpc_create_chttp2_transport(setup_transport, server, + grpc_server_get_channel_args(server), tcp, NULL, + 0, grpc_mdctx_create(), 0); +} + +/* Server callback: start listening on our ports */ +static void start(grpc_server *server, void *tcpp) { + grpc_tcp_server *tcp = tcpp; + grpc_tcp_server_start(tcp, new_transport, server); +} + +/* Server callback: destroy the tcp listener (so we don't generate further + callbacks) */ +static void destroy(grpc_server *server, void *tcpp) { + grpc_tcp_server *tcp = tcpp; + grpc_tcp_server_destroy(tcp); +} + +int grpc_server_add_http2_port(grpc_server *server, const char *addr) { + grpc_resolved_addresses *resolved = NULL; + grpc_tcp_server *tcp = NULL; + size_t i; + int count = 0; + + resolved = grpc_blocking_resolve_address(addr, "http"); + if (!resolved) { + goto error; + } + + tcp = grpc_tcp_server_create(grpc_surface_em()); + if (!tcp) { + goto error; + } + + for (i = 0; i < resolved->naddrs; i++) { + if (grpc_tcp_server_add_port(tcp, + (struct sockaddr *)&resolved->addrs[i].addr, + resolved->addrs[i].len) >= 0) { + count++; + } + } + if (count == 0) { + gpr_log(GPR_ERROR, "No address added out of total %d resolved", + resolved->naddrs); + goto error; + } + if (count != resolved->naddrs) { + gpr_log(GPR_ERROR, "Only %d addresses added out of total %d resolved", + count, resolved->naddrs); + } + grpc_resolved_addresses_destroy(resolved); + + /* Register with the server only upon success */ + grpc_server_add_listener(server, tcp, start, destroy); + + return 1; + +/* Error path: cleanup and return */ +error: + if (resolved) { + grpc_resolved_addresses_destroy(resolved); + } + if (tcp) { + grpc_tcp_server_destroy(tcp); + } + return 0; +} diff --git a/src/core/surface/server_create.c b/src/core/surface/server_create.c new file mode 100644 index 0000000000..dcc6ce1ccc --- /dev/null +++ b/src/core/surface/server_create.c @@ -0,0 +1,41 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/grpc.h> +#include "src/core/surface/completion_queue.h" +#include "src/core/surface/server.h" + +grpc_server *grpc_server_create(grpc_completion_queue *cq, + const grpc_channel_args *args) { + return grpc_server_create_from_filters(cq, NULL, 0, args); +} diff --git a/src/core/surface/surface_em.c b/src/core/surface/surface_em.c new file mode 100644 index 0000000000..e1785d1a44 --- /dev/null +++ b/src/core/surface/surface_em.c @@ -0,0 +1,55 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/surface/surface_em.h" +#include <grpc/support/log.h> + +static int initialized = 0; +static grpc_em em; + +grpc_em *grpc_surface_em() { + GPR_ASSERT(initialized && "call grpc_init()"); + return &em; +} + +void grpc_surface_em_init() { + GPR_ASSERT(!initialized); + initialized = 1; + grpc_em_init(&em); +} + +void grpc_surface_em_shutdown() { + GPR_ASSERT(initialized); + grpc_em_destroy(&em); + initialized = 0; +} diff --git a/src/core/surface/surface_em.h b/src/core/surface/surface_em.h new file mode 100644 index 0000000000..165f42f868 --- /dev/null +++ b/src/core/surface/surface_em.h @@ -0,0 +1,47 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_SURFACE_SURFACE_EM_H__ +#define __GRPC_INTERNAL_SURFACE_SURFACE_EM_H__ + +#include "src/core/eventmanager/em.h" + +/* Returns a global singleton event manager for + the surface apis, and is passed down to channels and + transports as needed. */ +grpc_em *grpc_surface_em(); + +void grpc_surface_em_init(); +void grpc_surface_em_shutdown(); + +#endif /* __GRPC_INTERNAL_SURFACE_SURFACE_EM_H__ */ diff --git a/src/core/surface/surface_trace.h b/src/core/surface/surface_trace.h new file mode 100644 index 0000000000..f6f9acfd9c --- /dev/null +++ b/src/core/surface/surface_trace.h @@ -0,0 +1,54 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_SURFACE_SURFACE_TRACE_H__ +#define __GRPC_INTERNAL_SURFACE_SURFACE_TRACE_H__ + +#include <grpc/support/log.h> + +/* #define GRPC_ENABLE_SURFACE_TRACE 1 */ + +#ifdef GRPC_ENABLE_SURFACE_TRACE +#define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event) \ + do { \ + char *_ev = grpc_event_string(event); \ + gpr_log(GPR_INFO, "RETURN_EVENT[%p]: %s", cq, _ev); \ + gpr_free(_ev); \ + } while (0) +#else +#define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event) \ + do { \ + } while (0) +#endif + +#endif /* __GRPC_INTERNAL_SURFACE_SURFACE_TRACE_H__ */ |