diff options
-rw-r--r-- | src/core/surface/call.c | 13 | ||||
-rw-r--r-- | src/core/surface/completion_queue.c | 41 | ||||
-rw-r--r-- | src/core/surface/completion_queue.h | 12 | ||||
-rw-r--r-- | src/core/surface/server.c | 22 | ||||
-rw-r--r-- | test/core/surface/completion_queue_test.c | 9 |
5 files changed, 46 insertions, 51 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index db382a5fe7..79a399c227 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -1342,15 +1342,13 @@ static void set_cancelled_value(grpc_status_code status, void *dest) { } static void finish_batch(grpc_call *call, int success, void *tag) { - grpc_cq_end_op(call->cq, - tag, success, done_completion, call, - allocate_completion(call)); + grpc_cq_end_op(call->cq, tag, success, done_completion, call, + allocate_completion(call)); } static void finish_batch_with_close(grpc_call *call, int success, void *tag) { - grpc_cq_end_op(call->cq, - tag, 1, done_completion, call, - allocate_completion(call)); + grpc_cq_end_op(call->cq, tag, 1, done_completion, call, + allocate_completion(call)); } static int are_write_flags_valid(gpr_uint32 flags) { @@ -1375,7 +1373,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, if (nops == 0) { grpc_cq_begin_op(call->cq); GRPC_CALL_INTERNAL_REF(call, "completion"); - grpc_cq_end_op(call->cq, tag, 1, done_completion, call, allocate_completion(call)); + grpc_cq_end_op(call->cq, tag, 1, done_completion, call, + allocate_completion(call)); return GRPC_CALL_OK; } diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 86481af02c..67f4443e9d 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -71,16 +71,15 @@ grpc_completion_queue *grpc_completion_queue_create(void) { gpr_ref_init(&cc->owning_refs, 2); grpc_pollset_init(&cc->pollset); cc->completed_tail = &cc->completed_head; - cc->completed_head.next = (gpr_uintptr) cc->completed_tail; + cc->completed_head.next = (gpr_uintptr)cc->completed_tail; return cc; } #ifdef GRPC_CQ_REF_COUNT_DEBUG void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason, const char *file, int line) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p ref %d -> %d %s", - cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1, - reason); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p ref %d -> %d %s", cc, + (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1, reason); #else void grpc_cq_internal_ref(grpc_completion_queue *cc) { #endif @@ -95,14 +94,13 @@ static void on_pollset_destroy_done(void *arg) { #ifdef GRPC_CQ_REF_COUNT_DEBUG void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason, const char *file, int line) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s", - cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1, - reason); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s", cc, + (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1, reason); #else void grpc_cq_internal_unref(grpc_completion_queue *cc) { #endif if (gpr_unref(&cc->owning_refs)) { - GPR_ASSERT(cc->completed_head.next == (gpr_uintptr) &cc->completed_head); + GPR_ASSERT(cc->completed_head.next == (gpr_uintptr)&cc->completed_head); grpc_pollset_destroy(&cc->pollset); gpr_free(cc); } @@ -115,28 +113,27 @@ void grpc_cq_begin_op(grpc_completion_queue *cc) { /* Signal the end of an operation - if this is the last waiting-to-be-queued event, then enter shutdown mode */ /* Queue a GRPC_OP_COMPLETED operation */ -void grpc_cq_end_op( - grpc_completion_queue *cc, - void *tag, - int success, - void (*done)(void *done_arg, grpc_cq_completion *storage), - void *done_arg, - grpc_cq_completion *storage) { +void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success, + void (*done)(void *done_arg, grpc_cq_completion *storage), + void *done_arg, grpc_cq_completion *storage) { int shutdown = gpr_unref(&cc->pending_events); storage->tag = tag; storage->done = done; storage->done_arg = done_arg; - storage->next = ((gpr_uintptr)&cc->completed_head) | ((gpr_uintptr)(success != 0)); + storage->next = + ((gpr_uintptr)&cc->completed_head) | ((gpr_uintptr)(success != 0)); if (!shutdown) { gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); - cc->completed_tail->next = ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next); + cc->completed_tail->next = + ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next); cc->completed_tail = storage; gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } else { gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); - cc->completed_tail->next = ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next); + cc->completed_tail->next = + ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next); cc->completed_tail = storage; GPR_ASSERT(!cc->shutdown); GPR_ASSERT(cc->shutdown_called); @@ -154,7 +151,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); for (;;) { if (cc->completed_tail != &cc->completed_head) { - grpc_cq_completion *c = (grpc_cq_completion *) cc->completed_head.next; + grpc_cq_completion *c = (grpc_cq_completion *)cc->completed_head.next; cc->completed_head.next = c->next & ~(gpr_uintptr)1; if (c == cc->completed_tail) { cc->completed_tail = &cc->completed_head; @@ -194,9 +191,11 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); for (;;) { prev = &cc->completed_head; - while ((c = (grpc_cq_completion*)(prev->next & ~(gpr_uintptr)1)) != &cc->completed_head) { + while ((c = (grpc_cq_completion *)(prev->next & ~(gpr_uintptr)1)) != + &cc->completed_head) { if (c->tag == tag) { - prev->next = (prev->next & (gpr_uintptr)1) | (c->next & ~(gpr_uintptr)1); + prev->next = + (prev->next & (gpr_uintptr)1) | (c->next & ~(gpr_uintptr)1); if (c == cc->completed_tail) { cc->completed_tail = prev; } diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h index f926d411f3..f944f48d8e 100644 --- a/src/core/surface/completion_queue.h +++ b/src/core/surface/completion_queue.h @@ -40,7 +40,7 @@ #include <grpc/grpc.h> typedef struct grpc_cq_completion { - /** user supplied tag */ + /** user supplied tag */ void *tag; /** done callback - called when this queue element is no longer needed by the completion queue */ @@ -71,13 +71,9 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc); void grpc_cq_begin_op(grpc_completion_queue *cc); /* Queue a GRPC_OP_COMPLETED operation */ -void grpc_cq_end_op( - grpc_completion_queue *cc, - void *tag, - int success, - void (*done)(void *done_arg, grpc_cq_completion *storage), - void *done_arg, - grpc_cq_completion *storage); +void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success, + void (*done)(void *done_arg, grpc_cq_completion *storage), + void *done_arg, grpc_cq_completion *storage); grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc); diff --git a/src/core/surface/server.c b/src/core/surface/server.c index ff02428c49..32e5058b4d 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -267,7 +267,8 @@ static void send_shutdown(grpc_channel *channel, int send_goaway, } static void channel_broadcaster_shutdown(channel_broadcaster *cb, - int send_goaway, int force_disconnect) { + int send_goaway, + int force_disconnect) { size_t i; for (i = 0; i < cb->num_channels; i++) { @@ -484,13 +485,9 @@ static void maybe_finish_shutdown(grpc_server *server) { server->shutdown_published = 1; for (i = 0; i < server->num_shutdown_tags; i++) { server_ref(server); - grpc_cq_end_op(server->shutdown_tags[i].cq, - server->shutdown_tags[i].tag, - 1, - done_shutdown_event, - server, - &server->shutdown_tags[i].completion - ); + grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag, 1, + done_shutdown_event, server, + &server->shutdown_tags[i].completion); } } @@ -1167,8 +1164,7 @@ static void begin_call(grpc_server *server, call_data *calld, } GRPC_CALL_INTERNAL_REF(calld->call, "server"); - grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish, - rc); + grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish, rc); } static void done_request_event(void *req, grpc_cq_completion *c) { @@ -1185,7 +1181,8 @@ static void fail_call(grpc_server *server, requested_call *rc) { rc->data.registered.initial_metadata->count = 0; break; } - grpc_cq_end_op(rc->cq_for_notification, rc->tag, 0, done_request_event, rc, &rc->completion); + grpc_cq_end_op(rc->cq_for_notification, rc->tag, 0, done_request_event, rc, + &rc->completion); } static void publish_registered_or_batch(grpc_call *call, int success, @@ -1194,7 +1191,8 @@ static void publish_registered_or_batch(grpc_call *call, int success, grpc_call_stack_element(grpc_call_get_call_stack(call), 0); requested_call *rc = prc; call_data *calld = elem->call_data; - grpc_cq_end_op(calld->cq_new, rc->tag, success, done_request_event, rc, &rc->completion); + grpc_cq_end_op(calld->cq_new, rc->tag, success, done_request_event, rc, + &rc->completion); GRPC_CALL_INTERNAL_UNREF(call, "server", 0); } diff --git a/test/core/surface/completion_queue_test.c b/test/core/surface/completion_queue_test.c index 5b1d9cada9..67e66a0a9f 100644 --- a/test/core/surface/completion_queue_test.c +++ b/test/core/surface/completion_queue_test.c @@ -139,7 +139,8 @@ static void test_pluck(void) { for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { grpc_cq_begin_op(cc); - grpc_cq_end_op(cc, tags[i], 1, do_nothing_end_completion, NULL, &completions[i]); + grpc_cq_end_op(cc, tags[i], 1, do_nothing_end_completion, NULL, + &completions[i]); } for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { @@ -149,7 +150,8 @@ static void test_pluck(void) { for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { grpc_cq_begin_op(cc); - grpc_cq_end_op(cc, tags[i], 1, do_nothing_end_completion, NULL, &completions[i]); + grpc_cq_end_op(cc, tags[i], 1, do_nothing_end_completion, NULL, + &completions[i]); } for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { @@ -201,7 +203,8 @@ static void producer_thread(void *arg) { gpr_log(GPR_INFO, "producer %d phase 2", opt->id); for (i = 0; i < TEST_THREAD_EVENTS; i++) { - grpc_cq_end_op(opt->cc, (void *)(gpr_intptr)1, 1, free_completion, NULL, gpr_malloc(sizeof(grpc_cq_completion))); + grpc_cq_end_op(opt->cc, (void *)(gpr_intptr)1, 1, free_completion, NULL, + gpr_malloc(sizeof(grpc_cq_completion))); opt->events_triggered++; } |