aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/call_combiner.c
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-08-31 12:23:53 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-08-31 12:23:53 -0700
commitccad38227f63797318d7cffcba8a2df783394ccd (patch)
tree64632504188bf906796763f585c11055098aab58 /src/core/lib/iomgr/call_combiner.c
parent6e739745e298bf205eb2553d3a3c8a3eed06fff1 (diff)
parent451b92a754900a800a7728a3d63462058026ac2c (diff)
Merge github.com:grpc/grpc into stats
Diffstat (limited to 'src/core/lib/iomgr/call_combiner.c')
-rw-r--r--src/core/lib/iomgr/call_combiner.c180
1 files changed, 0 insertions, 180 deletions
diff --git a/src/core/lib/iomgr/call_combiner.c b/src/core/lib/iomgr/call_combiner.c
deleted file mode 100644
index 899f98552d..0000000000
--- a/src/core/lib/iomgr/call_combiner.c
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- *
- * Copyright 2017 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "src/core/lib/iomgr/call_combiner.h"
-
-#include <grpc/support/log.h>
-
-grpc_tracer_flag grpc_call_combiner_trace =
- GRPC_TRACER_INITIALIZER(false, "call_combiner");
-
-static grpc_error* decode_cancel_state_error(gpr_atm cancel_state) {
- if (cancel_state & 1) {
- return (grpc_error*)(cancel_state & ~(gpr_atm)1);
- }
- return GRPC_ERROR_NONE;
-}
-
-static gpr_atm encode_cancel_state_error(grpc_error* error) {
- return (gpr_atm)1 | (gpr_atm)error;
-}
-
-void grpc_call_combiner_init(grpc_call_combiner* call_combiner) {
- gpr_mpscq_init(&call_combiner->queue);
-}
-
-void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner) {
- gpr_mpscq_destroy(&call_combiner->queue);
- GRPC_ERROR_UNREF(decode_cancel_state_error(call_combiner->cancel_state));
-}
-
-#ifndef NDEBUG
-#define DEBUG_ARGS , const char *file, int line
-#define DEBUG_FMT_STR "%s:%d: "
-#define DEBUG_FMT_ARGS , file, line
-#else
-#define DEBUG_ARGS
-#define DEBUG_FMT_STR
-#define DEBUG_FMT_ARGS
-#endif
-
-void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx,
- grpc_call_combiner* call_combiner,
- grpc_closure* closure,
- grpc_error* error DEBUG_ARGS,
- const char* reason) {
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_DEBUG,
- "==> grpc_call_combiner_start() [%p] closure=%p [" DEBUG_FMT_STR
- "%s] error=%s",
- call_combiner, closure DEBUG_FMT_ARGS, reason,
- grpc_error_string(error));
- }
- size_t prev_size =
- (size_t)gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)1);
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
- prev_size + 1);
- }
- if (prev_size == 0) {
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_DEBUG, " EXECUTING IMMEDIATELY");
- }
- // Queue was empty, so execute this closure immediately.
- GRPC_CLOSURE_SCHED(exec_ctx, closure, error);
- } else {
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_INFO, " QUEUING");
- }
- // Queue was not empty, so add closure to queue.
- closure->error_data.error = error;
- gpr_mpscq_push(&call_combiner->queue, (gpr_mpscq_node*)closure);
- }
-}
-
-void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx,
- grpc_call_combiner* call_combiner DEBUG_ARGS,
- const char* reason) {
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_DEBUG,
- "==> grpc_call_combiner_stop() [%p] [" DEBUG_FMT_STR "%s]",
- call_combiner DEBUG_FMT_ARGS, reason);
- }
- size_t prev_size =
- (size_t)gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)-1);
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
- prev_size - 1);
- }
- GPR_ASSERT(prev_size >= 1);
- if (prev_size > 1) {
- while (true) {
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_DEBUG, " checking queue");
- }
- bool empty;
- grpc_closure* closure = (grpc_closure*)gpr_mpscq_pop_and_check_end(
- &call_combiner->queue, &empty);
- if (closure == NULL) {
- // This can happen either due to a race condition within the mpscq
- // code or because of a race with grpc_call_combiner_start().
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_DEBUG, " queue returned no result; checking again");
- }
- continue;
- }
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_DEBUG, " EXECUTING FROM QUEUE: closure=%p error=%s",
- closure, grpc_error_string(closure->error_data.error));
- }
- GRPC_CLOSURE_SCHED(exec_ctx, closure, closure->error_data.error);
- break;
- }
- } else if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_DEBUG, " queue empty");
- }
-}
-
-void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx,
- grpc_call_combiner* call_combiner,
- grpc_closure* closure) {
- while (true) {
- // Decode original state.
- gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
- grpc_error* original_error = decode_cancel_state_error(original_state);
- // If error is set, invoke the cancellation closure immediately.
- // Otherwise, store the new closure.
- if (original_error != GRPC_ERROR_NONE) {
- GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_REF(original_error));
- break;
- } else {
- if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state,
- (gpr_atm)closure)) {
- break;
- }
- }
- // cas failed, try again.
- }
-}
-
-void grpc_call_combiner_cancel(grpc_exec_ctx* exec_ctx,
- grpc_call_combiner* call_combiner,
- grpc_error* error) {
- while (true) {
- gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
- grpc_error* original_error = decode_cancel_state_error(original_state);
- if (original_error != GRPC_ERROR_NONE) {
- GRPC_ERROR_UNREF(error);
- break;
- }
- if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state,
- encode_cancel_state_error(error))) {
- if (original_state != 0) {
- grpc_closure* notify_on_cancel = (grpc_closure*)original_state;
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_DEBUG,
- "call_combiner=%p: scheduling notify_on_cancel callback=%p",
- call_combiner, notify_on_cancel);
- }
- GRPC_CLOSURE_SCHED(exec_ctx, notify_on_cancel, GRPC_ERROR_REF(error));
- }
- break;
- }
- // cas failed, try again.
- }
-}