aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-08-31 12:24:15 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-08-31 12:24:15 -0700
commit890f542498a8af4c05f22e42d818f3b0eeafaea8 (patch)
tree4ccf813375c1744629ed43c5c5eaa62fbdffe7ac /src/core/lib/iomgr
parent5489d41c15926abbf12a5b8d27b24d1d605d7f0f (diff)
parentccad38227f63797318d7cffcba8a2df783394ccd (diff)
Merge branch 'stats' into stats_histo
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/call_combiner.c180
-rw-r--r--src/core/lib/iomgr/call_combiner.h104
2 files changed, 0 insertions, 284 deletions
diff --git a/src/core/lib/iomgr/call_combiner.c b/src/core/lib/iomgr/call_combiner.c
deleted file mode 100644
index 899f98552d..0000000000
--- a/src/core/lib/iomgr/call_combiner.c
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- *
- * Copyright 2017 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "src/core/lib/iomgr/call_combiner.h"
-
-#include <grpc/support/log.h>
-
-grpc_tracer_flag grpc_call_combiner_trace =
- GRPC_TRACER_INITIALIZER(false, "call_combiner");
-
-static grpc_error* decode_cancel_state_error(gpr_atm cancel_state) {
- if (cancel_state & 1) {
- return (grpc_error*)(cancel_state & ~(gpr_atm)1);
- }
- return GRPC_ERROR_NONE;
-}
-
-static gpr_atm encode_cancel_state_error(grpc_error* error) {
- return (gpr_atm)1 | (gpr_atm)error;
-}
-
-void grpc_call_combiner_init(grpc_call_combiner* call_combiner) {
- gpr_mpscq_init(&call_combiner->queue);
-}
-
-void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner) {
- gpr_mpscq_destroy(&call_combiner->queue);
- GRPC_ERROR_UNREF(decode_cancel_state_error(call_combiner->cancel_state));
-}
-
-#ifndef NDEBUG
-#define DEBUG_ARGS , const char *file, int line
-#define DEBUG_FMT_STR "%s:%d: "
-#define DEBUG_FMT_ARGS , file, line
-#else
-#define DEBUG_ARGS
-#define DEBUG_FMT_STR
-#define DEBUG_FMT_ARGS
-#endif
-
-void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx,
- grpc_call_combiner* call_combiner,
- grpc_closure* closure,
- grpc_error* error DEBUG_ARGS,
- const char* reason) {
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_DEBUG,
- "==> grpc_call_combiner_start() [%p] closure=%p [" DEBUG_FMT_STR
- "%s] error=%s",
- call_combiner, closure DEBUG_FMT_ARGS, reason,
- grpc_error_string(error));
- }
- size_t prev_size =
- (size_t)gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)1);
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
- prev_size + 1);
- }
- if (prev_size == 0) {
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_DEBUG, " EXECUTING IMMEDIATELY");
- }
- // Queue was empty, so execute this closure immediately.
- GRPC_CLOSURE_SCHED(exec_ctx, closure, error);
- } else {
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_INFO, " QUEUING");
- }
- // Queue was not empty, so add closure to queue.
- closure->error_data.error = error;
- gpr_mpscq_push(&call_combiner->queue, (gpr_mpscq_node*)closure);
- }
-}
-
-void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx,
- grpc_call_combiner* call_combiner DEBUG_ARGS,
- const char* reason) {
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_DEBUG,
- "==> grpc_call_combiner_stop() [%p] [" DEBUG_FMT_STR "%s]",
- call_combiner DEBUG_FMT_ARGS, reason);
- }
- size_t prev_size =
- (size_t)gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)-1);
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
- prev_size - 1);
- }
- GPR_ASSERT(prev_size >= 1);
- if (prev_size > 1) {
- while (true) {
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_DEBUG, " checking queue");
- }
- bool empty;
- grpc_closure* closure = (grpc_closure*)gpr_mpscq_pop_and_check_end(
- &call_combiner->queue, &empty);
- if (closure == NULL) {
- // This can happen either due to a race condition within the mpscq
- // code or because of a race with grpc_call_combiner_start().
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_DEBUG, " queue returned no result; checking again");
- }
- continue;
- }
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_DEBUG, " EXECUTING FROM QUEUE: closure=%p error=%s",
- closure, grpc_error_string(closure->error_data.error));
- }
- GRPC_CLOSURE_SCHED(exec_ctx, closure, closure->error_data.error);
- break;
- }
- } else if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_DEBUG, " queue empty");
- }
-}
-
-void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx,
- grpc_call_combiner* call_combiner,
- grpc_closure* closure) {
- while (true) {
- // Decode original state.
- gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
- grpc_error* original_error = decode_cancel_state_error(original_state);
- // If error is set, invoke the cancellation closure immediately.
- // Otherwise, store the new closure.
- if (original_error != GRPC_ERROR_NONE) {
- GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_REF(original_error));
- break;
- } else {
- if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state,
- (gpr_atm)closure)) {
- break;
- }
- }
- // cas failed, try again.
- }
-}
-
-void grpc_call_combiner_cancel(grpc_exec_ctx* exec_ctx,
- grpc_call_combiner* call_combiner,
- grpc_error* error) {
- while (true) {
- gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
- grpc_error* original_error = decode_cancel_state_error(original_state);
- if (original_error != GRPC_ERROR_NONE) {
- GRPC_ERROR_UNREF(error);
- break;
- }
- if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state,
- encode_cancel_state_error(error))) {
- if (original_state != 0) {
- grpc_closure* notify_on_cancel = (grpc_closure*)original_state;
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_DEBUG,
- "call_combiner=%p: scheduling notify_on_cancel callback=%p",
- call_combiner, notify_on_cancel);
- }
- GRPC_CLOSURE_SCHED(exec_ctx, notify_on_cancel, GRPC_ERROR_REF(error));
- }
- break;
- }
- // cas failed, try again.
- }
-}
diff --git a/src/core/lib/iomgr/call_combiner.h b/src/core/lib/iomgr/call_combiner.h
deleted file mode 100644
index 621e2c3669..0000000000
--- a/src/core/lib/iomgr/call_combiner.h
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- *
- * Copyright 2017 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#ifndef GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H
-#define GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H
-
-#include <stddef.h>
-
-#include <grpc/support/atm.h>
-
-#include "src/core/lib/iomgr/closure.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
-#include "src/core/lib/support/mpscq.h"
-
-// A simple, lock-free mechanism for serializing activity related to a
-// single call. This is similar to a combiner but is more lightweight.
-//
-// It requires the callback (or, in the common case where the callback
-// actually kicks off a chain of callbacks, the last callback in that
-// chain) to explicitly indicate (by calling GRPC_CALL_COMBINER_STOP())
-// when it is done with the action that was kicked off by the original
-// callback.
-
-extern grpc_tracer_flag grpc_call_combiner_trace;
-
-typedef struct {
- gpr_atm size; // size_t, num closures in queue or currently executing
- gpr_mpscq queue;
- // Either 0 (if not cancelled and no cancellation closure set),
- // a grpc_closure* (if the lowest bit is 0),
- // or a grpc_error* (if the lowest bit is 1).
- gpr_atm cancel_state;
-} grpc_call_combiner;
-
-// Assumes memory was initialized to zero.
-void grpc_call_combiner_init(grpc_call_combiner* call_combiner);
-
-void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner);
-
-#ifndef NDEBUG
-#define GRPC_CALL_COMBINER_START(exec_ctx, call_combiner, closure, error, \
- reason) \
- grpc_call_combiner_start((exec_ctx), (call_combiner), (closure), (error), \
- __FILE__, __LINE__, (reason))
-#define GRPC_CALL_COMBINER_STOP(exec_ctx, call_combiner, reason) \
- grpc_call_combiner_stop((exec_ctx), (call_combiner), __FILE__, __LINE__, \
- (reason))
-/// Starts processing \a closure on \a call_combiner.
-void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx,
- grpc_call_combiner* call_combiner,
- grpc_closure* closure, grpc_error* error,
- const char* file, int line, const char* reason);
-/// Yields the call combiner to the next closure in the queue, if any.
-void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx,
- grpc_call_combiner* call_combiner,
- const char* file, int line, const char* reason);
-#else
-#define GRPC_CALL_COMBINER_START(exec_ctx, call_combiner, closure, error, \
- reason) \
- grpc_call_combiner_start((exec_ctx), (call_combiner), (closure), (error), \
- (reason))
-#define GRPC_CALL_COMBINER_STOP(exec_ctx, call_combiner, reason) \
- grpc_call_combiner_stop((exec_ctx), (call_combiner), (reason))
-/// Starts processing \a closure on \a call_combiner.
-void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx,
- grpc_call_combiner* call_combiner,
- grpc_closure* closure, grpc_error* error,
- const char* reason);
-/// Yields the call combiner to the next closure in the queue, if any.
-void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx,
- grpc_call_combiner* call_combiner,
- const char* reason);
-#endif
-
-/// Tells \a call_combiner to invoke \a closure when
-/// grpc_call_combiner_cancel() is called. If grpc_call_combiner_cancel()
-/// was previously called, \a closure will be invoked immediately.
-/// If \a closure is NULL, then no closure will be invoked on
-/// cancellation; this effectively unregisters the previously set closure.
-void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx,
- grpc_call_combiner* call_combiner,
- grpc_closure* closure);
-
-/// Indicates that the call has been cancelled.
-void grpc_call_combiner_cancel(grpc_exec_ctx* exec_ctx,
- grpc_call_combiner* call_combiner,
- grpc_error* error);
-
-#endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */