aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2015-07-10 16:35:35 -0700
committerGravatar Vijay Pai <vpai@google.com>2015-07-10 16:35:35 -0700
commit13608fe33f0f15ef0864e10156ea3855b886d344 (patch)
treee1075789d8056c03d0f1a28433b9d36078829b9e /src/core/surface
parentaa91ad2c8d824615f198be659b6987f5dd150161 (diff)
parentb7e3633eb6c4f4a83936105ac2fab36f5587ff3b (diff)
Merge pull request #2347 from ctiller/simpler-cq
Rewrite completion queue
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/call.c57
-rw-r--r--src/core/surface/completion_queue.c228
-rw-r--r--src/core/surface/completion_queue.h18
-rw-r--r--src/core/surface/server.c169
4 files changed, 228 insertions, 244 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index fb3b0b1918..20c41cbbd7 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -49,6 +49,17 @@
#include <stdlib.h>
#include <string.h>
+/** The maximum number of completions possible.
+ Based upon the maximum number of individually queueable ops in the batch
+ api:
+ - initial metadata send
+ - message send
+ - status/close send (depending on client/server)
+ - initial metadata recv
+ - message recv
+ - status/close recv (depending on client/server) */
+#define MAX_CONCURRENT_COMPLETIONS 6
+
typedef enum { REQ_INITIAL = 0, REQ_READY, REQ_DONE } req_state;
typedef enum {
@@ -135,6 +146,7 @@ struct grpc_call {
grpc_mdctx *metadata_context;
/* TODO(ctiller): share with cq if possible? */
gpr_mu mu;
+ gpr_mu completion_mu;
/* how far through the stream have we read? */
read_state read_state;
@@ -162,6 +174,8 @@ struct grpc_call {
gpr_uint8 error_status_set;
/** should the alarm be cancelled */
gpr_uint8 cancel_alarm;
+ /** bitmask of allocated completion events in completions */
+ gpr_uint8 allocated_completions;
/* flags with bits corresponding to write states allowing us to determine
what was sent */
@@ -250,6 +264,9 @@ struct grpc_call {
grpc_iomgr_closure on_done_recv;
grpc_iomgr_closure on_done_send;
grpc_iomgr_closure on_done_bind;
+
+ /** completion events - for completion queue use */
+ grpc_cq_completion completions[MAX_CONCURRENT_COMPLETIONS];
};
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
@@ -286,6 +303,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
memset(call, 0, sizeof(grpc_call));
gpr_mu_init(&call->mu);
+ gpr_mu_init(&call->completion_mu);
call->channel = channel;
call->cq = cq;
if (cq) {
@@ -349,6 +367,29 @@ grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) {
return call->cq;
}
+static grpc_cq_completion *allocate_completion(grpc_call *call) {
+ gpr_uint8 i;
+ gpr_mu_lock(&call->completion_mu);
+ for (i = 0; i < GPR_ARRAY_SIZE(call->completions); i++) {
+ if (call->allocated_completions & (1u << i)) {
+ continue;
+ }
+ call->allocated_completions |= 1u << i;
+ gpr_mu_unlock(&call->completion_mu);
+ return &call->completions[i];
+ }
+ gpr_log(GPR_ERROR, "should never reach here");
+ abort();
+}
+
+static void done_completion(void *call, grpc_cq_completion *completion) {
+ grpc_call *c = call;
+ gpr_mu_lock(&c->completion_mu);
+ c->allocated_completions &= ~(1u << (completion - c->completions));
+ gpr_mu_unlock(&c->completion_mu);
+ GRPC_CALL_INTERNAL_UNREF(c, "completion", 1);
+}
+
#ifdef GRPC_CALL_REF_COUNT_DEBUG
void grpc_call_internal_ref(grpc_call *c, const char *reason) {
gpr_log(GPR_DEBUG, "CALL: ref %p %d -> %d [%s]", c,
@@ -365,6 +406,7 @@ static void destroy_call(void *call, int ignored_success) {
grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c));
GRPC_CHANNEL_INTERNAL_UNREF(c->channel, "call");
gpr_mu_destroy(&c->mu);
+ gpr_mu_destroy(&c->completion_mu);
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (c->status[i].details) {
GRPC_MDSTR_UNREF(c->status[i].details);
@@ -1316,11 +1358,13 @@ static void set_cancelled_value(grpc_status_code status, void *dest) {
}
static void finish_batch(grpc_call *call, int success, void *tag) {
- grpc_cq_end_op(call->cq, tag, call, success);
+ grpc_cq_end_op(call->cq, tag, success, done_completion, call,
+ allocate_completion(call));
}
static void finish_batch_with_close(grpc_call *call, int success, void *tag) {
- grpc_cq_end_op(call->cq, tag, call, 1);
+ grpc_cq_end_op(call->cq, tag, 1, done_completion, call,
+ allocate_completion(call));
}
static int are_write_flags_valid(gpr_uint32 flags) {
@@ -1343,8 +1387,10 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag);
if (nops == 0) {
- grpc_cq_begin_op(call->cq, call);
- grpc_cq_end_op(call->cq, tag, call, 1);
+ grpc_cq_begin_op(call->cq);
+ GRPC_CALL_INTERNAL_REF(call, "completion");
+ grpc_cq_end_op(call->cq, tag, 1, done_completion, call,
+ allocate_completion(call));
return GRPC_CALL_OK;
}
@@ -1466,7 +1512,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
}
}
- grpc_cq_begin_op(call->cq, call);
+ GRPC_CALL_INTERNAL_REF(call, "completion");
+ grpc_cq_begin_op(call->cq);
return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func, tag);
}
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 030a8b4e6f..2ff47eb107 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -45,34 +45,20 @@
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
-#define NUM_TAG_BUCKETS 31
-
-/* A single event: extends grpc_event to form a linked list with a destruction
- function (on_finish) that is hidden from outside this module */
-typedef struct event {
- grpc_event base;
- struct event *queue_next;
- struct event *queue_prev;
- struct event *bucket_next;
- struct event *bucket_prev;
-} event;
-
/* Completion queue structure */
struct grpc_completion_queue {
- /* When refs drops to zero, we are in shutdown mode, and will be destroyable
- once all queued events are drained */
- gpr_refcount refs;
- /* Once owning_refs drops to zero, we will destroy the cq */
+ /** completed events */
+ grpc_cq_completion completed_head;
+ grpc_cq_completion *completed_tail;
+ /** Number of pending events (+1 if we're not shutdown) */
+ gpr_refcount pending_events;
+ /** Once owning_refs drops to zero, we will destroy the cq */
gpr_refcount owning_refs;
- /* the set of low level i/o things that concern this cq */
+ /** the set of low level i/o things that concern this cq */
grpc_pollset pollset;
- /* 0 initially, 1 once we've begun shutting down */
+ /** 0 initially, 1 once we've begun shutting down */
int shutdown;
int shutdown_called;
- /* Head of a linked list of queued events (prev points to the last element) */
- event *queue;
- /* Fixed size chained hash table of events for pluck() */
- event *buckets[NUM_TAG_BUCKETS];
int is_server_cq;
};
@@ -80,19 +66,20 @@ grpc_completion_queue *grpc_completion_queue_create(void) {
grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue));
memset(cc, 0, sizeof(*cc));
/* Initial ref is dropped by grpc_completion_queue_shutdown */
- gpr_ref_init(&cc->refs, 1);
+ gpr_ref_init(&cc->pending_events, 1);
/* One for destroy(), one for pollset_shutdown */
gpr_ref_init(&cc->owning_refs, 2);
grpc_pollset_init(&cc->pollset);
+ cc->completed_tail = &cc->completed_head;
+ cc->completed_head.next = (gpr_uintptr)cc->completed_tail;
return cc;
}
#ifdef GRPC_CQ_REF_COUNT_DEBUG
void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
const char *file, int line) {
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p ref %d -> %d %s",
- cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1,
- reason);
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p ref %d -> %d %s", cc,
+ (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1, reason);
#else
void grpc_cq_internal_ref(grpc_completion_queue *cc) {
#endif
@@ -107,186 +94,135 @@ static void on_pollset_destroy_done(void *arg) {
#ifdef GRPC_CQ_REF_COUNT_DEBUG
void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason,
const char *file, int line) {
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s",
- cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1,
- reason);
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s", cc,
+ (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1, reason);
#else
void grpc_cq_internal_unref(grpc_completion_queue *cc) {
#endif
if (gpr_unref(&cc->owning_refs)) {
- GPR_ASSERT(cc->queue == NULL);
+ GPR_ASSERT(cc->completed_head.next == (gpr_uintptr)&cc->completed_head);
grpc_pollset_destroy(&cc->pollset);
gpr_free(cc);
}
}
-/* Create and append an event to the queue. Returns the event so that its data
- members can be filled in.
- Requires GRPC_POLLSET_MU(&cc->pollset) locked. */
-static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
- void *tag, grpc_call *call) {
- event *ev = gpr_malloc(sizeof(event));
- gpr_uintptr bucket = ((gpr_uintptr)tag) % NUM_TAG_BUCKETS;
- ev->base.type = type;
- ev->base.tag = tag;
- if (cc->queue == NULL) {
- cc->queue = ev->queue_next = ev->queue_prev = ev;
- } else {
- ev->queue_next = cc->queue;
- ev->queue_prev = cc->queue->queue_prev;
- ev->queue_next->queue_prev = ev->queue_prev->queue_next = ev;
- }
- if (cc->buckets[bucket] == NULL) {
- cc->buckets[bucket] = ev->bucket_next = ev->bucket_prev = ev;
- } else {
- ev->bucket_next = cc->buckets[bucket];
- ev->bucket_prev = cc->buckets[bucket]->bucket_prev;
- ev->bucket_next->bucket_prev = ev->bucket_prev->bucket_next = ev;
- }
- grpc_pollset_kick(&cc->pollset);
- return ev;
-}
-
-void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call) {
- gpr_ref(&cc->refs);
- if (call) GRPC_CALL_INTERNAL_REF(call, "cq");
+void grpc_cq_begin_op(grpc_completion_queue *cc) {
+ gpr_ref(&cc->pending_events);
}
/* Signal the end of an operation - if this is the last waiting-to-be-queued
event, then enter shutdown mode */
-void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
- int success) {
- event *ev;
- int shutdown = 0;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
- ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call);
- ev->base.success = success;
- if (gpr_unref(&cc->refs)) {
+/* Queue a GRPC_OP_COMPLETED operation */
+void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
+ void (*done)(void *done_arg, grpc_cq_completion *storage),
+ void *done_arg, grpc_cq_completion *storage) {
+ int shutdown = gpr_unref(&cc->pending_events);
+
+ storage->tag = tag;
+ storage->done = done;
+ storage->done_arg = done_arg;
+ storage->next =
+ ((gpr_uintptr)&cc->completed_head) | ((gpr_uintptr)(success != 0));
+
+ if (!shutdown) {
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ cc->completed_tail->next =
+ ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next);
+ cc->completed_tail = storage;
+ grpc_pollset_kick(&cc->pollset);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ } else {
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ cc->completed_tail->next =
+ ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next);
+ cc->completed_tail = storage;
GPR_ASSERT(!cc->shutdown);
GPR_ASSERT(cc->shutdown_called);
cc->shutdown = 1;
- shutdown = 1;
- }
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
- if (call) GRPC_CALL_INTERNAL_UNREF(call, "cq", 0);
- if (shutdown) {
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc);
}
}
-/* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */
-static event *create_shutdown_event(void) {
- event *ev = gpr_malloc(sizeof(event));
- ev->base.type = GRPC_QUEUE_SHUTDOWN;
- ev->base.tag = NULL;
- return ev;
-}
-
grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline) {
- event *ev = NULL;
grpc_event ret;
GRPC_CQ_INTERNAL_REF(cc, "next");
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
for (;;) {
- if (cc->queue != NULL) {
- gpr_uintptr bucket;
- ev = cc->queue;
- bucket = ((gpr_uintptr)ev->base.tag) % NUM_TAG_BUCKETS;
- cc->queue = ev->queue_next;
- ev->queue_next->queue_prev = ev->queue_prev;
- ev->queue_prev->queue_next = ev->queue_next;
- ev->bucket_next->bucket_prev = ev->bucket_prev;
- ev->bucket_prev->bucket_next = ev->bucket_next;
- if (ev == cc->buckets[bucket]) {
- cc->buckets[bucket] = ev->bucket_next;
- if (ev == cc->buckets[bucket]) {
- cc->buckets[bucket] = NULL;
- }
- }
- if (cc->queue == ev) {
- cc->queue = NULL;
+ if (cc->completed_tail != &cc->completed_head) {
+ grpc_cq_completion *c = (grpc_cq_completion *)cc->completed_head.next;
+ cc->completed_head.next = c->next & ~(gpr_uintptr)1;
+ if (c == cc->completed_tail) {
+ cc->completed_tail = &cc->completed_head;
}
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ ret.type = GRPC_OP_COMPLETE;
+ ret.success = c->next & 1u;
+ ret.tag = c->tag;
+ c->done(c->done_arg, c);
break;
}
if (cc->shutdown) {
- ev = create_shutdown_event();
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ memset(&ret, 0, sizeof(ret));
+ ret.type = GRPC_QUEUE_SHUTDOWN;
break;
}
if (!grpc_pollset_work(&cc->pollset, deadline)) {
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
- GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
- GRPC_CQ_INTERNAL_UNREF(cc, "next");
- return ret;
+ break;
}
}
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
- ret = ev->base;
- gpr_free(ev);
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
GRPC_CQ_INTERNAL_UNREF(cc, "next");
return ret;
}
-static event *pluck_event(grpc_completion_queue *cc, void *tag) {
- gpr_uintptr bucket = ((gpr_uintptr)tag) % NUM_TAG_BUCKETS;
- event *ev = cc->buckets[bucket];
- if (ev == NULL) return NULL;
- do {
- if (ev->base.tag == tag) {
- ev->queue_next->queue_prev = ev->queue_prev;
- ev->queue_prev->queue_next = ev->queue_next;
- ev->bucket_next->bucket_prev = ev->bucket_prev;
- ev->bucket_prev->bucket_next = ev->bucket_next;
- if (ev == cc->buckets[bucket]) {
- cc->buckets[bucket] = ev->bucket_next;
- if (ev == cc->buckets[bucket]) {
- cc->buckets[bucket] = NULL;
- }
- }
- if (cc->queue == ev) {
- cc->queue = ev->queue_next;
- if (cc->queue == ev) {
- cc->queue = NULL;
- }
- }
- return ev;
- }
- ev = ev->bucket_next;
- } while (ev != cc->buckets[bucket]);
- return NULL;
-}
-
grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
gpr_timespec deadline) {
- event *ev = NULL;
grpc_event ret;
+ grpc_cq_completion *c;
+ grpc_cq_completion *prev;
GRPC_CQ_INTERNAL_REF(cc, "pluck");
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
for (;;) {
- if ((ev = pluck_event(cc, tag))) {
- break;
+ prev = &cc->completed_head;
+ while ((c = (grpc_cq_completion *)(prev->next & ~(gpr_uintptr)1)) !=
+ &cc->completed_head) {
+ if (c->tag == tag) {
+ prev->next =
+ (prev->next & (gpr_uintptr)1) | (c->next & ~(gpr_uintptr)1);
+ if (c == cc->completed_tail) {
+ cc->completed_tail = prev;
+ }
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ ret.type = GRPC_OP_COMPLETE;
+ ret.success = c->next & 1u;
+ ret.tag = c->tag;
+ c->done(c->done_arg, c);
+ goto done;
+ }
+ prev = c;
}
if (cc->shutdown) {
- ev = create_shutdown_event();
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ memset(&ret, 0, sizeof(ret));
+ ret.type = GRPC_QUEUE_SHUTDOWN;
break;
}
if (!grpc_pollset_work(&cc->pollset, deadline)) {
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
- GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
- GRPC_CQ_INTERNAL_UNREF(cc, "pluck");
- return ret;
+ break;
}
}
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
- ret = ev->base;
- gpr_free(ev);
+done:
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
GRPC_CQ_INTERNAL_UNREF(cc, "pluck");
return ret;
@@ -303,7 +239,7 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
cc->shutdown_called = 1;
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
- if (gpr_unref(&cc->refs)) {
+ if (gpr_unref(&cc->pending_events)) {
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
GPR_ASSERT(!cc->shutdown);
cc->shutdown = 1;
diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h
index 1b9010f462..f944f48d8e 100644
--- a/src/core/surface/completion_queue.h
+++ b/src/core/surface/completion_queue.h
@@ -39,6 +39,17 @@
#include "src/core/iomgr/pollset.h"
#include <grpc/grpc.h>
+typedef struct grpc_cq_completion {
+ /** user supplied tag */
+ void *tag;
+ /** done callback - called when this queue element is no longer
+ needed by the completion queue */
+ void (*done)(void *done_arg, struct grpc_cq_completion *c);
+ void *done_arg;
+ /** next pointer; low bit is used to indicate success or not */
+ gpr_uintptr next;
+} grpc_cq_completion;
+
#ifdef GRPC_CQ_REF_COUNT_DEBUG
void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
const char *file, int line);
@@ -57,11 +68,12 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc);
/* Flag that an operation is beginning: the completion channel will not finish
shutdown until a corrensponding grpc_cq_end_* call is made */
-void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call);
+void grpc_cq_begin_op(grpc_completion_queue *cc);
/* Queue a GRPC_OP_COMPLETED operation */
-void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
- int success);
+void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
+ void (*done)(void *done_arg, grpc_cq_completion *storage),
+ void *done_arg, grpc_cq_completion *storage);
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index adc7ef8efe..eed2320e45 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -72,12 +72,14 @@ typedef struct {
typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
-typedef struct {
+typedef struct requested_call {
requested_call_type type;
+ struct requested_call *next;
void *tag;
grpc_completion_queue *cq_bound_to_call;
grpc_completion_queue *cq_for_notification;
grpc_call **call;
+ grpc_cq_completion completion;
union {
struct {
grpc_call_details *details;
@@ -92,17 +94,11 @@ typedef struct {
} data;
} requested_call;
-typedef struct {
- requested_call *calls;
- size_t count;
- size_t capacity;
-} requested_call_array;
-
struct registered_method {
char *method;
char *host;
call_data *pending;
- requested_call_array requested;
+ requested_call *requests;
registered_method *next;
};
@@ -131,6 +127,7 @@ struct channel_data {
typedef struct shutdown_tag {
void *tag;
grpc_completion_queue *cq;
+ grpc_cq_completion completion;
} shutdown_tag;
struct grpc_server {
@@ -153,7 +150,7 @@ struct grpc_server {
gpr_mu mu_call; /* mutex for call-specific state */
registered_method *registered_methods;
- requested_call_array requested_calls;
+ requested_call *requests;
gpr_uint8 shutdown;
gpr_uint8 shutdown_published;
@@ -273,7 +270,8 @@ static void send_shutdown(grpc_channel *channel, int send_goaway,
}
static void channel_broadcaster_shutdown(channel_broadcaster *cb,
- int send_goaway, int force_disconnect) {
+ int send_goaway,
+ int force_disconnect) {
size_t i;
for (i = 0; i < cb->num_channels; i++) {
@@ -332,22 +330,6 @@ static int call_list_remove(call_data *call, call_list list) {
return 1;
}
-static void requested_call_array_destroy(requested_call_array *array) {
- gpr_free(array->calls);
-}
-
-static requested_call *requested_call_array_add(requested_call_array *array) {
- requested_call *rc;
- if (array->count == array->capacity) {
- array->capacity = GPR_MAX(array->capacity + 8, array->capacity * 2);
- array->calls =
- gpr_realloc(array->calls, sizeof(requested_call) * array->capacity);
- }
- rc = &array->calls[array->count++];
- memset(rc, 0, sizeof(*rc));
- return rc;
-}
-
static void server_ref(grpc_server *server) {
gpr_ref(&server->internal_refcount);
}
@@ -359,12 +341,10 @@ static void server_delete(grpc_server *server) {
gpr_mu_destroy(&server->mu_global);
gpr_mu_destroy(&server->mu_call);
gpr_free(server->channel_filters);
- requested_call_array_destroy(&server->requested_calls);
while ((rm = server->registered_methods) != NULL) {
server->registered_methods = rm->next;
gpr_free(rm->method);
gpr_free(rm->host);
- requested_call_array_destroy(&rm->requested);
gpr_free(rm);
}
for (i = 0; i < server->cq_count; i++) {
@@ -412,23 +392,24 @@ static void destroy_channel(channel_data *chand) {
static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem,
call_data **pending_root,
- requested_call_array *array) {
- requested_call rc;
+ requested_call **requests) {
+ requested_call *rc;
call_data *calld = elem->call_data;
gpr_mu_lock(&server->mu_call);
- if (array->count == 0) {
+ rc = *requests;
+ if (rc == NULL) {
gpr_mu_lock(&calld->mu_state);
calld->state = PENDING;
gpr_mu_unlock(&calld->mu_state);
call_list_join(pending_root, calld, PENDING_START);
gpr_mu_unlock(&server->mu_call);
} else {
- rc = array->calls[--array->count];
+ *requests = rc->next;
gpr_mu_lock(&calld->mu_state);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
gpr_mu_unlock(&server->mu_call);
- begin_call(server, calld, &rc);
+ begin_call(server, calld, rc);
}
}
@@ -451,7 +432,7 @@ static void start_new_rpc(grpc_call_element *elem) {
if (rm->host != calld->host) continue;
if (rm->method != calld->path) continue;
finish_start_new_rpc(server, elem, &rm->server_registered_method->pending,
- &rm->server_registered_method->requested);
+ &rm->server_registered_method->requests);
return;
}
/* check for a wildcard method definition (no host set) */
@@ -463,12 +444,12 @@ static void start_new_rpc(grpc_call_element *elem) {
if (rm->host != NULL) continue;
if (rm->method != calld->path) continue;
finish_start_new_rpc(server, elem, &rm->server_registered_method->pending,
- &rm->server_registered_method->requested);
+ &rm->server_registered_method->requests);
return;
}
}
finish_start_new_rpc(server, elem, &server->lists[PENDING_START],
- &server->requested_calls);
+ &server->requests);
}
static void kill_zombie(void *elem, int success) {
@@ -484,6 +465,10 @@ static int num_listeners(grpc_server *server) {
return n;
}
+static void done_shutdown_event(void *server, grpc_cq_completion *completion) {
+ server_unref(server);
+}
+
static int num_channels(grpc_server *server) {
channel_data *chand;
int n = 0;
@@ -517,8 +502,10 @@ static void maybe_finish_shutdown(grpc_server *server) {
}
server->shutdown_published = 1;
for (i = 0; i < server->num_shutdown_tags; i++) {
- grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag,
- NULL, 1);
+ server_ref(server);
+ grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag, 1,
+ done_shutdown_event, server,
+ &server->shutdown_tags[i].completion);
}
}
@@ -942,15 +929,14 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
void grpc_server_shutdown_and_notify(grpc_server *server,
grpc_completion_queue *cq, void *tag) {
listener *l;
- requested_call_array requested_calls;
- size_t i;
+ requested_call *requests = NULL;
registered_method *rm;
shutdown_tag *sdt;
channel_broadcaster broadcaster;
/* lock, and gather up some stuff to do */
gpr_mu_lock(&server->mu_global);
- grpc_cq_begin_op(cq, NULL);
+ grpc_cq_begin_op(cq);
server->shutdown_tags =
gpr_realloc(server->shutdown_tags,
sizeof(shutdown_tag) * (server->num_shutdown_tags + 1));
@@ -968,23 +954,15 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
/* collect all unregistered then registered calls */
gpr_mu_lock(&server->mu_call);
- requested_calls = server->requested_calls;
- memset(&server->requested_calls, 0, sizeof(server->requested_calls));
+ requests = server->requests;
+ server->requests = NULL;
for (rm = server->registered_methods; rm; rm = rm->next) {
- if (requested_calls.count + rm->requested.count >
- requested_calls.capacity) {
- requested_calls.capacity =
- GPR_MAX(requested_calls.count + rm->requested.count,
- 2 * requested_calls.capacity);
- requested_calls.calls =
- gpr_realloc(requested_calls.calls, sizeof(*requested_calls.calls) *
- requested_calls.capacity);
+ while (rm->requests != NULL) {
+ requested_call *c = rm->requests;
+ rm->requests = c->next;
+ c->next = requests;
+ requests = c;
}
- memcpy(requested_calls.calls + requested_calls.count, rm->requested.calls,
- sizeof(*requested_calls.calls) * rm->requested.count);
- requested_calls.count += rm->requested.count;
- gpr_free(rm->requested.calls);
- memset(&rm->requested, 0, sizeof(rm->requested));
}
gpr_mu_unlock(&server->mu_call);
@@ -993,10 +971,11 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
gpr_mu_unlock(&server->mu_global);
/* terminate all the requested calls */
- for (i = 0; i < requested_calls.count; i++) {
- fail_call(server, &requested_calls.calls[i]);
+ while (requests != NULL) {
+ requested_call *next = requests->next;
+ fail_call(server, requests);
+ requests = next;
}
- gpr_free(requested_calls.calls);
/* Shutdown listeners */
for (l = server->listeners; l; l = l->next) {
@@ -1058,7 +1037,7 @@ void grpc_server_add_listener(grpc_server *server, void *arg,
static grpc_call_error queue_call_request(grpc_server *server,
requested_call *rc) {
call_data *calld = NULL;
- requested_call_array *requested_calls = NULL;
+ requested_call **requests = NULL;
gpr_mu_lock(&server->mu_call);
if (server->shutdown) {
gpr_mu_unlock(&server->mu_call);
@@ -1069,12 +1048,12 @@ static grpc_call_error queue_call_request(grpc_server *server,
case BATCH_CALL:
calld =
call_list_remove_head(&server->lists[PENDING_START], PENDING_START);
- requested_calls = &server->requested_calls;
+ requests = &server->requests;
break;
case REGISTERED_CALL:
calld = call_list_remove_head(
&rc->data.registered.registered_method->pending, PENDING_START);
- requested_calls = &rc->data.registered.registered_method->requested;
+ requests = &rc->data.registered.registered_method->requests;
break;
}
if (calld != NULL) {
@@ -1086,7 +1065,8 @@ static grpc_call_error queue_call_request(grpc_server *server,
begin_call(server, calld, rc);
return GRPC_CALL_OK;
} else {
- *requested_call_array_add(requested_calls) = *rc;
+ rc->next = *requests;
+ *requests = rc;
gpr_mu_unlock(&server->mu_call);
return GRPC_CALL_OK;
}
@@ -1097,22 +1077,23 @@ grpc_call_error grpc_server_request_call(
grpc_metadata_array *initial_metadata,
grpc_completion_queue *cq_bound_to_call,
grpc_completion_queue *cq_for_notification, void *tag) {
- requested_call rc;
+ requested_call *rc = gpr_malloc(sizeof(*rc));
GRPC_SERVER_LOG_REQUEST_CALL(GPR_INFO, server, call, details,
initial_metadata, cq_bound_to_call,
cq_for_notification, tag);
if (!grpc_cq_is_server_cq(cq_for_notification)) {
+ gpr_free(rc);
return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
}
- grpc_cq_begin_op(cq_for_notification, NULL);
- rc.type = BATCH_CALL;
- rc.tag = tag;
- rc.cq_bound_to_call = cq_bound_to_call;
- rc.cq_for_notification = cq_for_notification;
- rc.call = call;
- rc.data.batch.details = details;
- rc.data.batch.initial_metadata = initial_metadata;
- return queue_call_request(server, &rc);
+ grpc_cq_begin_op(cq_for_notification);
+ rc->type = BATCH_CALL;
+ rc->tag = tag;
+ rc->cq_bound_to_call = cq_bound_to_call;
+ rc->cq_for_notification = cq_for_notification;
+ rc->call = call;
+ rc->data.batch.details = details;
+ rc->data.batch.initial_metadata = initial_metadata;
+ return queue_call_request(server, rc);
}
grpc_call_error grpc_server_request_registered_call(
@@ -1120,22 +1101,23 @@ grpc_call_error grpc_server_request_registered_call(
grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload,
grpc_completion_queue *cq_bound_to_call,
grpc_completion_queue *cq_for_notification, void *tag) {
- requested_call rc;
+ requested_call *rc = gpr_malloc(sizeof(*rc));
registered_method *registered_method = rm;
if (!grpc_cq_is_server_cq(cq_for_notification)) {
+ gpr_free(rc);
return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
}
- grpc_cq_begin_op(cq_for_notification, NULL);
- rc.type = REGISTERED_CALL;
- rc.tag = tag;
- rc.cq_bound_to_call = cq_bound_to_call;
- rc.cq_for_notification = cq_for_notification;
- rc.call = call;
- rc.data.registered.registered_method = registered_method;
- rc.data.registered.deadline = deadline;
- rc.data.registered.initial_metadata = initial_metadata;
- rc.data.registered.optional_payload = optional_payload;
- return queue_call_request(server, &rc);
+ grpc_cq_begin_op(cq_for_notification);
+ rc->type = REGISTERED_CALL;
+ rc->tag = tag;
+ rc->cq_bound_to_call = cq_bound_to_call;
+ rc->cq_for_notification = cq_for_notification;
+ rc->call = call;
+ rc->data.registered.registered_method = registered_method;
+ rc->data.registered.deadline = deadline;
+ rc->data.registered.initial_metadata = initial_metadata;
+ rc->data.registered.optional_payload = optional_payload;
+ return queue_call_request(server, rc);
}
static void publish_registered_or_batch(grpc_call *call, int success,
@@ -1202,8 +1184,11 @@ static void begin_call(grpc_server *server, call_data *calld,
}
GRPC_CALL_INTERNAL_REF(calld->call, "server");
- grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish,
- rc->tag);
+ grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish, rc);
+}
+
+static void done_request_event(void *req, grpc_cq_completion *c) {
+ gpr_free(req);
}
static void fail_call(grpc_server *server, requested_call *rc) {
@@ -1216,15 +1201,19 @@ static void fail_call(grpc_server *server, requested_call *rc) {
rc->data.registered.initial_metadata->count = 0;
break;
}
- grpc_cq_end_op(rc->cq_for_notification, rc->tag, NULL, 0);
+ grpc_cq_end_op(rc->cq_for_notification, rc->tag, 0, done_request_event, rc,
+ &rc->completion);
}
static void publish_registered_or_batch(grpc_call *call, int success,
- void *tag) {
+ void *prc) {
grpc_call_element *elem =
grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
+ requested_call *rc = prc;
call_data *calld = elem->call_data;
- grpc_cq_end_op(calld->cq_new, tag, call, success);
+ grpc_cq_end_op(calld->cq_new, rc->tag, success, done_request_event, rc,
+ &rc->completion);
+ GRPC_CALL_INTERNAL_UNREF(call, "server", 0);
}
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {