From 97fc6a3f3f6e3bbb39b6fda1444bc533deb8803d Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 8 Jul 2015 15:31:35 -0700 Subject: Rewrite completion queue internals to use pre-allocation of events --- src/core/iomgr/pollset_multipoller_with_epoll.c | 3 +- src/core/surface/call.c | 42 ++++- src/core/surface/completion_queue.c | 216 +++++++++--------------- src/core/surface/completion_queue.h | 22 ++- src/core/surface/server.c | 165 +++++++++--------- test/core/surface/completion_queue_test.c | 24 ++- 6 files changed, 227 insertions(+), 245 deletions(-) diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 1900bbf9e1..3746c8edaf 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -105,10 +105,11 @@ static void multipoll_with_epoll_pollset_maybe_work( * here. */ - timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now); pollset->counter += 1; gpr_mu_unlock(&pollset->mu); + timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now); + do { ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms); if (ep_rv < 0) { diff --git a/src/core/surface/call.c b/src/core/surface/call.c index a28a542c8d..445111ca40 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -162,6 +162,8 @@ struct grpc_call { gpr_uint8 error_status_set; /** should the alarm be cancelled */ gpr_uint8 cancel_alarm; + /** bitmask of allocated completion events in completions */ + gpr_uint8 allocated_completions; /* flags with bits corresponding to write states allowing us to determine what was sent */ @@ -250,6 +252,9 @@ struct grpc_call { grpc_iomgr_closure on_done_recv; grpc_iomgr_closure on_done_send; grpc_iomgr_closure on_done_bind; + + /** completion events - for completion queue use */ + grpc_cq_completion completions[6]; }; #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) @@ -349,6 +354,27 @@ grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) { return call->cq; } +grpc_cq_completion *allocate_completion(grpc_call *call) { + gpr_uint8 i; + for (i = 0; i < GPR_ARRAY_SIZE(call->completions); i++) { + if (call->allocated_completions & (1u << i)) { + continue; + } + call->allocated_completions |= 1u << i; + return &call->completions[i]; + } + gpr_log(GPR_ERROR, "should never reach here"); + abort(); +} + +void done_completion(void *call, grpc_cq_completion *completion) { + grpc_call *c = call; + gpr_mu_lock(&c->mu); + c->allocated_completions &= ~(1u << (completion - c->completions)); + gpr_mu_unlock(&c->mu); + GRPC_CALL_INTERNAL_UNREF(c, "completion", 1); +} + #ifdef GRPC_CALL_REF_COUNT_DEBUG void grpc_call_internal_ref(grpc_call *c, const char *reason) { gpr_log(GPR_DEBUG, "CALL: ref %p %d -> %d [%s]", c, @@ -1316,11 +1342,15 @@ static void set_cancelled_value(grpc_status_code status, void *dest) { } static void finish_batch(grpc_call *call, int success, void *tag) { - grpc_cq_end_op(call->cq, tag, call, success); + grpc_cq_end_op(call->cq, + tag, success, done_completion, call, + allocate_completion(call)); } static void finish_batch_with_close(grpc_call *call, int success, void *tag) { - grpc_cq_end_op(call->cq, tag, call, 1); + grpc_cq_end_op(call->cq, + tag, 1, done_completion, call, + allocate_completion(call)); } static int are_write_flags_valid(gpr_uint32 flags) { @@ -1343,8 +1373,9 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag); if (nops == 0) { - grpc_cq_begin_op(call->cq, call); - grpc_cq_end_op(call->cq, tag, call, 1); + grpc_cq_begin_op(call->cq); + GRPC_CALL_INTERNAL_REF(call, "completion"); + grpc_cq_end_op(call->cq, tag, 1, done_completion, call, allocate_completion(call)); return GRPC_CALL_OK; } @@ -1466,7 +1497,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, } } - grpc_cq_begin_op(call->cq, call); + GRPC_CALL_INTERNAL_REF(call, "completion"); + grpc_cq_begin_op(call->cq); return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func, tag); } diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 030a8b4e6f..86481af02c 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -45,34 +45,20 @@ #include #include -#define NUM_TAG_BUCKETS 31 - -/* A single event: extends grpc_event to form a linked list with a destruction - function (on_finish) that is hidden from outside this module */ -typedef struct event { - grpc_event base; - struct event *queue_next; - struct event *queue_prev; - struct event *bucket_next; - struct event *bucket_prev; -} event; - /* Completion queue structure */ struct grpc_completion_queue { - /* When refs drops to zero, we are in shutdown mode, and will be destroyable - once all queued events are drained */ - gpr_refcount refs; - /* Once owning_refs drops to zero, we will destroy the cq */ + /** completed events */ + grpc_cq_completion completed_head; + grpc_cq_completion *completed_tail; + /** Number of pending events (+1 if we're not shutdown) */ + gpr_refcount pending_events; + /** Once owning_refs drops to zero, we will destroy the cq */ gpr_refcount owning_refs; - /* the set of low level i/o things that concern this cq */ + /** the set of low level i/o things that concern this cq */ grpc_pollset pollset; - /* 0 initially, 1 once we've begun shutting down */ + /** 0 initially, 1 once we've begun shutting down */ int shutdown; int shutdown_called; - /* Head of a linked list of queued events (prev points to the last element) */ - event *queue; - /* Fixed size chained hash table of events for pluck() */ - event *buckets[NUM_TAG_BUCKETS]; int is_server_cq; }; @@ -80,10 +66,12 @@ grpc_completion_queue *grpc_completion_queue_create(void) { grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue)); memset(cc, 0, sizeof(*cc)); /* Initial ref is dropped by grpc_completion_queue_shutdown */ - gpr_ref_init(&cc->refs, 1); + gpr_ref_init(&cc->pending_events, 1); /* One for destroy(), one for pollset_shutdown */ gpr_ref_init(&cc->owning_refs, 2); grpc_pollset_init(&cc->pollset); + cc->completed_tail = &cc->completed_head; + cc->completed_head.next = (gpr_uintptr) cc->completed_tail; return cc; } @@ -114,179 +102,127 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason, void grpc_cq_internal_unref(grpc_completion_queue *cc) { #endif if (gpr_unref(&cc->owning_refs)) { - GPR_ASSERT(cc->queue == NULL); + GPR_ASSERT(cc->completed_head.next == (gpr_uintptr) &cc->completed_head); grpc_pollset_destroy(&cc->pollset); gpr_free(cc); } } -/* Create and append an event to the queue. Returns the event so that its data - members can be filled in. - Requires GRPC_POLLSET_MU(&cc->pollset) locked. */ -static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type, - void *tag, grpc_call *call) { - event *ev = gpr_malloc(sizeof(event)); - gpr_uintptr bucket = ((gpr_uintptr)tag) % NUM_TAG_BUCKETS; - ev->base.type = type; - ev->base.tag = tag; - if (cc->queue == NULL) { - cc->queue = ev->queue_next = ev->queue_prev = ev; - } else { - ev->queue_next = cc->queue; - ev->queue_prev = cc->queue->queue_prev; - ev->queue_next->queue_prev = ev->queue_prev->queue_next = ev; - } - if (cc->buckets[bucket] == NULL) { - cc->buckets[bucket] = ev->bucket_next = ev->bucket_prev = ev; - } else { - ev->bucket_next = cc->buckets[bucket]; - ev->bucket_prev = cc->buckets[bucket]->bucket_prev; - ev->bucket_next->bucket_prev = ev->bucket_prev->bucket_next = ev; - } - grpc_pollset_kick(&cc->pollset); - return ev; -} - -void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call) { - gpr_ref(&cc->refs); - if (call) GRPC_CALL_INTERNAL_REF(call, "cq"); +void grpc_cq_begin_op(grpc_completion_queue *cc) { + gpr_ref(&cc->pending_events); } /* Signal the end of an operation - if this is the last waiting-to-be-queued event, then enter shutdown mode */ -void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call, - int success) { - event *ev; - int shutdown = 0; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); - ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call); - ev->base.success = success; - if (gpr_unref(&cc->refs)) { +/* Queue a GRPC_OP_COMPLETED operation */ +void grpc_cq_end_op( + grpc_completion_queue *cc, + void *tag, + int success, + void (*done)(void *done_arg, grpc_cq_completion *storage), + void *done_arg, + grpc_cq_completion *storage) { + int shutdown = gpr_unref(&cc->pending_events); + + storage->tag = tag; + storage->done = done; + storage->done_arg = done_arg; + storage->next = ((gpr_uintptr)&cc->completed_head) | ((gpr_uintptr)(success != 0)); + + if (!shutdown) { + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + cc->completed_tail->next = ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next); + cc->completed_tail = storage; + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + } else { + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + cc->completed_tail->next = ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next); + cc->completed_tail = storage; GPR_ASSERT(!cc->shutdown); GPR_ASSERT(cc->shutdown_called); cc->shutdown = 1; - shutdown = 1; - } - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); - if (call) GRPC_CALL_INTERNAL_UNREF(call, "cq", 0); - if (shutdown) { + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc); } } -/* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */ -static event *create_shutdown_event(void) { - event *ev = gpr_malloc(sizeof(event)); - ev->base.type = GRPC_QUEUE_SHUTDOWN; - ev->base.tag = NULL; - return ev; -} - grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_timespec deadline) { - event *ev = NULL; grpc_event ret; GRPC_CQ_INTERNAL_REF(cc, "next"); gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); for (;;) { - if (cc->queue != NULL) { - gpr_uintptr bucket; - ev = cc->queue; - bucket = ((gpr_uintptr)ev->base.tag) % NUM_TAG_BUCKETS; - cc->queue = ev->queue_next; - ev->queue_next->queue_prev = ev->queue_prev; - ev->queue_prev->queue_next = ev->queue_next; - ev->bucket_next->bucket_prev = ev->bucket_prev; - ev->bucket_prev->bucket_next = ev->bucket_next; - if (ev == cc->buckets[bucket]) { - cc->buckets[bucket] = ev->bucket_next; - if (ev == cc->buckets[bucket]) { - cc->buckets[bucket] = NULL; - } - } - if (cc->queue == ev) { - cc->queue = NULL; + if (cc->completed_tail != &cc->completed_head) { + grpc_cq_completion *c = (grpc_cq_completion *) cc->completed_head.next; + cc->completed_head.next = c->next & ~(gpr_uintptr)1; + if (c == cc->completed_tail) { + cc->completed_tail = &cc->completed_head; } + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + ret.type = GRPC_OP_COMPLETE; + ret.success = c->next & 1u; + ret.tag = c->tag; + c->done(c->done_arg, c); break; } if (cc->shutdown) { - ev = create_shutdown_event(); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + memset(&ret, 0, sizeof(ret)); + ret.type = GRPC_QUEUE_SHUTDOWN; break; } if (!grpc_pollset_work(&cc->pollset, deadline)) { gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; - GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); - GRPC_CQ_INTERNAL_UNREF(cc, "next"); - return ret; + break; } } - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); - ret = ev->base; - gpr_free(ev); GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); GRPC_CQ_INTERNAL_UNREF(cc, "next"); return ret; } -static event *pluck_event(grpc_completion_queue *cc, void *tag) { - gpr_uintptr bucket = ((gpr_uintptr)tag) % NUM_TAG_BUCKETS; - event *ev = cc->buckets[bucket]; - if (ev == NULL) return NULL; - do { - if (ev->base.tag == tag) { - ev->queue_next->queue_prev = ev->queue_prev; - ev->queue_prev->queue_next = ev->queue_next; - ev->bucket_next->bucket_prev = ev->bucket_prev; - ev->bucket_prev->bucket_next = ev->bucket_next; - if (ev == cc->buckets[bucket]) { - cc->buckets[bucket] = ev->bucket_next; - if (ev == cc->buckets[bucket]) { - cc->buckets[bucket] = NULL; - } - } - if (cc->queue == ev) { - cc->queue = ev->queue_next; - if (cc->queue == ev) { - cc->queue = NULL; - } - } - return ev; - } - ev = ev->bucket_next; - } while (ev != cc->buckets[bucket]); - return NULL; -} - grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, gpr_timespec deadline) { - event *ev = NULL; grpc_event ret; + grpc_cq_completion *c; + grpc_cq_completion *prev; GRPC_CQ_INTERNAL_REF(cc, "pluck"); gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); for (;;) { - if ((ev = pluck_event(cc, tag))) { - break; + prev = &cc->completed_head; + while ((c = (grpc_cq_completion*)(prev->next & ~(gpr_uintptr)1)) != &cc->completed_head) { + if (c->tag == tag) { + prev->next = (prev->next & (gpr_uintptr)1) | (c->next & ~(gpr_uintptr)1); + if (c == cc->completed_tail) { + cc->completed_tail = prev; + } + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + ret.type = GRPC_OP_COMPLETE; + ret.success = c->next & 1u; + ret.tag = c->tag; + c->done(c->done_arg, c); + goto done; + } + prev = c; } if (cc->shutdown) { - ev = create_shutdown_event(); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + memset(&ret, 0, sizeof(ret)); + ret.type = GRPC_QUEUE_SHUTDOWN; break; } if (!grpc_pollset_work(&cc->pollset, deadline)) { gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; - GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); - GRPC_CQ_INTERNAL_UNREF(cc, "pluck"); - return ret; + break; } } - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); - ret = ev->base; - gpr_free(ev); +done: GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); GRPC_CQ_INTERNAL_UNREF(cc, "pluck"); return ret; @@ -303,7 +239,7 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { cc->shutdown_called = 1; gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); - if (gpr_unref(&cc->refs)) { + if (gpr_unref(&cc->pending_events)) { gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); GPR_ASSERT(!cc->shutdown); cc->shutdown = 1; diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h index 1b9010f462..f926d411f3 100644 --- a/src/core/surface/completion_queue.h +++ b/src/core/surface/completion_queue.h @@ -39,6 +39,17 @@ #include "src/core/iomgr/pollset.h" #include +typedef struct grpc_cq_completion { + /** user supplied tag */ + void *tag; + /** done callback - called when this queue element is no longer + needed by the completion queue */ + void (*done)(void *done_arg, struct grpc_cq_completion *c); + void *done_arg; + /** next pointer; low bit is used to indicate success or not */ + gpr_uintptr next; +} grpc_cq_completion; + #ifdef GRPC_CQ_REF_COUNT_DEBUG void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason, const char *file, int line); @@ -57,11 +68,16 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc); /* Flag that an operation is beginning: the completion channel will not finish shutdown until a corrensponding grpc_cq_end_* call is made */ -void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call); +void grpc_cq_begin_op(grpc_completion_queue *cc); /* Queue a GRPC_OP_COMPLETED operation */ -void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call, - int success); +void grpc_cq_end_op( + grpc_completion_queue *cc, + void *tag, + int success, + void (*done)(void *done_arg, grpc_cq_completion *storage), + void *done_arg, + grpc_cq_completion *storage); grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc); diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 341ca2942c..2c9115e9ee 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -72,12 +72,14 @@ typedef struct { typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type; -typedef struct { +typedef struct requested_call { requested_call_type type; + struct requested_call *next; void *tag; grpc_completion_queue *cq_bound_to_call; grpc_completion_queue *cq_for_notification; grpc_call **call; + grpc_cq_completion completion; union { struct { grpc_call_details *details; @@ -92,17 +94,11 @@ typedef struct { } data; } requested_call; -typedef struct { - requested_call *calls; - size_t count; - size_t capacity; -} requested_call_array; - struct registered_method { char *method; char *host; call_data *pending; - requested_call_array requested; + requested_call *requests; registered_method *next; }; @@ -131,6 +127,7 @@ struct channel_data { typedef struct shutdown_tag { void *tag; grpc_completion_queue *cq; + grpc_cq_completion completion; } shutdown_tag; struct grpc_server { @@ -153,7 +150,7 @@ struct grpc_server { gpr_mu mu_call; /* mutex for call-specific state */ registered_method *registered_methods; - requested_call_array requested_calls; + requested_call *requests; gpr_uint8 shutdown; gpr_uint8 shutdown_published; @@ -325,22 +322,6 @@ static int call_list_remove(call_data *call, call_list list) { return 1; } -static void requested_call_array_destroy(requested_call_array *array) { - gpr_free(array->calls); -} - -static requested_call *requested_call_array_add(requested_call_array *array) { - requested_call *rc; - if (array->count == array->capacity) { - array->capacity = GPR_MAX(array->capacity + 8, array->capacity * 2); - array->calls = - gpr_realloc(array->calls, sizeof(requested_call) * array->capacity); - } - rc = &array->calls[array->count++]; - memset(rc, 0, sizeof(*rc)); - return rc; -} - static void server_ref(grpc_server *server) { gpr_ref(&server->internal_refcount); } @@ -352,12 +333,10 @@ static void server_delete(grpc_server *server) { gpr_mu_destroy(&server->mu_global); gpr_mu_destroy(&server->mu_call); gpr_free(server->channel_filters); - requested_call_array_destroy(&server->requested_calls); while ((rm = server->registered_methods) != NULL) { server->registered_methods = rm->next; gpr_free(rm->method); gpr_free(rm->host); - requested_call_array_destroy(&rm->requested); gpr_free(rm); } for (i = 0; i < server->cq_count; i++) { @@ -406,18 +385,18 @@ static void destroy_channel(channel_data *chand) { static void finish_start_new_rpc_and_unlock(grpc_server *server, grpc_call_element *elem, call_data **pending_root, - requested_call_array *array) { - requested_call rc; + requested_call **requests) { + requested_call *rc = *requests; call_data *calld = elem->call_data; - if (array->count == 0) { + if (rc == NULL) { calld->state = PENDING; call_list_join(pending_root, calld, PENDING_START); gpr_mu_unlock(&server->mu_call); } else { - rc = array->calls[--array->count]; + *requests = rc->next; calld->state = ACTIVATED; gpr_mu_unlock(&server->mu_call); - begin_call(server, calld, &rc); + begin_call(server, calld, rc); } } @@ -442,7 +421,7 @@ static void start_new_rpc(grpc_call_element *elem) { if (rm->method != calld->path) continue; finish_start_new_rpc_and_unlock(server, elem, &rm->server_registered_method->pending, - &rm->server_registered_method->requested); + &rm->server_registered_method->requests); return; } /* check for a wildcard method definition (no host set) */ @@ -455,12 +434,12 @@ static void start_new_rpc(grpc_call_element *elem) { if (rm->method != calld->path) continue; finish_start_new_rpc_and_unlock(server, elem, &rm->server_registered_method->pending, - &rm->server_registered_method->requested); + &rm->server_registered_method->requests); return; } } finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START], - &server->requested_calls); + &server->requests); } static void kill_zombie(void *elem, int success) { @@ -476,6 +455,10 @@ static int num_listeners(grpc_server *server) { return n; } +static void done_shutdown_event(void *server, grpc_cq_completion *completion) { + server_unref(server); +} + static void maybe_finish_shutdown(grpc_server *server) { size_t i; if (!server->shutdown || server->shutdown_published) { @@ -494,8 +477,14 @@ static void maybe_finish_shutdown(grpc_server *server) { } server->shutdown_published = 1; for (i = 0; i < server->num_shutdown_tags; i++) { - grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag, - NULL, 1); + server_ref(server); + grpc_cq_end_op(server->shutdown_tags[i].cq, + server->shutdown_tags[i].tag, + 1, + done_shutdown_event, + server, + &server->shutdown_tags[i].completion + ); } } @@ -910,15 +899,14 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, void grpc_server_shutdown_and_notify(grpc_server *server, grpc_completion_queue *cq, void *tag) { listener *l; - requested_call_array requested_calls; - size_t i; + requested_call *requests = NULL; registered_method *rm; shutdown_tag *sdt; channel_broadcaster broadcaster; /* lock, and gather up some stuff to do */ gpr_mu_lock(&server->mu_global); - grpc_cq_begin_op(cq, NULL); + grpc_cq_begin_op(cq); server->shutdown_tags = gpr_realloc(server->shutdown_tags, sizeof(shutdown_tag) * (server->num_shutdown_tags + 1)); @@ -934,23 +922,15 @@ void grpc_server_shutdown_and_notify(grpc_server *server, /* collect all unregistered then registered calls */ gpr_mu_lock(&server->mu_call); - requested_calls = server->requested_calls; - memset(&server->requested_calls, 0, sizeof(server->requested_calls)); + requests = server->requests; + server->requests = NULL; for (rm = server->registered_methods; rm; rm = rm->next) { - if (requested_calls.count + rm->requested.count > - requested_calls.capacity) { - requested_calls.capacity = - GPR_MAX(requested_calls.count + rm->requested.count, - 2 * requested_calls.capacity); - requested_calls.calls = - gpr_realloc(requested_calls.calls, sizeof(*requested_calls.calls) * - requested_calls.capacity); + while (rm->requests != NULL) { + requested_call *c = rm->requests; + rm->requests = c->next; + c->next = requests; + requests = c; } - memcpy(requested_calls.calls + requested_calls.count, rm->requested.calls, - sizeof(*requested_calls.calls) * rm->requested.count); - requested_calls.count += rm->requested.count; - gpr_free(rm->requested.calls); - memset(&rm->requested, 0, sizeof(rm->requested)); } gpr_mu_unlock(&server->mu_call); @@ -959,10 +939,11 @@ void grpc_server_shutdown_and_notify(grpc_server *server, gpr_mu_unlock(&server->mu_global); /* terminate all the requested calls */ - for (i = 0; i < requested_calls.count; i++) { - fail_call(server, &requested_calls.calls[i]); + while (requests != NULL) { + requested_call *next = requests->next; + fail_call(server, requests); + requests = next; } - gpr_free(requested_calls.calls); /* Shutdown listeners */ for (l = server->listeners; l; l = l->next) { @@ -1024,7 +1005,7 @@ void grpc_server_add_listener(grpc_server *server, void *arg, static grpc_call_error queue_call_request(grpc_server *server, requested_call *rc) { call_data *calld = NULL; - requested_call_array *requested_calls = NULL; + requested_call **requests; gpr_mu_lock(&server->mu_call); if (server->shutdown) { gpr_mu_unlock(&server->mu_call); @@ -1035,12 +1016,12 @@ static grpc_call_error queue_call_request(grpc_server *server, case BATCH_CALL: calld = call_list_remove_head(&server->lists[PENDING_START], PENDING_START); - requested_calls = &server->requested_calls; + requests = &server->requests; break; case REGISTERED_CALL: calld = call_list_remove_head( &rc->data.registered.registered_method->pending, PENDING_START); - requested_calls = &rc->data.registered.registered_method->requested; + requests = &rc->data.registered.registered_method->requests; break; } if (calld) { @@ -1050,7 +1031,8 @@ static grpc_call_error queue_call_request(grpc_server *server, begin_call(server, calld, rc); return GRPC_CALL_OK; } else { - *requested_call_array_add(requested_calls) = *rc; + rc->next = *requests; + *requests = rc; gpr_mu_unlock(&server->mu_call); return GRPC_CALL_OK; } @@ -1061,22 +1043,23 @@ grpc_call_error grpc_server_request_call( grpc_metadata_array *initial_metadata, grpc_completion_queue *cq_bound_to_call, grpc_completion_queue *cq_for_notification, void *tag) { - requested_call rc; + requested_call *rc = gpr_malloc(sizeof(*rc)); GRPC_SERVER_LOG_REQUEST_CALL(GPR_INFO, server, call, details, initial_metadata, cq_bound_to_call, cq_for_notification, tag); if (!grpc_cq_is_server_cq(cq_for_notification)) { + gpr_free(rc); return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; } - grpc_cq_begin_op(cq_for_notification, NULL); - rc.type = BATCH_CALL; - rc.tag = tag; - rc.cq_bound_to_call = cq_bound_to_call; - rc.cq_for_notification = cq_for_notification; - rc.call = call; - rc.data.batch.details = details; - rc.data.batch.initial_metadata = initial_metadata; - return queue_call_request(server, &rc); + grpc_cq_begin_op(cq_for_notification); + rc->type = BATCH_CALL; + rc->tag = tag; + rc->cq_bound_to_call = cq_bound_to_call; + rc->cq_for_notification = cq_for_notification; + rc->call = call; + rc->data.batch.details = details; + rc->data.batch.initial_metadata = initial_metadata; + return queue_call_request(server, rc); } grpc_call_error grpc_server_request_registered_call( @@ -1084,22 +1067,23 @@ grpc_call_error grpc_server_request_registered_call( grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload, grpc_completion_queue *cq_bound_to_call, grpc_completion_queue *cq_for_notification, void *tag) { - requested_call rc; + requested_call *rc = gpr_malloc(sizeof(*rc)); registered_method *registered_method = rm; if (!grpc_cq_is_server_cq(cq_for_notification)) { + gpr_free(rc); return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; } - grpc_cq_begin_op(cq_for_notification, NULL); - rc.type = REGISTERED_CALL; - rc.tag = tag; - rc.cq_bound_to_call = cq_bound_to_call; - rc.cq_for_notification = cq_for_notification; - rc.call = call; - rc.data.registered.registered_method = registered_method; - rc.data.registered.deadline = deadline; - rc.data.registered.initial_metadata = initial_metadata; - rc.data.registered.optional_payload = optional_payload; - return queue_call_request(server, &rc); + grpc_cq_begin_op(cq_for_notification); + rc->type = REGISTERED_CALL; + rc->tag = tag; + rc->cq_bound_to_call = cq_bound_to_call; + rc->cq_for_notification = cq_for_notification; + rc->call = call; + rc->data.registered.registered_method = registered_method; + rc->data.registered.deadline = deadline; + rc->data.registered.initial_metadata = initial_metadata; + rc->data.registered.optional_payload = optional_payload; + return queue_call_request(server, rc); } static void publish_registered_or_batch(grpc_call *call, int success, @@ -1167,7 +1151,11 @@ static void begin_call(grpc_server *server, call_data *calld, GRPC_CALL_INTERNAL_REF(calld->call, "server"); grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish, - rc->tag); + rc); +} + +static void done_request_event(void *req, grpc_cq_completion *c) { + gpr_free(req); } static void fail_call(grpc_server *server, requested_call *rc) { @@ -1180,15 +1168,16 @@ static void fail_call(grpc_server *server, requested_call *rc) { rc->data.registered.initial_metadata->count = 0; break; } - grpc_cq_end_op(rc->cq_for_notification, rc->tag, NULL, 0); + grpc_cq_end_op(rc->cq_for_notification, rc->tag, 0, done_request_event, rc, &rc->completion); } static void publish_registered_or_batch(grpc_call *call, int success, - void *tag) { + void *prc) { grpc_call_element *elem = grpc_call_stack_element(grpc_call_get_call_stack(call), 0); + requested_call *rc = prc; call_data *calld = elem->call_data; - grpc_cq_end_op(calld->cq_new, tag, call, success); + grpc_cq_end_op(calld->cq_new, rc->tag, success, done_request_event, rc, &rc->completion); } const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) { diff --git a/test/core/surface/completion_queue_test.c b/test/core/surface/completion_queue_test.c index eba24f5c6e..5b1d9cada9 100644 --- a/test/core/surface/completion_queue_test.c +++ b/test/core/surface/completion_queue_test.c @@ -74,17 +74,20 @@ static void test_wait_empty(void) { shutdown_and_destroy(cc); } +static void do_nothing_end_completion(void *arg, grpc_cq_completion *c) {} + static void test_cq_end_op(void) { grpc_event ev; grpc_completion_queue *cc; + grpc_cq_completion completion; void *tag = create_test_tag(); LOG_TEST("test_cq_end_op"); cc = grpc_completion_queue_create(); - grpc_cq_begin_op(cc, NULL); - grpc_cq_end_op(cc, tag, NULL, 1); + grpc_cq_begin_op(cc); + grpc_cq_end_op(cc, tag, 1, do_nothing_end_completion, NULL, &completion); ev = grpc_completion_queue_next(cc, gpr_inf_past); GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); @@ -120,6 +123,7 @@ static void test_pluck(void) { grpc_event ev; grpc_completion_queue *cc; void *tags[128]; + grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)]; unsigned i, j; LOG_TEST("test_pluck"); @@ -134,8 +138,8 @@ static void test_pluck(void) { cc = grpc_completion_queue_create(); for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { - grpc_cq_begin_op(cc, NULL); - grpc_cq_end_op(cc, tags[i], NULL, 1); + grpc_cq_begin_op(cc); + grpc_cq_end_op(cc, tags[i], 1, do_nothing_end_completion, NULL, &completions[i]); } for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { @@ -144,8 +148,8 @@ static void test_pluck(void) { } for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { - grpc_cq_begin_op(cc, NULL); - grpc_cq_end_op(cc, tags[i], NULL, 1); + grpc_cq_begin_op(cc); + grpc_cq_end_op(cc, tags[i], 1, do_nothing_end_completion, NULL, &completions[i]); } for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { @@ -174,6 +178,10 @@ gpr_timespec ten_seconds_time(void) { return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1); } +static void free_completion(void *arg, grpc_cq_completion *completion) { + gpr_free(completion); +} + static void producer_thread(void *arg) { test_thread_options *opt = arg; int i; @@ -184,7 +192,7 @@ static void producer_thread(void *arg) { gpr_log(GPR_INFO, "producer %d phase 1", opt->id); for (i = 0; i < TEST_THREAD_EVENTS; i++) { - grpc_cq_begin_op(opt->cc, NULL); + grpc_cq_begin_op(opt->cc); } gpr_log(GPR_INFO, "producer %d phase 1 done", opt->id); @@ -193,7 +201,7 @@ static void producer_thread(void *arg) { gpr_log(GPR_INFO, "producer %d phase 2", opt->id); for (i = 0; i < TEST_THREAD_EVENTS; i++) { - grpc_cq_end_op(opt->cc, (void *)(gpr_intptr)1, NULL, 1); + grpc_cq_end_op(opt->cc, (void *)(gpr_intptr)1, 1, free_completion, NULL, gpr_malloc(sizeof(grpc_cq_completion))); opt->events_triggered++; } -- cgit v1.2.3