path: root/src/core/lib/iomgr/call_combiner.cc
Diffstat (limited to 'src/core/lib/iomgr/call_combiner.cc')
-rw-r--r--  src/core/lib/iomgr/call_combiner.cc | 204
1 file changed, 204 insertions, 0 deletions
diff --git a/src/core/lib/iomgr/call_combiner.cc b/src/core/lib/iomgr/call_combiner.cc
new file mode 100644
index 0000000000..bab3df021a
--- /dev/null
+++ b/src/core/lib/iomgr/call_combiner.cc
@@ -0,0 +1,204 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "src/core/lib/iomgr/call_combiner.h"
+
+#include <inttypes.h>
+
+#include <grpc/support/log.h>
+
+grpc_tracer_flag grpc_call_combiner_trace =
+ GRPC_TRACER_INITIALIZER(false, "call_combiner");
+
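+// cancel_state encoding: 0 means no cancellation and no notify_on_cancel
+// closure registered; a value with the low bit clear is a grpc_closure*
+// registered via grpc_call_combiner_set_notify_on_cancel(); a value with
+// the low bit set is a grpc_error* recording a cancellation (pointer
+// alignment guarantees the low bit is otherwise unused).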
+static grpc_error* decode_cancel_state_error(gpr_atm cancel_state) {
+ if (cancel_state & 1) {
+ return (grpc_error*)(cancel_state & ~(gpr_atm)1);
+ }
+ return GRPC_ERROR_NONE;
+}
+
+static gpr_atm encode_cancel_state_error(grpc_error* error) {
+ return (gpr_atm)1 | (gpr_atm)error;
+}
+
+void grpc_call_combiner_init(grpc_call_combiner* call_combiner) {
+ gpr_mpscq_init(&call_combiner->queue);
+}
+
+void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner) {
+ gpr_mpscq_destroy(&call_combiner->queue);
+ GRPC_ERROR_UNREF(decode_cancel_state_error(call_combiner->cancel_state));
+}
+
+#ifndef NDEBUG
+#define DEBUG_ARGS , const char *file, int line
+#define DEBUG_FMT_STR "%s:%d: "
+#define DEBUG_FMT_ARGS , file, line
+#else
+#define DEBUG_ARGS
+#define DEBUG_FMT_STR
+#define DEBUG_FMT_ARGS
+#endif
+
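+// Schedules `closure` to run under the call combiner: `size` is
+// incremented, and if the combiner was idle (0 -> 1) the closure is
+// scheduled immediately; otherwise it is queued until a current holder
+// calls grpc_call_combiner_stop().  Illustrative flow from a filter
+// (`calld` is a hypothetical per-call data struct; the GRPC_CALL_COMBINER_*
+// macros are assumed to be the debug-aware wrappers in call_combiner.h):
+//   GRPC_CALL_COMBINER_START(exec_ctx, calld->call_combiner,
+//                            &calld->my_closure, GRPC_ERROR_NONE, "reason");
+//   // ... my_closure runs under the combiner, then releases it:
+//   GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner, "done");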
+void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx,
+ grpc_call_combiner* call_combiner,
+ grpc_closure* closure,
+ grpc_error* error DEBUG_ARGS,
+ const char* reason) {
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG,
+ "==> grpc_call_combiner_start() [%p] closure=%p [" DEBUG_FMT_STR
+ "%s] error=%s",
+ call_combiner, closure DEBUG_FMT_ARGS, reason,
+ grpc_error_string(error));
+ }
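+  // `size` doubles as the combiner lock: the caller that moves it from 0
+  // to 1 owns the combiner and may run its closure right away.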
+ size_t prev_size =
+ (size_t)gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)1);
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
+ prev_size + 1);
+ }
+ if (prev_size == 0) {
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG, " EXECUTING IMMEDIATELY");
+ }
+ // Queue was empty, so execute this closure immediately.
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, error);
+ } else {
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+      gpr_log(GPR_DEBUG, "  QUEUING");
+ }
+ // Queue was not empty, so add closure to queue.
+ closure->error_data.error = error;
+ gpr_mpscq_push(&call_combiner->queue, (gpr_mpscq_node*)closure);
+ }
+}
+
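+// Yields the call combiner: decrements `size`, and if other closures are
+// queued, pops the next one and schedules it.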
+void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx,
+ grpc_call_combiner* call_combiner DEBUG_ARGS,
+ const char* reason) {
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG,
+ "==> grpc_call_combiner_stop() [%p] [" DEBUG_FMT_STR "%s]",
+ call_combiner DEBUG_FMT_ARGS, reason);
+ }
+ size_t prev_size =
+ (size_t)gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)-1);
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
+ prev_size - 1);
+ }
+ GPR_ASSERT(prev_size >= 1);
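+  // If more closures are pending, hand the combiner off to the next one
+  // in the queue.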
+ if (prev_size > 1) {
+ while (true) {
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG, " checking queue");
+ }
+ bool empty;
+ grpc_closure* closure = (grpc_closure*)gpr_mpscq_pop_and_check_end(
+ &call_combiner->queue, &empty);
+ if (closure == NULL) {
+ // This can happen either due to a race condition within the mpscq
+ // code or because of a race with grpc_call_combiner_start().
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG, " queue returned no result; checking again");
+ }
+ continue;
+ }
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG, " EXECUTING FROM QUEUE: closure=%p error=%s",
+ closure, grpc_error_string(closure->error_data.error));
+ }
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, closure->error_data.error);
+ break;
+ }
+ } else if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG, " queue empty");
+ }
+}
+
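+// Registers `closure` to be invoked when the call is cancelled.  If a
+// cancellation has already been recorded, the closure is scheduled
+// immediately with a ref to that error; if it replaces a previously
+// registered closure, the old one is scheduled with GRPC_ERROR_NONE so
+// its owner can release any resources held for the callback.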
+void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx,
+ grpc_call_combiner* call_combiner,
+ grpc_closure* closure) {
+ while (true) {
+ // Decode original state.
+ gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
+ grpc_error* original_error = decode_cancel_state_error(original_state);
+ // If error is set, invoke the cancellation closure immediately.
+ // Otherwise, store the new closure.
+ if (original_error != GRPC_ERROR_NONE) {
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG,
+ "call_combiner=%p: scheduling notify_on_cancel callback=%p "
+ "for pre-existing cancellation",
+ call_combiner, closure);
+ }
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_REF(original_error));
+ break;
+ } else {
+ if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state,
+ (gpr_atm)closure)) {
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG, "call_combiner=%p: setting notify_on_cancel=%p",
+ call_combiner, closure);
+ }
+ // If we replaced an earlier closure, invoke the original
+ // closure with GRPC_ERROR_NONE. This allows callers to clean
+ // up any resources they may be holding for the callback.
+ if (original_state != 0) {
+ closure = (grpc_closure*)original_state;
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG,
+ "call_combiner=%p: scheduling old cancel callback=%p",
+ call_combiner, closure);
+ }
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE);
+ }
+ break;
+ }
+ }
+ // cas failed, try again.
+ }
+}
+
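+// Records a cancellation: installs `error` into cancel_state and, if a
+// notify_on_cancel closure was registered, schedules it with a ref to
+// `error`.  If a cancellation was already recorded, the new error is
+// simply unreffed.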
+void grpc_call_combiner_cancel(grpc_exec_ctx* exec_ctx,
+ grpc_call_combiner* call_combiner,
+ grpc_error* error) {
+ while (true) {
+ gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
+ grpc_error* original_error = decode_cancel_state_error(original_state);
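+    // If a cancellation was already recorded, drop the new error.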
+ if (original_error != GRPC_ERROR_NONE) {
+ GRPC_ERROR_UNREF(error);
+ break;
+ }
+ if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state,
+ encode_cancel_state_error(error))) {
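+      // If a notify_on_cancel closure was registered, schedule it now.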
+ if (original_state != 0) {
+ grpc_closure* notify_on_cancel = (grpc_closure*)original_state;
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG,
+ "call_combiner=%p: scheduling notify_on_cancel callback=%p",
+ call_combiner, notify_on_cancel);
+ }
+ GRPC_CLOSURE_SCHED(exec_ctx, notify_on_cancel, GRPC_ERROR_REF(error));
+ }
+ break;
+ }
+ // cas failed, try again.
+ }
+}