aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface/completion_queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/surface/completion_queue.c')
-rw-r--r--src/core/lib/surface/completion_queue.c122
1 files changed, 96 insertions, 26 deletions
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 1f82c3bad2..5978884db8 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -48,7 +48,8 @@
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/event_string.h"
-#include "src/core/lib/surface/surface_trace.h"
+
+int grpc_trace_operation_failures;
typedef struct {
grpc_pollset_worker **worker;
@@ -70,6 +71,8 @@ struct grpc_completion_queue {
int shutdown;
int shutdown_called;
int is_server_cq;
+ /** Can the server cq accept incoming channels */
+ int is_non_listening_server_cq;
int num_pluckers;
plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
grpc_closure pollset_shutdown_done;
@@ -84,12 +87,24 @@ struct grpc_completion_queue {
};
#define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1))
+#define CQ_FROM_POLLSET(ps) (((grpc_completion_queue *)ps) - 1)
static gpr_mu g_freelist_mu;
static grpc_completion_queue *g_freelist;
+int grpc_cq_pluck_trace;
+int grpc_cq_event_timeout_trace;
+
+#define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event) \
+ if (grpc_api_trace && \
+ (grpc_cq_pluck_trace || (event)->type != GRPC_QUEUE_TIMEOUT)) { \
+ char *_ev = grpc_event_string(event); \
+ gpr_log(GPR_INFO, "RETURN_EVENT[%p]: %s", cq, _ev); \
+ gpr_free(_ev); \
+ }
+
static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc,
- bool success);
+ grpc_error *error);
void grpc_cq_global_init(void) { gpr_mu_init(&g_freelist_mu); }
@@ -149,6 +164,7 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
cc->shutdown = 0;
cc->shutdown_called = 0;
cc->is_server_cq = 0;
+ cc->is_non_listening_server_cq = 0;
cc->num_pluckers = 0;
#ifndef NDEBUG
cc->outstanding_tag_count = 0;
@@ -172,7 +188,7 @@ void grpc_cq_internal_ref(grpc_completion_queue *cc) {
}
static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg,
- bool success) {
+ grpc_error *error) {
grpc_completion_queue *cc = arg;
GRPC_CQ_INTERNAL_UNREF(cc, "pollset_destroy");
}
@@ -215,7 +231,7 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) {
event, then enter shutdown mode */
/* Queue a GRPC_OP_COMPLETED operation */
void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
- void *tag, int success,
+ void *tag, grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage) {
@@ -227,16 +243,24 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
#endif
GPR_TIMER_BEGIN("grpc_cq_end_op", 0);
- GRPC_API_TRACE(
- "grpc_cq_end_op(exec_ctx=%p, cc=%p, tag=%p, success=%d, done=%p, "
- "done_arg=%p, storage=%p)",
- 7, (exec_ctx, cc, tag, success, done, done_arg, storage));
+ if (grpc_api_trace ||
+ (grpc_trace_operation_failures && error != GRPC_ERROR_NONE)) {
+ const char *errmsg = grpc_error_string(error);
+ GRPC_API_TRACE(
+ "grpc_cq_end_op(exec_ctx=%p, cc=%p, tag=%p, error=%s, done=%p, "
+ "done_arg=%p, storage=%p)",
+ 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage));
+ if (grpc_trace_operation_failures && error != GRPC_ERROR_NONE) {
+ gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
+ }
+ grpc_error_free_string(errmsg);
+ }
storage->tag = tag;
storage->done = done;
storage->done_arg = done_arg;
- storage->next =
- ((uintptr_t)&cc->completed_head) | ((uintptr_t)(success != 0));
+ storage->next = ((uintptr_t)&cc->completed_head) |
+ ((uintptr_t)(error == GRPC_ERROR_NONE));
gpr_mu_lock(cc->mu);
#ifndef NDEBUG
@@ -263,8 +287,15 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
break;
}
}
- grpc_pollset_kick(POLLSET_FROM_CQ(cc), pluck_worker);
+ grpc_error *kick_error =
+ grpc_pollset_kick(POLLSET_FROM_CQ(cc), pluck_worker);
gpr_mu_unlock(cc->mu);
+ if (kick_error != GRPC_ERROR_NONE) {
+ const char *msg = grpc_error_string(kick_error);
+ gpr_log(GPR_ERROR, "Kick failed: %s", msg);
+ grpc_error_free_string(msg);
+ GRPC_ERROR_UNREF(kick_error);
+ }
} else {
cc->completed_tail->next =
((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next);
@@ -278,6 +309,8 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
}
GPR_TIMER_END("grpc_cq_end_op", 0);
+
+ GRPC_ERROR_UNREF(error);
}
grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
@@ -293,10 +326,11 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
GRPC_API_TRACE(
"grpc_completion_queue_next("
"cc=%p, "
- "deadline=gpr_timespec { tv_sec: %lld, tv_nsec: %d, clock_type: %d }, "
+ "deadline=gpr_timespec { tv_sec: %" PRId64
+ ", tv_nsec: %d, clock_type: %d }, "
"reserved=%p)",
- 5, (cc, (long long)deadline.tv_sec, (int)deadline.tv_nsec,
- (int)deadline.clock_type, reserved));
+ 5, (cc, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
+ reserved));
GPR_ASSERT(!reserved);
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
@@ -343,8 +377,18 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_mu_lock(cc->mu);
continue;
} else {
- grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now,
- iteration_deadline);
+ grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc),
+ &worker, now, iteration_deadline);
+ if (err != GRPC_ERROR_NONE) {
+ gpr_mu_unlock(cc->mu);
+ const char *msg = grpc_error_string(err);
+ gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg);
+ grpc_error_free_string(msg);
+ GRPC_ERROR_UNREF(err);
+ memset(&ret, 0, sizeof(ret));
+ ret.type = GRPC_QUEUE_TIMEOUT;
+ break;
+ }
}
}
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
@@ -392,13 +436,16 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0);
- GRPC_API_TRACE(
- "grpc_completion_queue_pluck("
- "cc=%p, tag=%p, "
- "deadline=gpr_timespec { tv_sec: %lld, tv_nsec: %d, clock_type: %d }, "
- "reserved=%p)",
- 6, (cc, tag, (long long)deadline.tv_sec, (int)deadline.tv_nsec,
- (int)deadline.clock_type, reserved));
+ if (grpc_cq_pluck_trace) {
+ GRPC_API_TRACE(
+ "grpc_completion_queue_pluck("
+ "cc=%p, tag=%p, "
+ "deadline=gpr_timespec { tv_sec: %" PRId64
+ ", tv_nsec: %d, clock_type: %d }, "
+ "reserved=%p)",
+ 6, (cc, tag, deadline.tv_sec, deadline.tv_nsec,
+ (int)deadline.clock_type, reserved));
+ }
GPR_ASSERT(!reserved);
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
@@ -460,8 +507,19 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(cc->mu);
} else {
- grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now,
- iteration_deadline);
+ grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc),
+ &worker, now, iteration_deadline);
+ if (err != GRPC_ERROR_NONE) {
+ del_plucker(cc, tag, &worker);
+ gpr_mu_unlock(cc->mu);
+ const char *msg = grpc_error_string(err);
+ gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg);
+ grpc_error_free_string(msg);
+ GRPC_ERROR_UNREF(err);
+ memset(&ret, 0, sizeof(ret));
+ ret.type = GRPC_QUEUE_TIMEOUT;
+ break;
+ }
}
del_plucker(cc, tag, &worker);
}
@@ -511,6 +569,18 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
return POLLSET_FROM_CQ(cc);
}
+grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) {
+ return CQ_FROM_POLLSET(ps);
+}
+
+void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc) {
+ cc->is_non_listening_server_cq = 1;
+}
+
+bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc) {
+ return (cc->is_non_listening_server_cq == 1);
+}
+
void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; }
int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; }