aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/ext/transport/inproc/inproc_transport.cc669
-rwxr-xr-xtest/core/end2end/gen_build_yaml.py19
-rwxr-xr-xtest/core/end2end/generate_tests.bzl20
-rw-r--r--test/core/end2end/tests/streaming_error_response.c1
-rw-r--r--test/cpp/end2end/async_end2end_test.cc182
-rw-r--r--tools/run_tests/generated/tests.json92
6 files changed, 445 insertions, 538 deletions
diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc
index 1001d74c22..67a8358927 100644
--- a/src/core/ext/transport/inproc/inproc_transport.cc
+++ b/src/core/ext/transport/inproc/inproc_transport.cc
@@ -62,96 +62,22 @@ typedef struct inproc_transport {
struct inproc_stream *stream_list;
} inproc_transport;
-typedef struct sb_list_entry {
- grpc_slice_buffer sb;
- struct sb_list_entry *next;
-} sb_list_entry;
-
-// Specialize grpc_byte_stream for our use case
-typedef struct {
- grpc_byte_stream base;
- sb_list_entry *le;
- grpc_error *shutdown_error;
-} inproc_slice_byte_stream;
-
-typedef struct {
- // TODO (vjpai): Add some inlined elements to avoid alloc in simple cases
- sb_list_entry *head;
- sb_list_entry *tail;
-} slice_buffer_list;
-
-static void slice_buffer_list_init(slice_buffer_list *l) {
- l->head = NULL;
- l->tail = NULL;
-}
-
-static void sb_list_entry_destroy(grpc_exec_ctx *exec_ctx, sb_list_entry *le) {
- grpc_slice_buffer_destroy_internal(exec_ctx, &le->sb);
- gpr_free(le);
-}
-
-static void slice_buffer_list_destroy(grpc_exec_ctx *exec_ctx,
- slice_buffer_list *l) {
- sb_list_entry *curr = l->head;
- while (curr != NULL) {
- sb_list_entry *le = curr;
- curr = curr->next;
- sb_list_entry_destroy(exec_ctx, le);
- }
- l->head = NULL;
- l->tail = NULL;
-}
-
-static bool slice_buffer_list_empty(slice_buffer_list *l) {
- return l->head == NULL;
-}
-
-static void slice_buffer_list_append_entry(slice_buffer_list *l,
- sb_list_entry *next) {
- next->next = NULL;
- if (l->tail) {
- l->tail->next = next;
- l->tail = next;
- } else {
- l->head = next;
- l->tail = next;
- }
-}
-
-static grpc_slice_buffer *slice_buffer_list_append(slice_buffer_list *l) {
- sb_list_entry *next = (sb_list_entry *)gpr_malloc(sizeof(*next));
- grpc_slice_buffer_init(&next->sb);
- slice_buffer_list_append_entry(l, next);
- return &next->sb;
-}
-
-static sb_list_entry *slice_buffer_list_pophead(slice_buffer_list *l) {
- sb_list_entry *ret = l->head;
- l->head = l->head->next;
- if (l->head == NULL) {
- l->tail = NULL;
- }
- return ret;
-}
-
typedef struct inproc_stream {
inproc_transport *t;
grpc_metadata_batch to_read_initial_md;
uint32_t to_read_initial_md_flags;
bool to_read_initial_md_filled;
- slice_buffer_list to_read_message;
grpc_metadata_batch to_read_trailing_md;
bool to_read_trailing_md_filled;
- bool reads_needed;
- bool read_closure_scheduled;
- grpc_closure read_closure;
+ bool ops_needed;
+ bool op_closure_scheduled;
+ grpc_closure op_closure;
// Write buffer used only during gap at init time when client-side
// stream is set up but server side stream is not yet set up
grpc_metadata_batch write_buffer_initial_md;
bool write_buffer_initial_md_filled;
uint32_t write_buffer_initial_md_flags;
grpc_millis write_buffer_deadline;
- slice_buffer_list write_buffer_message;
grpc_metadata_batch write_buffer_trailing_md;
bool write_buffer_trailing_md_filled;
grpc_error *write_buffer_cancel_error;
@@ -164,11 +90,15 @@ typedef struct inproc_stream {
gpr_arena *arena;
+ grpc_transport_stream_op_batch *send_message_op;
+ grpc_transport_stream_op_batch *send_trailing_md_op;
grpc_transport_stream_op_batch *recv_initial_md_op;
grpc_transport_stream_op_batch *recv_message_op;
grpc_transport_stream_op_batch *recv_trailing_md_op;
- inproc_slice_byte_stream recv_message_stream;
+ grpc_slice_buffer recv_message;
+ grpc_slice_buffer_stream recv_stream;
+ bool recv_inited;
bool initial_md_sent;
bool trailing_md_sent;
@@ -187,54 +117,11 @@ typedef struct inproc_stream {
struct inproc_stream *stream_list_next;
} inproc_stream;
-static bool inproc_slice_byte_stream_next(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *bs, size_t max,
- grpc_closure *on_complete) {
- // Because inproc transport always provides the entire message atomically,
- // the byte stream always has data available when this function is called.
- // Thus, this function always returns true (unlike other transports) and
- // there is never any need to schedule a closure
- return true;
-}
-
-static grpc_error *inproc_slice_byte_stream_pull(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *bs,
- grpc_slice *slice) {
- inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs;
- if (stream->shutdown_error != GRPC_ERROR_NONE) {
- return GRPC_ERROR_REF(stream->shutdown_error);
- }
- *slice = grpc_slice_buffer_take_first(&stream->le->sb);
- return GRPC_ERROR_NONE;
-}
-
-static void inproc_slice_byte_stream_shutdown(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *bs,
- grpc_error *error) {
- inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs;
- GRPC_ERROR_UNREF(stream->shutdown_error);
- stream->shutdown_error = error;
-}
-
-static void inproc_slice_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *bs) {
- inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs;
- sb_list_entry_destroy(exec_ctx, stream->le);
- GRPC_ERROR_UNREF(stream->shutdown_error);
-}
-
-static const grpc_byte_stream_vtable inproc_slice_byte_stream_vtable = {
- inproc_slice_byte_stream_next, inproc_slice_byte_stream_pull,
- inproc_slice_byte_stream_shutdown, inproc_slice_byte_stream_destroy};
-
-void inproc_slice_byte_stream_init(inproc_slice_byte_stream *s,
- sb_list_entry *le) {
- s->base.length = (uint32_t)le->sb.length;
- s->base.flags = 0;
- s->base.vtable = &inproc_slice_byte_stream_vtable;
- s->le = le;
- s->shutdown_error = GRPC_ERROR_NONE;
-}
+static grpc_closure do_nothing_closure;
+static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
+ grpc_error *error);
+static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error);
static void ref_transport(inproc_transport *t) {
INPROC_LOG(GPR_DEBUG, "ref_transport %p", t);
@@ -280,12 +167,14 @@ static void unref_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s,
static void really_destroy_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s) {
INPROC_LOG(GPR_DEBUG, "really_destroy_stream %p", s);
- slice_buffer_list_destroy(exec_ctx, &s->to_read_message);
- slice_buffer_list_destroy(exec_ctx, &s->write_buffer_message);
GRPC_ERROR_UNREF(s->write_buffer_cancel_error);
GRPC_ERROR_UNREF(s->cancel_self_error);
GRPC_ERROR_UNREF(s->cancel_other_error);
+ if (s->recv_inited) {
+ grpc_slice_buffer_destroy_internal(exec_ctx, &s->recv_message);
+ }
+
unref_transport(exec_ctx, s->t);
if (s->closure_at_destroy) {
@@ -293,9 +182,6 @@ static void really_destroy_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s) {
}
}
-static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error);
-
static void log_metadata(const grpc_metadata_batch *md_batch, bool is_client,
bool is_initial) {
for (grpc_linked_mdelem *md = md_batch->list.head; md != NULL;
@@ -359,11 +245,9 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->write_buffer_initial_md_filled = false;
grpc_metadata_batch_init(&s->write_buffer_trailing_md);
s->write_buffer_trailing_md_filled = false;
- slice_buffer_list_init(&s->to_read_message);
- slice_buffer_list_init(&s->write_buffer_message);
- s->reads_needed = false;
- s->read_closure_scheduled = false;
- GRPC_CLOSURE_INIT(&s->read_closure, read_state_machine, s,
+ s->ops_needed = false;
+ s->op_closure_scheduled = false;
+ GRPC_CLOSURE_INIT(&s->op_closure, op_state_machine, s,
grpc_schedule_on_exec_ctx);
s->t = t;
s->closure_at_destroy = NULL;
@@ -425,11 +309,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_metadata_batch_clear(exec_ctx, &cs->write_buffer_initial_md);
cs->write_buffer_initial_md_filled = false;
}
- while (!slice_buffer_list_empty(&cs->write_buffer_message)) {
- slice_buffer_list_append_entry(
- &s->to_read_message,
- slice_buffer_list_pophead(&cs->write_buffer_message));
- }
if (cs->write_buffer_trailing_md_filled) {
fill_in_metadata(exec_ctx, s, &cs->write_buffer_trailing_md, 0,
&s->to_read_trailing_md, NULL,
@@ -488,9 +367,39 @@ static void close_other_side_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
}
}
+// Call the on_complete closure associated with this stream_op_batch if
+// this stream_op_batch is only one of the pending operations for this
+// stream. This is called when one of the pending operations for the stream
+// is done and about to be NULLed out
+static void complete_if_batch_end_locked(grpc_exec_ctx *exec_ctx,
+ inproc_stream *s, grpc_error *error,
+ grpc_transport_stream_op_batch *op,
+ const char *msg) {
+ int is_sm = (int)(op == s->send_message_op);
+ int is_stm = (int)(op == s->send_trailing_md_op);
+ int is_rim = (int)(op == s->recv_initial_md_op);
+ int is_rm = (int)(op == s->recv_message_op);
+ int is_rtm = (int)(op == s->recv_trailing_md_op);
+
+ if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) {
+ INPROC_LOG(GPR_DEBUG, "%s %p %p %p", msg, s, op, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, GRPC_ERROR_REF(error));
+ }
+}
+
+static void maybe_schedule_op_closure_locked(grpc_exec_ctx *exec_ctx,
+ inproc_stream *s,
+ grpc_error *error) {
+ if (s && s->ops_needed && !s->op_closure_scheduled) {
+ GRPC_CLOSURE_SCHED(exec_ctx, &s->op_closure, GRPC_ERROR_REF(error));
+ s->op_closure_scheduled = true;
+ s->ops_needed = false;
+ }
+}
+
static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
grpc_error *error) {
- INPROC_LOG(GPR_DEBUG, "read_state_machine %p fail_helper", s);
+ INPROC_LOG(GPR_DEBUG, "op_state_machine %p fail_helper", s);
// If we're failing this side, we need to make sure that
// we also send or have already sent trailing metadata
if (!s->trailing_md_sent) {
@@ -512,14 +421,7 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
if (other->cancel_other_error == GRPC_ERROR_NONE) {
other->cancel_other_error = GRPC_ERROR_REF(error);
}
- if (other->reads_needed) {
- if (!other->read_closure_scheduled) {
- GRPC_CLOSURE_SCHED(exec_ctx, &other->read_closure,
- GRPC_ERROR_REF(error));
- other->read_closure_scheduled = true;
- }
- other->reads_needed = false;
- }
+ maybe_schedule_op_closure_locked(exec_ctx, other, error);
} else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) {
s->write_buffer_cancel_error = GRPC_ERROR_REF(error);
}
@@ -564,14 +466,9 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
err);
// Last use of err so no need to REF and then UNREF it
- if ((s->recv_initial_md_op != s->recv_message_op) &&
- (s->recv_initial_md_op != s->recv_trailing_md_op)) {
- INPROC_LOG(GPR_DEBUG,
- "fail_helper %p scheduling initial-metadata-on-complete %p",
- error, s);
- GRPC_CLOSURE_SCHED(exec_ctx, s->recv_initial_md_op->on_complete,
- GRPC_ERROR_REF(error));
- }
+ complete_if_batch_end_locked(
+ exec_ctx, s, error, s->recv_initial_md_op,
+ "fail_helper scheduling recv-initial-metadata-on-complete");
s->recv_initial_md_op = NULL;
}
if (s->recv_message_op) {
@@ -580,20 +477,30 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
GRPC_CLOSURE_SCHED(
exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_REF(error));
- if (s->recv_message_op != s->recv_trailing_md_op) {
- INPROC_LOG(GPR_DEBUG, "fail_helper %p scheduling message-on-complete %p",
- s, error);
- GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete,
- GRPC_ERROR_REF(error));
- }
+ complete_if_batch_end_locked(
+ exec_ctx, s, error, s->recv_message_op,
+ "fail_helper scheduling recv-message-on-complete");
s->recv_message_op = NULL;
}
+ if (s->send_message_op) {
+ complete_if_batch_end_locked(
+ exec_ctx, s, error, s->send_message_op,
+ "fail_helper scheduling send-message-on-complete");
+ s->send_message_op = NULL;
+ }
+ if (s->send_trailing_md_op) {
+ complete_if_batch_end_locked(
+ exec_ctx, s, error, s->send_trailing_md_op,
+ "fail_helper scheduling send-trailng-md-on-complete");
+ s->send_trailing_md_op = NULL;
+ }
if (s->recv_trailing_md_op) {
INPROC_LOG(GPR_DEBUG,
"fail_helper %p scheduling trailing-md-on-complete %p", s,
error);
- GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete,
- GRPC_ERROR_REF(error));
+ complete_if_batch_end_locked(
+ exec_ctx, s, error, s->recv_trailing_md_op,
+ "fail_helper scheduling recv-trailing-metadata-on-complete");
s->recv_trailing_md_op = NULL;
}
close_other_side_locked(exec_ctx, s, "fail_helper:other_side");
@@ -602,12 +509,61 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
GRPC_ERROR_UNREF(error);
}
-static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
+static void message_transfer_locked(grpc_exec_ctx *exec_ctx,
+ inproc_stream *sender,
+ inproc_stream *receiver) {
+ size_t remaining =
+ sender->send_message_op->payload->send_message.send_message->length;
+ if (receiver->recv_inited) {
+ grpc_slice_buffer_destroy_internal(exec_ctx, &receiver->recv_message);
+ }
+ grpc_slice_buffer_init(&receiver->recv_message);
+ receiver->recv_inited = true;
+ do {
+ grpc_slice message_slice;
+ grpc_closure unused;
+ GPR_ASSERT(grpc_byte_stream_next(
+ exec_ctx, sender->send_message_op->payload->send_message.send_message,
+ SIZE_MAX, &unused));
+ grpc_error *error = grpc_byte_stream_pull(
+ exec_ctx, sender->send_message_op->payload->send_message.send_message,
+ &message_slice);
+ if (error != GRPC_ERROR_NONE) {
+ cancel_stream_locked(exec_ctx, sender, GRPC_ERROR_REF(error));
+ break;
+ }
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ remaining -= GRPC_SLICE_LENGTH(message_slice);
+ grpc_slice_buffer_add(&receiver->recv_message, message_slice);
+ } while (remaining > 0);
+
+ grpc_slice_buffer_stream_init(&receiver->recv_stream, &receiver->recv_message,
+ 0);
+ *receiver->recv_message_op->payload->recv_message.recv_message =
+ &receiver->recv_stream.base;
+ INPROC_LOG(GPR_DEBUG, "message_transfer_locked %p scheduling message-ready",
+ receiver);
+ GRPC_CLOSURE_SCHED(
+ exec_ctx,
+ receiver->recv_message_op->payload->recv_message.recv_message_ready,
+ GRPC_ERROR_NONE);
+ complete_if_batch_end_locked(
+ exec_ctx, sender, GRPC_ERROR_NONE, sender->send_message_op,
+ "message_transfer scheduling sender on_complete");
+ complete_if_batch_end_locked(
+ exec_ctx, receiver, GRPC_ERROR_NONE, receiver->recv_message_op,
+ "message_transfer scheduling receiver on_complete");
+
+ receiver->recv_message_op = NULL;
+ sender->send_message_op = NULL;
+}
+
+static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
// This function gets called when we have contents in the unprocessed reads
// Get what we want based on our ops wanted
// Schedule our appropriate closures
- // and then return to reads_needed state if still needed
+ // and then return to ops_needed state if still needed
// Since this is a closure directly invoked by the combiner, it should not
// unref the error parameter explicitly; the combiner will do that implicitly
@@ -615,12 +571,14 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
bool needs_close = false;
- INPROC_LOG(GPR_DEBUG, "read_state_machine %p", arg);
+ INPROC_LOG(GPR_DEBUG, "op_state_machine %p", arg);
inproc_stream *s = (inproc_stream *)arg;
gpr_mu *mu = &s->t->mu->mu; // keep aside in case s gets closed
gpr_mu_lock(mu);
- s->read_closure_scheduled = false;
+ s->op_closure_scheduled = false;
// cancellation takes precedence
+ inproc_stream *other = s->other_side;
+
if (s->cancel_self_error != GRPC_ERROR_NONE) {
fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(s->cancel_self_error));
goto done;
@@ -632,89 +590,116 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
goto done;
}
- if (s->recv_initial_md_op) {
- if (!s->to_read_initial_md_filled) {
- // We entered the state machine on some other kind of read even though
- // we still haven't satisfied initial md . That's an error.
- new_err =
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unexpected frame sequencing");
- INPROC_LOG(GPR_DEBUG,
- "read_state_machine %p scheduling on_complete errors for no "
- "initial md %p",
- s, new_err);
+ if (s->send_message_op && other) {
+ if (other->recv_message_op) {
+ message_transfer_locked(exec_ctx, s, other);
+ maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE);
+ } else if (!s->t->is_client &&
+ (s->trailing_md_sent || other->recv_trailing_md_op)) {
+ // A server send will never be matched if the client is waiting
+ // for trailing metadata already
+ complete_if_batch_end_locked(
+ exec_ctx, s, GRPC_ERROR_NONE, s->send_message_op,
+ "op_state_machine scheduling send-message-on-complete");
+ s->send_message_op = NULL;
+ }
+ }
+ // Pause a send trailing metadata if there is still an outstanding
+ // send message unless we know that the send message will never get
+ // matched to a receive. This happens on the client if the server has
+ // already sent status.
+ if (s->send_trailing_md_op &&
+ (!s->send_message_op ||
+ (s->t->is_client &&
+ (s->trailing_md_recvd || s->to_read_trailing_md_filled)))) {
+ grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md
+ : &other->to_read_trailing_md;
+ bool *destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled
+ : &other->to_read_trailing_md_filled;
+ if (*destfilled || s->trailing_md_sent) {
+ // The buffer is already in use; that's an error!
+ INPROC_LOG(GPR_DEBUG, "Extra trailing metadata %p", s);
+ new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra trailing metadata");
fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
goto done;
- } else if (s->initial_md_recvd) {
+ } else {
+ if (other && !other->closed) {
+ fill_in_metadata(exec_ctx, s,
+ s->send_trailing_md_op->payload->send_trailing_metadata
+ .send_trailing_metadata,
+ 0, dest, NULL, destfilled);
+ }
+ s->trailing_md_sent = true;
+ if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
+ INPROC_LOG(GPR_DEBUG,
+ "op_state_machine %p scheduling trailing-md-on-complete", s);
+ GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete,
+ GRPC_ERROR_NONE);
+ s->recv_trailing_md_op = NULL;
+ needs_close = true;
+ }
+ }
+ maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE);
+ complete_if_batch_end_locked(
+ exec_ctx, s, GRPC_ERROR_NONE, s->send_trailing_md_op,
+ "op_state_machine scheduling send-trailing-metadata-on-complete");
+ s->send_trailing_md_op = NULL;
+ }
+ if (s->recv_initial_md_op) {
+ if (s->initial_md_recvd) {
new_err =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd initial md");
INPROC_LOG(
GPR_DEBUG,
- "read_state_machine %p scheduling on_complete errors for already "
+ "op_state_machine %p scheduling on_complete errors for already "
"recvd initial md %p",
s, new_err);
fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
goto done;
}
- s->initial_md_recvd = true;
- new_err = fill_in_metadata(
- exec_ctx, s, &s->to_read_initial_md, s->to_read_initial_md_flags,
- s->recv_initial_md_op->payload->recv_initial_metadata
- .recv_initial_metadata,
- s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags, NULL);
- s->recv_initial_md_op->payload->recv_initial_metadata.recv_initial_metadata
- ->deadline = s->deadline;
- grpc_metadata_batch_clear(exec_ctx, &s->to_read_initial_md);
- s->to_read_initial_md_filled = false;
- INPROC_LOG(GPR_DEBUG,
- "read_state_machine %p scheduling initial-metadata-ready %p", s,
- new_err);
- GRPC_CLOSURE_SCHED(exec_ctx,
- s->recv_initial_md_op->payload->recv_initial_metadata
- .recv_initial_metadata_ready,
- GRPC_ERROR_REF(new_err));
- if ((s->recv_initial_md_op != s->recv_message_op) &&
- (s->recv_initial_md_op != s->recv_trailing_md_op)) {
- INPROC_LOG(
- GPR_DEBUG,
- "read_state_machine %p scheduling initial-metadata-on-complete %p", s,
- new_err);
- GRPC_CLOSURE_SCHED(exec_ctx, s->recv_initial_md_op->on_complete,
- GRPC_ERROR_REF(new_err));
- }
- s->recv_initial_md_op = NULL;
-
- if (new_err != GRPC_ERROR_NONE) {
+ if (s->to_read_initial_md_filled) {
+ s->initial_md_recvd = true;
+ new_err = fill_in_metadata(
+ exec_ctx, s, &s->to_read_initial_md, s->to_read_initial_md_flags,
+ s->recv_initial_md_op->payload->recv_initial_metadata
+ .recv_initial_metadata,
+ s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags,
+ NULL);
+ s->recv_initial_md_op->payload->recv_initial_metadata
+ .recv_initial_metadata->deadline = s->deadline;
+ grpc_metadata_batch_clear(exec_ctx, &s->to_read_initial_md);
+ s->to_read_initial_md_filled = false;
INPROC_LOG(GPR_DEBUG,
- "read_state_machine %p scheduling on_complete errors2 %p", s,
+ "op_state_machine %p scheduling initial-metadata-ready %p", s,
new_err);
- fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
- goto done;
+ GRPC_CLOSURE_SCHED(exec_ctx,
+ s->recv_initial_md_op->payload->recv_initial_metadata
+ .recv_initial_metadata_ready,
+ GRPC_ERROR_REF(new_err));
+ complete_if_batch_end_locked(
+ exec_ctx, s, new_err, s->recv_initial_md_op,
+ "op_state_machine scheduling recv-initial-metadata-on-complete");
+ s->recv_initial_md_op = NULL;
+
+ if (new_err != GRPC_ERROR_NONE) {
+ INPROC_LOG(GPR_DEBUG,
+ "op_state_machine %p scheduling on_complete errors2 %p", s,
+ new_err);
+ fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
+ goto done;
+ }
}
}
- if (s->to_read_initial_md_filled) {
- new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unexpected recv frame");
- fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
- goto done;
- }
- if (!slice_buffer_list_empty(&s->to_read_message) && s->recv_message_op) {
- inproc_slice_byte_stream_init(
- &s->recv_message_stream,
- slice_buffer_list_pophead(&s->to_read_message));
- *s->recv_message_op->payload->recv_message.recv_message =
- &s->recv_message_stream.base;
- INPROC_LOG(GPR_DEBUG, "read_state_machine %p scheduling message-ready", s);
- GRPC_CLOSURE_SCHED(
- exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready,
- GRPC_ERROR_NONE);
- if (s->recv_message_op != s->recv_trailing_md_op) {
- INPROC_LOG(GPR_DEBUG,
- "read_state_machine %p scheduling message-on-complete %p", s,
- new_err);
- GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete,
- GRPC_ERROR_REF(new_err));
+ if (s->recv_message_op) {
+ if (other && other->send_message_op) {
+ message_transfer_locked(exec_ctx, other, s);
+ maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE);
}
- s->recv_message_op = NULL;
+ }
+ if (s->recv_trailing_md_op && s->t->is_client && other &&
+ other->send_message_op) {
+ maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE);
}
if (s->to_read_trailing_md_filled) {
if (s->trailing_md_recvd) {
@@ -722,7 +707,7 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd trailing md");
INPROC_LOG(
GPR_DEBUG,
- "read_state_machine %p scheduling on_complete errors for already "
+ "op_state_machine %p scheduling on_complete errors for already "
"recvd trailing md %p",
s, new_err);
fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
@@ -731,21 +716,24 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
if (s->recv_message_op != NULL) {
// This message needs to be wrapped up because it will never be
// satisfied
- INPROC_LOG(GPR_DEBUG, "read_state_machine %p scheduling message-ready",
- s);
+ INPROC_LOG(GPR_DEBUG, "op_state_machine %p scheduling message-ready", s);
GRPC_CLOSURE_SCHED(
exec_ctx,
s->recv_message_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
- if (s->recv_message_op != s->recv_trailing_md_op) {
- INPROC_LOG(GPR_DEBUG,
- "read_state_machine %p scheduling message-on-complete %p", s,
- new_err);
- GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete,
- GRPC_ERROR_REF(new_err));
- }
+ complete_if_batch_end_locked(
+ exec_ctx, s, new_err, s->recv_message_op,
+ "op_state_machine scheduling recv-message-on-complete");
s->recv_message_op = NULL;
}
+ if ((s->trailing_md_sent || s->t->is_client) && s->send_message_op) {
+ // Nothing further will try to receive from this stream, so finish off
+ // any outstanding send_message op
+ complete_if_batch_end_locked(
+ exec_ctx, s, new_err, s->send_message_op,
+ "op_state_machine scheduling send-message-on-complete");
+ s->send_message_op = NULL;
+ }
if (s->recv_trailing_md_op != NULL) {
// We wanted trailing metadata and we got it
s->trailing_md_recvd = true;
@@ -763,61 +751,65 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
// (If the server hasn't already sent its trailing md, it doesn't have
// a final status, so don't mark this op complete)
if (s->t->is_client || s->trailing_md_sent) {
- INPROC_LOG(
- GPR_DEBUG,
- "read_state_machine %p scheduling trailing-md-on-complete %p", s,
- new_err);
+ INPROC_LOG(GPR_DEBUG,
+ "op_state_machine %p scheduling trailing-md-on-complete %p",
+ s, new_err);
GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete,
GRPC_ERROR_REF(new_err));
s->recv_trailing_md_op = NULL;
needs_close = true;
} else {
INPROC_LOG(GPR_DEBUG,
- "read_state_machine %p server needs to delay handling "
+ "op_state_machine %p server needs to delay handling "
"trailing-md-on-complete %p",
s, new_err);
}
} else {
INPROC_LOG(
GPR_DEBUG,
- "read_state_machine %p has trailing md but not yet waiting for it",
- s);
+ "op_state_machine %p has trailing md but not yet waiting for it", s);
}
}
if (s->trailing_md_recvd && s->recv_message_op) {
// No further message will come on this stream, so finish off the
// recv_message_op
- INPROC_LOG(GPR_DEBUG, "read_state_machine %p scheduling message-ready", s);
+ INPROC_LOG(GPR_DEBUG, "op_state_machine %p scheduling message-ready", s);
GRPC_CLOSURE_SCHED(
exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
- if (s->recv_message_op != s->recv_trailing_md_op) {
- INPROC_LOG(GPR_DEBUG,
- "read_state_machine %p scheduling message-on-complete %p", s,
- new_err);
- GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete,
- GRPC_ERROR_REF(new_err));
- }
+ complete_if_batch_end_locked(
+ exec_ctx, s, new_err, s->recv_message_op,
+ "op_state_machine scheduling recv-message-on-complete");
s->recv_message_op = NULL;
}
- if (s->recv_message_op || s->recv_trailing_md_op) {
+ if (s->trailing_md_recvd && (s->trailing_md_sent || s->t->is_client) &&
+ s->send_message_op) {
+ // Nothing further will try to receive from this stream, so finish off
+ // any outstanding send_message op
+ complete_if_batch_end_locked(
+ exec_ctx, s, new_err, s->send_message_op,
+ "op_state_machine scheduling send-message-on-complete");
+ s->send_message_op = NULL;
+ }
+ if (s->send_message_op || s->send_trailing_md_op || s->recv_initial_md_op ||
+ s->recv_message_op || s->recv_trailing_md_op) {
// Didn't get the item we wanted so we still need to get
// rescheduled
- INPROC_LOG(GPR_DEBUG, "read_state_machine %p still needs closure %p %p", s,
- s->recv_message_op, s->recv_trailing_md_op);
- s->reads_needed = true;
+ INPROC_LOG(
+ GPR_DEBUG, "op_state_machine %p still needs closure %p %p %p %p %p", s,
+ s->send_message_op, s->send_trailing_md_op, s->recv_initial_md_op,
+ s->recv_message_op, s->recv_trailing_md_op);
+ s->ops_needed = true;
}
done:
if (needs_close) {
- close_other_side_locked(exec_ctx, s, "read_state_machine");
+ close_other_side_locked(exec_ctx, s, "op_state_machine");
close_stream_locked(exec_ctx, s);
}
gpr_mu_unlock(mu);
GRPC_ERROR_UNREF(new_err);
}
-static grpc_closure do_nothing_closure;
-
static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
grpc_error *error) {
bool ret = false; // was the cancel accepted
@@ -826,14 +818,7 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
if (s->cancel_self_error == GRPC_ERROR_NONE) {
ret = true;
s->cancel_self_error = GRPC_ERROR_REF(error);
- if (s->reads_needed) {
- if (!s->read_closure_scheduled) {
- GRPC_CLOSURE_SCHED(exec_ctx, &s->read_closure,
- GRPC_ERROR_REF(s->cancel_self_error));
- s->read_closure_scheduled = true;
- }
- s->reads_needed = false;
- }
+ maybe_schedule_op_closure_locked(exec_ctx, s, s->cancel_self_error);
// Send trailing md to the other side indicating cancellation, even if we
// already have
s->trailing_md_sent = true;
@@ -853,14 +838,8 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
if (other->cancel_other_error == GRPC_ERROR_NONE) {
other->cancel_other_error = GRPC_ERROR_REF(s->cancel_self_error);
}
- if (other->reads_needed) {
- if (!other->read_closure_scheduled) {
- GRPC_CLOSURE_SCHED(exec_ctx, &other->read_closure,
- GRPC_ERROR_REF(other->cancel_other_error));
- other->read_closure_scheduled = true;
- }
- other->reads_needed = false;
- }
+ maybe_schedule_op_closure_locked(exec_ctx, other,
+ other->cancel_other_error);
} else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) {
s->write_buffer_cancel_error = GRPC_ERROR_REF(s->cancel_self_error);
}
@@ -869,11 +848,9 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
// couldn't complete that because we hadn't yet sent out trailing
// md, now's the chance
if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
- INPROC_LOG(GPR_DEBUG,
- "cancel_stream %p scheduling trailing-md-on-complete %p", s,
- s->cancel_self_error);
- GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete,
- GRPC_ERROR_REF(s->cancel_self_error));
+ complete_if_batch_end_locked(
+ exec_ctx, s, s->cancel_self_error, s->recv_trailing_md_op,
+ "cancel_stream scheduling trailing-md-on-complete");
s->recv_trailing_md_op = NULL;
}
}
@@ -918,7 +895,8 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
// already self-canceled so still give it an error
error = GRPC_ERROR_REF(s->cancel_self_error);
} else {
- INPROC_LOG(GPR_DEBUG, "perform_stream_op %p%s%s%s%s%s%s", s,
+ INPROC_LOG(GPR_DEBUG, "perform_stream_op %p %s%s%s%s%s%s%s", s,
+ s->t->is_client ? "client" : "server",
op->send_initial_metadata ? " send_initial_metadata" : "",
op->send_message ? " send_message" : "",
op->send_trailing_metadata ? " send_trailing_metadata" : "",
@@ -929,10 +907,9 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
bool needs_close = false;
+ inproc_stream *other = s->other_side;
if (error == GRPC_ERROR_NONE &&
- (op->send_initial_metadata || op->send_message ||
- op->send_trailing_metadata)) {
- inproc_stream *other = s->other_side;
+ (op->send_initial_metadata || op->send_trailing_metadata)) {
if (s->t->is_closed) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown");
}
@@ -963,72 +940,21 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->initial_md_sent = true;
}
}
- }
- if (error == GRPC_ERROR_NONE && op->send_message) {
- size_t remaining = op->payload->send_message.send_message->length;
- grpc_slice_buffer *dest = slice_buffer_list_append(
- (other == NULL) ? &s->write_buffer_message : &other->to_read_message);
- do {
- grpc_slice message_slice;
- grpc_closure unused;
- GPR_ASSERT(grpc_byte_stream_next(exec_ctx,
- op->payload->send_message.send_message,
- SIZE_MAX, &unused));
- error = grpc_byte_stream_pull(
- exec_ctx, op->payload->send_message.send_message, &message_slice);
- if (error != GRPC_ERROR_NONE) {
- cancel_stream_locked(exec_ctx, s, GRPC_ERROR_REF(error));
- break;
- }
- GPR_ASSERT(error == GRPC_ERROR_NONE);
- remaining -= GRPC_SLICE_LENGTH(message_slice);
- grpc_slice_buffer_add(dest, message_slice);
- } while (remaining != 0);
- grpc_byte_stream_destroy(exec_ctx,
- op->payload->send_message.send_message);
- }
- if (error == GRPC_ERROR_NONE && op->send_trailing_metadata) {
- grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md
- : &other->to_read_trailing_md;
- bool *destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled
- : &other->to_read_trailing_md_filled;
- if (*destfilled || s->trailing_md_sent) {
- // The buffer is already in use; that's an error!
- INPROC_LOG(GPR_DEBUG, "Extra trailing metadata %p", s);
- error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra trailing metadata");
- } else {
- if (!other->closed) {
- fill_in_metadata(
- exec_ctx, s,
- op->payload->send_trailing_metadata.send_trailing_metadata, 0,
- dest, NULL, destfilled);
- }
- s->trailing_md_sent = true;
- if (!s->t->is_client && s->trailing_md_recvd &&
- s->recv_trailing_md_op) {
- INPROC_LOG(GPR_DEBUG,
- "perform_stream_op %p scheduling trailing-md-on-complete",
- s);
- GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete,
- GRPC_ERROR_NONE);
- s->recv_trailing_md_op = NULL;
- needs_close = true;
- }
- }
- }
- if (other != NULL && other->reads_needed) {
- if (!other->read_closure_scheduled) {
- GRPC_CLOSURE_SCHED(exec_ctx, &other->read_closure, error);
- other->read_closure_scheduled = true;
- }
- other->reads_needed = false;
+ maybe_schedule_op_closure_locked(exec_ctx, other, error);
}
}
+
if (error == GRPC_ERROR_NONE &&
- (op->recv_initial_metadata || op->recv_message ||
+ (op->send_message || op->send_trailing_metadata ||
+ op->recv_initial_metadata || op->recv_message ||
op->recv_trailing_metadata)) {
- // If there are any reads, mark it so that the read closure will react to
- // them
+ // Mark ops that need to be processed by the closure
+ if (op->send_message) {
+ s->send_message_op = op;
+ }
+ if (op->send_trailing_metadata) {
+ s->send_trailing_md_op = op;
+ }
if (op->recv_initial_metadata) {
s->recv_initial_md_op = op;
}
@@ -1040,25 +966,28 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
}
// We want to initiate the closure if:
- // 1. There is initial metadata and something ready to take that
- // 2. There is a message and something ready to take it
- // 3. There is trailing metadata, even if nothing specifically wants
- // that because that can shut down the message as well
- if ((s->to_read_initial_md_filled && op->recv_initial_metadata) ||
- ((!slice_buffer_list_empty(&s->to_read_message) ||
- s->trailing_md_recvd) &&
- op->recv_message) ||
- (s->to_read_trailing_md_filled)) {
- if (!s->read_closure_scheduled) {
- GRPC_CLOSURE_SCHED(exec_ctx, &s->read_closure, GRPC_ERROR_NONE);
- s->read_closure_scheduled = true;
+ // 1. We want to send a message and the other side wants to receive or end
+ // 2. We want to send trailing metadata and there isn't an unmatched send
+ // 3. We want initial metadata and the other side has sent it
+ // 4. We want to receive a message and there is a message ready
+ // 5. There is trailing metadata, even if nothing specifically wants
+ // that because that can shut down the receive message as well
+ if ((op->send_message && other && ((other->recv_message_op != NULL) ||
+ (other->recv_trailing_md_op != NULL))) ||
+ (op->send_trailing_metadata && !op->send_message) ||
+ (op->recv_initial_metadata && s->to_read_initial_md_filled) ||
+ (op->recv_message && (other && other->send_message_op != NULL)) ||
+ (s->to_read_trailing_md_filled || s->trailing_md_recvd)) {
+ if (!s->op_closure_scheduled) {
+ GRPC_CLOSURE_SCHED(exec_ctx, &s->op_closure, GRPC_ERROR_NONE);
+ s->op_closure_scheduled = true;
}
} else {
- s->reads_needed = true;
+ s->ops_needed = true;
}
} else {
if (error != GRPC_ERROR_NONE) {
- // Schedule op's read closures that we didn't push to read state machine
+ // Schedule op's closures that we didn't push to op state machine
if (op->recv_initial_metadata) {
INPROC_LOG(
GPR_DEBUG,
diff --git a/test/core/end2end/gen_build_yaml.py b/test/core/end2end/gen_build_yaml.py
index e1dc69994c..f7f996d5c1 100755
--- a/test/core/end2end/gen_build_yaml.py
+++ b/test/core/end2end/gen_build_yaml.py
@@ -24,15 +24,15 @@ import hashlib
FixtureOptions = collections.namedtuple(
'FixtureOptions',
- 'fullstack includes_proxy dns_resolver name_resolution secure platforms ci_mac tracing exclude_configs exclude_iomgrs large_writes enables_compression supports_compression is_inproc is_http2 supports_proxy_auth')
+ 'fullstack includes_proxy dns_resolver name_resolution secure platforms ci_mac tracing exclude_configs exclude_iomgrs large_writes enables_compression supports_compression is_inproc is_http2 supports_proxy_auth supports_write_buffering')
default_unsecure_fixture_options = FixtureOptions(
- True, False, True, True, False, ['windows', 'linux', 'mac', 'posix'], True, False, [], [], True, False, True, False, True, False)
+ True, False, True, True, False, ['windows', 'linux', 'mac', 'posix'], True, False, [], [], True, False, True, False, True, False, True)
socketpair_unsecure_fixture_options = default_unsecure_fixture_options._replace(fullstack=False, dns_resolver=False)
default_secure_fixture_options = default_unsecure_fixture_options._replace(secure=True)
uds_fixture_options = default_unsecure_fixture_options._replace(dns_resolver=False, platforms=['linux', 'mac', 'posix'], exclude_iomgrs=['uv'])
fd_unsecure_fixture_options = default_unsecure_fixture_options._replace(
dns_resolver=False, fullstack=False, platforms=['linux', 'mac', 'posix'], exclude_iomgrs=['uv'])
-inproc_fixture_options = default_unsecure_fixture_options._replace(dns_resolver=False, fullstack=False, name_resolution=False, supports_compression=False, is_inproc=True, is_http2=False)
+inproc_fixture_options = default_unsecure_fixture_options._replace(dns_resolver=False, fullstack=False, name_resolution=False, supports_compression=False, is_inproc=True, is_http2=False, supports_write_buffering=False)
# maps fixture name to whether it requires the security library
END2END_FIXTURES = {
@@ -68,8 +68,8 @@ END2END_FIXTURES = {
TestOptions = collections.namedtuple(
'TestOptions',
- 'needs_fullstack needs_dns needs_names proxyable secure traceable cpu_cost exclude_iomgrs large_writes flaky allows_compression needs_compression exclude_inproc needs_http2 needs_proxy_auth')
-default_test_options = TestOptions(False, False, False, True, False, True, 1.0, [], False, False, True, False, False, False, False)
+ 'needs_fullstack needs_dns needs_names proxyable secure traceable cpu_cost exclude_iomgrs large_writes flaky allows_compression needs_compression exclude_inproc needs_http2 needs_proxy_auth needs_write_buffering')
+default_test_options = TestOptions(False, False, False, True, False, True, 1.0, [], False, False, True, False, False, False, False, False)
connectivity_test_options = default_test_options._replace(needs_fullstack=True)
LOWCPU = 0.1
@@ -146,8 +146,10 @@ END2END_TESTS = {
'streaming_error_response': default_test_options._replace(cpu_cost=LOWCPU),
'trailing_metadata': default_test_options,
'workaround_cronet_compression': default_test_options,
- 'write_buffering': default_test_options._replace(cpu_cost=LOWCPU),
- 'write_buffering_at_end': default_test_options._replace(cpu_cost=LOWCPU),
+ 'write_buffering': default_test_options._replace(cpu_cost=LOWCPU,
+ needs_write_buffering=True),
+ 'write_buffering_at_end': default_test_options._replace(cpu_cost=LOWCPU,
+ needs_write_buffering=True),
}
@@ -185,6 +187,9 @@ def compatible(f, t):
if END2END_TESTS[t].needs_proxy_auth:
if not END2END_FIXTURES[f].supports_proxy_auth:
return False
+ if END2END_TESTS[t].needs_write_buffering:
+ if not END2END_FIXTURES[f].supports_write_buffering:
+ return False
return True
diff --git a/test/core/end2end/generate_tests.bzl b/test/core/end2end/generate_tests.bzl
index d48ddb4606..89a95edfd7 100755
--- a/test/core/end2end/generate_tests.bzl
+++ b/test/core/end2end/generate_tests.bzl
@@ -21,7 +21,8 @@ load("//bazel:grpc_build_system.bzl", "grpc_sh_test", "grpc_cc_binary", "grpc_cc
def fixture_options(fullstack=True, includes_proxy=False, dns_resolver=True,
name_resolution=True, secure=True, tracing=False,
platforms=['windows', 'linux', 'mac', 'posix'],
- is_inproc=False, is_http2=True, supports_proxy_auth=False):
+ is_inproc=False, is_http2=True, supports_proxy_auth=False,
+ supports_write_buffering=True):
return struct(
fullstack=fullstack,
includes_proxy=includes_proxy,
@@ -31,7 +32,8 @@ def fixture_options(fullstack=True, includes_proxy=False, dns_resolver=True,
tracing=tracing,
is_inproc=is_inproc,
is_http2=is_http2,
- supports_proxy_auth=supports_proxy_auth
+ supports_proxy_auth=supports_proxy_auth,
+ supports_write_buffering=supports_write_buffering
#platforms=platforms
)
@@ -61,14 +63,14 @@ END2END_FIXTURES = {
platforms=['linux', 'mac', 'posix']),
'inproc': fixture_options(fullstack=False, dns_resolver=False,
name_resolution=False, is_inproc=True,
- is_http2=False),
+ is_http2=False, supports_write_buffering=False),
}
def test_options(needs_fullstack=False, needs_dns=False, needs_names=False,
proxyable=True, secure=False, traceable=False,
exclude_inproc=False, needs_http2=False,
- needs_proxy_auth=False):
+ needs_proxy_auth=False, needs_write_buffering=False):
return struct(
needs_fullstack=needs_fullstack,
needs_dns=needs_dns,
@@ -78,7 +80,8 @@ def test_options(needs_fullstack=False, needs_dns=False, needs_names=False,
traceable=traceable,
exclude_inproc=exclude_inproc,
needs_http2=needs_http2,
- needs_proxy_auth=needs_proxy_auth
+ needs_proxy_auth=needs_proxy_auth,
+ needs_write_buffering=needs_write_buffering
)
@@ -144,8 +147,8 @@ END2END_TESTS = {
'authority_not_supported': test_options(),
'filter_latency': test_options(),
'workaround_cronet_compression': test_options(),
- 'write_buffering': test_options(),
- 'write_buffering_at_end': test_options(),
+ 'write_buffering': test_options(needs_write_buffering=True),
+ 'write_buffering_at_end': test_options(needs_write_buffering=True),
}
@@ -174,6 +177,9 @@ def compatible(fopt, topt):
if topt.needs_proxy_auth:
if not fopt.supports_proxy_auth:
return False
+ if topt.needs_write_buffering:
+ if not fopt.supports_write_buffering:
+ return False
return True
diff --git a/test/core/end2end/tests/streaming_error_response.c b/test/core/end2end/tests/streaming_error_response.c
index 9d562b9090..8891b8674c 100644
--- a/test/core/end2end/tests/streaming_error_response.c
+++ b/test/core/end2end/tests/streaming_error_response.c
@@ -183,7 +183,6 @@ static void test(grpc_end2end_test_config config, bool request_status_early) {
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(103), 1);
- cq_verify(cqv);
if (!request_status_early) {
memset(ops, 0, sizeof(ops));
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index a14b4d5295..2a33e8ae11 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -1304,7 +1304,6 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
ServerTryCancelRequestPhase server_try_cancel) {
ResetStub();
- EchoRequest send_request;
EchoRequest recv_request;
EchoResponse send_response;
EchoResponse recv_response;
@@ -1315,31 +1314,24 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
// Initiate the 'RequestStream' call on client
+ CompletionQueue cli_cq;
+
std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
- stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
- Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get());
+ stub_->AsyncRequestStream(&cli_ctx, &recv_response, &cli_cq, tag(1)));
// On the server, request to be notified of 'RequestStream' calls
// and receive the 'RequestStream' call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
+ std::thread t1([this, &cli_cq] {
+ Verifier(GetParam().disable_blocking).Expect(1, true).Verify(&cli_cq);
+ });
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
-
- // Client sends 3 messages (tags 3, 4 and 5)
- for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
- send_request.set_message("Ping " + grpc::to_string(tag_idx));
- cli_stream->Write(send_request, tag(tag_idx));
- Verifier(GetParam().disable_blocking)
- .Expect(tag_idx, true)
- .Verify(cq_.get());
- }
- cli_stream->WritesDone(tag(6));
- Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get());
+ t1.join();
bool expected_server_cq_result = true;
- bool ignore_cq_result = false;
- bool want_done_tag = false;
+ bool expected_client_cq_result = true;
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
srv_ctx.TryCancel();
@@ -1347,10 +1339,36 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
EXPECT_TRUE(srv_ctx.IsCancelled());
// Since cancellation is done before server reads any results, we know
- // for sure that all cq results will return false from this point forward
+ // for sure that all server cq results will return false from this
+ // point forward
expected_server_cq_result = false;
+ expected_client_cq_result = false;
}
+ bool ignore_client_cq_result =
+ (server_try_cancel == CANCEL_DURING_PROCESSING) ||
+ (server_try_cancel == CANCEL_BEFORE_PROCESSING);
+
+ std::thread cli_thread([&cli_cq, &cli_stream, &expected_client_cq_result,
+ &ignore_client_cq_result, this] {
+ EchoRequest send_request;
+ // Client sends 3 messages (tags 3, 4 and 5)
+ for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
+ send_request.set_message("Ping " + grpc::to_string(tag_idx));
+ cli_stream->Write(send_request, tag(tag_idx));
+ Verifier(GetParam().disable_blocking)
+ .Expect(tag_idx, expected_client_cq_result)
+ .Verify(&cli_cq, ignore_client_cq_result);
+ }
+ cli_stream->WritesDone(tag(6));
+ // Ignore ok on WritesDone since cancel can affect it
+ Verifier(GetParam().disable_blocking)
+ .Expect(6, expected_client_cq_result)
+ .Verify(&cli_cq, ignore_client_cq_result);
+ });
+
+ bool ignore_cq_result = false;
+ bool want_done_tag = false;
std::thread* server_try_cancel_thd = nullptr;
auto verif = Verifier(GetParam().disable_blocking);
@@ -1387,6 +1405,8 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
}
}
+ cli_thread.join();
+
if (server_try_cancel_thd != nullptr) {
server_try_cancel_thd->join();
delete server_try_cancel_thd;
@@ -1415,9 +1435,15 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Client will see the cancellation
cli_stream->Finish(&recv_status, tag(10));
- Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(10, true).Verify(&cli_cq);
EXPECT_FALSE(recv_status.ok());
EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
+
+ cli_cq.Shutdown();
+ void* dummy_tag;
+ bool dummy_ok;
+ while (cli_cq.Next(&dummy_tag, &dummy_ok)) {
+ }
}
// Helper for testing server-streaming RPCs which are cancelled on the server.
@@ -1439,7 +1465,6 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
EchoRequest send_request;
EchoRequest recv_request;
EchoResponse send_response;
- EchoResponse recv_response;
Status recv_status;
ClientContext cli_ctx;
ServerContext srv_ctx;
@@ -1447,20 +1472,29 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
send_request.set_message("Ping");
// Initiate the 'ResponseStream' call on the client
+ CompletionQueue cli_cq;
std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
- stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
- Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get());
+ stub_->AsyncResponseStream(&cli_ctx, send_request, &cli_cq, tag(1)));
// On the server, request to be notified of 'ResponseStream' calls and
// receive the call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
cq_.get(), cq_.get(), tag(2));
+
+ std::thread t1([this, &cli_cq] {
+ Verifier(GetParam().disable_blocking).Expect(1, true).Verify(&cli_cq);
+ });
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
+ t1.join();
+
EXPECT_EQ(send_request.message(), recv_request.message());
bool expected_cq_result = true;
bool ignore_cq_result = false;
bool want_done_tag = false;
+ bool expected_client_cq_result = true;
+ bool ignore_client_cq_result =
+ (server_try_cancel != CANCEL_BEFORE_PROCESSING);
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
srv_ctx.TryCancel();
@@ -1470,8 +1504,21 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// We know for sure that all cq results will be false from this point
// since the server cancelled the RPC
expected_cq_result = false;
+ expected_client_cq_result = false;
}
+ std::thread cli_thread([&cli_cq, &cli_stream, &expected_client_cq_result,
+ &ignore_client_cq_result, this] {
+ // Client attempts to read the three messages from the server
+ for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
+ EchoResponse recv_response;
+ cli_stream->Read(&recv_response, tag(tag_idx));
+ Verifier(GetParam().disable_blocking)
+ .Expect(tag_idx, expected_client_cq_result)
+ .Verify(&cli_cq, ignore_client_cq_result);
+ }
+ });
+
std::thread* server_try_cancel_thd = nullptr;
auto verif = Verifier(GetParam().disable_blocking);
@@ -1519,10 +1566,6 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
srv_ctx.TryCancel();
want_done_tag = true;
verif.Expect(11, true);
-
- // Client reads may fail bacause it is notified that the stream is
- // cancelled.
- ignore_cq_result = true;
}
if (want_done_tag) {
@@ -1531,13 +1574,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
want_done_tag = false;
}
- // Client attemts to read the three messages from the server
- for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
- cli_stream->Read(&recv_response, tag(tag_idx));
- Verifier(GetParam().disable_blocking)
- .Expect(tag_idx, expected_cq_result)
- .Verify(cq_.get(), ignore_cq_result);
- }
+ cli_thread.join();
// The RPC has been cancelled at this point for sure (i.e irrespective of
// the value of `server_try_cancel` is). So, from this point forward, we
@@ -1549,9 +1586,15 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Client will see the cancellation
cli_stream->Finish(&recv_status, tag(10));
- Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(10, true).Verify(&cli_cq);
EXPECT_FALSE(recv_status.ok());
EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
+
+ cli_cq.Shutdown();
+ void* dummy_tag;
+ bool dummy_ok;
+ while (cli_cq.Next(&dummy_tag, &dummy_ok)) {
+ }
}
// Helper for testing bidirectinal-streaming RPCs which are cancelled on the
@@ -1584,38 +1627,52 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Initiate the call from the client side
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
- Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get());
// On the server, request to be notified of the 'BidiStream' call and
// receive the call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
- Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(1, true)
+ .Expect(2, true)
+ .Verify(cq_.get());
+
+ auto verif = Verifier(GetParam().disable_blocking);
// Client sends the first and the only message
send_request.set_message("Ping");
cli_stream->Write(send_request, tag(3));
- Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
+ verif.Expect(3, true);
bool expected_cq_result = true;
bool ignore_cq_result = false;
bool want_done_tag = false;
+ int got_tag, got_tag2;
+ bool tag_3_done = false;
+
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
srv_ctx.TryCancel();
- Verifier(GetParam().disable_blocking).Expect(11, true).Verify(cq_.get());
- EXPECT_TRUE(srv_ctx.IsCancelled());
-
- // We know for sure that all cq results will be false from this point
- // since the server cancelled the RPC
+ verif.Expect(11, true);
+ // We know for sure that all server cq results will be false from
+ // this point since the server cancelled the RPC. However, we can't
+ // say for sure about the client
expected_cq_result = false;
+ ignore_cq_result = true;
+
+ do {
+ got_tag = verif.Next(cq_.get(), ignore_cq_result);
+ GPR_ASSERT(((got_tag == 3) && !tag_3_done) || (got_tag == 11));
+ if (got_tag == 3) {
+ tag_3_done = true;
+ }
+ } while (got_tag != 11);
+ EXPECT_TRUE(srv_ctx.IsCancelled());
}
std::thread* server_try_cancel_thd = nullptr;
- auto verif = Verifier(GetParam().disable_blocking);
-
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd =
new std::thread(&ServerContext::TryCancel, &srv_ctx);
@@ -1630,39 +1687,42 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
verif.Expect(11, true);
}
- int got_tag;
srv_stream.Read(&recv_request, tag(4));
verif.Expect(4, expected_cq_result);
- got_tag = verif.Next(cq_.get(), ignore_cq_result);
- GPR_ASSERT((got_tag == 4) || (got_tag == 11 && want_done_tag));
- if (got_tag == 11) {
+ got_tag = tag_3_done ? 3 : verif.Next(cq_.get(), ignore_cq_result);
+ got_tag2 = verif.Next(cq_.get(), ignore_cq_result);
+ GPR_ASSERT((got_tag == 3) || (got_tag == 4) ||
+ (got_tag == 11 && want_done_tag));
+ GPR_ASSERT((got_tag2 == 3) || (got_tag2 == 4) ||
+ (got_tag2 == 11 && want_done_tag));
+ // If we get 3 and 4, we don't need to wait for 11, but if
+ // we get 11, we should also clear 3 and 4
+ if (got_tag + got_tag2 != 7) {
EXPECT_TRUE(srv_ctx.IsCancelled());
want_done_tag = false;
- // Now get the other entry that we were waiting on
- EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 4);
+ got_tag = verif.Next(cq_.get(), ignore_cq_result);
+ GPR_ASSERT((got_tag == 3) || (got_tag == 4));
}
send_response.set_message("Pong");
srv_stream.Write(send_response, tag(5));
verif.Expect(5, expected_cq_result);
- got_tag = verif.Next(cq_.get(), ignore_cq_result);
- GPR_ASSERT((got_tag == 5) || (got_tag == 11 && want_done_tag));
- if (got_tag == 11) {
- EXPECT_TRUE(srv_ctx.IsCancelled());
- want_done_tag = false;
- // Now get the other entry that we were waiting on
- EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 5);
- }
cli_stream->Read(&recv_response, tag(6));
verif.Expect(6, expected_cq_result);
got_tag = verif.Next(cq_.get(), ignore_cq_result);
- GPR_ASSERT((got_tag == 6) || (got_tag == 11 && want_done_tag));
- if (got_tag == 11) {
+ got_tag2 = verif.Next(cq_.get(), ignore_cq_result);
+ GPR_ASSERT((got_tag == 5) || (got_tag == 6) ||
+ (got_tag == 11 && want_done_tag));
+ GPR_ASSERT((got_tag2 == 5) || (got_tag2 == 6) ||
+ (got_tag2 == 11 && want_done_tag));
+ // If we get 5 and 6, we don't need to wait for 11, but if
+ // we get 11, we should also clear 5 and 6
+ if (got_tag + got_tag2 != 11) {
EXPECT_TRUE(srv_ctx.IsCancelled());
want_done_tag = false;
- // Now get the other entry that we were waiting on
- EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 6);
+ got_tag = verif.Next(cq_.get(), ignore_cq_result);
+ GPR_ASSERT((got_tag == 5) || (got_tag == 6));
}
// This is expected to succeed in all cases
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index 91c7d4c38e..a1644dfa09 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -29978,52 +29978,6 @@
},
{
"args": [
- "write_buffering"
- ],
- "ci_platforms": [
- "windows",
- "linux",
- "mac",
- "posix"
- ],
- "cpu_cost": 0.1,
- "exclude_configs": [],
- "exclude_iomgrs": [],
- "flaky": false,
- "language": "c",
- "name": "inproc_test",
- "platforms": [
- "windows",
- "linux",
- "mac",
- "posix"
- ]
- },
- {
- "args": [
- "write_buffering_at_end"
- ],
- "ci_platforms": [
- "windows",
- "linux",
- "mac",
- "posix"
- ],
- "cpu_cost": 0.1,
- "exclude_configs": [],
- "exclude_iomgrs": [],
- "flaky": false,
- "language": "c",
- "name": "inproc_test",
- "platforms": [
- "windows",
- "linux",
- "mac",
- "posix"
- ]
- },
- {
- "args": [
"authority_not_supported"
],
"ci_platforms": [
@@ -48361,52 +48315,6 @@
},
{
"args": [
- "write_buffering"
- ],
- "ci_platforms": [
- "windows",
- "linux",
- "mac",
- "posix"
- ],
- "cpu_cost": 0.1,
- "exclude_configs": [],
- "exclude_iomgrs": [],
- "flaky": false,
- "language": "c",
- "name": "inproc_nosec_test",
- "platforms": [
- "windows",
- "linux",
- "mac",
- "posix"
- ]
- },
- {
- "args": [
- "write_buffering_at_end"
- ],
- "ci_platforms": [
- "windows",
- "linux",
- "mac",
- "posix"
- ],
- "cpu_cost": 0.1,
- "exclude_configs": [],
- "exclude_iomgrs": [],
- "flaky": false,
- "language": "c",
- "name": "inproc_nosec_test",
- "platforms": [
- "windows",
- "linux",
- "mac",
- "posix"
- ]
- },
- {
- "args": [
"--scenarios_json",
"{\"scenarios\": [{\"name\": \"cpp_protobuf_async_unary_1channel_100rpcs_1MB\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"channel_args\": [{\"str_value\": \"throughput\", \"name\": \"grpc.optimization_target\"}], \"security_params\": null, \"threads_per_cq\": 0, \"server_type\": \"ASYNC_SERVER\"}, \"num_clients\": 1, \"client_config\": {\"security_params\": null, \"channel_args\": [{\"str_value\": \"throughput\", \"name\": \"grpc.optimization_target\"}], \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"UNARY\", \"payload_config\": {\"simple_params\": {\"resp_size\": 1048576, \"req_size\": 1048576}}, \"client_channels\": 1, \"threads_per_cq\": 0, \"load_params\": {\"closed_loop\": {}}, \"client_type\": \"ASYNC_CLIENT\", \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}}]}"
],