author     vjpai <vpai@google.com>   2015-07-10 19:48:17 -0700
committer  vjpai <vpai@google.com>   2015-07-10 19:48:17 -0700
commit     a8822999840a7c550692b372ff93841fa43907bd (patch)
tree       0feda344bff960f3331f9f222a4cd0316e5e4099 /src/core/surface
parent     10a6546cb94373b7d57f3cb2d567edfbb0742d7c (diff)
parent     4547d503d37ade45866688dae3ce291ec8a450bd (diff)

Merge branch 'master' into lockfree-stack
Diffstat (limited to 'src/core/surface')

-rw-r--r--  src/core/surface/call.c               60
-rw-r--r--  src/core/surface/completion_queue.c  232
-rw-r--r--  src/core/surface/completion_queue.h   18
-rw-r--r--  src/core/surface/init.c                2
-rw-r--r--  src/core/surface/server.c             207
-rw-r--r--  src/core/surface/version.c             41

6 files changed, 304 insertions, 256 deletions
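In the call.c changes below, each call gets a fixed pool of six completion records (MAX_CONCURRENT_COMPLETIONS), tracked by a one-byte bitmask, instead of a heap-allocated event per completion. A minimal standalone sketch of that bitmask-pool pattern follows; the pool type and names here are illustrative only, not part of the gRPC sources.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

#define MAX_SLOTS 6 /* mirrors MAX_CONCURRENT_COMPLETIONS in the diff */

typedef struct {
  uint8_t allocated;    /* one bit per slot, like call->allocated_completions */
  int slots[MAX_SLOTS]; /* stands in for the per-call completions[] array */
} slot_pool;

/* Hand out the first free slot; the real allocate_completion aborts instead of
   returning NULL because at most six batch ops can complete concurrently. */
static int *pool_alloc(slot_pool *p) {
  int i;
  for (i = 0; i < MAX_SLOTS; i++) {
    if (p->allocated & (1u << i)) continue;
    p->allocated |= (uint8_t)(1u << i);
    return &p->slots[i];
  }
  return NULL;
}

/* Release a slot by clearing its bit, recovering the index by pointer
   subtraction the same way done_completion does. */
static void pool_free(slot_pool *p, int *slot) {
  int i = (int)(slot - p->slots);
  assert(i >= 0 && i < MAX_SLOTS);
  p->allocated &= (uint8_t)~(1u << i);
}

int main(void) {
  slot_pool p = {0, {0}};
  int *a = pool_alloc(&p);
  int *b = pool_alloc(&p);
  printf("mask after two allocs: 0x%02x\n", (unsigned)p.allocated);
  pool_free(&p, a);
  pool_free(&p, b);
  printf("mask after frees:      0x%02x\n", (unsigned)p.allocated);
  return 0;
}
```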
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index fb3b0b1918..0a551ac47f 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -49,6 +49,17 @@
 #include <stdlib.h>
 #include <string.h>
 
+/** The maximum number of completions possible.
+    Based upon the maximum number of individually queueable ops in the batch
+    api:
+      - initial metadata send
+      - message send
+      - status/close send (depending on client/server)
+      - initial metadata recv
+      - message recv
+      - status/close recv (depending on client/server) */
+#define MAX_CONCURRENT_COMPLETIONS 6
+
 typedef enum { REQ_INITIAL = 0, REQ_READY, REQ_DONE } req_state;
 
 typedef enum {
@@ -135,6 +146,7 @@ struct grpc_call {
   grpc_mdctx *metadata_context;
   /* TODO(ctiller): share with cq if possible? */
   gpr_mu mu;
+  gpr_mu completion_mu;
 
   /* how far through the stream have we read? */
   read_state read_state;
@@ -162,6 +174,8 @@ struct grpc_call {
   gpr_uint8 error_status_set;
   /** should the alarm be cancelled */
   gpr_uint8 cancel_alarm;
+  /** bitmask of allocated completion events in completions */
+  gpr_uint8 allocated_completions;
 
   /* flags with bits corresponding to write states allowing us to determine
      what was sent */
@@ -250,6 +264,9 @@ struct grpc_call {
   grpc_iomgr_closure on_done_recv;
   grpc_iomgr_closure on_done_send;
   grpc_iomgr_closure on_done_bind;
+
+  /** completion events - for completion queue use */
+  grpc_cq_completion completions[MAX_CONCURRENT_COMPLETIONS];
 };
 
 #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
@@ -286,6 +303,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
       gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
   memset(call, 0, sizeof(grpc_call));
   gpr_mu_init(&call->mu);
+  gpr_mu_init(&call->completion_mu);
   call->channel = channel;
   call->cq = cq;
   if (cq) {
@@ -349,6 +367,29 @@ grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) {
   return call->cq;
 }
 
+static grpc_cq_completion *allocate_completion(grpc_call *call) {
+  gpr_uint8 i;
+  gpr_mu_lock(&call->completion_mu);
+  for (i = 0; i < GPR_ARRAY_SIZE(call->completions); i++) {
+    if (call->allocated_completions & (1u << i)) {
+      continue;
+    }
+    call->allocated_completions |= 1u << i;
+    gpr_mu_unlock(&call->completion_mu);
+    return &call->completions[i];
+  }
+  gpr_log(GPR_ERROR, "should never reach here");
+  abort();
+}
+
+static void done_completion(void *call, grpc_cq_completion *completion) {
+  grpc_call *c = call;
+  gpr_mu_lock(&c->completion_mu);
+  c->allocated_completions &= ~(1u << (completion - c->completions));
+  gpr_mu_unlock(&c->completion_mu);
+  GRPC_CALL_INTERNAL_UNREF(c, "completion", 1);
+}
+
 #ifdef GRPC_CALL_REF_COUNT_DEBUG
 void grpc_call_internal_ref(grpc_call *c, const char *reason) {
   gpr_log(GPR_DEBUG, "CALL: ref %p %d -> %d [%s]", c,
@@ -365,6 +406,7 @@ static void destroy_call(void *call, int ignored_success) {
   grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c));
   GRPC_CHANNEL_INTERNAL_UNREF(c->channel, "call");
   gpr_mu_destroy(&c->mu);
+  gpr_mu_destroy(&c->completion_mu);
   for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
     if (c->status[i].details) {
       GRPC_MDSTR_UNREF(c->status[i].details);
@@ -1188,7 +1230,8 @@ static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) {
   }
   GRPC_CALL_INTERNAL_REF(call, "alarm");
   call->have_alarm = 1;
-  grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now());
+  grpc_alarm_init(&call->alarm, deadline, call_alarm, call,
+                  gpr_now(GPR_CLOCK_REALTIME));
 }
 
 /* we offset status by a small amount when storing it into
    transport metadata
@@ -1316,11 +1359,13 @@ static void set_cancelled_value(grpc_status_code status, void *dest) {
 }
 
 static void finish_batch(grpc_call *call, int success, void *tag) {
-  grpc_cq_end_op(call->cq, tag, call, success);
+  grpc_cq_end_op(call->cq, tag, success, done_completion, call,
+                 allocate_completion(call));
 }
 
 static void finish_batch_with_close(grpc_call *call, int success, void *tag) {
-  grpc_cq_end_op(call->cq, tag, call, 1);
+  grpc_cq_end_op(call->cq, tag, 1, done_completion, call,
+                 allocate_completion(call));
 }
 
 static int are_write_flags_valid(gpr_uint32 flags) {
@@ -1343,8 +1388,10 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
   GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag);
 
   if (nops == 0) {
-    grpc_cq_begin_op(call->cq, call);
-    grpc_cq_end_op(call->cq, tag, call, 1);
+    grpc_cq_begin_op(call->cq);
+    GRPC_CALL_INTERNAL_REF(call, "completion");
+    grpc_cq_end_op(call->cq, tag, 1, done_completion, call,
+                   allocate_completion(call));
     return GRPC_CALL_OK;
   }
 
@@ -1466,7 +1513,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
     }
   }
 
-  grpc_cq_begin_op(call->cq, call);
+  GRPC_CALL_INTERNAL_REF(call, "completion");
+  grpc_cq_begin_op(call->cq);
 
   return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func, tag);
 }
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 030a8b4e6f..c67f75fc5c 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -45,34 +45,20 @@
 #include <grpc/support/atm.h>
 #include <grpc/support/log.h>
 
-#define NUM_TAG_BUCKETS 31
-
-/* A single event: extends grpc_event to form a linked list with a destruction
-   function (on_finish) that is hidden from outside this module */
-typedef struct event {
-  grpc_event base;
-  struct event *queue_next;
-  struct event *queue_prev;
-  struct event *bucket_next;
-  struct event *bucket_prev;
-} event;
-
 /* Completion queue structure */
 struct grpc_completion_queue {
-  /* When refs drops to zero, we are in shutdown mode, and will be destroyable
-     once all queued events are drained */
-  gpr_refcount refs;
-  /* Once owning_refs drops to zero, we will destroy the cq */
+  /** completed events */
+  grpc_cq_completion completed_head;
+  grpc_cq_completion *completed_tail;
+  /** Number of pending events (+1 if we're not shutdown) */
+  gpr_refcount pending_events;
+  /** Once owning_refs drops to zero, we will destroy the cq */
   gpr_refcount owning_refs;
-  /* the set of low level i/o things that concern this cq */
+  /** the set of low level i/o things that concern this cq */
   grpc_pollset pollset;
-  /* 0 initially, 1 once we've begun shutting down */
+  /** 0 initially, 1 once we've begun shutting down */
   int shutdown;
   int shutdown_called;
-  /* Head of a linked list of queued events (prev points to the last element) */
-  event *queue;
-  /* Fixed size chained hash table of events for pluck() */
-  event *buckets[NUM_TAG_BUCKETS];
   int is_server_cq;
 };
 
@@ -80,19 +66,20 @@ grpc_completion_queue *grpc_completion_queue_create(void) {
   grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue));
   memset(cc, 0, sizeof(*cc));
   /* Initial ref is dropped by grpc_completion_queue_shutdown */
-  gpr_ref_init(&cc->refs, 1);
+  gpr_ref_init(&cc->pending_events, 1);
   /* One for destroy(), one for pollset_shutdown */
   gpr_ref_init(&cc->owning_refs, 2);
   grpc_pollset_init(&cc->pollset);
+  cc->completed_tail = &cc->completed_head;
+  cc->completed_head.next = (gpr_uintptr)cc->completed_tail;
   return cc;
 }
 
 #ifdef GRPC_CQ_REF_COUNT_DEBUG
 void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
                           const char *file, int line) {
-  gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p ref %d -> %d %s",
-          cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1,
-          reason);
+  gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p ref %d -> %d %s", cc,
+          (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1, reason);
 #else
 void grpc_cq_internal_ref(grpc_completion_queue *cc) {
 #endif
@@ -107,186 +94,135 @@ static void on_pollset_destroy_done(void *arg) {
 #ifdef GRPC_CQ_REF_COUNT_DEBUG
 void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason,
                             const char *file, int line) {
-  gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s",
-          cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1,
-          reason);
+  gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s", cc,
+          (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1, reason);
 #else
 void grpc_cq_internal_unref(grpc_completion_queue *cc) {
 #endif
   if (gpr_unref(&cc->owning_refs)) {
-    GPR_ASSERT(cc->queue == NULL);
+    GPR_ASSERT(cc->completed_head.next == (gpr_uintptr)&cc->completed_head);
     grpc_pollset_destroy(&cc->pollset);
     gpr_free(cc);
   }
 }
 
-/* Create and append an event to the queue. Returns the event so that its data
-   members can be filled in.
-   Requires GRPC_POLLSET_MU(&cc->pollset) locked. */
-static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
-                         void *tag, grpc_call *call) {
-  event *ev = gpr_malloc(sizeof(event));
-  gpr_uintptr bucket = ((gpr_uintptr)tag) % NUM_TAG_BUCKETS;
-  ev->base.type = type;
-  ev->base.tag = tag;
-  if (cc->queue == NULL) {
-    cc->queue = ev->queue_next = ev->queue_prev = ev;
-  } else {
-    ev->queue_next = cc->queue;
-    ev->queue_prev = cc->queue->queue_prev;
-    ev->queue_next->queue_prev = ev->queue_prev->queue_next = ev;
-  }
-  if (cc->buckets[bucket] == NULL) {
-    cc->buckets[bucket] = ev->bucket_next = ev->bucket_prev = ev;
-  } else {
-    ev->bucket_next = cc->buckets[bucket];
-    ev->bucket_prev = cc->buckets[bucket]->bucket_prev;
-    ev->bucket_next->bucket_prev = ev->bucket_prev->bucket_next = ev;
-  }
-  grpc_pollset_kick(&cc->pollset);
-  return ev;
-}
-
-void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call) {
-  gpr_ref(&cc->refs);
-  if (call) GRPC_CALL_INTERNAL_REF(call, "cq");
+void grpc_cq_begin_op(grpc_completion_queue *cc) {
+  gpr_ref(&cc->pending_events);
 }
 
 /* Signal the end of an operation - if this is the last waiting-to-be-queued
    event, then enter shutdown mode */
-void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
-                    int success) {
-  event *ev;
-  int shutdown = 0;
-  gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
-  ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call);
-  ev->base.success = success;
-  if (gpr_unref(&cc->refs)) {
+/* Queue a GRPC_OP_COMPLETED operation */
+void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
+                    void (*done)(void *done_arg, grpc_cq_completion *storage),
+                    void *done_arg, grpc_cq_completion *storage) {
+  int shutdown = gpr_unref(&cc->pending_events);
+
+  storage->tag = tag;
+  storage->done = done;
+  storage->done_arg = done_arg;
+  storage->next =
+      ((gpr_uintptr)&cc->completed_head) | ((gpr_uintptr)(success != 0));
+
+  if (!shutdown) {
+    gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+    cc->completed_tail->next =
+        ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next);
+    cc->completed_tail = storage;
+    grpc_pollset_kick(&cc->pollset);
+    gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+  } else {
+    gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+    cc->completed_tail->next =
+        ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next);
+    cc->completed_tail = storage;
     GPR_ASSERT(!cc->shutdown);
     GPR_ASSERT(cc->shutdown_called);
     cc->shutdown = 1;
-    shutdown = 1;
-  }
-  gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
-  if (call) GRPC_CALL_INTERNAL_UNREF(call, "cq", 0);
-  if (shutdown) {
+    gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
    grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc);
   }
 }
 
-/* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */
-static event *create_shutdown_event(void) {
-  event *ev = gpr_malloc(sizeof(event));
-  ev->base.type = GRPC_QUEUE_SHUTDOWN;
-  ev->base.tag = NULL;
-  return ev;
-}
-
 grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
                                       gpr_timespec deadline) {
-  event *ev = NULL;
   grpc_event ret;
 
   GRPC_CQ_INTERNAL_REF(cc, "next");
   gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
   for (;;) {
-    if (cc->queue != NULL) {
-      gpr_uintptr bucket;
-      ev = cc->queue;
-      bucket = ((gpr_uintptr)ev->base.tag) % NUM_TAG_BUCKETS;
-      cc->queue = ev->queue_next;
-      ev->queue_next->queue_prev = ev->queue_prev;
-      ev->queue_prev->queue_next = ev->queue_next;
-      ev->bucket_next->bucket_prev = ev->bucket_prev;
-      ev->bucket_prev->bucket_next = ev->bucket_next;
-      if (ev == cc->buckets[bucket]) {
-        cc->buckets[bucket] = ev->bucket_next;
-        if (ev == cc->buckets[bucket]) {
-          cc->buckets[bucket] = NULL;
-        }
-      }
-      if (cc->queue == ev) {
-        cc->queue = NULL;
+    if (cc->completed_tail != &cc->completed_head) {
+      grpc_cq_completion *c = (grpc_cq_completion *)cc->completed_head.next;
+      cc->completed_head.next = c->next & ~(gpr_uintptr)1;
+      if (c == cc->completed_tail) {
+        cc->completed_tail = &cc->completed_head;
       }
+      gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+      ret.type = GRPC_OP_COMPLETE;
+      ret.success = c->next & 1u;
+      ret.tag = c->tag;
+      c->done(c->done_arg, c);
      break;
    }
    if (cc->shutdown) {
-      ev = create_shutdown_event();
+      gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+      memset(&ret, 0, sizeof(ret));
+      ret.type = GRPC_QUEUE_SHUTDOWN;
      break;
    }
    if (!grpc_pollset_work(&cc->pollset, deadline)) {
      gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
      memset(&ret, 0, sizeof(ret));
      ret.type = GRPC_QUEUE_TIMEOUT;
-      GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
-      GRPC_CQ_INTERNAL_UNREF(cc, "next");
-      return ret;
+      break;
    }
  }
-  gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
-  ret = ev->base;
-  gpr_free(ev);
  GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
  GRPC_CQ_INTERNAL_UNREF(cc, "next");
  return ret;
 }
 
-static event *pluck_event(grpc_completion_queue *cc, void *tag) {
-  gpr_uintptr bucket = ((gpr_uintptr)tag) % NUM_TAG_BUCKETS;
-  event *ev = cc->buckets[bucket];
-  if (ev == NULL) return NULL;
-  do {
-    if (ev->base.tag == tag) {
-      ev->queue_next->queue_prev = ev->queue_prev;
-      ev->queue_prev->queue_next = ev->queue_next;
-      ev->bucket_next->bucket_prev = ev->bucket_prev;
-      ev->bucket_prev->bucket_next = ev->bucket_next;
-      if (ev == cc->buckets[bucket]) {
-        cc->buckets[bucket] = ev->bucket_next;
-        if (ev == cc->buckets[bucket]) {
-          cc->buckets[bucket] = NULL;
-        }
-      }
-      if (cc->queue == ev) {
-        cc->queue = ev->queue_next;
-        if (cc->queue == ev) {
-          cc->queue = NULL;
-        }
-      }
-      return ev;
-    }
-    ev = ev->bucket_next;
-  } while (ev != cc->buckets[bucket]);
-  return NULL;
-}
-
 grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
                                        gpr_timespec deadline) {
-  event *ev = NULL;
   grpc_event ret;
+  grpc_cq_completion *c;
+  grpc_cq_completion *prev;
 
   GRPC_CQ_INTERNAL_REF(cc, "pluck");
   gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
   for (;;) {
-    if ((ev = pluck_event(cc, tag))) {
-      break;
+    prev = &cc->completed_head;
+    while ((c = (grpc_cq_completion *)(prev->next & ~(gpr_uintptr)1)) !=
+           &cc->completed_head) {
+      if (c->tag == tag) {
+        prev->next =
+            (prev->next & (gpr_uintptr)1) | (c->next & ~(gpr_uintptr)1);
+        if (c == cc->completed_tail) {
+          cc->completed_tail = prev;
+        }
+        gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+        ret.type = GRPC_OP_COMPLETE;
+        ret.success = c->next & 1u;
+        ret.tag = c->tag;
+        c->done(c->done_arg, c);
+        goto done;
+      }
+      prev = c;
    }
    if (cc->shutdown) {
-      ev = create_shutdown_event();
+      gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+      memset(&ret, 0, sizeof(ret));
+      ret.type = GRPC_QUEUE_SHUTDOWN;
      break;
    }
    if (!grpc_pollset_work(&cc->pollset, deadline)) {
      gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
      memset(&ret, 0, sizeof(ret));
      ret.type = GRPC_QUEUE_TIMEOUT;
-      GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
-      GRPC_CQ_INTERNAL_UNREF(cc, "pluck");
-      return ret;
+      break;
    }
  }
-  gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
-  ret = ev->base;
-  gpr_free(ev);
+done:
  GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
  GRPC_CQ_INTERNAL_UNREF(cc, "pluck");
  return ret;
@@ -303,7 +239,7 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
   cc->shutdown_called = 1;
   gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
 
-  if (gpr_unref(&cc->refs)) {
+  if (gpr_unref(&cc->pending_events)) {
     gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
     GPR_ASSERT(!cc->shutdown);
     cc->shutdown = 1;
@@ -324,8 +260,8 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
 void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc) {
   gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
   grpc_pollset_kick(&cc->pollset);
-  grpc_pollset_work(&cc->pollset,
-                    gpr_time_add(gpr_now(), gpr_time_from_millis(100)));
+  grpc_pollset_work(&cc->pollset, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+                                               gpr_time_from_millis(100)));
   gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
 }
 
diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h
index 1b9010f462..f944f48d8e 100644
--- a/src/core/surface/completion_queue.h
+++ b/src/core/surface/completion_queue.h
@@ -39,6 +39,17 @@
 #include "src/core/iomgr/pollset.h"
 #include <grpc/grpc.h>
 
+typedef struct grpc_cq_completion {
+  /** user supplied tag */
+  void *tag;
+  /** done callback - called when this queue element is no longer
+      needed by the completion queue */
+  void (*done)(void *done_arg, struct grpc_cq_completion *c);
+  void *done_arg;
+  /** next pointer; low bit is used to indicate success or not */
+  gpr_uintptr next;
+} grpc_cq_completion;
+
 #ifdef GRPC_CQ_REF_COUNT_DEBUG
 void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
                           const char *file, int line);
@@ -57,11 +68,12 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc);
 
 /* Flag that an operation is beginning: the completion channel will not finish
    shutdown until a corrensponding grpc_cq_end_* call is made */
-void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call);
+void grpc_cq_begin_op(grpc_completion_queue *cc);
 
 /* Queue a GRPC_OP_COMPLETED operation */
-void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
-                    int success);
+void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
+                    void (*done)(void *done_arg, grpc_cq_completion *storage),
+                    void *done_arg, grpc_cq_completion *storage);
 
 grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
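The grpc_cq_completion type added to completion_queue.h above packs two values into its next field: the pointer to the next completed element and, in the low bit, the success flag; this works because the records are more than 1-byte aligned, so the low pointer bit is always zero and free to reuse. A small self-contained sketch of that tagged-pointer encoding, with illustrative names rather than the real gRPC types:

```c
#include <stdint.h>
#include <stdio.h>

typedef struct node {
  uintptr_t next; /* low bit = success flag, upper bits = next pointer */
  int id;
} node;

/* Combine a pointer and a boolean into one word; nodes are at least
   2-byte aligned, so the low bit of a node address is always zero. */
static uintptr_t pack(node *next, int success) {
  return (uintptr_t)next | (uintptr_t)(success != 0);
}

static node *unpack_ptr(uintptr_t v) { return (node *)(v & ~(uintptr_t)1); }
static int unpack_success(uintptr_t v) { return (int)(v & 1u); }

int main(void) {
  node tail = {0, 2};
  node head = {pack(&tail, 1), 1}; /* head -> tail, success = 1 */
  printf("next id=%d success=%d\n", unpack_ptr(head.next)->id,
         unpack_success(head.next));
  return 0;
}
```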
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
index 3847ded28c..04e27d30ac 100644
--- a/src/core/surface/init.c
+++ b/src/core/surface/init.c
@@ -35,6 +35,7 @@
 
 #include <grpc/census.h>
 #include <grpc/grpc.h>
+#include <grpc/support/time.h>
 #include "src/core/channel/channel_stack.h"
 #include "src/core/client_config/resolver_registry.h"
 #include "src/core/client_config/resolvers/dns_resolver.h"
@@ -64,6 +65,7 @@ void grpc_init(void) {
   gpr_mu_lock(&g_init_mu);
   if (++g_initializations == 1) {
+    gpr_time_init();
     grpc_resolver_registry_init("dns:///");
     grpc_register_resolver_type("dns", grpc_dns_resolver_factory_create());
 #ifdef GPR_POSIX_SOCKET
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index a9d8940631..4dc51bf031 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -72,12 +72,14 @@ typedef struct {
 
 typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
 
-typedef struct {
+typedef struct requested_call {
   requested_call_type type;
+  struct requested_call *next;
   void *tag;
   grpc_completion_queue *cq_bound_to_call;
   grpc_completion_queue *cq_for_notification;
   grpc_call **call;
+  grpc_cq_completion completion;
   union {
     struct {
       grpc_call_details *details;
@@ -92,17 +94,11 @@ typedef struct {
   } data;
 } requested_call;
 
-typedef struct {
-  requested_call *calls;
-  size_t count;
-  size_t capacity;
-} requested_call_array;
-
 struct registered_method {
   char *method;
   char *host;
   call_data *pending;
-  requested_call_array requested;
+  requested_call *requests;
   registered_method *next;
 };
 
@@ -131,6 +127,7 @@ struct channel_data {
 typedef struct shutdown_tag {
   void *tag;
   grpc_completion_queue *cq;
+  grpc_cq_completion completion;
 } shutdown_tag;
 
 struct grpc_server {
@@ -153,7 +150,7 @@ struct grpc_server {
   gpr_mu mu_call;   /* mutex for call-specific state */
 
   registered_method *registered_methods;
-  requested_call_array requested_calls;
+  requested_call *requests;
 
   gpr_uint8 shutdown;
   gpr_uint8 shutdown_published;
@@ -166,6 +163,9 @@ struct grpc_server {
   listener *listeners;
   int listeners_destroyed;
   gpr_refcount internal_refcount;
+
+  /** when did we print the last shutdown progress message */
+  gpr_timespec last_shutdown_message_time;
 };
 
 typedef enum {
@@ -270,7 +270,8 @@ static void send_shutdown(grpc_channel *channel, int send_goaway,
 }
 
 static void channel_broadcaster_shutdown(channel_broadcaster *cb,
-                                         int send_goaway, int force_disconnect) {
+                                         int send_goaway,
+                                         int force_disconnect) {
   size_t i;
 
   for (i = 0; i < cb->num_channels; i++) {
@@ -329,22 +330,6 @@ static int call_list_remove(call_data *call, call_list list) {
   return 1;
 }
 
-static void requested_call_array_destroy(requested_call_array *array) {
-  gpr_free(array->calls);
-}
-
-static requested_call *requested_call_array_add(requested_call_array *array) {
-  requested_call *rc;
-  if (array->count == array->capacity) {
-    array->capacity = GPR_MAX(array->capacity + 8, array->capacity * 2);
-    array->calls =
-        gpr_realloc(array->calls, sizeof(requested_call) * array->capacity);
-  }
-  rc = &array->calls[array->count++];
-  memset(rc, 0, sizeof(*rc));
-  return rc;
-}
-
 static void server_ref(grpc_server *server) {
   gpr_ref(&server->internal_refcount);
 }
@@ -356,12 +341,10 @@ static void server_delete(grpc_server *server) {
   gpr_mu_destroy(&server->mu_global);
   gpr_mu_destroy(&server->mu_call);
   gpr_free(server->channel_filters);
-  requested_call_array_destroy(&server->requested_calls);
   while ((rm = server->registered_methods) != NULL) {
     server->registered_methods = rm->next;
     gpr_free(rm->method);
     gpr_free(rm->host);
-    requested_call_array_destroy(&rm->requested);
     gpr_free(rm);
   }
   for (i = 0; i < server->cq_count; i++) {
@@ -409,23 +392,24 @@ static void destroy_channel(channel_data *chand) {
 
 static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem,
                                  call_data **pending_root,
-                                 requested_call_array *array) {
-  requested_call rc;
+                                 requested_call **requests) {
+  requested_call *rc;
   call_data *calld = elem->call_data;
   gpr_mu_lock(&server->mu_call);
-  if (array->count == 0) {
+  rc = *requests;
+  if (rc == NULL) {
     gpr_mu_lock(&calld->mu_state);
     calld->state = PENDING;
     gpr_mu_unlock(&calld->mu_state);
     call_list_join(pending_root, calld, PENDING_START);
     gpr_mu_unlock(&server->mu_call);
   } else {
-    rc = array->calls[--array->count];
+    *requests = rc->next;
     gpr_mu_lock(&calld->mu_state);
     calld->state = ACTIVATED;
     gpr_mu_unlock(&calld->mu_state);
     gpr_mu_unlock(&server->mu_call);
-    begin_call(server, calld, &rc);
+    begin_call(server, calld, rc);
   }
 }
 
@@ -441,14 +425,14 @@ static void start_new_rpc(grpc_call_element *elem) {
   /* TODO(ctiller): unify these two searches */
   /* check for an exact match with host */
   hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash);
-  for (i = 0; i < chand->registered_method_max_probes; i++) {
+  for (i = 0; i <= chand->registered_method_max_probes; i++) {
     rm = &chand->registered_methods[(hash + i) % chand->registered_method_slots];
     if (!rm) break;
     if (rm->host != calld->host) continue;
     if (rm->method != calld->path) continue;
     finish_start_new_rpc(server, elem, &rm->server_registered_method->pending,
-                         &rm->server_registered_method->requested);
+                         &rm->server_registered_method->requests);
     return;
   }
   /* check for a wildcard method definition (no host set) */
@@ -460,12 +444,12 @@ static void start_new_rpc(grpc_call_element *elem) {
     if (rm->host != NULL) continue;
     if (rm->method != calld->path) continue;
     finish_start_new_rpc(server, elem, &rm->server_registered_method->pending,
-                         &rm->server_registered_method->requested);
+                         &rm->server_registered_method->requests);
     return;
   }
   }
   finish_start_new_rpc(server, elem, &server->lists[PENDING_START],
-                       &server->requested_calls);
+                       &server->requests);
 }
 
 static void kill_zombie(void *elem, int success) {
@@ -481,26 +465,47 @@ static int num_listeners(grpc_server *server) {
   return n;
 }
 
+static void done_shutdown_event(void *server, grpc_cq_completion *completion) {
+  server_unref(server);
+}
+
+static int num_channels(grpc_server *server) {
+  channel_data *chand;
+  int n = 0;
+  for (chand = server->root_channel_data.next;
+       chand != &server->root_channel_data; chand = chand->next) {
+    n++;
+  }
+  return n;
+}
+
 static void maybe_finish_shutdown(grpc_server *server) {
   size_t i;
   if (!server->shutdown || server->shutdown_published) {
     return;
   }
-  if (server->root_channel_data.next != &server->root_channel_data) {
-    gpr_log(GPR_DEBUG,
-            "Waiting for all channels to close before destroying server");
-    return;
-  }
-  if (server->listeners_destroyed < num_listeners(server)) {
-    gpr_log(GPR_DEBUG, "Waiting for all listeners to be destroyed (@ %d/%d)",
-            server->listeners_destroyed, num_listeners(server));
+  if (server->root_channel_data.next != &server->root_channel_data ||
+      server->listeners_destroyed < num_listeners(server)) {
+    if (gpr_time_cmp(
+            gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), server->last_shutdown_message_time),
+            gpr_time_from_seconds(1)) >= 0) {
+      server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
+      gpr_log(GPR_DEBUG,
+              "Waiting for %d channels and %d/%d listeners to be destroyed"
+              " before shutting down server",
+              num_channels(server),
+              num_listeners(server) - server->listeners_destroyed,
+              num_listeners(server));
+    }
     return;
   }
   server->shutdown_published = 1;
   for (i = 0; i < server->num_shutdown_tags; i++) {
-    grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag,
-                   NULL, 1);
+    server_ref(server);
+    grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag, 1,
+                   done_shutdown_event, server,
+                   &server->shutdown_tags[i].completion);
   }
 }
 
@@ -924,15 +929,14 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
 
 void grpc_server_shutdown_and_notify(grpc_server *server,
                                      grpc_completion_queue *cq, void *tag) {
   listener *l;
-  requested_call_array requested_calls;
-  size_t i;
+  requested_call *requests = NULL;
   registered_method *rm;
   shutdown_tag *sdt;
   channel_broadcaster broadcaster;
 
   /* lock, and gather up some stuff to do */
   gpr_mu_lock(&server->mu_global);
-  grpc_cq_begin_op(cq, NULL);
+  grpc_cq_begin_op(cq);
   server->shutdown_tags =
       gpr_realloc(server->shutdown_tags,
                   sizeof(shutdown_tag) * (server->num_shutdown_tags + 1));
@@ -944,27 +948,21 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
     return;
   }
 
+  server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
+
   channel_broadcaster_init(server, &broadcaster);
 
   /* collect all unregistered then registered calls */
   gpr_mu_lock(&server->mu_call);
-  requested_calls = server->requested_calls;
-  memset(&server->requested_calls, 0, sizeof(server->requested_calls));
+  requests = server->requests;
+  server->requests = NULL;
   for (rm = server->registered_methods; rm; rm = rm->next) {
-    if (requested_calls.count + rm->requested.count >
-        requested_calls.capacity) {
-      requested_calls.capacity =
-          GPR_MAX(requested_calls.count + rm->requested.count,
-                  2 * requested_calls.capacity);
-      requested_calls.calls =
-          gpr_realloc(requested_calls.calls, sizeof(*requested_calls.calls) *
-                                                 requested_calls.capacity);
+    while (rm->requests != NULL) {
+      requested_call *c = rm->requests;
+      rm->requests = c->next;
+      c->next = requests;
+      requests = c;
     }
-    memcpy(requested_calls.calls + requested_calls.count, rm->requested.calls,
-           sizeof(*requested_calls.calls) * rm->requested.count);
-    requested_calls.count += rm->requested.count;
-    gpr_free(rm->requested.calls);
-    memset(&rm->requested, 0, sizeof(rm->requested));
   }
   gpr_mu_unlock(&server->mu_call);
 
@@ -973,10 +971,11 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
   gpr_mu_unlock(&server->mu_global);
 
   /* terminate all the requested calls */
-  for (i = 0; i < requested_calls.count; i++) {
-    fail_call(server, &requested_calls.calls[i]);
+  while (requests != NULL) {
+    requested_call *next = requests->next;
+    fail_call(server, requests);
+    requests = next;
   }
-  gpr_free(requested_calls.calls);
 
   /* Shutdown listeners */
   for (l = server->listeners; l; l = l->next) {
@@ -1038,7 +1037,7 @@ void grpc_server_add_listener(grpc_server *server, void *arg,
 
 static grpc_call_error queue_call_request(grpc_server *server,
                                           requested_call *rc) {
   call_data *calld = NULL;
-  requested_call_array *requested_calls = NULL;
+  requested_call **requests = NULL;
   gpr_mu_lock(&server->mu_call);
   if (server->shutdown) {
     gpr_mu_unlock(&server->mu_call);
@@ -1049,12 +1048,12 @@ static grpc_call_error queue_call_request(grpc_server *server,
     case BATCH_CALL:
       calld = call_list_remove_head(&server->lists[PENDING_START],
                                     PENDING_START);
-      requested_calls = &server->requested_calls;
+      requests = &server->requests;
       break;
     case REGISTERED_CALL:
      calld = call_list_remove_head(
          &rc->data.registered.registered_method->pending, PENDING_START);
-      requested_calls = &rc->data.registered.registered_method->requested;
+      requests = &rc->data.registered.registered_method->requests;
      break;
  }
  if (calld != NULL) {
@@ -1066,7 +1065,8 @@ static grpc_call_error queue_call_request(grpc_server *server,
    begin_call(server, calld, rc);
    return GRPC_CALL_OK;
  } else {
-    *requested_call_array_add(requested_calls) = *rc;
+    rc->next = *requests;
+    *requests = rc;
    gpr_mu_unlock(&server->mu_call);
    return GRPC_CALL_OK;
  }
@@ -1077,22 +1077,23 @@ grpc_call_error grpc_server_request_call(
     grpc_metadata_array *initial_metadata,
     grpc_completion_queue *cq_bound_to_call,
     grpc_completion_queue *cq_for_notification, void *tag) {
-  requested_call rc;
+  requested_call *rc = gpr_malloc(sizeof(*rc));
   GRPC_SERVER_LOG_REQUEST_CALL(GPR_INFO, server, call, details,
                                initial_metadata, cq_bound_to_call,
                                cq_for_notification, tag);
   if (!grpc_cq_is_server_cq(cq_for_notification)) {
+    gpr_free(rc);
     return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
   }
-  grpc_cq_begin_op(cq_for_notification, NULL);
-  rc.type = BATCH_CALL;
-  rc.tag = tag;
-  rc.cq_bound_to_call = cq_bound_to_call;
-  rc.cq_for_notification = cq_for_notification;
-  rc.call = call;
-  rc.data.batch.details = details;
-  rc.data.batch.initial_metadata = initial_metadata;
-  return queue_call_request(server, &rc);
+  grpc_cq_begin_op(cq_for_notification);
+  rc->type = BATCH_CALL;
+  rc->tag = tag;
+  rc->cq_bound_to_call = cq_bound_to_call;
+  rc->cq_for_notification = cq_for_notification;
+  rc->call = call;
+  rc->data.batch.details = details;
+  rc->data.batch.initial_metadata = initial_metadata;
+  return queue_call_request(server, rc);
 }
 
 grpc_call_error grpc_server_request_registered_call(
@@ -1100,22 +1101,23 @@ grpc_call_error grpc_server_request_registered_call(
     grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload,
     grpc_completion_queue *cq_bound_to_call,
     grpc_completion_queue *cq_for_notification, void *tag) {
-  requested_call rc;
+  requested_call *rc = gpr_malloc(sizeof(*rc));
   registered_method *registered_method = rm;
   if (!grpc_cq_is_server_cq(cq_for_notification)) {
+    gpr_free(rc);
     return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
   }
-  grpc_cq_begin_op(cq_for_notification, NULL);
-  rc.type = REGISTERED_CALL;
-  rc.tag = tag;
-  rc.cq_bound_to_call = cq_bound_to_call;
-  rc.cq_for_notification = cq_for_notification;
-  rc.call = call;
-  rc.data.registered.registered_method = registered_method;
-  rc.data.registered.deadline = deadline;
-  rc.data.registered.initial_metadata = initial_metadata;
-  rc.data.registered.optional_payload = optional_payload;
-  return queue_call_request(server, &rc);
+  grpc_cq_begin_op(cq_for_notification);
+  rc->type = REGISTERED_CALL;
+  rc->tag = tag;
+  rc->cq_bound_to_call = cq_bound_to_call;
+  rc->cq_for_notification = cq_for_notification;
+  rc->call = call;
+  rc->data.registered.registered_method = registered_method;
+  rc->data.registered.deadline = deadline;
+  rc->data.registered.initial_metadata = initial_metadata;
+  rc->data.registered.optional_payload = optional_payload;
+  return queue_call_request(server, rc);
 }
 
 static void publish_registered_or_batch(grpc_call *call, int success,
@@ -1182,8 +1184,11 @@ static void begin_call(grpc_server *server, call_data *calld,
   }
   GRPC_CALL_INTERNAL_REF(calld->call, "server");
-  grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish,
-                                      rc->tag);
+  grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish, rc);
+}
+
+static void done_request_event(void *req, grpc_cq_completion *c) {
+  gpr_free(req);
 }
 
 static void fail_call(grpc_server *server, requested_call *rc) {
@@ -1196,15 +1201,19 @@ static void fail_call(grpc_server *server, requested_call *rc) {
     rc->data.registered.initial_metadata->count = 0;
     break;
   }
-  grpc_cq_end_op(rc->cq_for_notification, rc->tag, NULL, 0);
+  grpc_cq_end_op(rc->cq_for_notification, rc->tag, 0, done_request_event, rc,
+                 &rc->completion);
 }
 
 static void publish_registered_or_batch(grpc_call *call, int success,
-                                        void *tag) {
+                                        void *prc) {
   grpc_call_element *elem =
       grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
+  requested_call *rc = prc;
   call_data *calld = elem->call_data;
-  grpc_cq_end_op(calld->cq_new, tag, call, success);
+  grpc_cq_end_op(calld->cq_new, rc->tag, success, done_request_event, rc,
+                 &rc->completion);
+  GRPC_CALL_INTERNAL_UNREF(call, "server", 0);
 }
 
 const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
diff --git a/src/core/surface/version.c b/src/core/surface/version.c
new file mode 100644
index 0000000000..4f5d648371
--- /dev/null
+++ b/src/core/surface/version.c
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/* This file is autogenerated from:
+   templates/src/core/surface/version.c.template */
+
+#include <grpc/grpc.h>
+
+const char *grpc_version_string(void) {
+  return "0.10.0.0";
+}
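The server.c changes above drop the realloc-backed requested_call_array in favour of an intrusive singly-linked list: each requested_call now carries a next pointer, so queuing a pending request is a push onto the list head and shutdown simply drains and fails whatever is left. A minimal sketch of that pattern follows; the types and names are illustrative, not the real server structures.

```c
#include <stdio.h>
#include <stdlib.h>

typedef struct request {
  int tag;
  struct request *next; /* intrusive link, like requested_call.next */
} request;

/* Push a request onto the head of the list (the fallback path in
   queue_call_request when no call is pending). */
static void push(request **head, request *r) {
  r->next = *head;
  *head = r;
}

/* Pop one request, or NULL if none queued (the fast path in
   finish_start_new_rpc). */
static request *pop(request **head) {
  request *r = *head;
  if (r != NULL) *head = r->next;
  return r;
}

int main(void) {
  request *pending = NULL;
  for (int i = 0; i < 3; i++) {
    request *r = malloc(sizeof(*r));
    r->tag = i;
    push(&pending, r);
  }
  /* Drain the list, roughly as grpc_server_shutdown_and_notify fails
     every still-queued request. */
  for (request *r; (r = pop(&pending)) != NULL;) {
    printf("failing request %d\n", r->tag);
    free(r);
  }
  return 0;
}
```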