aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/byte_buffer.c44
-rw-r--r--src/core/surface/byte_buffer_reader.c65
-rw-r--r--src/core/surface/call.c221
-rw-r--r--src/core/surface/call.h3
-rw-r--r--src/core/surface/call_log_batch.c1
-rw-r--r--src/core/surface/channel.c45
-rw-r--r--src/core/surface/channel.h11
-rw-r--r--src/core/surface/channel_create.c4
-rw-r--r--src/core/surface/completion_queue.c31
-rw-r--r--src/core/surface/completion_queue.h15
-rw-r--r--src/core/surface/event_string.c1
-rw-r--r--src/core/surface/init.c1
-rw-r--r--src/core/surface/lame_client.c12
-rw-r--r--src/core/surface/secure_channel_create.c10
-rw-r--r--src/core/surface/server.c131
15 files changed, 445 insertions, 150 deletions
diff --git a/src/core/surface/byte_buffer.c b/src/core/surface/byte_buffer.c
index 12244f6644..a930949f2d 100644
--- a/src/core/surface/byte_buffer.c
+++ b/src/core/surface/byte_buffer.c
@@ -35,25 +35,45 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-grpc_byte_buffer *grpc_byte_buffer_create(gpr_slice *slices, size_t nslices) {
+grpc_byte_buffer *grpc_raw_byte_buffer_create(gpr_slice *slices,
+ size_t nslices) {
+ return grpc_raw_compressed_byte_buffer_create(slices, nslices,
+ GRPC_COMPRESS_NONE);
+}
+
+grpc_byte_buffer *grpc_raw_compressed_byte_buffer_create(
+ gpr_slice *slices, size_t nslices, grpc_compression_algorithm compression) {
size_t i;
grpc_byte_buffer *bb = malloc(sizeof(grpc_byte_buffer));
-
- bb->type = GRPC_BB_SLICE_BUFFER;
- gpr_slice_buffer_init(&bb->data.slice_buffer);
+ bb->type = GRPC_BB_RAW;
+ bb->data.raw.compression = compression;
+ gpr_slice_buffer_init(&bb->data.raw.slice_buffer);
for (i = 0; i < nslices; i++) {
gpr_slice_ref(slices[i]);
- gpr_slice_buffer_add(&bb->data.slice_buffer, slices[i]);
+ gpr_slice_buffer_add(&bb->data.raw.slice_buffer, slices[i]);
}
+ return bb;
+}
+
+grpc_byte_buffer *grpc_raw_byte_buffer_from_reader(
+ grpc_byte_buffer_reader *reader) {
+ grpc_byte_buffer *bb = malloc(sizeof(grpc_byte_buffer));
+ gpr_slice slice;
+ bb->type = GRPC_BB_RAW;
+ bb->data.raw.compression = GRPC_COMPRESS_NONE;
+ gpr_slice_buffer_init(&bb->data.raw.slice_buffer);
+ while (grpc_byte_buffer_reader_next(reader, &slice)) {
+ gpr_slice_buffer_add(&bb->data.raw.slice_buffer, slice);
+ }
return bb;
}
grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) {
switch (bb->type) {
- case GRPC_BB_SLICE_BUFFER:
- return grpc_byte_buffer_create(bb->data.slice_buffer.slices,
- bb->data.slice_buffer.count);
+ case GRPC_BB_RAW:
+ return grpc_raw_byte_buffer_create(bb->data.raw.slice_buffer.slices,
+ bb->data.raw.slice_buffer.count);
}
gpr_log(GPR_INFO, "should never get here");
abort();
@@ -63,8 +83,8 @@ grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) {
void grpc_byte_buffer_destroy(grpc_byte_buffer *bb) {
if (!bb) return;
switch (bb->type) {
- case GRPC_BB_SLICE_BUFFER:
- gpr_slice_buffer_destroy(&bb->data.slice_buffer);
+ case GRPC_BB_RAW:
+ gpr_slice_buffer_destroy(&bb->data.raw.slice_buffer);
break;
}
free(bb);
@@ -72,8 +92,8 @@ void grpc_byte_buffer_destroy(grpc_byte_buffer *bb) {
size_t grpc_byte_buffer_length(grpc_byte_buffer *bb) {
switch (bb->type) {
- case GRPC_BB_SLICE_BUFFER:
- return bb->data.slice_buffer.length;
+ case GRPC_BB_RAW:
+ return bb->data.raw.slice_buffer.length;
}
gpr_log(GPR_ERROR, "should never reach here");
abort();
diff --git a/src/core/surface/byte_buffer_reader.c b/src/core/surface/byte_buffer_reader.c
index fd5289bac3..283db83833 100644
--- a/src/core/surface/byte_buffer_reader.c
+++ b/src/core/surface/byte_buffer_reader.c
@@ -33,42 +33,73 @@
#include <grpc/byte_buffer_reader.h>
+#include <grpc/compression.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/slice_buffer.h>
#include <grpc/byte_buffer.h>
-grpc_byte_buffer_reader *grpc_byte_buffer_reader_create(
- grpc_byte_buffer *buffer) {
- grpc_byte_buffer_reader *reader = malloc(sizeof(grpc_byte_buffer_reader));
- reader->buffer = buffer;
+#include "src/core/compression/message_compress.h"
+
+static int is_compressed(grpc_byte_buffer *buffer) {
switch (buffer->type) {
- case GRPC_BB_SLICE_BUFFER:
+ case GRPC_BB_RAW:
+ if (buffer->data.raw.compression == GRPC_COMPRESS_NONE) {
+ return 0 /* GPR_FALSE */;
+ }
+ break;
+ }
+ return 1 /* GPR_TRUE */;
+}
+
+void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
+ grpc_byte_buffer *buffer) {
+ gpr_slice_buffer decompressed_slices_buffer;
+ reader->buffer_in = buffer;
+ switch (reader->buffer_in->type) {
+ case GRPC_BB_RAW:
+ gpr_slice_buffer_init(&decompressed_slices_buffer);
+ if (is_compressed(reader->buffer_in)) {
+ grpc_msg_decompress(reader->buffer_in->data.raw.compression,
+ &reader->buffer_in->data.raw.slice_buffer,
+ &decompressed_slices_buffer);
+ reader->buffer_out =
+ grpc_raw_byte_buffer_create(decompressed_slices_buffer.slices,
+ decompressed_slices_buffer.count);
+ gpr_slice_buffer_destroy(&decompressed_slices_buffer);
+ } else { /* not compressed, use the input buffer as output */
+ reader->buffer_out = reader->buffer_in;
+ }
reader->current.index = 0;
+ break;
+ }
+}
+
+void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) {
+ switch (reader->buffer_in->type) {
+ case GRPC_BB_RAW:
+ /* keeping the same if-else structure as in the init function */
+ if (is_compressed(reader->buffer_in)) {
+ grpc_byte_buffer_destroy(reader->buffer_out);
+ }
+ break;
}
- return reader;
}
int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader,
gpr_slice *slice) {
- grpc_byte_buffer *buffer = reader->buffer;
- gpr_slice_buffer *slice_buffer;
- switch (buffer->type) {
- case GRPC_BB_SLICE_BUFFER:
- slice_buffer = &buffer->data.slice_buffer;
+ switch (reader->buffer_in->type) {
+ case GRPC_BB_RAW: {
+ gpr_slice_buffer *slice_buffer;
+ slice_buffer = &reader->buffer_out->data.raw.slice_buffer;
if (reader->current.index < slice_buffer->count) {
*slice = gpr_slice_ref(slice_buffer->slices[reader->current.index]);
reader->current.index += 1;
return 1;
- } else {
- return 0;
}
break;
+ }
}
return 0;
}
-
-void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) {
- free(reader);
-}
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index eea02211ae..304af259d6 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -42,6 +42,7 @@
#include "src/core/surface/completion_queue.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include <assert.h>
#include <stdio.h>
@@ -98,6 +99,8 @@ typedef enum {
/* Status came from 'the wire' - or somewhere below the surface
layer */
STATUS_FROM_WIRE,
+ /* Status came from the server sending status */
+ STATUS_FROM_SERVER_STATUS,
STATUS_SOURCE_COUNT
} status_source;
@@ -147,12 +150,19 @@ struct grpc_call {
gpr_uint8 receiving;
/* are we currently completing requests */
gpr_uint8 completing;
+ /** has grpc_call_destroy been called */
+ gpr_uint8 destroy_called;
/* pairs with completed_requests */
gpr_uint8 num_completed_requests;
/* are we currently reading a message? */
gpr_uint8 reading_message;
/* have we bound a pollset yet? */
gpr_uint8 bound_pollset;
+ /* is an error status set */
+ gpr_uint8 error_status_set;
+ /** should the alarm be cancelled */
+ gpr_uint8 cancel_alarm;
+
/* flags with bits corresponding to write states allowing us to determine
what was sent */
gpr_uint16 last_send_contains;
@@ -188,6 +198,7 @@ struct grpc_call {
and a strong upper bound of a count of masters to be calculated. */
gpr_uint8 request_set[GRPC_IOREQ_OP_COUNT];
grpc_ioreq_data request_data[GRPC_IOREQ_OP_COUNT];
+ gpr_uint32 request_flags[GRPC_IOREQ_OP_COUNT];
reqinfo_master masters[GRPC_IOREQ_OP_COUNT];
/* Dynamic array of ioreq's that have completed: the count of
@@ -210,6 +221,9 @@ struct grpc_call {
/* Received call statuses from various sources */
received_status status[STATUS_SOURCE_COUNT];
+ /* Compression level for the call */
+ grpc_compression_level compression_level;
+
/* Contexts for various subsystems (security, tracing, ...). */
grpc_call_context_element context[GRPC_CONTEXT_COUNT];
@@ -231,7 +245,11 @@ struct grpc_call {
gpr_slice_buffer incoming_message;
gpr_uint32 incoming_message_length;
+ gpr_uint32 incoming_message_flags;
grpc_iomgr_closure destroy_closure;
+ grpc_iomgr_closure on_done_recv;
+ grpc_iomgr_closure on_done_send;
+ grpc_iomgr_closure on_done_bind;
};
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
@@ -250,6 +268,7 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata);
static void finish_read_ops(grpc_call *call);
static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
const char *description);
+static void finished_loose_op(void *call, int success);
static void lock(grpc_call *call);
static void unlock(grpc_call *call);
@@ -293,16 +312,18 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
grpc_sopb_init(&call->send_ops);
grpc_sopb_init(&call->recv_ops);
gpr_slice_buffer_init(&call->incoming_message);
- /* dropped in destroy */
- gpr_ref_init(&call->internal_refcount, 1);
+ grpc_iomgr_closure_init(&call->on_done_recv, call_on_done_recv, call);
+ grpc_iomgr_closure_init(&call->on_done_send, call_on_done_send, call);
+ grpc_iomgr_closure_init(&call->on_done_bind, finished_loose_op, call);
+ /* dropped in destroy and when READ_STATE_STREAM_CLOSED received */
+ gpr_ref_init(&call->internal_refcount, 2);
/* server hack: start reads immediately so we can get initial metadata.
TODO(ctiller): figure out a cleaner solution */
if (!call->is_client) {
memset(&initial_op, 0, sizeof(initial_op));
initial_op.recv_ops = &call->recv_ops;
initial_op.recv_state = &call->recv_state;
- initial_op.on_done_recv = call_on_done_recv;
- initial_op.recv_user_data = call;
+ initial_op.on_done_recv = &call->on_done_recv;
initial_op.context = call->context;
call->receiving = 1;
GRPC_CALL_INTERNAL_REF(call, "receiving");
@@ -397,14 +418,22 @@ void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) {
static void set_status_code(grpc_call *call, status_source source,
gpr_uint32 status) {
+ if (call->status[source].is_set) return;
+
call->status[source].is_set = 1;
call->status[source].code = status;
+ call->error_status_set = status != GRPC_STATUS_OK;
if (status != GRPC_STATUS_OK && !grpc_bbq_empty(&call->incoming_queue)) {
grpc_bbq_flush(&call->incoming_queue);
}
}
+static void set_decode_compression_level(grpc_call *call,
+ grpc_compression_level clevel) {
+ call->compression_level = clevel;
+}
+
static void set_status_details(grpc_call *call, status_source source,
grpc_mdstr *status) {
if (call->status[source].details != NULL) {
@@ -435,7 +464,8 @@ static int need_more_data(grpc_call *call) {
(is_op_live(call, GRPC_IOREQ_RECV_CLOSE) &&
grpc_bbq_empty(&call->incoming_queue)) ||
(call->write_state == WRITE_STATE_INITIAL && !call->is_client) ||
- (call->cancel_with_status != GRPC_STATUS_OK);
+ (call->cancel_with_status != GRPC_STATUS_OK) ||
+ call->destroy_called;
}
static void unlock(grpc_call *call) {
@@ -446,6 +476,7 @@ static void unlock(grpc_call *call) {
int i;
const gpr_uint32 MAX_RECV_PEEK_AHEAD = 65536;
size_t buffered_bytes;
+ int cancel_alarm = 0;
memset(&op, 0, sizeof(op));
@@ -453,11 +484,13 @@ static void unlock(grpc_call *call) {
start_op = op.cancel_with_status != GRPC_STATUS_OK;
call->cancel_with_status = GRPC_STATUS_OK; /* reset */
+ cancel_alarm = call->cancel_alarm;
+ call->cancel_alarm = 0;
+
if (!call->receiving && need_more_data(call)) {
op.recv_ops = &call->recv_ops;
op.recv_state = &call->recv_state;
- op.on_done_recv = call_on_done_recv;
- op.recv_user_data = call;
+ op.on_done_recv = &call->on_done_recv;
if (grpc_bbq_empty(&call->incoming_queue) && call->reading_message) {
op.max_recv_bytes = call->incoming_message_length -
call->incoming_message.length + MAX_RECV_PEEK_AHEAD;
@@ -499,6 +532,10 @@ static void unlock(grpc_call *call) {
gpr_mu_unlock(&call->mu);
+ if (cancel_alarm) {
+ grpc_alarm_cancel(&call->alarm);
+ }
+
if (start_op) {
execute_op(call, &op);
}
@@ -590,10 +627,18 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
call->write_state = WRITE_STATE_WRITE_CLOSED;
}
break;
+ case GRPC_IOREQ_SEND_STATUS:
+ if (call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details !=
+ NULL) {
+ grpc_mdstr_unref(
+ call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details);
+ call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details =
+ NULL;
+ }
+ break;
case GRPC_IOREQ_RECV_CLOSE:
case GRPC_IOREQ_SEND_INITIAL_METADATA:
case GRPC_IOREQ_SEND_TRAILING_METADATA:
- case GRPC_IOREQ_SEND_STATUS:
case GRPC_IOREQ_SEND_CLOSE:
break;
case GRPC_IOREQ_RECV_STATUS:
@@ -676,13 +721,13 @@ static void call_on_done_send(void *pc, int success) {
}
static void finish_message(grpc_call *call) {
- /* TODO(ctiller): this could be a lot faster if coded directly */
- grpc_byte_buffer *byte_buffer = grpc_byte_buffer_create(
- call->incoming_message.slices, call->incoming_message.count);
+ if (call->error_status_set == 0) {
+ /* TODO(ctiller): this could be a lot faster if coded directly */
+ grpc_byte_buffer *byte_buffer = grpc_raw_byte_buffer_create(
+ call->incoming_message.slices, call->incoming_message.count);
+ grpc_bbq_push(&call->incoming_queue, byte_buffer);
+ }
gpr_slice_buffer_reset_and_unref(&call->incoming_message);
-
- grpc_bbq_push(&call->incoming_queue, byte_buffer);
-
GPR_ASSERT(call->incoming_message.count == 0);
call->reading_message = 0;
}
@@ -711,6 +756,7 @@ static int begin_message(grpc_call *call, grpc_begin_message msg) {
} else if (msg.length > 0) {
call->reading_message = 1;
call->incoming_message_length = msg.length;
+ call->incoming_message_flags = msg.flags;
return 1;
} else {
finish_message(call);
@@ -782,10 +828,8 @@ static void call_on_done_recv(void *pc, int success) {
if (call->recv_state == GRPC_STREAM_CLOSED) {
GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED);
call->read_state = READ_STATE_STREAM_CLOSED;
- if (call->have_alarm) {
- grpc_alarm_cancel(&call->alarm);
- call->have_alarm = 0;
- }
+ call->cancel_alarm |= call->have_alarm;
+ GRPC_CALL_INTERNAL_UNREF(call, "closed", 0);
}
finish_read_ops(call);
} else {
@@ -800,7 +844,7 @@ static void call_on_done_recv(void *pc, int success) {
unlock(call);
GRPC_CALL_INTERNAL_UNREF(call, "receiving", 0);
- GRPC_TIMER_BEGIN(GRPC_PTAG_CALL_ON_DONE_RECV, 0);
+ GRPC_TIMER_END(GRPC_PTAG_CALL_ON_DONE_RECV, 0);
}
static int prepare_application_metadata(grpc_call *call, size_t count,
@@ -847,9 +891,9 @@ static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
size_t i;
switch (byte_buffer->type) {
- case GRPC_BB_SLICE_BUFFER:
- for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) {
- gpr_slice slice = byte_buffer->data.slice_buffer.slices[i];
+ case GRPC_BB_RAW:
+ for (i = 0; i < byte_buffer->data.raw.slice_buffer.count; i++) {
+ gpr_slice slice = byte_buffer->data.raw.slice_buffer.slices[i];
gpr_slice_ref(slice);
grpc_sopb_add_slice(sopb, slice);
}
@@ -859,9 +903,9 @@ static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
grpc_ioreq_data data;
+ gpr_uint32 flags;
grpc_metadata_batch mdb;
size_t i;
- char status_str[GPR_LTOA_MIN_BUFSIZE];
GPR_ASSERT(op->send_ops == NULL);
switch (call->write_state) {
@@ -885,8 +929,9 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
case WRITE_STATE_STARTED:
if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) {
data = call->request_data[GRPC_IOREQ_SEND_MESSAGE];
+ flags = call->request_flags[GRPC_IOREQ_SEND_MESSAGE];
grpc_sopb_add_begin_message(
- &call->send_ops, grpc_byte_buffer_length(data.send_message), 0);
+ &call->send_ops, grpc_byte_buffer_length(data.send_message), flags);
copy_byte_buffer_to_stream_ops(data.send_message, &call->send_ops);
op->send_ops = &call->send_ops;
call->last_send_contains |= 1 << GRPC_IOREQ_SEND_MESSAGE;
@@ -905,13 +950,10 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
/* send status */
/* TODO(ctiller): cache common status values */
data = call->request_data[GRPC_IOREQ_SEND_STATUS];
- gpr_ltoa(data.send_status.code, status_str);
grpc_metadata_batch_add_tail(
&mdb, &call->status_link,
- grpc_mdelem_from_metadata_strings(
- call->metadata_context,
- grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
- grpc_mdstr_from_string(call->metadata_context, status_str)));
+ grpc_channel_get_reffed_status_elem(call->channel,
+ data.send_status.code));
if (data.send_status.details) {
grpc_metadata_batch_add_tail(
&mdb, &call->details_link,
@@ -919,8 +961,9 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
call->metadata_context,
grpc_mdstr_ref(
grpc_channel_get_message_string(call->channel)),
- grpc_mdstr_from_string(call->metadata_context,
- data.send_status.details)));
+ data.send_status.details));
+ call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details =
+ NULL;
}
grpc_sopb_add_metadata(&call->send_ops, mdb);
}
@@ -930,8 +973,7 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
break;
}
if (op->send_ops) {
- op->on_done_send = call_on_done_send;
- op->send_user_data = call;
+ op->on_done_send = &call->on_done_send;
}
return op->send_ops != NULL;
}
@@ -965,7 +1007,7 @@ static void finish_read_ops(grpc_call *call) {
switch (call->read_state) {
case READ_STATE_STREAM_CLOSED:
- if (empty) {
+ if (empty && !call->have_alarm) {
finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, 1);
}
/* fallthrough */
@@ -1020,9 +1062,18 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
GRPC_CALL_ERROR_INVALID_METADATA);
}
}
+ if (op == GRPC_IOREQ_SEND_STATUS) {
+ set_status_code(call, STATUS_FROM_SERVER_STATUS,
+ reqs[i].data.send_status.code);
+ if (reqs[i].data.send_status.details) {
+ set_status_details(call, STATUS_FROM_SERVER_STATUS,
+ grpc_mdstr_ref(reqs[i].data.send_status.details));
+ }
+ }
have_ops |= 1u << op;
call->request_data[op] = data;
+ call->request_flags[op] = reqs[i].flags;
call->request_set[op] = set;
}
@@ -1052,10 +1103,9 @@ grpc_call_error grpc_call_start_ioreq_and_call_back(
void grpc_call_destroy(grpc_call *c) {
int cancel;
lock(c);
- if (c->have_alarm) {
- grpc_alarm_cancel(&c->alarm);
- c->have_alarm = 0;
- }
+ GPR_ASSERT(!c->destroy_called);
+ c->destroy_called = 1;
+ c->cancel_alarm |= c->have_alarm;
cancel = c->read_state != READ_STATE_STREAM_CLOSED;
unlock(c);
if (cancel) grpc_call_cancel(c);
@@ -1096,14 +1146,31 @@ static void finished_loose_op(void *call, int success_ignored) {
GRPC_CALL_INTERNAL_UNREF(call, "loose-op", 0);
}
+typedef struct {
+ grpc_call *call;
+ grpc_iomgr_closure closure;
+} finished_loose_op_allocated_args;
+
+static void finished_loose_op_allocated(void *alloc, int success) {
+ finished_loose_op_allocated_args *args = alloc;
+ finished_loose_op(args->call, success);
+ gpr_free(args);
+}
+
static void execute_op(grpc_call *call, grpc_transport_op *op) {
grpc_call_element *elem;
GPR_ASSERT(op->on_consumed == NULL);
if (op->cancel_with_status != GRPC_STATUS_OK || op->bind_pollset) {
GRPC_CALL_INTERNAL_REF(call, "loose-op");
- op->on_consumed = finished_loose_op;
- op->on_consumed_user_data = call;
+ if (op->bind_pollset) {
+ op->on_consumed = &call->on_done_bind;
+ } else {
+ finished_loose_op_allocated_args *args = gpr_malloc(sizeof(*args));
+ args->call = call;
+ grpc_iomgr_closure_init(&args->closure, finished_loose_op_allocated, args);
+ op->on_consumed = &args->closure;
+ }
}
elem = CALL_ELEM_FROM_CALL(call, 0);
@@ -1117,12 +1184,14 @@ grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
static void call_alarm(void *arg, int success) {
grpc_call *call = arg;
+ lock(call);
+ call->have_alarm = 0;
if (success) {
- lock(call);
cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED,
"Deadline Exceeded");
- unlock(call);
}
+ finish_read_ops(call);
+ unlock(call);
GRPC_CALL_INTERNAL_UNREF(call, "alarm", 1);
}
@@ -1160,6 +1229,33 @@ static gpr_uint32 decode_status(grpc_mdelem *md) {
return status;
}
+/* just as for status above, we need to offset: metadata userdata can't hold a
+ * zero (null), which in this case is used to signal no compression */
+#define COMPRESS_OFFSET 1
+static void destroy_compression(void *ignored) {}
+
+static gpr_uint32 decode_compression(grpc_mdelem *md) {
+ grpc_compression_level clevel;
+ void *user_data = grpc_mdelem_get_user_data(md, destroy_status);
+ if (user_data) {
+ clevel = ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET;
+ } else {
+ gpr_uint32 parsed_clevel_bytes;
+ if (gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
+ GPR_SLICE_LENGTH(md->value->slice),
+ &parsed_clevel_bytes)) {
+ /* the following cast is safe, as a gpr_uint32 should be able to hold all
+ * possible values of the grpc_compression_level enum */
+ clevel = (grpc_compression_level) parsed_clevel_bytes;
+ } else {
+ clevel = GRPC_COMPRESS_LEVEL_NONE; /* could not parse, no compression */
+ }
+ grpc_mdelem_set_user_data(md, destroy_compression,
+ (void *)(gpr_intptr)(clevel + COMPRESS_OFFSET));
+ }
+ return clevel;
+}
+
static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
grpc_linked_mdelem *l;
grpc_metadata_array *dest;
@@ -1175,6 +1271,8 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
set_status_code(call, STATUS_FROM_WIRE, decode_status(md));
} else if (key == grpc_channel_get_message_string(call->channel)) {
set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
+ } else if (key == grpc_channel_get_compresssion_level_string(call->channel)) {
+ set_decode_compression_level(call, decode_compression(md));
} else {
dest = &call->buffered_metadata[is_trailing];
if (dest->count == dest->capacity) {
@@ -1239,6 +1337,14 @@ static void finish_batch_with_close(grpc_call *call, int success, void *tag) {
grpc_cq_end_op(call->cq, tag, call, 1);
}
+static int are_write_flags_valid(gpr_uint32 flags) {
+ /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
+ const gpr_uint32 allowed_write_positions =
+ (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK);
+ const gpr_uint32 invalid_positions = ~allowed_write_positions;
+ return !(flags & invalid_positions);
+}
+
grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
size_t nops, void *tag) {
grpc_ioreq reqs[GRPC_IOREQ_OP_COUNT];
@@ -1261,30 +1367,43 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
op = &ops[in];
switch (op->op) {
case GRPC_OP_SEND_INITIAL_METADATA:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_INITIAL_METADATA;
req->data.send_metadata.count = op->data.send_initial_metadata.count;
req->data.send_metadata.metadata =
op->data.send_initial_metadata.metadata;
+ req->flags = op->flags;
break;
case GRPC_OP_SEND_MESSAGE:
+ if (!are_write_flags_valid(op->flags)) {
+ return GRPC_CALL_ERROR_INVALID_FLAGS;
+ }
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_MESSAGE;
req->data.send_message = op->data.send_message;
+ req->flags = ops->flags;
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
if (!call->is_client) {
return GRPC_CALL_ERROR_NOT_ON_SERVER;
}
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_CLOSE;
+ req->flags = op->flags;
break;
case GRPC_OP_SEND_STATUS_FROM_SERVER:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
if (call->is_client) {
return GRPC_CALL_ERROR_NOT_ON_CLIENT;
}
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_TRAILING_METADATA;
+ req->flags = op->flags;
req->data.send_metadata.count =
op->data.send_status_from_server.trailing_metadata_count;
req->data.send_metadata.metadata =
@@ -1293,29 +1412,42 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
req->op = GRPC_IOREQ_SEND_STATUS;
req->data.send_status.code = op->data.send_status_from_server.status;
req->data.send_status.details =
- op->data.send_status_from_server.status_details;
+ op->data.send_status_from_server.status_details != NULL
+ ? grpc_mdstr_from_string(
+ call->metadata_context,
+ op->data.send_status_from_server.status_details)
+ : NULL;
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_CLOSE;
break;
case GRPC_OP_RECV_INITIAL_METADATA:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
if (!call->is_client) {
return GRPC_CALL_ERROR_NOT_ON_SERVER;
}
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
req->data.recv_metadata = op->data.recv_initial_metadata;
+ req->flags = op->flags;
break;
case GRPC_OP_RECV_MESSAGE:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_MESSAGE;
req->data.recv_message = op->data.recv_message;
+ req->flags = op->flags;
break;
case GRPC_OP_RECV_STATUS_ON_CLIENT:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
if (!call->is_client) {
return GRPC_CALL_ERROR_NOT_ON_SERVER;
}
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_STATUS;
+ req->flags = op->flags;
req->data.recv_status.set_value = set_status_value_directly;
req->data.recv_status.user_data = op->data.recv_status_on_client.status;
req = &reqs[out++];
@@ -1333,8 +1465,11 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
finish_func = finish_batch_with_close;
break;
case GRPC_OP_RECV_CLOSE_ON_SERVER:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_STATUS;
+ req->flags = op->flags;
req->data.recv_status.set_value = set_cancelled_value;
req->data.recv_status.user_data =
op->data.recv_close_on_server.cancelled;
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index 17db8c2cdc..fb3662b50d 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -72,13 +72,14 @@ typedef union {
grpc_byte_buffer *send_message;
struct {
grpc_status_code code;
- const char *details;
+ grpc_mdstr *details;
} send_status;
} grpc_ioreq_data;
typedef struct {
grpc_ioreq_op op;
grpc_ioreq_data data;
+ gpr_uint32 flags; /**< A copy of the write flags from grpc_op */
} grpc_ioreq;
typedef void (*grpc_ioreq_completion_func)(grpc_call *call, int success,
diff --git a/src/core/surface/call_log_batch.c b/src/core/surface/call_log_batch.c
index 9905401bee..55663298c9 100644
--- a/src/core/surface/call_log_batch.c
+++ b/src/core/surface/call_log_batch.c
@@ -35,6 +35,7 @@
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
int grpc_trace_batch = 0;
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 70485d71da..a3c4dcebc1 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -37,12 +37,20 @@
#include <string.h>
#include "src/core/iomgr/iomgr.h"
+#include "src/core/support/string.h"
#include "src/core/surface/call.h"
#include "src/core/surface/client.h"
#include "src/core/surface/init.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+/** Cache grpc-status: X mdelems for X = 0..NUM_CACHED_STATUS_ELEMS.
+ * Avoids needing to take a metadata context lock for sending status
+ * if the status code is <= NUM_CACHED_STATUS_ELEMS.
+ * Sized to allow the most commonly used codes to fit in
+ * (OK, Cancelled, Unknown). */
+#define NUM_CACHED_STATUS_ELEMS 3
+
typedef struct registered_call {
grpc_mdelem *path;
grpc_mdelem *authority;
@@ -54,10 +62,14 @@ struct grpc_channel {
gpr_refcount refs;
gpr_uint32 max_message_length;
grpc_mdctx *metadata_context;
+ /** mdstr for the grpc-status key */
grpc_mdstr *grpc_status_string;
+ grpc_mdstr *grpc_compression_level_string;
grpc_mdstr *grpc_message_string;
grpc_mdstr *path_string;
grpc_mdstr *authority_string;
+ /** mdelem for grpc-status: 0 thru grpc-status: 2 */
+ grpc_mdelem *grpc_status_elem[NUM_CACHED_STATUS_ELEMS];
gpr_mu registered_call_mu;
registered_call *registered_calls;
@@ -87,7 +99,16 @@ grpc_channel *grpc_channel_create_from_filters(
gpr_ref_init(&channel->refs, 1 + is_client);
channel->metadata_context = mdctx;
channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status");
+ channel->grpc_compression_level_string =
+ grpc_mdstr_from_string(mdctx, "grpc-compression-level");
channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message");
+ for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) {
+ char buf[GPR_LTOA_MIN_BUFSIZE];
+ gpr_ltoa(i, buf);
+ channel->grpc_status_elem[i] = grpc_mdelem_from_metadata_strings(
+ mdctx, grpc_mdstr_ref(channel->grpc_status_string),
+ grpc_mdstr_from_string(mdctx, buf));
+ }
channel->path_string = grpc_mdstr_from_string(mdctx, ":path");
channel->authority_string = grpc_mdstr_from_string(mdctx, ":authority");
grpc_channel_stack_init(filters, num_filters, args, channel->metadata_context,
@@ -181,8 +202,13 @@ void grpc_channel_internal_ref(grpc_channel *c) {
static void destroy_channel(void *p, int ok) {
grpc_channel *channel = p;
+ size_t i;
grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel));
+ for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) {
+ grpc_mdelem_unref(channel->grpc_status_elem[i]);
+ }
grpc_mdstr_unref(channel->grpc_status_string);
+ grpc_mdstr_unref(channel->grpc_compression_level_string);
grpc_mdstr_unref(channel->grpc_message_string);
grpc_mdstr_unref(channel->path_string);
grpc_mdstr_unref(channel->authority_string);
@@ -200,7 +226,7 @@ static void destroy_channel(void *p, int ok) {
#ifdef GRPC_CHANNEL_REF_COUNT_DEBUG
void grpc_channel_internal_unref(grpc_channel *channel, const char *reason) {
- gpr_log(GPR_DEBUG, "CHANNEL: unref %p %d -> %d [%s]", channel,
+ gpr_log(GPR_DEBUG, "CHANNEL: unref %p %d -> %d [%s]", channel,
channel->refs.count, channel->refs.count - 1, reason);
#else
void grpc_channel_internal_unref(grpc_channel *channel) {
@@ -247,6 +273,23 @@ grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) {
return channel->grpc_status_string;
}
+grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel) {
+ return channel->grpc_compression_level_string;
+}
+
+
+grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) {
+ if (i >= 0 && i < NUM_CACHED_STATUS_ELEMS) {
+ return grpc_mdelem_ref(channel->grpc_status_elem[i]);
+ } else {
+ char tmp[GPR_LTOA_MIN_BUFSIZE];
+ gpr_ltoa(i, tmp);
+ return grpc_mdelem_from_metadata_strings(
+ channel->metadata_context, grpc_mdstr_ref(channel->grpc_status_string),
+ grpc_mdstr_from_string(channel->metadata_context, tmp));
+ }
+}
+
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel) {
return channel->grpc_message_string;
}
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index d1f62f2598..3c04676b43 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -40,9 +40,20 @@ grpc_channel *grpc_channel_create_from_filters(
const grpc_channel_filter **filters, size_t count,
const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client);
+/** Get a (borrowed) pointer to this channels underlying channel stack */
grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel);
+
+/** Get a (borrowed) pointer to the channel wide metadata context */
grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel);
+
+/** Get a grpc_mdelem of grpc-status: X where X is the numeric value of
+ status_code.
+
+ The returned elem is owned by the caller. */
+grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel,
+ int status_code);
grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel);
+grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel);
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index a49b754d9a..d069a04a9a 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -53,6 +53,7 @@
#include "src/core/transport/chttp2_transport.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
@@ -137,7 +138,8 @@ static void on_resolved(void *rp, grpc_resolved_addresses *resolved) {
request *r = rp;
/* if we're not still the active request, abort */
- if (!grpc_client_setup_request_should_continue(r->cs_request, "on_resolved")) {
+ if (!grpc_client_setup_request_should_continue(r->cs_request,
+ "on_resolved")) {
if (resolved) {
grpc_resolved_addresses_destroy(resolved);
}
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 6808c976e1..030a8b4e6f 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -73,6 +73,7 @@ struct grpc_completion_queue {
event *queue;
/* Fixed size chained hash table of events for pluck() */
event *buckets[NUM_TAG_BUCKETS];
+ int is_server_cq;
};
grpc_completion_queue *grpc_completion_queue_create(void) {
@@ -86,21 +87,12 @@ grpc_completion_queue *grpc_completion_queue_create(void) {
return cc;
}
-
-
-
-
-
-
-
-
-
-
-
-
#ifdef GRPC_CQ_REF_COUNT_DEBUG
-void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason) {
- gpr_log(GPR_DEBUG, "CQ:%p ref %d -> %d %s", cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1, reason);
+void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
+ const char *file, int line) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p ref %d -> %d %s",
+ cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1,
+ reason);
#else
void grpc_cq_internal_ref(grpc_completion_queue *cc) {
#endif
@@ -113,8 +105,11 @@ static void on_pollset_destroy_done(void *arg) {
}
#ifdef GRPC_CQ_REF_COUNT_DEBUG
-void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason) {
- gpr_log(GPR_DEBUG, "CQ:%p unref %d -> %d %s", cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1, reason);
+void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason,
+ const char *file, int line) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s",
+ cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1,
+ reason);
#else
void grpc_cq_internal_unref(grpc_completion_queue *cc) {
#endif
@@ -333,3 +328,7 @@ void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc) {
gpr_time_add(gpr_now(), gpr_time_from_millis(100)));
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
+
+void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; }
+
+int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; }
diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h
index 2249d0e789..1b9010f462 100644
--- a/src/core/surface/completion_queue.h
+++ b/src/core/surface/completion_queue.h
@@ -40,10 +40,14 @@
#include <grpc/grpc.h>
#ifdef GRPC_CQ_REF_COUNT_DEBUG
-void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason);
-void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason);
-#define GRPC_CQ_INTERNAL_REF(cc, reason) grpc_cq_internal_ref(cc, reason)
-#define GRPC_CQ_INTERNAL_UNREF(cc, reason) grpc_cq_internal_unref(cc, reason)
+void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
+ const char *file, int line);
+void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason,
+ const char *file, int line);
+#define GRPC_CQ_INTERNAL_REF(cc, reason) \
+ grpc_cq_internal_ref(cc, reason, __FILE__, __LINE__)
+#define GRPC_CQ_INTERNAL_UNREF(cc, reason) \
+ grpc_cq_internal_unref(cc, reason, __FILE__, __LINE__)
#else
void grpc_cq_internal_ref(grpc_completion_queue *cc);
void grpc_cq_internal_unref(grpc_completion_queue *cc);
@@ -63,4 +67,7 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc);
+void grpc_cq_mark_server_cq(grpc_completion_queue *cc);
+int grpc_cq_is_server_cq(grpc_completion_queue *cc);
+
#endif /* GRPC_INTERNAL_CORE_SURFACE_COMPLETION_QUEUE_H */
diff --git a/src/core/surface/event_string.c b/src/core/surface/event_string.c
index 448bb1162b..33cd4a43aa 100644
--- a/src/core/surface/event_string.c
+++ b/src/core/surface/event_string.c
@@ -37,6 +37,7 @@
#include "src/core/support/string.h"
#include <grpc/byte_buffer.h>
+#include <grpc/support/string_util.h>
static void addhdr(gpr_strvec *buf, grpc_event *ev) {
char *tmp;
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
index ac6871c6f2..ca61a38a35 100644
--- a/src/core/surface/init.c
+++ b/src/core/surface/init.c
@@ -78,6 +78,7 @@ void grpc_shutdown(void) {
grpc_iomgr_shutdown();
census_shutdown();
grpc_timers_global_destroy();
+ grpc_tracer_shutdown();
}
gpr_mu_unlock(&g_init_mu);
}
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index 6c07b01544..85e1ab5554 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -56,7 +56,7 @@ static void lame_start_transport_op(grpc_call_element *elem,
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
if (op->send_ops) {
grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
- op->on_done_send(op->send_user_data, 0);
+ op->on_done_send->cb(op->on_done_send->cb_arg, 0);
}
if (op->recv_ops) {
char tmp[GPR_LTOA_MIN_BUFSIZE];
@@ -75,10 +75,10 @@ static void lame_start_transport_op(grpc_call_element *elem,
mdb.deadline = gpr_inf_future;
grpc_sopb_add_metadata(op->recv_ops, mdb);
*op->recv_state = GRPC_STREAM_CLOSED;
- op->on_done_recv(op->recv_user_data, 1);
+ op->on_done_recv->cb(op->on_done_recv->cb_arg, 1);
}
if (op->on_consumed) {
- op->on_consumed(op->on_consumed_user_data, 0);
+ op->on_consumed->cb(op->on_consumed->cb_arg, 0);
}
}
@@ -118,9 +118,9 @@ static void init_channel_elem(grpc_channel_element *elem,
static void destroy_channel_elem(grpc_channel_element *elem) {}
static const grpc_channel_filter lame_filter = {
- lame_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
- destroy_call_elem, sizeof(channel_data), init_channel_elem,
- destroy_channel_elem, "lame-client",
+ lame_start_transport_op, channel_op, sizeof(call_data),
+ init_call_elem, destroy_call_elem, sizeof(channel_data),
+ init_channel_elem, destroy_channel_elem, "lame-client",
};
grpc_channel *grpc_lame_client_channel_create(void) {
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index 3875eb0614..fae3e4e90a 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -56,6 +56,7 @@
#include <grpc/grpc_security.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
#include "src/core/tsi/transport_security_interface.h"
@@ -96,7 +97,8 @@ static void on_secure_transport_setup_done(void *rp,
if (status != GRPC_SECURITY_OK) {
gpr_log(GPR_ERROR, "Secure transport setup failed with error %d.", status);
done(r, 0);
- } else if (grpc_client_setup_cb_begin(r->cs_request, "on_secure_transport_setup_done")) {
+ } else if (grpc_client_setup_cb_begin(r->cs_request,
+ "on_secure_transport_setup_done")) {
grpc_create_chttp2_transport(
r->setup->setup_callback, r->setup->setup_user_data,
grpc_client_setup_get_channel_args(r->cs_request), secure_endpoint,
@@ -112,7 +114,8 @@ static void on_secure_transport_setup_done(void *rp,
static void on_connect(void *rp, grpc_endpoint *tcp) {
request *r = rp;
- if (!grpc_client_setup_request_should_continue(r->cs_request, "on_connect.secure")) {
+ if (!grpc_client_setup_request_should_continue(r->cs_request,
+ "on_connect.secure")) {
if (tcp) {
grpc_endpoint_shutdown(tcp);
grpc_endpoint_destroy(tcp);
@@ -152,7 +155,8 @@ static void on_resolved(void *rp, grpc_resolved_addresses *resolved) {
request *r = rp;
/* if we're not still the active request, abort */
- if (!grpc_client_setup_request_should_continue(r->cs_request, "on_resolved.secure")) {
+ if (!grpc_client_setup_request_should_continue(r->cs_request,
+ "on_resolved.secure")) {
if (resolved) {
grpc_resolved_addresses_destroy(resolved);
}
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 4cf9213e66..13ec5bee94 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -48,6 +48,7 @@
#include "src/core/transport/metadata.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list;
@@ -140,7 +141,15 @@ struct grpc_server {
grpc_pollset **pollsets;
size_t cq_count;
- gpr_mu mu;
+ /* The two following mutexes control access to server-state
+ mu_global controls access to non-call-related state (e.g., channel state)
+ mu_call controls access to call-related state (e.g., the call lists)
+
+ If they are ever required to be nested, you must lock mu_global
+ before mu_call. This is currently used in shutdown processing
+ (grpc_server_shutdown_and_notify and maybe_finish_shutdown) */
+ gpr_mu mu_global; /* mutex for server and channel state */
+ gpr_mu mu_call; /* mutex for call-specific state */
registered_method *registered_methods;
requested_call_array requested_calls;
@@ -182,9 +191,9 @@ struct call_data {
grpc_stream_op_buffer *recv_ops;
grpc_stream_state *recv_state;
- void (*on_done_recv)(void *user_data, int success);
- void *recv_user_data;
+ grpc_iomgr_closure *on_done_recv;
+ grpc_iomgr_closure server_on_recv;
grpc_iomgr_closure kill_zombie_closure;
call_data **root[CALL_LIST_COUNT];
@@ -199,6 +208,8 @@ static void begin_call(grpc_server *server, call_data *calld,
static void fail_call(grpc_server *server, requested_call *rc);
static void shutdown_channel(channel_data *chand, int send_goaway,
int send_disconnect);
+/* Before calling maybe_finish_shutdown, we must hold mu_global and not
+ hold mu_call */
static void maybe_finish_shutdown(grpc_server *server);
static int call_list_join(call_data **root, call_data *call, call_list list) {
@@ -272,7 +283,8 @@ static void server_delete(grpc_server *server) {
registered_method *rm;
size_t i;
grpc_channel_args_destroy(server->channel_args);
- gpr_mu_destroy(&server->mu);
+ gpr_mu_destroy(&server->mu_global);
+ gpr_mu_destroy(&server->mu_call);
gpr_free(server->channel_filters);
requested_call_array_destroy(&server->requested_calls);
while ((rm = server->registered_methods) != NULL) {
@@ -334,11 +346,11 @@ static void finish_start_new_rpc_and_unlock(grpc_server *server,
if (array->count == 0) {
calld->state = PENDING;
call_list_join(pending_root, calld, PENDING_START);
- gpr_mu_unlock(&server->mu);
+ gpr_mu_unlock(&server->mu_call);
} else {
rc = array->calls[--array->count];
calld->state = ACTIVATED;
- gpr_mu_unlock(&server->mu);
+ gpr_mu_unlock(&server->mu_call);
begin_call(server, calld, &rc);
}
}
@@ -351,7 +363,7 @@ static void start_new_rpc(grpc_call_element *elem) {
gpr_uint32 hash;
channel_registered_method *rm;
- gpr_mu_lock(&server->mu);
+ gpr_mu_lock(&server->mu_call);
if (chand->registered_methods && calld->path && calld->host) {
/* TODO(ctiller): unify these two searches */
/* check for an exact match with host */
@@ -403,11 +415,16 @@ static void maybe_finish_shutdown(grpc_server *server) {
if (!server->shutdown || server->shutdown_published) {
return;
}
+
+ gpr_mu_lock(&server->mu_call);
if (server->lists[ALL_CALLS] != NULL) {
gpr_log(GPR_DEBUG,
"Waiting for all calls to finish before destroying server");
+ gpr_mu_unlock(&server->mu_call);
return;
}
+ gpr_mu_unlock(&server->mu_call);
+
if (server->root_channel_data.next != &server->root_channel_data) {
gpr_log(GPR_DEBUG,
"Waiting for all channels to close before destroying server");
@@ -451,6 +468,7 @@ static void server_on_recv(void *ptr, int success) {
grpc_call_element *elem = ptr;
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
+ int remove_res;
if (success && !calld->got_initial_metadata) {
size_t i;
@@ -475,16 +493,16 @@ static void server_on_recv(void *ptr, int success) {
case GRPC_STREAM_SEND_CLOSED:
break;
case GRPC_STREAM_RECV_CLOSED:
- gpr_mu_lock(&chand->server->mu);
+ gpr_mu_lock(&chand->server->mu_call);
if (calld->state == NOT_STARTED) {
calld->state = ZOMBIED;
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_iomgr_add_callback(&calld->kill_zombie_closure);
}
- gpr_mu_unlock(&chand->server->mu);
+ gpr_mu_unlock(&chand->server->mu_call);
break;
case GRPC_STREAM_CLOSED:
- gpr_mu_lock(&chand->server->mu);
+ gpr_mu_lock(&chand->server->mu_call);
if (calld->state == NOT_STARTED) {
calld->state = ZOMBIED;
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
@@ -494,16 +512,18 @@ static void server_on_recv(void *ptr, int success) {
calld->state = ZOMBIED;
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_iomgr_add_callback(&calld->kill_zombie_closure);
-
}
- if (call_list_remove(calld, ALL_CALLS)) {
+ remove_res = call_list_remove(calld, ALL_CALLS);
+ gpr_mu_unlock(&chand->server->mu_call);
+ gpr_mu_lock(&chand->server->mu_global);
+ if (remove_res) {
decrement_call_count(chand);
}
- gpr_mu_unlock(&chand->server->mu);
+ gpr_mu_unlock(&chand->server->mu_global);
break;
}
- calld->on_done_recv(calld->recv_user_data, success);
+ calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success);
}
static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
@@ -514,9 +534,7 @@ static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
calld->recv_ops = op->recv_ops;
calld->recv_state = op->recv_state;
calld->on_done_recv = op->on_done_recv;
- calld->recv_user_data = op->recv_user_data;
- op->on_done_recv = server_on_recv;
- op->recv_user_data = elem;
+ op->on_done_recv = &calld->server_on_recv;
}
}
@@ -542,10 +560,10 @@ static void channel_op(grpc_channel_element *elem,
case GRPC_TRANSPORT_CLOSED:
/* if the transport is closed for a server channel, we destroy the
channel */
- gpr_mu_lock(&server->mu);
+ gpr_mu_lock(&server->mu_global);
server_ref(server);
destroy_channel(chand);
- gpr_mu_unlock(&server->mu);
+ gpr_mu_unlock(&server->mu_global);
server_unref(server);
break;
case GRPC_TRANSPORT_GOAWAY:
@@ -590,7 +608,8 @@ static void finish_shutdown_channel(void *p, int success) {
gpr_free(sca);
}
-static void shutdown_channel(channel_data *chand, int send_goaway, int send_disconnect) {
+static void shutdown_channel(channel_data *chand, int send_goaway,
+ int send_disconnect) {
shutdown_channel_args *sca;
GRPC_CHANNEL_INTERNAL_REF(chand->channel, "shutdown");
sca = gpr_malloc(sizeof(shutdown_channel_args));
@@ -611,10 +630,15 @@ static void init_call_elem(grpc_call_element *elem,
calld->deadline = gpr_inf_future;
calld->call = grpc_call_from_top_element(elem);
- gpr_mu_lock(&chand->server->mu);
+ grpc_iomgr_closure_init(&calld->server_on_recv, server_on_recv, elem);
+
+ gpr_mu_lock(&chand->server->mu_call);
call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS);
+ gpr_mu_unlock(&chand->server->mu_call);
+
+ gpr_mu_lock(&chand->server->mu_global);
chand->num_calls++;
- gpr_mu_unlock(&chand->server->mu);
+ gpr_mu_unlock(&chand->server->mu_global);
server_ref(chand->server);
@@ -627,14 +651,16 @@ static void destroy_call_elem(grpc_call_element *elem) {
int removed[CALL_LIST_COUNT];
size_t i;
- gpr_mu_lock(&chand->server->mu);
+ gpr_mu_lock(&chand->server->mu_call);
for (i = 0; i < CALL_LIST_COUNT; i++) {
removed[i] = call_list_remove(elem->call_data, i);
}
+ gpr_mu_unlock(&chand->server->mu_call);
if (removed[ALL_CALLS]) {
+ gpr_mu_lock(&chand->server->mu_global);
decrement_call_count(chand);
+ gpr_mu_unlock(&chand->server->mu_global);
}
- gpr_mu_unlock(&chand->server->mu);
if (calld->host) {
grpc_mdstr_unref(calld->host);
@@ -677,12 +703,12 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
gpr_free(chand->registered_methods);
}
if (chand->server) {
- gpr_mu_lock(&chand->server->mu);
+ gpr_mu_lock(&chand->server->mu_global);
chand->next->prev = chand->prev;
chand->prev->next = chand->next;
chand->next = chand->prev = chand;
maybe_finish_shutdown(chand->server);
- gpr_mu_unlock(&chand->server->mu);
+ gpr_mu_unlock(&chand->server->mu_global);
grpc_mdstr_unref(chand->path_key);
grpc_mdstr_unref(chand->authority_key);
server_unref(chand->server);
@@ -708,6 +734,7 @@ void grpc_server_register_completion_queue(grpc_server *server,
if (server->cqs[i] == cq) return;
}
GRPC_CQ_INTERNAL_REF(cq, "server");
+ grpc_cq_mark_server_cq(cq);
n = server->cq_count++;
server->cqs = gpr_realloc(server->cqs,
server->cq_count * sizeof(grpc_completion_queue *));
@@ -728,7 +755,8 @@ grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters,
memset(server, 0, sizeof(grpc_server));
- gpr_mu_init(&server->mu);
+ gpr_mu_init(&server->mu_global);
+ gpr_mu_init(&server->mu_call);
/* decremented by grpc_server_destroy */
gpr_ref_init(&server->internal_refcount, 1);
@@ -878,11 +906,11 @@ grpc_transport_setup_result grpc_server_setup_transport(
result = grpc_connected_channel_bind_transport(
grpc_channel_get_channel_stack(channel), transport);
- gpr_mu_lock(&s->mu);
+ gpr_mu_lock(&s->mu_global);
chand->next = &s->root_channel_data;
chand->prev = chand->next->prev;
chand->next->prev = chand->prev->next = chand;
- gpr_mu_unlock(&s->mu);
+ gpr_mu_unlock(&s->mu_global);
gpr_free(filters);
@@ -899,7 +927,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
shutdown_tag *sdt;
/* lock, and gather up some stuff to do */
- gpr_mu_lock(&server->mu);
+ gpr_mu_lock(&server->mu_global);
grpc_cq_begin_op(cq, NULL);
server->shutdown_tags =
gpr_realloc(server->shutdown_tags,
@@ -908,7 +936,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
sdt->tag = tag;
sdt->cq = cq;
if (server->shutdown) {
- gpr_mu_unlock(&server->mu);
+ gpr_mu_unlock(&server->mu_global);
return;
}
@@ -918,6 +946,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
}
/* collect all unregistered then registered calls */
+ gpr_mu_lock(&server->mu_call);
requested_calls = server->requested_calls;
memset(&server->requested_calls, 0, sizeof(server->requested_calls));
for (rm = server->registered_methods; rm; rm = rm->next) {
@@ -936,10 +965,11 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
gpr_free(rm->requested.calls);
memset(&rm->requested, 0, sizeof(rm->requested));
}
+ gpr_mu_unlock(&server->mu_call);
server->shutdown = 1;
maybe_finish_shutdown(server);
- gpr_mu_unlock(&server->mu);
+ gpr_mu_unlock(&server->mu_global);
/* terminate all the requested calls */
for (i = 0; i < requested_calls.count; i++) {
@@ -955,10 +985,10 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
void grpc_server_listener_destroy_done(void *s) {
grpc_server *server = s;
- gpr_mu_lock(&server->mu);
+ gpr_mu_lock(&server->mu_global);
server->listeners_destroyed++;
maybe_finish_shutdown(server);
- gpr_mu_unlock(&server->mu);
+ gpr_mu_unlock(&server->mu_global);
}
void grpc_server_cancel_all_calls(grpc_server *server) {
@@ -969,12 +999,12 @@ void grpc_server_cancel_all_calls(grpc_server *server) {
int is_first = 1;
size_t i;
- gpr_mu_lock(&server->mu);
+ gpr_mu_lock(&server->mu_call);
GPR_ASSERT(server->shutdown);
if (!server->lists[ALL_CALLS]) {
- gpr_mu_unlock(&server->mu);
+ gpr_mu_unlock(&server->mu_call);
return;
}
@@ -994,7 +1024,7 @@ void grpc_server_cancel_all_calls(grpc_server *server) {
is_first = 0;
}
- gpr_mu_unlock(&server->mu);
+ gpr_mu_unlock(&server->mu_call);
for (i = 0; i < call_count; i++) {
grpc_call_cancel_with_status(calls[i], GRPC_STATUS_UNAVAILABLE,
@@ -1008,8 +1038,8 @@ void grpc_server_cancel_all_calls(grpc_server *server) {
void grpc_server_destroy(grpc_server *server) {
listener *l;
- gpr_mu_lock(&server->mu);
- GPR_ASSERT(server->shutdown);
+ gpr_mu_lock(&server->mu_global);
+ GPR_ASSERT(server->shutdown || !server->listeners);
GPR_ASSERT(server->listeners_destroyed == num_listeners(server));
while (server->listeners) {
@@ -1018,7 +1048,7 @@ void grpc_server_destroy(grpc_server *server) {
gpr_free(l);
}
- gpr_mu_unlock(&server->mu);
+ gpr_mu_unlock(&server->mu_global);
server_unref(server);
}
@@ -1040,9 +1070,9 @@ static grpc_call_error queue_call_request(grpc_server *server,
requested_call *rc) {
call_data *calld = NULL;
requested_call_array *requested_calls = NULL;
- gpr_mu_lock(&server->mu);
+ gpr_mu_lock(&server->mu_call);
if (server->shutdown) {
- gpr_mu_unlock(&server->mu);
+ gpr_mu_unlock(&server->mu_call);
fail_call(server, rc);
return GRPC_CALL_OK;
}
@@ -1061,12 +1091,12 @@ static grpc_call_error queue_call_request(grpc_server *server,
if (calld) {
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;
- gpr_mu_unlock(&server->mu);
+ gpr_mu_unlock(&server->mu_call);
begin_call(server, calld, rc);
return GRPC_CALL_OK;
} else {
*requested_call_array_add(requested_calls) = *rc;
- gpr_mu_unlock(&server->mu);
+ gpr_mu_unlock(&server->mu_call);
return GRPC_CALL_OK;
}
}
@@ -1080,6 +1110,9 @@ grpc_call_error grpc_server_request_call(
GRPC_SERVER_LOG_REQUEST_CALL(GPR_INFO, server, call, details,
initial_metadata, cq_bound_to_call,
cq_for_notification, tag);
+ if (!grpc_cq_is_server_cq(cq_for_notification)) {
+ return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
+ }
grpc_cq_begin_op(cq_for_notification, NULL);
rc.type = BATCH_CALL;
rc.tag = tag;
@@ -1098,6 +1131,9 @@ grpc_call_error grpc_server_request_registered_call(
grpc_completion_queue *cq_for_notification, void *tag) {
requested_call rc;
registered_method *registered_method = rm;
+ if (!grpc_cq_is_server_cq(cq_for_notification)) {
+ return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
+ }
grpc_cq_begin_op(cq_for_notification, NULL);
rc.type = REGISTERED_CALL;
rc.tag = tag;
@@ -1152,6 +1188,7 @@ static void begin_call(grpc_server *server, call_data *calld,
rc->data.batch.details->deadline = calld->deadline;
r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
r->data.recv_metadata = rc->data.batch.initial_metadata;
+ r->flags = 0;
r++;
publish = publish_registered_or_batch;
break;
@@ -1159,10 +1196,12 @@ static void begin_call(grpc_server *server, call_data *calld,
*rc->data.registered.deadline = calld->deadline;
r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
r->data.recv_metadata = rc->data.registered.initial_metadata;
+ r->flags = 0;
r++;
if (rc->data.registered.optional_payload) {
r->op = GRPC_IOREQ_RECV_MESSAGE;
r->data.recv_message = rc->data.registered.optional_payload;
+ r->flags = 0;
r++;
}
publish = publish_registered_or_batch;
@@ -1201,8 +1240,8 @@ const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
int grpc_server_has_open_connections(grpc_server *server) {
int r;
- gpr_mu_lock(&server->mu);
+ gpr_mu_lock(&server->mu_global);
r = server->root_channel_data.next != &server->root_channel_data;
- gpr_mu_unlock(&server->mu);
+ gpr_mu_unlock(&server->mu_global);
return r;
}