diff options
author | Craig Tiller <ctiller@google.com> | 2017-08-31 12:24:15 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-08-31 12:24:15 -0700 |
commit | 890f542498a8af4c05f22e42d818f3b0eeafaea8 (patch) | |
tree | 4ccf813375c1744629ed43c5c5eaa62fbdffe7ac /src/core/lib/iomgr | |
parent | 5489d41c15926abbf12a5b8d27b24d1d605d7f0f (diff) | |
parent | ccad38227f63797318d7cffcba8a2df783394ccd (diff) |
Merge branch 'stats' into stats_histo
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r-- | src/core/lib/iomgr/call_combiner.c | 180 | ||||
-rw-r--r-- | src/core/lib/iomgr/call_combiner.h | 104 |
2 files changed, 0 insertions, 284 deletions
diff --git a/src/core/lib/iomgr/call_combiner.c b/src/core/lib/iomgr/call_combiner.c deleted file mode 100644 index 899f98552d..0000000000 --- a/src/core/lib/iomgr/call_combiner.c +++ /dev/null @@ -1,180 +0,0 @@ -/* - * - * Copyright 2017 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "src/core/lib/iomgr/call_combiner.h" - -#include <grpc/support/log.h> - -grpc_tracer_flag grpc_call_combiner_trace = - GRPC_TRACER_INITIALIZER(false, "call_combiner"); - -static grpc_error* decode_cancel_state_error(gpr_atm cancel_state) { - if (cancel_state & 1) { - return (grpc_error*)(cancel_state & ~(gpr_atm)1); - } - return GRPC_ERROR_NONE; -} - -static gpr_atm encode_cancel_state_error(grpc_error* error) { - return (gpr_atm)1 | (gpr_atm)error; -} - -void grpc_call_combiner_init(grpc_call_combiner* call_combiner) { - gpr_mpscq_init(&call_combiner->queue); -} - -void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner) { - gpr_mpscq_destroy(&call_combiner->queue); - GRPC_ERROR_UNREF(decode_cancel_state_error(call_combiner->cancel_state)); -} - -#ifndef NDEBUG -#define DEBUG_ARGS , const char *file, int line -#define DEBUG_FMT_STR "%s:%d: " -#define DEBUG_FMT_ARGS , file, line -#else -#define DEBUG_ARGS -#define DEBUG_FMT_STR -#define DEBUG_FMT_ARGS -#endif - -void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx, - grpc_call_combiner* call_combiner, - grpc_closure* closure, - grpc_error* error DEBUG_ARGS, - const char* reason) { - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_DEBUG, - "==> grpc_call_combiner_start() [%p] closure=%p [" DEBUG_FMT_STR - "%s] error=%s", - call_combiner, closure DEBUG_FMT_ARGS, reason, - grpc_error_string(error)); - } - size_t prev_size = - (size_t)gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)1); - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size, - prev_size + 1); - } - if (prev_size == 0) { - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_DEBUG, " EXECUTING IMMEDIATELY"); - } - // Queue was empty, so execute this closure immediately. - GRPC_CLOSURE_SCHED(exec_ctx, closure, error); - } else { - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_INFO, " QUEUING"); - } - // Queue was not empty, so add closure to queue. - closure->error_data.error = error; - gpr_mpscq_push(&call_combiner->queue, (gpr_mpscq_node*)closure); - } -} - -void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx, - grpc_call_combiner* call_combiner DEBUG_ARGS, - const char* reason) { - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_DEBUG, - "==> grpc_call_combiner_stop() [%p] [" DEBUG_FMT_STR "%s]", - call_combiner DEBUG_FMT_ARGS, reason); - } - size_t prev_size = - (size_t)gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)-1); - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size, - prev_size - 1); - } - GPR_ASSERT(prev_size >= 1); - if (prev_size > 1) { - while (true) { - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_DEBUG, " checking queue"); - } - bool empty; - grpc_closure* closure = (grpc_closure*)gpr_mpscq_pop_and_check_end( - &call_combiner->queue, &empty); - if (closure == NULL) { - // This can happen either due to a race condition within the mpscq - // code or because of a race with grpc_call_combiner_start(). - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_DEBUG, " queue returned no result; checking again"); - } - continue; - } - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_DEBUG, " EXECUTING FROM QUEUE: closure=%p error=%s", - closure, grpc_error_string(closure->error_data.error)); - } - GRPC_CLOSURE_SCHED(exec_ctx, closure, closure->error_data.error); - break; - } - } else if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_DEBUG, " queue empty"); - } -} - -void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx, - grpc_call_combiner* call_combiner, - grpc_closure* closure) { - while (true) { - // Decode original state. - gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state); - grpc_error* original_error = decode_cancel_state_error(original_state); - // If error is set, invoke the cancellation closure immediately. - // Otherwise, store the new closure. - if (original_error != GRPC_ERROR_NONE) { - GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_REF(original_error)); - break; - } else { - if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state, - (gpr_atm)closure)) { - break; - } - } - // cas failed, try again. - } -} - -void grpc_call_combiner_cancel(grpc_exec_ctx* exec_ctx, - grpc_call_combiner* call_combiner, - grpc_error* error) { - while (true) { - gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state); - grpc_error* original_error = decode_cancel_state_error(original_state); - if (original_error != GRPC_ERROR_NONE) { - GRPC_ERROR_UNREF(error); - break; - } - if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state, - encode_cancel_state_error(error))) { - if (original_state != 0) { - grpc_closure* notify_on_cancel = (grpc_closure*)original_state; - if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { - gpr_log(GPR_DEBUG, - "call_combiner=%p: scheduling notify_on_cancel callback=%p", - call_combiner, notify_on_cancel); - } - GRPC_CLOSURE_SCHED(exec_ctx, notify_on_cancel, GRPC_ERROR_REF(error)); - } - break; - } - // cas failed, try again. - } -} diff --git a/src/core/lib/iomgr/call_combiner.h b/src/core/lib/iomgr/call_combiner.h deleted file mode 100644 index 621e2c3669..0000000000 --- a/src/core/lib/iomgr/call_combiner.h +++ /dev/null @@ -1,104 +0,0 @@ -/* - * - * Copyright 2017 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#ifndef GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H -#define GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H - -#include <stddef.h> - -#include <grpc/support/atm.h> - -#include "src/core/lib/iomgr/closure.h" -#include "src/core/lib/iomgr/exec_ctx.h" -#include "src/core/lib/support/mpscq.h" - -// A simple, lock-free mechanism for serializing activity related to a -// single call. This is similar to a combiner but is more lightweight. -// -// It requires the callback (or, in the common case where the callback -// actually kicks off a chain of callbacks, the last callback in that -// chain) to explicitly indicate (by calling GRPC_CALL_COMBINER_STOP()) -// when it is done with the action that was kicked off by the original -// callback. - -extern grpc_tracer_flag grpc_call_combiner_trace; - -typedef struct { - gpr_atm size; // size_t, num closures in queue or currently executing - gpr_mpscq queue; - // Either 0 (if not cancelled and no cancellation closure set), - // a grpc_closure* (if the lowest bit is 0), - // or a grpc_error* (if the lowest bit is 1). - gpr_atm cancel_state; -} grpc_call_combiner; - -// Assumes memory was initialized to zero. -void grpc_call_combiner_init(grpc_call_combiner* call_combiner); - -void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner); - -#ifndef NDEBUG -#define GRPC_CALL_COMBINER_START(exec_ctx, call_combiner, closure, error, \ - reason) \ - grpc_call_combiner_start((exec_ctx), (call_combiner), (closure), (error), \ - __FILE__, __LINE__, (reason)) -#define GRPC_CALL_COMBINER_STOP(exec_ctx, call_combiner, reason) \ - grpc_call_combiner_stop((exec_ctx), (call_combiner), __FILE__, __LINE__, \ - (reason)) -/// Starts processing \a closure on \a call_combiner. -void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx, - grpc_call_combiner* call_combiner, - grpc_closure* closure, grpc_error* error, - const char* file, int line, const char* reason); -/// Yields the call combiner to the next closure in the queue, if any. -void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx, - grpc_call_combiner* call_combiner, - const char* file, int line, const char* reason); -#else -#define GRPC_CALL_COMBINER_START(exec_ctx, call_combiner, closure, error, \ - reason) \ - grpc_call_combiner_start((exec_ctx), (call_combiner), (closure), (error), \ - (reason)) -#define GRPC_CALL_COMBINER_STOP(exec_ctx, call_combiner, reason) \ - grpc_call_combiner_stop((exec_ctx), (call_combiner), (reason)) -/// Starts processing \a closure on \a call_combiner. -void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx, - grpc_call_combiner* call_combiner, - grpc_closure* closure, grpc_error* error, - const char* reason); -/// Yields the call combiner to the next closure in the queue, if any. -void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx, - grpc_call_combiner* call_combiner, - const char* reason); -#endif - -/// Tells \a call_combiner to invoke \a closure when -/// grpc_call_combiner_cancel() is called. If grpc_call_combiner_cancel() -/// was previously called, \a closure will be invoked immediately. -/// If \a closure is NULL, then no closure will be invoked on -/// cancellation; this effectively unregisters the previously set closure. -void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx, - grpc_call_combiner* call_combiner, - grpc_closure* closure); - -/// Indicates that the call has been cancelled. -void grpc_call_combiner_cancel(grpc_exec_ctx* exec_ctx, - grpc_call_combiner* call_combiner, - grpc_error* error); - -#endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */ |