From a8cf05c8034f0783fa62096f527387887f0c8a25 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Thu, 22 Jun 2017 15:08:55 -0700 Subject: Refcount grpc_alarm object so that grpc_alarm_destroy can safely be called before the alarm event is dequeued from the completion queue --- src/core/lib/surface/alarm.c | 72 +++++++++++++++++++++++++++++++++++++++----- 1 file changed, 64 insertions(+), 8 deletions(-) (limited to 'src/core/lib/surface/alarm.c') diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c index ef8405cca8..343e48c101 100644 --- a/src/core/lib/surface/alarm.c +++ b/src/core/lib/surface/alarm.c @@ -15,13 +15,19 @@ * limitations under the License. * */ +#include "src/core/lib/surface/alarm_internal.h" #include #include #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/surface/completion_queue.h" +#ifndef NDEBUG +grpc_tracer_flag grpc_trace_alarm_refcount = GRPC_TRACER_INITIALIZER(false); +#endif + struct grpc_alarm { + gpr_refcount refs; grpc_timer alarm; grpc_closure on_alarm; grpc_cq_completion completion; @@ -31,13 +37,58 @@ struct grpc_alarm { void *tag; }; -static void do_nothing_end_completion(grpc_exec_ctx *exec_ctx, void *arg, - grpc_cq_completion *c) {} +static void alarm_ref(grpc_alarm *alarm) { gpr_ref(&alarm->refs); } + +static void alarm_unref(grpc_alarm *alarm) { + if (gpr_unref(&alarm->refs)) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, alarm->cq, "alarm"); + grpc_exec_ctx_finish(&exec_ctx); + gpr_free(alarm); + } +} + +#ifndef NDEBUG +static void alarm_ref_dbg(grpc_alarm *alarm, const char *reason, + const char *file, int line) { + if (GRPC_TRACER_ON(grpc_trace_alarm_refcount)) { + gpr_atm val = gpr_atm_no_barrier_load(&alarm->refs.count); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "Alarm:%p ref %" PRIdPTR " -> %" PRIdPTR " %s", alarm, val, + val + 1, reason); + } + + alarm_ref(alarm); +} + +static void alarm_unref_dbg(grpc_alarm *alarm, const char *reason, + const char *file, int line) { + if (GRPC_TRACER_ON(grpc_trace_alarm_refcount)) { + gpr_atm val = gpr_atm_no_barrier_load(&alarm->refs.count); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "Alarm:%p Unref %" PRIdPTR " -> %" PRIdPTR " %s", alarm, val, + val - 1, reason); + } + + alarm_unref(alarm); +} +#endif + +static void alarm_end_completion(grpc_exec_ctx *exec_ctx, void *arg, + grpc_cq_completion *c) { + grpc_alarm *alarm = arg; + GRPC_ALARM_UNREF(alarm, "dequeue-end-op"); +} static void alarm_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_alarm *alarm = arg; - grpc_cq_end_op(exec_ctx, alarm->cq, alarm->tag, error, - do_nothing_end_completion, NULL, &alarm->completion); + + /* We are queuing an op on completion queue. This means, the alarm's structure + cannot be destroyed until the op is dequeued. Adding an extra ref + here and unref'ing when the op is dequeued will achieve this */ + GRPC_ALARM_REF(alarm, "queue-end-op"); + grpc_cq_end_op(exec_ctx, alarm->cq, alarm->tag, error, alarm_end_completion, + (void *)alarm, &alarm->completion); } grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline, @@ -45,6 +96,14 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline, grpc_alarm *alarm = gpr_malloc(sizeof(grpc_alarm)); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + gpr_ref_init(&alarm->refs, 1); + +#ifndef NDEBUG + if (GRPC_TRACER_ON(grpc_trace_alarm_refcount)) { + gpr_log(GPR_DEBUG, "Alarm:%p created (ref: 1)", alarm); + } +#endif + GRPC_CQ_INTERNAL_REF(cq, "alarm"); alarm->cq = cq; alarm->tag = tag; @@ -66,9 +125,6 @@ void grpc_alarm_cancel(grpc_alarm *alarm) { } void grpc_alarm_destroy(grpc_alarm *alarm) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_alarm_cancel(alarm); - GRPC_CQ_INTERNAL_UNREF(&exec_ctx, alarm->cq, "alarm"); - gpr_free(alarm); - grpc_exec_ctx_finish(&exec_ctx); + GRPC_ALARM_UNREF(alarm, "alarm_destroy"); } -- cgit v1.2.3 From 0eaf7debd2915f947ae50367b1768cfad44205dc Mon Sep 17 00:00:00 2001 From: yang-g Date: Wed, 5 Jul 2017 16:50:51 -0700 Subject: Allow adding events to cq after shutdown is called. --- include/grpc/impl/codegen/grpc_types.h | 4 +- .../filters/client_channel/channel_connectivity.c | 2 +- src/core/lib/surface/alarm.c | 3 +- src/core/lib/surface/call.c | 6 ++- src/core/lib/surface/channel_ping.c | 2 +- src/core/lib/surface/completion_queue.c | 11 ++++- src/core/lib/surface/completion_queue.h | 5 ++- src/core/lib/surface/server.c | 14 +++++-- src/cpp/server/server_cc.cc | 49 ++++++++++++---------- test/core/end2end/fuzzers/server_fuzzer.c | 5 ++- test/core/fling/server.c | 6 ++- test/core/surface/completion_queue_test.c | 6 +-- .../core/surface/completion_queue_threading_test.c | 4 +- test/cpp/microbenchmarks/bm_cq.cc | 7 ++-- test/cpp/microbenchmarks/bm_cq_multiple_threads.cc | 2 +- 15 files changed, 79 insertions(+), 47 deletions(-) (limited to 'src/core/lib/surface/alarm.c') diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index f1c457c027..bb3c90e303 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -333,7 +333,9 @@ typedef enum grpc_call_error { /** this batch of operations leads to more operations than allowed */ GRPC_CALL_ERROR_BATCH_TOO_BIG, /** payload type requested is not the type registered */ - GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH + GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH, + /** completion queue has been shutdown */ + GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN } grpc_call_error; /** Default send/receive message size limits in bytes. -1 for unlimited. */ diff --git a/src/core/ext/filters/client_channel/channel_connectivity.c b/src/core/ext/filters/client_channel/channel_connectivity.c index c3dca14305..e30dcc18f4 100644 --- a/src/core/ext/filters/client_channel/channel_connectivity.c +++ b/src/core/ext/filters/client_channel/channel_connectivity.c @@ -208,7 +208,7 @@ void grpc_channel_watch_connectivity_state( 7, (channel, (int)last_observed_state, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type, cq, tag)); - grpc_cq_begin_op(cq, tag); + GPR_ASSERT(grpc_cq_begin_op(cq, tag) == 0); gpr_mu_init(&w->mu); GRPC_CLOSURE_INIT(&w->on_complete, watch_complete, w, diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c index ef8405cca8..632cd6e239 100644 --- a/src/core/lib/surface/alarm.c +++ b/src/core/lib/surface/alarm.c @@ -18,6 +18,7 @@ #include #include +#include #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/surface/completion_queue.h" @@ -49,7 +50,7 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline, alarm->cq = cq; alarm->tag = tag; - grpc_cq_begin_op(cq, tag); + GPR_ASSERT(grpc_cq_begin_op(cq, tag) == 0); GRPC_CLOSURE_INIT(&alarm->on_alarm, alarm_cb, alarm, grpc_schedule_on_exec_ctx); grpc_timer_init(&exec_ctx, &alarm->alarm, diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 2365d27307..a173fadc43 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -1422,7 +1422,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, if (nops == 0) { if (!is_notify_tag_closure) { - grpc_cq_begin_op(call->cq, notify_tag); + GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag) == 0); grpc_cq_end_op(exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE, free_no_op_completion, NULL, gpr_malloc(sizeof(grpc_cq_completion))); @@ -1723,7 +1723,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, GRPC_CALL_INTERNAL_REF(call, "completion"); if (!is_notify_tag_closure) { - grpc_cq_begin_op(call->cq, notify_tag); + GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag) == 0); } gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed); @@ -1844,6 +1844,8 @@ const char *grpc_call_error_to_string(grpc_call_error error) { return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH"; case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS: return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS"; + case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN: + return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN"; case GRPC_CALL_OK: return "GRPC_CALL_OK"; } diff --git a/src/core/lib/surface/channel_ping.c b/src/core/lib/surface/channel_ping.c index 80eb80af78..8b1150b8b4 100644 --- a/src/core/lib/surface/channel_ping.c +++ b/src/core/lib/surface/channel_ping.c @@ -59,7 +59,7 @@ void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq, GRPC_CLOSURE_INIT(&pr->closure, ping_done, pr, grpc_schedule_on_exec_ctx); op->send_ping = &pr->closure; op->bind_pollset = grpc_cq_pollset(cq); - grpc_cq_begin_op(cq, tag); + GPR_ASSERT(grpc_cq_begin_op(cq, tag) == 0); top_elem->filter->start_transport_op(&exec_ctx, top_elem, op); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 3851993c92..ef1dfb1eb0 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -525,7 +525,16 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) { cq_next_data *cqd = DATA_FROM_CQ(cq); GPR_ASSERT(!cqd->shutdown_called); - gpr_atm_no_barrier_fetch_add(&cqd->pending_events, 1); + while (true) { + gpr_atm count = gpr_atm_no_barrier_load(&cqd->pending_events.count); + if (count == 0) { + return 1; + } else if (gpr_atm_no_barrier_cas(&cqd->pending_events.count, count, + count + 1)) { + break; + } + } + return 0; } static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) { diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index af44482513..7f3d39824e 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -72,8 +72,9 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc); /* Flag that an operation is beginning: the completion channel will not finish shutdown until a corrensponding grpc_cq_end_* call is made. - \a tag is currently used only in debug builds. */ -void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag); + \a tag is currently used only in debug builds. Return 0 on success and 1 if + completion_queue has been shutdown. */ +int grpc_cq_begin_op(grpc_completion_queue *cc, void *tag); /* Queue a GRPC_OP_COMPLETED operation; tag must correspond to the tag passed to grpc_cq_begin_op */ diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index fce7f8dca1..957593a1e2 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -1259,7 +1259,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server, } /* stay locked, and gather up some stuff to do */ - grpc_cq_begin_op(cq, tag); + GPR_ASSERT(grpc_cq_begin_op(cq, tag) == 0); if (server->shutdown_published) { grpc_cq_end_op(&exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown, NULL, gpr_malloc(sizeof(grpc_cq_completion))); @@ -1446,7 +1446,11 @@ grpc_call_error grpc_server_request_call( error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; goto done; } - grpc_cq_begin_op(cq_for_notification, tag); + if (grpc_cq_begin_op(cq_for_notification, tag)) { + gpr_free(rc); + error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN; + goto done; + } details->reserved = NULL; rc->cq_idx = cq_idx; rc->type = BATCH_CALL; @@ -1496,7 +1500,11 @@ grpc_call_error grpc_server_request_registered_call( error = GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH; goto done; } - grpc_cq_begin_op(cq_for_notification, tag); + if (grpc_cq_begin_op(cq_for_notification, tag)) { + gpr_free(rc); + error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN; + goto done; + } rc->cq_idx = cq_idx; rc->type = REGISTERED_CALL; rc->server = server; diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 92bacbe061..91d980494d 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -151,19 +151,24 @@ class Server::SyncRequest final : public CompletionQueueTag { GPR_ASSERT(cq_ && !in_flight_); in_flight_ = true; if (tag_) { - GPR_ASSERT(GRPC_CALL_OK == - grpc_server_request_registered_call( - server, tag_, &call_, &deadline_, &request_metadata_, - has_request_payload_ ? &request_payload_ : nullptr, cq_, - notify_cq, this)); + if (grpc_server_request_registered_call( + server, tag_, &call_, &deadline_, &request_metadata_, + has_request_payload_ ? &request_payload_ : nullptr, cq_, + notify_cq, this) != GRPC_CALL_OK) { + TeardownRequest(); + return; + } } else { if (!call_details_) { call_details_ = new grpc_call_details; grpc_call_details_init(call_details_); } - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call( - server, &call_, call_details_, - &request_metadata_, cq_, notify_cq, this)); + if (grpc_server_request_call(server, &call_, call_details_, + &request_metadata_, cq_, notify_cq, + this) != GRPC_CALL_OK) { + TeardownRequest(); + return; + } } } @@ -286,12 +291,10 @@ class Server::SyncRequestThreadManager : public ThreadManager { if (ok) { // Calldata takes ownership of the completion queue inside sync_req SyncRequest::CallData cd(server_, sync_req); - { - // Prepare for the next request - if (!IsShutdown()) { - sync_req->SetupRequest(); // Create new completion queue for sync_req - sync_req->Request(server_->c_server(), server_cq_->cq()); - } + // Prepare for the next request + if (!IsShutdown()) { + sync_req->SetupRequest(); // Create new completion queue for sync_req + sync_req->Request(server_->c_server(), server_cq_->cq()); } GPR_TIMER_SCOPE("cd.Run()", 0); @@ -316,8 +319,8 @@ class Server::SyncRequestThreadManager : public ThreadManager { } void Shutdown() override { - server_cq_->Shutdown(); ThreadManager::Shutdown(); + server_cq_->Shutdown(); } void Wait() override { @@ -652,10 +655,11 @@ ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest( void ServerInterface::RegisteredAsyncRequest::IssueRequest( void* registered_method, grpc_byte_buffer** payload, ServerCompletionQueue* notification_cq) { - grpc_server_request_registered_call( - server_->server(), registered_method, &call_, &context_->deadline_, - context_->client_metadata_.arr(), payload, call_cq_->cq(), - notification_cq->cq(), this); + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_registered_call( + server_->server(), registered_method, &call_, + &context_->deadline_, + context_->client_metadata_.arr(), payload, + call_cq_->cq(), notification_cq->cq(), this)); } ServerInterface::GenericAsyncRequest::GenericAsyncRequest( @@ -667,9 +671,10 @@ ServerInterface::GenericAsyncRequest::GenericAsyncRequest( grpc_call_details_init(&call_details_); GPR_ASSERT(notification_cq); GPR_ASSERT(call_cq); - grpc_server_request_call(server->server(), &call_, &call_details_, - context->client_metadata_.arr(), call_cq->cq(), - notification_cq->cq(), this); + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call( + server->server(), &call_, &call_details_, + context->client_metadata_.arr(), call_cq->cq(), + notification_cq->cq(), this)); } bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag, diff --git a/test/core/end2end/fuzzers/server_fuzzer.c b/test/core/end2end/fuzzers/server_fuzzer.c index 3ddc1ae907..ef4c0a4bfd 100644 --- a/test/core/end2end/fuzzers/server_fuzzer.c +++ b/test/core/end2end/fuzzers/server_fuzzer.c @@ -72,8 +72,9 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) { grpc_metadata_array_init(&request_metadata1); int requested_calls = 0; - grpc_server_request_call(server, &call1, &call_details1, &request_metadata1, - cq, cq, tag(1)); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(server, &call1, &call_details1, + &request_metadata1, cq, cq, tag(1))); requested_calls++; grpc_event ev; diff --git a/test/core/fling/server.c b/test/core/fling/server.c index 0f0f22ffcf..b3a7fa21ec 100644 --- a/test/core/fling/server.c +++ b/test/core/fling/server.c @@ -77,8 +77,10 @@ typedef struct { static void request_call(void) { grpc_metadata_array_init(&request_metadata_recv); - grpc_server_request_call(server, &call, &call_details, &request_metadata_recv, - cq, cq, tag(FLING_SERVER_NEW_REQUEST)); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(server, &call, &call_details, + &request_metadata_recv, cq, cq, + tag(FLING_SERVER_NEW_REQUEST))); } static void handle_unary_method(void) { diff --git a/test/core/surface/completion_queue_test.c b/test/core/surface/completion_queue_test.c index f9d88d6327..869ac96ed2 100644 --- a/test/core/surface/completion_queue_test.c +++ b/test/core/surface/completion_queue_test.c @@ -144,7 +144,7 @@ static void test_cq_end_op(void) { cc = grpc_completion_queue_create( grpc_completion_queue_factory_lookup(&attr), &attr, NULL); - grpc_cq_begin_op(cc, tag); + GPR_ASSERT(grpc_cq_begin_op(cc, tag) == 0); grpc_cq_end_op(&exec_ctx, cc, tag, GRPC_ERROR_NONE, do_nothing_end_completion, NULL, &completion); @@ -233,7 +233,7 @@ static void test_pluck(void) { grpc_completion_queue_factory_lookup(&attr), &attr, NULL); for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { - grpc_cq_begin_op(cc, tags[i]); + GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]) == 0); grpc_cq_end_op(&exec_ctx, cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion, NULL, &completions[i]); } @@ -245,7 +245,7 @@ static void test_pluck(void) { } for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { - grpc_cq_begin_op(cc, tags[i]); + GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]) == 0); grpc_cq_end_op(&exec_ctx, cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion, NULL, &completions[i]); } diff --git a/test/core/surface/completion_queue_threading_test.c b/test/core/surface/completion_queue_threading_test.c index 99d0fa4980..9120cbb24c 100644 --- a/test/core/surface/completion_queue_threading_test.c +++ b/test/core/surface/completion_queue_threading_test.c @@ -107,7 +107,7 @@ static void test_too_many_plucks(void) { GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT); for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { - grpc_cq_begin_op(cc, tags[i]); + GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]) == 0); grpc_cq_end_op(&exec_ctx, cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion, NULL, &completions[i]); } @@ -153,7 +153,7 @@ static void producer_thread(void *arg) { gpr_log(GPR_INFO, "producer %d phase 1", opt->id); for (i = 0; i < TEST_THREAD_EVENTS; i++) { - grpc_cq_begin_op(opt->cc, (void *)(intptr_t)1); + GPR_ASSERT(grpc_cq_begin_op(opt->cc, (void *)(intptr_t)1) == 0); } gpr_log(GPR_INFO, "producer %d phase 1 done", opt->id); diff --git a/test/cpp/microbenchmarks/bm_cq.cc b/test/cpp/microbenchmarks/bm_cq.cc index 18308a2e16..104fe51f26 100644 --- a/test/cpp/microbenchmarks/bm_cq.cc +++ b/test/cpp/microbenchmarks/bm_cq.cc @@ -23,6 +23,7 @@ #include #include #include +#include #include "test/cpp/microbenchmarks/helpers.h" extern "C" { @@ -82,7 +83,7 @@ static void BM_Pass1Cpp(benchmark::State& state) { grpc_cq_completion completion; DummyTag dummy_tag; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_cq_begin_op(c_cq, &dummy_tag); + GPR_ASSERT(grpc_cq_begin_op(c_cq, &dummy_tag) == 0); grpc_cq_end_op(&exec_ctx, c_cq, &dummy_tag, GRPC_ERROR_NONE, DoneWithCompletionOnStack, NULL, &completion); grpc_exec_ctx_finish(&exec_ctx); @@ -102,7 +103,7 @@ static void BM_Pass1Core(benchmark::State& state) { while (state.KeepRunning()) { grpc_cq_completion completion; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_cq_begin_op(cq, NULL); + GPR_ASSERT(grpc_cq_begin_op(cq, NULL) == 0); grpc_cq_end_op(&exec_ctx, cq, NULL, GRPC_ERROR_NONE, DoneWithCompletionOnStack, NULL, &completion); grpc_exec_ctx_finish(&exec_ctx); @@ -121,7 +122,7 @@ static void BM_Pluck1Core(benchmark::State& state) { while (state.KeepRunning()) { grpc_cq_completion completion; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_cq_begin_op(cq, NULL); + GPR_ASSERT(grpc_cq_begin_op(cq, NULL) == 0); grpc_cq_end_op(&exec_ctx, cq, NULL, GRPC_ERROR_NONE, DoneWithCompletionOnStack, NULL, &completion); grpc_exec_ctx_finish(&exec_ctx); diff --git a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc index f79db15a47..c1439ba5e7 100644 --- a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc +++ b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc @@ -78,7 +78,7 @@ static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* ps, } gpr_mu_unlock(&ps->mu); - grpc_cq_begin_op(g_cq, g_tag); + GPR_ASSERT(grpc_cq_begin_op(g_cq, g_tag) == 0); grpc_cq_end_op(exec_ctx, g_cq, g_tag, GRPC_ERROR_NONE, cq_done_cb, NULL, (grpc_cq_completion*)gpr_malloc(sizeof(grpc_cq_completion))); grpc_exec_ctx_flush(exec_ctx); -- cgit v1.2.3 From 7d6b914f9836f99c199706274c4602030e3526dc Mon Sep 17 00:00:00 2001 From: yang-g Date: Thu, 13 Jul 2017 11:48:56 -0700 Subject: Change return type to bool --- .../ext/filters/client_channel/channel_connectivity.c | 2 +- src/core/lib/surface/alarm.c | 2 +- src/core/lib/surface/call.c | 4 ++-- src/core/lib/surface/channel_ping.c | 2 +- src/core/lib/surface/completion_queue.c | 18 +++++++++--------- src/core/lib/surface/completion_queue.h | 6 +++--- src/core/lib/surface/server.c | 6 +++--- test/core/surface/completion_queue_test.c | 6 +++--- test/core/surface/completion_queue_threading_test.c | 4 ++-- test/cpp/microbenchmarks/bm_cq.cc | 6 +++--- test/cpp/microbenchmarks/bm_cq_multiple_threads.cc | 2 +- 11 files changed, 29 insertions(+), 29 deletions(-) (limited to 'src/core/lib/surface/alarm.c') diff --git a/src/core/ext/filters/client_channel/channel_connectivity.c b/src/core/ext/filters/client_channel/channel_connectivity.c index e30dcc18f4..b83c95275f 100644 --- a/src/core/ext/filters/client_channel/channel_connectivity.c +++ b/src/core/ext/filters/client_channel/channel_connectivity.c @@ -208,7 +208,7 @@ void grpc_channel_watch_connectivity_state( 7, (channel, (int)last_observed_state, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type, cq, tag)); - GPR_ASSERT(grpc_cq_begin_op(cq, tag) == 0); + GPR_ASSERT(grpc_cq_begin_op(cq, tag)); gpr_mu_init(&w->mu); GRPC_CLOSURE_INIT(&w->on_complete, watch_complete, w, diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c index 632cd6e239..55934964f3 100644 --- a/src/core/lib/surface/alarm.c +++ b/src/core/lib/surface/alarm.c @@ -50,7 +50,7 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline, alarm->cq = cq; alarm->tag = tag; - GPR_ASSERT(grpc_cq_begin_op(cq, tag) == 0); + GPR_ASSERT(grpc_cq_begin_op(cq, tag)); GRPC_CLOSURE_INIT(&alarm->on_alarm, alarm_cb, alarm, grpc_schedule_on_exec_ctx); grpc_timer_init(&exec_ctx, &alarm->alarm, diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index a173fadc43..04613f17e3 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -1422,7 +1422,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, if (nops == 0) { if (!is_notify_tag_closure) { - GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag) == 0); + GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag)); grpc_cq_end_op(exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE, free_no_op_completion, NULL, gpr_malloc(sizeof(grpc_cq_completion))); @@ -1723,7 +1723,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, GRPC_CALL_INTERNAL_REF(call, "completion"); if (!is_notify_tag_closure) { - GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag) == 0); + GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag)); } gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed); diff --git a/src/core/lib/surface/channel_ping.c b/src/core/lib/surface/channel_ping.c index 8b1150b8b4..e85b308850 100644 --- a/src/core/lib/surface/channel_ping.c +++ b/src/core/lib/surface/channel_ping.c @@ -59,7 +59,7 @@ void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq, GRPC_CLOSURE_INIT(&pr->closure, ping_done, pr, grpc_schedule_on_exec_ctx); op->send_ping = &pr->closure; op->bind_pollset = grpc_cq_pollset(cq); - GPR_ASSERT(grpc_cq_begin_op(cq, tag) == 0); + GPR_ASSERT(grpc_cq_begin_op(cq, tag)); top_elem->filter->start_transport_op(&exec_ctx, top_elem, op); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 98234e036a..237ea25943 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -196,7 +196,7 @@ typedef struct cq_vtable { void (*init)(void *data); void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq); void (*destroy)(void *data); - int (*begin_op)(grpc_completion_queue *cq, void *tag); + bool (*begin_op)(grpc_completion_queue *cq, void *tag); void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, @@ -288,8 +288,8 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq); -static int cq_begin_op_for_next(grpc_completion_queue *cq, void *tag); -static int cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag); +static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag); +static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag); static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag, @@ -549,28 +549,28 @@ static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) { static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {} #endif -static int cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) { +static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) { cq_next_data *cqd = DATA_FROM_CQ(cq); while (true) { gpr_atm count = gpr_atm_no_barrier_load(&cqd->pending_events); if (count == 0) { cq_check_tag(cq, tag, true); /* Used in debug builds only */ - return 1; + return false; } else if (gpr_atm_no_barrier_cas(&cqd->pending_events, count, count + 1)) { break; } } - return 0; + return true; } -static int cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) { +static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) { cq_pluck_data *cqd = DATA_FROM_CQ(cq); GPR_ASSERT(!cqd->shutdown_called); gpr_ref(&cqd->pending_events); - return 0; + return true; } -int grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) { +bool grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) { #ifndef NDEBUG gpr_mu_lock(cq->mu); if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) { diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 7f3d39824e..69d144bd95 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -72,9 +72,9 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc); /* Flag that an operation is beginning: the completion channel will not finish shutdown until a corrensponding grpc_cq_end_* call is made. - \a tag is currently used only in debug builds. Return 0 on success and 1 if - completion_queue has been shutdown. */ -int grpc_cq_begin_op(grpc_completion_queue *cc, void *tag); + \a tag is currently used only in debug builds. Return true on success, and + false if completion_queue has been shutdown. */ +bool grpc_cq_begin_op(grpc_completion_queue *cc, void *tag); /* Queue a GRPC_OP_COMPLETED operation; tag must correspond to the tag passed to grpc_cq_begin_op */ diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 957593a1e2..66dcc299aa 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -1259,7 +1259,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server, } /* stay locked, and gather up some stuff to do */ - GPR_ASSERT(grpc_cq_begin_op(cq, tag) == 0); + GPR_ASSERT(grpc_cq_begin_op(cq, tag)); if (server->shutdown_published) { grpc_cq_end_op(&exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown, NULL, gpr_malloc(sizeof(grpc_cq_completion))); @@ -1446,7 +1446,7 @@ grpc_call_error grpc_server_request_call( error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; goto done; } - if (grpc_cq_begin_op(cq_for_notification, tag)) { + if (grpc_cq_begin_op(cq_for_notification, tag) == false) { gpr_free(rc); error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN; goto done; @@ -1500,7 +1500,7 @@ grpc_call_error grpc_server_request_registered_call( error = GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH; goto done; } - if (grpc_cq_begin_op(cq_for_notification, tag)) { + if (grpc_cq_begin_op(cq_for_notification, tag) == false) { gpr_free(rc); error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN; goto done; diff --git a/test/core/surface/completion_queue_test.c b/test/core/surface/completion_queue_test.c index 869ac96ed2..e6372a379c 100644 --- a/test/core/surface/completion_queue_test.c +++ b/test/core/surface/completion_queue_test.c @@ -144,7 +144,7 @@ static void test_cq_end_op(void) { cc = grpc_completion_queue_create( grpc_completion_queue_factory_lookup(&attr), &attr, NULL); - GPR_ASSERT(grpc_cq_begin_op(cc, tag) == 0); + GPR_ASSERT(grpc_cq_begin_op(cc, tag)); grpc_cq_end_op(&exec_ctx, cc, tag, GRPC_ERROR_NONE, do_nothing_end_completion, NULL, &completion); @@ -233,7 +233,7 @@ static void test_pluck(void) { grpc_completion_queue_factory_lookup(&attr), &attr, NULL); for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { - GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]) == 0); + GPR_ASSERT(grpc_cq_begin_op(cc, tags[i])); grpc_cq_end_op(&exec_ctx, cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion, NULL, &completions[i]); } @@ -245,7 +245,7 @@ static void test_pluck(void) { } for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { - GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]) == 0); + GPR_ASSERT(grpc_cq_begin_op(cc, tags[i])); grpc_cq_end_op(&exec_ctx, cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion, NULL, &completions[i]); } diff --git a/test/core/surface/completion_queue_threading_test.c b/test/core/surface/completion_queue_threading_test.c index 9120cbb24c..9996b6b840 100644 --- a/test/core/surface/completion_queue_threading_test.c +++ b/test/core/surface/completion_queue_threading_test.c @@ -107,7 +107,7 @@ static void test_too_many_plucks(void) { GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT); for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { - GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]) == 0); + GPR_ASSERT(grpc_cq_begin_op(cc, tags[i])); grpc_cq_end_op(&exec_ctx, cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion, NULL, &completions[i]); } @@ -153,7 +153,7 @@ static void producer_thread(void *arg) { gpr_log(GPR_INFO, "producer %d phase 1", opt->id); for (i = 0; i < TEST_THREAD_EVENTS; i++) { - GPR_ASSERT(grpc_cq_begin_op(opt->cc, (void *)(intptr_t)1) == 0); + GPR_ASSERT(grpc_cq_begin_op(opt->cc, (void *)(intptr_t)1)); } gpr_log(GPR_INFO, "producer %d phase 1 done", opt->id); diff --git a/test/cpp/microbenchmarks/bm_cq.cc b/test/cpp/microbenchmarks/bm_cq.cc index 104fe51f26..a0c0414f2f 100644 --- a/test/cpp/microbenchmarks/bm_cq.cc +++ b/test/cpp/microbenchmarks/bm_cq.cc @@ -83,7 +83,7 @@ static void BM_Pass1Cpp(benchmark::State& state) { grpc_cq_completion completion; DummyTag dummy_tag; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GPR_ASSERT(grpc_cq_begin_op(c_cq, &dummy_tag) == 0); + GPR_ASSERT(grpc_cq_begin_op(c_cq, &dummy_tag)); grpc_cq_end_op(&exec_ctx, c_cq, &dummy_tag, GRPC_ERROR_NONE, DoneWithCompletionOnStack, NULL, &completion); grpc_exec_ctx_finish(&exec_ctx); @@ -103,7 +103,7 @@ static void BM_Pass1Core(benchmark::State& state) { while (state.KeepRunning()) { grpc_cq_completion completion; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GPR_ASSERT(grpc_cq_begin_op(cq, NULL) == 0); + GPR_ASSERT(grpc_cq_begin_op(cq, NULL)); grpc_cq_end_op(&exec_ctx, cq, NULL, GRPC_ERROR_NONE, DoneWithCompletionOnStack, NULL, &completion); grpc_exec_ctx_finish(&exec_ctx); @@ -122,7 +122,7 @@ static void BM_Pluck1Core(benchmark::State& state) { while (state.KeepRunning()) { grpc_cq_completion completion; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GPR_ASSERT(grpc_cq_begin_op(cq, NULL) == 0); + GPR_ASSERT(grpc_cq_begin_op(cq, NULL)); grpc_cq_end_op(&exec_ctx, cq, NULL, GRPC_ERROR_NONE, DoneWithCompletionOnStack, NULL, &completion); grpc_exec_ctx_finish(&exec_ctx); diff --git a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc index c1439ba5e7..f109fe6251 100644 --- a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc +++ b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc @@ -78,7 +78,7 @@ static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* ps, } gpr_mu_unlock(&ps->mu); - GPR_ASSERT(grpc_cq_begin_op(g_cq, g_tag) == 0); + GPR_ASSERT(grpc_cq_begin_op(g_cq, g_tag)); grpc_cq_end_op(exec_ctx, g_cq, g_tag, GRPC_ERROR_NONE, cq_done_cb, NULL, (grpc_cq_completion*)gpr_malloc(sizeof(grpc_cq_completion))); grpc_exec_ctx_flush(exec_ctx); -- cgit v1.2.3