aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/transport/chttp2
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/transport/chttp2')
-rw-r--r--src/core/transport/chttp2/hpack_parser.c7
-rw-r--r--src/core/transport/chttp2/incoming_metadata.c5
-rw-r--r--src/core/transport/chttp2/internal.h57
-rw-r--r--src/core/transport/chttp2/parsing.c3
-rw-r--r--src/core/transport/chttp2/stream_lists.c31
-rw-r--r--src/core/transport/chttp2/writing.c13
6 files changed, 70 insertions, 46 deletions
diff --git a/src/core/transport/chttp2/hpack_parser.c b/src/core/transport/chttp2/hpack_parser.c
index b8ab664db5..c729e0f22f 100644
--- a/src/core/transport/chttp2/hpack_parser.c
+++ b/src/core/transport/chttp2/hpack_parser.c
@@ -1329,12 +1329,9 @@ static int parse_value_string_with_literal_key(grpc_chttp2_hpack_parser *p,
/* PUBLIC INTERFACE */
static void on_header_not_set(void *user_data, grpc_mdelem *md) {
- char *keyhex =
- gpr_hexdump(grpc_mdstr_as_c_string(md->key),
- GPR_SLICE_LENGTH(md->key->slice), GPR_HEXDUMP_PLAINTEXT);
+ char *keyhex = gpr_dump_slice(md->key->slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
char *valuehex =
- gpr_hexdump(grpc_mdstr_as_c_string(md->value),
- GPR_SLICE_LENGTH(md->value->slice), GPR_HEXDUMP_PLAINTEXT);
+ gpr_dump_slice(md->value->slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_ERROR, "on_header callback not set; key=%s value=%s", keyhex,
valuehex);
gpr_free(keyhex);
diff --git a/src/core/transport/chttp2/incoming_metadata.c b/src/core/transport/chttp2/incoming_metadata.c
index e81927ab20..68e0912b9c 100644
--- a/src/core/transport/chttp2/incoming_metadata.c
+++ b/src/core/transport/chttp2/incoming_metadata.c
@@ -47,6 +47,10 @@ void grpc_chttp2_incoming_metadata_buffer_init(
void grpc_chttp2_incoming_metadata_buffer_destroy(
grpc_chttp2_incoming_metadata_buffer *buffer) {
+ size_t i;
+ for (i = 0; i < buffer->count; i++) {
+ grpc_mdelem_unref(buffer->elems[i].md);
+ }
gpr_free(buffer->elems);
}
@@ -120,6 +124,7 @@ void grpc_incoming_metadata_buffer_move_to_referencing_sopb(
sopb->ops[i].data.metadata.list.tail =
(void *)(delta + (gpr_intptr)sopb->ops[i].data.metadata.list.tail);
}
+ src->count = 0;
}
void grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op(
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index 02c94744ee..bdd4b432eb 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -34,7 +34,6 @@
#ifndef GRPC_INTERNAL_CORE_CHTTP2_INTERNAL_H
#define GRPC_INTERNAL_CORE_CHTTP2_INTERNAL_H
-#include "src/core/transport/transport_impl.h"
#include "src/core/iomgr/endpoint.h"
#include "src/core/transport/chttp2/frame.h"
#include "src/core/transport/chttp2/frame_data.h"
@@ -47,6 +46,8 @@
#include "src/core/transport/chttp2/incoming_metadata.h"
#include "src/core/transport/chttp2/stream_encoder.h"
#include "src/core/transport/chttp2/stream_map.h"
+#include "src/core/transport/connectivity_state.h"
+#include "src/core/transport/transport_impl.h"
typedef struct grpc_chttp2_transport grpc_chttp2_transport;
typedef struct grpc_chttp2_stream grpc_chttp2_stream;
@@ -62,6 +63,7 @@ typedef enum {
GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE,
GRPC_CHTTP2_LIST_PARSING_SEEN,
GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING,
+ GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_WRITING,
GRPC_CHTTP2_LIST_INCOMING_WINDOW_UPDATED,
/** streams that are waiting to start because there are too many concurrent
streams on the connection */
@@ -134,12 +136,6 @@ typedef struct {
grpc_chttp2_stream *prev;
} grpc_chttp2_stream_link;
-typedef enum {
- GRPC_CHTTP2_ERROR_STATE_NONE,
- GRPC_CHTTP2_ERROR_STATE_SEEN,
- GRPC_CHTTP2_ERROR_STATE_NOTIFIED
-} grpc_chttp2_error_state;
-
/* We keep several sets of connection wide parameters */
typedef enum {
/* The settings our peer has asked for (and we have acked) */
@@ -165,7 +161,8 @@ typedef struct {
/** data to write next write */
gpr_slice_buffer qbuf;
/** queued callbacks */
- grpc_iomgr_closure *pending_closures;
+ grpc_iomgr_closure *pending_closures_head;
+ grpc_iomgr_closure *pending_closures_tail;
/** window available for us to send to peer */
gpr_uint32 outgoing_window;
@@ -174,6 +171,11 @@ typedef struct {
/** how much window would we like to have for incoming_window */
gpr_uint32 connection_window_target;
+ /** have we seen a goaway */
+ gpr_uint8 seen_goaway;
+ /** have we sent a goaway */
+ gpr_uint8 sent_goaway;
+
/** is this transport a client? */
gpr_uint8 is_client;
/** are the local settings dirty and need to be sent? */
@@ -185,10 +187,6 @@ typedef struct {
/** settings values */
gpr_uint32 settings[GRPC_NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS];
- /** has there been a connection level error, and have we notified
- anyone about it? */
- grpc_chttp2_error_state error_state;
-
/** what is the next stream id to be allocated by this peer?
copied to next_stream_id in parsing when parsing commences */
gpr_uint32 next_stream_id;
@@ -204,13 +202,6 @@ typedef struct {
/** concurrent stream count: updated when not parsing,
so this is a strict over-estimation on the client */
gpr_uint32 concurrent_stream_count;
-
- /** is there a goaway available? (boolean) */
- grpc_chttp2_error_state goaway_state;
- /** what is the debug text of the goaway? */
- gpr_slice goaway_text;
- /** what is the status code of the goaway? */
- grpc_status_code goaway_error;
} grpc_chttp2_transport_global;
typedef struct {
@@ -343,14 +334,13 @@ struct grpc_chttp2_transport {
grpc_chttp2_stream **accepting_stream;
struct {
- /** is a thread currently performing channel callbacks */
- gpr_uint8 executing;
- /** transport channel-level callback */
- const grpc_transport_callbacks *cb;
- /** user data for cb calls */
- void *cb_user_data;
- /** closure for notifying transport closure */
- grpc_iomgr_closure notify_closed;
+ /* accept stream callback */
+ void (*accept_stream)(void *user_data, grpc_transport *transport,
+ const void *server_data);
+ void *accept_stream_user_data;
+
+ /** connectivity tracking */
+ grpc_connectivity_state_tracker state_tracker;
} channel_callback;
};
@@ -539,6 +529,13 @@ int grpc_chttp2_list_pop_closed_waiting_for_parsing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global);
+void grpc_chttp2_list_add_cancelled_waiting_for_writing(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global);
+int grpc_chttp2_list_pop_cancelled_waiting_for_writing(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global **stream_global);
+
void grpc_chttp2_list_add_read_write_state_changed(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
@@ -562,8 +559,10 @@ void grpc_chttp2_add_incoming_goaway(
void grpc_chttp2_register_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
-void grpc_chttp2_unregister_stream(grpc_chttp2_transport *t,
- grpc_chttp2_stream *s);
+/* returns 1 if this is the last stream, 0 otherwise */
+int grpc_chttp2_unregister_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s) GRPC_MUST_USE_RESULT;
+int grpc_chttp2_has_streams(grpc_chttp2_transport *t);
void grpc_chttp2_for_all_streams(
grpc_chttp2_transport_global *transport_global, void *user_data,
void (*cb)(grpc_chttp2_transport_global *transport_global, void *user_data,
diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c
index 8f682e9017..4664a0895c 100644
--- a/src/core/transport/chttp2/parsing.c
+++ b/src/core/transport/chttp2/parsing.c
@@ -109,9 +109,6 @@ void grpc_chttp2_publish_reads(
transport_parsing->incoming_stream_id;
}
- /* TODO(ctiller): re-implement */
- GPR_ASSERT(transport_parsing->initial_window_update == 0);
-
/* copy parsing qbuf to global qbuf */
gpr_slice_buffer_move_into(&transport_parsing->qbuf, &transport_global->qbuf);
diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c
index c6ba12fca8..4fea058c19 100644
--- a/src/core/transport/chttp2/stream_lists.c
+++ b/src/core/transport/chttp2/stream_lists.c
@@ -222,7 +222,9 @@ int grpc_chttp2_list_pop_writable_window_update_stream(
void grpc_chttp2_list_remove_writable_window_update_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
- stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE);
+ stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global),
+ STREAM_FROM_GLOBAL(stream_global),
+ GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE);
}
void grpc_chttp2_list_add_parsing_seen_stream(
@@ -282,6 +284,24 @@ int grpc_chttp2_list_pop_closed_waiting_for_parsing(
return r;
}
+void grpc_chttp2_list_add_cancelled_waiting_for_writing(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global) {
+ stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
+ STREAM_FROM_GLOBAL(stream_global),
+ GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_WRITING);
+}
+
+int grpc_chttp2_list_pop_cancelled_waiting_for_writing(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global **stream_global) {
+ grpc_chttp2_stream *stream;
+ int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
+ GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_WRITING);
+ *stream_global = &stream->global;
+ return r;
+}
+
void grpc_chttp2_list_add_incoming_window_updated(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
@@ -334,9 +354,14 @@ void grpc_chttp2_register_stream(grpc_chttp2_transport *t,
stream_list_add_tail(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS);
}
-void grpc_chttp2_unregister_stream(grpc_chttp2_transport *t,
+int grpc_chttp2_unregister_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
- stream_list_remove(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS);
+ stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS);
+ return stream_list_empty(t, GRPC_CHTTP2_LIST_ALL_STREAMS);
+}
+
+int grpc_chttp2_has_streams(grpc_chttp2_transport *t) {
+ return !stream_list_empty(t, GRPC_CHTTP2_LIST_ALL_STREAMS);
}
void grpc_chttp2_for_all_streams(
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index fdcc300099..a78654334e 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -97,12 +97,8 @@ int grpc_chttp2_unlocking_check_writes(
grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
}
- /* we should either exhaust window or have no ops left, but not both */
- if (stream_global->outgoing_sopb->nops == 0) {
- stream_global->outgoing_sopb = NULL;
- grpc_chttp2_schedule_closure(transport_global,
- stream_global->send_done_closure, 1);
- } else if (stream_global->outgoing_window > 0) {
+ if (stream_global->outgoing_window > 0 &&
+ stream_global->outgoing_sopb->nops != 0) {
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
}
}
@@ -201,6 +197,11 @@ void grpc_chttp2_cleanup_writing(
while (grpc_chttp2_list_pop_written_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) {
+ if (stream_global->outgoing_sopb->nops == 0) {
+ stream_global->outgoing_sopb = NULL;
+ grpc_chttp2_schedule_closure(transport_global,
+ stream_global->send_done_closure, 1);
+ }
if (stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) {
stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE;
if (!transport_global->is_client) {