Diffstat (limited to 'src/core/surface')
-rw-r--r--   src/core/surface/byte_buffer.c             |  44
-rw-r--r--   src/core/surface/byte_buffer_reader.c      |  65
-rw-r--r--   src/core/surface/call.c                    | 221
-rw-r--r--   src/core/surface/call.h                    |   3
-rw-r--r--   src/core/surface/call_log_batch.c          |   1
-rw-r--r--   src/core/surface/channel.c                 |  45
-rw-r--r--   src/core/surface/channel.h                 |  11
-rw-r--r--   src/core/surface/channel_create.c          |   4
-rw-r--r--   src/core/surface/completion_queue.c        |  31
-rw-r--r--   src/core/surface/completion_queue.h        |  15
-rw-r--r--   src/core/surface/event_string.c            |   1
-rw-r--r--   src/core/surface/init.c                    |   1
-rw-r--r--   src/core/surface/lame_client.c             |  12
-rw-r--r--   src/core/surface/secure_channel_create.c   |  10
-rw-r--r--   src/core/surface/server.c                  | 131
15 files changed, 445 insertions, 150 deletions
diff --git a/src/core/surface/byte_buffer.c b/src/core/surface/byte_buffer.c
index 12244f6644..a930949f2d 100644
--- a/src/core/surface/byte_buffer.c
+++ b/src/core/surface/byte_buffer.c
@@ -35,25 +35,45 @@
 #include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
 
-grpc_byte_buffer *grpc_byte_buffer_create(gpr_slice *slices, size_t nslices) {
+grpc_byte_buffer *grpc_raw_byte_buffer_create(gpr_slice *slices,
+                                              size_t nslices) {
+  return grpc_raw_compressed_byte_buffer_create(slices, nslices,
+                                                GRPC_COMPRESS_NONE);
+}
+
+grpc_byte_buffer *grpc_raw_compressed_byte_buffer_create(
+    gpr_slice *slices, size_t nslices, grpc_compression_algorithm compression) {
   size_t i;
   grpc_byte_buffer *bb = malloc(sizeof(grpc_byte_buffer));
-
-  bb->type = GRPC_BB_SLICE_BUFFER;
-  gpr_slice_buffer_init(&bb->data.slice_buffer);
+  bb->type = GRPC_BB_RAW;
+  bb->data.raw.compression = compression;
+  gpr_slice_buffer_init(&bb->data.raw.slice_buffer);
   for (i = 0; i < nslices; i++) {
     gpr_slice_ref(slices[i]);
-    gpr_slice_buffer_add(&bb->data.slice_buffer, slices[i]);
+    gpr_slice_buffer_add(&bb->data.raw.slice_buffer, slices[i]);
   }
+  return bb;
+}
+
+grpc_byte_buffer *grpc_raw_byte_buffer_from_reader(
+    grpc_byte_buffer_reader *reader) {
+  grpc_byte_buffer *bb = malloc(sizeof(grpc_byte_buffer));
+  gpr_slice slice;
+  bb->type = GRPC_BB_RAW;
+  bb->data.raw.compression = GRPC_COMPRESS_NONE;
+  gpr_slice_buffer_init(&bb->data.raw.slice_buffer);
+  while (grpc_byte_buffer_reader_next(reader, &slice)) {
+    gpr_slice_buffer_add(&bb->data.raw.slice_buffer, slice);
+  }
   return bb;
 }
 
 grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) {
   switch (bb->type) {
-    case GRPC_BB_SLICE_BUFFER:
-      return grpc_byte_buffer_create(bb->data.slice_buffer.slices,
-                                     bb->data.slice_buffer.count);
+    case GRPC_BB_RAW:
+      return grpc_raw_byte_buffer_create(bb->data.raw.slice_buffer.slices,
+                                         bb->data.raw.slice_buffer.count);
   }
   gpr_log(GPR_INFO, "should never get here");
   abort();
@@ -63,8 +83,8 @@ grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) {
 void grpc_byte_buffer_destroy(grpc_byte_buffer *bb) {
   if (!bb) return;
   switch (bb->type) {
-    case GRPC_BB_SLICE_BUFFER:
-      gpr_slice_buffer_destroy(&bb->data.slice_buffer);
+    case GRPC_BB_RAW:
+      gpr_slice_buffer_destroy(&bb->data.raw.slice_buffer);
       break;
   }
   free(bb);
@@ -72,8 +92,8 @@ void grpc_byte_buffer_destroy(grpc_byte_buffer *bb) {
 
 size_t grpc_byte_buffer_length(grpc_byte_buffer *bb) {
   switch (bb->type) {
-    case GRPC_BB_SLICE_BUFFER:
-      return bb->data.slice_buffer.length;
+    case GRPC_BB_RAW:
+      return bb->data.raw.slice_buffer.length;
   }
   gpr_log(GPR_ERROR, "should never reach here");
   abort();
diff --git a/src/core/surface/byte_buffer_reader.c b/src/core/surface/byte_buffer_reader.c
index fd5289bac3..283db83833 100644
--- a/src/core/surface/byte_buffer_reader.c
+++ b/src/core/surface/byte_buffer_reader.c
@@ -33,42 +33,73 @@
 
 #include <grpc/byte_buffer_reader.h>
 
+#include <grpc/compression.h>
 #include <grpc/grpc.h>
 #include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
 #include <grpc/support/slice_buffer.h>
 #include <grpc/byte_buffer.h>
 
-grpc_byte_buffer_reader *grpc_byte_buffer_reader_create(
-    grpc_byte_buffer *buffer) {
-  grpc_byte_buffer_reader *reader = malloc(sizeof(grpc_byte_buffer_reader));
-  reader->buffer = buffer;
+#include "src/core/compression/message_compress.h"
+
+static int is_compressed(grpc_byte_buffer *buffer) {
   switch (buffer->type) {
-    case GRPC_BB_SLICE_BUFFER:
+    case GRPC_BB_RAW:
+      if (buffer->data.raw.compression == GRPC_COMPRESS_NONE) {
+        return 0 /* GPR_FALSE */;
+      }
+      break;
+  }
+  return 1 /* GPR_TRUE */;
+}
+
+void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
+                                  grpc_byte_buffer *buffer) {
+  gpr_slice_buffer decompressed_slices_buffer;
+  reader->buffer_in = buffer;
+  switch (reader->buffer_in->type) {
+    case GRPC_BB_RAW:
+      gpr_slice_buffer_init(&decompressed_slices_buffer);
+      if (is_compressed(reader->buffer_in)) {
+        grpc_msg_decompress(reader->buffer_in->data.raw.compression,
+                            &reader->buffer_in->data.raw.slice_buffer,
+                            &decompressed_slices_buffer);
+        reader->buffer_out =
+            grpc_raw_byte_buffer_create(decompressed_slices_buffer.slices,
+                                        decompressed_slices_buffer.count);
+        gpr_slice_buffer_destroy(&decompressed_slices_buffer);
+      } else { /* not compressed, use the input buffer as output */
+        reader->buffer_out = reader->buffer_in;
+      }
       reader->current.index = 0;
+      break;
+  }
+}
+
+void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) {
+  switch (reader->buffer_in->type) {
+    case GRPC_BB_RAW:
+      /* keeping the same if-else structure as in the init function */
+      if (is_compressed(reader->buffer_in)) {
+        grpc_byte_buffer_destroy(reader->buffer_out);
+      }
+      break;
   }
-  return reader;
 }
 
 int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader,
                                  gpr_slice *slice) {
-  grpc_byte_buffer *buffer = reader->buffer;
-  gpr_slice_buffer *slice_buffer;
-  switch (buffer->type) {
-    case GRPC_BB_SLICE_BUFFER:
-      slice_buffer = &buffer->data.slice_buffer;
+  switch (reader->buffer_in->type) {
+    case GRPC_BB_RAW: {
+      gpr_slice_buffer *slice_buffer;
+      slice_buffer = &reader->buffer_out->data.raw.slice_buffer;
       if (reader->current.index < slice_buffer->count) {
         *slice = gpr_slice_ref(slice_buffer->slices[reader->current.index]);
         reader->current.index += 1;
         return 1;
-      } else {
-        return 0;
       }
       break;
+    }
   }
   return 0;
 }
-
-void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) {
-  free(reader);
-}
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index eea02211ae..304af259d6 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -42,6 +42,7 @@
 #include "src/core/surface/completion_queue.h"
 #include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
 #include <assert.h>
 #include <stdio.h>
 
@@ -98,6 +99,8 @@ typedef enum {
   /* Status came from 'the wire' - or somewhere below the surface
      layer */
   STATUS_FROM_WIRE,
+  /* Status came from the server sending status */
+  STATUS_FROM_SERVER_STATUS,
   STATUS_SOURCE_COUNT
 } status_source;
 
@@ -147,12 +150,19 @@ struct grpc_call {
   gpr_uint8 receiving;
   /* are we currently completing requests */
   gpr_uint8 completing;
+  /** has grpc_call_destroy been called */
+  gpr_uint8 destroy_called;
   /* pairs with completed_requests */
   gpr_uint8 num_completed_requests;
   /* are we currently reading a message? */
   gpr_uint8 reading_message;
   /* have we bound a pollset yet? */
   gpr_uint8 bound_pollset;
+  /* is an error status set */
+  gpr_uint8 error_status_set;
+  /** should the alarm be cancelled */
+  gpr_uint8 cancel_alarm;
+
   /* flags with bits corresponding to write states allowing us to determine
      what was sent */
   gpr_uint16 last_send_contains;
@@ -188,6 +198,7 @@ struct grpc_call {
      and a strong upper bound of a count of masters to be calculated. */
   gpr_uint8 request_set[GRPC_IOREQ_OP_COUNT];
   grpc_ioreq_data request_data[GRPC_IOREQ_OP_COUNT];
+  gpr_uint32 request_flags[GRPC_IOREQ_OP_COUNT];
   reqinfo_master masters[GRPC_IOREQ_OP_COUNT];
 
   /* Dynamic array of ioreq's that have completed: the count of
@@ -210,6 +221,9 @@ struct grpc_call {
   /* Received call statuses from various sources */
   received_status status[STATUS_SOURCE_COUNT];
 
+  /* Compression level for the call */
+  grpc_compression_level compression_level;
+
   /* Contexts for various subsystems (security, tracing, ...). */
   grpc_call_context_element context[GRPC_CONTEXT_COUNT];
 
@@ -231,7 +245,11 @@ struct grpc_call {
   gpr_slice_buffer incoming_message;
   gpr_uint32 incoming_message_length;
+  gpr_uint32 incoming_message_flags;
   grpc_iomgr_closure destroy_closure;
+  grpc_iomgr_closure on_done_recv;
+  grpc_iomgr_closure on_done_send;
+  grpc_iomgr_closure on_done_bind;
 };
 
 #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
 
@@ -250,6 +268,7 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata);
 static void finish_read_ops(grpc_call *call);
 static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
                                           const char *description);
+static void finished_loose_op(void *call, int success);
 
 static void lock(grpc_call *call);
 static void unlock(grpc_call *call);
 
@@ -293,16 +312,18 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
   grpc_sopb_init(&call->send_ops);
   grpc_sopb_init(&call->recv_ops);
   gpr_slice_buffer_init(&call->incoming_message);
-  /* dropped in destroy */
-  gpr_ref_init(&call->internal_refcount, 1);
+  grpc_iomgr_closure_init(&call->on_done_recv, call_on_done_recv, call);
+  grpc_iomgr_closure_init(&call->on_done_send, call_on_done_send, call);
+  grpc_iomgr_closure_init(&call->on_done_bind, finished_loose_op, call);
+  /* dropped in destroy and when READ_STATE_STREAM_CLOSED received */
+  gpr_ref_init(&call->internal_refcount, 2);
   /* server hack: start reads immediately so we can get initial metadata.
      TODO(ctiller): figure out a cleaner solution */
   if (!call->is_client) {
     memset(&initial_op, 0, sizeof(initial_op));
     initial_op.recv_ops = &call->recv_ops;
     initial_op.recv_state = &call->recv_state;
-    initial_op.on_done_recv = call_on_done_recv;
-    initial_op.recv_user_data = call;
+    initial_op.on_done_recv = &call->on_done_recv;
     initial_op.context = call->context;
     call->receiving = 1;
     GRPC_CALL_INTERNAL_REF(call, "receiving");
@@ -397,14 +418,22 @@ void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) {
 
 static void set_status_code(grpc_call *call, status_source source,
                             gpr_uint32 status) {
+  if (call->status[source].is_set) return;
+
   call->status[source].is_set = 1;
   call->status[source].code = status;
+  call->error_status_set = status != GRPC_STATUS_OK;
 
   if (status != GRPC_STATUS_OK && !grpc_bbq_empty(&call->incoming_queue)) {
     grpc_bbq_flush(&call->incoming_queue);
   }
 }
 
+static void set_decode_compression_level(grpc_call *call,
+                                         grpc_compression_level clevel) {
+  call->compression_level = clevel;
+}
+
 static void set_status_details(grpc_call *call, status_source source,
                                grpc_mdstr *status) {
   if (call->status[source].details != NULL) {
@@ -435,7 +464,8 @@ static int need_more_data(grpc_call *call) {
          (is_op_live(call, GRPC_IOREQ_RECV_CLOSE) &&
           grpc_bbq_empty(&call->incoming_queue)) ||
          (call->write_state == WRITE_STATE_INITIAL && !call->is_client) ||
-         (call->cancel_with_status != GRPC_STATUS_OK);
+         (call->cancel_with_status != GRPC_STATUS_OK) ||
+         call->destroy_called;
 }
 
 static void unlock(grpc_call *call) {
@@ -446,6 +476,7 @@ static void unlock(grpc_call *call) {
   int i;
   const gpr_uint32 MAX_RECV_PEEK_AHEAD = 65536;
   size_t buffered_bytes;
+  int cancel_alarm = 0;
 
   memset(&op, 0, sizeof(op));
 
@@ -453,11 +484,13 @@ static void unlock(grpc_call *call) {
   start_op = op.cancel_with_status != GRPC_STATUS_OK;
   call->cancel_with_status = GRPC_STATUS_OK; /* reset */
 
+  cancel_alarm = call->cancel_alarm;
+  call->cancel_alarm = 0;
+
   if (!call->receiving && need_more_data(call)) {
     op.recv_ops = &call->recv_ops;
     op.recv_state = &call->recv_state;
-    op.on_done_recv = call_on_done_recv;
-    op.recv_user_data = call;
+    op.on_done_recv = &call->on_done_recv;
     if (grpc_bbq_empty(&call->incoming_queue) && call->reading_message) {
       op.max_recv_bytes = call->incoming_message_length -
                           call->incoming_message.length + MAX_RECV_PEEK_AHEAD;
@@ -499,6 +532,10 @@ static void unlock(grpc_call *call) {
 
   gpr_mu_unlock(&call->mu);
 
+  if (cancel_alarm) {
+    grpc_alarm_cancel(&call->alarm);
+  }
+
   if (start_op) {
     execute_op(call, &op);
   }
@@ -590,10 +627,18 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
         call->write_state = WRITE_STATE_WRITE_CLOSED;
       }
       break;
+    case GRPC_IOREQ_SEND_STATUS:
+      if (call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details !=
+          NULL) {
+        grpc_mdstr_unref(
+            call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details);
+        call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details =
+            NULL;
+      }
+      break;
     case GRPC_IOREQ_RECV_CLOSE:
     case GRPC_IOREQ_SEND_INITIAL_METADATA:
     case GRPC_IOREQ_SEND_TRAILING_METADATA:
-    case GRPC_IOREQ_SEND_STATUS:
     case GRPC_IOREQ_SEND_CLOSE:
       break;
     case GRPC_IOREQ_RECV_STATUS:
@@ -676,13 +721,13 @@ static void call_on_done_send(void *pc, int success) {
 }
 
 static void finish_message(grpc_call *call) {
-  /* TODO(ctiller): this could be a lot faster if coded directly */
-  grpc_byte_buffer *byte_buffer = grpc_byte_buffer_create(
-      call->incoming_message.slices, call->incoming_message.count);
+  if (call->error_status_set == 0) {
+    /* TODO(ctiller): this could be a lot faster if coded directly */
+    grpc_byte_buffer *byte_buffer = grpc_raw_byte_buffer_create(
+        call->incoming_message.slices, call->incoming_message.count);
+    grpc_bbq_push(&call->incoming_queue, byte_buffer);
+  }
   gpr_slice_buffer_reset_and_unref(&call->incoming_message);
-
-  grpc_bbq_push(&call->incoming_queue, byte_buffer);
-
   GPR_ASSERT(call->incoming_message.count == 0);
   call->reading_message = 0;
 }
@@ -711,6 +756,7 @@ static int begin_message(grpc_call *call, grpc_begin_message msg) {
   } else if (msg.length > 0) {
     call->reading_message = 1;
     call->incoming_message_length = msg.length;
+    call->incoming_message_flags = msg.flags;
     return 1;
   } else {
     finish_message(call);
@@ -782,10 +828,8 @@ static void call_on_done_recv(void *pc, int success) {
     if (call->recv_state == GRPC_STREAM_CLOSED) {
       GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED);
       call->read_state = READ_STATE_STREAM_CLOSED;
-      if (call->have_alarm) {
-        grpc_alarm_cancel(&call->alarm);
-        call->have_alarm = 0;
-      }
+      call->cancel_alarm |= call->have_alarm;
+      GRPC_CALL_INTERNAL_UNREF(call, "closed", 0);
     }
     finish_read_ops(call);
   } else {
@@ -800,7 +844,7 @@ static void call_on_done_recv(void *pc, int success) {
   unlock(call);
 
   GRPC_CALL_INTERNAL_UNREF(call, "receiving", 0);
-  GRPC_TIMER_BEGIN(GRPC_PTAG_CALL_ON_DONE_RECV, 0);
+  GRPC_TIMER_END(GRPC_PTAG_CALL_ON_DONE_RECV, 0);
 }
 
 static int prepare_application_metadata(grpc_call *call, size_t count,
@@ -847,9 +891,9 @@ static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
   size_t i;
 
   switch (byte_buffer->type) {
-    case GRPC_BB_SLICE_BUFFER:
-      for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) {
-        gpr_slice slice = byte_buffer->data.slice_buffer.slices[i];
+    case GRPC_BB_RAW:
+      for (i = 0; i < byte_buffer->data.raw.slice_buffer.count; i++) {
+        gpr_slice slice = byte_buffer->data.raw.slice_buffer.slices[i];
         gpr_slice_ref(slice);
         grpc_sopb_add_slice(sopb, slice);
       }
@@ -859,9 +903,9 @@ static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
 
 static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
   grpc_ioreq_data data;
+  gpr_uint32 flags;
   grpc_metadata_batch mdb;
   size_t i;
-  char status_str[GPR_LTOA_MIN_BUFSIZE];
   GPR_ASSERT(op->send_ops == NULL);
 
   switch (call->write_state) {
@@ -885,8 +929,9 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
     case WRITE_STATE_STARTED:
      if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) {
        data = call->request_data[GRPC_IOREQ_SEND_MESSAGE];
+        flags = call->request_flags[GRPC_IOREQ_SEND_MESSAGE];
        grpc_sopb_add_begin_message(
-            &call->send_ops, grpc_byte_buffer_length(data.send_message), 0);
+            &call->send_ops, grpc_byte_buffer_length(data.send_message), flags);
        copy_byte_buffer_to_stream_ops(data.send_message, &call->send_ops);
        op->send_ops = &call->send_ops;
        call->last_send_contains |= 1 << GRPC_IOREQ_SEND_MESSAGE;
@@ -905,13 +950,10 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
          /* send status */
          /* TODO(ctiller): cache common status values */
          data = call->request_data[GRPC_IOREQ_SEND_STATUS];
-          gpr_ltoa(data.send_status.code, status_str);
          grpc_metadata_batch_add_tail(
              &mdb, &call->status_link,
-              grpc_mdelem_from_metadata_strings(
-                  call->metadata_context,
-                  grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
-                  grpc_mdstr_from_string(call->metadata_context, status_str)));
+              grpc_channel_get_reffed_status_elem(call->channel,
+                                                  data.send_status.code));
          if (data.send_status.details) {
            grpc_metadata_batch_add_tail(
                &mdb, &call->details_link,
@@ -919,8 +961,9 @@
                grpc_mdelem_from_metadata_strings(
                    call->metadata_context,
                    grpc_mdstr_ref(
                        grpc_channel_get_message_string(call->channel)),
-                    grpc_mdstr_from_string(call->metadata_context,
-                                           data.send_status.details)));
+                    data.send_status.details));
+            call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details =
+                NULL;
          }
          grpc_sopb_add_metadata(&call->send_ops, mdb);
        }
@@ -930,8 +973,7 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
      break;
  }
  if (op->send_ops) {
-    op->on_done_send = call_on_done_send;
-    op->send_user_data = call;
+    op->on_done_send = &call->on_done_send;
  }
  return op->send_ops != NULL;
 }
 
@@ -965,7 +1007,7 @@ static void finish_read_ops(grpc_call *call) {
 
   switch (call->read_state) {
     case READ_STATE_STREAM_CLOSED:
-      if (empty) {
+      if (empty && !call->have_alarm) {
         finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, 1);
       }
     /* fallthrough */
@@ -1020,9 +1062,18 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
                           GRPC_CALL_ERROR_INVALID_METADATA);
       }
     }
+    if (op == GRPC_IOREQ_SEND_STATUS) {
+      set_status_code(call, STATUS_FROM_SERVER_STATUS,
+                      reqs[i].data.send_status.code);
+      if (reqs[i].data.send_status.details) {
+        set_status_details(call, STATUS_FROM_SERVER_STATUS,
+                           grpc_mdstr_ref(reqs[i].data.send_status.details));
+      }
+    }
     have_ops |= 1u << op;
 
     call->request_data[op] = data;
+    call->request_flags[op] = reqs[i].flags;
     call->request_set[op] = set;
   }
 
@@ -1052,10 +1103,9 @@ grpc_call_error grpc_call_start_ioreq_and_call_back(
 void grpc_call_destroy(grpc_call *c) {
   int cancel;
   lock(c);
-  if (c->have_alarm) {
-    grpc_alarm_cancel(&c->alarm);
-    c->have_alarm = 0;
-  }
+  GPR_ASSERT(!c->destroy_called);
+  c->destroy_called = 1;
+  c->cancel_alarm |= c->have_alarm;
   cancel = c->read_state != READ_STATE_STREAM_CLOSED;
   unlock(c);
   if (cancel) grpc_call_cancel(c);
@@ -1096,14 +1146,31 @@ static void finished_loose_op(void *call, int success_ignored) {
   GRPC_CALL_INTERNAL_UNREF(call, "loose-op", 0);
 }
 
+typedef struct {
+  grpc_call *call;
+  grpc_iomgr_closure closure;
+} finished_loose_op_allocated_args;
+
+static void finished_loose_op_allocated(void *alloc, int success) {
+  finished_loose_op_allocated_args *args = alloc;
+  finished_loose_op(args->call, success);
+  gpr_free(args);
+}
+
 static void execute_op(grpc_call *call, grpc_transport_op *op) {
   grpc_call_element *elem;
 
   GPR_ASSERT(op->on_consumed == NULL);
   if (op->cancel_with_status != GRPC_STATUS_OK || op->bind_pollset) {
     GRPC_CALL_INTERNAL_REF(call, "loose-op");
-    op->on_consumed = finished_loose_op;
-    op->on_consumed_user_data = call;
+    if (op->bind_pollset) {
+      op->on_consumed = &call->on_done_bind;
+    } else {
+      finished_loose_op_allocated_args *args = gpr_malloc(sizeof(*args));
+      args->call = call;
+      grpc_iomgr_closure_init(&args->closure, finished_loose_op_allocated, args);
+      op->on_consumed = &args->closure;
+    }
   }
 
   elem = CALL_ELEM_FROM_CALL(call, 0);
@@ -1117,12 +1184,14 @@ grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
 
 static void call_alarm(void *arg, int success) {
   grpc_call *call = arg;
+  lock(call);
+  call->have_alarm = 0;
   if (success) {
-    lock(call);
     cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED,
                        "Deadline Exceeded");
-    unlock(call);
   }
+  finish_read_ops(call);
+  unlock(call);
   GRPC_CALL_INTERNAL_UNREF(call, "alarm", 1);
 }
 
@@ -1160,6 +1229,33 @@ static gpr_uint32 decode_status(grpc_mdelem *md) {
   return status;
 }
 
+/* just as for status above, we need to offset: metadata userdata can't hold a
+ * zero (null), which in this case is used to signal no compression */
+#define COMPRESS_OFFSET 1
+static void destroy_compression(void *ignored) {}
+
+static gpr_uint32 decode_compression(grpc_mdelem *md) {
+  grpc_compression_level clevel;
+  void *user_data = grpc_mdelem_get_user_data(md, destroy_status);
+  if (user_data) {
+    clevel = ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET;
+  } else {
+    gpr_uint32 parsed_clevel_bytes;
+    if (gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
+                                  GPR_SLICE_LENGTH(md->value->slice),
+                                  &parsed_clevel_bytes)) {
+      /* the following cast is safe, as a gpr_uint32 should be able to hold all
+       * possible values of the grpc_compression_level enum */
+      clevel = (grpc_compression_level)parsed_clevel_bytes;
+    } else {
+      clevel = GRPC_COMPRESS_LEVEL_NONE; /* could not parse, no compression */
+    }
+    grpc_mdelem_set_user_data(md, destroy_compression,
+                              (void *)(gpr_intptr)(clevel + COMPRESS_OFFSET));
+  }
+  return clevel;
+}
+
 static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
   grpc_linked_mdelem *l;
   grpc_metadata_array *dest;
@@ -1175,6 +1271,8 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
       set_status_code(call, STATUS_FROM_WIRE, decode_status(md));
     } else if (key == grpc_channel_get_message_string(call->channel)) {
       set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
+    } else if (key == grpc_channel_get_compresssion_level_string(call->channel)) {
+      set_decode_compression_level(call, decode_compression(md));
     } else {
       dest = &call->buffered_metadata[is_trailing];
       if (dest->count == dest->capacity) {
@@ -1239,6 +1337,14 @@ static void finish_batch_with_close(grpc_call *call, int success, void *tag) {
   grpc_cq_end_op(call->cq, tag, call, 1);
 }
 
+static int are_write_flags_valid(gpr_uint32 flags) {
+  /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
+  const gpr_uint32 allowed_write_positions =
+      (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK);
+  const gpr_uint32 invalid_positions = ~allowed_write_positions;
+  return !(flags & invalid_positions);
+}
+
 grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
                                       size_t nops, void *tag) {
   grpc_ioreq reqs[GRPC_IOREQ_OP_COUNT];
@@ -1261,30 +1367,43 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
     op = &ops[in];
     switch (op->op) {
       case GRPC_OP_SEND_INITIAL_METADATA:
+        /* Flag validation: currently allow no flags */
+        if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
        req = &reqs[out++];
        req->op = GRPC_IOREQ_SEND_INITIAL_METADATA;
        req->data.send_metadata.count = op->data.send_initial_metadata.count;
        req->data.send_metadata.metadata =
            op->data.send_initial_metadata.metadata;
+        req->flags = op->flags;
        break;
      case GRPC_OP_SEND_MESSAGE:
+        if (!are_write_flags_valid(op->flags)) {
+          return GRPC_CALL_ERROR_INVALID_FLAGS;
+        }
        req = &reqs[out++];
        req->op = GRPC_IOREQ_SEND_MESSAGE;
        req->data.send_message = op->data.send_message;
+        req->flags = ops->flags;
        break;
      case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
+        /* Flag validation: currently allow no flags */
+        if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
        if (!call->is_client) {
          return GRPC_CALL_ERROR_NOT_ON_SERVER;
        }
        req = &reqs[out++];
        req->op = GRPC_IOREQ_SEND_CLOSE;
+        req->flags = op->flags;
        break;
      case GRPC_OP_SEND_STATUS_FROM_SERVER:
+        /* Flag validation: currently allow no flags */
+        if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
        if (call->is_client) {
          return GRPC_CALL_ERROR_NOT_ON_CLIENT;
        }
        req = &reqs[out++];
        req->op = GRPC_IOREQ_SEND_TRAILING_METADATA;
+        req->flags = op->flags;
        req->data.send_metadata.count =
            op->data.send_status_from_server.trailing_metadata_count;
        req->data.send_metadata.metadata =
@@ -1293,29 +1412,42 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
        req->op = GRPC_IOREQ_SEND_STATUS;
        req->data.send_status.code = op->data.send_status_from_server.status;
        req->data.send_status.details =
-            op->data.send_status_from_server.status_details;
+            op->data.send_status_from_server.status_details != NULL
+                ? grpc_mdstr_from_string(
+                      call->metadata_context,
+                      op->data.send_status_from_server.status_details)
+                : NULL;
        req = &reqs[out++];
        req->op = GRPC_IOREQ_SEND_CLOSE;
        break;
      case GRPC_OP_RECV_INITIAL_METADATA:
+        /* Flag validation: currently allow no flags */
+        if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
        if (!call->is_client) {
          return GRPC_CALL_ERROR_NOT_ON_SERVER;
        }
        req = &reqs[out++];
        req->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
        req->data.recv_metadata = op->data.recv_initial_metadata;
+        req->flags = op->flags;
        break;
      case GRPC_OP_RECV_MESSAGE:
+        /* Flag validation: currently allow no flags */
+        if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
        req = &reqs[out++];
        req->op = GRPC_IOREQ_RECV_MESSAGE;
        req->data.recv_message = op->data.recv_message;
+        req->flags = op->flags;
        break;
      case GRPC_OP_RECV_STATUS_ON_CLIENT:
+        /* Flag validation: currently allow no flags */
+        if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
        if (!call->is_client) {
          return GRPC_CALL_ERROR_NOT_ON_SERVER;
        }
        req = &reqs[out++];
        req->op = GRPC_IOREQ_RECV_STATUS;
+        req->flags = op->flags;
        req->data.recv_status.set_value = set_status_value_directly;
        req->data.recv_status.user_data = op->data.recv_status_on_client.status;
        req = &reqs[out++];
@@ -1333,8 +1465,11 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
        finish_func = finish_batch_with_close;
        break;
      case GRPC_OP_RECV_CLOSE_ON_SERVER:
+        /* Flag validation: currently allow no flags */
+        if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
        req = &reqs[out++];
        req->op = GRPC_IOREQ_RECV_STATUS;
+        req->flags = op->flags;
        req->data.recv_status.set_value = set_cancelled_value;
        req->data.recv_status.user_data =
            op->data.recv_close_on_server.cancelled;
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index 17db8c2cdc..fb3662b50d 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -72,13 +72,14 @@ typedef union {
   grpc_byte_buffer *send_message;
   struct {
     grpc_status_code code;
-    const char *details;
+    grpc_mdstr *details;
   } send_status;
 } grpc_ioreq_data;
 
 typedef struct {
   grpc_ioreq_op op;
   grpc_ioreq_data data;
+  gpr_uint32 flags; /**< A copy of the write flags from grpc_op */
 } grpc_ioreq;
 
 typedef void (*grpc_ioreq_completion_func)(grpc_call *call, int success,
diff --git a/src/core/surface/call_log_batch.c b/src/core/surface/call_log_batch.c
index 9905401bee..55663298c9 100644
--- a/src/core/surface/call_log_batch.c
+++ b/src/core/surface/call_log_batch.c
@@ -35,6 +35,7 @@
 
 #include "src/core/support/string.h"
 #include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
 
 int grpc_trace_batch = 0;
 
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 70485d71da..a3c4dcebc1 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -37,12 +37,20 @@
 #include <string.h>
 
 #include "src/core/iomgr/iomgr.h"
+#include "src/core/support/string.h"
 #include "src/core/surface/call.h"
 #include "src/core/surface/client.h"
#include "src/core/surface/init.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> +/** Cache grpc-status: X mdelems for X = 0..NUM_CACHED_STATUS_ELEMS. + * Avoids needing to take a metadata context lock for sending status + * if the status code is <= NUM_CACHED_STATUS_ELEMS. + * Sized to allow the most commonly used codes to fit in + * (OK, Cancelled, Unknown). */ +#define NUM_CACHED_STATUS_ELEMS 3 + typedef struct registered_call { grpc_mdelem *path; grpc_mdelem *authority; @@ -54,10 +62,14 @@ struct grpc_channel { gpr_refcount refs; gpr_uint32 max_message_length; grpc_mdctx *metadata_context; + /** mdstr for the grpc-status key */ grpc_mdstr *grpc_status_string; + grpc_mdstr *grpc_compression_level_string; grpc_mdstr *grpc_message_string; grpc_mdstr *path_string; grpc_mdstr *authority_string; + /** mdelem for grpc-status: 0 thru grpc-status: 2 */ + grpc_mdelem *grpc_status_elem[NUM_CACHED_STATUS_ELEMS]; gpr_mu registered_call_mu; registered_call *registered_calls; @@ -87,7 +99,16 @@ grpc_channel *grpc_channel_create_from_filters( gpr_ref_init(&channel->refs, 1 + is_client); channel->metadata_context = mdctx; channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status"); + channel->grpc_compression_level_string = + grpc_mdstr_from_string(mdctx, "grpc-compression-level"); channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message"); + for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) { + char buf[GPR_LTOA_MIN_BUFSIZE]; + gpr_ltoa(i, buf); + channel->grpc_status_elem[i] = grpc_mdelem_from_metadata_strings( + mdctx, grpc_mdstr_ref(channel->grpc_status_string), + grpc_mdstr_from_string(mdctx, buf)); + } channel->path_string = grpc_mdstr_from_string(mdctx, ":path"); channel->authority_string = grpc_mdstr_from_string(mdctx, ":authority"); grpc_channel_stack_init(filters, num_filters, args, channel->metadata_context, @@ -181,8 +202,13 @@ void grpc_channel_internal_ref(grpc_channel *c) { static void destroy_channel(void *p, int ok) { grpc_channel *channel = p; + size_t i; grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel)); + for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) { + grpc_mdelem_unref(channel->grpc_status_elem[i]); + } grpc_mdstr_unref(channel->grpc_status_string); + grpc_mdstr_unref(channel->grpc_compression_level_string); grpc_mdstr_unref(channel->grpc_message_string); grpc_mdstr_unref(channel->path_string); grpc_mdstr_unref(channel->authority_string); @@ -200,7 +226,7 @@ static void destroy_channel(void *p, int ok) { #ifdef GRPC_CHANNEL_REF_COUNT_DEBUG void grpc_channel_internal_unref(grpc_channel *channel, const char *reason) { - gpr_log(GPR_DEBUG, "CHANNEL: unref %p %d -> %d [%s]", channel, + gpr_log(GPR_DEBUG, "CHANNEL: unref %p %d -> %d [%s]", channel, channel->refs.count, channel->refs.count - 1, reason); #else void grpc_channel_internal_unref(grpc_channel *channel) { @@ -247,6 +273,23 @@ grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) { return channel->grpc_status_string; } +grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel) { + return channel->grpc_compression_level_string; +} + + +grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) { + if (i >= 0 && i < NUM_CACHED_STATUS_ELEMS) { + return grpc_mdelem_ref(channel->grpc_status_elem[i]); + } else { + char tmp[GPR_LTOA_MIN_BUFSIZE]; + gpr_ltoa(i, tmp); + return grpc_mdelem_from_metadata_strings( + channel->metadata_context, grpc_mdstr_ref(channel->grpc_status_string), + 
+        grpc_mdstr_from_string(channel->metadata_context, tmp));
+  }
+}
+
 grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel) {
   return channel->grpc_message_string;
 }
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index d1f62f2598..3c04676b43 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -40,9 +40,20 @@ grpc_channel *grpc_channel_create_from_filters(
     const grpc_channel_filter **filters, size_t count,
     const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client);
 
+/** Get a (borrowed) pointer to this channels underlying channel stack */
 grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel);
+
+/** Get a (borrowed) pointer to the channel wide metadata context */
 grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel);
+
+/** Get a grpc_mdelem of grpc-status: X where X is the numeric value of
+    status_code.
+
+    The returned elem is owned by the caller. */
+grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel,
+                                                 int status_code);
+
 grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel);
+grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel);
 grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
 
 gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index a49b754d9a..d069a04a9a 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -53,6 +53,7 @@
 #include "src/core/transport/chttp2_transport.h"
 #include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
 #include <grpc/support/sync.h>
 #include <grpc/support/useful.h>
 
@@ -137,7 +138,8 @@ static void on_resolved(void *rp, grpc_resolved_addresses *resolved) {
   request *r = rp;
 
   /* if we're not still the active request, abort */
-  if (!grpc_client_setup_request_should_continue(r->cs_request, "on_resolved")) {
+  if (!grpc_client_setup_request_should_continue(r->cs_request,
+                                                 "on_resolved")) {
     if (resolved) {
       grpc_resolved_addresses_destroy(resolved);
     }
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 6808c976e1..030a8b4e6f 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -73,6 +73,7 @@ struct grpc_completion_queue {
   event *queue;
   /* Fixed size chained hash table of events for pluck() */
   event *buckets[NUM_TAG_BUCKETS];
+  int is_server_cq;
 };
 
 grpc_completion_queue *grpc_completion_queue_create(void) {
@@ -86,21 +87,12 @@ grpc_completion_queue *grpc_completion_queue_create(void) {
   return cc;
 }
 
-
-
-
-
-
-
-
-
-
-
-
-
 #ifdef GRPC_CQ_REF_COUNT_DEBUG
-void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason) {
-  gpr_log(GPR_DEBUG, "CQ:%p ref %d -> %d %s", cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1, reason);
+void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
+                          const char *file, int line) {
+  gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p ref %d -> %d %s",
+          cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1,
+          reason);
 #else
 void grpc_cq_internal_ref(grpc_completion_queue *cc) {
 #endif
@@ -113,8 +105,11 @@ static void on_pollset_destroy_done(void *arg) {
 }
 
 #ifdef GRPC_CQ_REF_COUNT_DEBUG
-void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason) {
-  gpr_log(GPR_DEBUG, "CQ:%p unref %d -> %d %s", cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1, reason);
+void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason,
+                            const char *file, int line) {
+  gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s",
+          cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1,
+          reason);
 #else
 void grpc_cq_internal_unref(grpc_completion_queue *cc) {
 #endif
@@ -333,3 +328,7 @@ void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc) {
                     gpr_time_add(gpr_now(), gpr_time_from_millis(100)));
   gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
 }
+
+void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; }
+
+int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; }
diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h
index 2249d0e789..1b9010f462 100644
--- a/src/core/surface/completion_queue.h
+++ b/src/core/surface/completion_queue.h
@@ -40,10 +40,14 @@
 #include <grpc/grpc.h>
 
 #ifdef GRPC_CQ_REF_COUNT_DEBUG
-void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason);
-void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason);
-#define GRPC_CQ_INTERNAL_REF(cc, reason) grpc_cq_internal_ref(cc, reason)
-#define GRPC_CQ_INTERNAL_UNREF(cc, reason) grpc_cq_internal_unref(cc, reason)
+void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
+                          const char *file, int line);
+void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason,
+                            const char *file, int line);
+#define GRPC_CQ_INTERNAL_REF(cc, reason) \
+  grpc_cq_internal_ref(cc, reason, __FILE__, __LINE__)
+#define GRPC_CQ_INTERNAL_UNREF(cc, reason) \
+  grpc_cq_internal_unref(cc, reason, __FILE__, __LINE__)
 #else
 void grpc_cq_internal_ref(grpc_completion_queue *cc);
 void grpc_cq_internal_unref(grpc_completion_queue *cc);
@@ -63,4 +67,7 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
 
 void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc);
 
+void grpc_cq_mark_server_cq(grpc_completion_queue *cc);
+int grpc_cq_is_server_cq(grpc_completion_queue *cc);
+
 #endif /* GRPC_INTERNAL_CORE_SURFACE_COMPLETION_QUEUE_H */
diff --git a/src/core/surface/event_string.c b/src/core/surface/event_string.c
index 448bb1162b..33cd4a43aa 100644
--- a/src/core/surface/event_string.c
+++ b/src/core/surface/event_string.c
@@ -37,6 +37,7 @@
 
 #include "src/core/support/string.h"
 #include <grpc/byte_buffer.h>
+#include <grpc/support/string_util.h>
 
 static void addhdr(gpr_strvec *buf, grpc_event *ev) {
   char *tmp;
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
index ac6871c6f2..ca61a38a35 100644
--- a/src/core/surface/init.c
+++ b/src/core/surface/init.c
@@ -78,6 +78,7 @@ void grpc_shutdown(void) {
     grpc_iomgr_shutdown();
     census_shutdown();
     grpc_timers_global_destroy();
+    grpc_tracer_shutdown();
   }
   gpr_mu_unlock(&g_init_mu);
 }
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index 6c07b01544..85e1ab5554 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -56,7 +56,7 @@ static void lame_start_transport_op(grpc_call_element *elem,
   GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
   if (op->send_ops) {
     grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
-    op->on_done_send(op->send_user_data, 0);
+    op->on_done_send->cb(op->on_done_send->cb_arg, 0);
   }
   if (op->recv_ops) {
     char tmp[GPR_LTOA_MIN_BUFSIZE];
@@ -75,10 +75,10 @@ static void lame_start_transport_op(grpc_call_element *elem,
     mdb.deadline = gpr_inf_future;
     grpc_sopb_add_metadata(op->recv_ops, mdb);
     *op->recv_state = GRPC_STREAM_CLOSED;
-    op->on_done_recv(op->recv_user_data, 1);
+    op->on_done_recv->cb(op->on_done_recv->cb_arg, 1);
   }
   if (op->on_consumed) {
-    op->on_consumed(op->on_consumed_user_data, 0);
+    op->on_consumed->cb(op->on_consumed->cb_arg, 0);
   }
 }
 
@@ -118,9 +118,9 @@ static void init_channel_elem(grpc_channel_element *elem,
 static void destroy_channel_elem(grpc_channel_element *elem) {}
 
 static const grpc_channel_filter lame_filter = {
-    lame_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
-    destroy_call_elem, sizeof(channel_data), init_channel_elem,
-    destroy_channel_elem, "lame-client",
+    lame_start_transport_op, channel_op, sizeof(call_data),
+    init_call_elem, destroy_call_elem, sizeof(channel_data),
+    init_channel_elem, destroy_channel_elem, "lame-client",
 };
 
 grpc_channel *grpc_lame_client_channel_create(void) {
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index 3875eb0614..fae3e4e90a 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -56,6 +56,7 @@
 #include <grpc/grpc_security.h>
 #include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
 #include <grpc/support/sync.h>
 #include <grpc/support/useful.h>
 #include "src/core/tsi/transport_security_interface.h"
@@ -96,7 +97,8 @@ static void on_secure_transport_setup_done(void *rp,
   if (status != GRPC_SECURITY_OK) {
     gpr_log(GPR_ERROR, "Secure transport setup failed with error %d.", status);
     done(r, 0);
-  } else if (grpc_client_setup_cb_begin(r->cs_request, "on_secure_transport_setup_done")) {
+  } else if (grpc_client_setup_cb_begin(r->cs_request,
+                                        "on_secure_transport_setup_done")) {
     grpc_create_chttp2_transport(
         r->setup->setup_callback, r->setup->setup_user_data,
         grpc_client_setup_get_channel_args(r->cs_request), secure_endpoint,
@@ -112,7 +114,8 @@ static void on_secure_transport_setup_done(void *rp,
 static void on_connect(void *rp, grpc_endpoint *tcp) {
   request *r = rp;
 
-  if (!grpc_client_setup_request_should_continue(r->cs_request, "on_connect.secure")) {
+  if (!grpc_client_setup_request_should_continue(r->cs_request,
+                                                 "on_connect.secure")) {
     if (tcp) {
       grpc_endpoint_shutdown(tcp);
       grpc_endpoint_destroy(tcp);
@@ -152,7 +155,8 @@ static void on_resolved(void *rp, grpc_resolved_addresses *resolved) {
   request *r = rp;
 
   /* if we're not still the active request, abort */
-  if (!grpc_client_setup_request_should_continue(r->cs_request, "on_resolved.secure")) {
+  if (!grpc_client_setup_request_should_continue(r->cs_request,
+                                                 "on_resolved.secure")) {
     if (resolved) {
       grpc_resolved_addresses_destroy(resolved);
     }
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 4cf9213e66..13ec5bee94 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -48,6 +48,7 @@
 #include "src/core/transport/metadata.h"
 #include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
 #include <grpc/support/useful.h>
 
 typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list;
 
@@ -140,7 +141,15 @@ struct grpc_server {
   grpc_pollset **pollsets;
   size_t cq_count;
 
-  gpr_mu mu;
+  /* The two following mutexes control access to server-state
+     mu_global controls access to non-call-related state (e.g., channel state)
+     mu_call controls access to call-related state (e.g., the call lists)
+
+     If they are ever required to be nested, you must lock mu_global
+     before mu_call. This is currently used in shutdown processing
+     (grpc_server_shutdown_and_notify and maybe_finish_shutdown) */
+  gpr_mu mu_global; /* mutex for server and channel state */
+  gpr_mu mu_call;   /* mutex for call-specific state */
 
   registered_method *registered_methods;
   requested_call_array requested_calls;
@@ -182,9 +191,9 @@ struct call_data {
 
   grpc_stream_op_buffer *recv_ops;
   grpc_stream_state *recv_state;
-  void (*on_done_recv)(void *user_data, int success);
-  void *recv_user_data;
+  grpc_iomgr_closure *on_done_recv;
 
+  grpc_iomgr_closure server_on_recv;
   grpc_iomgr_closure kill_zombie_closure;
 
   call_data **root[CALL_LIST_COUNT];
@@ -199,6 +208,8 @@ static void begin_call(grpc_server *server, call_data *calld,
 static void fail_call(grpc_server *server, requested_call *rc);
 static void shutdown_channel(channel_data *chand, int send_goaway,
                              int send_disconnect);
+/* Before calling maybe_finish_shutdown, we must hold mu_global and not
+   hold mu_call */
 static void maybe_finish_shutdown(grpc_server *server);
 
 static int call_list_join(call_data **root, call_data *call, call_list list) {
@@ -272,7 +283,8 @@ static void server_delete(grpc_server *server) {
   registered_method *rm;
   size_t i;
   grpc_channel_args_destroy(server->channel_args);
-  gpr_mu_destroy(&server->mu);
+  gpr_mu_destroy(&server->mu_global);
+  gpr_mu_destroy(&server->mu_call);
   gpr_free(server->channel_filters);
   requested_call_array_destroy(&server->requested_calls);
   while ((rm = server->registered_methods) != NULL) {
@@ -334,11 +346,11 @@ static void finish_start_new_rpc_and_unlock(grpc_server *server,
   if (array->count == 0) {
     calld->state = PENDING;
     call_list_join(pending_root, calld, PENDING_START);
-    gpr_mu_unlock(&server->mu);
+    gpr_mu_unlock(&server->mu_call);
   } else {
     rc = array->calls[--array->count];
     calld->state = ACTIVATED;
-    gpr_mu_unlock(&server->mu);
+    gpr_mu_unlock(&server->mu_call);
     begin_call(server, calld, &rc);
   }
 }
@@ -351,7 +363,7 @@ static void start_new_rpc(grpc_call_element *elem) {
   gpr_uint32 hash;
   channel_registered_method *rm;
 
-  gpr_mu_lock(&server->mu);
+  gpr_mu_lock(&server->mu_call);
   if (chand->registered_methods && calld->path && calld->host) {
     /* TODO(ctiller): unify these two searches */
     /* check for an exact match with host */
@@ -403,11 +415,16 @@ static void maybe_finish_shutdown(grpc_server *server) {
   if (!server->shutdown || server->shutdown_published) {
     return;
   }
+
+  gpr_mu_lock(&server->mu_call);
   if (server->lists[ALL_CALLS] != NULL) {
     gpr_log(GPR_DEBUG,
             "Waiting for all calls to finish before destroying server");
+    gpr_mu_unlock(&server->mu_call);
     return;
   }
+  gpr_mu_unlock(&server->mu_call);
+
   if (server->root_channel_data.next != &server->root_channel_data) {
     gpr_log(GPR_DEBUG,
             "Waiting for all channels to close before destroying server");
@@ -451,6 +468,7 @@ static void server_on_recv(void *ptr, int success) {
   grpc_call_element *elem = ptr;
   call_data *calld = elem->call_data;
   channel_data *chand = elem->channel_data;
+  int remove_res;
 
   if (success && !calld->got_initial_metadata) {
     size_t i;
@@ -475,16 +493,16 @@ static void server_on_recv(void *ptr, int success) {
     case GRPC_STREAM_SEND_CLOSED:
       break;
     case GRPC_STREAM_RECV_CLOSED:
-      gpr_mu_lock(&chand->server->mu);
+      gpr_mu_lock(&chand->server->mu_call);
       if (calld->state == NOT_STARTED) {
         calld->state = ZOMBIED;
         grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
         grpc_iomgr_add_callback(&calld->kill_zombie_closure);
       }
-      gpr_mu_unlock(&chand->server->mu);
+      gpr_mu_unlock(&chand->server->mu_call);
       break;
     case GRPC_STREAM_CLOSED:
-      gpr_mu_lock(&chand->server->mu);
+      gpr_mu_lock(&chand->server->mu_call);
       if (calld->state == NOT_STARTED) {
         calld->state = ZOMBIED;
         grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
@@ -494,16 +512,18 @@ static void server_on_recv(void *ptr, int success) {
         calld->state = ZOMBIED;
         grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
         grpc_iomgr_add_callback(&calld->kill_zombie_closure);
       }
-      if (call_list_remove(calld, ALL_CALLS)) {
+      remove_res = call_list_remove(calld, ALL_CALLS);
+      gpr_mu_unlock(&chand->server->mu_call);
+      gpr_mu_lock(&chand->server->mu_global);
+      if (remove_res) {
         decrement_call_count(chand);
       }
-      gpr_mu_unlock(&chand->server->mu);
+      gpr_mu_unlock(&chand->server->mu_global);
       break;
   }
 
-  calld->on_done_recv(calld->recv_user_data, success);
+  calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success);
 }
 
 static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
@@ -514,9 +534,7 @@ static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
     calld->recv_ops = op->recv_ops;
     calld->recv_state = op->recv_state;
     calld->on_done_recv = op->on_done_recv;
-    calld->recv_user_data = op->recv_user_data;
-    op->on_done_recv = server_on_recv;
-    op->recv_user_data = elem;
+    op->on_done_recv = &calld->server_on_recv;
   }
 }
 
@@ -542,10 +560,10 @@ static void channel_op(grpc_channel_element *elem,
     case GRPC_TRANSPORT_CLOSED:
       /* if the transport is closed for a server channel, we destroy the
          channel */
-      gpr_mu_lock(&server->mu);
+      gpr_mu_lock(&server->mu_global);
      server_ref(server);
      destroy_channel(chand);
-      gpr_mu_unlock(&server->mu);
+      gpr_mu_unlock(&server->mu_global);
      server_unref(server);
      break;
    case GRPC_TRANSPORT_GOAWAY:
@@ -590,7 +608,8 @@ static void finish_shutdown_channel(void *p, int success) {
   gpr_free(sca);
 }
 
-static void shutdown_channel(channel_data *chand, int send_goaway, int send_disconnect) {
+static void shutdown_channel(channel_data *chand, int send_goaway,
+                             int send_disconnect) {
   shutdown_channel_args *sca;
   GRPC_CHANNEL_INTERNAL_REF(chand->channel, "shutdown");
   sca = gpr_malloc(sizeof(shutdown_channel_args));
@@ -611,10 +630,15 @@ static void init_call_elem(grpc_call_element *elem,
   calld->deadline = gpr_inf_future;
   calld->call = grpc_call_from_top_element(elem);
 
-  gpr_mu_lock(&chand->server->mu);
+  grpc_iomgr_closure_init(&calld->server_on_recv, server_on_recv, elem);
+
+  gpr_mu_lock(&chand->server->mu_call);
   call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS);
+  gpr_mu_unlock(&chand->server->mu_call);
+
+  gpr_mu_lock(&chand->server->mu_global);
   chand->num_calls++;
-  gpr_mu_unlock(&chand->server->mu);
+  gpr_mu_unlock(&chand->server->mu_global);
 
   server_ref(chand->server);
 
@@ -627,14 +651,16 @@ static void destroy_call_elem(grpc_call_element *elem) {
   int removed[CALL_LIST_COUNT];
   size_t i;
 
-  gpr_mu_lock(&chand->server->mu);
+  gpr_mu_lock(&chand->server->mu_call);
   for (i = 0; i < CALL_LIST_COUNT; i++) {
     removed[i] = call_list_remove(elem->call_data, i);
   }
+  gpr_mu_unlock(&chand->server->mu_call);
   if (removed[ALL_CALLS]) {
+    gpr_mu_lock(&chand->server->mu_global);
     decrement_call_count(chand);
+    gpr_mu_unlock(&chand->server->mu_global);
   }
-  gpr_mu_unlock(&chand->server->mu);
 
   if (calld->host) {
     grpc_mdstr_unref(calld->host);
@@ -677,12 +703,12 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
     gpr_free(chand->registered_methods);
   }
   if (chand->server) {
-    gpr_mu_lock(&chand->server->mu);
+    gpr_mu_lock(&chand->server->mu_global);
     chand->next->prev = chand->prev;
     chand->prev->next = chand->next;
     chand->next = chand->prev = chand;
     maybe_finish_shutdown(chand->server);
-    gpr_mu_unlock(&chand->server->mu);
+    gpr_mu_unlock(&chand->server->mu_global);
     grpc_mdstr_unref(chand->path_key);
     grpc_mdstr_unref(chand->authority_key);
     server_unref(chand->server);
@@ -708,6 +734,7 @@ void grpc_server_register_completion_queue(grpc_server *server,
     if (server->cqs[i] == cq) return;
   }
   GRPC_CQ_INTERNAL_REF(cq, "server");
+  grpc_cq_mark_server_cq(cq);
   n = server->cq_count++;
   server->cqs = gpr_realloc(server->cqs,
                             server->cq_count * sizeof(grpc_completion_queue *));
@@ -728,7 +755,8 @@ grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters,
 
   memset(server, 0, sizeof(grpc_server));
 
-  gpr_mu_init(&server->mu);
+  gpr_mu_init(&server->mu_global);
+  gpr_mu_init(&server->mu_call);
 
   /* decremented by grpc_server_destroy */
   gpr_ref_init(&server->internal_refcount, 1);
@@ -878,11 +906,11 @@ grpc_transport_setup_result grpc_server_setup_transport(
   result = grpc_connected_channel_bind_transport(
       grpc_channel_get_channel_stack(channel), transport);
 
-  gpr_mu_lock(&s->mu);
+  gpr_mu_lock(&s->mu_global);
   chand->next = &s->root_channel_data;
   chand->prev = chand->next->prev;
   chand->next->prev = chand->prev->next = chand;
-  gpr_mu_unlock(&s->mu);
+  gpr_mu_unlock(&s->mu_global);
 
   gpr_free(filters);
 
@@ -899,7 +927,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
   shutdown_tag *sdt;
 
   /* lock, and gather up some stuff to do */
-  gpr_mu_lock(&server->mu);
+  gpr_mu_lock(&server->mu_global);
   grpc_cq_begin_op(cq, NULL);
   server->shutdown_tags =
       gpr_realloc(server->shutdown_tags,
@@ -908,7 +936,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
   sdt->tag = tag;
   sdt->cq = cq;
   if (server->shutdown) {
-    gpr_mu_unlock(&server->mu);
+    gpr_mu_unlock(&server->mu_global);
     return;
   }
 
@@ -918,6 +946,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
   }
 
   /* collect all unregistered then registered calls */
+  gpr_mu_lock(&server->mu_call);
   requested_calls = server->requested_calls;
   memset(&server->requested_calls, 0, sizeof(server->requested_calls));
   for (rm = server->registered_methods; rm; rm = rm->next) {
@@ -936,10 +965,11 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
     gpr_free(rm->requested.calls);
     memset(&rm->requested, 0, sizeof(rm->requested));
   }
+  gpr_mu_unlock(&server->mu_call);
 
   server->shutdown = 1;
   maybe_finish_shutdown(server);
-  gpr_mu_unlock(&server->mu);
+  gpr_mu_unlock(&server->mu_global);
 
   /* terminate all the requested calls */
   for (i = 0; i < requested_calls.count; i++) {
@@ -955,10 +985,10 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
 
 void grpc_server_listener_destroy_done(void *s) {
   grpc_server *server = s;
-  gpr_mu_lock(&server->mu);
+  gpr_mu_lock(&server->mu_global);
   server->listeners_destroyed++;
   maybe_finish_shutdown(server);
-  gpr_mu_unlock(&server->mu);
+  gpr_mu_unlock(&server->mu_global);
 }
 
 void grpc_server_cancel_all_calls(grpc_server *server) {
   call_data *calld;
   grpc_call **calls;
   size_t call_count;
   size_t call_capacity;
   int is_first = 1;
   size_t i;
 
-  gpr_mu_lock(&server->mu);
+  gpr_mu_lock(&server->mu_call);
 
   GPR_ASSERT(server->shutdown);
   if (!server->lists[ALL_CALLS]) {
-    gpr_mu_unlock(&server->mu);
+    gpr_mu_unlock(&server->mu_call);
     return;
   }
@@ -994,7 +1024,7 @@ void grpc_server_cancel_all_calls(grpc_server *server) {
     is_first = 0;
   }
 
-  gpr_mu_unlock(&server->mu);
+  gpr_mu_unlock(&server->mu_call);
 
   for (i = 0; i < call_count; i++) {
     grpc_call_cancel_with_status(calls[i], GRPC_STATUS_UNAVAILABLE,
@@ -1008,8 +1038,8 @@ void grpc_server_cancel_all_calls(grpc_server *server) {
 void grpc_server_destroy(grpc_server *server) {
   listener *l;
 
-  gpr_mu_lock(&server->mu);
-  GPR_ASSERT(server->shutdown);
+  gpr_mu_lock(&server->mu_global);
+  GPR_ASSERT(server->shutdown || !server->listeners);
   GPR_ASSERT(server->listeners_destroyed == num_listeners(server));
 
   while (server->listeners) {
@@ -1018,7 +1048,7 @@ void grpc_server_destroy(grpc_server *server) {
     gpr_free(l);
   }
 
-  gpr_mu_unlock(&server->mu);
+  gpr_mu_unlock(&server->mu_global);
 
   server_unref(server);
 }
@@ -1040,9 +1070,9 @@ static grpc_call_error queue_call_request(grpc_server *server,
                                           requested_call *rc) {
   call_data *calld = NULL;
   requested_call_array *requested_calls = NULL;
-  gpr_mu_lock(&server->mu);
+  gpr_mu_lock(&server->mu_call);
   if (server->shutdown) {
-    gpr_mu_unlock(&server->mu);
+    gpr_mu_unlock(&server->mu_call);
     fail_call(server, rc);
     return GRPC_CALL_OK;
   }
@@ -1061,12 +1091,12 @@ static grpc_call_error queue_call_request(grpc_server *server,
   if (calld) {
     GPR_ASSERT(calld->state == PENDING);
     calld->state = ACTIVATED;
-    gpr_mu_unlock(&server->mu);
+    gpr_mu_unlock(&server->mu_call);
     begin_call(server, calld, rc);
     return GRPC_CALL_OK;
   } else {
     *requested_call_array_add(requested_calls) = *rc;
-    gpr_mu_unlock(&server->mu);
+    gpr_mu_unlock(&server->mu_call);
     return GRPC_CALL_OK;
   }
 }
@@ -1080,6 +1110,9 @@ grpc_call_error grpc_server_request_call(
   GRPC_SERVER_LOG_REQUEST_CALL(GPR_INFO, server, call, details,
                                initial_metadata, cq_bound_to_call,
                                cq_for_notification, tag);
+  if (!grpc_cq_is_server_cq(cq_for_notification)) {
+    return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
+  }
   grpc_cq_begin_op(cq_for_notification, NULL);
   rc.type = BATCH_CALL;
   rc.tag = tag;
@@ -1098,6 +1131,9 @@ grpc_call_error grpc_server_request_registered_call(
     grpc_completion_queue *cq_for_notification, void *tag) {
   requested_call rc;
   registered_method *registered_method = rm;
+  if (!grpc_cq_is_server_cq(cq_for_notification)) {
+    return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
+  }
   grpc_cq_begin_op(cq_for_notification, NULL);
   rc.type = REGISTERED_CALL;
   rc.tag = tag;
@@ -1152,6 +1188,7 @@ static void begin_call(grpc_server *server, call_data *calld,
       rc->data.batch.details->deadline = calld->deadline;
       r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
       r->data.recv_metadata = rc->data.batch.initial_metadata;
+      r->flags = 0;
       r++;
       publish = publish_registered_or_batch;
       break;
@@ -1159,10 +1196,12 @@ static void begin_call(grpc_server *server, call_data *calld,
       *rc->data.registered.deadline = calld->deadline;
       r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
       r->data.recv_metadata = rc->data.registered.initial_metadata;
+      r->flags = 0;
       r++;
       if (rc->data.registered.optional_payload) {
         r->op = GRPC_IOREQ_RECV_MESSAGE;
         r->data.recv_message = rc->data.registered.optional_payload;
+        r->flags = 0;
         r++;
       }
       publish = publish_registered_or_batch;
@@ -1201,8 +1240,8 @@ const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
 
 int grpc_server_has_open_connections(grpc_server *server) {
   int r;
-  gpr_mu_lock(&server->mu);
+  gpr_mu_lock(&server->mu_global);
   r = server->root_channel_data.next != &server->root_channel_data;
-  gpr_mu_unlock(&server->mu);
+  gpr_mu_unlock(&server->mu_global);
   return r;
 }
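
A note on the new byte buffer reader API above: the commit replaces heap construction (grpc_byte_buffer_reader_create returning a malloc'd reader) with caller-owned storage, and grpc_byte_buffer_reader_init now transparently decompresses GRPC_BB_RAW buffers whose compression is anything other than GRPC_COMPRESS_NONE, with grpc_byte_buffer_reader_destroy freeing the decompressed copy when one was made. A minimal caller-side sketch of the new pattern, assuming only the post-change headers; read_all_slices and process_slice are illustrative names, not part of this commit:

#include <grpc/byte_buffer.h>
#include <grpc/byte_buffer_reader.h>
#include <grpc/support/slice.h>

/* Iterate every slice of a possibly-compressed byte buffer. */
static void read_all_slices(grpc_byte_buffer *bb,
                            void (*process_slice)(gpr_slice s)) {
  grpc_byte_buffer_reader reader; /* caller-owned; no malloc/free needed */
  gpr_slice slice;
  grpc_byte_buffer_reader_init(&reader, bb); /* decompresses if required */
  while (grpc_byte_buffer_reader_next(&reader, &slice)) {
    process_slice(slice);   /* _next hands back a new slice reference */
    gpr_slice_unref(slice); /* drop our reference when done */
  }
  /* frees the decompressed copy only when init created one */
  grpc_byte_buffer_reader_destroy(&reader);
}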
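Similarly, grpc_call_start_batch now rejects invalid flags up front: GRPC_OP_SEND_MESSAGE accepts only bits inside GRPC_WRITE_USED_MASK (checked by are_write_flags_valid), and every other op currently allows no flags at all, returning GRPC_CALL_ERROR_INVALID_FLAGS. A hedged illustration of the failure mode under the post-change API, assuming an established call, message buffer, and tag; demo_flag_validation is a hypothetical helper:

#include <string.h>
#include <grpc/grpc.h>
#include <grpc/support/log.h>

/* Out-of-mask write flags now fail fast instead of being silently
   dropped (message flags were previously hardcoded to 0 on the wire). */
static void demo_flag_validation(grpc_call *call, grpc_byte_buffer *message,
                                 void *tag) {
  grpc_op op;
  memset(&op, 0, sizeof(op));
  op.op = GRPC_OP_SEND_MESSAGE;
  op.data.send_message = message;
  op.flags = 0xdead0000u; /* bits outside GRPC_WRITE_USED_MASK */
  GPR_ASSERT(grpc_call_start_batch(call, &op, 1, tag) ==
             GRPC_CALL_ERROR_INVALID_FLAGS);
}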