diff options
author | Craig Tiller <ctiller@google.com> | 2017-08-31 12:23:53 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-08-31 12:23:53 -0700 |
commit | ccad38227f63797318d7cffcba8a2df783394ccd (patch) | |
tree | 64632504188bf906796763f585c11055098aab58 /src/core/lib/iomgr/call_combiner.c | |
parent | 6e739745e298bf205eb2553d3a3c8a3eed06fff1 (diff) | |
parent | 451b92a754900a800a7728a3d63462058026ac2c (diff) |
Merge github.com:grpc/grpc into stats
Diffstat (limited to 'src/core/lib/iomgr/call_combiner.c')
-rw-r--r-- | src/core/lib/iomgr/call_combiner.c | 180 |
1 files changed, 0 insertions, 180 deletions
diff --git a/src/core/lib/iomgr/call_combiner.c b/src/core/lib/iomgr/call_combiner.c deleted file mode 100644 index 899f98552d..0000000000 --- a/src/core/lib/iomgr/call_combiner.c +++ /dev/null @@ -1,180 +0,0 @@ -/* - * - * Copyright 2017 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "src/core/lib/iomgr/call_combiner.h" - -#include <grpc/support/log.h> - -grpc_tracer_flag grpc_call_combiner_trace = - GRPC_TRACER_INITIALIZER(false, "call_combiner"); - -static grpc_error* decode_cancel_state_error(gpr_atm cancel_state) { - if (cancel_state & 1) { - return (grpc_error*)(cancel_state & ~(gpr_atm)1); - } - return GRPC_ERROR_NONE; -} - -static gpr_atm encode_cancel_state_error(grpc_error* error) { - return (gpr_atm)1 | (gpr_atm)error; -} - -void grpc_call_combiner_init(grpc_call_combiner* call_combiner) { - gpr_mpscq_init(&call_combiner->queue); -} - -void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner) { - gpr_mpscq_destroy(&call_combiner->queue); - GRPC_ERROR_UNREF(decode_cancel_state_error(call_combiner->cancel_state)); -} - -#ifndef NDEBUG -#define DEBUG_ARGS , const char *file, int line -#define DEBUG_FMT_STR "%s:%d: " -#define DEBUG_FMT_ARGS , file, line -#else -#define DEBUG_ARGS -#define DEBUG_FMT_STR -#define DEBUG_FMT_ARGS -#endif - -void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx, - grpc_call_combiner* call_combiner, - grpc_closure* closure, - grpc_error* error DEBUG_ARGS, - const char* reason) { - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_DEBUG, - "==> grpc_call_combiner_start() [%p] closure=%p [" DEBUG_FMT_STR - "%s] error=%s", - call_combiner, closure DEBUG_FMT_ARGS, reason, - grpc_error_string(error)); - } - size_t prev_size = - (size_t)gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)1); - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size, - prev_size + 1); - } - if (prev_size == 0) { - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_DEBUG, " EXECUTING IMMEDIATELY"); - } - // Queue was empty, so execute this closure immediately. - GRPC_CLOSURE_SCHED(exec_ctx, closure, error); - } else { - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_INFO, " QUEUING"); - } - // Queue was not empty, so add closure to queue. - closure->error_data.error = error; - gpr_mpscq_push(&call_combiner->queue, (gpr_mpscq_node*)closure); - } -} - -void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx, - grpc_call_combiner* call_combiner DEBUG_ARGS, - const char* reason) { - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_DEBUG, - "==> grpc_call_combiner_stop() [%p] [" DEBUG_FMT_STR "%s]", - call_combiner DEBUG_FMT_ARGS, reason); - } - size_t prev_size = - (size_t)gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)-1); - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size, - prev_size - 1); - } - GPR_ASSERT(prev_size >= 1); - if (prev_size > 1) { - while (true) { - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_DEBUG, " checking queue"); - } - bool empty; - grpc_closure* closure = (grpc_closure*)gpr_mpscq_pop_and_check_end( - &call_combiner->queue, &empty); - if (closure == NULL) { - // This can happen either due to a race condition within the mpscq - // code or because of a race with grpc_call_combiner_start(). - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_DEBUG, " queue returned no result; checking again"); - } - continue; - } - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_DEBUG, " EXECUTING FROM QUEUE: closure=%p error=%s", - closure, grpc_error_string(closure->error_data.error)); - } - GRPC_CLOSURE_SCHED(exec_ctx, closure, closure->error_data.error); - break; - } - } else if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_DEBUG, " queue empty"); - } -} - -void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx, - grpc_call_combiner* call_combiner, - grpc_closure* closure) { - while (true) { - // Decode original state. - gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state); - grpc_error* original_error = decode_cancel_state_error(original_state); - // If error is set, invoke the cancellation closure immediately. - // Otherwise, store the new closure. - if (original_error != GRPC_ERROR_NONE) { - GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_REF(original_error)); - break; - } else { - if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state, - (gpr_atm)closure)) { - break; - } - } - // cas failed, try again. - } -} - -void grpc_call_combiner_cancel(grpc_exec_ctx* exec_ctx, - grpc_call_combiner* call_combiner, - grpc_error* error) { - while (true) { - gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state); - grpc_error* original_error = decode_cancel_state_error(original_state); - if (original_error != GRPC_ERROR_NONE) { - GRPC_ERROR_UNREF(error); - break; - } - if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state, - encode_cancel_state_error(error))) { - if (original_state != 0) { - grpc_closure* notify_on_cancel = (grpc_closure*)original_state; - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_DEBUG, - "call_combiner=%p: scheduling notify_on_cancel callback=%p", - call_combiner, notify_on_cancel); - } - GRPC_CLOSURE_SCHED(exec_ctx, notify_on_cancel, GRPC_ERROR_REF(error)); - } - break; - } - // cas failed, try again. - } -} |