author    Craig Tiller <craig.tiller@gmail.com>    2015-07-08 15:31:35 -0700
committer Craig Tiller <craig.tiller@gmail.com>    2015-07-08 15:31:35 -0700
commit 97fc6a3f3f6e3bbb39b6fda1444bc533deb8803d (patch)
tree   a8eeea51f1d682c7069c31344de1a3f0f192aaeb
parent c6964b1d980df3c8809809ef05c954626cdda95f (diff)
Rewrite completion queue internals to use pre-allocation of events
-rw-r--r--  src/core/iomgr/pollset_multipoller_with_epoll.c |   3
-rw-r--r--  src/core/surface/call.c                         |  42
-rw-r--r--  src/core/surface/completion_queue.c             | 216
-rw-r--r--  src/core/surface/completion_queue.h             |  22
-rw-r--r--  src/core/surface/server.c                       | 165
-rw-r--r--  test/core/surface/completion_queue_test.c       |  24
6 files changed, 227 insertions, 245 deletions
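
The new design, as the diff below shows, drops the per-event malloc and the NUM_TAG_BUCKETS hash: callers pre-allocate grpc_cq_completion records (grpc_call embeds six of them, the server embeds one per requested_call and per shutdown_tag), grpc_cq_begin_op/grpc_cq_end_op only adjust a pending-events refcount and splice the supplied record into an intrusive list, and the success flag rides in the low bit of the record's next pointer. Below is a minimal standalone sketch of that tagged-pointer queue; the names (completion, cq_push, cq_pop) are illustrative only, not the gRPC API.

/* Sketch of the tagged-pointer completion list (hypothetical standalone
   names; the real code is in src/core/surface/completion_queue.c).
   Each node's `next` field stores a pointer whose low bit records the
   success flag, which is safe because the nodes are pointer-aligned. */
#include <stdint.h>
#include <stdio.h>

typedef struct completion {
  void *tag;
  uintptr_t next; /* next node in the queue; low bit = success flag */
} completion;

typedef struct {
  completion head;  /* sentinel: head.next points at the first element */
  completion *tail; /* last element, or &head when the queue is empty */
} completion_queue;

static void cq_init(completion_queue *q) {
  q->tail = &q->head;
  q->head.next = (uintptr_t)&q->head; /* empty: sentinel points at itself */
}

static void cq_push(completion_queue *q, completion *c, void *tag, int success) {
  c->tag = tag;
  /* "last element" is encoded as a pointer back to the sentinel, plus the flag */
  c->next = (uintptr_t)&q->head | (uintptr_t)(success != 0);
  /* splice after tail, preserving whatever flag the old tail already carries */
  q->tail->next = (uintptr_t)c | (q->tail->next & 1u);
  q->tail = c;
}

static completion *cq_pop(completion_queue *q, int *success) {
  completion *c;
  if (q->tail == &q->head) return NULL; /* empty */
  c = (completion *)(q->head.next & ~(uintptr_t)1);
  *success = (int)(c->next & 1u);
  q->head.next = c->next & ~(uintptr_t)1;
  if (c == q->tail) q->tail = &q->head;
  return c;
}

int main(void) {
  completion_queue q;
  completion storage[2]; /* pre-allocated by the producer, as in the commit */
  completion *c;
  int ok;
  cq_init(&q);
  cq_push(&q, &storage[0], "first", 1);
  cq_push(&q, &storage[1], "second", 0);
  while ((c = cq_pop(&q, &ok)) != NULL) {
    printf("tag=%s success=%d\n", (const char *)c->tag, ok);
  }
  return 0;
}
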
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index 1900bbf9e1..3746c8edaf 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -105,10 +105,11 @@ static void multipoll_with_epoll_pollset_maybe_work(
* here.
*/
- timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now);
pollset->counter += 1;
gpr_mu_unlock(&pollset->mu);
+ timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now);
+
do {
ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms);
if (ep_rv < 0) {
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index a28a542c8d..445111ca40 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -162,6 +162,8 @@ struct grpc_call {
gpr_uint8 error_status_set;
/** should the alarm be cancelled */
gpr_uint8 cancel_alarm;
+ /** bitmask of allocated completion events in completions */
+ gpr_uint8 allocated_completions;
/* flags with bits corresponding to write states allowing us to determine
what was sent */
@@ -250,6 +252,9 @@ struct grpc_call {
grpc_iomgr_closure on_done_recv;
grpc_iomgr_closure on_done_send;
grpc_iomgr_closure on_done_bind;
+
+ /** completion events - for completion queue use */
+ grpc_cq_completion completions[6];
};
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
@@ -349,6 +354,27 @@ grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) {
return call->cq;
}
+grpc_cq_completion *allocate_completion(grpc_call *call) {
+ gpr_uint8 i;
+ for (i = 0; i < GPR_ARRAY_SIZE(call->completions); i++) {
+ if (call->allocated_completions & (1u << i)) {
+ continue;
+ }
+ call->allocated_completions |= 1u << i;
+ return &call->completions[i];
+ }
+ gpr_log(GPR_ERROR, "should never reach here");
+ abort();
+}
+
+void done_completion(void *call, grpc_cq_completion *completion) {
+ grpc_call *c = call;
+ gpr_mu_lock(&c->mu);
+ c->allocated_completions &= ~(1u << (completion - c->completions));
+ gpr_mu_unlock(&c->mu);
+ GRPC_CALL_INTERNAL_UNREF(c, "completion", 1);
+}
+
#ifdef GRPC_CALL_REF_COUNT_DEBUG
void grpc_call_internal_ref(grpc_call *c, const char *reason) {
gpr_log(GPR_DEBUG, "CALL: ref %p %d -> %d [%s]", c,
@@ -1316,11 +1342,15 @@ static void set_cancelled_value(grpc_status_code status, void *dest) {
}
static void finish_batch(grpc_call *call, int success, void *tag) {
- grpc_cq_end_op(call->cq, tag, call, success);
+ grpc_cq_end_op(call->cq,
+ tag, success, done_completion, call,
+ allocate_completion(call));
}
static void finish_batch_with_close(grpc_call *call, int success, void *tag) {
- grpc_cq_end_op(call->cq, tag, call, 1);
+ grpc_cq_end_op(call->cq,
+ tag, 1, done_completion, call,
+ allocate_completion(call));
}
static int are_write_flags_valid(gpr_uint32 flags) {
@@ -1343,8 +1373,9 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag);
if (nops == 0) {
- grpc_cq_begin_op(call->cq, call);
- grpc_cq_end_op(call->cq, tag, call, 1);
+ grpc_cq_begin_op(call->cq);
+ GRPC_CALL_INTERNAL_REF(call, "completion");
+ grpc_cq_end_op(call->cq, tag, 1, done_completion, call, allocate_completion(call));
return GRPC_CALL_OK;
}
@@ -1466,7 +1497,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
}
}
- grpc_cq_begin_op(call->cq, call);
+ GRPC_CALL_INTERNAL_REF(call, "completion");
+ grpc_cq_begin_op(call->cq);
return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func, tag);
}
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 030a8b4e6f..86481af02c 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -45,34 +45,20 @@
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
-#define NUM_TAG_BUCKETS 31
-
-/* A single event: extends grpc_event to form a linked list with a destruction
- function (on_finish) that is hidden from outside this module */
-typedef struct event {
- grpc_event base;
- struct event *queue_next;
- struct event *queue_prev;
- struct event *bucket_next;
- struct event *bucket_prev;
-} event;
-
/* Completion queue structure */
struct grpc_completion_queue {
- /* When refs drops to zero, we are in shutdown mode, and will be destroyable
- once all queued events are drained */
- gpr_refcount refs;
- /* Once owning_refs drops to zero, we will destroy the cq */
+ /** completed events */
+ grpc_cq_completion completed_head;
+ grpc_cq_completion *completed_tail;
+ /** Number of pending events (+1 if we're not shutdown) */
+ gpr_refcount pending_events;
+ /** Once owning_refs drops to zero, we will destroy the cq */
gpr_refcount owning_refs;
- /* the set of low level i/o things that concern this cq */
+ /** the set of low level i/o things that concern this cq */
grpc_pollset pollset;
- /* 0 initially, 1 once we've begun shutting down */
+ /** 0 initially, 1 once we've begun shutting down */
int shutdown;
int shutdown_called;
- /* Head of a linked list of queued events (prev points to the last element) */
- event *queue;
- /* Fixed size chained hash table of events for pluck() */
- event *buckets[NUM_TAG_BUCKETS];
int is_server_cq;
};
@@ -80,10 +66,12 @@ grpc_completion_queue *grpc_completion_queue_create(void) {
grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue));
memset(cc, 0, sizeof(*cc));
/* Initial ref is dropped by grpc_completion_queue_shutdown */
- gpr_ref_init(&cc->refs, 1);
+ gpr_ref_init(&cc->pending_events, 1);
/* One for destroy(), one for pollset_shutdown */
gpr_ref_init(&cc->owning_refs, 2);
grpc_pollset_init(&cc->pollset);
+ cc->completed_tail = &cc->completed_head;
+ cc->completed_head.next = (gpr_uintptr) cc->completed_tail;
return cc;
}
@@ -114,179 +102,127 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason,
void grpc_cq_internal_unref(grpc_completion_queue *cc) {
#endif
if (gpr_unref(&cc->owning_refs)) {
- GPR_ASSERT(cc->queue == NULL);
+ GPR_ASSERT(cc->completed_head.next == (gpr_uintptr) &cc->completed_head);
grpc_pollset_destroy(&cc->pollset);
gpr_free(cc);
}
}
-/* Create and append an event to the queue. Returns the event so that its data
- members can be filled in.
- Requires GRPC_POLLSET_MU(&cc->pollset) locked. */
-static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
- void *tag, grpc_call *call) {
- event *ev = gpr_malloc(sizeof(event));
- gpr_uintptr bucket = ((gpr_uintptr)tag) % NUM_TAG_BUCKETS;
- ev->base.type = type;
- ev->base.tag = tag;
- if (cc->queue == NULL) {
- cc->queue = ev->queue_next = ev->queue_prev = ev;
- } else {
- ev->queue_next = cc->queue;
- ev->queue_prev = cc->queue->queue_prev;
- ev->queue_next->queue_prev = ev->queue_prev->queue_next = ev;
- }
- if (cc->buckets[bucket] == NULL) {
- cc->buckets[bucket] = ev->bucket_next = ev->bucket_prev = ev;
- } else {
- ev->bucket_next = cc->buckets[bucket];
- ev->bucket_prev = cc->buckets[bucket]->bucket_prev;
- ev->bucket_next->bucket_prev = ev->bucket_prev->bucket_next = ev;
- }
- grpc_pollset_kick(&cc->pollset);
- return ev;
-}
-
-void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call) {
- gpr_ref(&cc->refs);
- if (call) GRPC_CALL_INTERNAL_REF(call, "cq");
+void grpc_cq_begin_op(grpc_completion_queue *cc) {
+ gpr_ref(&cc->pending_events);
}
/* Signal the end of an operation - if this is the last waiting-to-be-queued
event, then enter shutdown mode */
-void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
- int success) {
- event *ev;
- int shutdown = 0;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
- ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call);
- ev->base.success = success;
- if (gpr_unref(&cc->refs)) {
+/* Queue a GRPC_OP_COMPLETED operation */
+void grpc_cq_end_op(
+ grpc_completion_queue *cc,
+ void *tag,
+ int success,
+ void (*done)(void *done_arg, grpc_cq_completion *storage),
+ void *done_arg,
+ grpc_cq_completion *storage) {
+ int shutdown = gpr_unref(&cc->pending_events);
+
+ storage->tag = tag;
+ storage->done = done;
+ storage->done_arg = done_arg;
+ storage->next = ((gpr_uintptr)&cc->completed_head) | ((gpr_uintptr)(success != 0));
+
+ if (!shutdown) {
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ cc->completed_tail->next = ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next);
+ cc->completed_tail = storage;
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ } else {
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ cc->completed_tail->next = ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next);
+ cc->completed_tail = storage;
GPR_ASSERT(!cc->shutdown);
GPR_ASSERT(cc->shutdown_called);
cc->shutdown = 1;
- shutdown = 1;
- }
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
- if (call) GRPC_CALL_INTERNAL_UNREF(call, "cq", 0);
- if (shutdown) {
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc);
}
}
-/* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */
-static event *create_shutdown_event(void) {
- event *ev = gpr_malloc(sizeof(event));
- ev->base.type = GRPC_QUEUE_SHUTDOWN;
- ev->base.tag = NULL;
- return ev;
-}
-
grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline) {
- event *ev = NULL;
grpc_event ret;
GRPC_CQ_INTERNAL_REF(cc, "next");
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
for (;;) {
- if (cc->queue != NULL) {
- gpr_uintptr bucket;
- ev = cc->queue;
- bucket = ((gpr_uintptr)ev->base.tag) % NUM_TAG_BUCKETS;
- cc->queue = ev->queue_next;
- ev->queue_next->queue_prev = ev->queue_prev;
- ev->queue_prev->queue_next = ev->queue_next;
- ev->bucket_next->bucket_prev = ev->bucket_prev;
- ev->bucket_prev->bucket_next = ev->bucket_next;
- if (ev == cc->buckets[bucket]) {
- cc->buckets[bucket] = ev->bucket_next;
- if (ev == cc->buckets[bucket]) {
- cc->buckets[bucket] = NULL;
- }
- }
- if (cc->queue == ev) {
- cc->queue = NULL;
+ if (cc->completed_tail != &cc->completed_head) {
+ grpc_cq_completion *c = (grpc_cq_completion *) cc->completed_head.next;
+ cc->completed_head.next = c->next & ~(gpr_uintptr)1;
+ if (c == cc->completed_tail) {
+ cc->completed_tail = &cc->completed_head;
}
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ ret.type = GRPC_OP_COMPLETE;
+ ret.success = c->next & 1u;
+ ret.tag = c->tag;
+ c->done(c->done_arg, c);
break;
}
if (cc->shutdown) {
- ev = create_shutdown_event();
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ memset(&ret, 0, sizeof(ret));
+ ret.type = GRPC_QUEUE_SHUTDOWN;
break;
}
if (!grpc_pollset_work(&cc->pollset, deadline)) {
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
- GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
- GRPC_CQ_INTERNAL_UNREF(cc, "next");
- return ret;
+ break;
}
}
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
- ret = ev->base;
- gpr_free(ev);
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
GRPC_CQ_INTERNAL_UNREF(cc, "next");
return ret;
}
-static event *pluck_event(grpc_completion_queue *cc, void *tag) {
- gpr_uintptr bucket = ((gpr_uintptr)tag) % NUM_TAG_BUCKETS;
- event *ev = cc->buckets[bucket];
- if (ev == NULL) return NULL;
- do {
- if (ev->base.tag == tag) {
- ev->queue_next->queue_prev = ev->queue_prev;
- ev->queue_prev->queue_next = ev->queue_next;
- ev->bucket_next->bucket_prev = ev->bucket_prev;
- ev->bucket_prev->bucket_next = ev->bucket_next;
- if (ev == cc->buckets[bucket]) {
- cc->buckets[bucket] = ev->bucket_next;
- if (ev == cc->buckets[bucket]) {
- cc->buckets[bucket] = NULL;
- }
- }
- if (cc->queue == ev) {
- cc->queue = ev->queue_next;
- if (cc->queue == ev) {
- cc->queue = NULL;
- }
- }
- return ev;
- }
- ev = ev->bucket_next;
- } while (ev != cc->buckets[bucket]);
- return NULL;
-}
-
grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
gpr_timespec deadline) {
- event *ev = NULL;
grpc_event ret;
+ grpc_cq_completion *c;
+ grpc_cq_completion *prev;
GRPC_CQ_INTERNAL_REF(cc, "pluck");
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
for (;;) {
- if ((ev = pluck_event(cc, tag))) {
- break;
+ prev = &cc->completed_head;
+ while ((c = (grpc_cq_completion*)(prev->next & ~(gpr_uintptr)1)) != &cc->completed_head) {
+ if (c->tag == tag) {
+ prev->next = (prev->next & (gpr_uintptr)1) | (c->next & ~(gpr_uintptr)1);
+ if (c == cc->completed_tail) {
+ cc->completed_tail = prev;
+ }
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ ret.type = GRPC_OP_COMPLETE;
+ ret.success = c->next & 1u;
+ ret.tag = c->tag;
+ c->done(c->done_arg, c);
+ goto done;
+ }
+ prev = c;
}
if (cc->shutdown) {
- ev = create_shutdown_event();
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ memset(&ret, 0, sizeof(ret));
+ ret.type = GRPC_QUEUE_SHUTDOWN;
break;
}
if (!grpc_pollset_work(&cc->pollset, deadline)) {
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
- GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
- GRPC_CQ_INTERNAL_UNREF(cc, "pluck");
- return ret;
+ break;
}
}
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
- ret = ev->base;
- gpr_free(ev);
+done:
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
GRPC_CQ_INTERNAL_UNREF(cc, "pluck");
return ret;
@@ -303,7 +239,7 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
cc->shutdown_called = 1;
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
- if (gpr_unref(&cc->refs)) {
+ if (gpr_unref(&cc->pending_events)) {
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
GPR_ASSERT(!cc->shutdown);
cc->shutdown = 1;
diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h
index 1b9010f462..f926d411f3 100644
--- a/src/core/surface/completion_queue.h
+++ b/src/core/surface/completion_queue.h
@@ -39,6 +39,17 @@
#include "src/core/iomgr/pollset.h"
#include <grpc/grpc.h>
+typedef struct grpc_cq_completion {
+ /** user supplied tag */
+ void *tag;
+ /** done callback - called when this queue element is no longer
+ needed by the completion queue */
+ void (*done)(void *done_arg, struct grpc_cq_completion *c);
+ void *done_arg;
+ /** next pointer; low bit is used to indicate success or not */
+ gpr_uintptr next;
+} grpc_cq_completion;
+
#ifdef GRPC_CQ_REF_COUNT_DEBUG
void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
const char *file, int line);
@@ -57,11 +68,16 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc);
/* Flag that an operation is beginning: the completion channel will not finish
shutdown until a corrensponding grpc_cq_end_* call is made */
-void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call);
+void grpc_cq_begin_op(grpc_completion_queue *cc);
/* Queue a GRPC_OP_COMPLETED operation */
-void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
- int success);
+void grpc_cq_end_op(
+ grpc_completion_queue *cc,
+ void *tag,
+ int success,
+ void (*done)(void *done_arg, grpc_cq_completion *storage),
+ void *done_arg,
+ grpc_cq_completion *storage);
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 341ca2942c..2c9115e9ee 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -72,12 +72,14 @@ typedef struct {
typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
-typedef struct {
+typedef struct requested_call {
requested_call_type type;
+ struct requested_call *next;
void *tag;
grpc_completion_queue *cq_bound_to_call;
grpc_completion_queue *cq_for_notification;
grpc_call **call;
+ grpc_cq_completion completion;
union {
struct {
grpc_call_details *details;
@@ -92,17 +94,11 @@ typedef struct {
} data;
} requested_call;
-typedef struct {
- requested_call *calls;
- size_t count;
- size_t capacity;
-} requested_call_array;
-
struct registered_method {
char *method;
char *host;
call_data *pending;
- requested_call_array requested;
+ requested_call *requests;
registered_method *next;
};
@@ -131,6 +127,7 @@ struct channel_data {
typedef struct shutdown_tag {
void *tag;
grpc_completion_queue *cq;
+ grpc_cq_completion completion;
} shutdown_tag;
struct grpc_server {
@@ -153,7 +150,7 @@ struct grpc_server {
gpr_mu mu_call; /* mutex for call-specific state */
registered_method *registered_methods;
- requested_call_array requested_calls;
+ requested_call *requests;
gpr_uint8 shutdown;
gpr_uint8 shutdown_published;
@@ -325,22 +322,6 @@ static int call_list_remove(call_data *call, call_list list) {
return 1;
}
-static void requested_call_array_destroy(requested_call_array *array) {
- gpr_free(array->calls);
-}
-
-static requested_call *requested_call_array_add(requested_call_array *array) {
- requested_call *rc;
- if (array->count == array->capacity) {
- array->capacity = GPR_MAX(array->capacity + 8, array->capacity * 2);
- array->calls =
- gpr_realloc(array->calls, sizeof(requested_call) * array->capacity);
- }
- rc = &array->calls[array->count++];
- memset(rc, 0, sizeof(*rc));
- return rc;
-}
-
static void server_ref(grpc_server *server) {
gpr_ref(&server->internal_refcount);
}
@@ -352,12 +333,10 @@ static void server_delete(grpc_server *server) {
gpr_mu_destroy(&server->mu_global);
gpr_mu_destroy(&server->mu_call);
gpr_free(server->channel_filters);
- requested_call_array_destroy(&server->requested_calls);
while ((rm = server->registered_methods) != NULL) {
server->registered_methods = rm->next;
gpr_free(rm->method);
gpr_free(rm->host);
- requested_call_array_destroy(&rm->requested);
gpr_free(rm);
}
for (i = 0; i < server->cq_count; i++) {
@@ -406,18 +385,18 @@ static void destroy_channel(channel_data *chand) {
static void finish_start_new_rpc_and_unlock(grpc_server *server,
grpc_call_element *elem,
call_data **pending_root,
- requested_call_array *array) {
- requested_call rc;
+ requested_call **requests) {
+ requested_call *rc = *requests;
call_data *calld = elem->call_data;
- if (array->count == 0) {
+ if (rc == NULL) {
calld->state = PENDING;
call_list_join(pending_root, calld, PENDING_START);
gpr_mu_unlock(&server->mu_call);
} else {
- rc = array->calls[--array->count];
+ *requests = rc->next;
calld->state = ACTIVATED;
gpr_mu_unlock(&server->mu_call);
- begin_call(server, calld, &rc);
+ begin_call(server, calld, rc);
}
}
@@ -442,7 +421,7 @@ static void start_new_rpc(grpc_call_element *elem) {
if (rm->method != calld->path) continue;
finish_start_new_rpc_and_unlock(server, elem,
&rm->server_registered_method->pending,
- &rm->server_registered_method->requested);
+ &rm->server_registered_method->requests);
return;
}
/* check for a wildcard method definition (no host set) */
@@ -455,12 +434,12 @@ static void start_new_rpc(grpc_call_element *elem) {
if (rm->method != calld->path) continue;
finish_start_new_rpc_and_unlock(server, elem,
&rm->server_registered_method->pending,
- &rm->server_registered_method->requested);
+ &rm->server_registered_method->requests);
return;
}
}
finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START],
- &server->requested_calls);
+ &server->requests);
}
static void kill_zombie(void *elem, int success) {
@@ -476,6 +455,10 @@ static int num_listeners(grpc_server *server) {
return n;
}
+static void done_shutdown_event(void *server, grpc_cq_completion *completion) {
+ server_unref(server);
+}
+
static void maybe_finish_shutdown(grpc_server *server) {
size_t i;
if (!server->shutdown || server->shutdown_published) {
@@ -494,8 +477,14 @@ static void maybe_finish_shutdown(grpc_server *server) {
}
server->shutdown_published = 1;
for (i = 0; i < server->num_shutdown_tags; i++) {
- grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag,
- NULL, 1);
+ server_ref(server);
+ grpc_cq_end_op(server->shutdown_tags[i].cq,
+ server->shutdown_tags[i].tag,
+ 1,
+ done_shutdown_event,
+ server,
+ &server->shutdown_tags[i].completion
+ );
}
}
@@ -910,15 +899,14 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
void grpc_server_shutdown_and_notify(grpc_server *server,
grpc_completion_queue *cq, void *tag) {
listener *l;
- requested_call_array requested_calls;
- size_t i;
+ requested_call *requests = NULL;
registered_method *rm;
shutdown_tag *sdt;
channel_broadcaster broadcaster;
/* lock, and gather up some stuff to do */
gpr_mu_lock(&server->mu_global);
- grpc_cq_begin_op(cq, NULL);
+ grpc_cq_begin_op(cq);
server->shutdown_tags =
gpr_realloc(server->shutdown_tags,
sizeof(shutdown_tag) * (server->num_shutdown_tags + 1));
@@ -934,23 +922,15 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
/* collect all unregistered then registered calls */
gpr_mu_lock(&server->mu_call);
- requested_calls = server->requested_calls;
- memset(&server->requested_calls, 0, sizeof(server->requested_calls));
+ requests = server->requests;
+ server->requests = NULL;
for (rm = server->registered_methods; rm; rm = rm->next) {
- if (requested_calls.count + rm->requested.count >
- requested_calls.capacity) {
- requested_calls.capacity =
- GPR_MAX(requested_calls.count + rm->requested.count,
- 2 * requested_calls.capacity);
- requested_calls.calls =
- gpr_realloc(requested_calls.calls, sizeof(*requested_calls.calls) *
- requested_calls.capacity);
+ while (rm->requests != NULL) {
+ requested_call *c = rm->requests;
+ rm->requests = c->next;
+ c->next = requests;
+ requests = c;
}
- memcpy(requested_calls.calls + requested_calls.count, rm->requested.calls,
- sizeof(*requested_calls.calls) * rm->requested.count);
- requested_calls.count += rm->requested.count;
- gpr_free(rm->requested.calls);
- memset(&rm->requested, 0, sizeof(rm->requested));
}
gpr_mu_unlock(&server->mu_call);
@@ -959,10 +939,11 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
gpr_mu_unlock(&server->mu_global);
/* terminate all the requested calls */
- for (i = 0; i < requested_calls.count; i++) {
- fail_call(server, &requested_calls.calls[i]);
+ while (requests != NULL) {
+ requested_call *next = requests->next;
+ fail_call(server, requests);
+ requests = next;
}
- gpr_free(requested_calls.calls);
/* Shutdown listeners */
for (l = server->listeners; l; l = l->next) {
@@ -1024,7 +1005,7 @@ void grpc_server_add_listener(grpc_server *server, void *arg,
static grpc_call_error queue_call_request(grpc_server *server,
requested_call *rc) {
call_data *calld = NULL;
- requested_call_array *requested_calls = NULL;
+ requested_call **requests;
gpr_mu_lock(&server->mu_call);
if (server->shutdown) {
gpr_mu_unlock(&server->mu_call);
@@ -1035,12 +1016,12 @@ static grpc_call_error queue_call_request(grpc_server *server,
case BATCH_CALL:
calld =
call_list_remove_head(&server->lists[PENDING_START], PENDING_START);
- requested_calls = &server->requested_calls;
+ requests = &server->requests;
break;
case REGISTERED_CALL:
calld = call_list_remove_head(
&rc->data.registered.registered_method->pending, PENDING_START);
- requested_calls = &rc->data.registered.registered_method->requested;
+ requests = &rc->data.registered.registered_method->requests;
break;
}
if (calld) {
@@ -1050,7 +1031,8 @@ static grpc_call_error queue_call_request(grpc_server *server,
begin_call(server, calld, rc);
return GRPC_CALL_OK;
} else {
- *requested_call_array_add(requested_calls) = *rc;
+ rc->next = *requests;
+ *requests = rc;
gpr_mu_unlock(&server->mu_call);
return GRPC_CALL_OK;
}
@@ -1061,22 +1043,23 @@ grpc_call_error grpc_server_request_call(
grpc_metadata_array *initial_metadata,
grpc_completion_queue *cq_bound_to_call,
grpc_completion_queue *cq_for_notification, void *tag) {
- requested_call rc;
+ requested_call *rc = gpr_malloc(sizeof(*rc));
GRPC_SERVER_LOG_REQUEST_CALL(GPR_INFO, server, call, details,
initial_metadata, cq_bound_to_call,
cq_for_notification, tag);
if (!grpc_cq_is_server_cq(cq_for_notification)) {
+ gpr_free(rc);
return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
}
- grpc_cq_begin_op(cq_for_notification, NULL);
- rc.type = BATCH_CALL;
- rc.tag = tag;
- rc.cq_bound_to_call = cq_bound_to_call;
- rc.cq_for_notification = cq_for_notification;
- rc.call = call;
- rc.data.batch.details = details;
- rc.data.batch.initial_metadata = initial_metadata;
- return queue_call_request(server, &rc);
+ grpc_cq_begin_op(cq_for_notification);
+ rc->type = BATCH_CALL;
+ rc->tag = tag;
+ rc->cq_bound_to_call = cq_bound_to_call;
+ rc->cq_for_notification = cq_for_notification;
+ rc->call = call;
+ rc->data.batch.details = details;
+ rc->data.batch.initial_metadata = initial_metadata;
+ return queue_call_request(server, rc);
}
grpc_call_error grpc_server_request_registered_call(
@@ -1084,22 +1067,23 @@ grpc_call_error grpc_server_request_registered_call(
grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload,
grpc_completion_queue *cq_bound_to_call,
grpc_completion_queue *cq_for_notification, void *tag) {
- requested_call rc;
+ requested_call *rc = gpr_malloc(sizeof(*rc));
registered_method *registered_method = rm;
if (!grpc_cq_is_server_cq(cq_for_notification)) {
+ gpr_free(rc);
return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
}
- grpc_cq_begin_op(cq_for_notification, NULL);
- rc.type = REGISTERED_CALL;
- rc.tag = tag;
- rc.cq_bound_to_call = cq_bound_to_call;
- rc.cq_for_notification = cq_for_notification;
- rc.call = call;
- rc.data.registered.registered_method = registered_method;
- rc.data.registered.deadline = deadline;
- rc.data.registered.initial_metadata = initial_metadata;
- rc.data.registered.optional_payload = optional_payload;
- return queue_call_request(server, &rc);
+ grpc_cq_begin_op(cq_for_notification);
+ rc->type = REGISTERED_CALL;
+ rc->tag = tag;
+ rc->cq_bound_to_call = cq_bound_to_call;
+ rc->cq_for_notification = cq_for_notification;
+ rc->call = call;
+ rc->data.registered.registered_method = registered_method;
+ rc->data.registered.deadline = deadline;
+ rc->data.registered.initial_metadata = initial_metadata;
+ rc->data.registered.optional_payload = optional_payload;
+ return queue_call_request(server, rc);
}
static void publish_registered_or_batch(grpc_call *call, int success,
@@ -1167,7 +1151,11 @@ static void begin_call(grpc_server *server, call_data *calld,
GRPC_CALL_INTERNAL_REF(calld->call, "server");
grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish,
- rc->tag);
+ rc);
+}
+
+static void done_request_event(void *req, grpc_cq_completion *c) {
+ gpr_free(req);
}
static void fail_call(grpc_server *server, requested_call *rc) {
@@ -1180,15 +1168,16 @@ static void fail_call(grpc_server *server, requested_call *rc) {
rc->data.registered.initial_metadata->count = 0;
break;
}
- grpc_cq_end_op(rc->cq_for_notification, rc->tag, NULL, 0);
+ grpc_cq_end_op(rc->cq_for_notification, rc->tag, 0, done_request_event, rc, &rc->completion);
}
static void publish_registered_or_batch(grpc_call *call, int success,
- void *tag) {
+ void *prc) {
grpc_call_element *elem =
grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
+ requested_call *rc = prc;
call_data *calld = elem->call_data;
- grpc_cq_end_op(calld->cq_new, tag, call, success);
+ grpc_cq_end_op(calld->cq_new, rc->tag, success, done_request_event, rc, &rc->completion);
}
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
diff --git a/test/core/surface/completion_queue_test.c b/test/core/surface/completion_queue_test.c
index eba24f5c6e..5b1d9cada9 100644
--- a/test/core/surface/completion_queue_test.c
+++ b/test/core/surface/completion_queue_test.c
@@ -74,17 +74,20 @@ static void test_wait_empty(void) {
shutdown_and_destroy(cc);
}
+static void do_nothing_end_completion(void *arg, grpc_cq_completion *c) {}
+
static void test_cq_end_op(void) {
grpc_event ev;
grpc_completion_queue *cc;
+ grpc_cq_completion completion;
void *tag = create_test_tag();
LOG_TEST("test_cq_end_op");
cc = grpc_completion_queue_create();
- grpc_cq_begin_op(cc, NULL);
- grpc_cq_end_op(cc, tag, NULL, 1);
+ grpc_cq_begin_op(cc);
+ grpc_cq_end_op(cc, tag, 1, do_nothing_end_completion, NULL, &completion);
ev = grpc_completion_queue_next(cc, gpr_inf_past);
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
@@ -120,6 +123,7 @@ static void test_pluck(void) {
grpc_event ev;
grpc_completion_queue *cc;
void *tags[128];
+ grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
unsigned i, j;
LOG_TEST("test_pluck");
@@ -134,8 +138,8 @@ static void test_pluck(void) {
cc = grpc_completion_queue_create();
for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
- grpc_cq_begin_op(cc, NULL);
- grpc_cq_end_op(cc, tags[i], NULL, 1);
+ grpc_cq_begin_op(cc);
+ grpc_cq_end_op(cc, tags[i], 1, do_nothing_end_completion, NULL, &completions[i]);
}
for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
@@ -144,8 +148,8 @@ static void test_pluck(void) {
}
for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
- grpc_cq_begin_op(cc, NULL);
- grpc_cq_end_op(cc, tags[i], NULL, 1);
+ grpc_cq_begin_op(cc);
+ grpc_cq_end_op(cc, tags[i], 1, do_nothing_end_completion, NULL, &completions[i]);
}
for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
@@ -174,6 +178,10 @@ gpr_timespec ten_seconds_time(void) {
return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1);
}
+static void free_completion(void *arg, grpc_cq_completion *completion) {
+ gpr_free(completion);
+}
+
static void producer_thread(void *arg) {
test_thread_options *opt = arg;
int i;
@@ -184,7 +192,7 @@ static void producer_thread(void *arg) {
gpr_log(GPR_INFO, "producer %d phase 1", opt->id);
for (i = 0; i < TEST_THREAD_EVENTS; i++) {
- grpc_cq_begin_op(opt->cc, NULL);
+ grpc_cq_begin_op(opt->cc);
}
gpr_log(GPR_INFO, "producer %d phase 1 done", opt->id);
@@ -193,7 +201,7 @@ static void producer_thread(void *arg) {
gpr_log(GPR_INFO, "producer %d phase 2", opt->id);
for (i = 0; i < TEST_THREAD_EVENTS; i++) {
- grpc_cq_end_op(opt->cc, (void *)(gpr_intptr)1, NULL, 1);
+ grpc_cq_end_op(opt->cc, (void *)(gpr_intptr)1, 1, free_completion, NULL, gpr_malloc(sizeof(grpc_cq_completion)));
opt->events_triggered++;
}