aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r--src/core/lib/surface/call.c123
-rw-r--r--src/core/lib/surface/call.h32
-rw-r--r--src/core/lib/surface/channel.c18
-rw-r--r--src/core/lib/surface/completion_queue.c167
-rw-r--r--src/core/lib/surface/completion_queue.h3
-rw-r--r--src/core/lib/surface/init.c3
-rw-r--r--src/core/lib/surface/server.c33
7 files changed, 283 insertions, 96 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 119f5e82ab..97bfd587d2 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -230,33 +230,33 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack,
static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_error *error);
-grpc_call *grpc_call_create(
- grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask,
- grpc_completion_queue *cq, grpc_pollset_set *pollset_set_alternative,
- const void *server_transport_data, grpc_mdelem **add_initial_metadata,
- size_t add_initial_metadata_count, gpr_timespec send_deadline) {
+grpc_error *grpc_call_create(const grpc_call_create_args *args,
+ grpc_call **out_call) {
size_t i, j;
- grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
+ grpc_channel_stack *channel_stack =
+ grpc_channel_get_channel_stack(args->channel);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_call *call;
GPR_TIMER_BEGIN("grpc_call_create", 0);
- call = gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
+ *out_call = call =
+ gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
memset(call, 0, sizeof(grpc_call));
gpr_mu_init(&call->mu);
- call->channel = channel;
- call->cq = cq;
- call->parent = parent_call;
+ call->channel = args->channel;
+ call->cq = args->cq;
+ call->parent = args->parent_call;
/* Always support no compression */
GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
- call->is_client = server_transport_data == NULL;
+ call->is_client = args->server_transport_data == NULL;
if (call->is_client) {
- GPR_ASSERT(add_initial_metadata_count < MAX_SEND_EXTRA_METADATA_COUNT);
- for (i = 0; i < add_initial_metadata_count; i++) {
- call->send_extra_metadata[i].md = add_initial_metadata[i];
+ GPR_ASSERT(args->add_initial_metadata_count <
+ MAX_SEND_EXTRA_METADATA_COUNT);
+ for (i = 0; i < args->add_initial_metadata_count; i++) {
+ call->send_extra_metadata[i].md = args->add_initial_metadata[i];
}
- call->send_extra_metadata_count = (int)add_initial_metadata_count;
+ call->send_extra_metadata_count = (int)args->add_initial_metadata_count;
} else {
- GPR_ASSERT(add_initial_metadata_count == 0);
+ GPR_ASSERT(args->add_initial_metadata_count == 0);
call->send_extra_metadata_count = 0;
}
for (i = 0; i < 2; i++) {
@@ -265,78 +265,79 @@ grpc_call *grpc_call_create(
}
}
call->send_deadline =
- gpr_convert_clock_type(send_deadline, GPR_CLOCK_MONOTONIC);
- GRPC_CHANNEL_INTERNAL_REF(channel, "call");
+ gpr_convert_clock_type(args->send_deadline, GPR_CLOCK_MONOTONIC);
+ GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");
/* initial refcount dropped by grpc_call_destroy */
grpc_error *error = grpc_call_stack_init(
&exec_ctx, channel_stack, 1, destroy_call, call, call->context,
- server_transport_data, CALL_STACK_FROM_CALL(call));
+ args->server_transport_data, CALL_STACK_FROM_CALL(call));
if (error != GRPC_ERROR_NONE) {
intptr_t status;
- if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status))
+ if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) {
status = GRPC_STATUS_UNKNOWN;
+ }
const char *error_str =
grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION);
close_with_status(&exec_ctx, call, (grpc_status_code)status,
error_str == NULL ? "unknown error" : error_str);
- GRPC_ERROR_UNREF(error);
}
- if (cq != NULL) {
+ if (args->cq != NULL) {
GPR_ASSERT(
- pollset_set_alternative == NULL &&
+ args->pollset_set_alternative == NULL &&
"Only one of 'cq' and 'pollset_set_alternative' should be non-NULL.");
- GRPC_CQ_INTERNAL_REF(cq, "bind");
+ GRPC_CQ_INTERNAL_REF(args->cq, "bind");
call->pollent =
- grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
+ grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq));
}
- if (pollset_set_alternative != NULL) {
- call->pollent =
- grpc_polling_entity_create_from_pollset_set(pollset_set_alternative);
+ if (args->pollset_set_alternative != NULL) {
+ call->pollent = grpc_polling_entity_create_from_pollset_set(
+ args->pollset_set_alternative);
}
if (!grpc_polling_entity_is_empty(&call->pollent)) {
grpc_call_stack_set_pollset_or_pollset_set(
&exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent);
}
- if (parent_call != NULL) {
- GRPC_CALL_INTERNAL_REF(parent_call, "child");
+ gpr_timespec send_deadline = args->send_deadline;
+ if (args->parent_call != NULL) {
+ GRPC_CALL_INTERNAL_REF(args->parent_call, "child");
GPR_ASSERT(call->is_client);
- GPR_ASSERT(!parent_call->is_client);
+ GPR_ASSERT(!args->parent_call->is_client);
- gpr_mu_lock(&parent_call->mu);
+ gpr_mu_lock(&args->parent_call->mu);
- if (propagation_mask & GRPC_PROPAGATE_DEADLINE) {
+ if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) {
send_deadline = gpr_time_min(
gpr_convert_clock_type(send_deadline,
- parent_call->send_deadline.clock_type),
- parent_call->send_deadline);
+ args->parent_call->send_deadline.clock_type),
+ args->parent_call->send_deadline);
}
/* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
* GRPC_PROPAGATE_STATS_CONTEXT */
/* TODO(ctiller): This should change to use the appropriate census start_op
* call. */
- if (propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
- GPR_ASSERT(propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT);
- grpc_call_context_set(call, GRPC_CONTEXT_TRACING,
- parent_call->context[GRPC_CONTEXT_TRACING].value,
- NULL);
+ if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
+ GPR_ASSERT(args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT);
+ grpc_call_context_set(
+ call, GRPC_CONTEXT_TRACING,
+ args->parent_call->context[GRPC_CONTEXT_TRACING].value, NULL);
} else {
- GPR_ASSERT(propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT);
+ GPR_ASSERT(args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT);
}
- if (propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
+ if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
call->cancellation_is_inherited = 1;
}
- if (parent_call->first_child == NULL) {
- parent_call->first_child = call;
+ if (args->parent_call->first_child == NULL) {
+ args->parent_call->first_child = call;
call->sibling_next = call->sibling_prev = call;
} else {
- call->sibling_next = parent_call->first_child;
- call->sibling_prev = parent_call->first_child->sibling_prev;
+ call->sibling_next = args->parent_call->first_child;
+ call->sibling_prev = args->parent_call->first_child->sibling_prev;
call->sibling_next->sibling_prev = call->sibling_prev->sibling_next =
call;
}
- gpr_mu_unlock(&parent_call->mu);
+ gpr_mu_unlock(&args->parent_call->mu);
}
if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) !=
0) {
@@ -344,7 +345,7 @@ grpc_call *grpc_call_create(
}
grpc_exec_ctx_finish(&exec_ctx);
GPR_TIMER_END("grpc_call_create", 0);
- return call;
+ return error;
}
void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call,
@@ -1089,9 +1090,14 @@ static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data,
static void post_batch_completion(grpc_exec_ctx *exec_ctx,
batch_control *bctl) {
grpc_call *call = bctl->call;
+ grpc_error *error = bctl->error;
+ if (bctl->recv_final_op) {
+ GRPC_ERROR_UNREF(error);
+ error = GRPC_ERROR_NONE;
+ }
if (bctl->is_notify_tag_closure) {
/* unrefs bctl->error */
- grpc_exec_ctx_sched(exec_ctx, bctl->notify_tag, bctl->error, NULL);
+ grpc_closure_run(exec_ctx, bctl->notify_tag, error);
gpr_mu_lock(&call->mu);
bctl->call->used_batches =
(uint8_t)(bctl->call->used_batches &
@@ -1100,7 +1106,7 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
} else {
/* unrefs bctl->error */
- grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->notify_tag, bctl->error,
+ grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->notify_tag, error,
finish_batch_completion, bctl, &bctl->cq_completion);
}
}
@@ -1257,6 +1263,14 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
}
}
+static void add_batch_error(batch_control *bctl, grpc_error *error) {
+ if (error == GRPC_ERROR_NONE) return;
+ if (bctl->error == GRPC_ERROR_NONE) {
+ bctl->error = GRPC_ERROR_CREATE("Call batch operation failed");
+ }
+ bctl->error = grpc_error_add_child(bctl->error, error);
+}
+
static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
void *bctlp, grpc_error *error) {
batch_control *bctl = bctlp;
@@ -1264,9 +1278,8 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&call->mu);
- if (error != GRPC_ERROR_NONE) {
- bctl->error = GRPC_ERROR_REF(error);
- } else {
+ add_batch_error(bctl, GRPC_ERROR_REF(error));
+ if (error == GRPC_ERROR_NONE) {
grpc_metadata_batch *md =
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
grpc_metadata_batch_filter(md, recv_initial_filter, call);
@@ -1359,8 +1372,7 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
GRPC_ERROR_UNREF(error);
error = GRPC_ERROR_NONE;
}
- GRPC_ERROR_UNREF(bctl->error);
- bctl->error = GRPC_ERROR_REF(error);
+ add_batch_error(bctl, GRPC_ERROR_REF(error));
gpr_mu_unlock(&call->mu);
if (gpr_unref(&bctl->steps_to_complete)) {
post_batch_completion(exec_ctx, bctl);
@@ -1396,6 +1408,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op *stream_op = &bctl->op;
memset(stream_op, 0, sizeof(*stream_op));
+ stream_op->covered_by_poller = true;
if (nops == 0) {
GRPC_CALL_INTERNAL_REF(call, "completion");
diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h
index 3a78fe3aa3..18af41b7fb 100644
--- a/src/core/lib/surface/call.h
+++ b/src/core/lib/surface/call.h
@@ -49,15 +49,29 @@ typedef void (*grpc_ioreq_completion_func)(grpc_exec_ctx *exec_ctx,
grpc_call *call, int success,
void *user_data);
-grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
- uint32_t propagation_mask,
- grpc_completion_queue *cq,
- /* if not NULL, it'll be used in lieu of \a cq */
- grpc_pollset_set *pollset_set_alternative,
- const void *server_transport_data,
- grpc_mdelem **add_initial_metadata,
- size_t add_initial_metadata_count,
- gpr_timespec send_deadline);
+typedef struct grpc_call_create_args {
+ grpc_channel *channel;
+
+ grpc_call *parent_call;
+ uint32_t propagation_mask;
+
+ grpc_completion_queue *cq;
+ /* if not NULL, it'll be used in lieu of cq */
+ grpc_pollset_set *pollset_set_alternative;
+
+ const void *server_transport_data;
+
+ grpc_mdelem **add_initial_metadata;
+ size_t add_initial_metadata_count;
+
+ gpr_timespec send_deadline;
+} grpc_call_create_args;
+
+/* Create a new call based on \a args.
+ Regardless of success or failure, always returns a valid new call into *call
+ */
+grpc_error *grpc_call_create(const grpc_call_create_args *args,
+ grpc_call **call);
void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call,
grpc_completion_queue *cq);
diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c
index 52e78567bd..aa8c052b41 100644
--- a/src/core/lib/surface/channel.c
+++ b/src/core/lib/surface/channel.c
@@ -208,9 +208,21 @@ static grpc_call *grpc_channel_create_call_internal(
send_metadata[num_metadata++] = GRPC_MDELEM_REF(channel->default_authority);
}
- return grpc_call_create(channel, parent_call, propagation_mask, cq,
- pollset_set_alternative, NULL, send_metadata,
- num_metadata, deadline);
+ grpc_call_create_args args;
+ memset(&args, 0, sizeof(args));
+ args.channel = channel;
+ args.parent_call = parent_call;
+ args.propagation_mask = propagation_mask;
+ args.cq = cq;
+ args.pollset_set_alternative = pollset_set_alternative;
+ args.server_transport_data = NULL;
+ args.add_initial_metadata = send_metadata;
+ args.add_initial_metadata_count = num_metadata;
+ args.send_deadline = deadline;
+
+ grpc_call *call;
+ GRPC_LOG_IF_ERROR("call_create", grpc_call_create(&args, &call));
+ return call;
}
grpc_call *grpc_channel_create_call(grpc_channel *channel,
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 5978884db8..4e0feb56ac 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -39,6 +39,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include "src/core/lib/iomgr/pollset.h"
@@ -50,6 +51,9 @@
#include "src/core/lib/surface/event_string.h"
int grpc_trace_operation_failures;
+#ifndef NDEBUG
+int grpc_trace_pending_tags;
+#endif
typedef struct {
grpc_pollset_worker **worker;
@@ -67,6 +71,9 @@ struct grpc_completion_queue {
gpr_refcount pending_events;
/** Once owning_refs drops to zero, we will destroy the cq */
gpr_refcount owning_refs;
+ /** counter of how many things have ever been queued on this completion queue
+ useful for avoiding locks to check the queue */
+ gpr_atm things_queued_ever;
/** 0 initially, 1 once we've begun shutting down */
int shutdown;
int shutdown_called;
@@ -121,15 +128,6 @@ void grpc_cq_global_shutdown(void) {
}
}
-struct grpc_cq_alarm {
- grpc_timer alarm;
- grpc_cq_completion completion;
- /** completion queue where events about this alarm will be posted */
- grpc_completion_queue *cq;
- /** user supplied tag */
- void *tag;
-};
-
grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
grpc_completion_queue *cc;
GPR_ASSERT(!reserved);
@@ -166,6 +164,7 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
cc->is_server_cq = 0;
cc->is_non_listening_server_cq = 0;
cc->num_pluckers = 0;
+ gpr_atm_no_barrier_store(&cc->things_queued_ever, 0);
#ifndef NDEBUG
cc->outstanding_tag_count = 0;
#endif
@@ -276,6 +275,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
GPR_ASSERT(found);
#endif
shutdown = gpr_unref(&cc->pending_events);
+ gpr_atm_no_barrier_fetch_add(&cc->things_queued_ever, 1);
if (!shutdown) {
cc->completed_tail->next =
((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next);
@@ -313,13 +313,66 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
GRPC_ERROR_UNREF(error);
}
+typedef struct {
+ gpr_atm last_seen_things_queued_ever;
+ grpc_completion_queue *cq;
+ gpr_timespec deadline;
+ grpc_cq_completion *stolen_completion;
+ void *tag; /* for pluck */
+ bool first_loop;
+} cq_is_finished_arg;
+
+static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
+ cq_is_finished_arg *a = arg;
+ grpc_completion_queue *cq = a->cq;
+ GPR_ASSERT(a->stolen_completion == NULL);
+ gpr_atm current_last_seen_things_queued_ever =
+ gpr_atm_no_barrier_load(&cq->things_queued_ever);
+ if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) {
+ gpr_mu_lock(cq->mu);
+ a->last_seen_things_queued_ever =
+ gpr_atm_no_barrier_load(&cq->things_queued_ever);
+ if (cq->completed_tail != &cq->completed_head) {
+ a->stolen_completion = (grpc_cq_completion *)cq->completed_head.next;
+ cq->completed_head.next = a->stolen_completion->next & ~(uintptr_t)1;
+ if (a->stolen_completion == cq->completed_tail) {
+ cq->completed_tail = &cq->completed_head;
+ }
+ gpr_mu_unlock(cq->mu);
+ return true;
+ }
+ gpr_mu_unlock(cq->mu);
+ }
+ return !a->first_loop &&
+ gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
+}
+
+#ifndef NDEBUG
+static void dump_pending_tags(grpc_completion_queue *cc) {
+ if (!grpc_trace_pending_tags) return;
+
+ gpr_strvec v;
+ gpr_strvec_init(&v);
+ gpr_strvec_add(&v, gpr_strdup("PENDING TAGS:"));
+ for (size_t i = 0; i < cc->outstanding_tag_count; i++) {
+ char *s;
+ gpr_asprintf(&s, " %p", cc->outstanding_tags[i]);
+ gpr_strvec_add(&v, s);
+ }
+ char *out = gpr_strvec_flatten(&v, NULL);
+ gpr_strvec_destroy(&v);
+ gpr_log(GPR_DEBUG, "%s", out);
+ gpr_free(out);
+}
+#else
+static void dump_pending_tags(grpc_completion_queue *cc) {}
+#endif
+
grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline, void *reserved) {
grpc_event ret;
grpc_pollset_worker *worker = NULL;
- int first_loop = 1;
gpr_timespec now;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GPR_TIMER_BEGIN("grpc_completion_queue_next", 0);
@@ -333,11 +386,33 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
reserved));
GPR_ASSERT(!reserved);
+ dump_pending_tags(cc);
+
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
GRPC_CQ_INTERNAL_REF(cc, "next");
gpr_mu_lock(cc->mu);
+ cq_is_finished_arg is_finished_arg = {
+ .last_seen_things_queued_ever =
+ gpr_atm_no_barrier_load(&cc->things_queued_ever),
+ .cq = cc,
+ .deadline = deadline,
+ .stolen_completion = NULL,
+ .tag = NULL,
+ .first_loop = true};
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(
+ cq_is_next_finished, &is_finished_arg);
for (;;) {
+ if (is_finished_arg.stolen_completion != NULL) {
+ gpr_mu_unlock(cc->mu);
+ grpc_cq_completion *c = is_finished_arg.stolen_completion;
+ is_finished_arg.stolen_completion = NULL;
+ ret.type = GRPC_OP_COMPLETE;
+ ret.success = c->next & 1u;
+ ret.tag = c->tag;
+ c->done(&exec_ctx, c->done_arg, c);
+ break;
+ }
if (cc->completed_tail != &cc->completed_head) {
grpc_cq_completion *c = (grpc_cq_completion *)cc->completed_head.next;
cc->completed_head.next = c->next & ~(uintptr_t)1;
@@ -358,13 +433,13 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
break;
}
now = gpr_now(GPR_CLOCK_MONOTONIC);
- if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
+ if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) {
gpr_mu_unlock(cc->mu);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
+ dump_pending_tags(cc);
break;
}
- first_loop = 0;
/* Check alarms - these are a global resource so we just ping
each time through on every pollset.
May update deadline to ensure timely wakeups.
@@ -387,13 +462,16 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
GRPC_ERROR_UNREF(err);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
+ dump_pending_tags(cc);
break;
}
}
+ is_finished_arg.first_loop = false;
}
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
GRPC_CQ_INTERNAL_UNREF(cc, "next");
grpc_exec_ctx_finish(&exec_ctx);
+ GPR_ASSERT(is_finished_arg.stolen_completion == NULL);
GPR_TIMER_END("grpc_completion_queue_next", 0);
@@ -424,6 +502,37 @@ static void del_plucker(grpc_completion_queue *cc, void *tag,
GPR_UNREACHABLE_CODE(return );
}
+static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
+ cq_is_finished_arg *a = arg;
+ grpc_completion_queue *cq = a->cq;
+ GPR_ASSERT(a->stolen_completion == NULL);
+ gpr_atm current_last_seen_things_queued_ever =
+ gpr_atm_no_barrier_load(&cq->things_queued_ever);
+ if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) {
+ gpr_mu_lock(cq->mu);
+ a->last_seen_things_queued_ever =
+ gpr_atm_no_barrier_load(&cq->things_queued_ever);
+ grpc_cq_completion *c;
+ grpc_cq_completion *prev = &cq->completed_head;
+ while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) !=
+ &cq->completed_head) {
+ if (c->tag == a->tag) {
+ prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1);
+ if (c == cq->completed_tail) {
+ cq->completed_tail = prev;
+ }
+ gpr_mu_unlock(cq->mu);
+ a->stolen_completion = c;
+ return true;
+ }
+ prev = c;
+ }
+ gpr_mu_unlock(cq->mu);
+ }
+ return !a->first_loop &&
+ gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
+}
+
grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
gpr_timespec deadline, void *reserved) {
grpc_event ret;
@@ -431,8 +540,6 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
grpc_cq_completion *prev;
grpc_pollset_worker *worker = NULL;
gpr_timespec now;
- int first_loop = 1;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0);
@@ -448,11 +555,33 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
}
GPR_ASSERT(!reserved);
+ dump_pending_tags(cc);
+
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
GRPC_CQ_INTERNAL_REF(cc, "pluck");
gpr_mu_lock(cc->mu);
+ cq_is_finished_arg is_finished_arg = {
+ .last_seen_things_queued_ever =
+ gpr_atm_no_barrier_load(&cc->things_queued_ever),
+ .cq = cc,
+ .deadline = deadline,
+ .stolen_completion = NULL,
+ .tag = tag,
+ .first_loop = true};
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(
+ cq_is_pluck_finished, &is_finished_arg);
for (;;) {
+ if (is_finished_arg.stolen_completion != NULL) {
+ gpr_mu_unlock(cc->mu);
+ c = is_finished_arg.stolen_completion;
+ is_finished_arg.stolen_completion = NULL;
+ ret.type = GRPC_OP_COMPLETE;
+ ret.success = c->next & 1u;
+ ret.tag = c->tag;
+ c->done(&exec_ctx, c->done_arg, c);
+ break;
+ }
prev = &cc->completed_head;
while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) !=
&cc->completed_head) {
@@ -485,17 +614,18 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
memset(&ret, 0, sizeof(ret));
/* TODO(ctiller): should we use a different result here */
ret.type = GRPC_QUEUE_TIMEOUT;
+ dump_pending_tags(cc);
break;
}
now = gpr_now(GPR_CLOCK_MONOTONIC);
- if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
+ if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) {
del_plucker(cc, tag, &worker);
gpr_mu_unlock(cc->mu);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
+ dump_pending_tags(cc);
break;
}
- first_loop = 0;
/* Check alarms - these are a global resource so we just ping
each time through on every pollset.
May update deadline to ensure timely wakeups.
@@ -518,15 +648,18 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
GRPC_ERROR_UNREF(err);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
+ dump_pending_tags(cc);
break;
}
}
+ is_finished_arg.first_loop = false;
del_plucker(cc, tag, &worker);
}
done:
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
GRPC_CQ_INTERNAL_UNREF(cc, "pluck");
grpc_exec_ctx_finish(&exec_ctx);
+ GPR_ASSERT(is_finished_arg.stolen_completion == NULL);
GPR_TIMER_END("grpc_completion_queue_pluck", 0);
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index 3049284f68..e9d840df77 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -44,6 +44,9 @@
extern int grpc_cq_pluck_trace;
extern int grpc_cq_event_timeout_trace;
extern int grpc_trace_operation_failures;
+#ifndef NDEBUG
+extern int grpc_trace_pending_tags;
+#endif
typedef struct grpc_cq_completion {
/** user supplied tag */
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index edda0c85fa..ac111253ef 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -173,6 +173,9 @@ void grpc_init(void) {
// Default timeout trace to 1
grpc_cq_event_timeout_trace = 1;
grpc_register_tracer("op_failure", &grpc_trace_operation_failures);
+#ifndef NDEBUG
+ grpc_register_tracer("pending_tags", &grpc_trace_pending_tags);
+#endif
grpc_security_pre_init();
grpc_iomgr_init();
grpc_executor_init();
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 56fb80e92e..9a9fcddb6e 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -280,6 +280,7 @@ static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel,
grpc_channel_element *elem;
op->send_goaway = send_goaway;
+ op->set_accept_stream = true;
sc->slice = gpr_slice_from_copied_string("Server shutdown");
op->goaway_message = &sc->slice;
op->goaway_status = GRPC_STATUS_OK;
@@ -439,6 +440,13 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand,
chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
chand->finish_destroy_channel_closure.cb_arg = chand;
+ if (error != GRPC_ERROR_NONE) {
+ const char *msg = grpc_error_string(error);
+ gpr_log(GPR_INFO, "Disconnected client: %s", msg);
+ grpc_error_free_string(msg);
+ }
+ GRPC_ERROR_UNREF(error);
+
grpc_transport_op *op =
grpc_make_transport_op(&chand->finish_destroy_channel_closure);
op->set_accept_stream = true;
@@ -446,13 +454,6 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand,
grpc_channel_stack_element(
grpc_channel_get_channel_stack(chand->channel), 0),
op);
-
- if (error != GRPC_ERROR_NONE) {
- const char *msg = grpc_error_string(error);
- gpr_log(GPR_INFO, "Disconnected client: %s", msg);
- grpc_error_free_string(msg);
- }
- GRPC_ERROR_UNREF(error);
}
static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
@@ -773,8 +774,7 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr,
GRPC_ERROR_CREATE_REFERENCING("Missing :authority or :path", &error, 1);
}
- grpc_exec_ctx_sched(exec_ctx, calld->on_done_recv_initial_metadata, error,
- NULL);
+ grpc_closure_run(exec_ctx, calld->on_done_recv_initial_metadata, error);
}
static void server_mutate_op(grpc_call_element *elem,
@@ -829,11 +829,20 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd,
const void *transport_server_data) {
channel_data *chand = cd;
/* create a call */
- grpc_call *call = grpc_call_create(chand->channel, NULL, 0, NULL, NULL,
- transport_server_data, NULL, 0,
- gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ grpc_call_create_args args;
+ memset(&args, 0, sizeof(args));
+ args.channel = chand->channel;
+ args.server_transport_data = transport_server_data;
+ args.send_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+ grpc_call *call;
+ grpc_error *error = grpc_call_create(&args, &call);
grpc_call_element *elem =
grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
+ if (error != GRPC_ERROR_NONE) {
+ got_initial_metadata(exec_ctx, elem, error);
+ GRPC_ERROR_UNREF(error);
+ return;
+ }
call_data *calld = elem->call_data;
grpc_op op;
memset(&op, 0, sizeof(op));