Diffstat (limited to 'src/core/surface')
-rw-r--r--  src/core/surface/byte_buffer_queue.c       |   8
-rw-r--r--  src/core/surface/byte_buffer_queue.h       |   2
-rw-r--r--  src/core/surface/call.c                    | 103
-rw-r--r--  src/core/surface/call.h                    |   2
-rw-r--r--  src/core/surface/call_log_batch.c          |   4
-rw-r--r--  src/core/surface/channel.c                 |  35
-rw-r--r--  src/core/surface/completion_queue.c        | 230
-rw-r--r--  src/core/surface/completion_queue.h        |  18
-rw-r--r--  src/core/surface/init.c                    |   2
-rw-r--r--  src/core/surface/secure_channel_create.c   |   1
-rw-r--r--  src/core/surface/server.c                  | 397
-rw-r--r--  src/core/surface/version.c                 |  41
12 files changed, 413 insertions(+), 430 deletions(-)
diff --git a/src/core/surface/byte_buffer_queue.c b/src/core/surface/byte_buffer_queue.c
index 7c31bfe5da..e47dc4f4ce 100644
--- a/src/core/surface/byte_buffer_queue.c
+++ b/src/core/surface/byte_buffer_queue.c
@@ -62,6 +62,7 @@ int grpc_bbq_empty(grpc_byte_buffer_queue *q) {
}
void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *buffer) {
+ q->bytes += grpc_byte_buffer_length(buffer);
bba_push(&q->filling, buffer);
}
@@ -72,8 +73,11 @@ void grpc_bbq_flush(grpc_byte_buffer_queue *q) {
}
}
+size_t grpc_bbq_bytes(grpc_byte_buffer_queue *q) { return q->bytes; }
+
grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) {
grpc_bbq_array temp_array;
+ grpc_byte_buffer *out;
if (q->drain_pos == q->draining.count) {
if (q->filling.count == 0) {
@@ -87,5 +91,7 @@ grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) {
q->draining = temp_array;
}
- return q->draining.data[q->drain_pos++];
+ out = q->draining.data[q->drain_pos++];
+ q->bytes -= grpc_byte_buffer_length(out);
+ return out;
}
diff --git a/src/core/surface/byte_buffer_queue.h b/src/core/surface/byte_buffer_queue.h
index 32c57f8756..f01958984f 100644
--- a/src/core/surface/byte_buffer_queue.h
+++ b/src/core/surface/byte_buffer_queue.h
@@ -49,6 +49,7 @@ typedef struct {
size_t drain_pos;
grpc_bbq_array filling;
grpc_bbq_array draining;
+ size_t bytes;
} grpc_byte_buffer_queue;
void grpc_bbq_destroy(grpc_byte_buffer_queue *q);
@@ -56,5 +57,6 @@ grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q);
void grpc_bbq_flush(grpc_byte_buffer_queue *q);
int grpc_bbq_empty(grpc_byte_buffer_queue *q);
void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *bb);
+size_t grpc_bbq_bytes(grpc_byte_buffer_queue *q);
#endif /* GRPC_INTERNAL_CORE_SURFACE_BYTE_BUFFER_QUEUE_H */
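
The new bytes field and grpc_bbq_bytes() accessor track how much data sits in the queue so callers can bound read-ahead (call.c, below, uses it for exactly that). A minimal sketch of the accounting; the example_* callers are hypothetical, and only grpc_bbq_push, grpc_bbq_bytes and grpc_byte_buffer_length come from the patch:

/* Hypothetical caller: q->bytes is kept equal to the total length of all
   buffers currently queued, so throttling needs no traversal. */
static void example_enqueue(grpc_byte_buffer_queue *q, grpc_byte_buffer *bb) {
  grpc_bbq_push(q, bb); /* internally: q->bytes += grpc_byte_buffer_length(bb) */
}

static int example_want_more_data(grpc_byte_buffer_queue *q, size_t limit) {
  return grpc_bbq_bytes(q) < limit; /* stop issuing reads once enough is buffered */
}
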
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index ae1b215767..71f4235571 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -49,6 +49,17 @@
#include <stdlib.h>
#include <string.h>
+/** The maximum number of completions possible.
+ Based upon the maximum number of individually queueable ops in the batch
+ api:
+ - initial metadata send
+ - message send
+ - status/close send (depending on client/server)
+ - initial metadata recv
+ - message recv
+ - status/close recv (depending on client/server) */
+#define MAX_CONCURRENT_COMPLETIONS 6
+
typedef enum { REQ_INITIAL = 0, REQ_READY, REQ_DONE } req_state;
typedef enum {
@@ -76,14 +87,14 @@ typedef struct {
typedef struct {
/* Overall status of the operation: starts OK, may degrade to
non-OK */
- int success;
- /* Completion function to call at the end of the operation */
- grpc_ioreq_completion_func on_complete;
- void *user_data;
+ gpr_uint8 success;
/* a bit mask of which request ops are needed (1u << opid) */
gpr_uint16 need_mask;
/* a bit mask of which request ops are now completed */
gpr_uint16 complete_mask;
+ /* Completion function to call at the end of the operation */
+ grpc_ioreq_completion_func on_complete;
+ void *user_data;
} reqinfo_master;
/* Status data for a request can come from several sources; this
@@ -135,6 +146,7 @@ struct grpc_call {
grpc_mdctx *metadata_context;
/* TODO(ctiller): share with cq if possible? */
gpr_mu mu;
+ gpr_mu completion_mu;
/* how far through the stream have we read? */
read_state read_state;
@@ -162,6 +174,8 @@ struct grpc_call {
gpr_uint8 error_status_set;
/** should the alarm be cancelled */
gpr_uint8 cancel_alarm;
+ /** bitmask of allocated completion events in completions */
+ gpr_uint8 allocated_completions;
/* flags with bits corresponding to write states allowing us to determine
what was sent */
@@ -250,6 +264,9 @@ struct grpc_call {
grpc_iomgr_closure on_done_recv;
grpc_iomgr_closure on_done_send;
grpc_iomgr_closure on_done_bind;
+
+ /** completion events - for completion queue use */
+ grpc_cq_completion completions[MAX_CONCURRENT_COMPLETIONS];
};
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
@@ -286,6 +303,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
memset(call, 0, sizeof(grpc_call));
gpr_mu_init(&call->mu);
+ gpr_mu_init(&call->completion_mu);
call->channel = channel;
call->cq = cq;
if (cq) {
@@ -298,8 +316,6 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
if (call->is_client) {
call->request_set[GRPC_IOREQ_SEND_TRAILING_METADATA] = REQSET_DONE;
call->request_set[GRPC_IOREQ_SEND_STATUS] = REQSET_DONE;
- call->context[GRPC_CONTEXT_TRACING].value = grpc_census_context_create();
- call->context[GRPC_CONTEXT_TRACING].destroy = grpc_census_context_destroy;
}
GPR_ASSERT(add_initial_metadata_count < MAX_SEND_INITIAL_METADATA_COUNT);
for (i = 0; i < add_initial_metadata_count; i++) {
@@ -351,6 +367,29 @@ grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) {
return call->cq;
}
+static grpc_cq_completion *allocate_completion(grpc_call *call) {
+ gpr_uint8 i;
+ gpr_mu_lock(&call->completion_mu);
+ for (i = 0; i < GPR_ARRAY_SIZE(call->completions); i++) {
+ if (call->allocated_completions & (1u << i)) {
+ continue;
+ }
+ call->allocated_completions |= 1u << i;
+ gpr_mu_unlock(&call->completion_mu);
+ return &call->completions[i];
+ }
+ gpr_log(GPR_ERROR, "should never reach here");
+ abort();
+}
+
+static void done_completion(void *call, grpc_cq_completion *completion) {
+ grpc_call *c = call;
+ gpr_mu_lock(&c->completion_mu);
+ c->allocated_completions &= ~(1u << (completion - c->completions));
+ gpr_mu_unlock(&c->completion_mu);
+ GRPC_CALL_INTERNAL_UNREF(c, "completion", 1);
+}
+
#ifdef GRPC_CALL_REF_COUNT_DEBUG
void grpc_call_internal_ref(grpc_call *c, const char *reason) {
gpr_log(GPR_DEBUG, "CALL: ref %p %d -> %d [%s]", c,
@@ -367,20 +406,21 @@ static void destroy_call(void *call, int ignored_success) {
grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c));
GRPC_CHANNEL_INTERNAL_UNREF(c->channel, "call");
gpr_mu_destroy(&c->mu);
+ gpr_mu_destroy(&c->completion_mu);
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (c->status[i].details) {
- grpc_mdstr_unref(c->status[i].details);
+ GRPC_MDSTR_UNREF(c->status[i].details);
}
}
for (i = 0; i < c->owned_metadata_count; i++) {
- grpc_mdelem_unref(c->owned_metadata[i]);
+ GRPC_MDELEM_UNREF(c->owned_metadata[i]);
}
gpr_free(c->owned_metadata);
for (i = 0; i < GPR_ARRAY_SIZE(c->buffered_metadata); i++) {
gpr_free(c->buffered_metadata[i].metadata);
}
for (i = 0; i < c->send_initial_metadata_count; i++) {
- grpc_mdelem_unref(c->send_initial_metadata[i].md);
+ GRPC_MDELEM_UNREF(c->send_initial_metadata[i].md);
}
for (i = 0; i < GRPC_CONTEXT_COUNT; i++) {
if (c->context[i].destroy) {
@@ -437,7 +477,7 @@ static void set_decode_compression_level(grpc_call *call,
static void set_status_details(grpc_call *call, status_source source,
grpc_mdstr *status) {
if (call->status[source].details != NULL) {
- grpc_mdstr_unref(call->status[source].details);
+ GRPC_MDSTR_UNREF(call->status[source].details);
}
call->status[source].details = status;
}
@@ -473,6 +513,8 @@ static void unlock(grpc_call *call) {
int completing_requests = 0;
int start_op = 0;
int i;
+ const gpr_uint32 MAX_RECV_PEEK_AHEAD = 65536;
+ size_t buffered_bytes;
int cancel_alarm = 0;
memset(&op, 0, sizeof(op));
@@ -488,6 +530,17 @@ static void unlock(grpc_call *call) {
op.recv_ops = &call->recv_ops;
op.recv_state = &call->recv_state;
op.on_done_recv = &call->on_done_recv;
+ if (grpc_bbq_empty(&call->incoming_queue) && call->reading_message) {
+ op.max_recv_bytes = call->incoming_message_length -
+ call->incoming_message.length + MAX_RECV_PEEK_AHEAD;
+ } else {
+ buffered_bytes = grpc_bbq_bytes(&call->incoming_queue);
+ if (buffered_bytes > MAX_RECV_PEEK_AHEAD) {
+ op.max_recv_bytes = 0;
+ } else {
+ op.max_recv_bytes = MAX_RECV_PEEK_AHEAD - buffered_bytes;
+ }
+ }
call->receiving = 1;
GRPC_CALL_INTERNAL_REF(call, "receiving");
start_op = 1;
@@ -616,7 +669,7 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
case GRPC_IOREQ_SEND_STATUS:
if (call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details !=
NULL) {
- grpc_mdstr_unref(
+ GRPC_MDSTR_UNREF(
call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details);
call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details =
NULL;
@@ -945,7 +998,7 @@ static int fill_send_ops(grpc_call *call, grpc_transport_stream_op *op) {
&mdb, &call->details_link,
grpc_mdelem_from_metadata_strings(
call->metadata_context,
- grpc_mdstr_ref(
+ GRPC_MDSTR_REF(
grpc_channel_get_message_string(call->channel)),
data.send_status.details));
call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details =
@@ -1053,7 +1106,7 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
reqs[i].data.send_status.code);
if (reqs[i].data.send_status.details) {
set_status_details(call, STATUS_FROM_SERVER_STATUS,
- grpc_mdstr_ref(reqs[i].data.send_status.details));
+ GRPC_MDSTR_REF(reqs[i].data.send_status.details));
}
}
have_ops |= 1u << op;
@@ -1190,7 +1243,8 @@ static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) {
}
GRPC_CALL_INTERNAL_REF(call, "alarm");
call->have_alarm = 1;
- grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now());
+ grpc_alarm_init(&call->alarm, deadline, call_alarm, call,
+ gpr_now(GPR_CLOCK_REALTIME));
}
/* we offset status by a small amount when storing it into transport metadata
@@ -1257,7 +1311,7 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
if (key == grpc_channel_get_status_string(call->channel)) {
set_status_code(call, STATUS_FROM_WIRE, decode_status(md));
} else if (key == grpc_channel_get_message_string(call->channel)) {
- set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
+ set_status_details(call, STATUS_FROM_WIRE, GRPC_MDSTR_REF(md->value));
} else if (key ==
grpc_channel_get_compresssion_level_string(call->channel)) {
set_decode_compression_level(call, decode_compression(md));
@@ -1293,10 +1347,10 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
grpc_mdctx_lock(mdctx);
for (l = md->list.head; l; l = l->next) {
- if (l->md) grpc_mdctx_locked_mdelem_unref(mdctx, l->md);
+ if (l->md) GRPC_MDCTX_LOCKED_MDELEM_UNREF(mdctx, l->md);
}
for (l = md->garbage.head; l; l = l->next) {
- grpc_mdctx_locked_mdelem_unref(mdctx, l->md);
+ GRPC_MDCTX_LOCKED_MDELEM_UNREF(mdctx, l->md);
}
grpc_mdctx_unlock(mdctx);
}
@@ -1318,11 +1372,13 @@ static void set_cancelled_value(grpc_status_code status, void *dest) {
}
static void finish_batch(grpc_call *call, int success, void *tag) {
- grpc_cq_end_op(call->cq, tag, call, success);
+ grpc_cq_end_op(call->cq, tag, success, done_completion, call,
+ allocate_completion(call));
}
static void finish_batch_with_close(grpc_call *call, int success, void *tag) {
- grpc_cq_end_op(call->cq, tag, call, 1);
+ grpc_cq_end_op(call->cq, tag, 1, done_completion, call,
+ allocate_completion(call));
}
static int are_write_flags_valid(gpr_uint32 flags) {
@@ -1345,8 +1401,10 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag);
if (nops == 0) {
- grpc_cq_begin_op(call->cq, call);
- grpc_cq_end_op(call->cq, tag, call, 1);
+ grpc_cq_begin_op(call->cq);
+ GRPC_CALL_INTERNAL_REF(call, "completion");
+ grpc_cq_end_op(call->cq, tag, 1, done_completion, call,
+ allocate_completion(call));
return GRPC_CALL_OK;
}
@@ -1468,7 +1526,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
}
}
- grpc_cq_begin_op(call->cq, call);
+ GRPC_CALL_INTERNAL_REF(call, "completion");
+ grpc_cq_begin_op(call->cq);
return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func, tag);
}
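
The unlock() hunk above derives op.max_recv_bytes from that buffered byte count: if nothing is queued and a message is mid-flight, request the remainder of the message plus a 64KB peek-ahead; otherwise allow only whatever part of the 64KB window is not already buffered. A standalone restatement of that arithmetic, assuming illustrative function and parameter names that are not part of the patch:

#define MAX_RECV_PEEK_AHEAD 65536

/* Illustrative restatement of the max_recv_bytes computation in unlock(). */
static gpr_uint32 compute_max_recv_bytes(int reading_message,
                                         gpr_uint32 message_length,
                                         size_t message_bytes_received,
                                         size_t buffered_bytes,
                                         int queue_empty) {
  if (queue_empty && reading_message) {
    /* finish the in-flight message, plus a fixed peek-ahead window */
    return message_length - (gpr_uint32)message_bytes_received + MAX_RECV_PEEK_AHEAD;
  }
  if (buffered_bytes > MAX_RECV_PEEK_AHEAD) {
    return 0; /* already over the window: stop asking the transport for data */
  }
  return (gpr_uint32)(MAX_RECV_PEEK_AHEAD - buffered_bytes);
}
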
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index fb3662b50d..3b6f9c942e 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -78,8 +78,8 @@ typedef union {
typedef struct {
grpc_ioreq_op op;
- grpc_ioreq_data data;
gpr_uint32 flags; /**< A copy of the write flags from grpc_op */
+ grpc_ioreq_data data;
} grpc_ioreq;
typedef void (*grpc_ioreq_completion_func)(grpc_call *call, int success,
diff --git a/src/core/surface/call_log_batch.c b/src/core/surface/call_log_batch.c
index 55663298c9..997046d954 100644
--- a/src/core/surface/call_log_batch.c
+++ b/src/core/surface/call_log_batch.c
@@ -46,8 +46,8 @@ static void add_metadata(gpr_strvec *b, const grpc_metadata *md, size_t count) {
gpr_strvec_add(b, gpr_strdup(md[i].key));
gpr_strvec_add(b, gpr_strdup(" value="));
- gpr_strvec_add(b, gpr_hexdump(md[i].value, md[i].value_length,
- GPR_HEXDUMP_PLAINTEXT));
+ gpr_strvec_add(b, gpr_dump(md[i].value, md[i].value_length,
+ GPR_DUMP_HEX | GPR_DUMP_ASCII));
}
}
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index f8151c121c..b7826d4dfc 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -91,6 +91,7 @@ grpc_channel *grpc_channel_create_from_filters(
size_t size =
sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters);
grpc_channel *channel = gpr_malloc(size);
+ memset(channel, 0, sizeof(*channel));
GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
channel->is_client = is_client;
/* decremented by grpc_channel_destroy */
@@ -104,7 +105,7 @@ grpc_channel *grpc_channel_create_from_filters(
char buf[GPR_LTOA_MIN_BUFSIZE];
gpr_ltoa(i, buf);
channel->grpc_status_elem[i] = grpc_mdelem_from_metadata_strings(
- mdctx, grpc_mdstr_ref(channel->grpc_status_string),
+ mdctx, GRPC_MDSTR_REF(channel->grpc_status_string),
grpc_mdstr_from_string(mdctx, buf));
}
channel->path_string = grpc_mdstr_from_string(mdctx, ":path");
@@ -157,10 +158,10 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel,
return grpc_channel_create_call_internal(
channel, cq,
grpc_mdelem_from_metadata_strings(
- channel->metadata_context, grpc_mdstr_ref(channel->path_string),
+ channel->metadata_context, GRPC_MDSTR_REF(channel->path_string),
grpc_mdstr_from_string(channel->metadata_context, method)),
grpc_mdelem_from_metadata_strings(
- channel->metadata_context, grpc_mdstr_ref(channel->authority_string),
+ channel->metadata_context, GRPC_MDSTR_REF(channel->authority_string),
grpc_mdstr_from_string(channel->metadata_context, host)),
deadline);
}
@@ -169,10 +170,10 @@ void *grpc_channel_register_call(grpc_channel *channel, const char *method,
const char *host) {
registered_call *rc = gpr_malloc(sizeof(registered_call));
rc->path = grpc_mdelem_from_metadata_strings(
- channel->metadata_context, grpc_mdstr_ref(channel->path_string),
+ channel->metadata_context, GRPC_MDSTR_REF(channel->path_string),
grpc_mdstr_from_string(channel->metadata_context, method));
rc->authority = grpc_mdelem_from_metadata_strings(
- channel->metadata_context, grpc_mdstr_ref(channel->authority_string),
+ channel->metadata_context, GRPC_MDSTR_REF(channel->authority_string),
grpc_mdstr_from_string(channel->metadata_context, host));
gpr_mu_lock(&channel->registered_call_mu);
rc->next = channel->registered_calls;
@@ -186,8 +187,8 @@ grpc_call *grpc_channel_create_registered_call(
void *registered_call_handle, gpr_timespec deadline) {
registered_call *rc = registered_call_handle;
return grpc_channel_create_call_internal(
- channel, completion_queue, grpc_mdelem_ref(rc->path),
- grpc_mdelem_ref(rc->authority), deadline);
+ channel, completion_queue, GRPC_MDELEM_REF(rc->path),
+ GRPC_MDELEM_REF(rc->authority), deadline);
}
#ifdef GRPC_CHANNEL_REF_COUNT_DEBUG
@@ -205,18 +206,18 @@ static void destroy_channel(void *p, int ok) {
size_t i;
grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel));
for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) {
- grpc_mdelem_unref(channel->grpc_status_elem[i]);
+ GRPC_MDELEM_UNREF(channel->grpc_status_elem[i]);
}
- grpc_mdstr_unref(channel->grpc_status_string);
- grpc_mdstr_unref(channel->grpc_compression_level_string);
- grpc_mdstr_unref(channel->grpc_message_string);
- grpc_mdstr_unref(channel->path_string);
- grpc_mdstr_unref(channel->authority_string);
+ GRPC_MDSTR_UNREF(channel->grpc_status_string);
+ GRPC_MDSTR_UNREF(channel->grpc_compression_level_string);
+ GRPC_MDSTR_UNREF(channel->grpc_message_string);
+ GRPC_MDSTR_UNREF(channel->path_string);
+ GRPC_MDSTR_UNREF(channel->authority_string);
while (channel->registered_calls) {
registered_call *rc = channel->registered_calls;
channel->registered_calls = rc->next;
- grpc_mdelem_unref(rc->path);
- grpc_mdelem_unref(rc->authority);
+ GRPC_MDELEM_UNREF(rc->path);
+ GRPC_MDELEM_UNREF(rc->authority);
gpr_free(rc);
}
grpc_mdctx_unref(channel->metadata_context);
@@ -267,12 +268,12 @@ grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel) {
grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) {
if (i >= 0 && i < NUM_CACHED_STATUS_ELEMS) {
- return grpc_mdelem_ref(channel->grpc_status_elem[i]);
+ return GRPC_MDELEM_REF(channel->grpc_status_elem[i]);
} else {
char tmp[GPR_LTOA_MIN_BUFSIZE];
gpr_ltoa(i, tmp);
return grpc_mdelem_from_metadata_strings(
- channel->metadata_context, grpc_mdstr_ref(channel->grpc_status_string),
+ channel->metadata_context, GRPC_MDSTR_REF(channel->grpc_status_string),
grpc_mdstr_from_string(channel->metadata_context, tmp));
}
}
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 030a8b4e6f..f3630b31dd 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -45,34 +45,20 @@
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
-#define NUM_TAG_BUCKETS 31
-
-/* A single event: extends grpc_event to form a linked list with a destruction
- function (on_finish) that is hidden from outside this module */
-typedef struct event {
- grpc_event base;
- struct event *queue_next;
- struct event *queue_prev;
- struct event *bucket_next;
- struct event *bucket_prev;
-} event;
-
/* Completion queue structure */
struct grpc_completion_queue {
- /* When refs drops to zero, we are in shutdown mode, and will be destroyable
- once all queued events are drained */
- gpr_refcount refs;
- /* Once owning_refs drops to zero, we will destroy the cq */
+ /** completed events */
+ grpc_cq_completion completed_head;
+ grpc_cq_completion *completed_tail;
+ /** Number of pending events (+1 if we're not shutdown) */
+ gpr_refcount pending_events;
+ /** Once owning_refs drops to zero, we will destroy the cq */
gpr_refcount owning_refs;
- /* the set of low level i/o things that concern this cq */
+ /** the set of low level i/o things that concern this cq */
grpc_pollset pollset;
- /* 0 initially, 1 once we've begun shutting down */
+ /** 0 initially, 1 once we've begun shutting down */
int shutdown;
int shutdown_called;
- /* Head of a linked list of queued events (prev points to the last element) */
- event *queue;
- /* Fixed size chained hash table of events for pluck() */
- event *buckets[NUM_TAG_BUCKETS];
int is_server_cq;
};
@@ -80,19 +66,20 @@ grpc_completion_queue *grpc_completion_queue_create(void) {
grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue));
memset(cc, 0, sizeof(*cc));
/* Initial ref is dropped by grpc_completion_queue_shutdown */
- gpr_ref_init(&cc->refs, 1);
+ gpr_ref_init(&cc->pending_events, 1);
/* One for destroy(), one for pollset_shutdown */
gpr_ref_init(&cc->owning_refs, 2);
grpc_pollset_init(&cc->pollset);
+ cc->completed_tail = &cc->completed_head;
+ cc->completed_head.next = (gpr_uintptr)cc->completed_tail;
return cc;
}
#ifdef GRPC_CQ_REF_COUNT_DEBUG
void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
const char *file, int line) {
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p ref %d -> %d %s",
- cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1,
- reason);
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p ref %d -> %d %s", cc,
+ (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1, reason);
#else
void grpc_cq_internal_ref(grpc_completion_queue *cc) {
#endif
@@ -107,186 +94,135 @@ static void on_pollset_destroy_done(void *arg) {
#ifdef GRPC_CQ_REF_COUNT_DEBUG
void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason,
const char *file, int line) {
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s",
- cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1,
- reason);
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s", cc,
+ (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1, reason);
#else
void grpc_cq_internal_unref(grpc_completion_queue *cc) {
#endif
if (gpr_unref(&cc->owning_refs)) {
- GPR_ASSERT(cc->queue == NULL);
+ GPR_ASSERT(cc->completed_head.next == (gpr_uintptr)&cc->completed_head);
grpc_pollset_destroy(&cc->pollset);
gpr_free(cc);
}
}
-/* Create and append an event to the queue. Returns the event so that its data
- members can be filled in.
- Requires GRPC_POLLSET_MU(&cc->pollset) locked. */
-static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
- void *tag, grpc_call *call) {
- event *ev = gpr_malloc(sizeof(event));
- gpr_uintptr bucket = ((gpr_uintptr)tag) % NUM_TAG_BUCKETS;
- ev->base.type = type;
- ev->base.tag = tag;
- if (cc->queue == NULL) {
- cc->queue = ev->queue_next = ev->queue_prev = ev;
- } else {
- ev->queue_next = cc->queue;
- ev->queue_prev = cc->queue->queue_prev;
- ev->queue_next->queue_prev = ev->queue_prev->queue_next = ev;
- }
- if (cc->buckets[bucket] == NULL) {
- cc->buckets[bucket] = ev->bucket_next = ev->bucket_prev = ev;
- } else {
- ev->bucket_next = cc->buckets[bucket];
- ev->bucket_prev = cc->buckets[bucket]->bucket_prev;
- ev->bucket_next->bucket_prev = ev->bucket_prev->bucket_next = ev;
- }
- grpc_pollset_kick(&cc->pollset);
- return ev;
-}
-
-void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call) {
- gpr_ref(&cc->refs);
- if (call) GRPC_CALL_INTERNAL_REF(call, "cq");
+void grpc_cq_begin_op(grpc_completion_queue *cc) {
+ gpr_ref(&cc->pending_events);
}
/* Signal the end of an operation - if this is the last waiting-to-be-queued
event, then enter shutdown mode */
-void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
- int success) {
- event *ev;
- int shutdown = 0;
+/* Queue a GRPC_OP_COMPLETED operation */
+void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
+ void (*done)(void *done_arg, grpc_cq_completion *storage),
+ void *done_arg, grpc_cq_completion *storage) {
+ int shutdown;
+
+ storage->tag = tag;
+ storage->done = done;
+ storage->done_arg = done_arg;
+ storage->next =
+ ((gpr_uintptr)&cc->completed_head) | ((gpr_uintptr)(success != 0));
+
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
- ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call);
- ev->base.success = success;
- if (gpr_unref(&cc->refs)) {
+ shutdown = gpr_unref(&cc->pending_events);
+ if (!shutdown) {
+ cc->completed_tail->next =
+ ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next);
+ cc->completed_tail = storage;
+ grpc_pollset_kick(&cc->pollset);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ } else {
+ cc->completed_tail->next =
+ ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next);
+ cc->completed_tail = storage;
GPR_ASSERT(!cc->shutdown);
GPR_ASSERT(cc->shutdown_called);
cc->shutdown = 1;
- shutdown = 1;
- }
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
- if (call) GRPC_CALL_INTERNAL_UNREF(call, "cq", 0);
- if (shutdown) {
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc);
}
}
-/* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */
-static event *create_shutdown_event(void) {
- event *ev = gpr_malloc(sizeof(event));
- ev->base.type = GRPC_QUEUE_SHUTDOWN;
- ev->base.tag = NULL;
- return ev;
-}
-
grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline) {
- event *ev = NULL;
grpc_event ret;
GRPC_CQ_INTERNAL_REF(cc, "next");
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
for (;;) {
- if (cc->queue != NULL) {
- gpr_uintptr bucket;
- ev = cc->queue;
- bucket = ((gpr_uintptr)ev->base.tag) % NUM_TAG_BUCKETS;
- cc->queue = ev->queue_next;
- ev->queue_next->queue_prev = ev->queue_prev;
- ev->queue_prev->queue_next = ev->queue_next;
- ev->bucket_next->bucket_prev = ev->bucket_prev;
- ev->bucket_prev->bucket_next = ev->bucket_next;
- if (ev == cc->buckets[bucket]) {
- cc->buckets[bucket] = ev->bucket_next;
- if (ev == cc->buckets[bucket]) {
- cc->buckets[bucket] = NULL;
- }
- }
- if (cc->queue == ev) {
- cc->queue = NULL;
+ if (cc->completed_tail != &cc->completed_head) {
+ grpc_cq_completion *c = (grpc_cq_completion *)cc->completed_head.next;
+ cc->completed_head.next = c->next & ~(gpr_uintptr)1;
+ if (c == cc->completed_tail) {
+ cc->completed_tail = &cc->completed_head;
}
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ ret.type = GRPC_OP_COMPLETE;
+ ret.success = c->next & 1u;
+ ret.tag = c->tag;
+ c->done(c->done_arg, c);
break;
}
if (cc->shutdown) {
- ev = create_shutdown_event();
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ memset(&ret, 0, sizeof(ret));
+ ret.type = GRPC_QUEUE_SHUTDOWN;
break;
}
if (!grpc_pollset_work(&cc->pollset, deadline)) {
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
- GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
- GRPC_CQ_INTERNAL_UNREF(cc, "next");
- return ret;
+ break;
}
}
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
- ret = ev->base;
- gpr_free(ev);
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
GRPC_CQ_INTERNAL_UNREF(cc, "next");
return ret;
}
-static event *pluck_event(grpc_completion_queue *cc, void *tag) {
- gpr_uintptr bucket = ((gpr_uintptr)tag) % NUM_TAG_BUCKETS;
- event *ev = cc->buckets[bucket];
- if (ev == NULL) return NULL;
- do {
- if (ev->base.tag == tag) {
- ev->queue_next->queue_prev = ev->queue_prev;
- ev->queue_prev->queue_next = ev->queue_next;
- ev->bucket_next->bucket_prev = ev->bucket_prev;
- ev->bucket_prev->bucket_next = ev->bucket_next;
- if (ev == cc->buckets[bucket]) {
- cc->buckets[bucket] = ev->bucket_next;
- if (ev == cc->buckets[bucket]) {
- cc->buckets[bucket] = NULL;
- }
- }
- if (cc->queue == ev) {
- cc->queue = ev->queue_next;
- if (cc->queue == ev) {
- cc->queue = NULL;
- }
- }
- return ev;
- }
- ev = ev->bucket_next;
- } while (ev != cc->buckets[bucket]);
- return NULL;
-}
-
grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
gpr_timespec deadline) {
- event *ev = NULL;
grpc_event ret;
+ grpc_cq_completion *c;
+ grpc_cq_completion *prev;
GRPC_CQ_INTERNAL_REF(cc, "pluck");
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
for (;;) {
- if ((ev = pluck_event(cc, tag))) {
- break;
+ prev = &cc->completed_head;
+ while ((c = (grpc_cq_completion *)(prev->next & ~(gpr_uintptr)1)) !=
+ &cc->completed_head) {
+ if (c->tag == tag) {
+ prev->next =
+ (prev->next & (gpr_uintptr)1) | (c->next & ~(gpr_uintptr)1);
+ if (c == cc->completed_tail) {
+ cc->completed_tail = prev;
+ }
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ ret.type = GRPC_OP_COMPLETE;
+ ret.success = c->next & 1u;
+ ret.tag = c->tag;
+ c->done(c->done_arg, c);
+ goto done;
+ }
+ prev = c;
}
if (cc->shutdown) {
- ev = create_shutdown_event();
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ memset(&ret, 0, sizeof(ret));
+ ret.type = GRPC_QUEUE_SHUTDOWN;
break;
}
if (!grpc_pollset_work(&cc->pollset, deadline)) {
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
- GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
- GRPC_CQ_INTERNAL_UNREF(cc, "pluck");
- return ret;
+ break;
}
}
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
- ret = ev->base;
- gpr_free(ev);
+done:
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
GRPC_CQ_INTERNAL_UNREF(cc, "pluck");
return ret;
@@ -303,7 +239,7 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
cc->shutdown_called = 1;
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
- if (gpr_unref(&cc->refs)) {
+ if (gpr_unref(&cc->pending_events)) {
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
GPR_ASSERT(!cc->shutdown);
cc->shutdown = 1;
@@ -324,8 +260,8 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc) {
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
grpc_pollset_kick(&cc->pollset);
- grpc_pollset_work(&cc->pollset,
- gpr_time_add(gpr_now(), gpr_time_from_millis(100)));
+ grpc_pollset_work(&cc->pollset, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_millis(100)));
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
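
The rewritten queue never allocates per event: callers pre-allocate a grpc_cq_completion (usually embedded in their own state, as grpc_call and requested_call do elsewhere in this diff), pair grpc_cq_begin_op with grpc_cq_end_op, and reclaim the storage in the done callback. A sketch of that caller-side pattern, assuming a hypothetical my_op type:

/* Sketch only: my_op embeds the grpc_cq_completion the queue will link into
   its internal list; the cq never owns or frees this memory. */
typedef struct {
  grpc_cq_completion completion; /* storage owned by the caller, not the cq */
  /* ... operation state ... */
} my_op;

static void my_op_done(void *arg, grpc_cq_completion *storage) {
  gpr_free(arg); /* safe: the cq has already handed the event to the application */
}

static void my_op_start(grpc_completion_queue *cq, void *tag) {
  my_op *op = gpr_malloc(sizeof(*op));
  grpc_cq_begin_op(cq);            /* keeps the cq from completing shutdown */
  /* ... kick off asynchronous work; when it finishes: */
  grpc_cq_end_op(cq, tag, 1 /* success */, my_op_done, op, &op->completion);
}
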
diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h
index 1b9010f462..f944f48d8e 100644
--- a/src/core/surface/completion_queue.h
+++ b/src/core/surface/completion_queue.h
@@ -39,6 +39,17 @@
#include "src/core/iomgr/pollset.h"
#include <grpc/grpc.h>
+typedef struct grpc_cq_completion {
+ /** user supplied tag */
+ void *tag;
+ /** done callback - called when this queue element is no longer
+ needed by the completion queue */
+ void (*done)(void *done_arg, struct grpc_cq_completion *c);
+ void *done_arg;
+ /** next pointer; low bit is used to indicate success or not */
+ gpr_uintptr next;
+} grpc_cq_completion;
+
#ifdef GRPC_CQ_REF_COUNT_DEBUG
void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
const char *file, int line);
@@ -57,11 +68,12 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc);
/* Flag that an operation is beginning: the completion channel will not finish
shutdown until a corresponding grpc_cq_end_* call is made */
-void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call);
+void grpc_cq_begin_op(grpc_completion_queue *cc);
/* Queue a GRPC_OP_COMPLETED operation */
-void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
- int success);
+void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
+ void (*done)(void *done_arg, grpc_cq_completion *storage),
+ void *done_arg, grpc_cq_completion *storage);
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
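
As the comment on grpc_cq_completion notes, the low bit of next doubles as the success flag; the pointer it carries is at least 2-byte aligned, so bit 0 is always free. The encoding used in completion_queue.c above, factored into hypothetical helpers for clarity:

/* Hypothetical helpers mirroring the bit manipulation in completion_queue.c. */
static gpr_uintptr pack_next(grpc_cq_completion *next, int success) {
  return ((gpr_uintptr)next) | (gpr_uintptr)(success != 0);
}
static grpc_cq_completion *unpack_next(gpr_uintptr next) {
  return (grpc_cq_completion *)(next & ~(gpr_uintptr)1);
}
static int unpack_success(gpr_uintptr next) { return (int)(next & 1u); }
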
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
index a31f0960aa..f642810d12 100644
--- a/src/core/surface/init.c
+++ b/src/core/surface/init.c
@@ -38,6 +38,7 @@
#include <grpc/census.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
+#include <grpc/support/time.h>
#include "src/core/channel/channel_stack.h"
#include "src/core/client_config/resolver_registry.h"
#include "src/core/client_config/resolvers/dns_resolver.h"
@@ -91,6 +92,7 @@ void grpc_init(void) {
gpr_mu_lock(&g_init_mu);
if (++g_initializations == 1) {
+ gpr_time_init();
grpc_resolver_registry_init("dns:///");
grpc_register_resolver_type("dns", grpc_dns_resolver_factory_create());
#ifdef GPR_POSIX_SOCKET
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index 34ee3f8400..f3c7d8397b 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -75,6 +75,7 @@ static void connector_unref(grpc_connector *con) {
static void on_secure_transport_setup_done(void *arg,
grpc_security_status status,
+ grpc_endpoint *wrapped_endpoint,
grpc_endpoint *secure_endpoint) {
connector *c = arg;
grpc_iomgr_closure *notify;
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index f29d47c17c..4dc51bf031 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -51,7 +51,7 @@
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
-typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list;
+typedef enum { PENDING_START, CALL_LIST_COUNT } call_list;
typedef struct listener {
void *arg;
@@ -72,12 +72,14 @@ typedef struct {
typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
-typedef struct {
+typedef struct requested_call {
requested_call_type type;
+ struct requested_call *next;
void *tag;
grpc_completion_queue *cq_bound_to_call;
grpc_completion_queue *cq_for_notification;
grpc_call **call;
+ grpc_cq_completion completion;
union {
struct {
grpc_call_details *details;
@@ -92,17 +94,11 @@ typedef struct {
} data;
} requested_call;
-typedef struct {
- requested_call *calls;
- size_t count;
- size_t capacity;
-} requested_call_array;
-
struct registered_method {
char *method;
char *host;
call_data *pending;
- requested_call_array requested;
+ requested_call *requests;
registered_method *next;
};
@@ -114,7 +110,6 @@ typedef struct channel_registered_method {
struct channel_data {
grpc_server *server;
- size_t num_calls;
grpc_connectivity_state connectivity_state;
grpc_channel *channel;
grpc_mdstr *path_key;
@@ -132,6 +127,7 @@ struct channel_data {
typedef struct shutdown_tag {
void *tag;
grpc_completion_queue *cq;
+ grpc_cq_completion completion;
} shutdown_tag;
struct grpc_server {
@@ -154,7 +150,7 @@ struct grpc_server {
gpr_mu mu_call; /* mutex for call-specific state */
registered_method *registered_methods;
- requested_call_array requested_calls;
+ requested_call *requests;
gpr_uint8 shutdown;
gpr_uint8 shutdown_published;
@@ -167,6 +163,9 @@ struct grpc_server {
listener *listeners;
int listeners_destroyed;
gpr_refcount internal_refcount;
+
+ /** when did we print the last shutdown progress message */
+ gpr_timespec last_shutdown_message_time;
};
typedef enum {
@@ -183,7 +182,11 @@ typedef enum {
struct call_data {
grpc_call *call;
+ /** protects state */
+ gpr_mu mu_state;
+ /** the current state of a call - see call_state */
call_state state;
+
grpc_mdstr *path;
grpc_mdstr *host;
gpr_timespec deadline;
@@ -204,9 +207,7 @@ struct call_data {
typedef struct {
grpc_channel **channels;
- grpc_channel **disconnects;
size_t num_channels;
- size_t num_disconnects;
} channel_broadcaster;
#define SERVER_FROM_CALL_ELEM(elem) \
@@ -225,26 +226,15 @@ static void maybe_finish_shutdown(grpc_server *server);
static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) {
channel_data *c;
size_t count = 0;
- size_t dc_count = 0;
for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
count++;
- if (c->num_calls == 0) {
- dc_count++;
- }
}
cb->num_channels = count;
- cb->num_disconnects = dc_count;
cb->channels = gpr_malloc(sizeof(*cb->channels) * cb->num_channels);
- cb->disconnects = gpr_malloc(sizeof(*cb->channels) * cb->num_disconnects);
count = 0;
- dc_count = 0;
for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
cb->channels[count++] = c->channel;
GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast");
- if (c->num_calls == 0) {
- cb->disconnects[dc_count++] = c->channel;
- GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast-disconnect");
- }
}
}
@@ -280,19 +270,15 @@ static void send_shutdown(grpc_channel *channel, int send_goaway,
}
static void channel_broadcaster_shutdown(channel_broadcaster *cb,
- int send_goaway, int send_disconnect) {
+ int send_goaway,
+ int force_disconnect) {
size_t i;
for (i = 0; i < cb->num_channels; i++) {
- send_shutdown(cb->channels[i], 1, 0);
+ send_shutdown(cb->channels[i], send_goaway, force_disconnect);
GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast");
}
- for (i = 0; i < cb->num_disconnects; i++) {
- send_shutdown(cb->disconnects[i], 0, 1);
- GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast-disconnect");
- }
gpr_free(cb->channels);
- gpr_free(cb->disconnects);
}
/* call list */
@@ -344,22 +330,6 @@ static int call_list_remove(call_data *call, call_list list) {
return 1;
}
-static void requested_call_array_destroy(requested_call_array *array) {
- gpr_free(array->calls);
-}
-
-static requested_call *requested_call_array_add(requested_call_array *array) {
- requested_call *rc;
- if (array->count == array->capacity) {
- array->capacity = GPR_MAX(array->capacity + 8, array->capacity * 2);
- array->calls =
- gpr_realloc(array->calls, sizeof(requested_call) * array->capacity);
- }
- rc = &array->calls[array->count++];
- memset(rc, 0, sizeof(*rc));
- return rc;
-}
-
static void server_ref(grpc_server *server) {
gpr_ref(&server->internal_refcount);
}
@@ -371,12 +341,10 @@ static void server_delete(grpc_server *server) {
gpr_mu_destroy(&server->mu_global);
gpr_mu_destroy(&server->mu_call);
gpr_free(server->channel_filters);
- requested_call_array_destroy(&server->requested_calls);
while ((rm = server->registered_methods) != NULL) {
server->registered_methods = rm->next;
gpr_free(rm->method);
gpr_free(rm->host);
- requested_call_array_destroy(&rm->requested);
gpr_free(rm);
}
for (i = 0; i < server->cq_count; i++) {
@@ -422,21 +390,26 @@ static void destroy_channel(channel_data *chand) {
grpc_iomgr_add_callback(&chand->finish_destroy_channel_closure);
}
-static void finish_start_new_rpc_and_unlock(grpc_server *server,
- grpc_call_element *elem,
- call_data **pending_root,
- requested_call_array *array) {
- requested_call rc;
+static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem,
+ call_data **pending_root,
+ requested_call **requests) {
+ requested_call *rc;
call_data *calld = elem->call_data;
- if (array->count == 0) {
+ gpr_mu_lock(&server->mu_call);
+ rc = *requests;
+ if (rc == NULL) {
+ gpr_mu_lock(&calld->mu_state);
calld->state = PENDING;
+ gpr_mu_unlock(&calld->mu_state);
call_list_join(pending_root, calld, PENDING_START);
gpr_mu_unlock(&server->mu_call);
} else {
- rc = array->calls[--array->count];
+ *requests = rc->next;
+ gpr_mu_lock(&calld->mu_state);
calld->state = ACTIVATED;
+ gpr_mu_unlock(&calld->mu_state);
gpr_mu_unlock(&server->mu_call);
- begin_call(server, calld, &rc);
+ begin_call(server, calld, rc);
}
}
@@ -448,20 +421,18 @@ static void start_new_rpc(grpc_call_element *elem) {
gpr_uint32 hash;
channel_registered_method *rm;
- gpr_mu_lock(&server->mu_call);
if (chand->registered_methods && calld->path && calld->host) {
/* TODO(ctiller): unify these two searches */
/* check for an exact match with host */
hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash);
- for (i = 0; i < chand->registered_method_max_probes; i++) {
+ for (i = 0; i <= chand->registered_method_max_probes; i++) {
rm = &chand->registered_methods[(hash + i) %
chand->registered_method_slots];
if (!rm) break;
if (rm->host != calld->host) continue;
if (rm->method != calld->path) continue;
- finish_start_new_rpc_and_unlock(server, elem,
- &rm->server_registered_method->pending,
- &rm->server_registered_method->requested);
+ finish_start_new_rpc(server, elem, &rm->server_registered_method->pending,
+ &rm->server_registered_method->requests);
return;
}
/* check for a wildcard method definition (no host set) */
@@ -472,14 +443,13 @@ static void start_new_rpc(grpc_call_element *elem) {
if (!rm) break;
if (rm->host != NULL) continue;
if (rm->method != calld->path) continue;
- finish_start_new_rpc_and_unlock(server, elem,
- &rm->server_registered_method->pending,
- &rm->server_registered_method->requested);
+ finish_start_new_rpc(server, elem, &rm->server_registered_method->pending,
+ &rm->server_registered_method->requests);
return;
}
}
- finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START],
- &server->requested_calls);
+ finish_start_new_rpc(server, elem, &server->lists[PENDING_START],
+ &server->requests);
}
static void kill_zombie(void *elem, int success) {
@@ -495,35 +465,47 @@ static int num_listeners(grpc_server *server) {
return n;
}
+static void done_shutdown_event(void *server, grpc_cq_completion *completion) {
+ server_unref(server);
+}
+
+static int num_channels(grpc_server *server) {
+ channel_data *chand;
+ int n = 0;
+ for (chand = server->root_channel_data.next;
+ chand != &server->root_channel_data; chand = chand->next) {
+ n++;
+ }
+ return n;
+}
+
static void maybe_finish_shutdown(grpc_server *server) {
size_t i;
if (!server->shutdown || server->shutdown_published) {
return;
}
- gpr_mu_lock(&server->mu_call);
- if (server->lists[ALL_CALLS] != NULL) {
- gpr_log(GPR_DEBUG,
- "Waiting for all calls to finish before destroying server");
- gpr_mu_unlock(&server->mu_call);
- return;
- }
- gpr_mu_unlock(&server->mu_call);
-
- if (server->root_channel_data.next != &server->root_channel_data) {
- gpr_log(GPR_DEBUG,
- "Waiting for all channels to close before destroying server");
- return;
- }
- if (server->listeners_destroyed < num_listeners(server)) {
- gpr_log(GPR_DEBUG, "Waiting for all listeners to be destroyed (@ %d/%d)",
- server->listeners_destroyed, num_listeners(server));
+ if (server->root_channel_data.next != &server->root_channel_data ||
+ server->listeners_destroyed < num_listeners(server)) {
+ if (gpr_time_cmp(
+ gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), server->last_shutdown_message_time),
+ gpr_time_from_seconds(1)) >= 0) {
+ server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
+ gpr_log(GPR_DEBUG,
+ "Waiting for %d channels and %d/%d listeners to be destroyed"
+ " before shutting down server",
+ num_channels(server),
+ num_listeners(server) - server->listeners_destroyed,
+ num_listeners(server));
+ }
return;
}
server->shutdown_published = 1;
for (i = 0; i < server->num_shutdown_tags; i++) {
- grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag,
- NULL, 1);
+ server_ref(server);
+ grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag, 1,
+ done_shutdown_event, server,
+ &server->shutdown_tags[i].completion);
}
}
@@ -532,31 +514,19 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
if (md->key == chand->path_key) {
- calld->path = grpc_mdstr_ref(md->value);
+ calld->path = GRPC_MDSTR_REF(md->value);
return NULL;
} else if (md->key == chand->authority_key) {
- calld->host = grpc_mdstr_ref(md->value);
+ calld->host = GRPC_MDSTR_REF(md->value);
return NULL;
}
return md;
}
-static int decrement_call_count(channel_data *chand) {
- int disconnect = 0;
- chand->num_calls--;
- if (0 == chand->num_calls && chand->server->shutdown) {
- disconnect = 1;
- }
- maybe_finish_shutdown(chand->server);
- return disconnect;
-}
-
static void server_on_recv(void *ptr, int success) {
grpc_call_element *elem = ptr;
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- int remove_res;
- int disconnect = 0;
if (success && !calld->got_initial_metadata) {
size_t i;
@@ -581,39 +551,33 @@ static void server_on_recv(void *ptr, int success) {
case GRPC_STREAM_SEND_CLOSED:
break;
case GRPC_STREAM_RECV_CLOSED:
- gpr_mu_lock(&chand->server->mu_call);
+ gpr_mu_lock(&calld->mu_state);
if (calld->state == NOT_STARTED) {
calld->state = ZOMBIED;
+ gpr_mu_unlock(&calld->mu_state);
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_iomgr_add_callback(&calld->kill_zombie_closure);
+ } else {
+ gpr_mu_unlock(&calld->mu_state);
}
- gpr_mu_unlock(&chand->server->mu_call);
break;
case GRPC_STREAM_CLOSED:
- gpr_mu_lock(&chand->server->mu_call);
+ gpr_mu_lock(&calld->mu_state);
if (calld->state == NOT_STARTED) {
calld->state = ZOMBIED;
+ gpr_mu_unlock(&calld->mu_state);
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_iomgr_add_callback(&calld->kill_zombie_closure);
} else if (calld->state == PENDING) {
- call_list_remove(calld, PENDING_START);
calld->state = ZOMBIED;
+ gpr_mu_unlock(&calld->mu_state);
+ gpr_mu_lock(&chand->server->mu_call);
+ call_list_remove(calld, PENDING_START);
+ gpr_mu_unlock(&chand->server->mu_call);
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_iomgr_add_callback(&calld->kill_zombie_closure);
- }
- remove_res = call_list_remove(calld, ALL_CALLS);
- gpr_mu_unlock(&chand->server->mu_call);
- gpr_mu_lock(&chand->server->mu_global);
- if (remove_res) {
- disconnect = decrement_call_count(chand);
- if (disconnect) {
- GRPC_CHANNEL_INTERNAL_REF(chand->channel, "send-disconnect");
- }
- }
- gpr_mu_unlock(&chand->server->mu_global);
- if (disconnect) {
- send_shutdown(chand->channel, 0, 1);
- GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "send-disconnect");
+ } else {
+ gpr_mu_unlock(&calld->mu_state);
}
break;
}
@@ -676,17 +640,10 @@ static void init_call_elem(grpc_call_element *elem,
memset(calld, 0, sizeof(call_data));
calld->deadline = gpr_inf_future;
calld->call = grpc_call_from_top_element(elem);
+ gpr_mu_init(&calld->mu_state);
grpc_iomgr_closure_init(&calld->server_on_recv, server_on_recv, elem);
- gpr_mu_lock(&chand->server->mu_call);
- call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS);
- gpr_mu_unlock(&chand->server->mu_call);
-
- gpr_mu_lock(&chand->server->mu_global);
- chand->num_calls++;
- gpr_mu_unlock(&chand->server->mu_global);
-
server_ref(chand->server);
if (initial_op) server_mutate_op(elem, initial_op);
@@ -695,27 +652,22 @@ static void init_call_elem(grpc_call_element *elem,
static void destroy_call_elem(grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
- int removed[CALL_LIST_COUNT];
- size_t i;
- gpr_mu_lock(&chand->server->mu_call);
- for (i = 0; i < CALL_LIST_COUNT; i++) {
- removed[i] = call_list_remove(elem->call_data, i);
- }
- gpr_mu_unlock(&chand->server->mu_call);
- if (removed[ALL_CALLS]) {
- gpr_mu_lock(&chand->server->mu_global);
- decrement_call_count(chand);
- gpr_mu_unlock(&chand->server->mu_global);
+ if (calld->state == PENDING) {
+ gpr_mu_lock(&chand->server->mu_call);
+ call_list_remove(elem->call_data, PENDING_START);
+ gpr_mu_unlock(&chand->server->mu_call);
}
if (calld->host) {
- grpc_mdstr_unref(calld->host);
+ GRPC_MDSTR_UNREF(calld->host);
}
if (calld->path) {
- grpc_mdstr_unref(calld->path);
+ GRPC_MDSTR_UNREF(calld->path);
}
+ gpr_mu_destroy(&calld->mu_state);
+
server_unref(chand->server);
}
@@ -727,7 +679,6 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
GPR_ASSERT(is_first);
GPR_ASSERT(!is_last);
chand->server = NULL;
- chand->num_calls = 0;
chand->channel = NULL;
chand->path_key = grpc_mdstr_from_string(metadata_context, ":path");
chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority");
@@ -744,10 +695,10 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
if (chand->registered_methods) {
for (i = 0; i < chand->registered_method_slots; i++) {
if (chand->registered_methods[i].method) {
- grpc_mdstr_unref(chand->registered_methods[i].method);
+ GRPC_MDSTR_UNREF(chand->registered_methods[i].method);
}
if (chand->registered_methods[i].host) {
- grpc_mdstr_unref(chand->registered_methods[i].host);
+ GRPC_MDSTR_UNREF(chand->registered_methods[i].host);
}
}
gpr_free(chand->registered_methods);
@@ -759,8 +710,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
chand->next = chand->prev = chand;
maybe_finish_shutdown(chand->server);
gpr_mu_unlock(&chand->server->mu_global);
- grpc_mdstr_unref(chand->path_key);
- grpc_mdstr_unref(chand->authority_key);
+ GRPC_MDSTR_UNREF(chand->path_key);
+ GRPC_MDSTR_UNREF(chand->authority_key);
server_unref(chand->server);
}
}
@@ -978,15 +929,14 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
void grpc_server_shutdown_and_notify(grpc_server *server,
grpc_completion_queue *cq, void *tag) {
listener *l;
- requested_call_array requested_calls;
- size_t i;
+ requested_call *requests = NULL;
registered_method *rm;
shutdown_tag *sdt;
channel_broadcaster broadcaster;
/* lock, and gather up some stuff to do */
gpr_mu_lock(&server->mu_global);
- grpc_cq_begin_op(cq, NULL);
+ grpc_cq_begin_op(cq);
server->shutdown_tags =
gpr_realloc(server->shutdown_tags,
sizeof(shutdown_tag) * (server->num_shutdown_tags + 1));
@@ -998,27 +948,21 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
return;
}
+ server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
+
channel_broadcaster_init(server, &broadcaster);
/* collect all unregistered then registered calls */
gpr_mu_lock(&server->mu_call);
- requested_calls = server->requested_calls;
- memset(&server->requested_calls, 0, sizeof(server->requested_calls));
+ requests = server->requests;
+ server->requests = NULL;
for (rm = server->registered_methods; rm; rm = rm->next) {
- if (requested_calls.count + rm->requested.count >
- requested_calls.capacity) {
- requested_calls.capacity =
- GPR_MAX(requested_calls.count + rm->requested.count,
- 2 * requested_calls.capacity);
- requested_calls.calls =
- gpr_realloc(requested_calls.calls, sizeof(*requested_calls.calls) *
- requested_calls.capacity);
+ while (rm->requests != NULL) {
+ requested_call *c = rm->requests;
+ rm->requests = c->next;
+ c->next = requests;
+ requests = c;
}
- memcpy(requested_calls.calls + requested_calls.count, rm->requested.calls,
- sizeof(*requested_calls.calls) * rm->requested.count);
- requested_calls.count += rm->requested.count;
- gpr_free(rm->requested.calls);
- memset(&rm->requested, 0, sizeof(rm->requested));
}
gpr_mu_unlock(&server->mu_call);
@@ -1027,10 +971,11 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
gpr_mu_unlock(&server->mu_global);
/* terminate all the requested calls */
- for (i = 0; i < requested_calls.count; i++) {
- fail_call(server, &requested_calls.calls[i]);
+ while (requests != NULL) {
+ requested_call *next = requests->next;
+ fail_call(server, requests);
+ requests = next;
}
- gpr_free(requested_calls.calls);
/* Shutdown listeners */
for (l = server->listeners; l; l = l->next) {
@@ -1049,47 +994,13 @@ void grpc_server_listener_destroy_done(void *s) {
}
void grpc_server_cancel_all_calls(grpc_server *server) {
- call_data *calld;
- grpc_call **calls;
- size_t call_count;
- size_t call_capacity;
- int is_first = 1;
- size_t i;
-
- gpr_mu_lock(&server->mu_call);
-
- GPR_ASSERT(server->shutdown);
-
- if (!server->lists[ALL_CALLS]) {
- gpr_mu_unlock(&server->mu_call);
- return;
- }
-
- call_capacity = 8;
- call_count = 0;
- calls = gpr_malloc(sizeof(grpc_call *) * call_capacity);
-
- for (calld = server->lists[ALL_CALLS];
- calld != server->lists[ALL_CALLS] || is_first;
- calld = calld->links[ALL_CALLS].next) {
- if (call_count == call_capacity) {
- call_capacity *= 2;
- calls = gpr_realloc(calls, sizeof(grpc_call *) * call_capacity);
- }
- calls[call_count++] = calld->call;
- GRPC_CALL_INTERNAL_REF(calld->call, "cancel_all");
- is_first = 0;
- }
-
- gpr_mu_unlock(&server->mu_call);
+ channel_broadcaster broadcaster;
- for (i = 0; i < call_count; i++) {
- grpc_call_cancel_with_status(calls[i], GRPC_STATUS_UNAVAILABLE,
- "Unavailable");
- GRPC_CALL_INTERNAL_UNREF(calls[i], "cancel_all", 1);
- }
+ gpr_mu_lock(&server->mu_global);
+ channel_broadcaster_init(server, &broadcaster);
+ gpr_mu_unlock(&server->mu_global);
- gpr_free(calls);
+ channel_broadcaster_shutdown(&broadcaster, 0, 1);
}
void grpc_server_destroy(grpc_server *server) {
@@ -1126,7 +1037,7 @@ void grpc_server_add_listener(grpc_server *server, void *arg,
static grpc_call_error queue_call_request(grpc_server *server,
requested_call *rc) {
call_data *calld = NULL;
- requested_call_array *requested_calls = NULL;
+ requested_call **requests = NULL;
gpr_mu_lock(&server->mu_call);
if (server->shutdown) {
gpr_mu_unlock(&server->mu_call);
@@ -1137,22 +1048,25 @@ static grpc_call_error queue_call_request(grpc_server *server,
case BATCH_CALL:
calld =
call_list_remove_head(&server->lists[PENDING_START], PENDING_START);
- requested_calls = &server->requested_calls;
+ requests = &server->requests;
break;
case REGISTERED_CALL:
calld = call_list_remove_head(
&rc->data.registered.registered_method->pending, PENDING_START);
- requested_calls = &rc->data.registered.registered_method->requested;
+ requests = &rc->data.registered.registered_method->requests;
break;
}
- if (calld) {
+ if (calld != NULL) {
+ gpr_mu_unlock(&server->mu_call);
+ gpr_mu_lock(&calld->mu_state);
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;
- gpr_mu_unlock(&server->mu_call);
+ gpr_mu_unlock(&calld->mu_state);
begin_call(server, calld, rc);
return GRPC_CALL_OK;
} else {
- *requested_call_array_add(requested_calls) = *rc;
+ rc->next = *requests;
+ *requests = rc;
gpr_mu_unlock(&server->mu_call);
return GRPC_CALL_OK;
}
@@ -1163,22 +1077,23 @@ grpc_call_error grpc_server_request_call(
grpc_metadata_array *initial_metadata,
grpc_completion_queue *cq_bound_to_call,
grpc_completion_queue *cq_for_notification, void *tag) {
- requested_call rc;
+ requested_call *rc = gpr_malloc(sizeof(*rc));
GRPC_SERVER_LOG_REQUEST_CALL(GPR_INFO, server, call, details,
initial_metadata, cq_bound_to_call,
cq_for_notification, tag);
if (!grpc_cq_is_server_cq(cq_for_notification)) {
+ gpr_free(rc);
return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
}
- grpc_cq_begin_op(cq_for_notification, NULL);
- rc.type = BATCH_CALL;
- rc.tag = tag;
- rc.cq_bound_to_call = cq_bound_to_call;
- rc.cq_for_notification = cq_for_notification;
- rc.call = call;
- rc.data.batch.details = details;
- rc.data.batch.initial_metadata = initial_metadata;
- return queue_call_request(server, &rc);
+ grpc_cq_begin_op(cq_for_notification);
+ rc->type = BATCH_CALL;
+ rc->tag = tag;
+ rc->cq_bound_to_call = cq_bound_to_call;
+ rc->cq_for_notification = cq_for_notification;
+ rc->call = call;
+ rc->data.batch.details = details;
+ rc->data.batch.initial_metadata = initial_metadata;
+ return queue_call_request(server, rc);
}
grpc_call_error grpc_server_request_registered_call(
@@ -1186,22 +1101,23 @@ grpc_call_error grpc_server_request_registered_call(
grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload,
grpc_completion_queue *cq_bound_to_call,
grpc_completion_queue *cq_for_notification, void *tag) {
- requested_call rc;
+ requested_call *rc = gpr_malloc(sizeof(*rc));
registered_method *registered_method = rm;
if (!grpc_cq_is_server_cq(cq_for_notification)) {
+ gpr_free(rc);
return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
}
- grpc_cq_begin_op(cq_for_notification, NULL);
- rc.type = REGISTERED_CALL;
- rc.tag = tag;
- rc.cq_bound_to_call = cq_bound_to_call;
- rc.cq_for_notification = cq_for_notification;
- rc.call = call;
- rc.data.registered.registered_method = registered_method;
- rc.data.registered.deadline = deadline;
- rc.data.registered.initial_metadata = initial_metadata;
- rc.data.registered.optional_payload = optional_payload;
- return queue_call_request(server, &rc);
+ grpc_cq_begin_op(cq_for_notification);
+ rc->type = REGISTERED_CALL;
+ rc->tag = tag;
+ rc->cq_bound_to_call = cq_bound_to_call;
+ rc->cq_for_notification = cq_for_notification;
+ rc->call = call;
+ rc->data.registered.registered_method = registered_method;
+ rc->data.registered.deadline = deadline;
+ rc->data.registered.initial_metadata = initial_metadata;
+ rc->data.registered.optional_payload = optional_payload;
+ return queue_call_request(server, rc);
}
static void publish_registered_or_batch(grpc_call *call, int success,
@@ -1268,8 +1184,11 @@ static void begin_call(grpc_server *server, call_data *calld,
}
GRPC_CALL_INTERNAL_REF(calld->call, "server");
- grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish,
- rc->tag);
+ grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish, rc);
+}
+
+static void done_request_event(void *req, grpc_cq_completion *c) {
+ gpr_free(req);
}
static void fail_call(grpc_server *server, requested_call *rc) {
@@ -1282,15 +1201,19 @@ static void fail_call(grpc_server *server, requested_call *rc) {
rc->data.registered.initial_metadata->count = 0;
break;
}
- grpc_cq_end_op(rc->cq_for_notification, rc->tag, NULL, 0);
+ grpc_cq_end_op(rc->cq_for_notification, rc->tag, 0, done_request_event, rc,
+ &rc->completion);
}
static void publish_registered_or_batch(grpc_call *call, int success,
- void *tag) {
+ void *prc) {
grpc_call_element *elem =
grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
+ requested_call *rc = prc;
call_data *calld = elem->call_data;
- grpc_cq_end_op(calld->cq_new, tag, call, success);
+ grpc_cq_end_op(calld->cq_new, rc->tag, success, done_request_event, rc,
+ &rc->completion);
+ GRPC_CALL_INTERNAL_UNREF(call, "server", 0);
}
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
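
With the requested_call_array gone, requested_call objects are heap-allocated in grpc_server_request_call / grpc_server_request_registered_call, threaded onto a LIFO list (server->requests or the registered method's requests) under mu_call, and finally freed by done_request_event once the completion queue is finished with the embedded grpc_cq_completion. The list manipulation, extracted into hypothetical helpers that mirror queue_call_request() and grpc_server_shutdown_and_notify() above:

/* Sketch only: callers must hold server->mu_call around these operations. */
static void request_push(requested_call **head, requested_call *rc) {
  rc->next = *head;
  *head = rc;
}

static requested_call *request_pop(requested_call **head) {
  requested_call *rc = *head;
  if (rc != NULL) *head = rc->next;
  return rc;
}
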
diff --git a/src/core/surface/version.c b/src/core/surface/version.c
new file mode 100644
index 0000000000..4f5d648371
--- /dev/null
+++ b/src/core/surface/version.c
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/* This file is autogenerated from:
+ templates/src/core/surface/version.c.template */
+
+#include <grpc/grpc.h>
+
+const char *grpc_version_string(void) {
+ return "0.10.0.0";
+}