diff options
author | Craig Tiller <ctiller@google.com> | 2017-10-12 16:45:21 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-10-12 16:45:21 -0700 |
commit | 26a0c98ee6b70e9f02ddfc6d0193ec6051911fa0 (patch) | |
tree | f5c35fe43f59af6ca384c3ac9e7dd81724b7090d /src/core/ext | |
parent | a4ea9186313bba3e80db61dea703681a1841cac8 (diff) | |
parent | 19346558cb578f8b75444c5057837f6d8ad2732e (diff) |
Merge branch 'pid++' into flow++
Diffstat (limited to 'src/core/ext')
5 files changed, 348 insertions, 443 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 53fa0fff04..ffd58129c6 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -345,9 +345,6 @@ typedef struct glb_lb_policy { /** are we currently updating lb_call? */ bool updating_lb_call; - /** are we currently updating lb_channel? */ - bool updating_lb_channel; - /** are we already watching the LB channel's connectivity? */ bool watching_lb_channel; @@ -360,9 +357,6 @@ typedef struct glb_lb_policy { /** called upon changes to the LB channel's connectivity. */ grpc_closure lb_channel_on_connectivity_changed; - /** args from the latest update received while already updating, or NULL */ - grpc_lb_policy_args *pending_update_args; - /************************************************************/ /* client data associated with the LB server communication */ /************************************************************/ @@ -982,10 +976,6 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } grpc_fake_resolver_response_generator_unref(glb_policy->response_generator); grpc_subchannel_index_unref(); - if (glb_policy->pending_update_args != NULL) { - grpc_channel_args_destroy(exec_ctx, glb_policy->pending_update_args->args); - gpr_free(glb_policy->pending_update_args); - } gpr_free(glb_policy); } @@ -1752,45 +1742,22 @@ static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, } const grpc_lb_addresses *addresses = (const grpc_lb_addresses *)arg->value.pointer.p; - + // If a non-empty serverlist hasn't been received from the balancer, + // propagate the update to fallback_backend_addresses. if (glb_policy->serverlist == NULL) { - // If a non-empty serverlist hasn't been received from the balancer, - // propagate the update to fallback_backend_addresses. fallback_update_locked(exec_ctx, glb_policy, addresses); - } else if (glb_policy->updating_lb_channel) { - // If we have recieved serverlist from the balancer, we need to defer update - // when there is an in-progress one. - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, - "Update already in progress for grpclb %p. Deferring update.", - (void *)glb_policy); - } - if (glb_policy->pending_update_args != NULL) { - grpc_channel_args_destroy(exec_ctx, - glb_policy->pending_update_args->args); - gpr_free(glb_policy->pending_update_args); - } - glb_policy->pending_update_args = (grpc_lb_policy_args *)gpr_zalloc( - sizeof(*glb_policy->pending_update_args)); - glb_policy->pending_update_args->client_channel_factory = - args->client_channel_factory; - glb_policy->pending_update_args->args = grpc_channel_args_copy(args->args); - glb_policy->pending_update_args->combiner = args->combiner; - return; } - - glb_policy->updating_lb_channel = true; GPR_ASSERT(glb_policy->lb_channel != NULL); + // Propagate updates to the LB channel (pick_first) through the fake + // resolver. grpc_channel_args *lb_channel_args = build_lb_channel_args( exec_ctx, addresses, glb_policy->response_generator, args->args); - /* Propagate updates to the LB channel (pick first) through the fake resolver - */ grpc_fake_resolver_response_generator_set_response( exec_ctx, glb_policy->response_generator, lb_channel_args); grpc_channel_args_destroy(exec_ctx, lb_channel_args); - + // Start watching the LB channel connectivity for connection, if not + // already doing so. if (!glb_policy->watching_lb_channel) { - // Watch the LB channel connectivity for connection. glb_policy->lb_channel_connectivity = grpc_channel_check_connectivity_state( glb_policy->lb_channel, true /* try to connect */); grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element( @@ -1842,18 +1809,10 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx, /* fallthrough */ case GRPC_CHANNEL_READY: if (glb_policy->lb_call != NULL) { - glb_policy->updating_lb_channel = false; glb_policy->updating_lb_call = true; grpc_call_cancel(glb_policy->lb_call, NULL); - // lb_on_server_status_received will pick up the cancel and reinit + // lb_on_server_status_received() will pick up the cancel and reinit // lb_call. - if (glb_policy->pending_update_args != NULL) { - grpc_lb_policy_args *args = glb_policy->pending_update_args; - glb_policy->pending_update_args = NULL; - glb_update_locked(exec_ctx, &glb_policy->base, args); - grpc_channel_args_destroy(exec_ctx, args->args); - gpr_free(args); - } } else if (glb_policy->started_picking && !glb_policy->shutting_down) { if (glb_policy->retry_timer_active) { grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index fe67e46402..7815ff123e 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -223,6 +223,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, t->flow_control.Destroy(); + GRPC_ERROR_UNREF(t->closed_with_error); gpr_free(t->ping_acks); gpr_free(t->peer_string); gpr_free(t); @@ -564,8 +565,10 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED; } - GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping"); - schedule_bdp_ping_locked(exec_ctx, t); + if (t->flow_control.enable_bdp_probe) { + GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping"); + schedule_bdp_ping_locked(exec_ctx, t); + } grpc_chttp2_act_on_flowctl_action( exec_ctx, t->flow_control->MakeAction(exec_ctx), t, NULL); @@ -598,7 +601,9 @@ static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { static void close_transport_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error) { - if (!t->closed) { + end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error)); + cancel_pings(exec_ctx, t, GRPC_ERROR_REF(error)); + if (t->closed_with_error == nullptr) { if (!grpc_error_has_clear_grpc_status(error)) { error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); @@ -613,10 +618,10 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, grpc_error_add_child(t->close_transport_on_writes_finished, error); return; } - t->closed = 1; + GPR_ASSERT(error != GRPC_ERROR_NONE); + t->closed_with_error = GRPC_ERROR_REF(error); connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "close_transport"); - grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error)); if (t->ping_state.is_delayed_ping_timer_set) { grpc_timer_cancel(exec_ctx, &t->ping_state.delayed_ping_timer); } @@ -642,8 +647,9 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, while (grpc_chttp2_list_pop_writable_stream(t, &s)) { GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:close"); } - end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error)); - cancel_pings(exec_ctx, t, GRPC_ERROR_REF(error)); + if (t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) { + grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error)); + } } GRPC_ERROR_UNREF(error); } @@ -839,6 +845,10 @@ static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->close_transport_on_writes_finished = NULL; close_transport_locked(exec_ctx, t, err); } + if (t->closed_with_error != GRPC_ERROR_NONE) { + grpc_endpoint_shutdown(exec_ctx, t->ep, + GRPC_ERROR_REF(t->closed_with_error)); + } } } @@ -946,7 +956,8 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, void grpc_chttp2_mark_stream_writable(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s) { - if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s)) { + if (t->closed_with_error == GRPC_ERROR_NONE && + grpc_chttp2_list_add_writable_stream(t, s)) { GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become"); } } @@ -999,7 +1010,7 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt, grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE); grpc_chttp2_begin_write_result r; - if (t->closed) { + if (t->closed_with_error != GRPC_ERROR_NONE) { r.writing = false; } else { r = grpc_chttp2_begin_write(exec_ctx, t); @@ -1462,7 +1473,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } if (!s->write_closed) { if (t->is_client) { - if (!t->closed) { + if (t->closed_with_error == GRPC_ERROR_NONE) { GPR_ASSERT(s->id == 0); grpc_chttp2_list_add_waiting_for_concurrency(t, s); maybe_start_some_streams(exec_ctx, t); @@ -1470,7 +1481,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, grpc_chttp2_cancel_stream( exec_ctx, t, s, grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport closed"), + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Transport closed", &t->closed_with_error, 1), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); } } else { @@ -1756,7 +1768,10 @@ void grpc_chttp2_add_ping_strike(grpc_exec_ctx *exec_ctx, GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM)); /*The transport will be closed after the write is done */ close_transport_locked( - exec_ctx, t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings")); + exec_ctx, t, + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); } } @@ -2479,7 +2494,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, } GPR_SWAP(grpc_error *, err, error); GRPC_ERROR_UNREF(err); - if (!t->closed) { + if (t->closed_with_error == GRPC_ERROR_NONE) { GPR_TIMER_BEGIN("reading_action.parse", 0); size_t i = 0; grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE, @@ -2519,13 +2534,14 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, GPR_TIMER_BEGIN("post_reading_action_locked", 0); bool keep_reading = false; - if (error == GRPC_ERROR_NONE && t->closed) { - error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport closed"); + if (error == GRPC_ERROR_NONE && t->closed_with_error != GRPC_ERROR_NONE) { + error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Transport closed", &t->closed_with_error, 1); } if (error != GRPC_ERROR_NONE) { close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error)); t->endpoint_reading = 0; - } else if (!t->closed) { + } else if (t->closed_with_error == GRPC_ERROR_NONE) { keep_reading = true; GRPC_CHTTP2_REF_TRANSPORT(t, "keep_reading"); } @@ -2662,7 +2678,7 @@ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)arg; GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING); - if (t->destroying || t->closed) { + if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; } else if (error == GRPC_ERROR_NONE) { if (t->keepalive_permit_without_calls || @@ -2720,8 +2736,8 @@ static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg, if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { if (error == GRPC_ERROR_NONE) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; - close_transport_locked(exec_ctx, t, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "keepalive watchdog timeout")); + close_transport_locked(exec_ctx, t, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "keepalive watchdog timeout"), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL)); } } else { /* The watchdog timer should have been cancelled by diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index fca03b762c..b2d2c1ad90 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -255,7 +255,7 @@ struct grpc_chttp2_transport { /** is the transport destroying itself? */ uint8_t destroying; /** has the upper layer closed the transport? */ - uint8_t closed; + grpc_error *closed_with_error; /** is there a read request to the endpoint outstanding? */ uint8_t endpoint_reading; @@ -297,7 +297,7 @@ struct grpc_chttp2_transport { /** hpack encoding */ grpc_chttp2_hpack_compressor hpack_compressor; /** is this a client? */ - uint8_t is_client; + bool is_client; /** data to write next write */ grpc_slice_buffer qbuf; @@ -307,14 +307,14 @@ struct grpc_chttp2_transport { uint32_t write_buffer_size; /** have we seen a goaway */ - uint8_t seen_goaway; + bool seen_goaway; /** have we sent a goaway */ grpc_chttp2_sent_goaway_state sent_goaway_state; /** are the local settings dirty and need to be sent? */ - uint8_t dirtied_local_settings; + bool dirtied_local_settings; /** have local settings been sent? */ - uint8_t sent_local_settings; + bool sent_local_settings; /** bitmask of setting indexes to send out */ uint32_t force_send_settings; /** settings values */ diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index fe82b6fd60..ff76a5fcdb 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -244,7 +244,8 @@ class WriteContext { void UpdateStreamsNoLongerStalled() { grpc_chttp2_stream *s; while (grpc_chttp2_list_pop_stalled_by_transport(t_, &s)) { - if (!t_->closed && grpc_chttp2_list_add_writable_stream(t_, s)) { + if (t_->closed_with_error == GRPC_ERROR_NONE && + grpc_chttp2_list_add_writable_stream(t_, s)) { if (!stream_ref_if_not_destroyed(&s->refcount->refs)) { grpc_chttp2_list_remove_writable_stream(t_, s); } diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc index 1001d74c22..67a8358927 100644 --- a/src/core/ext/transport/inproc/inproc_transport.cc +++ b/src/core/ext/transport/inproc/inproc_transport.cc @@ -62,96 +62,22 @@ typedef struct inproc_transport { struct inproc_stream *stream_list; } inproc_transport; -typedef struct sb_list_entry { - grpc_slice_buffer sb; - struct sb_list_entry *next; -} sb_list_entry; - -// Specialize grpc_byte_stream for our use case -typedef struct { - grpc_byte_stream base; - sb_list_entry *le; - grpc_error *shutdown_error; -} inproc_slice_byte_stream; - -typedef struct { - // TODO (vjpai): Add some inlined elements to avoid alloc in simple cases - sb_list_entry *head; - sb_list_entry *tail; -} slice_buffer_list; - -static void slice_buffer_list_init(slice_buffer_list *l) { - l->head = NULL; - l->tail = NULL; -} - -static void sb_list_entry_destroy(grpc_exec_ctx *exec_ctx, sb_list_entry *le) { - grpc_slice_buffer_destroy_internal(exec_ctx, &le->sb); - gpr_free(le); -} - -static void slice_buffer_list_destroy(grpc_exec_ctx *exec_ctx, - slice_buffer_list *l) { - sb_list_entry *curr = l->head; - while (curr != NULL) { - sb_list_entry *le = curr; - curr = curr->next; - sb_list_entry_destroy(exec_ctx, le); - } - l->head = NULL; - l->tail = NULL; -} - -static bool slice_buffer_list_empty(slice_buffer_list *l) { - return l->head == NULL; -} - -static void slice_buffer_list_append_entry(slice_buffer_list *l, - sb_list_entry *next) { - next->next = NULL; - if (l->tail) { - l->tail->next = next; - l->tail = next; - } else { - l->head = next; - l->tail = next; - } -} - -static grpc_slice_buffer *slice_buffer_list_append(slice_buffer_list *l) { - sb_list_entry *next = (sb_list_entry *)gpr_malloc(sizeof(*next)); - grpc_slice_buffer_init(&next->sb); - slice_buffer_list_append_entry(l, next); - return &next->sb; -} - -static sb_list_entry *slice_buffer_list_pophead(slice_buffer_list *l) { - sb_list_entry *ret = l->head; - l->head = l->head->next; - if (l->head == NULL) { - l->tail = NULL; - } - return ret; -} - typedef struct inproc_stream { inproc_transport *t; grpc_metadata_batch to_read_initial_md; uint32_t to_read_initial_md_flags; bool to_read_initial_md_filled; - slice_buffer_list to_read_message; grpc_metadata_batch to_read_trailing_md; bool to_read_trailing_md_filled; - bool reads_needed; - bool read_closure_scheduled; - grpc_closure read_closure; + bool ops_needed; + bool op_closure_scheduled; + grpc_closure op_closure; // Write buffer used only during gap at init time when client-side // stream is set up but server side stream is not yet set up grpc_metadata_batch write_buffer_initial_md; bool write_buffer_initial_md_filled; uint32_t write_buffer_initial_md_flags; grpc_millis write_buffer_deadline; - slice_buffer_list write_buffer_message; grpc_metadata_batch write_buffer_trailing_md; bool write_buffer_trailing_md_filled; grpc_error *write_buffer_cancel_error; @@ -164,11 +90,15 @@ typedef struct inproc_stream { gpr_arena *arena; + grpc_transport_stream_op_batch *send_message_op; + grpc_transport_stream_op_batch *send_trailing_md_op; grpc_transport_stream_op_batch *recv_initial_md_op; grpc_transport_stream_op_batch *recv_message_op; grpc_transport_stream_op_batch *recv_trailing_md_op; - inproc_slice_byte_stream recv_message_stream; + grpc_slice_buffer recv_message; + grpc_slice_buffer_stream recv_stream; + bool recv_inited; bool initial_md_sent; bool trailing_md_sent; @@ -187,54 +117,11 @@ typedef struct inproc_stream { struct inproc_stream *stream_list_next; } inproc_stream; -static bool inproc_slice_byte_stream_next(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *bs, size_t max, - grpc_closure *on_complete) { - // Because inproc transport always provides the entire message atomically, - // the byte stream always has data available when this function is called. - // Thus, this function always returns true (unlike other transports) and - // there is never any need to schedule a closure - return true; -} - -static grpc_error *inproc_slice_byte_stream_pull(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *bs, - grpc_slice *slice) { - inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs; - if (stream->shutdown_error != GRPC_ERROR_NONE) { - return GRPC_ERROR_REF(stream->shutdown_error); - } - *slice = grpc_slice_buffer_take_first(&stream->le->sb); - return GRPC_ERROR_NONE; -} - -static void inproc_slice_byte_stream_shutdown(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *bs, - grpc_error *error) { - inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs; - GRPC_ERROR_UNREF(stream->shutdown_error); - stream->shutdown_error = error; -} - -static void inproc_slice_byte_stream_destroy(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *bs) { - inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs; - sb_list_entry_destroy(exec_ctx, stream->le); - GRPC_ERROR_UNREF(stream->shutdown_error); -} - -static const grpc_byte_stream_vtable inproc_slice_byte_stream_vtable = { - inproc_slice_byte_stream_next, inproc_slice_byte_stream_pull, - inproc_slice_byte_stream_shutdown, inproc_slice_byte_stream_destroy}; - -void inproc_slice_byte_stream_init(inproc_slice_byte_stream *s, - sb_list_entry *le) { - s->base.length = (uint32_t)le->sb.length; - s->base.flags = 0; - s->base.vtable = &inproc_slice_byte_stream_vtable; - s->le = le; - s->shutdown_error = GRPC_ERROR_NONE; -} +static grpc_closure do_nothing_closure; +static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, + grpc_error *error); +static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); static void ref_transport(inproc_transport *t) { INPROC_LOG(GPR_DEBUG, "ref_transport %p", t); @@ -280,12 +167,14 @@ static void unref_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s, static void really_destroy_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s) { INPROC_LOG(GPR_DEBUG, "really_destroy_stream %p", s); - slice_buffer_list_destroy(exec_ctx, &s->to_read_message); - slice_buffer_list_destroy(exec_ctx, &s->write_buffer_message); GRPC_ERROR_UNREF(s->write_buffer_cancel_error); GRPC_ERROR_UNREF(s->cancel_self_error); GRPC_ERROR_UNREF(s->cancel_other_error); + if (s->recv_inited) { + grpc_slice_buffer_destroy_internal(exec_ctx, &s->recv_message); + } + unref_transport(exec_ctx, s->t); if (s->closure_at_destroy) { @@ -293,9 +182,6 @@ static void really_destroy_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s) { } } -static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); - static void log_metadata(const grpc_metadata_batch *md_batch, bool is_client, bool is_initial) { for (grpc_linked_mdelem *md = md_batch->list.head; md != NULL; @@ -359,11 +245,9 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->write_buffer_initial_md_filled = false; grpc_metadata_batch_init(&s->write_buffer_trailing_md); s->write_buffer_trailing_md_filled = false; - slice_buffer_list_init(&s->to_read_message); - slice_buffer_list_init(&s->write_buffer_message); - s->reads_needed = false; - s->read_closure_scheduled = false; - GRPC_CLOSURE_INIT(&s->read_closure, read_state_machine, s, + s->ops_needed = false; + s->op_closure_scheduled = false; + GRPC_CLOSURE_INIT(&s->op_closure, op_state_machine, s, grpc_schedule_on_exec_ctx); s->t = t; s->closure_at_destroy = NULL; @@ -425,11 +309,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_metadata_batch_clear(exec_ctx, &cs->write_buffer_initial_md); cs->write_buffer_initial_md_filled = false; } - while (!slice_buffer_list_empty(&cs->write_buffer_message)) { - slice_buffer_list_append_entry( - &s->to_read_message, - slice_buffer_list_pophead(&cs->write_buffer_message)); - } if (cs->write_buffer_trailing_md_filled) { fill_in_metadata(exec_ctx, s, &cs->write_buffer_trailing_md, 0, &s->to_read_trailing_md, NULL, @@ -488,9 +367,39 @@ static void close_other_side_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, } } +// Call the on_complete closure associated with this stream_op_batch if +// this stream_op_batch is only one of the pending operations for this +// stream. This is called when one of the pending operations for the stream +// is done and about to be NULLed out +static void complete_if_batch_end_locked(grpc_exec_ctx *exec_ctx, + inproc_stream *s, grpc_error *error, + grpc_transport_stream_op_batch *op, + const char *msg) { + int is_sm = (int)(op == s->send_message_op); + int is_stm = (int)(op == s->send_trailing_md_op); + int is_rim = (int)(op == s->recv_initial_md_op); + int is_rm = (int)(op == s->recv_message_op); + int is_rtm = (int)(op == s->recv_trailing_md_op); + + if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) { + INPROC_LOG(GPR_DEBUG, "%s %p %p %p", msg, s, op, error); + GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, GRPC_ERROR_REF(error)); + } +} + +static void maybe_schedule_op_closure_locked(grpc_exec_ctx *exec_ctx, + inproc_stream *s, + grpc_error *error) { + if (s && s->ops_needed && !s->op_closure_scheduled) { + GRPC_CLOSURE_SCHED(exec_ctx, &s->op_closure, GRPC_ERROR_REF(error)); + s->op_closure_scheduled = true; + s->ops_needed = false; + } +} + static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, grpc_error *error) { - INPROC_LOG(GPR_DEBUG, "read_state_machine %p fail_helper", s); + INPROC_LOG(GPR_DEBUG, "op_state_machine %p fail_helper", s); // If we're failing this side, we need to make sure that // we also send or have already sent trailing metadata if (!s->trailing_md_sent) { @@ -512,14 +421,7 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, if (other->cancel_other_error == GRPC_ERROR_NONE) { other->cancel_other_error = GRPC_ERROR_REF(error); } - if (other->reads_needed) { - if (!other->read_closure_scheduled) { - GRPC_CLOSURE_SCHED(exec_ctx, &other->read_closure, - GRPC_ERROR_REF(error)); - other->read_closure_scheduled = true; - } - other->reads_needed = false; - } + maybe_schedule_op_closure_locked(exec_ctx, other, error); } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) { s->write_buffer_cancel_error = GRPC_ERROR_REF(error); } @@ -564,14 +466,9 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, err); // Last use of err so no need to REF and then UNREF it - if ((s->recv_initial_md_op != s->recv_message_op) && - (s->recv_initial_md_op != s->recv_trailing_md_op)) { - INPROC_LOG(GPR_DEBUG, - "fail_helper %p scheduling initial-metadata-on-complete %p", - error, s); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_initial_md_op->on_complete, - GRPC_ERROR_REF(error)); - } + complete_if_batch_end_locked( + exec_ctx, s, error, s->recv_initial_md_op, + "fail_helper scheduling recv-initial-metadata-on-complete"); s->recv_initial_md_op = NULL; } if (s->recv_message_op) { @@ -580,20 +477,30 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, GRPC_CLOSURE_SCHED( exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready, GRPC_ERROR_REF(error)); - if (s->recv_message_op != s->recv_trailing_md_op) { - INPROC_LOG(GPR_DEBUG, "fail_helper %p scheduling message-on-complete %p", - s, error); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete, - GRPC_ERROR_REF(error)); - } + complete_if_batch_end_locked( + exec_ctx, s, error, s->recv_message_op, + "fail_helper scheduling recv-message-on-complete"); s->recv_message_op = NULL; } + if (s->send_message_op) { + complete_if_batch_end_locked( + exec_ctx, s, error, s->send_message_op, + "fail_helper scheduling send-message-on-complete"); + s->send_message_op = NULL; + } + if (s->send_trailing_md_op) { + complete_if_batch_end_locked( + exec_ctx, s, error, s->send_trailing_md_op, + "fail_helper scheduling send-trailng-md-on-complete"); + s->send_trailing_md_op = NULL; + } if (s->recv_trailing_md_op) { INPROC_LOG(GPR_DEBUG, "fail_helper %p scheduling trailing-md-on-complete %p", s, error); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete, - GRPC_ERROR_REF(error)); + complete_if_batch_end_locked( + exec_ctx, s, error, s->recv_trailing_md_op, + "fail_helper scheduling recv-trailing-metadata-on-complete"); s->recv_trailing_md_op = NULL; } close_other_side_locked(exec_ctx, s, "fail_helper:other_side"); @@ -602,12 +509,61 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, GRPC_ERROR_UNREF(error); } -static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void message_transfer_locked(grpc_exec_ctx *exec_ctx, + inproc_stream *sender, + inproc_stream *receiver) { + size_t remaining = + sender->send_message_op->payload->send_message.send_message->length; + if (receiver->recv_inited) { + grpc_slice_buffer_destroy_internal(exec_ctx, &receiver->recv_message); + } + grpc_slice_buffer_init(&receiver->recv_message); + receiver->recv_inited = true; + do { + grpc_slice message_slice; + grpc_closure unused; + GPR_ASSERT(grpc_byte_stream_next( + exec_ctx, sender->send_message_op->payload->send_message.send_message, + SIZE_MAX, &unused)); + grpc_error *error = grpc_byte_stream_pull( + exec_ctx, sender->send_message_op->payload->send_message.send_message, + &message_slice); + if (error != GRPC_ERROR_NONE) { + cancel_stream_locked(exec_ctx, sender, GRPC_ERROR_REF(error)); + break; + } + GPR_ASSERT(error == GRPC_ERROR_NONE); + remaining -= GRPC_SLICE_LENGTH(message_slice); + grpc_slice_buffer_add(&receiver->recv_message, message_slice); + } while (remaining > 0); + + grpc_slice_buffer_stream_init(&receiver->recv_stream, &receiver->recv_message, + 0); + *receiver->recv_message_op->payload->recv_message.recv_message = + &receiver->recv_stream.base; + INPROC_LOG(GPR_DEBUG, "message_transfer_locked %p scheduling message-ready", + receiver); + GRPC_CLOSURE_SCHED( + exec_ctx, + receiver->recv_message_op->payload->recv_message.recv_message_ready, + GRPC_ERROR_NONE); + complete_if_batch_end_locked( + exec_ctx, sender, GRPC_ERROR_NONE, sender->send_message_op, + "message_transfer scheduling sender on_complete"); + complete_if_batch_end_locked( + exec_ctx, receiver, GRPC_ERROR_NONE, receiver->recv_message_op, + "message_transfer scheduling receiver on_complete"); + + receiver->recv_message_op = NULL; + sender->send_message_op = NULL; +} + +static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { // This function gets called when we have contents in the unprocessed reads // Get what we want based on our ops wanted // Schedule our appropriate closures - // and then return to reads_needed state if still needed + // and then return to ops_needed state if still needed // Since this is a closure directly invoked by the combiner, it should not // unref the error parameter explicitly; the combiner will do that implicitly @@ -615,12 +571,14 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg, bool needs_close = false; - INPROC_LOG(GPR_DEBUG, "read_state_machine %p", arg); + INPROC_LOG(GPR_DEBUG, "op_state_machine %p", arg); inproc_stream *s = (inproc_stream *)arg; gpr_mu *mu = &s->t->mu->mu; // keep aside in case s gets closed gpr_mu_lock(mu); - s->read_closure_scheduled = false; + s->op_closure_scheduled = false; // cancellation takes precedence + inproc_stream *other = s->other_side; + if (s->cancel_self_error != GRPC_ERROR_NONE) { fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(s->cancel_self_error)); goto done; @@ -632,89 +590,116 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg, goto done; } - if (s->recv_initial_md_op) { - if (!s->to_read_initial_md_filled) { - // We entered the state machine on some other kind of read even though - // we still haven't satisfied initial md . That's an error. - new_err = - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unexpected frame sequencing"); - INPROC_LOG(GPR_DEBUG, - "read_state_machine %p scheduling on_complete errors for no " - "initial md %p", - s, new_err); + if (s->send_message_op && other) { + if (other->recv_message_op) { + message_transfer_locked(exec_ctx, s, other); + maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE); + } else if (!s->t->is_client && + (s->trailing_md_sent || other->recv_trailing_md_op)) { + // A server send will never be matched if the client is waiting + // for trailing metadata already + complete_if_batch_end_locked( + exec_ctx, s, GRPC_ERROR_NONE, s->send_message_op, + "op_state_machine scheduling send-message-on-complete"); + s->send_message_op = NULL; + } + } + // Pause a send trailing metadata if there is still an outstanding + // send message unless we know that the send message will never get + // matched to a receive. This happens on the client if the server has + // already sent status. + if (s->send_trailing_md_op && + (!s->send_message_op || + (s->t->is_client && + (s->trailing_md_recvd || s->to_read_trailing_md_filled)))) { + grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md + : &other->to_read_trailing_md; + bool *destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled + : &other->to_read_trailing_md_filled; + if (*destfilled || s->trailing_md_sent) { + // The buffer is already in use; that's an error! + INPROC_LOG(GPR_DEBUG, "Extra trailing metadata %p", s); + new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra trailing metadata"); fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); goto done; - } else if (s->initial_md_recvd) { + } else { + if (other && !other->closed) { + fill_in_metadata(exec_ctx, s, + s->send_trailing_md_op->payload->send_trailing_metadata + .send_trailing_metadata, + 0, dest, NULL, destfilled); + } + s->trailing_md_sent = true; + if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) { + INPROC_LOG(GPR_DEBUG, + "op_state_machine %p scheduling trailing-md-on-complete", s); + GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete, + GRPC_ERROR_NONE); + s->recv_trailing_md_op = NULL; + needs_close = true; + } + } + maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE); + complete_if_batch_end_locked( + exec_ctx, s, GRPC_ERROR_NONE, s->send_trailing_md_op, + "op_state_machine scheduling send-trailing-metadata-on-complete"); + s->send_trailing_md_op = NULL; + } + if (s->recv_initial_md_op) { + if (s->initial_md_recvd) { new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd initial md"); INPROC_LOG( GPR_DEBUG, - "read_state_machine %p scheduling on_complete errors for already " + "op_state_machine %p scheduling on_complete errors for already " "recvd initial md %p", s, new_err); fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); goto done; } - s->initial_md_recvd = true; - new_err = fill_in_metadata( - exec_ctx, s, &s->to_read_initial_md, s->to_read_initial_md_flags, - s->recv_initial_md_op->payload->recv_initial_metadata - .recv_initial_metadata, - s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags, NULL); - s->recv_initial_md_op->payload->recv_initial_metadata.recv_initial_metadata - ->deadline = s->deadline; - grpc_metadata_batch_clear(exec_ctx, &s->to_read_initial_md); - s->to_read_initial_md_filled = false; - INPROC_LOG(GPR_DEBUG, - "read_state_machine %p scheduling initial-metadata-ready %p", s, - new_err); - GRPC_CLOSURE_SCHED(exec_ctx, - s->recv_initial_md_op->payload->recv_initial_metadata - .recv_initial_metadata_ready, - GRPC_ERROR_REF(new_err)); - if ((s->recv_initial_md_op != s->recv_message_op) && - (s->recv_initial_md_op != s->recv_trailing_md_op)) { - INPROC_LOG( - GPR_DEBUG, - "read_state_machine %p scheduling initial-metadata-on-complete %p", s, - new_err); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_initial_md_op->on_complete, - GRPC_ERROR_REF(new_err)); - } - s->recv_initial_md_op = NULL; - - if (new_err != GRPC_ERROR_NONE) { + if (s->to_read_initial_md_filled) { + s->initial_md_recvd = true; + new_err = fill_in_metadata( + exec_ctx, s, &s->to_read_initial_md, s->to_read_initial_md_flags, + s->recv_initial_md_op->payload->recv_initial_metadata + .recv_initial_metadata, + s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags, + NULL); + s->recv_initial_md_op->payload->recv_initial_metadata + .recv_initial_metadata->deadline = s->deadline; + grpc_metadata_batch_clear(exec_ctx, &s->to_read_initial_md); + s->to_read_initial_md_filled = false; INPROC_LOG(GPR_DEBUG, - "read_state_machine %p scheduling on_complete errors2 %p", s, + "op_state_machine %p scheduling initial-metadata-ready %p", s, new_err); - fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); - goto done; + GRPC_CLOSURE_SCHED(exec_ctx, + s->recv_initial_md_op->payload->recv_initial_metadata + .recv_initial_metadata_ready, + GRPC_ERROR_REF(new_err)); + complete_if_batch_end_locked( + exec_ctx, s, new_err, s->recv_initial_md_op, + "op_state_machine scheduling recv-initial-metadata-on-complete"); + s->recv_initial_md_op = NULL; + + if (new_err != GRPC_ERROR_NONE) { + INPROC_LOG(GPR_DEBUG, + "op_state_machine %p scheduling on_complete errors2 %p", s, + new_err); + fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); + goto done; + } } } - if (s->to_read_initial_md_filled) { - new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unexpected recv frame"); - fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); - goto done; - } - if (!slice_buffer_list_empty(&s->to_read_message) && s->recv_message_op) { - inproc_slice_byte_stream_init( - &s->recv_message_stream, - slice_buffer_list_pophead(&s->to_read_message)); - *s->recv_message_op->payload->recv_message.recv_message = - &s->recv_message_stream.base; - INPROC_LOG(GPR_DEBUG, "read_state_machine %p scheduling message-ready", s); - GRPC_CLOSURE_SCHED( - exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready, - GRPC_ERROR_NONE); - if (s->recv_message_op != s->recv_trailing_md_op) { - INPROC_LOG(GPR_DEBUG, - "read_state_machine %p scheduling message-on-complete %p", s, - new_err); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete, - GRPC_ERROR_REF(new_err)); + if (s->recv_message_op) { + if (other && other->send_message_op) { + message_transfer_locked(exec_ctx, other, s); + maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE); } - s->recv_message_op = NULL; + } + if (s->recv_trailing_md_op && s->t->is_client && other && + other->send_message_op) { + maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE); } if (s->to_read_trailing_md_filled) { if (s->trailing_md_recvd) { @@ -722,7 +707,7 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd trailing md"); INPROC_LOG( GPR_DEBUG, - "read_state_machine %p scheduling on_complete errors for already " + "op_state_machine %p scheduling on_complete errors for already " "recvd trailing md %p", s, new_err); fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); @@ -731,21 +716,24 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg, if (s->recv_message_op != NULL) { // This message needs to be wrapped up because it will never be // satisfied - INPROC_LOG(GPR_DEBUG, "read_state_machine %p scheduling message-ready", - s); + INPROC_LOG(GPR_DEBUG, "op_state_machine %p scheduling message-ready", s); GRPC_CLOSURE_SCHED( exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); - if (s->recv_message_op != s->recv_trailing_md_op) { - INPROC_LOG(GPR_DEBUG, - "read_state_machine %p scheduling message-on-complete %p", s, - new_err); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete, - GRPC_ERROR_REF(new_err)); - } + complete_if_batch_end_locked( + exec_ctx, s, new_err, s->recv_message_op, + "op_state_machine scheduling recv-message-on-complete"); s->recv_message_op = NULL; } + if ((s->trailing_md_sent || s->t->is_client) && s->send_message_op) { + // Nothing further will try to receive from this stream, so finish off + // any outstanding send_message op + complete_if_batch_end_locked( + exec_ctx, s, new_err, s->send_message_op, + "op_state_machine scheduling send-message-on-complete"); + s->send_message_op = NULL; + } if (s->recv_trailing_md_op != NULL) { // We wanted trailing metadata and we got it s->trailing_md_recvd = true; @@ -763,61 +751,65 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg, // (If the server hasn't already sent its trailing md, it doesn't have // a final status, so don't mark this op complete) if (s->t->is_client || s->trailing_md_sent) { - INPROC_LOG( - GPR_DEBUG, - "read_state_machine %p scheduling trailing-md-on-complete %p", s, - new_err); + INPROC_LOG(GPR_DEBUG, + "op_state_machine %p scheduling trailing-md-on-complete %p", + s, new_err); GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete, GRPC_ERROR_REF(new_err)); s->recv_trailing_md_op = NULL; needs_close = true; } else { INPROC_LOG(GPR_DEBUG, - "read_state_machine %p server needs to delay handling " + "op_state_machine %p server needs to delay handling " "trailing-md-on-complete %p", s, new_err); } } else { INPROC_LOG( GPR_DEBUG, - "read_state_machine %p has trailing md but not yet waiting for it", - s); + "op_state_machine %p has trailing md but not yet waiting for it", s); } } if (s->trailing_md_recvd && s->recv_message_op) { // No further message will come on this stream, so finish off the // recv_message_op - INPROC_LOG(GPR_DEBUG, "read_state_machine %p scheduling message-ready", s); + INPROC_LOG(GPR_DEBUG, "op_state_machine %p scheduling message-ready", s); GRPC_CLOSURE_SCHED( exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); - if (s->recv_message_op != s->recv_trailing_md_op) { - INPROC_LOG(GPR_DEBUG, - "read_state_machine %p scheduling message-on-complete %p", s, - new_err); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete, - GRPC_ERROR_REF(new_err)); - } + complete_if_batch_end_locked( + exec_ctx, s, new_err, s->recv_message_op, + "op_state_machine scheduling recv-message-on-complete"); s->recv_message_op = NULL; } - if (s->recv_message_op || s->recv_trailing_md_op) { + if (s->trailing_md_recvd && (s->trailing_md_sent || s->t->is_client) && + s->send_message_op) { + // Nothing further will try to receive from this stream, so finish off + // any outstanding send_message op + complete_if_batch_end_locked( + exec_ctx, s, new_err, s->send_message_op, + "op_state_machine scheduling send-message-on-complete"); + s->send_message_op = NULL; + } + if (s->send_message_op || s->send_trailing_md_op || s->recv_initial_md_op || + s->recv_message_op || s->recv_trailing_md_op) { // Didn't get the item we wanted so we still need to get // rescheduled - INPROC_LOG(GPR_DEBUG, "read_state_machine %p still needs closure %p %p", s, - s->recv_message_op, s->recv_trailing_md_op); - s->reads_needed = true; + INPROC_LOG( + GPR_DEBUG, "op_state_machine %p still needs closure %p %p %p %p %p", s, + s->send_message_op, s->send_trailing_md_op, s->recv_initial_md_op, + s->recv_message_op, s->recv_trailing_md_op); + s->ops_needed = true; } done: if (needs_close) { - close_other_side_locked(exec_ctx, s, "read_state_machine"); + close_other_side_locked(exec_ctx, s, "op_state_machine"); close_stream_locked(exec_ctx, s); } gpr_mu_unlock(mu); GRPC_ERROR_UNREF(new_err); } -static grpc_closure do_nothing_closure; - static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, grpc_error *error) { bool ret = false; // was the cancel accepted @@ -826,14 +818,7 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, if (s->cancel_self_error == GRPC_ERROR_NONE) { ret = true; s->cancel_self_error = GRPC_ERROR_REF(error); - if (s->reads_needed) { - if (!s->read_closure_scheduled) { - GRPC_CLOSURE_SCHED(exec_ctx, &s->read_closure, - GRPC_ERROR_REF(s->cancel_self_error)); - s->read_closure_scheduled = true; - } - s->reads_needed = false; - } + maybe_schedule_op_closure_locked(exec_ctx, s, s->cancel_self_error); // Send trailing md to the other side indicating cancellation, even if we // already have s->trailing_md_sent = true; @@ -853,14 +838,8 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, if (other->cancel_other_error == GRPC_ERROR_NONE) { other->cancel_other_error = GRPC_ERROR_REF(s->cancel_self_error); } - if (other->reads_needed) { - if (!other->read_closure_scheduled) { - GRPC_CLOSURE_SCHED(exec_ctx, &other->read_closure, - GRPC_ERROR_REF(other->cancel_other_error)); - other->read_closure_scheduled = true; - } - other->reads_needed = false; - } + maybe_schedule_op_closure_locked(exec_ctx, other, + other->cancel_other_error); } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) { s->write_buffer_cancel_error = GRPC_ERROR_REF(s->cancel_self_error); } @@ -869,11 +848,9 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, // couldn't complete that because we hadn't yet sent out trailing // md, now's the chance if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) { - INPROC_LOG(GPR_DEBUG, - "cancel_stream %p scheduling trailing-md-on-complete %p", s, - s->cancel_self_error); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete, - GRPC_ERROR_REF(s->cancel_self_error)); + complete_if_batch_end_locked( + exec_ctx, s, s->cancel_self_error, s->recv_trailing_md_op, + "cancel_stream scheduling trailing-md-on-complete"); s->recv_trailing_md_op = NULL; } } @@ -918,7 +895,8 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, // already self-canceled so still give it an error error = GRPC_ERROR_REF(s->cancel_self_error); } else { - INPROC_LOG(GPR_DEBUG, "perform_stream_op %p%s%s%s%s%s%s", s, + INPROC_LOG(GPR_DEBUG, "perform_stream_op %p %s%s%s%s%s%s%s", s, + s->t->is_client ? "client" : "server", op->send_initial_metadata ? " send_initial_metadata" : "", op->send_message ? " send_message" : "", op->send_trailing_metadata ? " send_trailing_metadata" : "", @@ -929,10 +907,9 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, bool needs_close = false; + inproc_stream *other = s->other_side; if (error == GRPC_ERROR_NONE && - (op->send_initial_metadata || op->send_message || - op->send_trailing_metadata)) { - inproc_stream *other = s->other_side; + (op->send_initial_metadata || op->send_trailing_metadata)) { if (s->t->is_closed) { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown"); } @@ -963,72 +940,21 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->initial_md_sent = true; } } - } - if (error == GRPC_ERROR_NONE && op->send_message) { - size_t remaining = op->payload->send_message.send_message->length; - grpc_slice_buffer *dest = slice_buffer_list_append( - (other == NULL) ? &s->write_buffer_message : &other->to_read_message); - do { - grpc_slice message_slice; - grpc_closure unused; - GPR_ASSERT(grpc_byte_stream_next(exec_ctx, - op->payload->send_message.send_message, - SIZE_MAX, &unused)); - error = grpc_byte_stream_pull( - exec_ctx, op->payload->send_message.send_message, &message_slice); - if (error != GRPC_ERROR_NONE) { - cancel_stream_locked(exec_ctx, s, GRPC_ERROR_REF(error)); - break; - } - GPR_ASSERT(error == GRPC_ERROR_NONE); - remaining -= GRPC_SLICE_LENGTH(message_slice); - grpc_slice_buffer_add(dest, message_slice); - } while (remaining != 0); - grpc_byte_stream_destroy(exec_ctx, - op->payload->send_message.send_message); - } - if (error == GRPC_ERROR_NONE && op->send_trailing_metadata) { - grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md - : &other->to_read_trailing_md; - bool *destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled - : &other->to_read_trailing_md_filled; - if (*destfilled || s->trailing_md_sent) { - // The buffer is already in use; that's an error! - INPROC_LOG(GPR_DEBUG, "Extra trailing metadata %p", s); - error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra trailing metadata"); - } else { - if (!other->closed) { - fill_in_metadata( - exec_ctx, s, - op->payload->send_trailing_metadata.send_trailing_metadata, 0, - dest, NULL, destfilled); - } - s->trailing_md_sent = true; - if (!s->t->is_client && s->trailing_md_recvd && - s->recv_trailing_md_op) { - INPROC_LOG(GPR_DEBUG, - "perform_stream_op %p scheduling trailing-md-on-complete", - s); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete, - GRPC_ERROR_NONE); - s->recv_trailing_md_op = NULL; - needs_close = true; - } - } - } - if (other != NULL && other->reads_needed) { - if (!other->read_closure_scheduled) { - GRPC_CLOSURE_SCHED(exec_ctx, &other->read_closure, error); - other->read_closure_scheduled = true; - } - other->reads_needed = false; + maybe_schedule_op_closure_locked(exec_ctx, other, error); } } + if (error == GRPC_ERROR_NONE && - (op->recv_initial_metadata || op->recv_message || + (op->send_message || op->send_trailing_metadata || + op->recv_initial_metadata || op->recv_message || op->recv_trailing_metadata)) { - // If there are any reads, mark it so that the read closure will react to - // them + // Mark ops that need to be processed by the closure + if (op->send_message) { + s->send_message_op = op; + } + if (op->send_trailing_metadata) { + s->send_trailing_md_op = op; + } if (op->recv_initial_metadata) { s->recv_initial_md_op = op; } @@ -1040,25 +966,28 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, } // We want to initiate the closure if: - // 1. There is initial metadata and something ready to take that - // 2. There is a message and something ready to take it - // 3. There is trailing metadata, even if nothing specifically wants - // that because that can shut down the message as well - if ((s->to_read_initial_md_filled && op->recv_initial_metadata) || - ((!slice_buffer_list_empty(&s->to_read_message) || - s->trailing_md_recvd) && - op->recv_message) || - (s->to_read_trailing_md_filled)) { - if (!s->read_closure_scheduled) { - GRPC_CLOSURE_SCHED(exec_ctx, &s->read_closure, GRPC_ERROR_NONE); - s->read_closure_scheduled = true; + // 1. We want to send a message and the other side wants to receive or end + // 2. We want to send trailing metadata and there isn't an unmatched send + // 3. We want initial metadata and the other side has sent it + // 4. We want to receive a message and there is a message ready + // 5. There is trailing metadata, even if nothing specifically wants + // that because that can shut down the receive message as well + if ((op->send_message && other && ((other->recv_message_op != NULL) || + (other->recv_trailing_md_op != NULL))) || + (op->send_trailing_metadata && !op->send_message) || + (op->recv_initial_metadata && s->to_read_initial_md_filled) || + (op->recv_message && (other && other->send_message_op != NULL)) || + (s->to_read_trailing_md_filled || s->trailing_md_recvd)) { + if (!s->op_closure_scheduled) { + GRPC_CLOSURE_SCHED(exec_ctx, &s->op_closure, GRPC_ERROR_NONE); + s->op_closure_scheduled = true; } } else { - s->reads_needed = true; + s->ops_needed = true; } } else { if (error != GRPC_ERROR_NONE) { - // Schedule op's read closures that we didn't push to read state machine + // Schedule op's closures that we didn't push to op state machine if (op->recv_initial_metadata) { INPROC_LOG( GPR_DEBUG, |