diff options
Diffstat (limited to 'src/core/transport')
-rw-r--r-- | src/core/transport/chttp2/stream_encoder.c | 5 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 504 | ||||
-rw-r--r-- | src/core/transport/stream_op.c | 38 | ||||
-rw-r--r-- | src/core/transport/stream_op.h | 46 | ||||
-rw-r--r-- | src/core/transport/transport.c | 42 | ||||
-rw-r--r-- | src/core/transport/transport.h | 107 | ||||
-rw-r--r-- | src/core/transport/transport_impl.h | 16 | ||||
-rw-r--r-- | src/core/transport/transport_op_string.c | 164 |
8 files changed, 569 insertions, 353 deletions
diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c index 5ca31d6bc7..cf1e66bf8b 100644 --- a/src/core/transport/chttp2/stream_encoder.c +++ b/src/core/transport/chttp2/stream_encoder.c @@ -481,7 +481,6 @@ gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count, break; case GRPC_OP_METADATA: grpc_metadata_batch_assert_ok(&op->data.metadata); - case GRPC_OP_FLOW_CTL_CB: /* these just get copied as they don't impact the number of flow controlled bytes */ grpc_sopb_append(outops, op, 1); @@ -567,10 +566,6 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof, GPR_ERROR, "These stream ops should be filtered out by grpc_chttp2_preencode"); abort(); - case GRPC_OP_FLOW_CTL_CB: - op->data.flow_ctl_cb.cb(op->data.flow_ctl_cb.arg, GRPC_OP_OK); - curop++; - break; case GRPC_OP_METADATA: /* Encode a metadata batch; store the returned values, representing a metadata element that needs to be unreffed back into the metadata diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index e32ee284e0..3ae693176e 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -91,10 +91,9 @@ typedef enum { /* streams that are waiting to start because there are too many concurrent streams on the connection */ WAITING_FOR_CONCURRENCY, - /* streams that want to callback the application */ - PENDING_CALLBACKS, - /* streams that *ARE* calling back to the application */ - EXECUTING_CALLBACKS, + /* streams that have finished reading: we wait until unlock to coalesce + all changes into one callback */ + FINISHED_READ_OP, STREAM_LIST_COUNT /* must be last */ } stream_list_id; @@ -141,6 +140,12 @@ typedef enum { DTS_FRAME } deframe_transport_state; +typedef enum { + WRITE_STATE_OPEN, + WRITE_STATE_QUEUED_CLOSE, + WRITE_STATE_SENT_CLOSE +} WRITE_STATE; + typedef struct { stream *head; stream *tail; @@ -182,6 +187,18 @@ typedef struct { gpr_slice debug; } pending_goaway; +typedef struct { + void (*cb)(void *user_data, int success); + void *user_data; + int success; +} op_closure; + +typedef struct { + op_closure *callbacks; + size_t count; + size_t capacity; +} op_closure_array; + struct transport { grpc_transport base; /* must be first */ const grpc_transport_callbacks *cb; @@ -202,6 +219,10 @@ struct transport { gpr_uint8 closed; error_state error_state; + /* queued callbacks */ + op_closure_array pending_callbacks; + op_closure_array executing_callbacks; + /* stream indexing */ gpr_uint32 next_stream_id; gpr_uint32 last_incoming_stream_id; @@ -281,13 +302,13 @@ struct stream { /* when the application requests writes be closed, the write_closed is 'queued'; when the close is flow controlled into the send path, we are 'sending' it; when the write has been performed it is 'sent' */ - gpr_uint8 queued_write_closed; - gpr_uint8 sending_write_closed; - gpr_uint8 sent_write_closed; + WRITE_STATE write_state; + gpr_uint8 send_closed; gpr_uint8 read_closed; gpr_uint8 cancelled; - gpr_uint8 allow_window_updates; - gpr_uint8 published_close; + + op_closure send_done_closure; + op_closure recv_done_closure; stream_link links[STREAM_LIST_COUNT]; gpr_uint8 included[STREAM_LIST_COUNT]; @@ -296,10 +317,14 @@ struct stream { grpc_linked_mdelem *incoming_metadata; size_t incoming_metadata_count; size_t incoming_metadata_capacity; + grpc_linked_mdelem *old_incoming_metadata; gpr_timespec incoming_deadline; /* sops from application */ - grpc_stream_op_buffer outgoing_sopb; + grpc_stream_op_buffer *outgoing_sopb; + grpc_stream_op_buffer *incoming_sopb; + grpc_stream_state *publish_state; + grpc_stream_state published_state; /* sops that have passed flow control to be written */ grpc_stream_op_buffer writing_sopb; @@ -337,7 +362,8 @@ static void cancel_stream_id(transport *t, gpr_uint32 id, grpc_chttp2_error_code error_code, int send_rst); static void cancel_stream(transport *t, stream *s, grpc_status_code local_status, - grpc_chttp2_error_code error_code, int send_rst); + grpc_chttp2_error_code error_code, + grpc_mdstr *optional_message, int send_rst); static void finalize_cancellations(transport *t); static stream *lookup_stream(transport *t, gpr_uint32 id); static void remove_from_stream_map(transport *t, stream *s); @@ -348,6 +374,14 @@ static void become_skip_parser(transport *t); static void recv_data(void *tp, gpr_slice *slices, size_t nslices, grpc_endpoint_cb_status error); +static void schedule_cb(transport *t, op_closure closure, int success); +static void maybe_finish_read(transport *t, stream *s); +static void maybe_join_window_updates(transport *t, stream *s); +static void finish_reads(transport *t); +static void add_to_pollset_locked(transport *t, grpc_pollset *pollset); +static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op); +static void add_metadata_batch(transport *t, stream *s); + /* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ @@ -387,6 +421,9 @@ static void destruct_transport(transport *t) { } gpr_free(t->pings); + gpr_free(t->pending_callbacks.callbacks); + gpr_free(t->executing_callbacks.callbacks); + for (i = 0; i < t->num_pending_goaways; i++) { gpr_slice_unref(t->pending_goaways[i].debug); } @@ -416,6 +453,8 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, GPR_ASSERT(strlen(CLIENT_CONNECT_STRING) == CLIENT_CONNECT_STRLEN); + memset(t, 0, sizeof(*t)); + t->base.vtable = &vtable; t->ep = ep; /* one ref is for destroy, the other for when ep becomes NULL */ @@ -427,27 +466,16 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, t->str_grpc_timeout = grpc_mdstr_from_string(t->metadata_context, "grpc-timeout"); t->reading = 1; - t->writing = 0; t->error_state = ERROR_STATE_NONE; t->next_stream_id = is_client ? 1 : 2; - t->last_incoming_stream_id = 0; - t->destroying = 0; - t->closed = 0; t->is_client = is_client; t->outgoing_window = DEFAULT_WINDOW; t->incoming_window = DEFAULT_WINDOW; t->connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET; t->deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0; - t->expect_continuation_stream_id = 0; - t->pings = NULL; - t->ping_count = 0; - t->ping_capacity = 0; t->ping_counter = gpr_now().tv_nsec; grpc_chttp2_hpack_compressor_init(&t->hpack_compressor, mdctx); grpc_chttp2_goaway_parser_init(&t->goaway_parser); - t->pending_goaways = NULL; - t->num_pending_goaways = 0; - t->cap_pending_goaways = 0; gpr_slice_buffer_init(&t->outbuf); gpr_slice_buffer_init(&t->qbuf); grpc_sopb_init(&t->nuke_later_sopb); @@ -462,7 +490,6 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, needed. TODO(ctiller): tune this */ grpc_chttp2_stream_map_init(&t->stream_map, 8); - memset(&t->lists, 0, sizeof(t->lists)); /* copy in initial settings to all setting sets */ for (i = 0; i < NUM_SETTING_SETS; i++) { @@ -503,7 +530,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, gpr_mu_lock(&t->mu); t->calling_back = 1; - ref_transport(t); + ref_transport(t); /* matches unref at end of this function */ gpr_mu_unlock(&t->mu); sr = setup(arg, &t->base, t->metadata_context); @@ -515,7 +542,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, if (t->destroying) gpr_cv_signal(&t->cv); unlock(t); - ref_transport(t); + ref_transport(t); /* matches unref inside recv_data */ recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK); unref_transport(t); @@ -573,16 +600,19 @@ static void goaway(grpc_transport *gt, grpc_status_code status, } static int init_stream(grpc_transport *gt, grpc_stream *gs, - const void *server_data) { + const void *server_data, grpc_transport_op *initial_op) { transport *t = (transport *)gt; stream *s = (stream *)gs; + memset(s, 0, sizeof(*s)); + ref_transport(t); if (!server_data) { lock(t); s->id = 0; } else { + /* already locked */ s->id = (gpr_uint32)(gpr_uintptr)server_data; t->incoming_stream = s; grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); @@ -592,24 +622,13 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; s->incoming_window = t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - s->queued_write_closed = 0; - s->sending_write_closed = 0; - s->sent_write_closed = 0; - s->read_closed = 0; - s->cancelled = 0; - s->allow_window_updates = 0; - s->published_close = 0; - s->incoming_metadata_count = 0; - s->incoming_metadata_capacity = 0; - s->incoming_metadata = NULL; s->incoming_deadline = gpr_inf_future; - memset(&s->links, 0, sizeof(s->links)); - memset(&s->included, 0, sizeof(s->included)); - grpc_sopb_init(&s->outgoing_sopb); grpc_sopb_init(&s->writing_sopb); grpc_sopb_init(&s->callback_sopb); grpc_chttp2_data_parser_init(&s->parser); + if (initial_op) perform_op_locked(t, s, initial_op); + if (!server_data) { unlock(t); } @@ -642,10 +661,16 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { gpr_mu_unlock(&t->mu); - grpc_sopb_destroy(&s->outgoing_sopb); + GPR_ASSERT(s->outgoing_sopb == NULL); + GPR_ASSERT(s->incoming_sopb == NULL); grpc_sopb_destroy(&s->writing_sopb); grpc_sopb_destroy(&s->callback_sopb); grpc_chttp2_data_parser_destroy(&s->parser); + for (i = 0; i < s->incoming_metadata_count; i++) { + grpc_mdelem_unref(s->incoming_metadata[i].md); + } + gpr_free(s->incoming_metadata); + gpr_free(s->old_incoming_metadata); unref_transport(t); } @@ -708,8 +733,6 @@ static void stream_list_add_tail(transport *t, stream *s, stream_list_id id) { } static void stream_list_join(transport *t, stream *s, stream_list_id id) { - if (id == PENDING_CALLBACKS) - GPR_ASSERT(t->cb != NULL || t->error_state == ERROR_STATE_NONE); if (s->included[id]) { return; } @@ -718,6 +741,8 @@ static void stream_list_join(transport *t, stream *s, stream_list_id id) { static void remove_from_stream_map(transport *t, stream *s) { if (s->id == 0) return; + IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Removing stream %d", + t->is_client ? "CLI" : "SVR", s->id)); if (grpc_chttp2_stream_map_delete(&t->stream_map, s->id)) { maybe_start_some_streams(t); } @@ -762,6 +787,8 @@ static void unlock(transport *t) { finalize_cancellations(t); } + finish_reads(t); + /* gather any callbacks that need to be made */ if (!t->calling_back && cb) { perform_callbacks = prepare_callbacks(t); @@ -865,21 +892,24 @@ static int prepare_write(transport *t) { while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE)) && s->outgoing_window > 0) { window_delta = grpc_chttp2_preencode( - s->outgoing_sopb.ops, &s->outgoing_sopb.nops, + s->outgoing_sopb->ops, &s->outgoing_sopb->nops, GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing_sopb); t->outgoing_window -= window_delta; s->outgoing_window -= window_delta; - s->sending_write_closed = - s->queued_write_closed && s->outgoing_sopb.nops == 0; - if (s->writing_sopb.nops > 0 || s->sending_write_closed) { + if (s->write_state == WRITE_STATE_QUEUED_CLOSE && + s->outgoing_sopb->nops == 0) { + s->send_closed = 1; + } + if (s->writing_sopb.nops > 0 || s->send_closed) { stream_list_join(t, s, WRITING); } - /* if there are still writes to do and the stream still has window - available, then schedule a further write */ - if (s->outgoing_sopb.nops > 0 && s->outgoing_window > 0) { - GPR_ASSERT(!t->outgoing_window); + /* we should either exhaust window or have no ops left, but not both */ + if (s->outgoing_sopb->nops == 0) { + s->outgoing_sopb = NULL; + schedule_cb(t, s->send_done_closure, 1); + } else if (s->outgoing_window) { stream_list_add_tail(t, s, WRITABLE); } } @@ -912,10 +942,9 @@ static void finalize_outbuf(transport *t) { while ((s = stream_list_remove_head(t, WRITING))) { grpc_chttp2_encode(s->writing_sopb.ops, s->writing_sopb.nops, - s->sending_write_closed, s->id, &t->hpack_compressor, - &t->outbuf); + s->send_closed, s->id, &t->hpack_compressor, &t->outbuf); s->writing_sopb.nops = 0; - if (s->sending_write_closed) { + if (s->send_closed) { stream_list_join(t, s, WRITTEN_CLOSED); } } @@ -929,8 +958,10 @@ static void finish_write_common(transport *t, int success) { drop_connection(t); } while ((s = stream_list_remove_head(t, WRITTEN_CLOSED))) { - s->sent_write_closed = 1; - if (!s->cancelled) stream_list_join(t, s, PENDING_CALLBACKS); + s->write_state = WRITE_STATE_SENT_CLOSE; + if (1||!s->cancelled) { + maybe_finish_read(t, s); + } } t->outbuf.count = 0; t->outbuf.length = 0; @@ -980,6 +1011,9 @@ static void maybe_start_some_streams(transport *t) { stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY); if (!s) break; + IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Allocating new stream %p to id %d", + t->is_client ? "CLI" : "SVR", s, t->next_stream_id)); + GPR_ASSERT(s->id == 0); s->id = t->next_stream_id; t->next_stream_id += 2; @@ -988,43 +1022,63 @@ static void maybe_start_some_streams(transport *t) { } } -static void send_batch(grpc_transport *gt, grpc_stream *gs, grpc_stream_op *ops, - size_t ops_count, int is_last) { - transport *t = (transport *)gt; - stream *s = (stream *)gs; - - lock(t); - - if (is_last) { - s->queued_write_closed = 1; - } - if (!s->cancelled) { - grpc_sopb_append(&s->outgoing_sopb, ops, ops_count); - if (s->id == 0) { - stream_list_join(t, s, WAITING_FOR_CONCURRENCY); - maybe_start_some_streams(t); +static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) { + if (op->cancel_with_status != GRPC_STATUS_OK) { + cancel_stream( + t, s, op->cancel_with_status, + grpc_chttp2_grpc_status_to_http2_error(op->cancel_with_status), + op->cancel_message, 1); + } + + if (op->send_ops) { + GPR_ASSERT(s->outgoing_sopb == NULL); + s->send_done_closure.cb = op->on_done_send; + s->send_done_closure.user_data = op->send_user_data; + if (!s->cancelled) { + s->outgoing_sopb = op->send_ops; + if (op->is_last_send && s->write_state == WRITE_STATE_OPEN) { + s->write_state = WRITE_STATE_QUEUED_CLOSE; + } + if (s->id == 0) { + IF_TRACING(gpr_log(GPR_DEBUG, + "HTTP:%s: New stream %p waiting for concurrency", + t->is_client ? "CLI" : "SVR", s)); + stream_list_join(t, s, WAITING_FOR_CONCURRENCY); + maybe_start_some_streams(t); + } else if (s->outgoing_window > 0) { + stream_list_join(t, s, WRITABLE); + } } else { - stream_list_join(t, s, WRITABLE); + schedule_nuke_sopb(t, op->send_ops); + schedule_cb(t, s->send_done_closure, 0); } - } else { - grpc_sopb_append(&t->nuke_later_sopb, ops, ops_count); } - if (is_last && s->outgoing_sopb.nops == 0 && s->read_closed && - !s->published_close) { - stream_list_join(t, s, PENDING_CALLBACKS); + + if (op->recv_ops) { + GPR_ASSERT(s->incoming_sopb == NULL); + s->recv_done_closure.cb = op->on_done_recv; + s->recv_done_closure.user_data = op->recv_user_data; + s->incoming_sopb = op->recv_ops; + s->incoming_sopb->nops = 0; + s->publish_state = op->recv_state; + gpr_free(s->old_incoming_metadata); + s->old_incoming_metadata = NULL; + maybe_finish_read(t, s); + maybe_join_window_updates(t, s); } - unlock(t); + if (op->bind_pollset) { + add_to_pollset_locked(t, op->bind_pollset); + } } -static void abort_stream(grpc_transport *gt, grpc_stream *gs, - grpc_status_code status) { +static void perform_op(grpc_transport *gt, grpc_stream *gs, + grpc_transport_op *op) { transport *t = (transport *)gt; stream *s = (stream *)gs; lock(t); - cancel_stream(t, s, status, grpc_chttp2_grpc_status_to_http2_error(status), - 1); + perform_op_locked(t, s, op); unlock(t); } @@ -1063,8 +1117,8 @@ static void finalize_cancellations(transport *t) { while ((s = stream_list_remove_head(t, CANCELLED))) { s->read_closed = 1; - s->sent_write_closed = 1; - stream_list_join(t, s, PENDING_CALLBACKS); + s->write_state = WRITE_STATE_SENT_CLOSE; + maybe_finish_read(t, s); } } @@ -1082,18 +1136,24 @@ static void add_incoming_metadata(transport *t, stream *s, grpc_mdelem *elem) { static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, grpc_status_code local_status, grpc_chttp2_error_code error_code, - int send_rst) { + grpc_mdstr *optional_message, int send_rst) { int had_outgoing; char buffer[GPR_LTOA_MIN_BUFSIZE]; if (s) { /* clear out any unreported input & output: nobody cares anymore */ - had_outgoing = s->outgoing_sopb.nops != 0; + had_outgoing = s->outgoing_sopb && s->outgoing_sopb->nops != 0; schedule_nuke_sopb(t, &s->parser.incoming_sopb); - schedule_nuke_sopb(t, &s->outgoing_sopb); + if (s->outgoing_sopb) { + schedule_nuke_sopb(t, s->outgoing_sopb); + s->outgoing_sopb = NULL; + stream_list_remove(t, s, WRITABLE); + schedule_cb(t, s->send_done_closure, 0); + } if (s->cancelled) { send_rst = 0; - } else if (!s->read_closed || !s->sent_write_closed || had_outgoing) { + } else if (!s->read_closed || s->write_state != WRITE_STATE_SENT_CLOSE || + had_outgoing) { s->cancelled = 1; stream_list_join(t, s, CANCELLED); @@ -1101,17 +1161,26 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, add_incoming_metadata( t, s, grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer)); - switch (local_status) { - case GRPC_STATUS_CANCELLED: - add_incoming_metadata( - t, s, grpc_mdelem_from_strings(t->metadata_context, - "grpc-message", "Cancelled")); - break; - default: - break; + if (!optional_message) { + switch (local_status) { + case GRPC_STATUS_CANCELLED: + add_incoming_metadata( + t, s, grpc_mdelem_from_strings(t->metadata_context, + "grpc-message", "Cancelled")); + break; + default: + break; + } + } else { + add_incoming_metadata( + t, s, + grpc_mdelem_from_metadata_strings( + t->metadata_context, + grpc_mdstr_from_string(t->metadata_context, "grpc-message"), + grpc_mdstr_ref(optional_message))); } - - stream_list_join(t, s, PENDING_CALLBACKS); + add_metadata_batch(t, s); + maybe_finish_read(t, s); } } if (!id) send_rst = 0; @@ -1119,24 +1188,29 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_rst_stream_create(id, error_code)); } + if (optional_message) { + grpc_mdstr_unref(optional_message); + } } static void cancel_stream_id(transport *t, gpr_uint32 id, grpc_status_code local_status, grpc_chttp2_error_code error_code, int send_rst) { cancel_stream_inner(t, lookup_stream(t, id), id, local_status, error_code, - send_rst); + NULL, send_rst); } static void cancel_stream(transport *t, stream *s, grpc_status_code local_status, - grpc_chttp2_error_code error_code, int send_rst) { - cancel_stream_inner(t, s, s->id, local_status, error_code, send_rst); + grpc_chttp2_error_code error_code, + grpc_mdstr *optional_message, int send_rst) { + cancel_stream_inner(t, s, s->id, local_status, error_code, optional_message, + send_rst); } static void cancel_stream_cb(void *user_data, gpr_uint32 id, void *stream) { cancel_stream(user_data, stream, GRPC_STATUS_UNAVAILABLE, - GRPC_CHTTP2_INTERNAL_ERROR, 0); + GRPC_CHTTP2_INTERNAL_ERROR, NULL, 0); } static void end_all_the_calls(transport *t) { @@ -1150,8 +1224,14 @@ static void drop_connection(transport *t) { end_all_the_calls(t); } +static void maybe_finish_read(transport *t, stream *s) { + if (s->incoming_sopb) { + stream_list_join(t, s, FINISHED_READ_OP); + } +} + static void maybe_join_window_updates(transport *t, stream *s) { - if (s->allow_window_updates && + if (s->incoming_sopb != NULL && s->incoming_window < t->settings[LOCAL_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] * @@ -1160,21 +1240,6 @@ static void maybe_join_window_updates(transport *t, stream *s) { } } -static void set_allow_window_updates(grpc_transport *tp, grpc_stream *sp, - int allow) { - transport *t = (transport *)tp; - stream *s = (stream *)sp; - - lock(t); - s->allow_window_updates = allow; - if (allow) { - maybe_join_window_updates(t, s); - } else { - stream_list_remove(t, s, WINDOW_UPDATE); - } - unlock(t); -} - static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) { if (t->incoming_frame_size > t->incoming_window) { gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d", @@ -1248,7 +1313,7 @@ static int init_data_frame_parser(transport *t) { case GRPC_CHTTP2_STREAM_ERROR: cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status( GRPC_CHTTP2_INTERNAL_ERROR), - GRPC_CHTTP2_INTERNAL_ERROR, 1); + GRPC_CHTTP2_INTERNAL_ERROR, NULL, 1); return init_skip_frame(t, 0); case GRPC_CHTTP2_CONNECTION_ERROR: drop_connection(t); @@ -1267,11 +1332,10 @@ static void on_header(void *tp, grpc_mdelem *md) { GPR_ASSERT(s); - IF_TRACING(gpr_log(GPR_INFO, "HTTP:%d:HDR: %s: %s", s->id, - grpc_mdstr_as_c_string(md->key), - grpc_mdstr_as_c_string(md->value))); + IF_TRACING(gpr_log( + GPR_INFO, "HTTP:%d:%s:HDR: %s: %s", s->id, t->is_client ? "CLI" : "SVR", + grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value))); - stream_list_join(t, s, PENDING_CALLBACKS); if (md->key == t->str_grpc_timeout) { gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout); if (!cached_timeout) { @@ -1290,6 +1354,7 @@ static void on_header(void *tp, grpc_mdelem *md) { } else { add_incoming_metadata(t, s, md); } + maybe_finish_read(t, s); } static int init_header_frame_parser(transport *t, int is_continuation) { @@ -1327,7 +1392,10 @@ static int init_header_frame_parser(transport *t, int is_continuation) { gpr_log(GPR_ERROR, "ignoring out of order new stream request on server; last stream " "id=%d, new stream id=%d", - t->last_incoming_stream_id, t->incoming_stream); + t->last_incoming_stream_id, t->incoming_stream_id); + return init_skip_frame(t, 1); + } else if ((t->incoming_stream_id & 1) == 0) { + gpr_log(GPR_ERROR, "ignoring stream with non-client generated index %d", t->incoming_stream_id); return init_skip_frame(t, 1); } t->incoming_stream = NULL; @@ -1464,33 +1532,20 @@ static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) { return window + window_update < MAX_WINDOW; } -static void free_md(void *p, grpc_op_error result) { gpr_free(p); } - static void add_metadata_batch(transport *t, stream *s) { grpc_metadata_batch b; - size_t i; - b.list.head = &s->incoming_metadata[0]; - b.list.tail = &s->incoming_metadata[s->incoming_metadata_count - 1]; + b.list.head = NULL; + /* Store away the last element of the list, so that in patch_metadata_ops + we can reconstitute the list. + We can't do list building here as later incoming metadata may reallocate + the underlying array. */ + b.list.tail = (void*)(gpr_intptr)s->incoming_metadata_count; b.garbage.head = b.garbage.tail = NULL; b.deadline = s->incoming_deadline; - - for (i = 1; i < s->incoming_metadata_count; i++) { - s->incoming_metadata[i].prev = &s->incoming_metadata[i - 1]; - s->incoming_metadata[i - 1].next = &s->incoming_metadata[i]; - } - s->incoming_metadata[0].prev = NULL; - s->incoming_metadata[s->incoming_metadata_count - 1].next = NULL; + s->incoming_deadline = gpr_inf_future; grpc_sopb_add_metadata(&s->parser.incoming_sopb, b); - grpc_sopb_add_flow_ctl_cb(&s->parser.incoming_sopb, free_md, - s->incoming_metadata); - - /* reset */ - s->incoming_deadline = gpr_inf_future; - s->incoming_metadata = NULL; - s->incoming_metadata_count = 0; - s->incoming_metadata_capacity = 0; } static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { @@ -1501,14 +1556,14 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { case GRPC_CHTTP2_PARSE_OK: if (st.end_of_stream) { t->incoming_stream->read_closed = 1; - stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS); + maybe_finish_read(t, t->incoming_stream); } if (st.need_flush_reads) { - stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS); + maybe_finish_read(t, t->incoming_stream); } if (st.metadata_boundary) { add_metadata_batch(t, t->incoming_stream); - stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS); + maybe_finish_read(t, t->incoming_stream); } if (st.ack_settings) { gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create()); @@ -1545,11 +1600,11 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { } if (st.initial_window_update) { for (i = 0; i < t->stream_map.count; i++) { - stream *s = (stream*)(t->stream_map.values[i]); + stream *s = (stream *)(t->stream_map.values[i]); int was_window_empty = s->outgoing_window <= 0; s->outgoing_window += st.initial_window_update; - if (was_window_empty && s->outgoing_window > 0 && - s->outgoing_sopb.nops > 0) { + if (was_window_empty && s->outgoing_window > 0 && s->outgoing_sopb && + s->outgoing_sopb->nops > 0) { stream_list_join(t, s, WRITABLE); } } @@ -1563,12 +1618,13 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { if (!is_window_update_legal(st.window_update, s->outgoing_window)) { cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status( GRPC_CHTTP2_FLOW_CONTROL_ERROR), - GRPC_CHTTP2_FLOW_CONTROL_ERROR, 1); + GRPC_CHTTP2_FLOW_CONTROL_ERROR, NULL, 1); } else { s->outgoing_window += st.window_update; /* if this window update makes outgoing ops writable again, flag that */ - if (was_window_empty && s->outgoing_sopb.nops) { + if (was_window_empty && s->outgoing_sopb && + s->outgoing_sopb->nops > 0) { stream_list_join(t, s, WRITABLE); } } @@ -1830,53 +1886,135 @@ static grpc_stream_state compute_state(gpr_uint8 write_closed, return GRPC_STREAM_OPEN; } -static int prepare_callbacks(transport *t) { - stream *s; - int n = 0; - while ((s = stream_list_remove_head(t, PENDING_CALLBACKS))) { - int execute = 1; - - s->callback_state = compute_state(s->sent_write_closed, s->read_closed); - if (s->callback_state == GRPC_STREAM_CLOSED) { - remove_from_stream_map(t, s); - if (s->published_close) { - execute = 0; - } else if (s->incoming_metadata_count) { - add_metadata_batch(t, s); - } - s->published_close = 1; +static void patch_metadata_ops(stream *s) { + grpc_stream_op *ops = s->incoming_sopb->ops; + size_t nops = s->incoming_sopb->nops; + size_t i; + size_t j; + size_t mdidx = 0; + size_t last_mdidx; + int found_metadata = 0; + + /* rework the array of metadata into a linked list, making use + of the breadcrumbs we left in metadata batches during + add_metadata_batch */ + for (i = 0; i < nops; i++) { + grpc_stream_op *op = &ops[i]; + if (op->type != GRPC_OP_METADATA) continue; + found_metadata = 1; + /* we left a breadcrumb indicating where the end of this list is, + and since we add sequentially, we know from the end of the last + segment where this segment begins */ + last_mdidx = (size_t)(gpr_intptr)(op->data.metadata.list.tail); + GPR_ASSERT(last_mdidx > mdidx); + GPR_ASSERT(last_mdidx <= s->incoming_metadata_count); + /* turn the array into a doubly linked list */ + op->data.metadata.list.head = &s->incoming_metadata[mdidx]; + op->data.metadata.list.tail = &s->incoming_metadata[last_mdidx - 1]; + for (j = mdidx + 1; j < last_mdidx; j++) { + s->incoming_metadata[j].prev = &s->incoming_metadata[j-1]; + s->incoming_metadata[j-1].next = &s->incoming_metadata[j]; + } + s->incoming_metadata[mdidx].prev = NULL; + s->incoming_metadata[last_mdidx-1].next = NULL; + /* track where we're up to */ + mdidx = last_mdidx; + } + if (found_metadata) { + s->old_incoming_metadata = s->incoming_metadata; + if (mdidx != s->incoming_metadata_count) { + /* we have a partially read metadata batch still in incoming_metadata */ + size_t new_count = s->incoming_metadata_count - mdidx; + size_t copy_bytes = sizeof(*s->incoming_metadata) * new_count; + GPR_ASSERT(mdidx < s->incoming_metadata_count); + s->incoming_metadata = gpr_malloc(copy_bytes); + memcpy(s->old_incoming_metadata + mdidx, s->incoming_metadata, copy_bytes); + s->incoming_metadata_count = s->incoming_metadata_capacity = new_count; + } else { + s->incoming_metadata = NULL; + s->incoming_metadata_count = 0; + s->incoming_metadata_capacity = 0; } + } +} - grpc_sopb_swap(&s->parser.incoming_sopb, &s->callback_sopb); +static void finish_reads(transport *t) { + stream *s; - if (execute) { - stream_list_add_tail(t, s, EXECUTING_CALLBACKS); - n = 1; + while ((s = stream_list_remove_head(t, FINISHED_READ_OP)) != NULL) { + int publish = 0; + GPR_ASSERT(s->incoming_sopb); + *s->publish_state = + compute_state(s->write_state == WRITE_STATE_SENT_CLOSE, s->read_closed); + if (*s->publish_state != s->published_state) { + s->published_state = *s->publish_state; + publish = 1; + if (s->published_state == GRPC_STREAM_CLOSED) { + remove_from_stream_map(t, s); + } + } + if (s->parser.incoming_sopb.nops > 0) { + grpc_sopb_swap(s->incoming_sopb, &s->parser.incoming_sopb); + publish = 1; + } + if (publish) { + if (s->incoming_metadata_count > 0) { + patch_metadata_ops(s); + } + s->incoming_sopb = NULL; + schedule_cb(t, s->recv_done_closure, 1); } } - return n; + +} + +static void schedule_cb(transport *t, op_closure closure, int success) { + if (t->pending_callbacks.capacity == t->pending_callbacks.count) { + t->pending_callbacks.capacity = + GPR_MAX(t->pending_callbacks.capacity * 2, 8); + t->pending_callbacks.callbacks = + gpr_realloc(t->pending_callbacks.callbacks, + t->pending_callbacks.capacity * + sizeof(*t->pending_callbacks.callbacks)); + } + closure.success = success; + t->pending_callbacks.callbacks[t->pending_callbacks.count++] = closure; +} + +static int prepare_callbacks(transport *t) { + op_closure_array temp = t->pending_callbacks; + t->pending_callbacks = t->executing_callbacks; + t->executing_callbacks = temp; + return t->executing_callbacks.count > 0; } static void run_callbacks(transport *t, const grpc_transport_callbacks *cb) { - stream *s; - while ((s = stream_list_remove_head(t, EXECUTING_CALLBACKS))) { - size_t nops = s->callback_sopb.nops; - s->callback_sopb.nops = 0; - cb->recv_batch(t->cb_user_data, &t->base, (grpc_stream *)s, - s->callback_sopb.ops, nops, s->callback_state); + size_t i; + for (i = 0; i < t->executing_callbacks.count; i++) { + op_closure c = t->executing_callbacks.callbacks[i]; + c.cb(c.user_data, c.success); } + t->executing_callbacks.count = 0; } static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb) { cb->closed(t->cb_user_data, &t->base); } -static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) { - transport *t = (transport *)gt; - lock(t); +/* + * POLLSET STUFF + */ + +static void add_to_pollset_locked(transport *t, grpc_pollset *pollset) { if (t->ep) { grpc_endpoint_add_to_pollset(t->ep, pollset); } +} + +static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) { + transport *t = (transport *)gt; + lock(t); + add_to_pollset_locked(t, pollset); unlock(t); } @@ -1885,9 +2023,9 @@ static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) { */ static const grpc_transport_vtable vtable = { - sizeof(stream), init_stream, send_batch, set_allow_window_updates, - add_to_pollset, destroy_stream, abort_stream, goaway, close_transport, - send_ping, destroy_transport}; + sizeof(stream), init_stream, perform_op, + add_to_pollset, destroy_stream, goaway, + close_transport, send_ping, destroy_transport}; void grpc_create_chttp2_transport(grpc_transport_setup_callback setup, void *arg, diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c index 882c078d51..e1a75adcb6 100644 --- a/src/core/transport/stream_op.c +++ b/src/core/transport/stream_op.c @@ -81,9 +81,6 @@ void grpc_stream_ops_unref_owned_objects(grpc_stream_op *ops, size_t nops) { case GRPC_OP_METADATA: grpc_metadata_batch_destroy(&ops[i].data.metadata); break; - case GRPC_OP_FLOW_CTL_CB: - ops[i].data.flow_ctl_cb.cb(ops[i].data.flow_ctl_cb.arg, GRPC_OP_ERROR); - break; case GRPC_NO_OP: case GRPC_OP_BEGIN_MESSAGE: break; @@ -91,34 +88,20 @@ void grpc_stream_ops_unref_owned_objects(grpc_stream_op *ops, size_t nops) { } } -static void assert_contained_metadata_ok(grpc_stream_op *ops, size_t nops) { -#ifndef NDEBUG - size_t i; - for (i = 0; i < nops; i++) { - if (ops[i].type == GRPC_OP_METADATA) { - grpc_metadata_batch_assert_ok(&ops[i].data.metadata); - } - } -#endif /* NDEBUG */ -} - static void expandto(grpc_stream_op_buffer *sopb, size_t new_capacity) { sopb->capacity = new_capacity; - assert_contained_metadata_ok(sopb->ops, sopb->nops); if (sopb->ops == sopb->inlined_ops) { sopb->ops = gpr_malloc(sizeof(grpc_stream_op) * new_capacity); memcpy(sopb->ops, sopb->inlined_ops, sopb->nops * sizeof(grpc_stream_op)); } else { sopb->ops = gpr_realloc(sopb->ops, sizeof(grpc_stream_op) * new_capacity); } - assert_contained_metadata_ok(sopb->ops, sopb->nops); } static grpc_stream_op *add(grpc_stream_op_buffer *sopb) { grpc_stream_op *out; - assert_contained_metadata_ok(sopb->ops, sopb->nops); - + GPR_ASSERT(sopb->nops <= sopb->capacity); if (sopb->nops == sopb->capacity) { expandto(sopb, GROW(sopb->capacity)); } @@ -129,7 +112,6 @@ static grpc_stream_op *add(grpc_stream_op_buffer *sopb) { void grpc_sopb_add_no_op(grpc_stream_op_buffer *sopb) { add(sopb)->type = GRPC_NO_OP; - assert_contained_metadata_ok(sopb->ops, sopb->nops); } void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length, @@ -138,34 +120,19 @@ void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length, op->type = GRPC_OP_BEGIN_MESSAGE; op->data.begin_message.length = length; op->data.begin_message.flags = flags; - assert_contained_metadata_ok(sopb->ops, sopb->nops); } void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_metadata_batch b) { grpc_stream_op *op = add(sopb); - grpc_metadata_batch_assert_ok(&b); op->type = GRPC_OP_METADATA; op->data.metadata = b; - grpc_metadata_batch_assert_ok(&op->data.metadata); - assert_contained_metadata_ok(sopb->ops, sopb->nops); } void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice) { grpc_stream_op *op = add(sopb); op->type = GRPC_OP_SLICE; op->data.slice = slice; - assert_contained_metadata_ok(sopb->ops, sopb->nops); -} - -void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb, - void (*cb)(void *arg, grpc_op_error error), - void *arg) { - grpc_stream_op *op = add(sopb); - op->type = GRPC_OP_FLOW_CTL_CB; - op->data.flow_ctl_cb.cb = cb; - op->data.flow_ctl_cb.arg = arg; - assert_contained_metadata_ok(sopb->ops, sopb->nops); } void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops, @@ -173,15 +140,12 @@ void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops, size_t orig_nops = sopb->nops; size_t new_nops = orig_nops + nops; - assert_contained_metadata_ok(ops, nops); - assert_contained_metadata_ok(sopb->ops, sopb->nops); if (new_nops > sopb->capacity) { expandto(sopb, GPR_MAX(GROW(sopb->capacity), new_nops)); } memcpy(sopb->ops + orig_nops, ops, sizeof(grpc_stream_op) * nops); sopb->nops = new_nops; - assert_contained_metadata_ok(sopb->ops, sopb->nops); } static void assert_valid_list(grpc_mdelem_list *list) { diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h index 20146b9af2..95497a3cc8 100644 --- a/src/core/transport/stream_op.h +++ b/src/core/transport/stream_op.h @@ -55,9 +55,7 @@ typedef enum grpc_stream_op_code { GRPC_OP_BEGIN_MESSAGE, /* Add a slice of data to the current message/metadata element/status. Must not overflow the forward declared length. */ - GRPC_OP_SLICE, - /* Call some function once this operation has passed flow control. */ - GRPC_OP_FLOW_CTL_CB + GRPC_OP_SLICE } grpc_stream_op_code; /* Arguments for GRPC_OP_BEGIN */ @@ -68,12 +66,6 @@ typedef struct grpc_begin_message { gpr_uint32 flags; } grpc_begin_message; -/* Arguments for GRPC_OP_FLOW_CTL_CB */ -typedef struct grpc_flow_ctl_cb { - void (*cb)(void *arg, grpc_op_error error); - void *arg; -} grpc_flow_ctl_cb; - typedef struct grpc_linked_mdelem { grpc_mdelem *md; struct grpc_linked_mdelem *next; @@ -94,29 +86,31 @@ typedef struct grpc_metadata_batch { void grpc_metadata_batch_init(grpc_metadata_batch *comd); void grpc_metadata_batch_destroy(grpc_metadata_batch *comd); void grpc_metadata_batch_merge(grpc_metadata_batch *target, - grpc_metadata_batch *add); + grpc_metadata_batch *add); void grpc_metadata_batch_link_head(grpc_metadata_batch *comd, - grpc_linked_mdelem *storage); + grpc_linked_mdelem *storage); void grpc_metadata_batch_link_tail(grpc_metadata_batch *comd, - grpc_linked_mdelem *storage); + grpc_linked_mdelem *storage); void grpc_metadata_batch_add_head(grpc_metadata_batch *comd, - grpc_linked_mdelem *storage, - grpc_mdelem *elem_to_add); + grpc_linked_mdelem *storage, + grpc_mdelem *elem_to_add); void grpc_metadata_batch_add_tail(grpc_metadata_batch *comd, - grpc_linked_mdelem *storage, - grpc_mdelem *elem_to_add); + grpc_linked_mdelem *storage, + grpc_mdelem *elem_to_add); void grpc_metadata_batch_filter(grpc_metadata_batch *comd, - grpc_mdelem *(*filter)(void *user_data, - grpc_mdelem *elem), - void *user_data); + grpc_mdelem *(*filter)(void *user_data, + grpc_mdelem *elem), + void *user_data); #ifndef NDEBUG void grpc_metadata_batch_assert_ok(grpc_metadata_batch *comd); #else -#define grpc_metadata_batch_assert_ok(comd) do {} while (0) +#define grpc_metadata_batch_assert_ok(comd) \ + do { \ + } while (0) #endif /* Represents a single operation performed on a stream/transport */ @@ -129,7 +123,6 @@ typedef struct grpc_stream_op { grpc_begin_message begin_message; grpc_metadata_batch metadata; gpr_slice slice; - grpc_flow_ctl_cb flow_ctl_cb; } data; } grpc_stream_op; @@ -160,15 +153,14 @@ void grpc_sopb_add_no_op(grpc_stream_op_buffer *sopb); /* Append a GRPC_OP_BEGIN to a buffer */ void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length, gpr_uint32 flags); -void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_metadata_batch metadata); +void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, + grpc_metadata_batch metadata); /* Append a GRPC_SLICE to a buffer - does not ref/unref the slice */ void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice); -/* Append a GRPC_OP_FLOW_CTL_CB to a buffer */ -void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb, - void (*cb)(void *arg, grpc_op_error error), - void *arg); /* Append a buffer to a buffer - does not ref/unref any internal objects */ void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops, size_t nops); -#endif /* GRPC_INTERNAL_CORE_TRANSPORT_STREAM_OP_H */ +char *grpc_sopb_string(grpc_stream_op_buffer *sopb); + +#endif /* GRPC_INTERNAL_CORE_TRANSPORT_STREAM_OP_H */ diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index ef0020dc58..d9a1319c42 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -52,18 +52,15 @@ void grpc_transport_destroy(grpc_transport *transport) { } int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, - const void *server_data) { - return transport->vtable->init_stream(transport, stream, server_data); + const void *server_data, + grpc_transport_op *initial_op) { + return transport->vtable->init_stream(transport, stream, server_data, + initial_op); } -void grpc_transport_send_batch(grpc_transport *transport, grpc_stream *stream, - grpc_stream_op *ops, size_t nops, int is_last) { - transport->vtable->send_batch(transport, stream, ops, nops, is_last); -} - -void grpc_transport_set_allow_window_updates(grpc_transport *transport, - grpc_stream *stream, int allow) { - transport->vtable->set_allow_window_updates(transport, stream, allow); +void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream, + grpc_transport_op *op) { + transport->vtable->perform_op(transport, stream, op); } void grpc_transport_add_to_pollset(grpc_transport *transport, @@ -76,11 +73,6 @@ void grpc_transport_destroy_stream(grpc_transport *transport, transport->vtable->destroy_stream(transport, stream); } -void grpc_transport_abort_stream(grpc_transport *transport, grpc_stream *stream, - grpc_status_code status) { - transport->vtable->abort_stream(transport, stream, status); -} - void grpc_transport_ping(grpc_transport *transport, void (*cb)(void *user_data), void *user_data) { transport->vtable->ping(transport, cb, user_data); @@ -93,3 +85,23 @@ void grpc_transport_setup_cancel(grpc_transport_setup *setup) { void grpc_transport_setup_initiate(grpc_transport_setup *setup) { setup->vtable->initiate(setup); } + +void grpc_transport_op_finish_with_failure(grpc_transport_op *op) { + if (op->send_ops) { + op->on_done_send(op->send_user_data, 0); + } + if (op->recv_ops) { + op->on_done_recv(op->recv_user_data, 0); + } +} + +void grpc_transport_op_add_cancellation(grpc_transport_op *op, + grpc_status_code status, + grpc_mdstr *message) { + if (op->cancel_with_status == GRPC_STATUS_OK) { + op->cancel_with_status = status; + op->cancel_message = message; + } else if (message) { + grpc_mdstr_unref(message); + } +} diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index ce8c17c322..cdea0b9a0b 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -60,26 +60,26 @@ typedef enum grpc_stream_state { GRPC_STREAM_CLOSED } grpc_stream_state; -/* Callbacks made from the transport to the upper layers of grpc. */ -struct grpc_transport_callbacks { - /* Allocate a buffer to receive data into. - It's safe to call grpc_slice_new() to do this, but performance minded - proxies may want to carefully place data into optimal locations for - transports. - This function must return a valid, non-empty slice. +/* Transport op: a set of operations to perform on a transport */ +typedef struct grpc_transport_op { + grpc_stream_op_buffer *send_ops; + int is_last_send; + void (*on_done_send)(void *user_data, int success); + void *send_user_data; - Arguments: - user_data - the transport user data set at transport creation time - transport - the grpc_transport instance making this call - stream - the grpc_stream instance the buffer will be used for, or - NULL if this is not known - size_hint - how big of a buffer would the transport optimally like? - the actual returned buffer can be smaller or larger than - size_hint as the implementation finds convenient */ - struct gpr_slice (*alloc_recv_buffer)(void *user_data, - grpc_transport *transport, - grpc_stream *stream, size_t size_hint); + grpc_stream_op_buffer *recv_ops; + grpc_stream_state *recv_state; + void (*on_done_recv)(void *user_data, int success); + void *recv_user_data; + grpc_pollset *bind_pollset; + + grpc_status_code cancel_with_status; + grpc_mdstr *cancel_message; +} grpc_transport_op; + +/* Callbacks made from the transport to the upper layers of grpc. */ +struct grpc_transport_callbacks { /* Initialize a new stream on behalf of the transport. Must result in a call to grpc_transport_init_stream(transport, ..., request) in the same call @@ -96,28 +96,6 @@ struct grpc_transport_callbacks { void (*accept_stream)(void *user_data, grpc_transport *transport, const void *server_data); - /* Process a set of stream ops that have been received by the transport. - Called by network threads, so must be careful not to block on network - activity. - - If final_state == GRPC_STREAM_CLOSED, the upper layers should arrange to - call grpc_transport_destroy_stream. - - Ownership of any objects contained in ops is transferred to the callee. - - Arguments: - user_data - the transport user data set at transport creation time - transport - the grpc_transport instance making this call - stream - the stream this data was received for - ops - stream operations that are part of this batch - ops_count - the number of stream operations in this batch - final_state - the state of the stream as of the final operation in this - batch */ - void (*recv_batch)(void *user_data, grpc_transport *transport, - grpc_stream *stream, grpc_stream_op *ops, size_t ops_count, - grpc_stream_state final_state); - - /* The transport received a goaway */ void (*goaway)(void *user_data, grpc_transport *transport, grpc_status_code status, gpr_slice debug); @@ -139,7 +117,8 @@ size_t grpc_transport_stream_size(grpc_transport *transport); server_data - either NULL for a client initiated stream, or a pointer supplied from the accept_stream callback function */ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, - const void *server_data); + const void *server_data, + grpc_transport_op *initial_op); /* Destroy transport data for a stream. @@ -154,20 +133,17 @@ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, void grpc_transport_destroy_stream(grpc_transport *transport, grpc_stream *stream); -/* Enable/disable incoming data for a stream. +void grpc_transport_op_finish_with_failure(grpc_transport_op *op); - This effectively disables new window becoming available for a given stream, - but does not prevent existing window from being consumed by a sender: the - caller must still be prepared to receive some additional data after this - call. +void grpc_transport_op_add_cancellation(grpc_transport_op *op, + grpc_status_code status, + grpc_mdstr *message); - Arguments: - transport - the transport on which to create this stream - stream - the grpc_stream to destroy (memory is still owned by the - caller, but any child memory must be cleaned up) - allow - is it allowed that new window be opened up? */ -void grpc_transport_set_allow_window_updates(grpc_transport *transport, - grpc_stream *stream, int allow); +/* TODO(ctiller): remove this */ +void grpc_transport_add_to_pollset(grpc_transport *transport, + grpc_pollset *pollset); + +char *grpc_transport_op_string(grpc_transport_op *op); /* Send a batch of operations on a transport @@ -177,13 +153,9 @@ void grpc_transport_set_allow_window_updates(grpc_transport *transport, transport - the transport on which to initiate the stream stream - the stream on which to send the operations. This must be non-NULL and previously initialized by the same transport. - ops - an array of operations to apply to the stream - can be NULL - if ops_count == 0. - ops_count - the number of elements in ops - is_last - is this the last batch of operations to be sent out */ -void grpc_transport_send_batch(grpc_transport *transport, grpc_stream *stream, - grpc_stream_op *ops, size_t ops_count, - int is_last); + op - a grpc_transport_op specifying the op to perform */ +void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream, + grpc_transport_op *op); /* Send a ping on a transport @@ -193,19 +165,6 @@ void grpc_transport_send_batch(grpc_transport *transport, grpc_stream *stream, void grpc_transport_ping(grpc_transport *transport, void (*cb)(void *user_data), void *user_data); -/* Abort a stream - - Terminate reading and writing for a stream. A final recv_batch with no - operations and final_state == GRPC_STREAM_CLOSED will be received locally, - and no more data will be presented to the up-layer. - - TODO(ctiller): consider adding a HTTP/2 reason to this function. */ -void grpc_transport_abort_stream(grpc_transport *transport, grpc_stream *stream, - grpc_status_code status); - -void grpc_transport_add_to_pollset(grpc_transport *transport, - grpc_pollset *pollset); - /* Advise peer of pending connection termination. */ void grpc_transport_goaway(grpc_transport *transport, grpc_status_code status, gpr_slice debug_data); @@ -254,4 +213,4 @@ void grpc_transport_setup_initiate(grpc_transport_setup *setup); used as a destruction call by setup). */ void grpc_transport_setup_cancel(grpc_transport_setup *setup); -#endif /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_H */ +#endif /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_H */ diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h index ac275c7560..479e15338f 100644 --- a/src/core/transport/transport_impl.h +++ b/src/core/transport/transport_impl.h @@ -43,15 +43,11 @@ typedef struct grpc_transport_vtable { /* implementation of grpc_transport_init_stream */ int (*init_stream)(grpc_transport *self, grpc_stream *stream, - const void *server_data); + const void *server_data, grpc_transport_op *initial_op); /* implementation of grpc_transport_send_batch */ - void (*send_batch)(grpc_transport *self, grpc_stream *stream, - grpc_stream_op *ops, size_t ops_count, int is_last); - - /* implementation of grpc_transport_set_allow_window_updates */ - void (*set_allow_window_updates)(grpc_transport *self, grpc_stream *stream, - int allow); + void (*perform_op)(grpc_transport *self, grpc_stream *stream, + grpc_transport_op *op); /* implementation of grpc_transport_add_to_pollset */ void (*add_to_pollset)(grpc_transport *self, grpc_pollset *pollset); @@ -59,10 +55,6 @@ typedef struct grpc_transport_vtable { /* implementation of grpc_transport_destroy_stream */ void (*destroy_stream)(grpc_transport *self, grpc_stream *stream); - /* implementation of grpc_transport_abort_stream */ - void (*abort_stream)(grpc_transport *self, grpc_stream *stream, - grpc_status_code status); - /* implementation of grpc_transport_goaway */ void (*goaway)(grpc_transport *self, grpc_status_code status, gpr_slice debug_data); @@ -84,4 +76,4 @@ struct grpc_transport { const grpc_transport_vtable *vtable; }; -#endif /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_IMPL_H */ +#endif /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_IMPL_H */ diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c new file mode 100644 index 0000000000..7bbe8276c3 --- /dev/null +++ b/src/core/transport/transport_op_string.c @@ -0,0 +1,164 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/channel/channel_stack.h" + +#include <stdarg.h> +#include <stdio.h> +#include <string.h> + +#include "src/core/support/string.h" +#include <grpc/support/alloc.h> +#include <grpc/support/useful.h> + +/* These routines are here to facilitate debugging - they produce string + representations of various transport data structures */ + +static void put_metadata(gpr_strvec *b, grpc_mdelem *md) { + gpr_strvec_add(b, gpr_strdup("key=")); + gpr_strvec_add( + b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->key->slice), + GPR_SLICE_LENGTH(md->key->slice), GPR_HEXDUMP_PLAINTEXT)); + + gpr_strvec_add(b, gpr_strdup(" value=")); + gpr_strvec_add(b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->value->slice), + GPR_SLICE_LENGTH(md->value->slice), + GPR_HEXDUMP_PLAINTEXT)); +} + +static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) { + grpc_linked_mdelem *m; + for (m = md.list.head; m != NULL; m = m->next) { + if (m != md.list.head) gpr_strvec_add(b, gpr_strdup(", ")); + put_metadata(b, m->md); + } + if (gpr_time_cmp(md.deadline, gpr_inf_future) != 0) { + char *tmp; + gpr_asprintf(&tmp, " deadline=%d.%09d", md.deadline.tv_sec, + md.deadline.tv_nsec); + gpr_strvec_add(b, tmp); + } +} + +char *grpc_sopb_string(grpc_stream_op_buffer *sopb) { + char *out; + char *tmp; + size_t i; + gpr_strvec b; + gpr_strvec_init(&b); + + for (i = 0; i < sopb->nops; i++) { + grpc_stream_op *op = &sopb->ops[i]; + if (i > 0) gpr_strvec_add(&b, gpr_strdup(", ")); + switch (op->type) { + case GRPC_NO_OP: + gpr_strvec_add(&b, gpr_strdup("NO_OP")); + break; + case GRPC_OP_BEGIN_MESSAGE: + gpr_asprintf(&tmp, "BEGIN_MESSAGE:%d", op->data.begin_message.length); + gpr_strvec_add(&b, tmp); + break; + case GRPC_OP_SLICE: + gpr_asprintf(&tmp, "SLICE:%d", GPR_SLICE_LENGTH(op->data.slice)); + gpr_strvec_add(&b, tmp); + break; + case GRPC_OP_METADATA: + gpr_strvec_add(&b, gpr_strdup("METADATA{")); + put_metadata_list(&b, op->data.metadata); + gpr_strvec_add(&b, gpr_strdup("}")); + break; + } + } + + out = gpr_strvec_flatten(&b, NULL); + gpr_strvec_destroy(&b); + + return out; +} + +char *grpc_transport_op_string(grpc_transport_op *op) { + char *tmp; + char *out; + int first = 1; + + gpr_strvec b; + gpr_strvec_init(&b); + + if (op->send_ops) { + if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); + first = 0; + gpr_strvec_add(&b, gpr_strdup("SEND")); + if (op->is_last_send) { + gpr_strvec_add(&b, gpr_strdup("_LAST")); + } + gpr_strvec_add(&b, gpr_strdup("[")); + gpr_strvec_add(&b, grpc_sopb_string(op->send_ops)); + gpr_strvec_add(&b, gpr_strdup("]")); + } + + if (op->recv_ops) { + if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); + first = 0; + gpr_strvec_add(&b, gpr_strdup("RECV")); + } + + if (op->bind_pollset) { + if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); + first = 0; + gpr_strvec_add(&b, gpr_strdup("BIND")); + } + + if (op->cancel_with_status != GRPC_STATUS_OK) { + if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); + first = 0; + gpr_asprintf(&tmp, "CANCEL:%d", op->cancel_with_status); + gpr_strvec_add(&b, tmp); + if (op->cancel_message) { + gpr_asprintf(&tmp, ";msg='%s'", + grpc_mdstr_as_c_string(op->cancel_message)); + gpr_strvec_add(&b, tmp); + } + } + + out = gpr_strvec_flatten(&b, NULL); + gpr_strvec_destroy(&b); + + return out; +} + +void grpc_call_log_op(char *file, int line, gpr_log_severity severity, + grpc_call_element *elem, grpc_transport_op *op) { + char *str = grpc_transport_op_string(op); + gpr_log(file, line, severity, "OP[%s:%p]: %s", elem->filter->name, elem, str); + gpr_free(str); +} |