aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-10-12 16:43:02 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-10-12 16:43:02 -0700
commitf03a8f8e40587cff4af3d23be6b87e366fbc3659 (patch)
treee74ba393adeb45c85bae404fa82f7227b37649d0 /src/core
parent1c3dd002b9c1af714dc45050f49e9fd59e384abc (diff)
parentfba6475805eb85e781da0b1428869c72fe751b4e (diff)
Merge branch 'chttp2_timer' into timer
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc55
-rw-r--r--src/core/ext/transport/inproc/inproc_transport.cc669
-rw-r--r--src/core/lib/debug/stats_data.cc18
-rw-r--r--src/core/lib/debug/stats_data.h27
-rw-r--r--src/core/lib/debug/stats_data.yaml21
-rw-r--r--src/core/lib/debug/stats_data_bq_schema.sql9
-rw-r--r--src/core/lib/iomgr/call_combiner.cc5
-rw-r--r--src/core/lib/iomgr/combiner.cc1
-rw-r--r--src/core/lib/iomgr/ev_posix.cc31
-rw-r--r--src/core/lib/iomgr/port.h10
-rw-r--r--src/core/lib/profiling/basic_timers.cc8
-rw-r--r--src/core/lib/support/time_posix.cc2
-rw-r--r--src/core/lib/surface/completion_queue.cc15
13 files changed, 445 insertions, 426 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 53fa0fff04..ffd58129c6 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -345,9 +345,6 @@ typedef struct glb_lb_policy {
/** are we currently updating lb_call? */
bool updating_lb_call;
- /** are we currently updating lb_channel? */
- bool updating_lb_channel;
-
/** are we already watching the LB channel's connectivity? */
bool watching_lb_channel;
@@ -360,9 +357,6 @@ typedef struct glb_lb_policy {
/** called upon changes to the LB channel's connectivity. */
grpc_closure lb_channel_on_connectivity_changed;
- /** args from the latest update received while already updating, or NULL */
- grpc_lb_policy_args *pending_update_args;
-
/************************************************************/
/* client data associated with the LB server communication */
/************************************************************/
@@ -982,10 +976,6 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
grpc_fake_resolver_response_generator_unref(glb_policy->response_generator);
grpc_subchannel_index_unref();
- if (glb_policy->pending_update_args != NULL) {
- grpc_channel_args_destroy(exec_ctx, glb_policy->pending_update_args->args);
- gpr_free(glb_policy->pending_update_args);
- }
gpr_free(glb_policy);
}
@@ -1752,45 +1742,22 @@ static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
}
const grpc_lb_addresses *addresses =
(const grpc_lb_addresses *)arg->value.pointer.p;
-
+ // If a non-empty serverlist hasn't been received from the balancer,
+ // propagate the update to fallback_backend_addresses.
if (glb_policy->serverlist == NULL) {
- // If a non-empty serverlist hasn't been received from the balancer,
- // propagate the update to fallback_backend_addresses.
fallback_update_locked(exec_ctx, glb_policy, addresses);
- } else if (glb_policy->updating_lb_channel) {
- // If we have recieved serverlist from the balancer, we need to defer update
- // when there is an in-progress one.
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_INFO,
- "Update already in progress for grpclb %p. Deferring update.",
- (void *)glb_policy);
- }
- if (glb_policy->pending_update_args != NULL) {
- grpc_channel_args_destroy(exec_ctx,
- glb_policy->pending_update_args->args);
- gpr_free(glb_policy->pending_update_args);
- }
- glb_policy->pending_update_args = (grpc_lb_policy_args *)gpr_zalloc(
- sizeof(*glb_policy->pending_update_args));
- glb_policy->pending_update_args->client_channel_factory =
- args->client_channel_factory;
- glb_policy->pending_update_args->args = grpc_channel_args_copy(args->args);
- glb_policy->pending_update_args->combiner = args->combiner;
- return;
}
-
- glb_policy->updating_lb_channel = true;
GPR_ASSERT(glb_policy->lb_channel != NULL);
+ // Propagate updates to the LB channel (pick_first) through the fake
+ // resolver.
grpc_channel_args *lb_channel_args = build_lb_channel_args(
exec_ctx, addresses, glb_policy->response_generator, args->args);
- /* Propagate updates to the LB channel (pick first) through the fake resolver
- */
grpc_fake_resolver_response_generator_set_response(
exec_ctx, glb_policy->response_generator, lb_channel_args);
grpc_channel_args_destroy(exec_ctx, lb_channel_args);
-
+ // Start watching the LB channel connectivity for connection, if not
+ // already doing so.
if (!glb_policy->watching_lb_channel) {
- // Watch the LB channel connectivity for connection.
glb_policy->lb_channel_connectivity = grpc_channel_check_connectivity_state(
glb_policy->lb_channel, true /* try to connect */);
grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
@@ -1842,18 +1809,10 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx,
/* fallthrough */
case GRPC_CHANNEL_READY:
if (glb_policy->lb_call != NULL) {
- glb_policy->updating_lb_channel = false;
glb_policy->updating_lb_call = true;
grpc_call_cancel(glb_policy->lb_call, NULL);
- // lb_on_server_status_received will pick up the cancel and reinit
+ // lb_on_server_status_received() will pick up the cancel and reinit
// lb_call.
- if (glb_policy->pending_update_args != NULL) {
- grpc_lb_policy_args *args = glb_policy->pending_update_args;
- glb_policy->pending_update_args = NULL;
- glb_update_locked(exec_ctx, &glb_policy->base, args);
- grpc_channel_args_destroy(exec_ctx, args->args);
- gpr_free(args);
- }
} else if (glb_policy->started_picking && !glb_policy->shutting_down) {
if (glb_policy->retry_timer_active) {
grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);
diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc
index 1001d74c22..67a8358927 100644
--- a/src/core/ext/transport/inproc/inproc_transport.cc
+++ b/src/core/ext/transport/inproc/inproc_transport.cc
@@ -62,96 +62,22 @@ typedef struct inproc_transport {
struct inproc_stream *stream_list;
} inproc_transport;
-typedef struct sb_list_entry {
- grpc_slice_buffer sb;
- struct sb_list_entry *next;
-} sb_list_entry;
-
-// Specialize grpc_byte_stream for our use case
-typedef struct {
- grpc_byte_stream base;
- sb_list_entry *le;
- grpc_error *shutdown_error;
-} inproc_slice_byte_stream;
-
-typedef struct {
- // TODO (vjpai): Add some inlined elements to avoid alloc in simple cases
- sb_list_entry *head;
- sb_list_entry *tail;
-} slice_buffer_list;
-
-static void slice_buffer_list_init(slice_buffer_list *l) {
- l->head = NULL;
- l->tail = NULL;
-}
-
-static void sb_list_entry_destroy(grpc_exec_ctx *exec_ctx, sb_list_entry *le) {
- grpc_slice_buffer_destroy_internal(exec_ctx, &le->sb);
- gpr_free(le);
-}
-
-static void slice_buffer_list_destroy(grpc_exec_ctx *exec_ctx,
- slice_buffer_list *l) {
- sb_list_entry *curr = l->head;
- while (curr != NULL) {
- sb_list_entry *le = curr;
- curr = curr->next;
- sb_list_entry_destroy(exec_ctx, le);
- }
- l->head = NULL;
- l->tail = NULL;
-}
-
-static bool slice_buffer_list_empty(slice_buffer_list *l) {
- return l->head == NULL;
-}
-
-static void slice_buffer_list_append_entry(slice_buffer_list *l,
- sb_list_entry *next) {
- next->next = NULL;
- if (l->tail) {
- l->tail->next = next;
- l->tail = next;
- } else {
- l->head = next;
- l->tail = next;
- }
-}
-
-static grpc_slice_buffer *slice_buffer_list_append(slice_buffer_list *l) {
- sb_list_entry *next = (sb_list_entry *)gpr_malloc(sizeof(*next));
- grpc_slice_buffer_init(&next->sb);
- slice_buffer_list_append_entry(l, next);
- return &next->sb;
-}
-
-static sb_list_entry *slice_buffer_list_pophead(slice_buffer_list *l) {
- sb_list_entry *ret = l->head;
- l->head = l->head->next;
- if (l->head == NULL) {
- l->tail = NULL;
- }
- return ret;
-}
-
typedef struct inproc_stream {
inproc_transport *t;
grpc_metadata_batch to_read_initial_md;
uint32_t to_read_initial_md_flags;
bool to_read_initial_md_filled;
- slice_buffer_list to_read_message;
grpc_metadata_batch to_read_trailing_md;
bool to_read_trailing_md_filled;
- bool reads_needed;
- bool read_closure_scheduled;
- grpc_closure read_closure;
+ bool ops_needed;
+ bool op_closure_scheduled;
+ grpc_closure op_closure;
// Write buffer used only during gap at init time when client-side
// stream is set up but server side stream is not yet set up
grpc_metadata_batch write_buffer_initial_md;
bool write_buffer_initial_md_filled;
uint32_t write_buffer_initial_md_flags;
grpc_millis write_buffer_deadline;
- slice_buffer_list write_buffer_message;
grpc_metadata_batch write_buffer_trailing_md;
bool write_buffer_trailing_md_filled;
grpc_error *write_buffer_cancel_error;
@@ -164,11 +90,15 @@ typedef struct inproc_stream {
gpr_arena *arena;
+ grpc_transport_stream_op_batch *send_message_op;
+ grpc_transport_stream_op_batch *send_trailing_md_op;
grpc_transport_stream_op_batch *recv_initial_md_op;
grpc_transport_stream_op_batch *recv_message_op;
grpc_transport_stream_op_batch *recv_trailing_md_op;
- inproc_slice_byte_stream recv_message_stream;
+ grpc_slice_buffer recv_message;
+ grpc_slice_buffer_stream recv_stream;
+ bool recv_inited;
bool initial_md_sent;
bool trailing_md_sent;
@@ -187,54 +117,11 @@ typedef struct inproc_stream {
struct inproc_stream *stream_list_next;
} inproc_stream;
-static bool inproc_slice_byte_stream_next(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *bs, size_t max,
- grpc_closure *on_complete) {
- // Because inproc transport always provides the entire message atomically,
- // the byte stream always has data available when this function is called.
- // Thus, this function always returns true (unlike other transports) and
- // there is never any need to schedule a closure
- return true;
-}
-
-static grpc_error *inproc_slice_byte_stream_pull(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *bs,
- grpc_slice *slice) {
- inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs;
- if (stream->shutdown_error != GRPC_ERROR_NONE) {
- return GRPC_ERROR_REF(stream->shutdown_error);
- }
- *slice = grpc_slice_buffer_take_first(&stream->le->sb);
- return GRPC_ERROR_NONE;
-}
-
-static void inproc_slice_byte_stream_shutdown(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *bs,
- grpc_error *error) {
- inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs;
- GRPC_ERROR_UNREF(stream->shutdown_error);
- stream->shutdown_error = error;
-}
-
-static void inproc_slice_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *bs) {
- inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs;
- sb_list_entry_destroy(exec_ctx, stream->le);
- GRPC_ERROR_UNREF(stream->shutdown_error);
-}
-
-static const grpc_byte_stream_vtable inproc_slice_byte_stream_vtable = {
- inproc_slice_byte_stream_next, inproc_slice_byte_stream_pull,
- inproc_slice_byte_stream_shutdown, inproc_slice_byte_stream_destroy};
-
-void inproc_slice_byte_stream_init(inproc_slice_byte_stream *s,
- sb_list_entry *le) {
- s->base.length = (uint32_t)le->sb.length;
- s->base.flags = 0;
- s->base.vtable = &inproc_slice_byte_stream_vtable;
- s->le = le;
- s->shutdown_error = GRPC_ERROR_NONE;
-}
+static grpc_closure do_nothing_closure;
+static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
+ grpc_error *error);
+static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error);
static void ref_transport(inproc_transport *t) {
INPROC_LOG(GPR_DEBUG, "ref_transport %p", t);
@@ -280,12 +167,14 @@ static void unref_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s,
static void really_destroy_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s) {
INPROC_LOG(GPR_DEBUG, "really_destroy_stream %p", s);
- slice_buffer_list_destroy(exec_ctx, &s->to_read_message);
- slice_buffer_list_destroy(exec_ctx, &s->write_buffer_message);
GRPC_ERROR_UNREF(s->write_buffer_cancel_error);
GRPC_ERROR_UNREF(s->cancel_self_error);
GRPC_ERROR_UNREF(s->cancel_other_error);
+ if (s->recv_inited) {
+ grpc_slice_buffer_destroy_internal(exec_ctx, &s->recv_message);
+ }
+
unref_transport(exec_ctx, s->t);
if (s->closure_at_destroy) {
@@ -293,9 +182,6 @@ static void really_destroy_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s) {
}
}
-static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error);
-
static void log_metadata(const grpc_metadata_batch *md_batch, bool is_client,
bool is_initial) {
for (grpc_linked_mdelem *md = md_batch->list.head; md != NULL;
@@ -359,11 +245,9 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->write_buffer_initial_md_filled = false;
grpc_metadata_batch_init(&s->write_buffer_trailing_md);
s->write_buffer_trailing_md_filled = false;
- slice_buffer_list_init(&s->to_read_message);
- slice_buffer_list_init(&s->write_buffer_message);
- s->reads_needed = false;
- s->read_closure_scheduled = false;
- GRPC_CLOSURE_INIT(&s->read_closure, read_state_machine, s,
+ s->ops_needed = false;
+ s->op_closure_scheduled = false;
+ GRPC_CLOSURE_INIT(&s->op_closure, op_state_machine, s,
grpc_schedule_on_exec_ctx);
s->t = t;
s->closure_at_destroy = NULL;
@@ -425,11 +309,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_metadata_batch_clear(exec_ctx, &cs->write_buffer_initial_md);
cs->write_buffer_initial_md_filled = false;
}
- while (!slice_buffer_list_empty(&cs->write_buffer_message)) {
- slice_buffer_list_append_entry(
- &s->to_read_message,
- slice_buffer_list_pophead(&cs->write_buffer_message));
- }
if (cs->write_buffer_trailing_md_filled) {
fill_in_metadata(exec_ctx, s, &cs->write_buffer_trailing_md, 0,
&s->to_read_trailing_md, NULL,
@@ -488,9 +367,39 @@ static void close_other_side_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
}
}
+// Call the on_complete closure associated with this stream_op_batch if
+// this stream_op_batch is only one of the pending operations for this
+// stream. This is called when one of the pending operations for the stream
+// is done and about to be NULLed out
+static void complete_if_batch_end_locked(grpc_exec_ctx *exec_ctx,
+ inproc_stream *s, grpc_error *error,
+ grpc_transport_stream_op_batch *op,
+ const char *msg) {
+ int is_sm = (int)(op == s->send_message_op);
+ int is_stm = (int)(op == s->send_trailing_md_op);
+ int is_rim = (int)(op == s->recv_initial_md_op);
+ int is_rm = (int)(op == s->recv_message_op);
+ int is_rtm = (int)(op == s->recv_trailing_md_op);
+
+ if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) {
+ INPROC_LOG(GPR_DEBUG, "%s %p %p %p", msg, s, op, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, GRPC_ERROR_REF(error));
+ }
+}
+
+static void maybe_schedule_op_closure_locked(grpc_exec_ctx *exec_ctx,
+ inproc_stream *s,
+ grpc_error *error) {
+ if (s && s->ops_needed && !s->op_closure_scheduled) {
+ GRPC_CLOSURE_SCHED(exec_ctx, &s->op_closure, GRPC_ERROR_REF(error));
+ s->op_closure_scheduled = true;
+ s->ops_needed = false;
+ }
+}
+
static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
grpc_error *error) {
- INPROC_LOG(GPR_DEBUG, "read_state_machine %p fail_helper", s);
+ INPROC_LOG(GPR_DEBUG, "op_state_machine %p fail_helper", s);
// If we're failing this side, we need to make sure that
// we also send or have already sent trailing metadata
if (!s->trailing_md_sent) {
@@ -512,14 +421,7 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
if (other->cancel_other_error == GRPC_ERROR_NONE) {
other->cancel_other_error = GRPC_ERROR_REF(error);
}
- if (other->reads_needed) {
- if (!other->read_closure_scheduled) {
- GRPC_CLOSURE_SCHED(exec_ctx, &other->read_closure,
- GRPC_ERROR_REF(error));
- other->read_closure_scheduled = true;
- }
- other->reads_needed = false;
- }
+ maybe_schedule_op_closure_locked(exec_ctx, other, error);
} else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) {
s->write_buffer_cancel_error = GRPC_ERROR_REF(error);
}
@@ -564,14 +466,9 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
err);
// Last use of err so no need to REF and then UNREF it
- if ((s->recv_initial_md_op != s->recv_message_op) &&
- (s->recv_initial_md_op != s->recv_trailing_md_op)) {
- INPROC_LOG(GPR_DEBUG,
- "fail_helper %p scheduling initial-metadata-on-complete %p",
- error, s);
- GRPC_CLOSURE_SCHED(exec_ctx, s->recv_initial_md_op->on_complete,
- GRPC_ERROR_REF(error));
- }
+ complete_if_batch_end_locked(
+ exec_ctx, s, error, s->recv_initial_md_op,
+ "fail_helper scheduling recv-initial-metadata-on-complete");
s->recv_initial_md_op = NULL;
}
if (s->recv_message_op) {
@@ -580,20 +477,30 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
GRPC_CLOSURE_SCHED(
exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_REF(error));
- if (s->recv_message_op != s->recv_trailing_md_op) {
- INPROC_LOG(GPR_DEBUG, "fail_helper %p scheduling message-on-complete %p",
- s, error);
- GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete,
- GRPC_ERROR_REF(error));
- }
+ complete_if_batch_end_locked(
+ exec_ctx, s, error, s->recv_message_op,
+ "fail_helper scheduling recv-message-on-complete");
s->recv_message_op = NULL;
}
+ if (s->send_message_op) {
+ complete_if_batch_end_locked(
+ exec_ctx, s, error, s->send_message_op,
+ "fail_helper scheduling send-message-on-complete");
+ s->send_message_op = NULL;
+ }
+ if (s->send_trailing_md_op) {
+ complete_if_batch_end_locked(
+ exec_ctx, s, error, s->send_trailing_md_op,
+ "fail_helper scheduling send-trailng-md-on-complete");
+ s->send_trailing_md_op = NULL;
+ }
if (s->recv_trailing_md_op) {
INPROC_LOG(GPR_DEBUG,
"fail_helper %p scheduling trailing-md-on-complete %p", s,
error);
- GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete,
- GRPC_ERROR_REF(error));
+ complete_if_batch_end_locked(
+ exec_ctx, s, error, s->recv_trailing_md_op,
+ "fail_helper scheduling recv-trailing-metadata-on-complete");
s->recv_trailing_md_op = NULL;
}
close_other_side_locked(exec_ctx, s, "fail_helper:other_side");
@@ -602,12 +509,61 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
GRPC_ERROR_UNREF(error);
}
-static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
+static void message_transfer_locked(grpc_exec_ctx *exec_ctx,
+ inproc_stream *sender,
+ inproc_stream *receiver) {
+ size_t remaining =
+ sender->send_message_op->payload->send_message.send_message->length;
+ if (receiver->recv_inited) {
+ grpc_slice_buffer_destroy_internal(exec_ctx, &receiver->recv_message);
+ }
+ grpc_slice_buffer_init(&receiver->recv_message);
+ receiver->recv_inited = true;
+ do {
+ grpc_slice message_slice;
+ grpc_closure unused;
+ GPR_ASSERT(grpc_byte_stream_next(
+ exec_ctx, sender->send_message_op->payload->send_message.send_message,
+ SIZE_MAX, &unused));
+ grpc_error *error = grpc_byte_stream_pull(
+ exec_ctx, sender->send_message_op->payload->send_message.send_message,
+ &message_slice);
+ if (error != GRPC_ERROR_NONE) {
+ cancel_stream_locked(exec_ctx, sender, GRPC_ERROR_REF(error));
+ break;
+ }
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ remaining -= GRPC_SLICE_LENGTH(message_slice);
+ grpc_slice_buffer_add(&receiver->recv_message, message_slice);
+ } while (remaining > 0);
+
+ grpc_slice_buffer_stream_init(&receiver->recv_stream, &receiver->recv_message,
+ 0);
+ *receiver->recv_message_op->payload->recv_message.recv_message =
+ &receiver->recv_stream.base;
+ INPROC_LOG(GPR_DEBUG, "message_transfer_locked %p scheduling message-ready",
+ receiver);
+ GRPC_CLOSURE_SCHED(
+ exec_ctx,
+ receiver->recv_message_op->payload->recv_message.recv_message_ready,
+ GRPC_ERROR_NONE);
+ complete_if_batch_end_locked(
+ exec_ctx, sender, GRPC_ERROR_NONE, sender->send_message_op,
+ "message_transfer scheduling sender on_complete");
+ complete_if_batch_end_locked(
+ exec_ctx, receiver, GRPC_ERROR_NONE, receiver->recv_message_op,
+ "message_transfer scheduling receiver on_complete");
+
+ receiver->recv_message_op = NULL;
+ sender->send_message_op = NULL;
+}
+
+static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
// This function gets called when we have contents in the unprocessed reads
// Get what we want based on our ops wanted
// Schedule our appropriate closures
- // and then return to reads_needed state if still needed
+ // and then return to ops_needed state if still needed
// Since this is a closure directly invoked by the combiner, it should not
// unref the error parameter explicitly; the combiner will do that implicitly
@@ -615,12 +571,14 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
bool needs_close = false;
- INPROC_LOG(GPR_DEBUG, "read_state_machine %p", arg);
+ INPROC_LOG(GPR_DEBUG, "op_state_machine %p", arg);
inproc_stream *s = (inproc_stream *)arg;
gpr_mu *mu = &s->t->mu->mu; // keep aside in case s gets closed
gpr_mu_lock(mu);
- s->read_closure_scheduled = false;
+ s->op_closure_scheduled = false;
// cancellation takes precedence
+ inproc_stream *other = s->other_side;
+
if (s->cancel_self_error != GRPC_ERROR_NONE) {
fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(s->cancel_self_error));
goto done;
@@ -632,89 +590,116 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
goto done;
}
- if (s->recv_initial_md_op) {
- if (!s->to_read_initial_md_filled) {
- // We entered the state machine on some other kind of read even though
- // we still haven't satisfied initial md . That's an error.
- new_err =
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unexpected frame sequencing");
- INPROC_LOG(GPR_DEBUG,
- "read_state_machine %p scheduling on_complete errors for no "
- "initial md %p",
- s, new_err);
+ if (s->send_message_op && other) {
+ if (other->recv_message_op) {
+ message_transfer_locked(exec_ctx, s, other);
+ maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE);
+ } else if (!s->t->is_client &&
+ (s->trailing_md_sent || other->recv_trailing_md_op)) {
+ // A server send will never be matched if the client is waiting
+ // for trailing metadata already
+ complete_if_batch_end_locked(
+ exec_ctx, s, GRPC_ERROR_NONE, s->send_message_op,
+ "op_state_machine scheduling send-message-on-complete");
+ s->send_message_op = NULL;
+ }
+ }
+ // Pause a send trailing metadata if there is still an outstanding
+ // send message unless we know that the send message will never get
+ // matched to a receive. This happens on the client if the server has
+ // already sent status.
+ if (s->send_trailing_md_op &&
+ (!s->send_message_op ||
+ (s->t->is_client &&
+ (s->trailing_md_recvd || s->to_read_trailing_md_filled)))) {
+ grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md
+ : &other->to_read_trailing_md;
+ bool *destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled
+ : &other->to_read_trailing_md_filled;
+ if (*destfilled || s->trailing_md_sent) {
+ // The buffer is already in use; that's an error!
+ INPROC_LOG(GPR_DEBUG, "Extra trailing metadata %p", s);
+ new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra trailing metadata");
fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
goto done;
- } else if (s->initial_md_recvd) {
+ } else {
+ if (other && !other->closed) {
+ fill_in_metadata(exec_ctx, s,
+ s->send_trailing_md_op->payload->send_trailing_metadata
+ .send_trailing_metadata,
+ 0, dest, NULL, destfilled);
+ }
+ s->trailing_md_sent = true;
+ if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
+ INPROC_LOG(GPR_DEBUG,
+ "op_state_machine %p scheduling trailing-md-on-complete", s);
+ GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete,
+ GRPC_ERROR_NONE);
+ s->recv_trailing_md_op = NULL;
+ needs_close = true;
+ }
+ }
+ maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE);
+ complete_if_batch_end_locked(
+ exec_ctx, s, GRPC_ERROR_NONE, s->send_trailing_md_op,
+ "op_state_machine scheduling send-trailing-metadata-on-complete");
+ s->send_trailing_md_op = NULL;
+ }
+ if (s->recv_initial_md_op) {
+ if (s->initial_md_recvd) {
new_err =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd initial md");
INPROC_LOG(
GPR_DEBUG,
- "read_state_machine %p scheduling on_complete errors for already "
+ "op_state_machine %p scheduling on_complete errors for already "
"recvd initial md %p",
s, new_err);
fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
goto done;
}
- s->initial_md_recvd = true;
- new_err = fill_in_metadata(
- exec_ctx, s, &s->to_read_initial_md, s->to_read_initial_md_flags,
- s->recv_initial_md_op->payload->recv_initial_metadata
- .recv_initial_metadata,
- s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags, NULL);
- s->recv_initial_md_op->payload->recv_initial_metadata.recv_initial_metadata
- ->deadline = s->deadline;
- grpc_metadata_batch_clear(exec_ctx, &s->to_read_initial_md);
- s->to_read_initial_md_filled = false;
- INPROC_LOG(GPR_DEBUG,
- "read_state_machine %p scheduling initial-metadata-ready %p", s,
- new_err);
- GRPC_CLOSURE_SCHED(exec_ctx,
- s->recv_initial_md_op->payload->recv_initial_metadata
- .recv_initial_metadata_ready,
- GRPC_ERROR_REF(new_err));
- if ((s->recv_initial_md_op != s->recv_message_op) &&
- (s->recv_initial_md_op != s->recv_trailing_md_op)) {
- INPROC_LOG(
- GPR_DEBUG,
- "read_state_machine %p scheduling initial-metadata-on-complete %p", s,
- new_err);
- GRPC_CLOSURE_SCHED(exec_ctx, s->recv_initial_md_op->on_complete,
- GRPC_ERROR_REF(new_err));
- }
- s->recv_initial_md_op = NULL;
-
- if (new_err != GRPC_ERROR_NONE) {
+ if (s->to_read_initial_md_filled) {
+ s->initial_md_recvd = true;
+ new_err = fill_in_metadata(
+ exec_ctx, s, &s->to_read_initial_md, s->to_read_initial_md_flags,
+ s->recv_initial_md_op->payload->recv_initial_metadata
+ .recv_initial_metadata,
+ s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags,
+ NULL);
+ s->recv_initial_md_op->payload->recv_initial_metadata
+ .recv_initial_metadata->deadline = s->deadline;
+ grpc_metadata_batch_clear(exec_ctx, &s->to_read_initial_md);
+ s->to_read_initial_md_filled = false;
INPROC_LOG(GPR_DEBUG,
- "read_state_machine %p scheduling on_complete errors2 %p", s,
+ "op_state_machine %p scheduling initial-metadata-ready %p", s,
new_err);
- fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
- goto done;
+ GRPC_CLOSURE_SCHED(exec_ctx,
+ s->recv_initial_md_op->payload->recv_initial_metadata
+ .recv_initial_metadata_ready,
+ GRPC_ERROR_REF(new_err));
+ complete_if_batch_end_locked(
+ exec_ctx, s, new_err, s->recv_initial_md_op,
+ "op_state_machine scheduling recv-initial-metadata-on-complete");
+ s->recv_initial_md_op = NULL;
+
+ if (new_err != GRPC_ERROR_NONE) {
+ INPROC_LOG(GPR_DEBUG,
+ "op_state_machine %p scheduling on_complete errors2 %p", s,
+ new_err);
+ fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
+ goto done;
+ }
}
}
- if (s->to_read_initial_md_filled) {
- new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unexpected recv frame");
- fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
- goto done;
- }
- if (!slice_buffer_list_empty(&s->to_read_message) && s->recv_message_op) {
- inproc_slice_byte_stream_init(
- &s->recv_message_stream,
- slice_buffer_list_pophead(&s->to_read_message));
- *s->recv_message_op->payload->recv_message.recv_message =
- &s->recv_message_stream.base;
- INPROC_LOG(GPR_DEBUG, "read_state_machine %p scheduling message-ready", s);
- GRPC_CLOSURE_SCHED(
- exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready,
- GRPC_ERROR_NONE);
- if (s->recv_message_op != s->recv_trailing_md_op) {
- INPROC_LOG(GPR_DEBUG,
- "read_state_machine %p scheduling message-on-complete %p", s,
- new_err);
- GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete,
- GRPC_ERROR_REF(new_err));
+ if (s->recv_message_op) {
+ if (other && other->send_message_op) {
+ message_transfer_locked(exec_ctx, other, s);
+ maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE);
}
- s->recv_message_op = NULL;
+ }
+ if (s->recv_trailing_md_op && s->t->is_client && other &&
+ other->send_message_op) {
+ maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE);
}
if (s->to_read_trailing_md_filled) {
if (s->trailing_md_recvd) {
@@ -722,7 +707,7 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd trailing md");
INPROC_LOG(
GPR_DEBUG,
- "read_state_machine %p scheduling on_complete errors for already "
+ "op_state_machine %p scheduling on_complete errors for already "
"recvd trailing md %p",
s, new_err);
fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
@@ -731,21 +716,24 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
if (s->recv_message_op != NULL) {
// This message needs to be wrapped up because it will never be
// satisfied
- INPROC_LOG(GPR_DEBUG, "read_state_machine %p scheduling message-ready",
- s);
+ INPROC_LOG(GPR_DEBUG, "op_state_machine %p scheduling message-ready", s);
GRPC_CLOSURE_SCHED(
exec_ctx,
s->recv_message_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
- if (s->recv_message_op != s->recv_trailing_md_op) {
- INPROC_LOG(GPR_DEBUG,
- "read_state_machine %p scheduling message-on-complete %p", s,
- new_err);
- GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete,
- GRPC_ERROR_REF(new_err));
- }
+ complete_if_batch_end_locked(
+ exec_ctx, s, new_err, s->recv_message_op,
+ "op_state_machine scheduling recv-message-on-complete");
s->recv_message_op = NULL;
}
+ if ((s->trailing_md_sent || s->t->is_client) && s->send_message_op) {
+ // Nothing further will try to receive from this stream, so finish off
+ // any outstanding send_message op
+ complete_if_batch_end_locked(
+ exec_ctx, s, new_err, s->send_message_op,
+ "op_state_machine scheduling send-message-on-complete");
+ s->send_message_op = NULL;
+ }
if (s->recv_trailing_md_op != NULL) {
// We wanted trailing metadata and we got it
s->trailing_md_recvd = true;
@@ -763,61 +751,65 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
// (If the server hasn't already sent its trailing md, it doesn't have
// a final status, so don't mark this op complete)
if (s->t->is_client || s->trailing_md_sent) {
- INPROC_LOG(
- GPR_DEBUG,
- "read_state_machine %p scheduling trailing-md-on-complete %p", s,
- new_err);
+ INPROC_LOG(GPR_DEBUG,
+ "op_state_machine %p scheduling trailing-md-on-complete %p",
+ s, new_err);
GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete,
GRPC_ERROR_REF(new_err));
s->recv_trailing_md_op = NULL;
needs_close = true;
} else {
INPROC_LOG(GPR_DEBUG,
- "read_state_machine %p server needs to delay handling "
+ "op_state_machine %p server needs to delay handling "
"trailing-md-on-complete %p",
s, new_err);
}
} else {
INPROC_LOG(
GPR_DEBUG,
- "read_state_machine %p has trailing md but not yet waiting for it",
- s);
+ "op_state_machine %p has trailing md but not yet waiting for it", s);
}
}
if (s->trailing_md_recvd && s->recv_message_op) {
// No further message will come on this stream, so finish off the
// recv_message_op
- INPROC_LOG(GPR_DEBUG, "read_state_machine %p scheduling message-ready", s);
+ INPROC_LOG(GPR_DEBUG, "op_state_machine %p scheduling message-ready", s);
GRPC_CLOSURE_SCHED(
exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
- if (s->recv_message_op != s->recv_trailing_md_op) {
- INPROC_LOG(GPR_DEBUG,
- "read_state_machine %p scheduling message-on-complete %p", s,
- new_err);
- GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete,
- GRPC_ERROR_REF(new_err));
- }
+ complete_if_batch_end_locked(
+ exec_ctx, s, new_err, s->recv_message_op,
+ "op_state_machine scheduling recv-message-on-complete");
s->recv_message_op = NULL;
}
- if (s->recv_message_op || s->recv_trailing_md_op) {
+ if (s->trailing_md_recvd && (s->trailing_md_sent || s->t->is_client) &&
+ s->send_message_op) {
+ // Nothing further will try to receive from this stream, so finish off
+ // any outstanding send_message op
+ complete_if_batch_end_locked(
+ exec_ctx, s, new_err, s->send_message_op,
+ "op_state_machine scheduling send-message-on-complete");
+ s->send_message_op = NULL;
+ }
+ if (s->send_message_op || s->send_trailing_md_op || s->recv_initial_md_op ||
+ s->recv_message_op || s->recv_trailing_md_op) {
// Didn't get the item we wanted so we still need to get
// rescheduled
- INPROC_LOG(GPR_DEBUG, "read_state_machine %p still needs closure %p %p", s,
- s->recv_message_op, s->recv_trailing_md_op);
- s->reads_needed = true;
+ INPROC_LOG(
+ GPR_DEBUG, "op_state_machine %p still needs closure %p %p %p %p %p", s,
+ s->send_message_op, s->send_trailing_md_op, s->recv_initial_md_op,
+ s->recv_message_op, s->recv_trailing_md_op);
+ s->ops_needed = true;
}
done:
if (needs_close) {
- close_other_side_locked(exec_ctx, s, "read_state_machine");
+ close_other_side_locked(exec_ctx, s, "op_state_machine");
close_stream_locked(exec_ctx, s);
}
gpr_mu_unlock(mu);
GRPC_ERROR_UNREF(new_err);
}
-static grpc_closure do_nothing_closure;
-
static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
grpc_error *error) {
bool ret = false; // was the cancel accepted
@@ -826,14 +818,7 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
if (s->cancel_self_error == GRPC_ERROR_NONE) {
ret = true;
s->cancel_self_error = GRPC_ERROR_REF(error);
- if (s->reads_needed) {
- if (!s->read_closure_scheduled) {
- GRPC_CLOSURE_SCHED(exec_ctx, &s->read_closure,
- GRPC_ERROR_REF(s->cancel_self_error));
- s->read_closure_scheduled = true;
- }
- s->reads_needed = false;
- }
+ maybe_schedule_op_closure_locked(exec_ctx, s, s->cancel_self_error);
// Send trailing md to the other side indicating cancellation, even if we
// already have
s->trailing_md_sent = true;
@@ -853,14 +838,8 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
if (other->cancel_other_error == GRPC_ERROR_NONE) {
other->cancel_other_error = GRPC_ERROR_REF(s->cancel_self_error);
}
- if (other->reads_needed) {
- if (!other->read_closure_scheduled) {
- GRPC_CLOSURE_SCHED(exec_ctx, &other->read_closure,
- GRPC_ERROR_REF(other->cancel_other_error));
- other->read_closure_scheduled = true;
- }
- other->reads_needed = false;
- }
+ maybe_schedule_op_closure_locked(exec_ctx, other,
+ other->cancel_other_error);
} else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) {
s->write_buffer_cancel_error = GRPC_ERROR_REF(s->cancel_self_error);
}
@@ -869,11 +848,9 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
// couldn't complete that because we hadn't yet sent out trailing
// md, now's the chance
if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
- INPROC_LOG(GPR_DEBUG,
- "cancel_stream %p scheduling trailing-md-on-complete %p", s,
- s->cancel_self_error);
- GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete,
- GRPC_ERROR_REF(s->cancel_self_error));
+ complete_if_batch_end_locked(
+ exec_ctx, s, s->cancel_self_error, s->recv_trailing_md_op,
+ "cancel_stream scheduling trailing-md-on-complete");
s->recv_trailing_md_op = NULL;
}
}
@@ -918,7 +895,8 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
// already self-canceled so still give it an error
error = GRPC_ERROR_REF(s->cancel_self_error);
} else {
- INPROC_LOG(GPR_DEBUG, "perform_stream_op %p%s%s%s%s%s%s", s,
+ INPROC_LOG(GPR_DEBUG, "perform_stream_op %p %s%s%s%s%s%s%s", s,
+ s->t->is_client ? "client" : "server",
op->send_initial_metadata ? " send_initial_metadata" : "",
op->send_message ? " send_message" : "",
op->send_trailing_metadata ? " send_trailing_metadata" : "",
@@ -929,10 +907,9 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
bool needs_close = false;
+ inproc_stream *other = s->other_side;
if (error == GRPC_ERROR_NONE &&
- (op->send_initial_metadata || op->send_message ||
- op->send_trailing_metadata)) {
- inproc_stream *other = s->other_side;
+ (op->send_initial_metadata || op->send_trailing_metadata)) {
if (s->t->is_closed) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown");
}
@@ -963,72 +940,21 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->initial_md_sent = true;
}
}
- }
- if (error == GRPC_ERROR_NONE && op->send_message) {
- size_t remaining = op->payload->send_message.send_message->length;
- grpc_slice_buffer *dest = slice_buffer_list_append(
- (other == NULL) ? &s->write_buffer_message : &other->to_read_message);
- do {
- grpc_slice message_slice;
- grpc_closure unused;
- GPR_ASSERT(grpc_byte_stream_next(exec_ctx,
- op->payload->send_message.send_message,
- SIZE_MAX, &unused));
- error = grpc_byte_stream_pull(
- exec_ctx, op->payload->send_message.send_message, &message_slice);
- if (error != GRPC_ERROR_NONE) {
- cancel_stream_locked(exec_ctx, s, GRPC_ERROR_REF(error));
- break;
- }
- GPR_ASSERT(error == GRPC_ERROR_NONE);
- remaining -= GRPC_SLICE_LENGTH(message_slice);
- grpc_slice_buffer_add(dest, message_slice);
- } while (remaining != 0);
- grpc_byte_stream_destroy(exec_ctx,
- op->payload->send_message.send_message);
- }
- if (error == GRPC_ERROR_NONE && op->send_trailing_metadata) {
- grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md
- : &other->to_read_trailing_md;
- bool *destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled
- : &other->to_read_trailing_md_filled;
- if (*destfilled || s->trailing_md_sent) {
- // The buffer is already in use; that's an error!
- INPROC_LOG(GPR_DEBUG, "Extra trailing metadata %p", s);
- error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra trailing metadata");
- } else {
- if (!other->closed) {
- fill_in_metadata(
- exec_ctx, s,
- op->payload->send_trailing_metadata.send_trailing_metadata, 0,
- dest, NULL, destfilled);
- }
- s->trailing_md_sent = true;
- if (!s->t->is_client && s->trailing_md_recvd &&
- s->recv_trailing_md_op) {
- INPROC_LOG(GPR_DEBUG,
- "perform_stream_op %p scheduling trailing-md-on-complete",
- s);
- GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete,
- GRPC_ERROR_NONE);
- s->recv_trailing_md_op = NULL;
- needs_close = true;
- }
- }
- }
- if (other != NULL && other->reads_needed) {
- if (!other->read_closure_scheduled) {
- GRPC_CLOSURE_SCHED(exec_ctx, &other->read_closure, error);
- other->read_closure_scheduled = true;
- }
- other->reads_needed = false;
+ maybe_schedule_op_closure_locked(exec_ctx, other, error);
}
}
+
if (error == GRPC_ERROR_NONE &&
- (op->recv_initial_metadata || op->recv_message ||
+ (op->send_message || op->send_trailing_metadata ||
+ op->recv_initial_metadata || op->recv_message ||
op->recv_trailing_metadata)) {
- // If there are any reads, mark it so that the read closure will react to
- // them
+ // Mark ops that need to be processed by the closure
+ if (op->send_message) {
+ s->send_message_op = op;
+ }
+ if (op->send_trailing_metadata) {
+ s->send_trailing_md_op = op;
+ }
if (op->recv_initial_metadata) {
s->recv_initial_md_op = op;
}
@@ -1040,25 +966,28 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
}
// We want to initiate the closure if:
- // 1. There is initial metadata and something ready to take that
- // 2. There is a message and something ready to take it
- // 3. There is trailing metadata, even if nothing specifically wants
- // that because that can shut down the message as well
- if ((s->to_read_initial_md_filled && op->recv_initial_metadata) ||
- ((!slice_buffer_list_empty(&s->to_read_message) ||
- s->trailing_md_recvd) &&
- op->recv_message) ||
- (s->to_read_trailing_md_filled)) {
- if (!s->read_closure_scheduled) {
- GRPC_CLOSURE_SCHED(exec_ctx, &s->read_closure, GRPC_ERROR_NONE);
- s->read_closure_scheduled = true;
+ // 1. We want to send a message and the other side wants to receive or end
+ // 2. We want to send trailing metadata and there isn't an unmatched send
+ // 3. We want initial metadata and the other side has sent it
+ // 4. We want to receive a message and there is a message ready
+ // 5. There is trailing metadata, even if nothing specifically wants
+ // that because that can shut down the receive message as well
+ if ((op->send_message && other && ((other->recv_message_op != NULL) ||
+ (other->recv_trailing_md_op != NULL))) ||
+ (op->send_trailing_metadata && !op->send_message) ||
+ (op->recv_initial_metadata && s->to_read_initial_md_filled) ||
+ (op->recv_message && (other && other->send_message_op != NULL)) ||
+ (s->to_read_trailing_md_filled || s->trailing_md_recvd)) {
+ if (!s->op_closure_scheduled) {
+ GRPC_CLOSURE_SCHED(exec_ctx, &s->op_closure, GRPC_ERROR_NONE);
+ s->op_closure_scheduled = true;
}
} else {
- s->reads_needed = true;
+ s->ops_needed = true;
}
} else {
if (error != GRPC_ERROR_NONE) {
- // Schedule op's read closures that we didn't push to read state machine
+ // Schedule op's closures that we didn't push to op state machine
if (op->recv_initial_metadata) {
INPROC_LOG(
GPR_DEBUG,
diff --git a/src/core/lib/debug/stats_data.cc b/src/core/lib/debug/stats_data.cc
index 5bd7884e28..5d737c56cb 100644
--- a/src/core/lib/debug/stats_data.cc
+++ b/src/core/lib/debug/stats_data.cc
@@ -104,6 +104,10 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = {
"combiner_locks_scheduled_items",
"combiner_locks_scheduled_final_items",
"combiner_locks_offloaded",
+ "call_combiner_locks_initiated",
+ "call_combiner_locks_scheduled_items",
+ "call_combiner_set_notify_on_cancel",
+ "call_combiner_cancelled",
"executor_scheduled_short_items",
"executor_scheduled_long_items",
"executor_scheduled_to_self",
@@ -112,6 +116,9 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = {
"executor_push_retries",
"server_requested_calls",
"server_slowpath_requests_queued",
+ "cq_ev_queue_trylock_failures",
+ "cq_ev_queue_trylock_successes",
+ "cq_ev_queue_transient_pop_failures",
};
const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = {
"Number of client side calls created by this process",
@@ -210,6 +217,11 @@ const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = {
"Number of items scheduled against combiner locks",
"Number of final items scheduled against combiner locks",
"Number of combiner locks offloaded to different threads",
+ "Number of call combiner lock entries by process (first items queued to a "
+ "call combiner)",
+ "Number of items scheduled against call combiner locks",
+ "Number of times a cancellation callback was set on a call combiner",
+ "Number of times a call combiner was cancelled",
"Number of finite runtime closures scheduled against the executor (gRPC "
"thread pool)",
"Number of potentially infinite runtime closures scheduled against the "
@@ -222,6 +234,12 @@ const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = {
"How many calls were requested (not necessarily received) by the server",
"How many times was the server slow path taken (indicates too few "
"outstanding requests)",
+ "Number of lock (trylock) acquisition failures on completion queue event "
+ "queue. High value here indicates high contention on completion queues",
+ "Number of lock (trylock) acquisition successes on completion queue event "
+ "queue.",
+ "Number of times NULL was popped out of completion queue's event queue "
+ "even though the event queue was not empty",
};
const char *grpc_stats_histogram_name[GRPC_STATS_HISTOGRAM_COUNT] = {
"call_initial_size",
diff --git a/src/core/lib/debug/stats_data.h b/src/core/lib/debug/stats_data.h
index d8e4e7d264..031942df5c 100644
--- a/src/core/lib/debug/stats_data.h
+++ b/src/core/lib/debug/stats_data.h
@@ -110,6 +110,10 @@ typedef enum {
GRPC_STATS_COUNTER_COMBINER_LOCKS_SCHEDULED_ITEMS,
GRPC_STATS_COUNTER_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS,
GRPC_STATS_COUNTER_COMBINER_LOCKS_OFFLOADED,
+ GRPC_STATS_COUNTER_CALL_COMBINER_LOCKS_INITIATED,
+ GRPC_STATS_COUNTER_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS,
+ GRPC_STATS_COUNTER_CALL_COMBINER_SET_NOTIFY_ON_CANCEL,
+ GRPC_STATS_COUNTER_CALL_COMBINER_CANCELLED,
GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_SHORT_ITEMS,
GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_LONG_ITEMS,
GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_TO_SELF,
@@ -118,6 +122,9 @@ typedef enum {
GRPC_STATS_COUNTER_EXECUTOR_PUSH_RETRIES,
GRPC_STATS_COUNTER_SERVER_REQUESTED_CALLS,
GRPC_STATS_COUNTER_SERVER_SLOWPATH_REQUESTS_QUEUED,
+ GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRYLOCK_FAILURES,
+ GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRYLOCK_SUCCESSES,
+ GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES,
GRPC_STATS_COUNTER_COUNT
} grpc_stats_counters;
extern const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT];
@@ -404,6 +411,17 @@ typedef enum {
#define GRPC_STATS_INC_COMBINER_LOCKS_OFFLOADED(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), \
GRPC_STATS_COUNTER_COMBINER_LOCKS_OFFLOADED)
+#define GRPC_STATS_INC_CALL_COMBINER_LOCKS_INITIATED(exec_ctx) \
+ GRPC_STATS_INC_COUNTER((exec_ctx), \
+ GRPC_STATS_COUNTER_CALL_COMBINER_LOCKS_INITIATED)
+#define GRPC_STATS_INC_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS(exec_ctx) \
+ GRPC_STATS_INC_COUNTER( \
+ (exec_ctx), GRPC_STATS_COUNTER_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS)
+#define GRPC_STATS_INC_CALL_COMBINER_SET_NOTIFY_ON_CANCEL(exec_ctx) \
+ GRPC_STATS_INC_COUNTER( \
+ (exec_ctx), GRPC_STATS_COUNTER_CALL_COMBINER_SET_NOTIFY_ON_CANCEL)
+#define GRPC_STATS_INC_CALL_COMBINER_CANCELLED(exec_ctx) \
+ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_CALL_COMBINER_CANCELLED)
#define GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), \
GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_SHORT_ITEMS)
@@ -425,6 +443,15 @@ typedef enum {
#define GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), \
GRPC_STATS_COUNTER_SERVER_SLOWPATH_REQUESTS_QUEUED)
+#define GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES(exec_ctx) \
+ GRPC_STATS_INC_COUNTER((exec_ctx), \
+ GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRYLOCK_FAILURES)
+#define GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(exec_ctx) \
+ GRPC_STATS_INC_COUNTER((exec_ctx), \
+ GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRYLOCK_SUCCESSES)
+#define GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES(exec_ctx) \
+ GRPC_STATS_INC_COUNTER( \
+ (exec_ctx), GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES)
#define GRPC_STATS_INC_CALL_INITIAL_SIZE(exec_ctx, value) \
grpc_stats_inc_call_initial_size((exec_ctx), (int)(value))
void grpc_stats_inc_call_initial_size(grpc_exec_ctx *exec_ctx, int x);
diff --git a/src/core/lib/debug/stats_data.yaml b/src/core/lib/debug/stats_data.yaml
index 5c0ab2262e..af4553028e 100644
--- a/src/core/lib/debug/stats_data.yaml
+++ b/src/core/lib/debug/stats_data.yaml
@@ -245,6 +245,16 @@
doc: Number of final items scheduled against combiner locks
- counter: combiner_locks_offloaded
doc: Number of combiner locks offloaded to different threads
+# call combiner locks
+- counter: call_combiner_locks_initiated
+ doc: Number of call combiner lock entries by process
+ (first items queued to a call combiner)
+- counter: call_combiner_locks_scheduled_items
+ doc: Number of items scheduled against call combiner locks
+- counter: call_combiner_set_notify_on_cancel
+ doc: Number of times a cancellation callback was set on a call combiner
+- counter: call_combiner_cancelled
+ doc: Number of times a call combiner was cancelled
# executor
- counter: executor_scheduled_short_items
doc: Number of finite runtime closures scheduled against the executor
@@ -272,4 +282,13 @@
- counter: server_slowpath_requests_queued
doc: How many times was the server slow path taken (indicates too few
outstanding requests)
-
+# cq
+- counter: cq_ev_queue_trylock_failures
+ doc: Number of lock (trylock) acquisition failures on completion queue event
+ queue. High value here indicates high contention on completion queues
+- counter: cq_ev_queue_trylock_successes
+ doc: Number of lock (trylock) acquisition successes on completion queue event
+ queue.
+- counter: cq_ev_queue_transient_pop_failures
+ doc: Number of times NULL was popped out of completion queue's event queue
+ even though the event queue was not empty
diff --git a/src/core/lib/debug/stats_data_bq_schema.sql b/src/core/lib/debug/stats_data_bq_schema.sql
index 54869977b0..04b6d471f6 100644
--- a/src/core/lib/debug/stats_data_bq_schema.sql
+++ b/src/core/lib/debug/stats_data_bq_schema.sql
@@ -79,6 +79,10 @@ combiner_locks_initiated_per_iteration:FLOAT,
combiner_locks_scheduled_items_per_iteration:FLOAT,
combiner_locks_scheduled_final_items_per_iteration:FLOAT,
combiner_locks_offloaded_per_iteration:FLOAT,
+call_combiner_locks_initiated_per_iteration:FLOAT,
+call_combiner_locks_scheduled_items_per_iteration:FLOAT,
+call_combiner_set_notify_on_cancel_per_iteration:FLOAT,
+call_combiner_cancelled_per_iteration:FLOAT,
executor_scheduled_short_items_per_iteration:FLOAT,
executor_scheduled_long_items_per_iteration:FLOAT,
executor_scheduled_to_self_per_iteration:FLOAT,
@@ -86,4 +90,7 @@ executor_wakeup_initiated_per_iteration:FLOAT,
executor_queue_drained_per_iteration:FLOAT,
executor_push_retries_per_iteration:FLOAT,
server_requested_calls_per_iteration:FLOAT,
-server_slowpath_requests_queued_per_iteration:FLOAT
+server_slowpath_requests_queued_per_iteration:FLOAT,
+cq_ev_queue_trylock_failures_per_iteration:FLOAT,
+cq_ev_queue_trylock_successes_per_iteration:FLOAT,
+cq_ev_queue_transient_pop_failures_per_iteration:FLOAT
diff --git a/src/core/lib/iomgr/call_combiner.cc b/src/core/lib/iomgr/call_combiner.cc
index bab3df021a..0ba32a1e46 100644
--- a/src/core/lib/iomgr/call_combiner.cc
+++ b/src/core/lib/iomgr/call_combiner.cc
@@ -21,6 +21,7 @@
#include <inttypes.h>
#include <grpc/support/log.h>
+#include "src/core/lib/debug/stats.h"
grpc_tracer_flag grpc_call_combiner_trace =
GRPC_TRACER_INITIALIZER(false, "call_combiner");
@@ -73,7 +74,9 @@ void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx,
gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
prev_size + 1);
}
+ GRPC_STATS_INC_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS(exec_ctx);
if (prev_size == 0) {
+ GRPC_STATS_INC_CALL_COMBINER_LOCKS_INITIATED(exec_ctx);
if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
gpr_log(GPR_DEBUG, " EXECUTING IMMEDIATELY");
}
@@ -135,6 +138,7 @@ void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx,
void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx,
grpc_call_combiner* call_combiner,
grpc_closure* closure) {
+ GRPC_STATS_INC_CALL_COMBINER_SET_NOTIFY_ON_CANCEL(exec_ctx);
while (true) {
// Decode original state.
gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
@@ -179,6 +183,7 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx,
void grpc_call_combiner_cancel(grpc_exec_ctx* exec_ctx,
grpc_call_combiner* call_combiner,
grpc_error* error) {
+ GRPC_STATS_INC_CALL_COMBINER_CANCELLED(exec_ctx);
while (true) {
gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
grpc_error* original_error = decode_cancel_state_error(original_state);
diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc
index 0e707ef839..53f4b7eaa7 100644
--- a/src/core/lib/iomgr/combiner.cc
+++ b/src/core/lib/iomgr/combiner.cc
@@ -165,6 +165,7 @@ static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_closure *cl,
lock, cl, last));
if (last == 1) {
GRPC_STATS_INC_COMBINER_LOCKS_INITIATED(exec_ctx);
+ GPR_TIMER_MARK("combiner.initiated", 0);
gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null,
(gpr_atm)exec_ctx);
// first element on this list: add it to the list of combiner locks
diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc
index e4033fab1d..3a1dd8d30b 100644
--- a/src/core/lib/iomgr/ev_posix.cc
+++ b/src/core/lib/iomgr/ev_posix.cc
@@ -61,12 +61,43 @@ typedef struct {
event_engine_factory_fn factory;
} event_engine_factory;
+namespace {
+
+extern "C" {
+
+grpc_poll_function_type real_poll_function;
+
+int dummy_poll(struct pollfd fds[], nfds_t nfds, int timeout) {
+ if (timeout == 0) {
+ return real_poll_function(fds, nfds, 0);
+ } else {
+ gpr_log(GPR_ERROR, "Attempted a blocking poll when declared non-polling.");
+ GPR_ASSERT(false);
+ return -1;
+ }
+}
+} // extern "C"
+
+const grpc_event_engine_vtable *init_non_polling(bool explicit_request) {
+ if (!explicit_request) {
+ return nullptr;
+ }
+ // return the simplest engine as a dummy but also override the poller
+ auto ret = grpc_init_poll_posix(explicit_request);
+ real_poll_function = grpc_poll_function;
+ grpc_poll_function = dummy_poll;
+
+ return ret;
+}
+} // namespace
+
static const event_engine_factory g_factories[] = {
{"epoll1", grpc_init_epoll1_linux},
{"epollsig", grpc_init_epollsig_linux},
{"poll", grpc_init_poll_posix},
{"poll-cv", grpc_init_poll_cv_posix},
{"epollex", grpc_init_epollex_linux},
+ {"none", init_non_polling},
};
static void add(const char *beg, const char *end, char ***ss, size_t *ns) {
diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h
index 42033d0ba4..1cc6d98491 100644
--- a/src/core/lib/iomgr/port.h
+++ b/src/core/lib/iomgr/port.h
@@ -109,6 +109,16 @@
#define GRPC_POSIX_SOCKETUTILS 1
#define GRPC_POSIX_WAKEUP_FD 1
#define GRPC_TIMER_USE_GENERIC 1
+#elif defined(GPR_OPENBSD)
+#define GRPC_HAVE_IFADDRS 1
+#define GRPC_HAVE_IPV6_RECVPKTINFO 1
+#define GRPC_HAVE_UNIX_SOCKET 1
+#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
+#define GRPC_POSIX_SOCKET 1
+#define GRPC_POSIX_SOCKETADDR 1
+#define GRPC_POSIX_SOCKETUTILS 1
+#define GRPC_POSIX_WAKEUP_FD 1
+#define GRPC_TIMER_USE_GENERIC 1
#elif defined(GPR_NACL)
#define GRPC_HAVE_ARPA_NAMESER 1
#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
diff --git a/src/core/lib/profiling/basic_timers.cc b/src/core/lib/profiling/basic_timers.cc
index ab9d60481c..0ae7d7f600 100644
--- a/src/core/lib/profiling/basic_timers.cc
+++ b/src/core/lib/profiling/basic_timers.cc
@@ -209,9 +209,9 @@ static void init_output() {
static void rotate_log() {
/* Using malloc here, as this code could end up being called by gpr_malloc */
- gpr_timer_log *new = malloc(sizeof(*new));
+ gpr_timer_log *log = static_cast<gpr_timer_log *>(malloc(sizeof(*log)));
gpr_once_init(&g_once_init, init_output);
- new->num_entries = 0;
+ log->num_entries = 0;
pthread_mutex_lock(&g_mu);
if (g_thread_log != NULL) {
timer_log_remove(&g_in_progress_logs, g_thread_log);
@@ -221,9 +221,9 @@ static void rotate_log() {
} else {
g_thread_id = g_next_thread_id++;
}
- timer_log_push_back(&g_in_progress_logs, new);
+ timer_log_push_back(&g_in_progress_logs, log);
pthread_mutex_unlock(&g_mu);
- g_thread_log = new;
+ g_thread_log = log;
}
static void gpr_timers_log_add(const char *tagstr, marker_type type,
diff --git a/src/core/lib/support/time_posix.cc b/src/core/lib/support/time_posix.cc
index 3267ea6b54..3f8a9094fd 100644
--- a/src/core/lib/support/time_posix.cc
+++ b/src/core/lib/support/time_posix.cc
@@ -42,7 +42,7 @@ static struct timespec timespec_from_gpr(gpr_timespec gts) {
return rv;
}
-#if _POSIX_TIMERS > 0
+#if _POSIX_TIMERS > 0 || defined(__OpenBSD__)
static gpr_timespec gpr_from_timespec(struct timespec ts,
gpr_clock_type clock_type) {
/*
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc
index 36b4b835f8..21664f03c8 100644
--- a/src/core/lib/surface/completion_queue.cc
+++ b/src/core/lib/surface/completion_queue.cc
@@ -362,11 +362,24 @@ static bool cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) {
static grpc_cq_completion *cq_event_queue_pop(grpc_cq_event_queue *q) {
grpc_cq_completion *c = NULL;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+
if (gpr_spinlock_trylock(&q->queue_lock)) {
- c = (grpc_cq_completion *)gpr_mpscq_pop(&q->queue);
+ GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(&exec_ctx);
+
+ bool is_empty = false;
+ c = (grpc_cq_completion *)gpr_mpscq_pop_and_check_end(&q->queue, &is_empty);
gpr_spinlock_unlock(&q->queue_lock);
+
+ if (c == NULL && !is_empty) {
+ GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES(&exec_ctx);
+ }
+ } else {
+ GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES(&exec_ctx);
}
+ grpc_exec_ctx_finish(&exec_ctx);
+
if (c) {
gpr_atm_no_barrier_fetch_add(&q->num_queue_items, -1);
}