diff options
Diffstat (limited to 'src/core/surface')
-rw-r--r-- | src/core/surface/call.c | 703 | ||||
-rw-r--r-- | src/core/surface/call.h | 44 | ||||
-rw-r--r-- | src/core/surface/call_log_batch.c | 121 | ||||
-rw-r--r-- | src/core/surface/channel.c | 150 | ||||
-rw-r--r-- | src/core/surface/channel.h | 3 | ||||
-rw-r--r-- | src/core/surface/channel_create.c | 5 | ||||
-rw-r--r-- | src/core/surface/client.c | 52 | ||||
-rw-r--r-- | src/core/surface/completion_queue.c | 41 | ||||
-rw-r--r-- | src/core/surface/completion_queue.h | 7 | ||||
-rw-r--r-- | src/core/surface/init.c | 12 | ||||
-rw-r--r-- | src/core/surface/lame_client.c | 81 | ||||
-rw-r--r-- | src/core/surface/secure_channel_create.c | 47 | ||||
-rw-r--r-- | src/core/surface/server.c | 226 | ||||
-rw-r--r-- | src/core/surface/server.h | 7 | ||||
-rw-r--r-- | src/core/surface/server_chttp2.c | 12 |
15 files changed, 964 insertions, 547 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index cfce943794..6ca1b4e9a1 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -33,7 +33,6 @@ #include "src/core/surface/call.h" #include "src/core/channel/channel_stack.h" -#include "src/core/channel/metadata_buffer.h" #include "src/core/iomgr/alarm.h" #include "src/core/support/string.h" #include "src/core/surface/byte_buffer_queue.h" @@ -41,6 +40,7 @@ #include "src/core/surface/completion_queue.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <assert.h> #include <stdio.h> #include <stdlib.h> @@ -68,8 +68,10 @@ typedef struct { } completed_request; /* See request_set in grpc_call below for a description */ -#define REQSET_EMPTY 255 -#define REQSET_DONE 254 +#define REQSET_EMPTY 'X' +#define REQSET_DONE 'Y' + +#define MAX_SEND_INITIAL_METADATA_COUNT 3 typedef struct { /* Overall status of the operation: starts OK, may degrade to @@ -79,9 +81,9 @@ typedef struct { grpc_ioreq_completion_func on_complete; void *user_data; /* a bit mask of which request ops are needed (1u << opid) */ - gpr_uint32 need_mask; + gpr_uint16 need_mask; /* a bit mask of which request ops are now completed */ - gpr_uint32 complete_mask; + gpr_uint16 complete_mask; } reqinfo_master; /* Status data for a request can come from several sources; this @@ -92,6 +94,8 @@ typedef enum { /* Status came from the application layer overriding whatever the wire says */ STATUS_FROM_API_OVERRIDE = 0, + /* Status was created by some internal channel stack operation */ + STATUS_FROM_CORE, /* Status came from 'the wire' - or somewhere below the surface layer */ STATUS_FROM_WIRE, @@ -140,12 +144,17 @@ struct grpc_call { gpr_uint8 have_alarm; /* are we currently performing a send operation */ gpr_uint8 sending; + /* are we currently performing a recv operation */ + gpr_uint8 receiving; /* are we currently completing requests */ gpr_uint8 completing; /* pairs with completed_requests */ gpr_uint8 num_completed_requests; - /* flag that we need to request more data */ - gpr_uint8 need_more_data; + /* are we currently reading a message? */ + gpr_uint8 reading_message; + /* flags with bits corresponding to write states allowing us to determine + what was sent */ + gpr_uint16 last_send_contains; /* Active ioreqs. request_set and request_data contain one element per active ioreq @@ -204,12 +213,25 @@ struct grpc_call { /* Call refcount - to keep the call alive during asynchronous operations */ gpr_refcount internal_refcount; + grpc_linked_mdelem send_initial_metadata[MAX_SEND_INITIAL_METADATA_COUNT]; + grpc_linked_mdelem status_link; + grpc_linked_mdelem details_link; + size_t send_initial_metadata_count; + gpr_timespec send_deadline; + + grpc_stream_op_buffer send_ops; + grpc_stream_op_buffer recv_ops; + grpc_stream_state recv_state; + + gpr_slice_buffer incoming_message; + gpr_uint32 incoming_message_length; + /* Data that the legacy api needs to track. To be deleted at some point soon */ legacy_state *legacy_state; }; -#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call)+1)) +#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) #define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1) #define CALL_ELEM_FROM_CALL(call, idx) \ grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx) @@ -224,12 +246,22 @@ struct grpc_call { } while (0) static void do_nothing(void *ignored, grpc_op_error also_ignored) {} -static send_action choose_send_action(grpc_call *call); -static void enact_send_action(grpc_call *call, send_action sa); +static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline); +static void call_on_done_recv(void *call, int success); +static void call_on_done_send(void *call, int success); +static int fill_send_ops(grpc_call *call, grpc_transport_op *op); +static void execute_op(grpc_call *call, grpc_transport_op *op); +static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata); +static void finish_read_ops(grpc_call *call); grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, - const void *server_transport_data) { + const void *server_transport_data, + grpc_mdelem **add_initial_metadata, + size_t add_initial_metadata_count, + gpr_timespec send_deadline) { size_t i; + grpc_transport_op initial_op; + grpc_transport_op *initial_op_ptr = NULL; grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel); grpc_call *call = gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size); @@ -245,13 +277,36 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, call->request_set[GRPC_IOREQ_SEND_TRAILING_METADATA] = REQSET_DONE; call->request_set[GRPC_IOREQ_SEND_STATUS] = REQSET_DONE; } + GPR_ASSERT(add_initial_metadata_count < MAX_SEND_INITIAL_METADATA_COUNT); + for (i = 0; i < add_initial_metadata_count; i++) { + call->send_initial_metadata[i].md = add_initial_metadata[i]; + } + call->send_initial_metadata_count = add_initial_metadata_count; + call->send_deadline = send_deadline; grpc_channel_internal_ref(channel); call->metadata_context = grpc_channel_get_metadata_context(channel); - /* one ref is dropped in response to destroy, the other in - stream_closed */ - gpr_ref_init(&call->internal_refcount, 2); - grpc_call_stack_init(channel_stack, server_transport_data, + grpc_sopb_init(&call->send_ops); + grpc_sopb_init(&call->recv_ops); + gpr_slice_buffer_init(&call->incoming_message); + /* dropped in destroy */ + gpr_ref_init(&call->internal_refcount, 1); + /* server hack: start reads immediately so we can get initial metadata. + TODO(ctiller): figure out a cleaner solution */ + if (!call->is_client) { + memset(&initial_op, 0, sizeof(initial_op)); + initial_op.recv_ops = &call->recv_ops; + initial_op.recv_state = &call->recv_state; + initial_op.on_done_recv = call_on_done_recv; + initial_op.recv_user_data = call; + call->receiving = 1; + GRPC_CALL_INTERNAL_REF(call, "receiving"); + initial_op_ptr = &initial_op; + } + grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr, CALL_STACK_FROM_CALL(call)); + if (gpr_time_cmp(send_deadline, gpr_inf_future) != 0) { + set_deadline_alarm(call, send_deadline); + } return call; } @@ -264,7 +319,15 @@ grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) { return call->cq; } -void grpc_call_internal_ref(grpc_call *c) { gpr_ref(&c->internal_refcount); } +#ifdef GRPC_CALL_REF_COUNT_DEBUG +void grpc_call_internal_ref(grpc_call *c, const char *reason) { + gpr_log(GPR_DEBUG, "CALL: ref %p %d -> %d [%s]", c, + c->internal_refcount.count, c->internal_refcount.count + 1, reason); +#else +void grpc_call_internal_ref(grpc_call *c) { +#endif + gpr_ref(&c->internal_refcount); +} static void destroy_call(void *call, int ignored_success) { size_t i; @@ -284,14 +347,27 @@ static void destroy_call(void *call, int ignored_success) { for (i = 0; i < GPR_ARRAY_SIZE(c->buffered_metadata); i++) { gpr_free(c->buffered_metadata[i].metadata); } + for (i = 0; i < c->send_initial_metadata_count; i++) { + grpc_mdelem_unref(c->send_initial_metadata[i].md); + } + grpc_sopb_destroy(&c->send_ops); + grpc_sopb_destroy(&c->recv_ops); if (c->legacy_state) { destroy_legacy_state(c->legacy_state); } grpc_bbq_destroy(&c->incoming_queue); + gpr_slice_buffer_destroy(&c->incoming_message); gpr_free(c); } +#ifdef GRPC_CALL_REF_COUNT_DEBUG +void grpc_call_internal_unref(grpc_call *c, const char *reason, + int allow_immediate_deletion) { + gpr_log(GPR_DEBUG, "CALL: unref %p %d -> %d [%s]", c, + c->internal_refcount.count, c->internal_refcount.count - 1, reason); +#else void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) { +#endif if (gpr_unref(&c->internal_refcount)) { if (allow_immediate_deletion) { destroy_call(c, 1); @@ -333,19 +409,6 @@ static grpc_call_error bind_cq(grpc_call *call, grpc_completion_queue *cq) { return GRPC_CALL_OK; } -static void request_more_data(grpc_call *call) { - grpc_call_op op; - - /* call down */ - op.type = GRPC_REQUEST_DATA; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.done_cb = do_nothing; - op.user_data = NULL; - - grpc_call_execute_op(call, &op); -} - static int is_op_live(grpc_call *call, grpc_ioreq_op op) { gpr_uint8 set = call->request_set[op]; reqinfo_master *master; @@ -356,17 +419,43 @@ static int is_op_live(grpc_call *call, grpc_ioreq_op op) { static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); } +static int need_more_data(grpc_call *call) { + return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) || + is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) || + is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) || + is_op_live(call, GRPC_IOREQ_RECV_STATUS) || + is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) || + (is_op_live(call, GRPC_IOREQ_RECV_CLOSE) && + grpc_bbq_empty(&call->incoming_queue)) || + (call->write_state == WRITE_STATE_INITIAL && !call->is_client && + call->read_state != READ_STATE_STREAM_CLOSED); +} + static void unlock(grpc_call *call) { - send_action sa = SEND_NOTHING; + grpc_transport_op op; completed_request completed_requests[GRPC_IOREQ_OP_COUNT]; int completing_requests = 0; - int need_more_data = - call->need_more_data && - (call->write_state >= WRITE_STATE_STARTED || !call->is_client); + int start_op = 0; int i; - if (need_more_data) { - call->need_more_data = 0; + memset(&op, 0, sizeof(op)); + + if (!call->receiving && need_more_data(call)) { + op.recv_ops = &call->recv_ops; + op.recv_state = &call->recv_state; + op.on_done_recv = call_on_done_recv; + op.recv_user_data = call; + call->receiving = 1; + GRPC_CALL_INTERNAL_REF(call, "receiving"); + start_op = 1; + } + + if (!call->sending) { + if (fill_send_ops(call, &op)) { + call->sending = 1; + GRPC_CALL_INTERNAL_REF(call, "sending"); + start_op = 1; + } } if (!call->completing && call->num_completed_requests != 0) { @@ -375,25 +464,13 @@ static void unlock(grpc_call *call) { sizeof(completed_requests)); call->num_completed_requests = 0; call->completing = 1; - grpc_call_internal_ref(call); - } - - if (!call->sending) { - sa = choose_send_action(call); - if (sa != SEND_NOTHING) { - call->sending = 1; - grpc_call_internal_ref(call); - } + GRPC_CALL_INTERNAL_REF(call, "completing"); } gpr_mu_unlock(&call->mu); - if (need_more_data) { - request_more_data(call); - } - - if (sa != SEND_NOTHING) { - enact_send_action(call, sa); + if (start_op) { + execute_op(call, &op); } if (completing_requests > 0) { @@ -404,7 +481,7 @@ static void unlock(grpc_call *call) { lock(call); call->completing = 0; unlock(call); - grpc_call_internal_unref(call, 0); + GRPC_CALL_INTERNAL_UNREF(call, "completing", 0); } } @@ -468,7 +545,6 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op, master->complete_mask |= 1u << op; if (status != GRPC_OP_OK) { master->status = status; - master->complete_mask = master->need_mask; } if (master->complete_mask == master->need_mask) { for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { @@ -527,160 +603,267 @@ static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, } } -static void finish_send_op(grpc_call *call, grpc_ioreq_op op, write_state ws, - grpc_op_error error) { +static void call_on_done_send(void *pc, int success) { + grpc_call *call = pc; + grpc_op_error error = success ? GRPC_OP_OK : GRPC_OP_ERROR; lock(call); - finish_ioreq_op(call, op, error); + if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) { + finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, error); + } + if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_MESSAGE)) { + finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, error); + } + if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_CLOSE)) { + finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, error); + finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, error); + finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, GRPC_OP_OK); + } + call->last_send_contains = 0; call->sending = 0; - call->write_state = ws; unlock(call); - grpc_call_internal_unref(call, 0); + GRPC_CALL_INTERNAL_UNREF(call, "sending", 0); } -static void finish_write_step(void *pc, grpc_op_error error) { - finish_send_op(pc, GRPC_IOREQ_SEND_MESSAGE, WRITE_STATE_STARTED, error); +static void finish_message(grpc_call *call) { + /* TODO(ctiller): this could be a lot faster if coded directly */ + grpc_byte_buffer *byte_buffer = grpc_byte_buffer_create( + call->incoming_message.slices, call->incoming_message.count); + gpr_slice_buffer_reset_and_unref(&call->incoming_message); + + grpc_bbq_push(&call->incoming_queue, byte_buffer); + + GPR_ASSERT(call->incoming_message.count == 0); + call->reading_message = 0; } -static void finish_finish_step(void *pc, grpc_op_error error) { - finish_send_op(pc, GRPC_IOREQ_SEND_CLOSE, WRITE_STATE_WRITE_CLOSED, error); +static int begin_message(grpc_call *call, grpc_begin_message msg) { + /* can't begin a message when we're still reading a message */ + if (call->reading_message) { + char *message = NULL; + gpr_asprintf( + &message, "Message terminated early; read %d bytes, expected %d", + (int)call->incoming_message.length, (int)call->incoming_message_length); + grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message); + gpr_free(message); + return 0; + } + /* stash away parameters, and prepare for incoming slices */ + if (msg.length > grpc_channel_get_max_message_length(call->channel)) { + char *message = NULL; + gpr_asprintf( + &message, + "Maximum message length of %d exceeded by a message of length %d", + grpc_channel_get_max_message_length(call->channel), msg.length); + grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message); + gpr_free(message); + return 0; + } else if (msg.length > 0) { + call->reading_message = 1; + call->incoming_message_length = msg.length; + return 1; + } else { + finish_message(call); + return 1; + } } -static void finish_start_step(void *pc, grpc_op_error error) { - finish_send_op(pc, GRPC_IOREQ_SEND_INITIAL_METADATA, WRITE_STATE_STARTED, - error); +static int add_slice_to_message(grpc_call *call, gpr_slice slice) { + if (GPR_SLICE_LENGTH(slice) == 0) { + gpr_slice_unref(slice); + return 1; + } + /* we have to be reading a message to know what to do here */ + if (!call->reading_message) { + grpc_call_cancel_with_status( + call, GRPC_STATUS_INVALID_ARGUMENT, + "Received payload data while not reading a message"); + return 0; + } + /* append the slice to the incoming buffer */ + gpr_slice_buffer_add(&call->incoming_message, slice); + if (call->incoming_message.length > call->incoming_message_length) { + /* if we got too many bytes, complain */ + char *message = NULL; + gpr_asprintf( + &message, "Receiving message overflow; read %d bytes, expected %d", + (int)call->incoming_message.length, (int)call->incoming_message_length); + grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message); + gpr_free(message); + return 0; + } else if (call->incoming_message.length == call->incoming_message_length) { + finish_message(call); + return 1; + } else { + return 1; + } } -static send_action choose_send_action(grpc_call *call) { - switch (call->write_state) { - case WRITE_STATE_INITIAL: - if (is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA)) { - if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE) || - is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) { - return SEND_BUFFERED_INITIAL_METADATA; - } else { - return SEND_INITIAL_METADATA; - } - } - return SEND_NOTHING; - case WRITE_STATE_STARTED: - if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) { - if (is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) { - return SEND_BUFFERED_MESSAGE; - } else { - return SEND_MESSAGE; - } - } else if (is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) { - finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_OK); - finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, GRPC_OP_OK); - if (call->is_client) { - return SEND_FINISH; - } else { - return SEND_TRAILING_METADATA_AND_FINISH; - } +static void call_on_done_recv(void *pc, int success) { + grpc_call *call = pc; + size_t i; + lock(call); + call->receiving = 0; + if (success) { + for (i = 0; success && i < call->recv_ops.nops; i++) { + grpc_stream_op *op = &call->recv_ops.ops[i]; + switch (op->type) { + case GRPC_NO_OP: + break; + case GRPC_OP_METADATA: + recv_metadata(call, &op->data.metadata); + break; + case GRPC_OP_BEGIN_MESSAGE: + success = begin_message(call, op->data.begin_message); + break; + case GRPC_OP_SLICE: + success = add_slice_to_message(call, op->data.slice); + break; } - return SEND_NOTHING; - case WRITE_STATE_WRITE_CLOSED: - return SEND_NOTHING; + } + if (call->recv_state == GRPC_STREAM_RECV_CLOSED) { + GPR_ASSERT(call->read_state <= READ_STATE_READ_CLOSED); + call->read_state = READ_STATE_READ_CLOSED; + } + if (call->recv_state == GRPC_STREAM_CLOSED) { + GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED); + call->read_state = READ_STATE_STREAM_CLOSED; + } + finish_read_ops(call); + } else { + finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_ERROR); + finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_ERROR); + finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_ERROR); + finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_ERROR); + finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_ERROR); + finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, GRPC_OP_ERROR); + } + call->recv_ops.nops = 0; + unlock(call); + + GRPC_CALL_INTERNAL_UNREF(call, "receiving", 0); +} + +static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count, + grpc_metadata *metadata) { + size_t i; + grpc_mdelem_list out; + if (count == 0) { + out.head = out.tail = NULL; + return out; + } + for (i = 0; i < count; i++) { + grpc_metadata *md = &metadata[i]; + grpc_metadata *next_md = (i == count - 1) ? NULL : &metadata[i + 1]; + grpc_metadata *prev_md = (i == 0) ? NULL : &metadata[i - 1]; + grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data; + GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data)); + l->md = grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key, + (const gpr_uint8 *)md->value, + md->value_length); + l->next = next_md ? (grpc_linked_mdelem *)&next_md->internal_data : NULL; + l->prev = prev_md ? (grpc_linked_mdelem *)&prev_md->internal_data : NULL; } - gpr_log(GPR_ERROR, "should never reach here"); - abort(); - return SEND_NOTHING; + out.head = (grpc_linked_mdelem *)&(metadata[0].internal_data); + out.tail = (grpc_linked_mdelem *)&(metadata[count - 1].internal_data); + return out; } -static void send_metadata(grpc_call *call, grpc_mdelem *elem) { - grpc_call_op op; - op.type = GRPC_SEND_METADATA; - op.dir = GRPC_CALL_DOWN; - op.flags = GRPC_WRITE_BUFFER_HINT; - op.data.metadata = elem; - op.done_cb = do_nothing; - op.user_data = NULL; - grpc_call_execute_op(call, &op); +/* Copy the contents of a byte buffer into stream ops */ +static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer, + grpc_stream_op_buffer *sopb) { + size_t i; + + switch (byte_buffer->type) { + case GRPC_BB_SLICE_BUFFER: + for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) { + gpr_slice slice = byte_buffer->data.slice_buffer.slices[i]; + gpr_slice_ref(slice); + grpc_sopb_add_slice(sopb, slice); + } + break; + } } -static void enact_send_action(grpc_call *call, send_action sa) { +static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { grpc_ioreq_data data; - grpc_call_op op; + grpc_metadata_batch mdb; size_t i; - gpr_uint32 flags = 0; char status_str[GPR_LTOA_MIN_BUFSIZE]; + GPR_ASSERT(op->send_ops == NULL); - switch (sa) { - case SEND_NOTHING: - abort(); - break; - case SEND_BUFFERED_INITIAL_METADATA: - flags |= GRPC_WRITE_BUFFER_HINT; - /* fallthrough */ - case SEND_INITIAL_METADATA: + switch (call->write_state) { + case WRITE_STATE_INITIAL: + if (!is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA)) { + break; + } data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA]; - for (i = 0; i < data.send_metadata.count; i++) { - const grpc_metadata *md = &data.send_metadata.metadata[i]; - send_metadata(call, - grpc_mdelem_from_string_and_buffer( - call->metadata_context, md->key, - (const gpr_uint8 *)md->value, md->value_length)); + mdb.list = chain_metadata_from_app(call, data.send_metadata.count, + data.send_metadata.metadata); + mdb.garbage.head = mdb.garbage.tail = NULL; + mdb.deadline = call->send_deadline; + for (i = 0; i < call->send_initial_metadata_count; i++) { + grpc_metadata_batch_link_head(&mdb, &call->send_initial_metadata[i]); } - op.type = GRPC_SEND_START; - op.dir = GRPC_CALL_DOWN; - op.flags = flags; - op.data.start.pollset = grpc_cq_pollset(call->cq); - op.done_cb = finish_start_step; - op.user_data = call; - grpc_call_execute_op(call, &op); - break; - case SEND_BUFFERED_MESSAGE: - flags |= GRPC_WRITE_BUFFER_HINT; - /* fallthrough */ - case SEND_MESSAGE: - data = call->request_data[GRPC_IOREQ_SEND_MESSAGE]; - op.type = GRPC_SEND_MESSAGE; - op.dir = GRPC_CALL_DOWN; - op.flags = flags; - op.data.message = data.send_message; - op.done_cb = finish_write_step; - op.user_data = call; - grpc_call_execute_op(call, &op); - break; - case SEND_TRAILING_METADATA_AND_FINISH: - /* send trailing metadata */ - data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA]; - for (i = 0; i < data.send_metadata.count; i++) { - const grpc_metadata *md = &data.send_metadata.metadata[i]; - send_metadata(call, - grpc_mdelem_from_string_and_buffer( - call->metadata_context, md->key, - (const gpr_uint8 *)md->value, md->value_length)); + grpc_sopb_add_metadata(&call->send_ops, mdb); + op->send_ops = &call->send_ops; + op->bind_pollset = grpc_cq_pollset(call->cq); + call->last_send_contains |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA; + call->write_state = WRITE_STATE_STARTED; + call->send_initial_metadata_count = 0; + /* fall through intended */ + case WRITE_STATE_STARTED: + if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) { + data = call->request_data[GRPC_IOREQ_SEND_MESSAGE]; + grpc_sopb_add_begin_message( + &call->send_ops, grpc_byte_buffer_length(data.send_message), 0); + copy_byte_buffer_to_stream_ops(data.send_message, &call->send_ops); + op->send_ops = &call->send_ops; + call->last_send_contains |= 1 << GRPC_IOREQ_SEND_MESSAGE; } - /* send status */ - /* TODO(ctiller): cache common status values */ - data = call->request_data[GRPC_IOREQ_SEND_STATUS]; - gpr_ltoa(data.send_status.code, status_str); - send_metadata( - call, - grpc_mdelem_from_metadata_strings( - call->metadata_context, - grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)), - grpc_mdstr_from_string(call->metadata_context, status_str))); - if (data.send_status.details) { - send_metadata( - call, - grpc_mdelem_from_metadata_strings( - call->metadata_context, - grpc_mdstr_ref(grpc_channel_get_message_string(call->channel)), - grpc_mdstr_from_string(call->metadata_context, - data.send_status.details))); + if (is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) { + op->is_last_send = 1; + op->send_ops = &call->send_ops; + call->last_send_contains |= 1 << GRPC_IOREQ_SEND_CLOSE; + call->write_state = WRITE_STATE_WRITE_CLOSED; + if (!call->is_client) { + /* send trailing metadata */ + data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA]; + mdb.list = chain_metadata_from_app(call, data.send_metadata.count, + data.send_metadata.metadata); + mdb.garbage.head = mdb.garbage.tail = NULL; + mdb.deadline = gpr_inf_future; + /* send status */ + /* TODO(ctiller): cache common status values */ + data = call->request_data[GRPC_IOREQ_SEND_STATUS]; + gpr_ltoa(data.send_status.code, status_str); + grpc_metadata_batch_add_tail( + &mdb, &call->status_link, + grpc_mdelem_from_metadata_strings( + call->metadata_context, + grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)), + grpc_mdstr_from_string(call->metadata_context, status_str))); + if (data.send_status.details) { + grpc_metadata_batch_add_tail( + &mdb, &call->details_link, + grpc_mdelem_from_metadata_strings( + call->metadata_context, + grpc_mdstr_ref( + grpc_channel_get_message_string(call->channel)), + grpc_mdstr_from_string(call->metadata_context, + data.send_status.details))); + } + grpc_sopb_add_metadata(&call->send_ops, mdb); + } } - /* fallthrough: see choose_send_action for details */ - case SEND_FINISH: - op.type = GRPC_SEND_FINISH; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.done_cb = finish_finish_step; - op.user_data = call; - grpc_call_execute_op(call, &op); break; + case WRITE_STATE_WRITE_CLOSED: + break; + } + if (op->send_ops) { + op->on_done_send = call_on_done_send; + op->send_user_data = call; } + return op->send_ops != NULL; } static grpc_call_error start_ioreq_error(grpc_call *call, @@ -789,10 +972,6 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, master->on_complete = completion; master->user_data = user_data; - if (have_ops & (1u << GRPC_IOREQ_RECV_MESSAGE)) { - call->need_more_data = 1; - } - finish_read_ops(call); early_out_write_ops(call); @@ -819,43 +998,37 @@ void grpc_call_destroy(grpc_call *c) { cancel = c->read_state != READ_STATE_STREAM_CLOSED; unlock(c); if (cancel) grpc_call_cancel(c); - grpc_call_internal_unref(c, 1); + GRPC_CALL_INTERNAL_UNREF(c, "destroy", 1); } -grpc_call_error grpc_call_cancel(grpc_call *c) { - grpc_call_element *elem; - grpc_call_op op; - - op.type = GRPC_CANCEL_OP; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.done_cb = do_nothing; - op.user_data = NULL; - - elem = CALL_ELEM_FROM_CALL(c, 0); - elem->filter->call_op(elem, NULL, &op); - - return GRPC_CALL_OK; +grpc_call_error grpc_call_cancel(grpc_call *call) { + return grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, "Cancelled"); } grpc_call_error grpc_call_cancel_with_status(grpc_call *c, grpc_status_code status, const char *description) { + grpc_transport_op op; grpc_mdstr *details = description ? grpc_mdstr_from_string(c->metadata_context, description) : NULL; + memset(&op, 0, sizeof(op)); + op.cancel_with_status = status; + lock(c); set_status_code(c, STATUS_FROM_API_OVERRIDE, status); set_status_details(c, STATUS_FROM_API_OVERRIDE, details); unlock(c); - return grpc_call_cancel(c); + + execute_op(c, &op); + + return GRPC_CALL_OK; } -void grpc_call_execute_op(grpc_call *call, grpc_call_op *op) { +static void execute_op(grpc_call *call, grpc_transport_op *op) { grpc_call_element *elem; - GPR_ASSERT(op->dir == GRPC_CALL_DOWN); elem = CALL_ELEM_FROM_CALL(call, 0); - elem->filter->call_op(elem, NULL, op); + elem->filter->start_transport_op(elem, op); } grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { @@ -872,38 +1045,20 @@ static void call_alarm(void *arg, int success) { grpc_call_cancel(call); } } - grpc_call_internal_unref(call, 1); + GRPC_CALL_INTERNAL_UNREF(call, "alarm", 1); } -void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); - +static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) { if (call->have_alarm) { gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice"); + assert(0); + return; } - grpc_call_internal_ref(call); + GRPC_CALL_INTERNAL_REF(call, "alarm"); call->have_alarm = 1; grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now()); } -static void set_read_state(grpc_call *call, read_state state) { - lock(call); - GPR_ASSERT(call->read_state < state); - call->read_state = state; - finish_read_ops(call); - unlock(call); -} - -void grpc_call_read_closed(grpc_call_element *elem) { - set_read_state(CALL_FROM_TOP_ELEM(elem), READ_STATE_READ_CLOSED); -} - -void grpc_call_stream_closed(grpc_call_element *elem) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); - set_read_state(call, READ_STATE_STREAM_CLOSED); - grpc_call_internal_unref(call, 0); -} - /* we offset status by a small amount when storing it into transport metadata as metadata cannot store a 0 value (which is used as OK for grpc_status_codes */ @@ -914,7 +1069,7 @@ static gpr_uint32 decode_status(grpc_mdelem *md) { gpr_uint32 status; void *user_data = grpc_mdelem_get_user_data(md, destroy_status); if (user_data) { - status = ((gpr_uint32)(gpr_intptr) user_data) - STATUS_OFFSET; + status = ((gpr_uint32)(gpr_intptr)user_data) - STATUS_OFFSET; } else { if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value), GPR_SLICE_LENGTH(md->value->slice), @@ -927,61 +1082,65 @@ static gpr_uint32 decode_status(grpc_mdelem *md) { return status; } -void grpc_call_recv_message(grpc_call_element *elem, - grpc_byte_buffer *byte_buffer) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); - lock(call); - grpc_bbq_push(&call->incoming_queue, byte_buffer); - finish_read_ops(call); - unlock(call); -} - -void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); - grpc_mdstr *key = md->key; +static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { + grpc_linked_mdelem *l; grpc_metadata_array *dest; grpc_metadata *mdusr; - - lock(call); - if (key == grpc_channel_get_status_string(call->channel)) { - set_status_code(call, STATUS_FROM_WIRE, decode_status(md)); - grpc_mdelem_unref(md); - } else if (key == grpc_channel_get_message_string(call->channel)) { - set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); - grpc_mdelem_unref(md); - } else { - dest = &call->buffered_metadata[call->read_state >= - READ_STATE_GOT_INITIAL_METADATA]; - if (dest->count == dest->capacity) { - dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2); - dest->metadata = - gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity); - } - mdusr = &dest->metadata[dest->count++]; - mdusr->key = grpc_mdstr_as_c_string(md->key); - mdusr->value = grpc_mdstr_as_c_string(md->value); - mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice); - if (call->owned_metadata_count == call->owned_metadata_capacity) { - call->owned_metadata_capacity = GPR_MAX( - call->owned_metadata_capacity + 8, call->owned_metadata_capacity * 2); - call->owned_metadata = - gpr_realloc(call->owned_metadata, - sizeof(grpc_mdelem *) * call->owned_metadata_capacity); + int is_trailing; + grpc_mdctx *mdctx = call->metadata_context; + + is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA; + for (l = md->list.head; l != NULL; l = l->next) { + grpc_mdelem *md = l->md; + grpc_mdstr *key = md->key; + if (key == grpc_channel_get_status_string(call->channel)) { + set_status_code(call, STATUS_FROM_WIRE, decode_status(md)); + } else if (key == grpc_channel_get_message_string(call->channel)) { + set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); + } else { + dest = &call->buffered_metadata[is_trailing]; + if (dest->count == dest->capacity) { + dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2); + dest->metadata = + gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity); + } + mdusr = &dest->metadata[dest->count++]; + mdusr->key = grpc_mdstr_as_c_string(md->key); + mdusr->value = grpc_mdstr_as_c_string(md->value); + mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice); + if (call->owned_metadata_count == call->owned_metadata_capacity) { + call->owned_metadata_capacity = + GPR_MAX(call->owned_metadata_capacity + 8, + call->owned_metadata_capacity * 2); + call->owned_metadata = + gpr_realloc(call->owned_metadata, + sizeof(grpc_mdelem *) * call->owned_metadata_capacity); + } + call->owned_metadata[call->owned_metadata_count++] = md; + l->md = 0; } - call->owned_metadata[call->owned_metadata_count++] = md; } - unlock(call); + if (gpr_time_cmp(md->deadline, gpr_inf_future) != 0) { + set_deadline_alarm(call, md->deadline); + } + if (!is_trailing) { + call->read_state = READ_STATE_GOT_INITIAL_METADATA; + } + + grpc_mdctx_lock(mdctx); + for (l = md->list.head; l; l = l->next) { + if (l->md) grpc_mdctx_locked_mdelem_unref(mdctx, l->md); + } + for (l = md->garbage.head; l; l = l->next) { + grpc_mdctx_locked_mdelem_unref(mdctx, l->md); + } + grpc_mdctx_unlock(mdctx); } grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) { return CALL_STACK_FROM_CALL(call); } -void grpc_call_initial_metadata_complete(grpc_call_element *surface_element) { - grpc_call *call = grpc_call_from_top_element(surface_element); - set_read_state(call, READ_STATE_GOT_INITIAL_METADATA); -} - /* * BATCH API IMPLEMENTATION */ @@ -1006,6 +1165,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, const grpc_op *op; grpc_ioreq *req; + GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag); + if (nops == 0) { grpc_cq_begin_op(call->cq, call, GRPC_OP_COMPLETE); grpc_cq_end_op_complete(call->cq, tag, call, do_nothing, NULL, GRPC_OP_OK); diff --git a/src/core/surface/call.h b/src/core/surface/call.h index cb81cb52c2..2d4c7f61e3 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -35,7 +35,6 @@ #define GRPC_INTERNAL_CORE_SURFACE_CALL_H #include "src/core/channel/channel_stack.h" -#include "src/core/channel/metadata_buffer.h" #include <grpc/grpc.h> /* Primitive operation types - grpc_op's get rewritten into these */ @@ -67,7 +66,7 @@ typedef union { } recv_status_details; struct { size_t count; - const grpc_metadata *metadata; + grpc_metadata *metadata; } send_metadata; grpc_byte_buffer *send_message; struct { @@ -86,37 +85,42 @@ typedef void (*grpc_ioreq_completion_func)(grpc_call *call, void *user_data); grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, - const void *server_transport_data); + const void *server_transport_data, + grpc_mdelem **add_initial_metadata, + size_t add_initial_metadata_count, + gpr_timespec send_deadline); void grpc_call_set_completion_queue(grpc_call *call, grpc_completion_queue *cq); grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call); +#ifdef GRPC_CALL_REF_COUNT_DEBUG +void grpc_call_internal_ref(grpc_call *call, const char *reason); +void grpc_call_internal_unref(grpc_call *call, const char *reason, int allow_immediate_deletion); +#define GRPC_CALL_INTERNAL_REF(call, reason) grpc_call_internal_ref(call, reason) +#define GRPC_CALL_INTERNAL_UNREF(call, reason, allow_immediate_deletion) grpc_call_internal_unref(call, reason, allow_immediate_deletion) +#else void grpc_call_internal_ref(grpc_call *call); void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion); +#define GRPC_CALL_INTERNAL_REF(call, reason) grpc_call_internal_ref(call) +#define GRPC_CALL_INTERNAL_UNREF(call, reason, allow_immediate_deletion) grpc_call_internal_unref(call, allow_immediate_deletion) +#endif -/* Helpers for grpc_client, grpc_server filters to publish received data to - the completion queue/surface layer */ -void grpc_call_recv_metadata(grpc_call_element *surface_element, - grpc_mdelem *md); -void grpc_call_recv_message(grpc_call_element *surface_element, - grpc_byte_buffer *message); -void grpc_call_read_closed(grpc_call_element *surface_element); -void grpc_call_stream_closed(grpc_call_element *surface_element); - -void grpc_call_execute_op(grpc_call *call, grpc_call_op *op); grpc_call_error grpc_call_start_ioreq_and_call_back( grpc_call *call, const grpc_ioreq *reqs, size_t nreqs, grpc_ioreq_completion_func on_complete, void *user_data); -/* Called when it's known that the initial batch of metadata is complete */ -void grpc_call_initial_metadata_complete(grpc_call_element *surface_element); - -void grpc_call_set_deadline(grpc_call_element *surface_element, - gpr_timespec deadline); - grpc_call_stack *grpc_call_get_call_stack(grpc_call *call); /* Given the top call_element, get the call object. */ grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element); -#endif /* GRPC_INTERNAL_CORE_SURFACE_CALL_H */ +extern int grpc_trace_batch; + +void grpc_call_log_batch(char *file, int line, gpr_log_severity severity, + grpc_call *call, const grpc_op *ops, size_t nops, + void *tag); + +#define GRPC_CALL_LOG_BATCH(sev, call, ops, nops, tag) \ + if (grpc_trace_batch) grpc_call_log_batch(sev, call, ops, nops, tag) + +#endif /* GRPC_INTERNAL_CORE_SURFACE_CALL_H */ diff --git a/src/core/surface/call_log_batch.c b/src/core/surface/call_log_batch.c new file mode 100644 index 0000000000..a33583a12d --- /dev/null +++ b/src/core/surface/call_log_batch.c @@ -0,0 +1,121 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/surface/call.h" + +#include "src/core/support/string.h" +#include <grpc/support/alloc.h> + +int grpc_trace_batch = 0; + +static void add_metadata(gpr_strvec *b, const grpc_metadata *md, size_t count) { + size_t i; + for(i = 0; i < count; i++) { + gpr_strvec_add(b, gpr_strdup("\nkey=")); + gpr_strvec_add(b, gpr_strdup(md[i].key)); + + gpr_strvec_add(b, gpr_strdup(" value=")); + gpr_strvec_add(b, gpr_hexdump(md[i].value, md[i].value_length, + GPR_HEXDUMP_PLAINTEXT)); + } +} + +char *grpc_op_string(const grpc_op *op) { + char *tmp; + char *out; + + gpr_strvec b; + gpr_strvec_init(&b); + + switch (op->op) { + case GRPC_OP_SEND_INITIAL_METADATA: + gpr_strvec_add(&b, gpr_strdup("SEND_INITIAL_METADATA")); + add_metadata(&b, op->data.send_initial_metadata.metadata, + op->data.send_initial_metadata.count); + break; + case GRPC_OP_SEND_MESSAGE: + gpr_asprintf(&tmp, "SEND_MESSAGE ptr=%p", op->data.send_message); + gpr_strvec_add(&b, tmp); + break; + case GRPC_OP_SEND_CLOSE_FROM_CLIENT: + gpr_strvec_add(&b, gpr_strdup("SEND_CLOSE_FROM_CLIENT")); + break; + case GRPC_OP_SEND_STATUS_FROM_SERVER: + gpr_asprintf(&tmp, "SEND_STATUS_FROM_SERVER status=%d details=%s", + op->data.send_status_from_server.status, + op->data.send_status_from_server.status_details); + gpr_strvec_add(&b, tmp); + add_metadata(&b, op->data.send_status_from_server.trailing_metadata, + op->data.send_status_from_server.trailing_metadata_count); + break; + case GRPC_OP_RECV_INITIAL_METADATA: + gpr_asprintf(&tmp, "RECV_INITIAL_METADATA ptr=%p", + op->data.recv_initial_metadata); + gpr_strvec_add(&b, tmp); + break; + case GRPC_OP_RECV_MESSAGE: + gpr_asprintf(&tmp, "RECV_MESSAGE ptr=%p", op->data.recv_message); + gpr_strvec_add(&b, tmp); + break; + case GRPC_OP_RECV_STATUS_ON_CLIENT: + gpr_asprintf(&tmp, + "RECV_STATUS_ON_CLIENT metadata=%p status=%p details=%p", + op->data.recv_status_on_client.trailing_metadata, + op->data.recv_status_on_client.status, + op->data.recv_status_on_client.status_details); + gpr_strvec_add(&b, tmp); + break; + case GRPC_OP_RECV_CLOSE_ON_SERVER: + gpr_asprintf(&tmp, "RECV_CLOSE_ON_SERVER cancelled=%p", + op->data.recv_close_on_server.cancelled); + gpr_strvec_add(&b, tmp); + } + out = gpr_strvec_flatten(&b, NULL); + gpr_strvec_destroy(&b); + + return out; +} + +void grpc_call_log_batch(char *file, int line, gpr_log_severity severity, + grpc_call *call, const grpc_op *ops, size_t nops, + void *tag) { + char *tmp; + size_t i; + gpr_log(file, line, severity, + "grpc_call_start_batch(%p, %p, %d, 0x%x)", call, ops, nops, tag); + for(i = 0; i < nops; i++) { + tmp = grpc_op_string(&ops[i]); + gpr_log(file, line, severity, "ops[%d]: %s", i, tmp); + gpr_free(tmp); + } +} diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index e764a3b9af..78f9144c19 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -43,30 +43,46 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +typedef struct registered_call { + grpc_mdelem *path; + grpc_mdelem *authority; + struct registered_call *next; +} registered_call; + struct grpc_channel { int is_client; gpr_refcount refs; + gpr_uint32 max_message_length; grpc_mdctx *metadata_context; grpc_mdstr *grpc_status_string; grpc_mdstr *grpc_message_string; grpc_mdstr *path_string; grpc_mdstr *authority_string; + + gpr_mu registered_call_mu; + registered_call *registered_calls; }; -#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c)+1)) -#define CHANNEL_FROM_CHANNEL_STACK(channel_stack) (((grpc_channel *)(channel_stack)) - 1) +#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1)) +#define CHANNEL_FROM_CHANNEL_STACK(channel_stack) \ + (((grpc_channel *)(channel_stack)) - 1) #define CHANNEL_FROM_TOP_ELEM(top_elem) \ CHANNEL_FROM_CHANNEL_STACK(grpc_channel_stack_from_top_element(top_elem)) +/* the protobuf library will (by default) start warning at 100megs */ +#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024) + grpc_channel *grpc_channel_create_from_filters( const grpc_channel_filter **filters, size_t num_filters, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client) { + size_t i; size_t size = sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters); grpc_channel *channel = gpr_malloc(size); GPR_ASSERT(grpc_is_initialized() && "call grpc_init()"); channel->is_client = is_client; - /* decremented by grpc_channel_destroy, and grpc_client_channel_closed if is_client */ + /* decremented by grpc_channel_destroy, and grpc_client_channel_closed if + * is_client */ gpr_ref_init(&channel->refs, 1 + is_client); channel->metadata_context = mdctx; channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status"); @@ -75,59 +91,41 @@ grpc_channel *grpc_channel_create_from_filters( channel->authority_string = grpc_mdstr_from_string(mdctx, ":authority"); grpc_channel_stack_init(filters, num_filters, args, channel->metadata_context, CHANNEL_STACK_FROM_CHANNEL(channel)); + gpr_mu_init(&channel->registered_call_mu); + channel->registered_calls = NULL; + + channel->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH; + if (args) { + for (i = 0; i < args->num_args; i++) { + if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_MESSAGE_LENGTH)) { + if (args->args[i].type != GRPC_ARG_INTEGER) { + gpr_log(GPR_ERROR, "%s ignored: it must be an integer", + GRPC_ARG_MAX_MESSAGE_LENGTH); + } else if (args->args[i].value.integer < 0) { + gpr_log(GPR_ERROR, "%s ignored: it must be >= 0", + GRPC_ARG_MAX_MESSAGE_LENGTH); + } else { + channel->max_message_length = args->args[i].value.integer; + } + } + } + } + return channel; } -static void do_nothing(void *ignored, grpc_op_error error) {} +static grpc_call *grpc_channel_create_call_internal( + grpc_channel *channel, grpc_completion_queue *cq, grpc_mdelem *path_mdelem, + grpc_mdelem *authority_mdelem, gpr_timespec deadline) { + grpc_mdelem *send_metadata[2]; -grpc_call *grpc_channel_create_call(grpc_channel *channel, - grpc_completion_queue *cq, - const char *method, const char *host, - gpr_timespec absolute_deadline) { - grpc_call *call; - grpc_mdelem *path_mdelem; - grpc_mdelem *authority_mdelem; - grpc_call_op op; - - if (!channel->is_client) { - gpr_log(GPR_ERROR, "Cannot create a call on the server."); - return NULL; - } - - call = grpc_call_create(channel, cq, NULL); + GPR_ASSERT(channel->is_client); - /* Add :path and :authority headers. */ - /* TODO(klempner): Consider optimizing this by stashing mdelems for common - values of method and host. */ - path_mdelem = grpc_mdelem_from_metadata_strings( - channel->metadata_context, grpc_mdstr_ref(channel->path_string), - grpc_mdstr_from_string(channel->metadata_context, method)); - op.type = GRPC_SEND_METADATA; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.data.metadata = path_mdelem; - op.done_cb = do_nothing; - op.user_data = NULL; - grpc_call_execute_op(call, &op); - - grpc_mdstr_ref(channel->authority_string); - authority_mdelem = grpc_mdelem_from_metadata_strings( - channel->metadata_context, channel->authority_string, - grpc_mdstr_from_string(channel->metadata_context, host)); - op.data.metadata = authority_mdelem; - grpc_call_execute_op(call, &op); - - if (0 != gpr_time_cmp(absolute_deadline, gpr_inf_future)) { - op.type = GRPC_SEND_DEADLINE; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.data.deadline = absolute_deadline; - op.done_cb = do_nothing; - op.user_data = NULL; - grpc_call_execute_op(call, &op); - } + send_metadata[0] = path_mdelem; + send_metadata[1] = authority_mdelem; - return call; + return grpc_call_create(channel, cq, NULL, send_metadata, + GPR_ARRAY_SIZE(send_metadata), deadline); } grpc_call *grpc_channel_create_call_old(grpc_channel *channel, @@ -137,6 +135,46 @@ grpc_call *grpc_channel_create_call_old(grpc_channel *channel, absolute_deadline); } +grpc_call *grpc_channel_create_call(grpc_channel *channel, + grpc_completion_queue *cq, + const char *method, const char *host, + gpr_timespec deadline) { + return grpc_channel_create_call_internal( + channel, cq, + grpc_mdelem_from_metadata_strings( + channel->metadata_context, grpc_mdstr_ref(channel->path_string), + grpc_mdstr_from_string(channel->metadata_context, method)), + grpc_mdelem_from_metadata_strings( + channel->metadata_context, grpc_mdstr_ref(channel->authority_string), + grpc_mdstr_from_string(channel->metadata_context, host)), + deadline); +} + +void *grpc_channel_register_call(grpc_channel *channel, const char *method, + const char *host) { + registered_call *rc = gpr_malloc(sizeof(registered_call)); + rc->path = grpc_mdelem_from_metadata_strings( + channel->metadata_context, grpc_mdstr_ref(channel->path_string), + grpc_mdstr_from_string(channel->metadata_context, method)); + rc->authority = grpc_mdelem_from_metadata_strings( + channel->metadata_context, grpc_mdstr_ref(channel->authority_string), + grpc_mdstr_from_string(channel->metadata_context, host)); + gpr_mu_lock(&channel->registered_call_mu); + rc->next = channel->registered_calls; + channel->registered_calls = rc; + gpr_mu_unlock(&channel->registered_call_mu); + return rc; +} + +grpc_call *grpc_channel_create_registered_call( + grpc_channel *channel, grpc_completion_queue *completion_queue, + void *registered_call_handle, gpr_timespec deadline) { + registered_call *rc = registered_call_handle; + return grpc_channel_create_call_internal( + channel, completion_queue, grpc_mdelem_ref(rc->path), + grpc_mdelem_ref(rc->authority), deadline); +} + void grpc_channel_internal_ref(grpc_channel *channel) { gpr_ref(&channel->refs); } @@ -148,7 +186,15 @@ static void destroy_channel(void *p, int ok) { grpc_mdstr_unref(channel->grpc_message_string); grpc_mdstr_unref(channel->path_string); grpc_mdstr_unref(channel->authority_string); + while (channel->registered_calls) { + registered_call *rc = channel->registered_calls; + channel->registered_calls = rc->next; + grpc_mdelem_unref(rc->path); + grpc_mdelem_unref(rc->authority); + gpr_free(rc); + } grpc_mdctx_unref(channel->metadata_context); + gpr_mu_destroy(&channel->registered_call_mu); gpr_free(channel); } @@ -196,3 +242,7 @@ grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) { grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel) { return channel->grpc_message_string; } + +gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel) { + return channel->max_message_length; +} diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index d3e51185ee..388be35711 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -44,10 +44,11 @@ grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel); grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel); grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel); grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel); +gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel); void grpc_client_channel_closed(grpc_channel_element *elem); void grpc_channel_internal_ref(grpc_channel *channel); void grpc_channel_internal_unref(grpc_channel *channel); -#endif /* GRPC_INTERNAL_CORE_SURFACE_CHANNEL_H */ +#endif /* GRPC_INTERNAL_CORE_SURFACE_CHANNEL_H */ diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 3104b1d00d..daa8d3a7c6 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -44,7 +44,6 @@ #include "src/core/channel/client_setup.h" #include "src/core/channel/connected_channel.h" #include "src/core/channel/http_client_filter.h" -#include "src/core/channel/http_filter.h" #include "src/core/iomgr/endpoint.h" #include "src/core/iomgr/resolve_address.h" #include "src/core/iomgr/tcp_client.h" @@ -176,8 +175,8 @@ static void done_setup(void *sp) { static grpc_transport_setup_result complete_setup(void *channel_stack, grpc_transport *transport, grpc_mdctx *mdctx) { - static grpc_channel_filter const *extra_filters[] = {&grpc_http_client_filter, - &grpc_http_filter}; + static grpc_channel_filter const *extra_filters[] = { + &grpc_http_client_filter}; return grpc_client_channel_transport_setup_complete( channel_stack, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters), mdctx); diff --git a/src/core/surface/client.c b/src/core/surface/client.c index 4d54865d16..8ac4dd1e0e 100644 --- a/src/core/surface/client.c +++ b/src/core/surface/client.c @@ -39,46 +39,14 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> -typedef struct { - void *unused; -} call_data; +typedef struct { void *unused; } call_data; -typedef struct { - void *unused; -} channel_data; +typedef struct { void *unused; } channel_data; -static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, - grpc_call_op *op) { +static void client_start_transport_op(grpc_call_element *elem, + grpc_transport_op *op) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - - switch (op->type) { - case GRPC_SEND_DEADLINE: - grpc_call_set_deadline(elem, op->data.deadline); - grpc_call_next_op(elem, op); - break; - case GRPC_RECV_METADATA: - grpc_call_recv_metadata(elem, op->data.metadata); - break; - case GRPC_RECV_DEADLINE: - gpr_log(GPR_ERROR, "Deadline received by client (ignored)"); - break; - case GRPC_RECV_MESSAGE: - grpc_call_recv_message(elem, op->data.message); - op->done_cb(op->user_data, GRPC_OP_OK); - break; - case GRPC_RECV_HALF_CLOSE: - grpc_call_read_closed(elem); - break; - case GRPC_RECV_FINISH: - grpc_call_stream_closed(elem); - break; - case GRPC_RECV_END_OF_INITIAL_METADATA: - grpc_call_initial_metadata_complete(elem); - break; - default: - GPR_ASSERT(op->dir == GRPC_CALL_DOWN); - grpc_call_next_op(elem, op); - } + grpc_call_next_op(elem, op); } static void channel_op(grpc_channel_element *elem, @@ -100,7 +68,8 @@ static void channel_op(grpc_channel_element *elem, } static void init_call_elem(grpc_call_element *elem, - const void *transport_server_data) {} + const void *transport_server_data, + grpc_transport_op *initial_op) {} static void destroy_call_elem(grpc_call_element *elem) {} @@ -114,6 +83,7 @@ static void init_channel_elem(grpc_channel_element *elem, static void destroy_channel_elem(grpc_channel_element *elem) {} const grpc_channel_filter grpc_client_surface_filter = { - call_op, channel_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "client", }; + client_start_transport_op, channel_op, sizeof(call_data), init_call_elem, + destroy_call_elem, sizeof(channel_data), init_channel_elem, + destroy_channel_elem, "client", +}; diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 6a1d83ce5d..c1c97af337 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -67,6 +67,8 @@ struct grpc_completion_queue { /* When refs drops to zero, we are in shutdown mode, and will be destroyable once all queued events are drained */ gpr_refcount refs; + /* Once owning_refs drops to zero, we will destroy the cq */ + gpr_refcount owning_refs; /* the set of low level i/o things that concern this cq */ grpc_pollset pollset; /* 0 initially, 1 once we've begun shutting down */ @@ -91,11 +93,29 @@ grpc_completion_queue *grpc_completion_queue_create(void) { memset(cc, 0, sizeof(*cc)); /* Initial ref is dropped by grpc_completion_queue_shutdown */ gpr_ref_init(&cc->refs, 1); + gpr_ref_init(&cc->owning_refs, 1); grpc_pollset_init(&cc->pollset); cc->allow_polling = 1; return cc; } +void grpc_cq_internal_ref(grpc_completion_queue *cc) { + gpr_ref(&cc->owning_refs); +} + +static void on_pollset_destroy_done(void *arg) { + grpc_completion_queue *cc = arg; + grpc_pollset_destroy(&cc->pollset); + gpr_free(cc); +} + +void grpc_cq_internal_unref(grpc_completion_queue *cc) { + if (gpr_unref(&cc->owning_refs)) { + GPR_ASSERT(cc->queue == NULL); + grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc); + } +} + void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc) { cc->allow_polling = 0; } @@ -135,7 +155,7 @@ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type, void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call, grpc_completion_type type) { gpr_ref(&cc->refs); - if (call) grpc_call_internal_ref(call); + if (call) GRPC_CALL_INTERNAL_REF(call, "cq"); #ifndef NDEBUG gpr_atm_no_barrier_fetch_add(&cc->pending_op_count[type], 1); #endif @@ -394,22 +414,15 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { } } -static void on_pollset_destroy_done(void *arg) { - grpc_completion_queue *cc = arg; - grpc_pollset_destroy(&cc->pollset); - gpr_free(cc); -} - void grpc_completion_queue_destroy(grpc_completion_queue *cc) { - GPR_ASSERT(cc->queue == NULL); - grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc); + grpc_cq_internal_unref(cc); } void grpc_event_finish(grpc_event *base) { event *ev = (event *)base; ev->on_finish(ev->on_finish_user_data, GRPC_OP_OK); if (ev->base.call) { - grpc_call_internal_unref(ev->base.call, 1); + GRPC_CALL_INTERNAL_UNREF(ev->base.call, "cq", 1); } gpr_free(ev); } @@ -432,3 +445,11 @@ void grpc_cq_dump_pending_ops(grpc_completion_queue *cc) { grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) { return &cc->pollset; } + +void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc) { + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + grpc_pollset_kick(&cc->pollset); + grpc_pollset_work(&cc->pollset, + gpr_time_add(gpr_now(), gpr_time_from_millis(100))); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); +} diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h index 3054264cad..41024cda14 100644 --- a/src/core/surface/completion_queue.h +++ b/src/core/surface/completion_queue.h @@ -43,6 +43,9 @@ grpc_event_finish */ typedef void (*grpc_event_finish_func)(void *user_data, grpc_op_error error); +void grpc_cq_internal_ref(grpc_completion_queue *cc); +void grpc_cq_internal_unref(grpc_completion_queue *cc); + /* Flag that an operation is beginning: the completion channel will not finish shutdown until a corrensponding grpc_cq_end_* call is made */ void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call, @@ -114,4 +117,6 @@ void grpc_cq_dump_pending_ops(grpc_completion_queue *cc); grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc); -#endif /* GRPC_INTERNAL_CORE_SURFACE_COMPLETION_QUEUE_H */ +void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc); + +#endif /* GRPC_INTERNAL_CORE_SURFACE_COMPLETION_QUEUE_H */ diff --git a/src/core/surface/init.c b/src/core/surface/init.c index e48c4202e5..5a119a47cc 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -32,10 +32,12 @@ */ #include <grpc/grpc.h> -#include "src/core/iomgr/iomgr.h" +#include "src/core/channel/channel_stack.h" #include "src/core/debug/trace.h" +#include "src/core/iomgr/iomgr.h" #include "src/core/statistics/census_interface.h" -#include "src/core/channel/channel_stack.h" +#include "src/core/profiling/timers.h" +#include "src/core/surface/call.h" #include "src/core/surface/init.h" #include "src/core/surface/surface_trace.h" #include "src/core/transport/chttp2_transport.h" @@ -57,10 +59,12 @@ void grpc_init(void) { grpc_register_tracer("channel", &grpc_trace_channel); grpc_register_tracer("surface", &grpc_surface_trace); grpc_register_tracer("http", &grpc_http_trace); + grpc_register_tracer("batch", &grpc_trace_batch); grpc_security_pre_init(); - grpc_tracer_init("GRPC_TRACE"); grpc_iomgr_init(); + grpc_tracer_init("GRPC_TRACE"); census_init(); + grpc_timers_log_global_init(); } gpr_mu_unlock(&g_init_mu); } @@ -70,6 +74,7 @@ void grpc_shutdown(void) { if (--g_initializations == 0) { grpc_iomgr_shutdown(); census_shutdown(); + grpc_timers_log_global_destroy(); } gpr_mu_unlock(&g_init_mu); } @@ -82,4 +87,3 @@ int grpc_is_initialized(void) { gpr_mu_unlock(&g_init_mu); return r; } - diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index b40c48381f..3186292a02 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -43,33 +43,39 @@ #include <grpc/support/log.h> typedef struct { - void *unused; + grpc_linked_mdelem status; + grpc_linked_mdelem details; } call_data; -typedef struct { - grpc_mdelem *status; - grpc_mdelem *message; -} channel_data; +typedef struct { grpc_mdctx *mdctx; } channel_data; -static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, - grpc_call_op *op) { - channel_data *channeld = elem->channel_data; +static void lame_start_transport_op(grpc_call_element *elem, + grpc_transport_op *op) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - - switch (op->type) { - case GRPC_SEND_START: - grpc_call_recv_metadata(elem, grpc_mdelem_ref(channeld->status)); - grpc_call_recv_metadata(elem, grpc_mdelem_ref(channeld->message)); - grpc_call_stream_closed(elem); - break; - case GRPC_SEND_METADATA: - grpc_mdelem_unref(op->data.metadata); - break; - default: - break; + if (op->send_ops) { + op->on_done_send(op->send_user_data, 0); + } + if (op->recv_ops) { + char tmp[GPR_LTOA_MIN_BUFSIZE]; + grpc_metadata_batch mdb; + gpr_ltoa(GRPC_STATUS_UNKNOWN, tmp); + calld->status.md = + grpc_mdelem_from_strings(chand->mdctx, "grpc-status", tmp); + calld->details.md = grpc_mdelem_from_strings(chand->mdctx, "grpc-message", + "Rpc sent on a lame channel."); + calld->status.prev = calld->details.next = NULL; + calld->status.next = &calld->details; + calld->details.prev = &calld->status; + mdb.list.head = &calld->status; + mdb.list.tail = &calld->details; + mdb.garbage.head = mdb.garbage.tail = NULL; + mdb.deadline = gpr_inf_future; + grpc_sopb_add_metadata(op->recv_ops, mdb); + *op->recv_state = GRPC_STREAM_CLOSED; + op->on_done_recv(op->recv_user_data, 1); } - - op->done_cb(op->user_data, GRPC_OP_ERROR); } static void channel_op(grpc_channel_element *elem, @@ -87,36 +93,31 @@ static void channel_op(grpc_channel_element *elem, } static void init_call_elem(grpc_call_element *elem, - const void *transport_server_data) {} + const void *transport_server_data, + grpc_transport_op *initial_op) { + if (initial_op) { + grpc_transport_op_finish_with_failure(initial_op); + } +} static void destroy_call_elem(grpc_call_element *elem) {} static void init_channel_elem(grpc_channel_element *elem, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_first, int is_last) { - channel_data *channeld = elem->channel_data; - char status[12]; - + channel_data *chand = elem->channel_data; GPR_ASSERT(is_first); GPR_ASSERT(is_last); - - channeld->message = grpc_mdelem_from_strings(mdctx, "grpc-message", - "Rpc sent on a lame channel."); - gpr_ltoa(GRPC_STATUS_UNKNOWN, status); - channeld->status = grpc_mdelem_from_strings(mdctx, "grpc-status", status); + chand->mdctx = mdctx; } -static void destroy_channel_elem(grpc_channel_element *elem) { - channel_data *channeld = elem->channel_data; - - grpc_mdelem_unref(channeld->message); - grpc_mdelem_unref(channeld->status); -} +static void destroy_channel_elem(grpc_channel_element *elem) {} static const grpc_channel_filter lame_filter = { - call_op, channel_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "lame-client", }; + lame_start_transport_op, channel_op, sizeof(call_data), init_call_elem, + destroy_call_elem, sizeof(channel_data), init_channel_elem, + destroy_channel_elem, "lame-client", +}; grpc_channel *grpc_lame_client_channel_create(void) { static const grpc_channel_filter *filters[] = {&lame_filter}; diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index 8e56868d42..3e331293b5 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -44,11 +44,10 @@ #include "src/core/channel/client_setup.h" #include "src/core/channel/connected_channel.h" #include "src/core/channel/http_client_filter.h" -#include "src/core/channel/http_filter.h" #include "src/core/iomgr/resolve_address.h" #include "src/core/iomgr/tcp_client.h" #include "src/core/security/auth.h" -#include "src/core/security/security_context.h" +#include "src/core/security/credentials.h" #include "src/core/security/secure_transport_setup.h" #include "src/core/support/string.h" #include "src/core/surface/channel.h" @@ -74,7 +73,7 @@ typedef struct { } request; struct setup { - grpc_channel_security_context *security_context; + grpc_channel_security_connector *security_connector; const char *target; grpc_transport_setup_callback setup_callback; void *setup_user_data; @@ -130,7 +129,7 @@ static void on_connect(void *rp, grpc_endpoint *tcp) { return; } } else { - grpc_setup_secure_transport(&r->setup->security_context->base, tcp, + grpc_setup_secure_transport(&r->setup->security_connector->base, tcp, on_secure_transport_setup_done, r); } } @@ -185,7 +184,7 @@ static void initiate_setup(void *sp, grpc_client_setup_request *cs_request) { static void done_setup(void *sp) { setup *s = sp; gpr_free((void *)s->target); - grpc_security_context_unref(&s->security_context->base); + grpc_security_connector_unref(&s->security_connector->base); gpr_free(s); } @@ -193,7 +192,7 @@ static grpc_transport_setup_result complete_setup(void *channel_stack, grpc_transport *transport, grpc_mdctx *mdctx) { static grpc_channel_filter const *extra_filters[] = { - &grpc_client_auth_filter, &grpc_http_client_filter, &grpc_http_filter}; + &grpc_client_auth_filter, &grpc_http_client_filter}; return grpc_client_channel_transport_setup_complete( channel_stack, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters), mdctx); @@ -203,24 +202,37 @@ static grpc_transport_setup_result complete_setup(void *channel_stack, Asynchronously: - resolve target - connect to it (trying alternatives as presented) - perform handshakes */ -grpc_channel *grpc_secure_channel_create_internal( - const char *target, const grpc_channel_args *args, - grpc_channel_security_context *context) { +grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, + const char *target, + const grpc_channel_args *args) { setup *s; grpc_channel *channel; - grpc_arg context_arg; + grpc_arg connector_arg; grpc_channel_args *args_copy; - grpc_mdctx *mdctx = grpc_mdctx_create(); + grpc_channel_args *new_args_from_connector; + grpc_channel_security_connector *connector; + grpc_mdctx *mdctx; #define MAX_FILTERS 3 const grpc_channel_filter *filters[MAX_FILTERS]; int n = 0; - if (grpc_find_security_context_in_args(args) != NULL) { + + if (grpc_find_security_connector_in_args(args) != NULL) { gpr_log(GPR_ERROR, "Cannot set security context in channel args."); + return grpc_lame_client_channel_create(); + } + + if (grpc_credentials_create_security_connector( + creds, target, args, NULL, &connector, &new_args_from_connector) != + GRPC_SECURITY_OK) { + return grpc_lame_client_channel_create(); } + mdctx = grpc_credentials_get_or_create_metadata_context(creds); s = gpr_malloc(sizeof(setup)); - context_arg = grpc_security_context_to_arg(&context->base); - args_copy = grpc_channel_args_copy_and_add(args, &context_arg); + connector_arg = grpc_security_connector_to_arg(&connector->base); + args_copy = grpc_channel_args_copy_and_add( + new_args_from_connector != NULL ? new_args_from_connector : args, + &connector_arg); filters[n++] = &grpc_client_surface_filter; if (grpc_channel_args_is_census_enabled(args)) { filters[n++] = &grpc_client_census_filter; @@ -229,13 +241,14 @@ grpc_channel *grpc_secure_channel_create_internal( GPR_ASSERT(n <= MAX_FILTERS); channel = grpc_channel_create_from_filters(filters, n, args_copy, mdctx, 1); grpc_channel_args_destroy(args_copy); + if (new_args_from_connector != NULL) { + grpc_channel_args_destroy(new_args_from_connector); + } s->target = gpr_strdup(target); s->setup_callback = complete_setup; s->setup_user_data = grpc_channel_get_channel_stack(channel); - s->security_context = - (grpc_channel_security_context *)grpc_security_context_ref( - &context->base); + s->security_connector = connector; grpc_client_setup_create_and_attach(grpc_channel_get_channel_stack(channel), args, mdctx, initiate_setup, done_setup, s); diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 424734c54c..83caefcbc6 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -137,6 +137,7 @@ struct grpc_server { size_t cq_count; gpr_mu mu; + gpr_cv cv; registered_method *registered_methods; requested_call_array requested_calls; @@ -149,6 +150,7 @@ struct grpc_server { channel_data root_channel_data; listener *listeners; + int listeners_destroyed; gpr_refcount internal_refcount; }; @@ -171,13 +173,19 @@ struct call_data { grpc_call *call; call_state state; - gpr_timespec deadline; grpc_mdstr *path; grpc_mdstr *host; + gpr_timespec deadline; + int got_initial_metadata; legacy_data *legacy; grpc_completion_queue *cq_new; + grpc_stream_op_buffer *recv_ops; + grpc_stream_state *recv_state; + void (*on_done_recv)(void *user_data, int success); + void *recv_user_data; + call_data **root[CALL_LIST_COUNT]; call_link links[CALL_LIST_COUNT]; }; @@ -260,9 +268,11 @@ static void server_ref(grpc_server *server) { static void server_unref(grpc_server *server) { registered_method *rm; + size_t i; if (gpr_unref(&server->internal_refcount)) { grpc_channel_args_destroy(server->channel_args); gpr_mu_destroy(&server->mu); + gpr_cv_destroy(&server->cv); gpr_free(server->channel_filters); requested_call_array_destroy(&server->requested_calls); while ((rm = server->registered_methods) != NULL) { @@ -272,6 +282,9 @@ static void server_unref(grpc_server *server) { requested_call_array_destroy(&rm->requested); gpr_free(rm); } + for (i = 0; i < server->cq_count; i++) { + grpc_cq_internal_unref(server->cqs[i]); + } gpr_free(server->cqs); gpr_free(server->pollsets); gpr_free(server->shutdown_tags); @@ -368,88 +381,89 @@ static void kill_zombie(void *elem, int success) { grpc_call_destroy(grpc_call_from_top_element(elem)); } -static void stream_closed(grpc_call_element *elem) { - call_data *calld = elem->call_data; +static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { + grpc_call_element *elem = user_data; channel_data *chand = elem->channel_data; - gpr_mu_lock(&chand->server->mu); - switch (calld->state) { - case ACTIVATED: - break; - case PENDING: - call_list_remove(calld, PENDING_START); - /* fallthrough intended */ - case NOT_STARTED: - calld->state = ZOMBIED; - grpc_iomgr_add_callback(kill_zombie, elem); - break; - case ZOMBIED: - break; + call_data *calld = elem->call_data; + if (md->key == chand->path_key) { + calld->path = grpc_mdstr_ref(md->value); + return NULL; + } else if (md->key == chand->authority_key) { + calld->host = grpc_mdstr_ref(md->value); + return NULL; } - gpr_mu_unlock(&chand->server->mu); - grpc_call_stream_closed(elem); + return md; } -static void read_closed(grpc_call_element *elem) { +static void server_on_recv(void *ptr, int success) { + grpc_call_element *elem = ptr; call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - gpr_mu_lock(&chand->server->mu); - switch (calld->state) { - case ACTIVATED: - case PENDING: - grpc_call_read_closed(elem); - break; - case NOT_STARTED: - calld->state = ZOMBIED; - grpc_iomgr_add_callback(kill_zombie, elem); - break; - case ZOMBIED: - break; - } - gpr_mu_unlock(&chand->server->mu); -} -static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn, - grpc_call_op *op) { - channel_data *chand = elem->channel_data; - call_data *calld = elem->call_data; - grpc_mdelem *md; - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - switch (op->type) { - case GRPC_RECV_METADATA: - md = op->data.metadata; - if (md->key == chand->path_key) { - calld->path = grpc_mdstr_ref(md->value); - grpc_mdelem_unref(md); - } else if (md->key == chand->authority_key) { - calld->host = grpc_mdstr_ref(md->value); - grpc_mdelem_unref(md); - } else { - grpc_call_recv_metadata(elem, md); + if (success && !calld->got_initial_metadata) { + size_t i; + size_t nops = calld->recv_ops->nops; + grpc_stream_op *ops = calld->recv_ops->ops; + for (i = 0; i < nops; i++) { + grpc_stream_op *op = &ops[i]; + if (op->type != GRPC_OP_METADATA) continue; + grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem); + if (0 != gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future)) { + calld->deadline = op->data.metadata.deadline; } - break; - case GRPC_RECV_END_OF_INITIAL_METADATA: + calld->got_initial_metadata = 1; start_new_rpc(elem); - grpc_call_initial_metadata_complete(elem); - break; - case GRPC_RECV_MESSAGE: - grpc_call_recv_message(elem, op->data.message); - op->done_cb(op->user_data, GRPC_OP_OK); break; - case GRPC_RECV_HALF_CLOSE: - read_closed(elem); + } + } + + switch (*calld->recv_state) { + case GRPC_STREAM_OPEN: break; - case GRPC_RECV_FINISH: - stream_closed(elem); + case GRPC_STREAM_SEND_CLOSED: break; - case GRPC_RECV_DEADLINE: - grpc_call_set_deadline(elem, op->data.deadline); - ((call_data *)elem->call_data)->deadline = op->data.deadline; + case GRPC_STREAM_RECV_CLOSED: + gpr_mu_lock(&chand->server->mu); + if (calld->state == NOT_STARTED) { + calld->state = ZOMBIED; + grpc_iomgr_add_callback(kill_zombie, elem); + } + gpr_mu_unlock(&chand->server->mu); break; - default: - GPR_ASSERT(op->dir == GRPC_CALL_DOWN); - grpc_call_next_op(elem, op); + case GRPC_STREAM_CLOSED: + gpr_mu_lock(&chand->server->mu); + if (calld->state == NOT_STARTED) { + calld->state = ZOMBIED; + grpc_iomgr_add_callback(kill_zombie, elem); + } else if (calld->state == PENDING) { + call_list_remove(calld, PENDING_START); + } + gpr_mu_unlock(&chand->server->mu); break; } + + calld->on_done_recv(calld->recv_user_data, success); +} + +static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { + call_data *calld = elem->call_data; + + if (op->recv_ops) { + /* substitute our callback for the higher callback */ + calld->recv_ops = op->recv_ops; + calld->recv_state = op->recv_state; + calld->on_done_recv = op->on_done_recv; + calld->recv_user_data = op->recv_user_data; + op->on_done_recv = server_on_recv; + op->recv_user_data = elem; + } +} + +static void server_start_transport_op(grpc_call_element *elem, + grpc_transport_op *op) { + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + server_mutate_op(elem, op); + grpc_call_next_op(elem, op); } static void channel_op(grpc_channel_element *elem, @@ -461,7 +475,8 @@ static void channel_op(grpc_channel_element *elem, case GRPC_ACCEPT_CALL: /* create a call */ grpc_call_create(chand->channel, NULL, - op->data.accept_call.transport_server_data); + op->data.accept_call.transport_server_data, NULL, 0, + gpr_inf_future); break; case GRPC_TRANSPORT_CLOSED: /* if the transport is closed for a server channel, we destroy the @@ -499,7 +514,8 @@ static void shutdown_channel(channel_data *chand) { } static void init_call_elem(grpc_call_element *elem, - const void *server_transport_data) { + const void *server_transport_data, + grpc_transport_op *initial_op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; memset(calld, 0, sizeof(call_data)); @@ -511,6 +527,8 @@ static void init_call_elem(grpc_call_element *elem, gpr_mu_unlock(&chand->server->mu); server_ref(chand->server); + + if (initial_op) server_mutate_op(elem, initial_op); } static void destroy_call_elem(grpc_call_element *elem) { @@ -589,9 +607,9 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } static const grpc_channel_filter server_surface_filter = { - call_op, channel_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "server", + server_start_transport_op, channel_op, sizeof(call_data), init_call_elem, + destroy_call_elem, sizeof(channel_data), init_channel_elem, + destroy_channel_elem, "server", }; static void addcq(grpc_server *server, grpc_completion_queue *cq) { @@ -599,6 +617,7 @@ static void addcq(grpc_server *server, grpc_completion_queue *cq) { for (i = 0; i < server->cq_count; i++) { if (server->cqs[i] == cq) return; } + grpc_cq_internal_ref(cq); n = server->cq_count++; server->cqs = gpr_realloc(server->cqs, server->cq_count * sizeof(grpc_completion_queue *)); @@ -620,6 +639,7 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, if (cq) addcq(server, cq); gpr_mu_init(&server->mu); + gpr_cv_init(&server->cv); server->unregistered_cq = cq; /* decremented by grpc_server_destroy */ @@ -733,7 +753,8 @@ grpc_transport_setup_result grpc_server_setup_transport( channel = grpc_channel_create_from_filters(filters, num_filters, s->channel_args, mdctx, 0); chand = (channel_data *)grpc_channel_stack_element( - grpc_channel_get_channel_stack(channel), 0)->channel_data; + grpc_channel_get_channel_stack(channel), 0) + ->channel_data; chand->server = s; server_ref(s); chand->channel = channel; @@ -754,7 +775,7 @@ grpc_transport_setup_result grpc_server_setup_transport( method = grpc_mdstr_from_string(mdctx, rm->method); hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash); for (probes = 0; chand->registered_methods[(hash + probes) % slots] - .server_registered_method != NULL; + .server_registered_method != NULL; probes++) ; if (probes > max_probes) max_probes = probes; @@ -781,6 +802,15 @@ grpc_transport_setup_result grpc_server_setup_transport( return result; } +static int num_listeners(grpc_server *server) { + listener *l; + int n = 0; + for (l = server->listeners; l; l = l->next) { + n++; + } + return n; +} + static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag, void *shutdown_tag) { listener *l; @@ -878,11 +908,6 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag, for (l = server->listeners; l; l = l->next) { l->destroy(server, l->arg); } - while (server->listeners) { - l = server->listeners; - server->listeners = l->next; - gpr_free(l); - } } void grpc_server_shutdown(grpc_server *server) { @@ -893,8 +918,20 @@ void grpc_server_shutdown_and_notify(grpc_server *server, void *tag) { shutdown_internal(server, 1, tag); } +void grpc_server_listener_destroy_done(void *s) { + grpc_server *server = s; + gpr_mu_lock(&server->mu); + server->listeners_destroyed++; + gpr_cv_signal(&server->cv); + gpr_mu_unlock(&server->mu); +} + void grpc_server_destroy(grpc_server *server) { channel_data *c; + listener *l; + size_t i; + call_data *calld; + gpr_mu_lock(&server->mu); if (!server->shutdown) { gpr_mu_unlock(&server->mu); @@ -902,6 +939,32 @@ void grpc_server_destroy(grpc_server *server) { gpr_mu_lock(&server->mu); } + while (server->listeners_destroyed != num_listeners(server)) { + for (i = 0; i < server->cq_count; i++) { + gpr_mu_unlock(&server->mu); + grpc_cq_hack_spin_pollset(server->cqs[i]); + gpr_mu_lock(&server->mu); + } + + gpr_cv_wait(&server->cv, &server->mu, + gpr_time_add(gpr_now(), gpr_time_from_millis(100))); + } + + while (server->listeners) { + l = server->listeners; + server->listeners = l->next; + gpr_free(l); + } + + while ((calld = call_list_remove_head(&server->lists[PENDING_START], + PENDING_START)) != NULL) { + gpr_log(GPR_DEBUG, "server destroys call %p", calld->call); + calld->state = ZOMBIED; + grpc_iomgr_add_callback( + kill_zombie, + grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); + } + for (c = server->root_channel_data.next; c != &server->root_channel_data; c = c->next) { shutdown_channel(c); @@ -1048,6 +1111,7 @@ static void begin_call(grpc_server *server, call_data *calld, &rc->data.batch.details->host_capacity, calld->host); cpstr(&rc->data.batch.details->method, &rc->data.batch.details->method_capacity, calld->path); + rc->data.batch.details->deadline = calld->deadline; grpc_call_set_completion_queue(calld->call, rc->data.batch.cq_bind); *rc->data.batch.call = calld->call; r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; @@ -1073,7 +1137,7 @@ static void begin_call(grpc_server *server, call_data *calld, break; } - grpc_call_internal_ref(calld->call); + GRPC_CALL_INTERNAL_REF(calld->call, "server"); grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish, rc->tag); } diff --git a/src/core/surface/server.h b/src/core/surface/server.h index e33f69b8c7..2cfa38fa43 100644 --- a/src/core/surface/server.h +++ b/src/core/surface/server.h @@ -48,9 +48,12 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, and when it shuts down, it will call destroy */ void grpc_server_add_listener(grpc_server *server, void *listener, void (*start)(grpc_server *server, void *arg, - grpc_pollset **pollsets, size_t npollsets), + grpc_pollset **pollsets, + size_t npollsets), void (*destroy)(grpc_server *server, void *arg)); +void grpc_server_listener_destroy_done(void *server); + /* Setup a transport - creates a channel stack, binds the transport to the server */ grpc_transport_setup_result grpc_server_setup_transport( @@ -60,4 +63,4 @@ grpc_transport_setup_result grpc_server_setup_transport( const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server); -#endif /* GRPC_INTERNAL_CORE_SURFACE_SERVER_H */ +#endif /* GRPC_INTERNAL_CORE_SURFACE_SERVER_H */ diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c index 27434b39e2..7b5c2f227b 100644 --- a/src/core/surface/server_chttp2.c +++ b/src/core/surface/server_chttp2.c @@ -33,7 +33,6 @@ #include <grpc/grpc.h> -#include "src/core/channel/http_filter.h" #include "src/core/channel/http_server_filter.h" #include "src/core/iomgr/resolve_address.h" #include "src/core/iomgr/tcp_server.h" @@ -46,8 +45,8 @@ static grpc_transport_setup_result setup_transport(void *server, grpc_transport *transport, grpc_mdctx *mdctx) { - static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter, - &grpc_http_filter}; + static grpc_channel_filter const *extra_filters[] = { + &grpc_http_server_filter}; return grpc_server_setup_transport(server, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters), mdctx); } @@ -66,7 +65,8 @@ static void new_transport(void *server, grpc_endpoint *tcp) { } /* Server callback: start listening on our ports */ -static void start(grpc_server *server, void *tcpp, grpc_pollset **pollsets, size_t pollset_count) { +static void start(grpc_server *server, void *tcpp, grpc_pollset **pollsets, + size_t pollset_count) { grpc_tcp_server *tcp = tcpp; grpc_tcp_server_start(tcp, pollsets, pollset_count, new_transport, server); } @@ -75,7 +75,7 @@ static void start(grpc_server *server, void *tcpp, grpc_pollset **pollsets, size callbacks) */ static void destroy(grpc_server *server, void *tcpp) { grpc_tcp_server *tcp = tcpp; - grpc_tcp_server_destroy(tcp); + grpc_tcp_server_destroy(tcp, grpc_server_listener_destroy_done, server); } int grpc_server_add_http2_port(grpc_server *server, const char *addr) { @@ -131,7 +131,7 @@ error: grpc_resolved_addresses_destroy(resolved); } if (tcp) { - grpc_tcp_server_destroy(tcp); + grpc_tcp_server_destroy(tcp, NULL, NULL); } return 0; } |