diff options
author | Craig Tiller <ctiller@google.com> | 2017-04-12 15:16:35 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-04-12 15:16:35 -0700 |
commit | b3d308b9a11496722754f4dc23ebb82a9f0e63d2 (patch) | |
tree | d5c4d27375d08a91fdb71b6e84a2bb4558e28b52 /src | |
parent | c20fa90c6094599540e3a7ef16a5aa381580be33 (diff) | |
parent | 1b76bda4a61a0ed65d5a5de7a6f3363a47871e50 (diff) |
Merge github.com:grpc/grpc into cpparena
Diffstat (limited to 'src')
35 files changed, 1099 insertions, 794 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index e2816b0e04..29ed4bf90e 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -44,6 +44,7 @@ #include <grpc/support/string_util.h> #include <grpc/support/useful.h> +#include "src/core/ext/transport/chttp2/transport/frame_data.h" #include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/ext/transport/chttp2/transport/varint.h" #include "src/core/lib/channel/channel_args.h" @@ -129,6 +130,11 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx, static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, void *byte_stream, grpc_error *error_ignored); +static void incoming_byte_stream_publish_error( + grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, + grpc_error *error); +static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx, + grpc_chttp2_incoming_byte_stream *bs); static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); @@ -174,6 +180,9 @@ static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); +static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); + /******************************************************************************* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ @@ -356,6 +365,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, DEFAULT_WINDOW); push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, DEFAULT_MAX_HEADER_LIST_SIZE); + push_setting(exec_ctx, t, + GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, 1); t->ping_policy = (grpc_chttp2_repeated_ping_policy){ .max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA, @@ -486,26 +497,31 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_setting_id setting_id; grpc_integer_options integer_options; bool availability[2] /* server, client */; - } settings_map[] = {{GRPC_ARG_MAX_CONCURRENT_STREAMS, - GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, - {-1, 0, INT32_MAX}, - {true, false}}, - {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER, - GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE, - {-1, 0, INT32_MAX}, - {true, true}}, - {GRPC_ARG_MAX_METADATA_SIZE, - GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, - {-1, 0, INT32_MAX}, - {true, true}}, - {GRPC_ARG_HTTP2_MAX_FRAME_SIZE, - GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, - {-1, 16384, 16777215}, - {true, true}}, - {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES, - GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, - {-1, 5, INT32_MAX}, - {true, true}}}; + } settings_map[] = { + {GRPC_ARG_MAX_CONCURRENT_STREAMS, + GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, + {-1, 0, INT32_MAX}, + {true, false}}, + {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER, + GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE, + {-1, 0, INT32_MAX}, + {true, true}}, + {GRPC_ARG_MAX_METADATA_SIZE, + GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, + {-1, 0, INT32_MAX}, + {true, true}}, + {GRPC_ARG_HTTP2_MAX_FRAME_SIZE, + GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, + {-1, 16384, 16777215}, + {true, true}}, + {GRPC_ARG_HTTP2_ENABLE_TRUE_BINARY, + GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, + {1, 0, 1}, + {true, true}}, + {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES, + GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, + {-1, 5, INT32_MAX}, + {true, true}}}; for (j = 0; j < (int)GPR_ARRAY_SIZE(settings_map); j++) { if (0 == strcmp(channel_args->args[i].key, settings_map[j].channel_arg_name)) { @@ -648,7 +664,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, /* We reserve one 'active stream' that's dropped when the stream is read-closed. The others are for incoming_byte_streams that are actively reading */ - gpr_ref_init(&s->active_streams, 1); GRPC_CHTTP2_STREAM_REF(s, "chttp2"); grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena); @@ -658,6 +673,11 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); grpc_closure_init(&s->complete_fetch_locked, complete_fetch_locked, s, grpc_schedule_on_exec_ctx); + grpc_slice_buffer_init(&s->unprocessed_incoming_frames_buffer); + grpc_slice_buffer_init(&s->frame_storage); + s->pending_byte_stream = false; + grpc_closure_init(&s->reset_byte_stream, reset_byte_stream, s, + grpc_combiner_scheduler(t->combiner, false)); GRPC_CHTTP2_REF_TRANSPORT(t, "stream"); @@ -675,7 +695,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, grpc_error *error) { - grpc_byte_stream *bs; grpc_chttp2_stream *s = sp; grpc_chttp2_transport *t = s->t; @@ -686,9 +705,9 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, s->id) == NULL); } - while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames))) { - incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE); - } + grpc_slice_buffer_destroy_internal(exec_ctx, + &s->unprocessed_incoming_frames_buffer); + grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage); grpc_chttp2_list_remove_stalled_by_transport(t, s); grpc_chttp2_list_remove_stalled_by_stream(t, s); @@ -715,6 +734,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, grpc_slice_buffer_destroy_internal(exec_ctx, &s->flow_controlled_buffer); GRPC_ERROR_UNREF(s->read_closed_error); GRPC_ERROR_UNREF(s->write_closed_error); + GRPC_ERROR_UNREF(s->byte_stream_error); if (s->incoming_window_delta > 0) { GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA( @@ -1168,8 +1188,9 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, s->fetching_send_message = NULL; return; /* early out */ } else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message, - &s->fetching_slice, UINT32_MAX, - &s->complete_fetch_locked)) { + UINT32_MAX, &s->complete_fetch_locked)) { + grpc_byte_stream_pull(exec_ctx, s->fetching_send_message, + &s->fetching_slice); add_fetched_slice_locked(exec_ctx, t, s); } } @@ -1180,9 +1201,15 @@ static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs, grpc_chttp2_stream *s = gs; grpc_chttp2_transport *t = s->t; if (error == GRPC_ERROR_NONE) { - add_fetched_slice_locked(exec_ctx, t, s); - continue_fetching_send_locked(exec_ctx, t, s); - } else { + error = grpc_byte_stream_pull(exec_ctx, s->fetching_send_message, + &s->fetching_slice); + if (error == GRPC_ERROR_NONE) { + add_fetched_slice_locked(exec_ctx, t, s); + continue_fetching_send_locked(exec_ctx, t, s); + } + } + + if (error != GRPC_ERROR_NONE) { /* TODO(ctiller): what to do here */ abort(); } @@ -1417,8 +1444,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, GPR_ASSERT(s->recv_message_ready == NULL); s->recv_message_ready = op_payload->recv_message.recv_message_ready; s->recv_message = op_payload->recv_message.recv_message; - if (s->id != 0 && - (s->incoming_frames.head == NULL || s->incoming_frames.head->is_tail)) { + if (s->id != 0 && s->frame_storage.length == 0) { incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5, 0); } grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); @@ -1607,13 +1633,13 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s) { - grpc_byte_stream *bs; if (s->recv_initial_metadata_ready != NULL && s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) { if (s->seen_error) { - while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) != - NULL) { - incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage); + if (!s->pending_byte_stream) { + grpc_slice_buffer_reset_and_unref_internal( + exec_ctx, &s->unprocessed_incoming_frames_buffer); } } grpc_chttp2_incoming_metadata_buffer_publish( @@ -1626,39 +1652,65 @@ void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s) { - grpc_byte_stream *bs; + grpc_error *error = GRPC_ERROR_NONE; if (s->recv_message_ready != NULL) { - while (s->final_metadata_requested && s->seen_error && - (bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) != - NULL) { - incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE); + *s->recv_message = NULL; + if (s->final_metadata_requested && s->seen_error) { + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage); + if (!s->pending_byte_stream) { + grpc_slice_buffer_reset_and_unref_internal( + exec_ctx, &s->unprocessed_incoming_frames_buffer); + } } - if (s->incoming_frames.head != NULL) { - *s->recv_message = - grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames); - GPR_ASSERT(*s->recv_message != NULL); + if (!s->pending_byte_stream) { + while (s->unprocessed_incoming_frames_buffer.length > 0 || + s->frame_storage.length > 0) { + if (s->unprocessed_incoming_frames_buffer.length == 0) { + grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer, + &s->frame_storage); + } + error = deframe_unprocessed_incoming_frames( + exec_ctx, &s->data_parser, s, + &s->unprocessed_incoming_frames_buffer, NULL, s->recv_message); + if (error != GRPC_ERROR_NONE) { + s->seen_error = true; + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, + &s->frame_storage); + grpc_slice_buffer_reset_and_unref_internal( + exec_ctx, &s->unprocessed_incoming_frames_buffer); + break; + } else if (*s->recv_message != NULL) { + break; + } + } + } + if (error == GRPC_ERROR_NONE && *s->recv_message != NULL) { null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE); } else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) { *s->recv_message = NULL; null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE); } + GRPC_ERROR_UNREF(error); } } void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s) { - grpc_byte_stream *bs; grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); if (s->recv_trailing_metadata_finished != NULL && s->read_closed && s->write_closed) { if (s->seen_error) { - while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) != - NULL) { - incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage); + if (!s->pending_byte_stream) { + grpc_slice_buffer_reset_and_unref_internal( + exec_ctx, &s->unprocessed_incoming_frames_buffer); } } - if (s->all_incoming_byte_streams_finished && + bool pending_data = s->pending_byte_stream || + s->unprocessed_incoming_frames_buffer.length > 0; + if (s->read_closed && s->frame_storage.length == 0 && + (!pending_data || s->seen_error) && s->recv_trailing_metadata_finished != NULL) { grpc_chttp2_incoming_metadata_buffer_publish( exec_ctx, &s->metadata_buffer[1], s->recv_trailing_metadata); @@ -1669,14 +1721,6 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, } } -static void decrement_active_streams_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s) { - if ((s->all_incoming_byte_streams_finished = gpr_unref(&s->active_streams))) { - grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); - } -} - static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, uint32_t id, grpc_error *error) { grpc_chttp2_stream *s = grpc_chttp2_stream_map_delete(&t->stream_map, id); @@ -1685,10 +1729,19 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->incoming_stream = NULL; grpc_chttp2_parsing_become_skip_parser(exec_ctx, t); } - if (s->data_parser.parsing_frame != NULL) { - grpc_chttp2_incoming_byte_stream_finished( - exec_ctx, s->data_parser.parsing_frame, GRPC_ERROR_REF(error)); - s->data_parser.parsing_frame = NULL; + if (s->pending_byte_stream) { + if (s->on_next != NULL) { + grpc_chttp2_incoming_byte_stream *bs = s->data_parser.parsing_frame; + if (error == GRPC_ERROR_NONE) { + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); + } + incoming_byte_stream_publish_error(exec_ctx, bs, error); + incoming_byte_stream_unref(exec_ctx, bs); + s->data_parser.parsing_frame = NULL; + } else { + GRPC_ERROR_UNREF(s->byte_stream_error); + s->byte_stream_error = GRPC_ERROR_REF(error); + } } if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) { @@ -1874,7 +1927,6 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx, s->published_metadata[i] = GPRC_METADATA_PUBLISHED_AT_CLOSE; } } - decrement_active_streams_locked(exec_ctx, t, s); grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s); grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); } @@ -1890,6 +1942,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_slice hdr; grpc_slice status_hdr; grpc_slice http_status_hdr; + grpc_slice content_type_hdr; grpc_slice message_pfx; uint8_t *p; uint32_t len = 0; @@ -1923,6 +1976,42 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, *p++ = '0'; GPR_ASSERT(p == GRPC_SLICE_END_PTR(http_status_hdr)); len += (uint32_t)GRPC_SLICE_LENGTH(http_status_hdr); + + content_type_hdr = grpc_slice_malloc(31); + p = GRPC_SLICE_START_PTR(content_type_hdr); + *p++ = 0x00; + *p++ = 12; + *p++ = 'c'; + *p++ = 'o'; + *p++ = 'n'; + *p++ = 't'; + *p++ = 'e'; + *p++ = 'n'; + *p++ = 't'; + *p++ = '-'; + *p++ = 't'; + *p++ = 'y'; + *p++ = 'p'; + *p++ = 'e'; + *p++ = 16; + *p++ = 'a'; + *p++ = 'p'; + *p++ = 'p'; + *p++ = 'l'; + *p++ = 'i'; + *p++ = 'c'; + *p++ = 'a'; + *p++ = 't'; + *p++ = 'i'; + *p++ = 'o'; + *p++ = 'n'; + *p++ = '/'; + *p++ = 'g'; + *p++ = 'r'; + *p++ = 'p'; + *p++ = 'c'; + GPR_ASSERT(p == GRPC_SLICE_END_PTR(content_type_hdr)); + len += (uint32_t)GRPC_SLICE_LENGTH(content_type_hdr); } status_hdr = grpc_slice_malloc(15 + (grpc_status >= 10)); @@ -1992,6 +2081,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_slice_buffer_add(&t->qbuf, hdr); if (!s->sent_initial_metadata) { grpc_slice_buffer_add(&t->qbuf, http_status_hdr); + grpc_slice_buffer_add(&t->qbuf, content_type_hdr); } grpc_slice_buffer_add(&t->qbuf, status_hdr); grpc_slice_buffer_add(&t->qbuf, message_pfx); @@ -2374,12 +2464,28 @@ static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt, * BYTE STREAM */ +static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_stream *s = (grpc_chttp2_stream *)arg; + + s->pending_byte_stream = false; + if (error == GRPC_ERROR_NONE) { + grpc_chttp2_maybe_complete_recv_message(exec_ctx, s->t, s); + grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, s->t, s); + } else { + GPR_ASSERT(error != GRPC_ERROR_NONE); + grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_REF(error)); + s->on_next = NULL; + GRPC_ERROR_UNREF(s->byte_stream_error); + s->byte_stream_error = GRPC_ERROR_NONE; + grpc_chttp2_cancel_stream(exec_ctx, s->t, s, GRPC_ERROR_REF(error)); + s->byte_stream_error = error; + } +} + static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs) { if (gpr_unref(&bs->refs)) { - GRPC_ERROR_UNREF(bs->error); - grpc_slice_buffer_destroy_internal(exec_ctx, &bs->slices); - gpr_mu_destroy(&bs->slice_mu); gpr_free(bs); } } @@ -2439,47 +2545,90 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t = bs->transport; grpc_chttp2_stream *s = bs->stream; - if (bs->is_tail) { - gpr_mu_lock(&bs->slice_mu); - size_t cur_length = bs->slices.length; - gpr_mu_unlock(&bs->slice_mu); - incoming_byte_stream_update_flow_control( - exec_ctx, t, s, bs->next_action.max_size_hint, cur_length); - } - gpr_mu_lock(&bs->slice_mu); - if (bs->slices.count > 0) { - *bs->next_action.slice = grpc_slice_buffer_take_first(&bs->slices); - grpc_closure_run(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE); - } else if (bs->error != GRPC_ERROR_NONE) { - grpc_closure_run(exec_ctx, bs->next_action.on_complete, - GRPC_ERROR_REF(bs->error)); + size_t cur_length = s->frame_storage.length; + incoming_byte_stream_update_flow_control( + exec_ctx, t, s, bs->next_action.max_size_hint, cur_length); + + GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0); + if (s->frame_storage.length > 0) { + grpc_slice_buffer_swap(&s->frame_storage, + &s->unprocessed_incoming_frames_buffer); + grpc_closure_sched(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE); + } else if (s->byte_stream_error != GRPC_ERROR_NONE) { + grpc_closure_sched(exec_ctx, bs->next_action.on_complete, + GRPC_ERROR_REF(s->byte_stream_error)); + if (s->data_parser.parsing_frame != NULL) { + incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame); + s->data_parser.parsing_frame = NULL; + } + } else if (s->read_closed) { + if (bs->remaining_bytes != 0) { + s->byte_stream_error = + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); + grpc_closure_sched(exec_ctx, bs->next_action.on_complete, + GRPC_ERROR_REF(s->byte_stream_error)); + if (s->data_parser.parsing_frame != NULL) { + incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame); + s->data_parser.parsing_frame = NULL; + } + } else { + /* Should never reach here. */ + GPR_ASSERT(false); + } } else { - bs->on_next = bs->next_action.on_complete; - bs->next = bs->next_action.slice; + s->on_next = bs->next_action.on_complete; } - gpr_mu_unlock(&bs->slice_mu); incoming_byte_stream_unref(exec_ctx, bs); } -static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, - grpc_slice *slice, size_t max_size_hint, - grpc_closure *on_complete) { +static bool incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + size_t max_size_hint, + grpc_closure *on_complete) { GPR_TIMER_BEGIN("incoming_byte_stream_next", 0); grpc_chttp2_incoming_byte_stream *bs = (grpc_chttp2_incoming_byte_stream *)byte_stream; - gpr_ref(&bs->refs); - bs->next_action.slice = slice; - bs->next_action.max_size_hint = max_size_hint; - bs->next_action.on_complete = on_complete; - grpc_closure_sched( - exec_ctx, - grpc_closure_init( - &bs->next_action.closure, incoming_byte_stream_next_locked, bs, - grpc_combiner_scheduler(bs->transport->combiner, false)), - GRPC_ERROR_NONE); - GPR_TIMER_END("incoming_byte_stream_next", 0); - return 0; + grpc_chttp2_stream *s = bs->stream; + if (s->unprocessed_incoming_frames_buffer.length > 0) { + return true; + } else { + gpr_ref(&bs->refs); + bs->next_action.max_size_hint = max_size_hint; + bs->next_action.on_complete = on_complete; + grpc_closure_sched( + exec_ctx, + grpc_closure_init( + &bs->next_action.closure, incoming_byte_stream_next_locked, bs, + grpc_combiner_scheduler(bs->transport->combiner, false)), + GRPC_ERROR_NONE); + GPR_TIMER_END("incoming_byte_stream_next", 0); + return false; + } +} + +static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + grpc_slice *slice) { + GPR_TIMER_BEGIN("incoming_byte_stream_pull", 0); + grpc_chttp2_incoming_byte_stream *bs = + (grpc_chttp2_incoming_byte_stream *)byte_stream; + grpc_chttp2_stream *s = bs->stream; + + if (s->unprocessed_incoming_frames_buffer.length > 0) { + grpc_error *error = deframe_unprocessed_incoming_frames( + exec_ctx, &s->data_parser, s, &s->unprocessed_incoming_frames_buffer, + slice, NULL); + if (error != GRPC_ERROR_NONE) { + return error; + } + } else { + grpc_error *error = + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); + grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error)); + return error; + } + GPR_TIMER_END("incoming_byte_stream_pull", 0); + return GRPC_ERROR_NONE; } static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, @@ -2489,9 +2638,14 @@ static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, void *byte_stream, grpc_error *error_ignored) { grpc_chttp2_incoming_byte_stream *bs = byte_stream; + grpc_chttp2_stream *s = bs->stream; + grpc_chttp2_transport *t = s->t; + GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy); - decrement_active_streams_locked(exec_ctx, bs->transport, bs->stream); incoming_byte_stream_unref(exec_ctx, bs); + s->pending_byte_stream = false; + grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); + grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); } static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, @@ -2511,50 +2665,53 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, static void incoming_byte_stream_publish_error( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, grpc_error *error) { + grpc_chttp2_stream *s = bs->stream; + GPR_ASSERT(error != GRPC_ERROR_NONE); - grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error)); - bs->on_next = NULL; - GRPC_ERROR_UNREF(bs->error); + grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_REF(error)); + s->on_next = NULL; + GRPC_ERROR_UNREF(s->byte_stream_error); + s->byte_stream_error = GRPC_ERROR_REF(error); grpc_chttp2_cancel_stream(exec_ctx, bs->transport, bs->stream, GRPC_ERROR_REF(error)); - bs->error = error; } -void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, - grpc_chttp2_incoming_byte_stream *bs, - grpc_slice slice) { - gpr_mu_lock(&bs->slice_mu); +grpc_error *grpc_chttp2_incoming_byte_stream_push( + grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, + grpc_slice slice, grpc_slice *slice_out) { + grpc_chttp2_stream *s = bs->stream; + if (bs->remaining_bytes < GRPC_SLICE_LENGTH(slice)) { - incoming_byte_stream_publish_error( - exec_ctx, bs, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream")); + grpc_error *error = + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream"); + + grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error)); + grpc_slice_unref_internal(exec_ctx, slice); + return error; } else { bs->remaining_bytes -= (uint32_t)GRPC_SLICE_LENGTH(slice); - if (bs->on_next != NULL) { - *bs->next = slice; - grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE); - bs->on_next = NULL; - } else { - grpc_slice_buffer_add(&bs->slices, slice); + if (slice_out != NULL) { + *slice_out = slice; } + return GRPC_ERROR_NONE; } - gpr_mu_unlock(&bs->slice_mu); } -void grpc_chttp2_incoming_byte_stream_finished( +grpc_error *grpc_chttp2_incoming_byte_stream_finished( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, - grpc_error *error) { + grpc_error *error, bool reset_on_error) { + grpc_chttp2_stream *s = bs->stream; + if (error == GRPC_ERROR_NONE) { - gpr_mu_lock(&bs->slice_mu); if (bs->remaining_bytes != 0) { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); } - gpr_mu_unlock(&bs->slice_mu); } - if (error != GRPC_ERROR_NONE) { - incoming_byte_stream_publish_error(exec_ctx, bs, error); + if (error != GRPC_ERROR_NONE && reset_on_error) { + grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error)); } incoming_byte_stream_unref(exec_ctx, bs); + return error; } grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( @@ -2566,26 +2723,12 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( incoming_byte_stream->remaining_bytes = frame_size; incoming_byte_stream->base.flags = flags; incoming_byte_stream->base.next = incoming_byte_stream_next; + incoming_byte_stream->base.pull = incoming_byte_stream_pull; incoming_byte_stream->base.destroy = incoming_byte_stream_destroy; - gpr_mu_init(&incoming_byte_stream->slice_mu); gpr_ref_init(&incoming_byte_stream->refs, 2); - incoming_byte_stream->next_message = NULL; incoming_byte_stream->transport = t; incoming_byte_stream->stream = s; - gpr_ref(&incoming_byte_stream->stream->active_streams); - grpc_slice_buffer_init(&incoming_byte_stream->slices); - incoming_byte_stream->on_next = NULL; - incoming_byte_stream->is_tail = 1; - incoming_byte_stream->error = GRPC_ERROR_NONE; - grpc_chttp2_incoming_frame_queue *q = &s->incoming_frames; - if (q->head == NULL) { - q->head = incoming_byte_stream; - } else { - q->tail->is_tail = 0; - q->tail->next_message = incoming_byte_stream; - } - q->tail = incoming_byte_stream; - grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); + s->byte_stream_error = GRPC_ERROR_NONE; return incoming_byte_stream; } diff --git a/src/core/ext/transport/chttp2/transport/frame_data.c b/src/core/ext/transport/chttp2/transport/frame_data.c index 6e9258ee7e..5d382d80a8 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.c +++ b/src/core/ext/transport/chttp2/transport/frame_data.c @@ -40,6 +40,7 @@ #include <grpc/support/string_util.h> #include <grpc/support/useful.h> #include "src/core/ext/transport/chttp2/transport/internal.h" +#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/transport.h" @@ -53,16 +54,17 @@ grpc_error *grpc_chttp2_data_parser_init(grpc_chttp2_data_parser *parser) { void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx, grpc_chttp2_data_parser *parser) { if (parser->parsing_frame != NULL) { - grpc_chttp2_incoming_byte_stream_finished( + GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished( exec_ctx, parser->parsing_frame, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed")); + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed"), false)); } GRPC_ERROR_UNREF(parser->error); } grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser, uint8_t flags, - uint32_t stream_id) { + uint32_t stream_id, + grpc_chttp2_stream *s) { if (flags & ~GRPC_CHTTP2_DATA_FLAG_END_STREAM) { char *msg; gpr_asprintf(&msg, "unsupported data flags: 0x%02x", flags); @@ -74,47 +76,14 @@ grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser, } if (flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) { - parser->is_last_frame = 1; + s->received_last_frame = true; } else { - parser->is_last_frame = 0; + s->received_last_frame = false; } return GRPC_ERROR_NONE; } -void grpc_chttp2_incoming_frame_queue_merge( - grpc_chttp2_incoming_frame_queue *head_dst, - grpc_chttp2_incoming_frame_queue *tail_src) { - if (tail_src->head == NULL) { - return; - } - - if (head_dst->head == NULL) { - *head_dst = *tail_src; - memset(tail_src, 0, sizeof(*tail_src)); - return; - } - - head_dst->tail->next_message = tail_src->head; - head_dst->tail = tail_src->tail; - memset(tail_src, 0, sizeof(*tail_src)); -} - -grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop( - grpc_chttp2_incoming_frame_queue *q) { - grpc_byte_stream *out; - if (q->head == NULL) { - return NULL; - } - out = &q->head->base; - if (q->head == q->tail) { - memset(q, 0, sizeof(*q)); - } else { - q->head = q->head->next_message; - } - return out; -} - void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf, uint32_t write_bytes, int is_eof, grpc_transport_one_way_stats *stats, @@ -143,145 +112,217 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf, stats->data_bytes += write_bytes; } -static grpc_error *parse_inner(grpc_exec_ctx *exec_ctx, - grpc_chttp2_data_parser *p, - grpc_chttp2_transport *t, grpc_chttp2_stream *s, - grpc_slice slice) { - uint8_t *const beg = GRPC_SLICE_START_PTR(slice); - uint8_t *const end = GRPC_SLICE_END_PTR(slice); - uint8_t *cur = beg; - uint32_t message_flags; - grpc_chttp2_incoming_byte_stream *incoming_byte_stream; - char *msg; +grpc_error *deframe_unprocessed_incoming_frames(grpc_exec_ctx *exec_ctx, + grpc_chttp2_data_parser *p, + grpc_chttp2_stream *s, + grpc_slice_buffer *slices, + grpc_slice *slice_out, + grpc_byte_stream **stream_out) { + grpc_error *error = GRPC_ERROR_NONE; + grpc_chttp2_transport *t = s->t; - if (cur == end) { - return GRPC_ERROR_NONE; - } + while (slices->count > 0) { + uint8_t *beg = NULL; + uint8_t *end = NULL; + uint8_t *cur = NULL; - switch (p->state) { - case GRPC_CHTTP2_DATA_ERROR: - p->state = GRPC_CHTTP2_DATA_ERROR; - return GRPC_ERROR_REF(p->error); - fh_0: - case GRPC_CHTTP2_DATA_FH_0: - s->stats.incoming.framing_bytes++; - p->frame_type = *cur; - switch (p->frame_type) { - case 0: - p->is_frame_compressed = 0; /* GPR_FALSE */ - break; - case 1: - p->is_frame_compressed = 1; /* GPR_TRUE */ - break; - default: - gpr_asprintf(&msg, "Bad GRPC frame type 0x%02x", p->frame_type); - p->error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); - p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID, - (intptr_t)s->id); - gpr_free(msg); - msg = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII); - p->error = grpc_error_set_str(p->error, GRPC_ERROR_STR_RAW_BYTES, - grpc_slice_from_copied_string(msg)); - gpr_free(msg); - p->error = - grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg); - p->state = GRPC_CHTTP2_DATA_ERROR; - return GRPC_ERROR_REF(p->error); - } - if (++cur == end) { - p->state = GRPC_CHTTP2_DATA_FH_1; - return GRPC_ERROR_NONE; - } - /* fallthrough */ - case GRPC_CHTTP2_DATA_FH_1: - s->stats.incoming.framing_bytes++; - p->frame_size = ((uint32_t)*cur) << 24; - if (++cur == end) { - p->state = GRPC_CHTTP2_DATA_FH_2; - return GRPC_ERROR_NONE; - } - /* fallthrough */ - case GRPC_CHTTP2_DATA_FH_2: - s->stats.incoming.framing_bytes++; - p->frame_size |= ((uint32_t)*cur) << 16; - if (++cur == end) { - p->state = GRPC_CHTTP2_DATA_FH_3; - return GRPC_ERROR_NONE; - } - /* fallthrough */ - case GRPC_CHTTP2_DATA_FH_3: - s->stats.incoming.framing_bytes++; - p->frame_size |= ((uint32_t)*cur) << 8; - if (++cur == end) { - p->state = GRPC_CHTTP2_DATA_FH_4; - return GRPC_ERROR_NONE; - } - /* fallthrough */ - case GRPC_CHTTP2_DATA_FH_4: - s->stats.incoming.framing_bytes++; - p->frame_size |= ((uint32_t)*cur); - p->state = GRPC_CHTTP2_DATA_FRAME; - ++cur; - message_flags = 0; - if (p->is_frame_compressed) { - message_flags |= GRPC_WRITE_INTERNAL_COMPRESS; - } - p->parsing_frame = incoming_byte_stream = - grpc_chttp2_incoming_byte_stream_create(exec_ctx, t, s, p->frame_size, - message_flags); - /* fallthrough */ - case GRPC_CHTTP2_DATA_FRAME: - if (cur == end) { - return GRPC_ERROR_NONE; - } - uint32_t remaining = (uint32_t)(end - cur); - if (remaining == p->frame_size) { - s->stats.incoming.data_bytes += p->frame_size; - grpc_chttp2_incoming_byte_stream_push( - exec_ctx, p->parsing_frame, - grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); - grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame, - GRPC_ERROR_NONE); - p->parsing_frame = NULL; - p->state = GRPC_CHTTP2_DATA_FH_0; - return GRPC_ERROR_NONE; - } else if (remaining > p->frame_size) { - s->stats.incoming.data_bytes += p->frame_size; - grpc_chttp2_incoming_byte_stream_push( - exec_ctx, p->parsing_frame, - grpc_slice_sub(slice, (size_t)(cur - beg), - (size_t)(cur + p->frame_size - beg))); - grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame, - GRPC_ERROR_NONE); - p->parsing_frame = NULL; - cur += p->frame_size; - goto fh_0; /* loop */ - } else { - GPR_ASSERT(remaining <= p->frame_size); - grpc_chttp2_incoming_byte_stream_push( - exec_ctx, p->parsing_frame, - grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); - p->frame_size -= remaining; - s->stats.incoming.data_bytes += remaining; + grpc_slice slice = grpc_slice_buffer_take_first(slices); + + beg = GRPC_SLICE_START_PTR(slice); + end = GRPC_SLICE_END_PTR(slice); + cur = beg; + uint32_t message_flags; + char *msg; + + if (cur == end) { + grpc_slice_unref_internal(exec_ctx, slice); + continue; + } + + switch (p->state) { + case GRPC_CHTTP2_DATA_ERROR: + p->state = GRPC_CHTTP2_DATA_ERROR; + grpc_slice_unref_internal(exec_ctx, slice); + return GRPC_ERROR_REF(p->error); + case GRPC_CHTTP2_DATA_FH_0: + p->frame_type = *cur; + switch (p->frame_type) { + case 0: + p->is_frame_compressed = false; /* GPR_FALSE */ + break; + case 1: + p->is_frame_compressed = true; /* GPR_TRUE */ + break; + default: + gpr_asprintf(&msg, "Bad GRPC frame type 0x%02x", p->frame_type); + p->error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); + p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID, + (intptr_t)s->id); + gpr_free(msg); + msg = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII); + p->error = grpc_error_set_str(p->error, GRPC_ERROR_STR_RAW_BYTES, + grpc_slice_from_copied_string(msg)); + gpr_free(msg); + p->error = + grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg); + p->state = GRPC_CHTTP2_DATA_ERROR; + grpc_slice_unref_internal(exec_ctx, slice); + return GRPC_ERROR_REF(p->error); + } + if (++cur == end) { + p->state = GRPC_CHTTP2_DATA_FH_1; + grpc_slice_unref_internal(exec_ctx, slice); + continue; + } + /* fallthrough */ + case GRPC_CHTTP2_DATA_FH_1: + p->frame_size = ((uint32_t)*cur) << 24; + if (++cur == end) { + p->state = GRPC_CHTTP2_DATA_FH_2; + grpc_slice_unref_internal(exec_ctx, slice); + continue; + } + /* fallthrough */ + case GRPC_CHTTP2_DATA_FH_2: + p->frame_size |= ((uint32_t)*cur) << 16; + if (++cur == end) { + p->state = GRPC_CHTTP2_DATA_FH_3; + grpc_slice_unref_internal(exec_ctx, slice); + continue; + } + /* fallthrough */ + case GRPC_CHTTP2_DATA_FH_3: + p->frame_size |= ((uint32_t)*cur) << 8; + if (++cur == end) { + p->state = GRPC_CHTTP2_DATA_FH_4; + grpc_slice_unref_internal(exec_ctx, slice); + continue; + } + /* fallthrough */ + case GRPC_CHTTP2_DATA_FH_4: + GPR_ASSERT(stream_out != NULL); + GPR_ASSERT(p->parsing_frame == NULL); + p->frame_size |= ((uint32_t)*cur); + p->state = GRPC_CHTTP2_DATA_FRAME; + ++cur; + message_flags = 0; + if (p->is_frame_compressed) { + message_flags |= GRPC_WRITE_INTERNAL_COMPRESS; + } + p->parsing_frame = grpc_chttp2_incoming_byte_stream_create( + exec_ctx, t, s, p->frame_size, message_flags); + *stream_out = &p->parsing_frame->base; + if (p->parsing_frame->remaining_bytes == 0) { + GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished( + exec_ctx, p->parsing_frame, GRPC_ERROR_NONE, true)); + p->parsing_frame = NULL; + p->state = GRPC_CHTTP2_DATA_FH_0; + } + s->pending_byte_stream = true; + + if (cur != end) { + grpc_slice_buffer_undo_take_first( + &s->unprocessed_incoming_frames_buffer, + grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); + } + grpc_slice_unref_internal(exec_ctx, slice); return GRPC_ERROR_NONE; + case GRPC_CHTTP2_DATA_FRAME: { + GPR_ASSERT(p->parsing_frame != NULL); + GPR_ASSERT(slice_out != NULL); + if (cur == end) { + grpc_slice_unref_internal(exec_ctx, slice); + continue; + } + uint32_t remaining = (uint32_t)(end - cur); + if (remaining == p->frame_size) { + if (GRPC_ERROR_NONE != (error = grpc_chttp2_incoming_byte_stream_push( + exec_ctx, p->parsing_frame, + grpc_slice_sub(slice, (size_t)(cur - beg), + (size_t)(end - beg)), + slice_out))) { + grpc_slice_unref_internal(exec_ctx, slice); + return error; + } + if (GRPC_ERROR_NONE != + (error = grpc_chttp2_incoming_byte_stream_finished( + exec_ctx, p->parsing_frame, GRPC_ERROR_NONE, true))) { + grpc_slice_unref_internal(exec_ctx, slice); + return error; + } + p->parsing_frame = NULL; + p->state = GRPC_CHTTP2_DATA_FH_0; + grpc_slice_unref_internal(exec_ctx, slice); + return GRPC_ERROR_NONE; + } else if (remaining < p->frame_size) { + if (GRPC_ERROR_NONE != (error = grpc_chttp2_incoming_byte_stream_push( + exec_ctx, p->parsing_frame, + grpc_slice_sub(slice, (size_t)(cur - beg), + (size_t)(end - beg)), + slice_out))) { + return error; + } + p->frame_size -= remaining; + grpc_slice_unref_internal(exec_ctx, slice); + return GRPC_ERROR_NONE; + } else { + GPR_ASSERT(remaining > p->frame_size); + if (GRPC_ERROR_NONE != + (grpc_chttp2_incoming_byte_stream_push( + exec_ctx, p->parsing_frame, + grpc_slice_sub(slice, (size_t)(cur - beg), + (size_t)(cur + p->frame_size - beg)), + slice_out))) { + grpc_slice_unref_internal(exec_ctx, slice); + return error; + } + if (GRPC_ERROR_NONE != + (error = grpc_chttp2_incoming_byte_stream_finished( + exec_ctx, p->parsing_frame, GRPC_ERROR_NONE, true))) { + grpc_slice_unref_internal(exec_ctx, slice); + return error; + } + p->parsing_frame = NULL; + p->state = GRPC_CHTTP2_DATA_FH_0; + cur += p->frame_size; + grpc_slice_buffer_undo_take_first( + &s->unprocessed_incoming_frames_buffer, + grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); + grpc_slice_unref_internal(exec_ctx, slice); + return GRPC_ERROR_NONE; + } } + } } - GPR_UNREACHABLE_CODE( - return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Should never reach here")); + return GRPC_ERROR_NONE; } grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_slice slice, int is_last) { - grpc_chttp2_data_parser *p = parser; - grpc_error *error = parse_inner(exec_ctx, p, t, s, slice); + /* grpc_error *error = parse_inner_buffer(exec_ctx, p, t, s, slice); */ + s->stats.incoming.framing_bytes += GRPC_SLICE_LENGTH(slice); + if (!s->pending_byte_stream) { + grpc_slice_ref_internal(slice); + grpc_slice_buffer_add(&s->frame_storage, slice); + grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); + } else if (s->on_next) { + GPR_ASSERT(s->frame_storage.length == 0); + grpc_slice_ref_internal(slice); + grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice); + grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_NONE); + s->on_next = NULL; + } else { + grpc_slice_ref_internal(slice); + grpc_slice_buffer_add(&s->frame_storage, slice); + } - if (is_last && p->is_last_frame) { + if (is_last && s->received_last_frame) { grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false, GRPC_ERROR_NONE); } - return error; + return GRPC_ERROR_NONE; } diff --git a/src/core/ext/transport/chttp2/transport/frame_data.h b/src/core/ext/transport/chttp2/transport/frame_data.h index 264ad14608..2fb8983c38 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.h +++ b/src/core/ext/transport/chttp2/transport/frame_data.h @@ -56,28 +56,16 @@ typedef enum { typedef struct grpc_chttp2_incoming_byte_stream grpc_chttp2_incoming_byte_stream; -typedef struct grpc_chttp2_incoming_frame_queue { - grpc_chttp2_incoming_byte_stream *head; - grpc_chttp2_incoming_byte_stream *tail; -} grpc_chttp2_incoming_frame_queue; - typedef struct { grpc_chttp2_stream_state state; - uint8_t is_last_frame; uint8_t frame_type; uint32_t frame_size; grpc_error *error; - int is_frame_compressed; + bool is_frame_compressed; grpc_chttp2_incoming_byte_stream *parsing_frame; } grpc_chttp2_data_parser; -void grpc_chttp2_incoming_frame_queue_merge( - grpc_chttp2_incoming_frame_queue *head_dst, - grpc_chttp2_incoming_frame_queue *tail_src); -grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop( - grpc_chttp2_incoming_frame_queue *q); - /* initialize per-stream state for data frame parsing */ grpc_error *grpc_chttp2_data_parser_init(grpc_chttp2_data_parser *parser); @@ -87,7 +75,8 @@ void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx, /* start processing a new data frame */ grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser, uint8_t flags, - uint32_t stream_id); + uint32_t stream_id, + grpc_chttp2_stream *s); /* handle a slice of a data frame - is_last indicates the last slice of a frame */ @@ -101,4 +90,11 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf, grpc_transport_one_way_stats *stats, grpc_slice_buffer *outbuf); +grpc_error *deframe_unprocessed_incoming_frames(grpc_exec_ctx *exec_ctx, + grpc_chttp2_data_parser *p, + grpc_chttp2_stream *s, + grpc_slice_buffer *slices, + grpc_slice *slice_out, + grpc_byte_stream **stream_out); + #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_DATA_H */ diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.c b/src/core/ext/transport/chttp2/transport/frame_settings.c index 16881c0707..4f2b827832 100644 --- a/src/core/ext/transport/chttp2/transport/frame_settings.c +++ b/src/core/ext/transport/chttp2/transport/frame_settings.c @@ -46,29 +46,6 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/transport/http2_errors.h" -#define MAX_MAX_HEADER_LIST_SIZE (1024 * 1024 * 1024) - -/* HTTP/2 mandated initial connection settings */ -const grpc_chttp2_setting_parameters - grpc_chttp2_settings_parameters[GRPC_CHTTP2_NUM_SETTINGS] = { - {NULL, 0, 0, 0, GRPC_CHTTP2_DISCONNECT_ON_INVALID_VALUE, - GRPC_HTTP2_PROTOCOL_ERROR}, - {"HEADER_TABLE_SIZE", 4096, 0, 0xffffffff, - GRPC_CHTTP2_CLAMP_INVALID_VALUE, GRPC_HTTP2_PROTOCOL_ERROR}, - {"ENABLE_PUSH", 1, 0, 1, GRPC_CHTTP2_DISCONNECT_ON_INVALID_VALUE, - GRPC_HTTP2_PROTOCOL_ERROR}, - {"MAX_CONCURRENT_STREAMS", 0xffffffffu, 0, 0xffffffffu, - GRPC_CHTTP2_DISCONNECT_ON_INVALID_VALUE, GRPC_HTTP2_PROTOCOL_ERROR}, - {"INITIAL_WINDOW_SIZE", 65535, 0, 0x7fffffffu, - GRPC_CHTTP2_DISCONNECT_ON_INVALID_VALUE, - GRPC_HTTP2_FLOW_CONTROL_ERROR}, - {"MAX_FRAME_SIZE", 16384, 16384, 16777215, - GRPC_CHTTP2_DISCONNECT_ON_INVALID_VALUE, GRPC_HTTP2_PROTOCOL_ERROR}, - {"MAX_HEADER_LIST_SIZE", MAX_MAX_HEADER_LIST_SIZE, 0, - MAX_MAX_HEADER_LIST_SIZE, GRPC_CHTTP2_CLAMP_INVALID_VALUE, - GRPC_HTTP2_PROTOCOL_ERROR}, -}; - static uint8_t *fill_header(uint8_t *out, uint32_t length, uint8_t flags) { *out++ = (uint8_t)(length >> 16); *out++ = (uint8_t)(length >> 8); @@ -98,9 +75,8 @@ grpc_slice grpc_chttp2_settings_create(uint32_t *old, const uint32_t *new, for (i = 0; i < count; i++) { if (new[i] != old[i] || (force_mask & (1u << i)) != 0) { - GPR_ASSERT(i); - *p++ = (uint8_t)(i >> 8); - *p++ = (uint8_t)(i); + *p++ = (uint8_t)(grpc_setting_id_to_wire_id[i] >> 8); + *p++ = (uint8_t)(grpc_setting_id_to_wire_id[i]); *p++ = (uint8_t)(new[i] >> 24); *p++ = (uint8_t)(new[i] >> 16); *p++ = (uint8_t)(new[i] >> 8); @@ -154,6 +130,7 @@ grpc_error *grpc_chttp2_settings_parser_parse(grpc_exec_ctx *exec_ctx, void *p, const uint8_t *cur = GRPC_SLICE_START_PTR(slice); const uint8_t *end = GRPC_SLICE_END_PTR(slice); char *msg; + grpc_chttp2_setting_id id; if (parser->is_ack) { return GRPC_ERROR_NONE; @@ -216,9 +193,9 @@ grpc_error *grpc_chttp2_settings_parser_parse(grpc_exec_ctx *exec_ctx, void *p, parser->value |= *cur; cur++; - if (parser->id > 0 && parser->id < GRPC_CHTTP2_NUM_SETTINGS) { + if (grpc_wire_id_to_setting_id(parser->id, &id)) { const grpc_chttp2_setting_parameters *sp = - &grpc_chttp2_settings_parameters[parser->id]; + &grpc_chttp2_settings_parameters[id]; if (parser->value < sp->min_value || parser->value > sp->max_value) { switch (sp->invalid_value_behavior) { case GRPC_CHTTP2_CLAMP_INVALID_VALUE: @@ -237,19 +214,19 @@ grpc_error *grpc_chttp2_settings_parser_parse(grpc_exec_ctx *exec_ctx, void *p, return err; } } - if (parser->id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE && - parser->incoming_settings[parser->id] != parser->value) { + if (id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE && + parser->incoming_settings[id] != parser->value) { t->initial_window_update += - (int64_t)parser->value - parser->incoming_settings[parser->id]; + (int64_t)parser->value - parser->incoming_settings[id]; if (grpc_http_trace) { gpr_log(GPR_DEBUG, "adding %d for initial_window change", (int)t->initial_window_update); } } - parser->incoming_settings[parser->id] = parser->value; + parser->incoming_settings[id] = parser->value; if (grpc_http_trace) { - gpr_log(GPR_DEBUG, "CHTTP2:%s:%s: got setting %d = %d", - t->is_client ? "CLI" : "SVR", t->peer_string, parser->id, + gpr_log(GPR_DEBUG, "CHTTP2:%s:%s: got setting %s = %d", + t->is_client ? "CLI" : "SVR", t->peer_string, sp->name, parser->value); } } else if (grpc_http_trace) { diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.h b/src/core/ext/transport/chttp2/transport/frame_settings.h index 44137798c0..2a85d0dba7 100644 --- a/src/core/ext/transport/chttp2/transport/frame_settings.h +++ b/src/core/ext/transport/chttp2/transport/frame_settings.h @@ -37,6 +37,7 @@ #include <grpc/slice.h> #include <grpc/support/port_platform.h> #include "src/core/ext/transport/chttp2/transport/frame.h" +#include "src/core/ext/transport/chttp2/transport/http2_settings.h" #include "src/core/lib/iomgr/exec_ctx.h" typedef enum { @@ -48,17 +49,6 @@ typedef enum { GRPC_CHTTP2_SPS_VAL3 } grpc_chttp2_settings_parse_state; -/* The things HTTP/2 defines as connection level settings */ -typedef enum { - GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE = 1, - GRPC_CHTTP2_SETTINGS_ENABLE_PUSH = 2, - GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS = 3, - GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE = 4, - GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE = 5, - GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE = 6, - GRPC_CHTTP2_NUM_SETTINGS -} grpc_chttp2_setting_id; - typedef struct { grpc_chttp2_settings_parse_state state; uint32_t *target_settings; @@ -68,24 +58,6 @@ typedef struct { uint32_t incoming_settings[GRPC_CHTTP2_NUM_SETTINGS]; } grpc_chttp2_settings_parser; -typedef enum { - GRPC_CHTTP2_CLAMP_INVALID_VALUE, - GRPC_CHTTP2_DISCONNECT_ON_INVALID_VALUE -} grpc_chttp2_invalid_value_behavior; - -typedef struct { - const char *name; - uint32_t default_value; - uint32_t min_value; - uint32_t max_value; - grpc_chttp2_invalid_value_behavior invalid_value_behavior; - uint32_t error_value; -} grpc_chttp2_setting_parameters; - -/* HTTP/2 mandated connection setting parameters */ -extern const grpc_chttp2_setting_parameters - grpc_chttp2_settings_parameters[GRPC_CHTTP2_NUM_SETTINGS]; - /* Create a settings frame by diffing old & new, and updating old to be new */ grpc_slice grpc_chttp2_settings_create(uint32_t *old, const uint32_t *newval, uint32_t force_mask, size_t count); diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.c b/src/core/ext/transport/chttp2/transport/hpack_encoder.c index 84586cd998..b1bc677a7a 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_encoder.c +++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.c @@ -86,6 +86,7 @@ typedef struct { grpc_transport_one_way_stats *stats; /* maximum size of a frame */ size_t max_frame_size; + bool use_true_binary_metadata; } framer_state; /* fills p (which is expected to be 9 bytes long) with a data frame header */ @@ -290,86 +291,113 @@ static void emit_indexed(grpc_chttp2_hpack_compressor *c, uint32_t elem_index, len); } -static grpc_slice get_wire_value(grpc_mdelem elem, uint8_t *huffman_prefix) { +typedef struct { + grpc_slice data; + uint8_t huffman_prefix; + bool insert_null_before_wire_value; +} wire_value; + +static wire_value get_wire_value(grpc_mdelem elem, bool true_binary_enabled) { if (grpc_is_binary_header(GRPC_MDKEY(elem))) { - *huffman_prefix = 0x80; - return grpc_chttp2_base64_encode_and_huffman_compress(GRPC_MDVALUE(elem)); + if (true_binary_enabled) { + return (wire_value){ + .huffman_prefix = 0x00, + .insert_null_before_wire_value = true, + .data = grpc_slice_ref_internal(GRPC_MDVALUE(elem)), + }; + } else { + return (wire_value){ + .huffman_prefix = 0x80, + .insert_null_before_wire_value = false, + .data = grpc_chttp2_base64_encode_and_huffman_compress( + GRPC_MDVALUE(elem)), + }; + } + } else { + /* TODO(ctiller): opportunistically compress non-binary headers */ + return (wire_value){ + .huffman_prefix = 0x00, + .insert_null_before_wire_value = false, + .data = grpc_slice_ref_internal(GRPC_MDVALUE(elem)), + }; } - /* TODO(ctiller): opportunistically compress non-binary headers */ - *huffman_prefix = 0x00; - return grpc_slice_ref_internal(GRPC_MDVALUE(elem)); +} + +static size_t wire_value_length(wire_value v) { + return GPR_SLICE_LENGTH(v.data) + v.insert_null_before_wire_value; +} + +static void add_wire_value(framer_state *st, wire_value v) { + if (v.insert_null_before_wire_value) *add_tiny_header_data(st, 1) = 0; + add_header_data(st, v.data); } static void emit_lithdr_incidx(grpc_chttp2_hpack_compressor *c, uint32_t key_index, grpc_mdelem elem, framer_state *st) { uint32_t len_pfx = GRPC_CHTTP2_VARINT_LENGTH(key_index, 2); - uint8_t huffman_prefix; - grpc_slice value_slice = get_wire_value(elem, &huffman_prefix); - size_t len_val = GRPC_SLICE_LENGTH(value_slice); + wire_value value = get_wire_value(elem, st->use_true_binary_metadata); + size_t len_val = wire_value_length(value); uint32_t len_val_len; GPR_ASSERT(len_val <= UINT32_MAX); len_val_len = GRPC_CHTTP2_VARINT_LENGTH((uint32_t)len_val, 1); GRPC_CHTTP2_WRITE_VARINT(key_index, 2, 0x40, add_tiny_header_data(st, len_pfx), len_pfx); - GRPC_CHTTP2_WRITE_VARINT((uint32_t)len_val, 1, huffman_prefix, + GRPC_CHTTP2_WRITE_VARINT((uint32_t)len_val, 1, value.huffman_prefix, add_tiny_header_data(st, len_val_len), len_val_len); - add_header_data(st, value_slice); + add_wire_value(st, value); } static void emit_lithdr_noidx(grpc_chttp2_hpack_compressor *c, uint32_t key_index, grpc_mdelem elem, framer_state *st) { uint32_t len_pfx = GRPC_CHTTP2_VARINT_LENGTH(key_index, 4); - uint8_t huffman_prefix; - grpc_slice value_slice = get_wire_value(elem, &huffman_prefix); - size_t len_val = GRPC_SLICE_LENGTH(value_slice); + wire_value value = get_wire_value(elem, st->use_true_binary_metadata); + size_t len_val = wire_value_length(value); uint32_t len_val_len; GPR_ASSERT(len_val <= UINT32_MAX); len_val_len = GRPC_CHTTP2_VARINT_LENGTH((uint32_t)len_val, 1); GRPC_CHTTP2_WRITE_VARINT(key_index, 4, 0x00, add_tiny_header_data(st, len_pfx), len_pfx); - GRPC_CHTTP2_WRITE_VARINT((uint32_t)len_val, 1, huffman_prefix, + GRPC_CHTTP2_WRITE_VARINT((uint32_t)len_val, 1, value.huffman_prefix, add_tiny_header_data(st, len_val_len), len_val_len); - add_header_data(st, value_slice); + add_wire_value(st, value); } static void emit_lithdr_incidx_v(grpc_chttp2_hpack_compressor *c, grpc_mdelem elem, framer_state *st) { uint32_t len_key = (uint32_t)GRPC_SLICE_LENGTH(GRPC_MDKEY(elem)); - uint8_t huffman_prefix; - grpc_slice value_slice = get_wire_value(elem, &huffman_prefix); - uint32_t len_val = (uint32_t)GRPC_SLICE_LENGTH(value_slice); + wire_value value = get_wire_value(elem, st->use_true_binary_metadata); + uint32_t len_val = (uint32_t)wire_value_length(value); uint32_t len_key_len = GRPC_CHTTP2_VARINT_LENGTH(len_key, 1); uint32_t len_val_len = GRPC_CHTTP2_VARINT_LENGTH(len_val, 1); GPR_ASSERT(len_key <= UINT32_MAX); - GPR_ASSERT(GRPC_SLICE_LENGTH(value_slice) <= UINT32_MAX); + GPR_ASSERT(wire_value_length(value) <= UINT32_MAX); *add_tiny_header_data(st, 1) = 0x40; GRPC_CHTTP2_WRITE_VARINT(len_key, 1, 0x00, add_tiny_header_data(st, len_key_len), len_key_len); add_header_data(st, grpc_slice_ref_internal(GRPC_MDKEY(elem))); - GRPC_CHTTP2_WRITE_VARINT(len_val, 1, huffman_prefix, + GRPC_CHTTP2_WRITE_VARINT(len_val, 1, value.huffman_prefix, add_tiny_header_data(st, len_val_len), len_val_len); - add_header_data(st, value_slice); + add_wire_value(st, value); } static void emit_lithdr_noidx_v(grpc_chttp2_hpack_compressor *c, grpc_mdelem elem, framer_state *st) { uint32_t len_key = (uint32_t)GRPC_SLICE_LENGTH(GRPC_MDKEY(elem)); - uint8_t huffman_prefix; - grpc_slice value_slice = get_wire_value(elem, &huffman_prefix); - uint32_t len_val = (uint32_t)GRPC_SLICE_LENGTH(value_slice); + wire_value value = get_wire_value(elem, st->use_true_binary_metadata); + uint32_t len_val = (uint32_t)wire_value_length(value); uint32_t len_key_len = GRPC_CHTTP2_VARINT_LENGTH(len_key, 1); uint32_t len_val_len = GRPC_CHTTP2_VARINT_LENGTH(len_val, 1); GPR_ASSERT(len_key <= UINT32_MAX); - GPR_ASSERT(GRPC_SLICE_LENGTH(value_slice) <= UINT32_MAX); + GPR_ASSERT(wire_value_length(value) <= UINT32_MAX); *add_tiny_header_data(st, 1) = 0x00; GRPC_CHTTP2_WRITE_VARINT(len_key, 1, 0x00, add_tiny_header_data(st, len_key_len), len_key_len); add_header_data(st, grpc_slice_ref_internal(GRPC_MDKEY(elem))); - GRPC_CHTTP2_WRITE_VARINT(len_val, 1, huffman_prefix, + GRPC_CHTTP2_WRITE_VARINT(len_val, 1, value.huffman_prefix, add_tiny_header_data(st, len_val_len), len_val_len); - add_header_data(st, value_slice); + add_wire_value(st, value); } static void emit_advertise_table_size_change(grpc_chttp2_hpack_compressor *c, @@ -595,23 +623,22 @@ void grpc_chttp2_hpack_compressor_set_max_table_size( void grpc_chttp2_encode_header(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c, - uint32_t stream_id, - grpc_metadata_batch *metadata, int is_eof, - size_t max_frame_size, - grpc_transport_one_way_stats *stats, + grpc_metadata_batch *metadata, + const grpc_encode_header_options *options, grpc_slice_buffer *outbuf) { framer_state st; grpc_linked_mdelem *l; gpr_timespec deadline; - GPR_ASSERT(stream_id != 0); + GPR_ASSERT(options->stream_id != 0); st.seen_regular_header = 0; - st.stream_id = stream_id; + st.stream_id = options->stream_id; st.output = outbuf; st.is_first_frame = 1; - st.stats = stats; - st.max_frame_size = max_frame_size; + st.stats = options->stats; + st.max_frame_size = options->max_frame_size; + st.use_true_binary_metadata = options->use_true_binary_metadata; /* Encode a metadata batch; store the returned values, representing a metadata element that needs to be unreffed back into the metadata @@ -630,5 +657,5 @@ void grpc_chttp2_encode_header(grpc_exec_ctx *exec_ctx, deadline_enc(exec_ctx, c, deadline, &st); } - finish_frame(&st, 1, is_eof); + finish_frame(&st, 1, options->is_eof); } diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.h b/src/core/ext/transport/chttp2/transport/hpack_encoder.h index 83ba5b1b3e..6ce3209604 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_encoder.h +++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.h @@ -90,11 +90,18 @@ void grpc_chttp2_hpack_compressor_set_max_table_size( void grpc_chttp2_hpack_compressor_set_max_usable_size( grpc_chttp2_hpack_compressor *c, uint32_t max_table_size); +typedef struct { + uint32_t stream_id; + bool is_eof; + bool use_true_binary_metadata; + size_t max_frame_size; + grpc_transport_one_way_stats *stats; +} grpc_encode_header_options; + void grpc_chttp2_encode_header(grpc_exec_ctx *exec_ctx, - grpc_chttp2_hpack_compressor *c, uint32_t id, - grpc_metadata_batch *metadata, int is_eof, - size_t max_frame_size, - grpc_transport_one_way_stats *stats, + grpc_chttp2_hpack_compressor *c, + grpc_metadata_batch *metadata, + const grpc_encode_header_options *options, grpc_slice_buffer *outbuf); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HPACK_ENCODER_H */ diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c index 5099d736bf..1846a85fc6 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.c +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c @@ -38,11 +38,6 @@ #include <stddef.h> #include <string.h> -/* This is here for grpc_is_binary_header - * TODO(murgatroid99): Remove this - */ -#include <grpc/grpc.h> - #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/port_platform.h> @@ -55,13 +50,11 @@ #include "src/core/lib/support/string.h" #include "src/core/lib/transport/http2_errors.h" -/* TODO(ctiller): remove before submission */ -#include "src/core/lib/slice/slice_string_helpers.h" - extern int grpc_http_trace; typedef enum { NOT_BINARY, + BINARY_BEGIN, B64_BYTE0, B64_BYTE1, B64_BYTE2, @@ -1325,6 +1318,19 @@ static grpc_error *append_string(grpc_exec_ctx *exec_ctx, case NOT_BINARY: append_bytes(str, cur, (size_t)(end - cur)); return GRPC_ERROR_NONE; + case BINARY_BEGIN: + if (cur == end) { + p->binary = BINARY_BEGIN; + return GRPC_ERROR_NONE; + } + if (*cur == 0) { + /* 'true-binary' case */ + ++cur; + p->binary = NOT_BINARY; + append_bytes(str, cur, (size_t)(end - cur)); + return GRPC_ERROR_NONE; + } + /* fallthrough */ b64_byte0: case B64_BYTE0: if (cur == end) { @@ -1409,6 +1415,8 @@ static grpc_error *finish_str(grpc_exec_ctx *exec_ctx, switch ((binary_state)p->binary) { case NOT_BINARY: break; + case BINARY_BEGIN: + break; case B64_BYTE0: break; case B64_BYTE1: @@ -1571,7 +1579,7 @@ static grpc_error *parse_value_string(grpc_exec_ctx *exec_ctx, const uint8_t *cur, const uint8_t *end, bool is_binary) { return begin_parse_string(exec_ctx, p, cur, end, - is_binary ? B64_BYTE0 : NOT_BINARY, &p->value); + is_binary ? BINARY_BEGIN : NOT_BINARY, &p->value); } static grpc_error *parse_value_string_with_indexed_key( diff --git a/src/core/ext/transport/chttp2/transport/http2_settings.c b/src/core/ext/transport/chttp2/transport/http2_settings.c new file mode 100644 index 0000000000..d4905107ef --- /dev/null +++ b/src/core/ext/transport/chttp2/transport/http2_settings.c @@ -0,0 +1,75 @@ +/* + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +/* + * Automatically generated by tools/codegen/core/gen_settings_ids.py + */ + +#include "src/core/ext/transport/chttp2/transport/http2_settings.h" + +#include <grpc/support/useful.h> +#include "src/core/lib/transport/http2_errors.h" + +const uint16_t grpc_setting_id_to_wire_id[] = {1, 2, 3, 4, 5, 6, 65027}; + +bool grpc_wire_id_to_setting_id(uint32_t wire_id, grpc_chttp2_setting_id *out) { + uint32_t i = wire_id - 1; + uint32_t x = i % 256; + uint32_t y = i / 256; + uint32_t h = x; + switch (y) { + case 254: + h += 4; + break; + } + *out = (grpc_chttp2_setting_id)h; + return h < GPR_ARRAY_SIZE(grpc_setting_id_to_wire_id) && + grpc_setting_id_to_wire_id[h] == wire_id; +} + +const grpc_chttp2_setting_parameters + grpc_chttp2_settings_parameters[GRPC_CHTTP2_NUM_SETTINGS] = { + {"HEADER_TABLE_SIZE", 4096u, 0u, 4294967295u, + GRPC_CHTTP2_CLAMP_INVALID_VALUE, GRPC_HTTP2_PROTOCOL_ERROR}, + {"ENABLE_PUSH", 1u, 0u, 1u, GRPC_CHTTP2_DISCONNECT_ON_INVALID_VALUE, + GRPC_HTTP2_PROTOCOL_ERROR}, + {"MAX_CONCURRENT_STREAMS", 4294967295u, 0u, 4294967295u, + GRPC_CHTTP2_DISCONNECT_ON_INVALID_VALUE, GRPC_HTTP2_PROTOCOL_ERROR}, + {"INITIAL_WINDOW_SIZE", 65535u, 0u, 2147483647u, + GRPC_CHTTP2_DISCONNECT_ON_INVALID_VALUE, + GRPC_HTTP2_FLOW_CONTROL_ERROR}, + {"MAX_FRAME_SIZE", 16384u, 16384u, 16777215u, + GRPC_CHTTP2_DISCONNECT_ON_INVALID_VALUE, GRPC_HTTP2_PROTOCOL_ERROR}, + {"MAX_HEADER_LIST_SIZE", 16777216u, 0u, 16777216u, + GRPC_CHTTP2_CLAMP_INVALID_VALUE, GRPC_HTTP2_PROTOCOL_ERROR}, + {"GRPC_ALLOW_TRUE_BINARY_METADATA", 0u, 0u, 1u, + GRPC_CHTTP2_CLAMP_INVALID_VALUE, GRPC_HTTP2_PROTOCOL_ERROR}, +}; diff --git a/src/core/ext/transport/chttp2/transport/http2_settings.h b/src/core/ext/transport/chttp2/transport/http2_settings.h new file mode 100644 index 0000000000..9781cdc989 --- /dev/null +++ b/src/core/ext/transport/chttp2/transport/http2_settings.h @@ -0,0 +1,74 @@ +/* + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +/* + * Automatically generated by tools/codegen/core/gen_settings_ids.py + */ + +#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HTTP2_SETTINGS_H +#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HTTP2_SETTINGS_H + +#include <stdbool.h> +#include <stdint.h> + +typedef enum { + GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE = 0, /* wire id 1 */ + GRPC_CHTTP2_SETTINGS_ENABLE_PUSH = 1, /* wire id 2 */ + GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS = 2, /* wire id 3 */ + GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE = 3, /* wire id 4 */ + GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE = 4, /* wire id 5 */ + GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE = 5, /* wire id 6 */ + GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA = 6, /* wire id 65027 */ +} grpc_chttp2_setting_id; + +#define GRPC_CHTTP2_NUM_SETTINGS 7 +extern const uint16_t grpc_setting_id_to_wire_id[]; + +bool grpc_wire_id_to_setting_id(uint32_t wire_id, grpc_chttp2_setting_id *out); + +typedef enum { + GRPC_CHTTP2_CLAMP_INVALID_VALUE, + GRPC_CHTTP2_DISCONNECT_ON_INVALID_VALUE +} grpc_chttp2_invalid_value_behavior; + +typedef struct { + const char *name; + uint32_t default_value; + uint32_t min_value; + uint32_t max_value; + grpc_chttp2_invalid_value_behavior invalid_value_behavior; + uint32_t error_value; +} grpc_chttp2_setting_parameters; + +extern const grpc_chttp2_setting_parameters + grpc_chttp2_settings_parameters[GRPC_CHTTP2_NUM_SETTINGS]; + +#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HTTP2_SETTINGS_H */ diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 6eb848b8d7..a10e3886ea 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -195,22 +195,20 @@ typedef struct grpc_chttp2_write_cb { struct grpc_chttp2_incoming_byte_stream { grpc_byte_stream base; gpr_refcount refs; - struct grpc_chttp2_incoming_byte_stream *next_message; - grpc_error *error; - grpc_chttp2_transport *transport; - grpc_chttp2_stream *stream; - bool is_tail; + grpc_chttp2_transport *transport; /* immutable */ + grpc_chttp2_stream *stream; /* immutable */ - gpr_mu slice_mu; // protects slices, on_next - grpc_slice_buffer slices; - grpc_closure *on_next; - grpc_slice *next; + /* Accessed only by transport thread when stream->pending_byte_stream == false + * Accessed only by application thread when stream->pending_byte_stream == + * true */ uint32_t remaining_bytes; + /* Accessed only by transport thread when stream->pending_byte_stream == false + * Accessed only by application thread when stream->pending_byte_stream == + * true */ struct { grpc_closure closure; - grpc_slice *slice; size_t max_size_hint; grpc_closure *on_complete; } next_action; @@ -445,8 +443,8 @@ struct grpc_chttp2_stream { uint32_t id; /** window available for us to send to peer, over or under the initial window - * size of the transport... ie: - * outgoing_window = outgoing_window_delta + transport.initial_window_size */ + * size of the transport... ie: + * outgoing_window = outgoing_window_delta + transport.initial_window_size */ int64_t outgoing_window_delta; /** things the upper layers would like to send */ grpc_metadata_batch *send_initial_metadata; @@ -473,9 +471,6 @@ struct grpc_chttp2_stream { grpc_transport_stream_stats *collecting_stats; grpc_transport_stream_stats stats; - /** number of streams that are currently being read */ - gpr_refcount active_streams; - /** Is this stream closed for writing. */ bool write_closed; /** Is this stream reading half-closed. */ @@ -499,7 +494,17 @@ struct grpc_chttp2_stream { grpc_chttp2_incoming_metadata_buffer metadata_buffer[2]; - grpc_chttp2_incoming_frame_queue incoming_frames; + grpc_slice_buffer frame_storage; /* protected by t combiner */ + + /* Accessed only by transport thread when stream->pending_byte_stream == false + * Accessed only by application thread when stream->pending_byte_stream == + * true */ + grpc_slice_buffer unprocessed_incoming_frames_buffer; + grpc_closure *on_next; /* protected by t combiner */ + bool pending_byte_stream; /* protected by t combiner */ + grpc_closure reset_byte_stream; + grpc_error *byte_stream_error; /* protected by t combiner */ + bool received_last_frame; /* protected by t combiner */ gpr_timespec deadline; @@ -512,6 +517,9 @@ struct grpc_chttp2_stream { * incoming_window = incoming_window_delta + transport.initial_window_size */ int64_t incoming_window_delta; /** parsing state for data frames */ + /* Accessed only by transport thread when stream->pending_byte_stream == false + * Accessed only by application thread when stream->pending_byte_stream == + * true */ grpc_chttp2_data_parser data_parser; /** number of bytes received - reset at end of parse thread execution */ int64_t received_bytes; @@ -790,10 +798,13 @@ void grpc_chttp2_ref_transport(grpc_chttp2_transport *t); grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, uint32_t frame_size, uint32_t flags); -void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, - grpc_chttp2_incoming_byte_stream *bs, - grpc_slice slice); -void grpc_chttp2_incoming_byte_stream_finished( +grpc_error *grpc_chttp2_incoming_byte_stream_push( + grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, + grpc_slice slice, grpc_slice *slice_out); +grpc_error *grpc_chttp2_incoming_byte_stream_finished( + grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, + grpc_error *error, bool reset_on_error); +void grpc_chttp2_incoming_byte_stream_notify( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, grpc_error *error); diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index 7e457ced27..638b137316 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -458,12 +458,13 @@ static grpc_error *init_data_frame_parser(grpc_exec_ctx *exec_ctx, return init_skip_frame_parser(exec_ctx, t, 0); } if (err == GRPC_ERROR_NONE) { - err = grpc_chttp2_data_parser_begin_frame(&s->data_parser, - t->incoming_frame_flags, s->id); + err = grpc_chttp2_data_parser_begin_frame( + &s->data_parser, t->incoming_frame_flags, s->id, s); } error_handler: if (err == GRPC_ERROR_NONE) { t->incoming_stream = s; + /* t->parser = grpc_chttp2_data_parser_parse;*/ t->parser = grpc_chttp2_data_parser_parse; t->parser_data = &s->data_parser; return GRPC_ERROR_NONE; diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index be41b3d186..ae9df175ff 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -219,10 +219,18 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, /* send initial metadata if it's available */ if (!sent_initial_metadata && s->send_initial_metadata) { - grpc_chttp2_encode_header( - exec_ctx, &t->hpack_compressor, s->id, s->send_initial_metadata, 0, - t->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], - &s->stats.outgoing, &t->outbuf); + grpc_encode_header_options hopt = { + .stream_id = s->id, + .is_eof = false, + .use_true_binary_metadata = + t->settings + [GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA] != 0, + .max_frame_size = t->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], + .stats = &s->stats.outgoing}; + grpc_chttp2_encode_header(exec_ctx, &t->hpack_compressor, + s->send_initial_metadata, &hopt, &t->outbuf); s->send_initial_metadata = NULL; s->sent_initial_metadata = true; sent_initial_metadata = true; @@ -315,11 +323,21 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, 0, true, &s->stats.outgoing, &t->outbuf); } else { - grpc_chttp2_encode_header( - exec_ctx, &t->hpack_compressor, s->id, s->send_trailing_metadata, - true, t->settings[GRPC_ACKED_SETTINGS] - [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], - &s->stats.outgoing, &t->outbuf); + grpc_encode_header_options hopt = { + .stream_id = s->id, + .is_eof = true, + .use_true_binary_metadata = + t->settings + [GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA] != + 0, + .max_frame_size = + t->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], + .stats = &s->stats.outgoing}; + grpc_chttp2_encode_header(exec_ctx, &t->hpack_compressor, + s->send_trailing_metadata, &hopt, + &t->outbuf); } s->send_trailing_metadata = NULL; s->sent_trailing_metadata = true; diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 0b9189558f..7896c70f9e 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -973,9 +973,20 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, grpc_slice_buffer write_slice_buffer; grpc_slice slice; grpc_slice_buffer_init(&write_slice_buffer); - grpc_byte_stream_next( - NULL, stream_op->payload->send_message.send_message, &slice, - stream_op->payload->send_message.send_message->length, NULL); + if (1 != grpc_byte_stream_next( + exec_ctx, stream_op->payload->send_message.send_message, + stream_op->payload->send_message.send_message->length, + NULL)) { + /* Should never reach here */ + GPR_ASSERT(false); + } + if (GRPC_ERROR_NONE != + grpc_byte_stream_pull(exec_ctx, + stream_op->payload->send_message.send_message, + &slice)) { + /* Should never reach here */ + GPR_ASSERT(false); + } grpc_slice_buffer_add(&write_slice_buffer, slice); if (write_slice_buffer.count != 1) { /* Empty request not handled yet */ @@ -1124,7 +1135,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, if (stream_state->rs.compressed) { stream_state->rs.sbs.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS; } - *((grpc_byte_buffer **)stream_op->recv_message) = + *((grpc_byte_buffer **) + stream_op->payload->recv_message.recv_message) = (grpc_byte_buffer *)&stream_state->rs.sbs; grpc_closure_sched( exec_ctx, stream_op->payload->recv_message.recv_message_ready, diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c index 4625cba0d2..764524b24d 100644 --- a/src/core/lib/channel/compress_filter.c +++ b/src/core/lib/channel/compress_filter.c @@ -221,6 +221,13 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { grpc_call_element *elem = elemp; call_data *calld = elem->call_data; + if (GRPC_ERROR_NONE != + grpc_byte_stream_pull(exec_ctx, + calld->send_op->payload->send_message.send_message, + &calld->incoming_slice)) { + /* Should never reach here */ + abort(); + } grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); if (calld->send_length == calld->slices.length) { finish_send_message(exec_ctx, elem); @@ -233,8 +240,11 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { call_data *calld = elem->call_data; while (grpc_byte_stream_next( - exec_ctx, calld->send_op->payload->send_message.send_message, - &calld->incoming_slice, ~(size_t)0, &calld->got_slice)) { + exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0, + &calld->got_slice)) { + grpc_byte_stream_pull(exec_ctx, + calld->send_op->payload->send_message.send_message, + &calld->incoming_slice); grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); if (calld->send_length == calld->slices.length) { finish_send_message(exec_ctx, elem); diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c index 4e47c5c658..151fb9885d 100644 --- a/src/core/lib/channel/http_client_filter.c +++ b/src/core/lib/channel/http_client_filter.c @@ -220,8 +220,11 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx, call_data *calld = elem->call_data; uint8_t *wrptr = calld->payload_bytes; while (grpc_byte_stream_next( - exec_ctx, calld->send_op->payload->send_message.send_message, - &calld->incoming_slice, ~(size_t)0, &calld->got_slice)) { + exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0, + &calld->got_slice)) { + grpc_byte_stream_pull(exec_ctx, + calld->send_op->payload->send_message.send_message, + &calld->incoming_slice); memcpy(wrptr, GRPC_SLICE_START_PTR(calld->incoming_slice), GRPC_SLICE_LENGTH(calld->incoming_slice)); wrptr += GRPC_SLICE_LENGTH(calld->incoming_slice); @@ -237,6 +240,13 @@ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { grpc_call_element *elem = elemp; call_data *calld = elem->call_data; calld->send_message_blocked = false; + if (GRPC_ERROR_NONE != + grpc_byte_stream_pull(exec_ctx, + calld->send_op->payload->send_message.send_message, + &calld->incoming_slice)) { + /* Should never reach here */ + abort(); + } grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); if (calld->send_length == calld->slices.length) { /* Pass down the original send_message op that was blocked.*/ diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c index 57726c8476..c12fabb37c 100644 --- a/src/core/lib/channel/message_size_filter.c +++ b/src/core/lib/channel/message_size_filter.c @@ -218,14 +218,14 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, if (strcmp(args->channel_args->args[i].key, GRPC_ARG_MAX_SEND_MESSAGE_LENGTH) == 0) { const grpc_integer_options options = { - GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH, 0, INT_MAX}; + GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH, -1, INT_MAX}; chand->max_send_size = grpc_channel_arg_get_integer(&args->channel_args->args[i], options); } if (strcmp(args->channel_args->args[i].key, GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH) == 0) { const grpc_integer_options options = { - GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH, 0, INT_MAX}; + GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH, -1, INT_MAX}; chand->max_recv_size = grpc_channel_arg_get_integer(&args->channel_args->args[i], options); } diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c index 102d1aa290..6b468764fc 100644 --- a/src/core/lib/iomgr/resolve_address_uv.c +++ b/src/core/lib/iomgr/resolve_address_uv.c @@ -69,8 +69,9 @@ static int retry_named_port_failure(int status, request *r, int retry_status; uv_getaddrinfo_t *req = gpr_malloc(sizeof(uv_getaddrinfo_t)); req->data = r; + r->port = svc[i][1]; retry_status = uv_getaddrinfo(uv_default_loop(), req, getaddrinfo_cb, - r->host, svc[i][1], r->hints); + r->host, r->port, r->hints); if (retry_status < 0 || getaddrinfo_cb == NULL) { // The callback will not be called gpr_free(req); diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 6a8ae03a21..ef159a4ca4 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -1197,6 +1197,7 @@ static void finish_batch_step(grpc_exec_ctx *exec_ctx, batch_control *bctl) { static void continue_receiving_slices(grpc_exec_ctx *exec_ctx, batch_control *bctl) { + grpc_error *error; grpc_call *call = bctl->call; for (;;) { size_t remaining = call->receiving_stream->length - @@ -1208,11 +1209,22 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx, finish_batch_step(exec_ctx, bctl); return; } - if (grpc_byte_stream_next(exec_ctx, call->receiving_stream, - &call->receiving_slice, remaining, + if (grpc_byte_stream_next(exec_ctx, call->receiving_stream, remaining, &call->receiving_slice_ready)) { - grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, - call->receiving_slice); + error = grpc_byte_stream_pull(exec_ctx, call->receiving_stream, + &call->receiving_slice); + if (error == GRPC_ERROR_NONE) { + grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, + call->receiving_slice); + } else { + grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); + call->receiving_stream = NULL; + grpc_byte_buffer_destroy(*call->receiving_buffer); + *call->receiving_buffer = NULL; + call->receiving_message = 0; + finish_batch_step(exec_ctx, bctl); + return; + } } else { return; } @@ -1223,12 +1235,24 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_error *error) { batch_control *bctl = bctlp; grpc_call *call = bctl->call; + grpc_byte_stream *bs = call->receiving_stream; + bool release_error = false; if (error == GRPC_ERROR_NONE) { - grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, - call->receiving_slice); - continue_receiving_slices(exec_ctx, bctl); - } else { + grpc_slice slice; + error = grpc_byte_stream_pull(exec_ctx, bs, &slice); + if (error == GRPC_ERROR_NONE) { + grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, + slice); + continue_receiving_slices(exec_ctx, bctl); + } else { + /* Error returned by grpc_byte_stream_pull needs to be released manually + */ + release_error = true; + } + } + + if (error != GRPC_ERROR_NONE) { if (grpc_trace_operation_failures) { GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error)); } @@ -1236,7 +1260,11 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, call->receiving_stream = NULL; grpc_byte_buffer_destroy(*call->receiving_buffer); *call->receiving_buffer = NULL; + call->receiving_message = 0; finish_batch_step(exec_ctx, bctl); + if (release_error) { + GRPC_ERROR_UNREF(error); + } } } diff --git a/src/core/lib/transport/byte_stream.c b/src/core/lib/transport/byte_stream.c index 4d4206189e..5800c70ef4 100644 --- a/src/core/lib/transport/byte_stream.c +++ b/src/core/lib/transport/byte_stream.c @@ -40,10 +40,15 @@ #include "src/core/lib/slice/slice_internal.h" int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, grpc_slice *slice, - size_t max_size_hint, grpc_closure *on_complete) { - return byte_stream->next(exec_ctx, byte_stream, slice, max_size_hint, - on_complete); + grpc_byte_stream *byte_stream, size_t max_size_hint, + grpc_closure *on_complete) { + return byte_stream->next(exec_ctx, byte_stream, max_size_hint, on_complete); +} + +grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + grpc_slice *slice) { + return byte_stream->pull(exec_ctx, byte_stream, slice); } void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx, @@ -53,16 +58,24 @@ void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx, /* slice_buffer_stream */ -static int slice_buffer_stream_next(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, - grpc_slice *slice, size_t max_size_hint, - grpc_closure *on_complete) { +static bool slice_buffer_stream_next(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + size_t max_size_hint, + grpc_closure *on_complete) { + grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream; + GPR_ASSERT(stream->cursor < stream->backing_buffer->count); + return true; +} + +static grpc_error *slice_buffer_stream_pull(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + grpc_slice *slice) { grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream; GPR_ASSERT(stream->cursor < stream->backing_buffer->count); *slice = grpc_slice_ref_internal(stream->backing_buffer->slices[stream->cursor]); stream->cursor++; - return 1; + return GRPC_ERROR_NONE; } static void slice_buffer_stream_destroy(grpc_exec_ctx *exec_ctx, @@ -75,6 +88,7 @@ void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream, stream->base.length = (uint32_t)slice_buffer->length; stream->base.flags = flags; stream->base.next = slice_buffer_stream_next; + stream->base.pull = slice_buffer_stream_pull; stream->base.destroy = slice_buffer_stream_destroy; stream->backing_buffer = slice_buffer; stream->cursor = 0; diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h index 1fdd5b4d77..381c65fb04 100644 --- a/src/core/lib/transport/byte_stream.h +++ b/src/core/lib/transport/byte_stream.h @@ -49,9 +49,10 @@ typedef struct grpc_byte_stream grpc_byte_stream; struct grpc_byte_stream { uint32_t length; uint32_t flags; - int (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, - grpc_slice *slice, size_t max_size_hint, - grpc_closure *on_complete); + bool (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, + size_t max_size_hint, grpc_closure *on_complete); + grpc_error *(*pull)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, + grpc_slice *slice); void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream); }; @@ -61,12 +62,20 @@ struct grpc_byte_stream { * * max_size_hint can be set as a hint as to the maximum number * of bytes that would be acceptable to read. + */ +int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, size_t max_size_hint, + grpc_closure *on_complete); + +/* returns the next slice in the byte stream when it is ready (indicated by + * either grpc_byte_stream_next returning 1 or on_complete passed to + * grpc_byte_stream_next is called). * * once a slice is returned into *slice, it is owned by the caller. */ -int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, grpc_slice *slice, - size_t max_size_hint, grpc_closure *on_complete); +grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + grpc_slice *slice); void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream); diff --git a/src/csharp/Grpc.Auth/project.json b/src/csharp/Grpc.Auth/project.json deleted file mode 100644 index 370bf11b2d..0000000000 --- a/src/csharp/Grpc.Auth/project.json +++ /dev/null @@ -1,35 +0,0 @@ -{ - "version": "1.3.0-dev", - "title": "gRPC C# Auth", - "authors": [ "Google Inc." ], - "copyright": "Copyright 2015, Google Inc.", - "packOptions": { - "summary": "Auth library for C# implementation of gRPC - an RPC library and framework", - "description": "Auth library for C# implementation of gRPC - an RPC library and framework. See project site for more info.", - "owners": [ "grpc-packages" ], - "licenseUrl": "https://github.com/grpc/grpc/blob/master/LICENSE", - "projectUrl": "https://github.com/grpc/grpc", - "requireLicenseAcceptance": false, - "tags": [ "gRPC RPC Protocol HTTP/2 Auth OAuth2" ], - }, - "buildOptions": { - "define": [ "SIGNED" ], - "keyFile": "../keys/Grpc.snk", - "xmlDoc": true, - "compile": { - "includeFiles": [ "../Grpc.Core/Version.cs" ] - } - }, - "dependencies": { - "Grpc.Core": "1.3.0-dev", - "Google.Apis.Auth": "1.21.0" - }, - "frameworks": { - "net45": { }, - "netstandard1.5": { - "dependencies": { - "NETStandard.Library": "1.6.0" - } - } - } -} diff --git a/src/csharp/Grpc.Core.Testing/project.json b/src/csharp/Grpc.Core.Testing/project.json deleted file mode 100644 index 38d5fab50e..0000000000 --- a/src/csharp/Grpc.Core.Testing/project.json +++ /dev/null @@ -1,39 +0,0 @@ -{ - "version": "1.3.0-dev", - "title": "gRPC C# Core Testing", - "authors": [ "Google Inc." ], - "copyright": "Copyright 2017, Google Inc.", - "packOptions": { - "summary": "Testing support for gRPC C#", - "description": "Useful when testing code that uses gRPC.", - "owners": [ "grpc-packages" ], - "licenseUrl": "https://github.com/grpc/grpc/blob/master/LICENSE", - "projectUrl": "https://github.com/grpc/grpc", - "requireLicenseAcceptance": false, - "tags": [ "gRPC test testing" ] - }, - "buildOptions": { - "define": [ "SIGNED" ], - "keyFile": "../keys/Grpc.snk", - "xmlDoc": true, - "compile": { - "includeFiles": [ "../Grpc.Core/Version.cs" ] - } - }, - "dependencies": { - "Grpc.Core": "1.3.0-dev" - }, - "frameworks": { - "net45": { - "frameworkAssemblies": { - "System.Runtime": "", - "System.IO": "" - } - }, - "netstandard1.5": { - "dependencies": { - "NETStandard.Library": "1.6.0" - } - } - } -} diff --git a/src/csharp/Grpc.HealthCheck/project.json b/src/csharp/Grpc.HealthCheck/project.json deleted file mode 100644 index e93d0bf81b..0000000000 --- a/src/csharp/Grpc.HealthCheck/project.json +++ /dev/null @@ -1,35 +0,0 @@ -{ - "version": "1.3.0-dev", - "title": "gRPC C# Healthchecking", - "authors": [ "Google Inc." ], - "copyright": "Copyright 2015, Google Inc.", - "packOptions": { - "summary": "Implementation of gRPC health service", - "description": "Example implementation of grpc.health.v1 service that can be used for health-checking.", - "owners": [ "grpc-packages" ], - "licenseUrl": "https://github.com/grpc/grpc/blob/master/LICENSE", - "projectUrl": "https://github.com/grpc/grpc", - "requireLicenseAcceptance": false, - "tags": [ "gRPC health check" ] - }, - "buildOptions": { - "define": [ "SIGNED" ], - "keyFile": "../keys/Grpc.snk", - "xmlDoc": true, - "compile": { - "includeFiles": [ "../Grpc.Core/Version.cs" ] - } - }, - "dependencies": { - "Grpc.Core": "1.3.0-dev", - "Google.Protobuf": "3.2.0" - }, - "frameworks": { - "net45": {}, - "netstandard1.5": { - "dependencies": { - "NETStandard.Library": "1.6.0" - } - } - } -} diff --git a/src/csharp/Grpc.Reflection/project.json b/src/csharp/Grpc.Reflection/project.json deleted file mode 100644 index 014c78e489..0000000000 --- a/src/csharp/Grpc.Reflection/project.json +++ /dev/null @@ -1,35 +0,0 @@ -{ - "version": "1.3.0-dev", - "title": "gRPC C# Reflection", - "authors": [ "Google Inc." ], - "copyright": "Copyright 2016, Google Inc.", - "packOptions": { - "summary": "Implementation of gRPC reflection service", - "description": "Provides information about services running on a gRPC C# server.", - "owners": [ "grpc-packages" ], - "licenseUrl": "https://github.com/grpc/grpc/blob/master/LICENSE", - "projectUrl": "https://github.com/grpc/grpc", - "requireLicenseAcceptance": false, - "tags": [ "gRPC reflection" ] - }, - "buildOptions": { - "define": [ "SIGNED" ], - "keyFile": "../keys/Grpc.snk", - "xmlDoc": true, - "compile": { - "includeFiles": [ "../Grpc.Core/Version.cs" ] - } - }, - "dependencies": { - "Grpc.Core": "1.3.0-dev", - "Google.Protobuf": "3.2.0" - }, - "frameworks": { - "net45": {}, - "netstandard1.5": { - "dependencies": { - "NETStandard.Library": "1.6.0" - } - } - } -} diff --git a/src/csharp/global.json b/src/csharp/global.json new file mode 100644 index 0000000000..f3c33cef6a --- /dev/null +++ b/src/csharp/global.json @@ -0,0 +1,5 @@ +{ + "sdk": { + "version": "1.0.0-preview2-003131" + } +}
\ No newline at end of file diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index 88bcd6db0b..7f3cbb8ed1 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -31,23 +31,23 @@ * */ +#include <map> #include <memory> #include <vector> -#include <map> #include <node.h> -#include "grpc/support/log.h" -#include "grpc/grpc.h" -#include "grpc/grpc_security.h" -#include "grpc/support/alloc.h" -#include "grpc/support/time.h" #include "byte_buffer.h" #include "call.h" +#include "call_credentials.h" #include "channel.h" #include "completion_queue.h" #include "completion_queue_async_worker.h" -#include "call_credentials.h" +#include "grpc/grpc.h" +#include "grpc/grpc_security.h" +#include "grpc/support/alloc.h" +#include "grpc/support/log.h" +#include "grpc/support/time.h" #include "slice.h" #include "timeval.h" @@ -99,30 +99,31 @@ Local<Value> nanErrorWithCode(const char *msg, grpc_call_error code) { bool CreateMetadataArray(Local<Object> metadata, grpc_metadata_array *array) { HandleScope scope; - grpc_metadata_array_init(array); Local<Array> keys = Nan::GetOwnPropertyNames(metadata).ToLocalChecked(); for (unsigned int i = 0; i < keys->Length(); i++) { - Local<String> current_key = Nan::To<String>( - Nan::Get(keys, i).ToLocalChecked()).ToLocalChecked(); + Local<String> current_key = + Nan::To<String>(Nan::Get(keys, i).ToLocalChecked()).ToLocalChecked(); Local<Value> value_array = Nan::Get(metadata, current_key).ToLocalChecked(); if (!value_array->IsArray()) { return false; } array->capacity += Local<Array>::Cast(value_array)->Length(); } - array->metadata = reinterpret_cast<grpc_metadata*>( - gpr_malloc(array->capacity * sizeof(grpc_metadata))); + array->metadata = reinterpret_cast<grpc_metadata *>( + gpr_zalloc(array->capacity * sizeof(grpc_metadata))); for (unsigned int i = 0; i < keys->Length(); i++) { Local<String> current_key(Nan::To<String>(keys->Get(i)).ToLocalChecked()); - Local<Array> values = Local<Array>::Cast( - Nan::Get(metadata, current_key).ToLocalChecked()); - grpc_slice key_slice = grpc_slice_intern(CreateSliceFromString(current_key)); + Local<Array> values = + Local<Array>::Cast(Nan::Get(metadata, current_key).ToLocalChecked()); + grpc_slice key_slice = CreateSliceFromString(current_key); + grpc_slice key_intern_slice = grpc_slice_intern(key_slice); + grpc_slice_unref(key_slice); for (unsigned int j = 0; j < values->Length(); j++) { Local<Value> value = Nan::Get(values, j).ToLocalChecked(); grpc_metadata *current = &array->metadata[array->count]; - current->key = key_slice; + current->key = key_intern_slice; // Only allow binary headers for "-bin" keys - if (grpc_is_binary_header(key_slice)) { + if (grpc_is_binary_header(key_intern_slice)) { if (::node::Buffer::HasInstance(value)) { current->value = CreateSliceFromBuffer(value); } else { @@ -142,13 +143,21 @@ bool CreateMetadataArray(Local<Object> metadata, grpc_metadata_array *array) { return true; } +void DestroyMetadataArray(grpc_metadata_array *array) { + for (size_t i = 0; i < array->count; i++) { + // Don't unref keys because they are interned + grpc_slice_unref(array->metadata[i].value); + } + grpc_metadata_array_destroy(array); +} + Local<Value> ParseMetadata(const grpc_metadata_array *metadata_array) { EscapableHandleScope scope; grpc_metadata *metadata_elements = metadata_array->metadata; size_t length = metadata_array->count; Local<Object> metadata_object = Nan::New<Object>(); for (unsigned int i = 0; i < length; i++) { - grpc_metadata* elem = &metadata_elements[i]; + grpc_metadata *elem = &metadata_elements[i]; // TODO(murgatroid99): Use zero-copy string construction instead Local<String> key_string = CopyStringFromSlice(elem->key); Local<Array> array; @@ -174,11 +183,12 @@ Local<Value> Op::GetOpType() const { return scope.Escape(Nan::New(GetTypeString()).ToLocalChecked()); } -Op::~Op() { -} +Op::~Op() {} class SendMetadataOp : public Op { public: + SendMetadataOp() { grpc_metadata_array_init(&send_metadata); } + ~SendMetadataOp() { DestroyMetadataArray(&send_metadata); } Local<Value> GetNodeValue() const { EscapableHandleScope scope; return scope.Escape(Nan::True()); @@ -187,33 +197,29 @@ class SendMetadataOp : public Op { if (!value->IsObject()) { return false; } - grpc_metadata_array array; MaybeLocal<Object> maybe_metadata = Nan::To<Object>(value); if (maybe_metadata.IsEmpty()) { return false; } - if (!CreateMetadataArray(maybe_metadata.ToLocalChecked(), - &array)) { + if (!CreateMetadataArray(maybe_metadata.ToLocalChecked(), &send_metadata)) { return false; } - out->data.send_initial_metadata.count = array.count; - out->data.send_initial_metadata.metadata = array.metadata; + out->data.send_initial_metadata.count = send_metadata.count; + out->data.send_initial_metadata.metadata = send_metadata.metadata; return true; } - bool IsFinalOp() { - return false; - } + bool IsFinalOp() { return false; } + protected: - std::string GetTypeString() const { - return "send_metadata"; - } + std::string GetTypeString() const { return "send_metadata"; } + + private: + grpc_metadata_array send_metadata; }; class SendMessageOp : public Op { public: - SendMessageOp() { - send_message = NULL; - } + SendMessageOp() { send_message = NULL; } ~SendMessageOp() { if (send_message != NULL) { grpc_byte_buffer_destroy(send_message); @@ -228,8 +234,8 @@ class SendMessageOp : public Op { return false; } Local<Object> object_value = Nan::To<Object>(value).ToLocalChecked(); - MaybeLocal<Value> maybe_flag_value = Nan::Get( - object_value, Nan::New("grpcWriteFlags").ToLocalChecked()); + MaybeLocal<Value> maybe_flag_value = + Nan::Get(object_value, Nan::New("grpcWriteFlags").ToLocalChecked()); if (!maybe_flag_value.IsEmpty()) { Local<Value> flag_value = maybe_flag_value.ToLocalChecked(); if (flag_value->IsUint32()) { @@ -241,13 +247,11 @@ class SendMessageOp : public Op { out->data.send_message.send_message = send_message; return true; } - bool IsFinalOp() { - return false; - } + bool IsFinalOp() { return false; } + protected: - std::string GetTypeString() const { - return "send_message"; - } + std::string GetTypeString() const { return "send_message"; } + private: grpc_byte_buffer *send_message; }; @@ -258,22 +262,19 @@ class SendClientCloseOp : public Op { EscapableHandleScope scope; return scope.Escape(Nan::True()); } - bool ParseOp(Local<Value> value, grpc_op *out) { - return true; - } - bool IsFinalOp() { - return false; - } + bool ParseOp(Local<Value> value, grpc_op *out) { return true; } + bool IsFinalOp() { return false; } + protected: - std::string GetTypeString() const { - return "client_close"; - } + std::string GetTypeString() const { return "client_close"; } }; class SendServerStatusOp : public Op { public: + SendServerStatusOp() { grpc_metadata_array_init(&status_metadata); } ~SendServerStatusOp() { grpc_slice_unref(details); + DestroyMetadataArray(&status_metadata); } Local<Value> GetNodeValue() const { EscapableHandleScope scope; @@ -284,18 +285,18 @@ class SendServerStatusOp : public Op { return false; } Local<Object> server_status = Nan::To<Object>(value).ToLocalChecked(); - MaybeLocal<Value> maybe_metadata = Nan::Get( - server_status, Nan::New("metadata").ToLocalChecked()); + MaybeLocal<Value> maybe_metadata = + Nan::Get(server_status, Nan::New("metadata").ToLocalChecked()); if (maybe_metadata.IsEmpty()) { return false; } if (!maybe_metadata.ToLocalChecked()->IsObject()) { return false; } - Local<Object> metadata = Nan::To<Object>( - maybe_metadata.ToLocalChecked()).ToLocalChecked(); - MaybeLocal<Value> maybe_code = Nan::Get(server_status, - Nan::New("code").ToLocalChecked()); + Local<Object> metadata = + Nan::To<Object>(maybe_metadata.ToLocalChecked()).ToLocalChecked(); + MaybeLocal<Value> maybe_code = + Nan::Get(server_status, Nan::New("code").ToLocalChecked()); if (maybe_code.IsEmpty()) { return false; } @@ -303,49 +304,44 @@ class SendServerStatusOp : public Op { return false; } uint32_t code = Nan::To<uint32_t>(maybe_code.ToLocalChecked()).FromJust(); - MaybeLocal<Value> maybe_details = Nan::Get( - server_status, Nan::New("details").ToLocalChecked()); + MaybeLocal<Value> maybe_details = + Nan::Get(server_status, Nan::New("details").ToLocalChecked()); if (maybe_details.IsEmpty()) { return false; } if (!maybe_details.ToLocalChecked()->IsString()) { return false; } - Local<String> details = Nan::To<String>( - maybe_details.ToLocalChecked()).ToLocalChecked(); - grpc_metadata_array array; - if (!CreateMetadataArray(metadata, &array)) { + Local<String> details = + Nan::To<String>(maybe_details.ToLocalChecked()).ToLocalChecked(); + if (!CreateMetadataArray(metadata, &status_metadata)) { return false; } - out->data.send_status_from_server.trailing_metadata_count = array.count; - out->data.send_status_from_server.trailing_metadata = array.metadata; + out->data.send_status_from_server.trailing_metadata_count = + status_metadata.count; + out->data.send_status_from_server.trailing_metadata = + status_metadata.metadata; out->data.send_status_from_server.status = static_cast<grpc_status_code>(code); this->details = CreateSliceFromString(details); out->data.send_status_from_server.status_details = &this->details; return true; } - bool IsFinalOp() { - return true; - } + bool IsFinalOp() { return true; } + protected: - std::string GetTypeString() const { - return "send_status"; - } + std::string GetTypeString() const { return "send_status"; } private: grpc_slice details; + grpc_metadata_array status_metadata; }; class GetMetadataOp : public Op { public: - GetMetadataOp() { - grpc_metadata_array_init(&recv_metadata); - } + GetMetadataOp() { grpc_metadata_array_init(&recv_metadata); } - ~GetMetadataOp() { - grpc_metadata_array_destroy(&recv_metadata); - } + ~GetMetadataOp() { grpc_metadata_array_destroy(&recv_metadata); } Local<Value> GetNodeValue() const { EscapableHandleScope scope; @@ -356,14 +352,10 @@ class GetMetadataOp : public Op { out->data.recv_initial_metadata.recv_initial_metadata = &recv_metadata; return true; } - bool IsFinalOp() { - return false; - } + bool IsFinalOp() { return false; } protected: - std::string GetTypeString() const { - return "metadata"; - } + std::string GetTypeString() const { return "metadata"; } private: grpc_metadata_array recv_metadata; @@ -371,9 +363,7 @@ class GetMetadataOp : public Op { class ReadMessageOp : public Op { public: - ReadMessageOp() { - recv_message = NULL; - } + ReadMessageOp() { recv_message = NULL; } ~ReadMessageOp() { if (recv_message != NULL) { grpc_byte_buffer_destroy(recv_message); @@ -388,14 +378,10 @@ class ReadMessageOp : public Op { out->data.recv_message.recv_message = &recv_message; return true; } - bool IsFinalOp() { - return false; - } + bool IsFinalOp() { return false; } protected: - std::string GetTypeString() const { - return "read"; - } + std::string GetTypeString() const { return "read"; } private: grpc_byte_buffer *recv_message; @@ -403,13 +389,9 @@ class ReadMessageOp : public Op { class ClientStatusOp : public Op { public: - ClientStatusOp() { - grpc_metadata_array_init(&metadata_array); - } + ClientStatusOp() { grpc_metadata_array_init(&metadata_array); } - ~ClientStatusOp() { - grpc_metadata_array_destroy(&metadata_array); - } + ~ClientStatusOp() { grpc_metadata_array_destroy(&metadata_array); } bool ParseOp(Local<Value> value, grpc_op *out) { out->data.recv_status_on_client.trailing_metadata = &metadata_array; @@ -422,20 +404,18 @@ class ClientStatusOp : public Op { EscapableHandleScope scope; Local<Object> status_obj = Nan::New<Object>(); Nan::Set(status_obj, Nan::New("code").ToLocalChecked(), - Nan::New<Number>(status)); + Nan::New<Number>(status)); Nan::Set(status_obj, Nan::New("details").ToLocalChecked(), - CopyStringFromSlice(status_details)); + CopyStringFromSlice(status_details)); Nan::Set(status_obj, Nan::New("metadata").ToLocalChecked(), ParseMetadata(&metadata_array)); return scope.Escape(status_obj); } - bool IsFinalOp() { - return true; - } + bool IsFinalOp() { return true; } + protected: - std::string GetTypeString() const { - return "status"; - } + std::string GetTypeString() const { return "status"; } + private: grpc_metadata_array metadata_array; grpc_status_code status; @@ -453,21 +433,19 @@ class ServerCloseResponseOp : public Op { out->data.recv_close_on_server.cancelled = &cancelled; return true; } - bool IsFinalOp() { - return false; - } + bool IsFinalOp() { return false; } protected: - std::string GetTypeString() const { - return "cancelled"; - } + std::string GetTypeString() const { return "cancelled"; } private: int cancelled; }; -tag::tag(Callback *callback, OpVec *ops, Call *call) : - callback(callback), ops(ops), call(call){ +tag::tag(Callback *callback, OpVec *ops, Call *call, Local<Value> call_value) + : callback(callback), ops(ops), call(call) { + HandleScope scope; + call_persist.Reset(call_value); } tag::~tag() { @@ -513,17 +491,18 @@ void DestroyTag(void *tag) { delete tag_struct; } -Call::Call(grpc_call *call) : wrapped_call(call), - pending_batches(0), - has_final_op_completed(false) { -} - -Call::~Call() { - if (wrapped_call != NULL) { - grpc_call_unref(wrapped_call); +void Call::DestroyCall() { + if (this->wrapped_call != NULL) { + grpc_call_unref(this->wrapped_call); + this->wrapped_call = NULL; } } +Call::Call(grpc_call *call) + : wrapped_call(call), pending_batches(0), has_final_op_completed(false) {} + +Call::~Call() { DestroyCall(); } + void Call::Init(Local<Object> exports) { HandleScope scope; Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New); @@ -551,10 +530,10 @@ Local<Value> Call::WrapStruct(grpc_call *call) { return scope.Escape(Nan::Null()); } const int argc = 1; - Local<Value> argv[argc] = {Nan::New<External>( - reinterpret_cast<void *>(call))}; - MaybeLocal<Object> maybe_instance = Nan::NewInstance( - constructor->GetFunction(), argc, argv); + Local<Value> argv[argc] = { + Nan::New<External>(reinterpret_cast<void *>(call))}; + MaybeLocal<Object> maybe_instance = + Nan::NewInstance(constructor->GetFunction(), argc, argv); if (maybe_instance.IsEmpty()) { return scope.Escape(Nan::Null()); } else { @@ -568,19 +547,25 @@ void Call::CompleteBatch(bool is_final_op) { } this->pending_batches--; if (this->has_final_op_completed && this->pending_batches == 0) { - grpc_call_unref(this->wrapped_call); - this->wrapped_call = NULL; + this->DestroyCall(); } } NAN_METHOD(Call::New) { + /* Arguments: + * 0: Channel to make the call on + * 1: Method + * 2: Deadline + * 3: host + * 4: parent Call + * 5: propagation flags + */ if (info.IsConstructCall()) { Call *call; if (info[0]->IsExternal()) { Local<External> ext = info[0].As<External>(); // This option is used for wrapping an existing call - grpc_call *call_value = - reinterpret_cast<grpc_call *>(ext->Value()); + grpc_call *call_value = reinterpret_cast<grpc_call *>(ext->Value()); call = new Call(call_value); } else { if (!Channel::HasInstance(info[0])) { @@ -596,8 +581,8 @@ NAN_METHOD(Call::New) { // These arguments are at the end because they are optional grpc_call *parent_call = NULL; if (Call::HasInstance(info[4])) { - Call *parent_obj = ObjectWrap::Unwrap<Call>( - Nan::To<Object>(info[4]).ToLocalChecked()); + Call *parent_obj = + ObjectWrap::Unwrap<Call>(Nan::To<Object>(info[4]).ToLocalChecked()); parent_call = parent_obj->wrapped_call; } else if (!(info[4]->IsUndefined() || info[4]->IsNull())) { return Nan::ThrowTypeError( @@ -618,25 +603,24 @@ NAN_METHOD(Call::New) { double deadline = Nan::To<double>(info[2]).FromJust(); grpc_channel *wrapped_channel = channel->GetWrappedChannel(); grpc_call *wrapped_call; + grpc_slice method = + CreateSliceFromString(Nan::To<String>(info[1]).ToLocalChecked()); if (info[3]->IsString()) { grpc_slice *host = new grpc_slice; - *host = CreateSliceFromString( - Nan::To<String>(info[3]).ToLocalChecked()); + *host = + CreateSliceFromString(Nan::To<String>(info[3]).ToLocalChecked()); wrapped_call = grpc_channel_create_call( - wrapped_channel, parent_call, propagate_flags, - GetCompletionQueue(), CreateSliceFromString( - Nan::To<String>(info[1]).ToLocalChecked()), - host, MillisecondsToTimespec(deadline), NULL); + wrapped_channel, parent_call, propagate_flags, GetCompletionQueue(), + method, host, MillisecondsToTimespec(deadline), NULL); delete host; } else if (info[3]->IsUndefined() || info[3]->IsNull()) { wrapped_call = grpc_channel_create_call( - wrapped_channel, parent_call, propagate_flags, - GetCompletionQueue(), CreateSliceFromString( - Nan::To<String>(info[1]).ToLocalChecked()), - NULL, MillisecondsToTimespec(deadline), NULL); + wrapped_channel, parent_call, propagate_flags, GetCompletionQueue(), + method, NULL, MillisecondsToTimespec(deadline), NULL); } else { return Nan::ThrowTypeError("Call's fourth argument must be a string"); } + grpc_slice_unref(method); call = new Call(wrapped_call); Nan::Set(info.This(), Nan::New("channel_").ToLocalChecked(), channel_object); @@ -646,8 +630,8 @@ NAN_METHOD(Call::New) { } else { const int argc = 4; Local<Value> argv[argc] = {info[0], info[1], info[2], info[3]}; - MaybeLocal<Object> maybe_instance = Nan::NewInstance( - constructor->GetFunction(), argc, argv); + MaybeLocal<Object> maybe_instance = + Nan::NewInstance(constructor->GetFunction(), argc, argv); if (maybe_instance.IsEmpty()) { // There's probably a pending exception return; @@ -720,8 +704,8 @@ NAN_METHOD(Call::StartBatch) { } Callback *callback = new Callback(callback_func); grpc_call_error error = grpc_call_start_batch( - call->wrapped_call, &ops[0], nops, new struct tag( - callback, op_vector.release(), call), NULL); + call->wrapped_call, &ops[0], nops, + new struct tag(callback, op_vector.release(), call, info.This()), NULL); if (error != GRPC_CALL_OK) { return Nan::ThrowError(nanErrorWithCode("startBatch failed", error)); } @@ -754,8 +738,8 @@ NAN_METHOD(Call::CancelWithStatus) { "cancelWithStatus's second argument must be a string"); } Call *call = ObjectWrap::Unwrap<Call>(info.This()); - grpc_status_code code = static_cast<grpc_status_code>( - Nan::To<uint32_t>(info[0]).FromJust()); + grpc_status_code code = + static_cast<grpc_status_code>(Nan::To<uint32_t>(info[0]).FromJust()); if (code == GRPC_STATUS_OK) { return Nan::ThrowRangeError( "cancelWithStatus cannot be called with OK status"); diff --git a/src/node/ext/call.h b/src/node/ext/call.h index cffff00fce..53a5e4ab67 100644 --- a/src/node/ext/call.h +++ b/src/node/ext/call.h @@ -58,6 +58,8 @@ v8::Local<v8::Value> ParseMetadata(const grpc_metadata_array *metadata_array); bool CreateMetadataArray(v8::Local<v8::Object> metadata, grpc_metadata_array *array); +void DestroyMetadataArray(grpc_metadata_array *array); + /* Wrapper class for grpc_call structs. */ class Call : public Nan::ObjectWrap { public: @@ -76,6 +78,8 @@ class Call : public Nan::ObjectWrap { Call(const Call &); Call &operator=(const Call &); + void DestroyCall(); + static NAN_METHOD(New); static NAN_METHOD(StartBatch); static NAN_METHOD(Cancel); @@ -109,11 +113,14 @@ class Op { typedef std::vector<unique_ptr<Op>> OpVec; struct tag { - tag(Nan::Callback *callback, OpVec *ops, Call *call); + tag(Nan::Callback *callback, OpVec *ops, Call *call, + v8::Local<v8::Value> call_value); ~tag(); Nan::Callback *callback; OpVec *ops; Call *call; + Nan::Persistent<v8::Value, Nan::CopyablePersistentTraits<v8::Value>> + call_persist; }; v8::Local<v8::Value> GetTagNodeValue(void *tag); diff --git a/src/node/ext/call_credentials.cc b/src/node/ext/call_credentials.cc index afcc363131..5bd4bdcd5a 100644 --- a/src/node/ext/call_credentials.cc +++ b/src/node/ext/call_credentials.cc @@ -211,6 +211,7 @@ NAN_METHOD(PluginCallback) { Utf8String details_utf8_str(info[1]); char *details = *details_utf8_str; grpc_metadata_array array; + grpc_metadata_array_init(&array); Local<Object> callback_data = Nan::To<Object>(info[3]).ToLocalChecked(); if (!CreateMetadataArray(Nan::To<Object>(info[2]).ToLocalChecked(), &array)){ @@ -226,6 +227,7 @@ NAN_METHOD(PluginCallback) { Nan::New("user_data").ToLocalChecked() ).ToLocalChecked().As<External>()->Value(); cb(user_data, array.metadata, array.count, code, details); + DestroyMetadataArray(&array); } NAUV_WORK_CB(SendPluginCallback) { diff --git a/src/node/ext/channel.cc b/src/node/ext/channel.cc index c795ff7f42..1263cc0d28 100644 --- a/src/node/ext/channel.cc +++ b/src/node/ext/channel.cc @@ -280,7 +280,7 @@ NAN_METHOD(Channel::WatchConnectivityState) { channel->wrapped_channel, last_state, MillisecondsToTimespec(deadline), GetCompletionQueue(), new struct tag(callback, - ops.release(), NULL)); + ops.release(), NULL, Nan::Null())); CompletionQueueNext(); } diff --git a/src/node/ext/node_grpc.cc b/src/node/ext/node_grpc.cc index 95e273f8ac..122e5e63ee 100644 --- a/src/node/ext/node_grpc.cc +++ b/src/node/ext/node_grpc.cc @@ -286,8 +286,10 @@ NAN_METHOD(MetadataKeyIsLegal) { "headerKeyIsLegal's argument must be a string"); } Local<String> key = Nan::To<String>(info[0]).ToLocalChecked(); + grpc_slice slice = CreateSliceFromString(key); info.GetReturnValue().Set(static_cast<bool>( - grpc_header_key_is_legal(CreateSliceFromString(key)))); + grpc_header_key_is_legal(slice))); + grpc_slice_unref(slice); } NAN_METHOD(MetadataNonbinValueIsLegal) { @@ -296,8 +298,10 @@ NAN_METHOD(MetadataNonbinValueIsLegal) { "metadataNonbinValueIsLegal's argument must be a string"); } Local<String> value = Nan::To<String>(info[0]).ToLocalChecked(); + grpc_slice slice = CreateSliceFromString(value); info.GetReturnValue().Set(static_cast<bool>( - grpc_header_nonbin_value_is_legal(CreateSliceFromString(value)))); + grpc_header_nonbin_value_is_legal(slice))); + grpc_slice_unref(slice); } NAN_METHOD(MetadataKeyIsBinary) { @@ -306,8 +310,10 @@ NAN_METHOD(MetadataKeyIsBinary) { "metadataKeyIsLegal's argument must be a string"); } Local<String> key = Nan::To<String>(info[0]).ToLocalChecked(); + grpc_slice slice = CreateSliceFromString(key); info.GetReturnValue().Set(static_cast<bool>( - grpc_is_binary_header(CreateSliceFromString(key)))); + grpc_is_binary_header(slice))); + grpc_slice_unref(slice); } static grpc_ssl_roots_override_result get_ssl_roots_override( diff --git a/src/node/ext/server.cc b/src/node/ext/server.cc index ccb55aa54c..f0920c842a 100644 --- a/src/node/ext/server.cc +++ b/src/node/ext/server.cc @@ -193,7 +193,7 @@ NAN_METHOD(Server::RequestCall) { GetCompletionQueue(), GetCompletionQueue(), new struct tag(new Callback(info[0].As<Function>()), ops.release(), - NULL)); + NULL, Nan::Null())); if (error != GRPC_CALL_OK) { return Nan::ThrowError(nanErrorWithCode("requestCall failed", error)); } @@ -246,7 +246,7 @@ NAN_METHOD(Server::TryShutdown) { grpc_server_shutdown_and_notify( server->wrapped_server, GetCompletionQueue(), new struct tag(new Nan::Callback(info[0].As<Function>()), ops.release(), - NULL)); + NULL, Nan::Null())); CompletionQueueNext(); } diff --git a/src/node/ext/server_uv.cc b/src/node/ext/server_uv.cc index c5e5ca9f42..82e7589fc8 100644 --- a/src/node/ext/server_uv.cc +++ b/src/node/ext/server_uv.cc @@ -118,7 +118,8 @@ void Server::ShutdownServer() { grpc_server_shutdown_and_notify( this->wrapped_server, GetCompletionQueue(), - new struct tag(new Callback(**shutdown_callback), ops.release(), NULL)); + new struct tag(new Callback(**shutdown_callback), ops.release(), NULL, + Nan::Null())); grpc_server_cancel_all_calls(this->wrapped_server); CompletionQueueNext(); this->wrapped_server = NULL; diff --git a/src/objective-c/tests/InteropTests.m b/src/objective-c/tests/InteropTests.m index 9105356869..69968dcb60 100644 --- a/src/objective-c/tests/InteropTests.m +++ b/src/objective-c/tests/InteropTests.m @@ -100,6 +100,15 @@ return 0; } ++ (void)setUp { +#ifdef GRPC_COMPILE_WITH_CRONET + // Cronet setup + [Cronet setHttp2Enabled:YES]; + [Cronet start]; + [GRPCCall useCronetWithEngine:[Cronet getGlobalEngine]]; +#endif +} + - (void)setUp { self.continueAfterFailure = NO; diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 0d0a5fb088..7f810bd0b4 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -216,6 +216,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/transport/chttp2/transport/hpack_encoder.c', 'src/core/ext/transport/chttp2/transport/hpack_parser.c', 'src/core/ext/transport/chttp2/transport/hpack_table.c', + 'src/core/ext/transport/chttp2/transport/http2_settings.c', 'src/core/ext/transport/chttp2/transport/huffsyms.c', 'src/core/ext/transport/chttp2/transport/incoming_metadata.c', 'src/core/ext/transport/chttp2/transport/parsing.c', |