aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/transport/chttp2
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-07-07 13:50:59 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-07-07 13:50:59 -0700
commit22e217d8fdffac425f64a11b18886026060e767c (patch)
tree33048e60357a7fe96a985df59139f4943a39a38d /src/core/transport/chttp2
parent737aa9f1aec55e7f8e1d3f44d291aa5ea758c08c (diff)
parent772187cdf0ff9dfafd2e693474c51eeddfe4c800 (diff)
Merge github.com:grpc/grpc into flow-like-lava-to-a-barnyard
Diffstat (limited to 'src/core/transport/chttp2')
-rw-r--r--src/core/transport/chttp2/incoming_metadata.c1
-rw-r--r--src/core/transport/chttp2/internal.h49
-rw-r--r--src/core/transport/chttp2/stream_lists.c22
-rw-r--r--src/core/transport/chttp2/writing.c13
4 files changed, 51 insertions, 34 deletions
diff --git a/src/core/transport/chttp2/incoming_metadata.c b/src/core/transport/chttp2/incoming_metadata.c
index a4b7174329..68e0912b9c 100644
--- a/src/core/transport/chttp2/incoming_metadata.c
+++ b/src/core/transport/chttp2/incoming_metadata.c
@@ -124,6 +124,7 @@ void grpc_incoming_metadata_buffer_move_to_referencing_sopb(
sopb->ops[i].data.metadata.list.tail =
(void *)(delta + (gpr_intptr)sopb->ops[i].data.metadata.list.tail);
}
+ src->count = 0;
}
void grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op(
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index 45addd8c2b..93e7836db5 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -34,7 +34,6 @@
#ifndef GRPC_INTERNAL_CORE_CHTTP2_INTERNAL_H
#define GRPC_INTERNAL_CORE_CHTTP2_INTERNAL_H
-#include "src/core/transport/transport_impl.h"
#include "src/core/iomgr/endpoint.h"
#include "src/core/transport/chttp2/frame.h"
#include "src/core/transport/chttp2/frame_data.h"
@@ -47,6 +46,8 @@
#include "src/core/transport/chttp2/incoming_metadata.h"
#include "src/core/transport/chttp2/stream_encoder.h"
#include "src/core/transport/chttp2/stream_map.h"
+#include "src/core/transport/connectivity_state.h"
+#include "src/core/transport/transport_impl.h"
typedef struct grpc_chttp2_transport grpc_chttp2_transport;
typedef struct grpc_chttp2_stream grpc_chttp2_stream;
@@ -62,6 +63,7 @@ typedef enum {
GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE,
GRPC_CHTTP2_LIST_PARSING_SEEN,
GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING,
+ GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_WRITING,
GRPC_CHTTP2_LIST_INCOMING_WINDOW_UPDATED,
/** streams that are waiting to start because there are too many concurrent
streams on the connection */
@@ -134,12 +136,6 @@ typedef struct {
grpc_chttp2_stream *prev;
} grpc_chttp2_stream_link;
-typedef enum {
- GRPC_CHTTP2_ERROR_STATE_NONE,
- GRPC_CHTTP2_ERROR_STATE_SEEN,
- GRPC_CHTTP2_ERROR_STATE_NOTIFIED
-} grpc_chttp2_error_state;
-
/* We keep several sets of connection wide parameters */
typedef enum {
/* The settings our peer has asked for (and we have acked) */
@@ -165,7 +161,8 @@ typedef struct {
/** data to write next write */
gpr_slice_buffer qbuf;
/** queued callbacks */
- grpc_iomgr_closure *pending_closures;
+ grpc_iomgr_closure *pending_closures_head;
+ grpc_iomgr_closure *pending_closures_tail;
/** window available for us to send to peer */
gpr_uint32 outgoing_window;
@@ -174,6 +171,9 @@ typedef struct {
/** how much window would we like to have for incoming_window */
gpr_uint32 connection_window_target;
+ /** have we seen a goaway */
+ gpr_uint8 seen_goaway;
+
/** is this transport a client? */
gpr_uint8 is_client;
/** are the local settings dirty and need to be sent? */
@@ -185,10 +185,6 @@ typedef struct {
/** settings values */
gpr_uint32 settings[GRPC_NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS];
- /** has there been a connection level error, and have we notified
- anyone about it? */
- grpc_chttp2_error_state error_state;
-
/** what is the next stream id to be allocated by this peer?
copied to next_stream_id in parsing when parsing commences */
gpr_uint32 next_stream_id;
@@ -204,13 +200,6 @@ typedef struct {
/** concurrent stream count: updated when not parsing,
so this is a strict over-estimation on the client */
gpr_uint32 concurrent_stream_count;
-
- /** is there a goaway available? (boolean) */
- grpc_chttp2_error_state goaway_state;
- /** what is the debug text of the goaway? */
- gpr_slice goaway_text;
- /** what is the status code of the goaway? */
- grpc_status_code goaway_error;
} grpc_chttp2_transport_global;
typedef struct {
@@ -343,14 +332,13 @@ struct grpc_chttp2_transport {
grpc_chttp2_stream **accepting_stream;
struct {
- /** is a thread currently performing channel callbacks */
- gpr_uint8 executing;
- /** transport channel-level callback */
- const grpc_transport_callbacks *cb;
- /** user data for cb calls */
- void *cb_user_data;
- /** closure for notifying transport closure */
- grpc_iomgr_closure notify_closed;
+ /* accept stream callback */
+ void (*accept_stream)(void *user_data, grpc_transport *transport,
+ const void *server_data);
+ void *accept_stream_user_data;
+
+ /** connectivity tracking */
+ grpc_connectivity_state_tracker state_tracker;
} channel_callback;
};
@@ -551,6 +539,13 @@ int grpc_chttp2_list_pop_closed_waiting_for_parsing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global);
+void grpc_chttp2_list_add_cancelled_waiting_for_writing(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global);
+int grpc_chttp2_list_pop_cancelled_waiting_for_writing(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global **stream_global);
+
void grpc_chttp2_list_add_read_write_state_changed(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c
index c6ba12fca8..85691b32d2 100644
--- a/src/core/transport/chttp2/stream_lists.c
+++ b/src/core/transport/chttp2/stream_lists.c
@@ -222,7 +222,9 @@ int grpc_chttp2_list_pop_writable_window_update_stream(
void grpc_chttp2_list_remove_writable_window_update_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
- stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE);
+ stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global),
+ STREAM_FROM_GLOBAL(stream_global),
+ GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE);
}
void grpc_chttp2_list_add_parsing_seen_stream(
@@ -282,6 +284,24 @@ int grpc_chttp2_list_pop_closed_waiting_for_parsing(
return r;
}
+void grpc_chttp2_list_add_cancelled_waiting_for_writing(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global) {
+ stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
+ STREAM_FROM_GLOBAL(stream_global),
+ GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_WRITING);
+}
+
+int grpc_chttp2_list_pop_cancelled_waiting_for_writing(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global **stream_global) {
+ grpc_chttp2_stream *stream;
+ int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
+ GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_WRITING);
+ *stream_global = &stream->global;
+ return r;
+}
+
void grpc_chttp2_list_add_incoming_window_updated(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index ab50c5639d..d49caa4870 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -97,12 +97,8 @@ int grpc_chttp2_unlocking_check_writes(
grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
}
- /* we should either exhaust window or have no ops left, but not both */
- if (stream_global->outgoing_sopb->nops == 0) {
- stream_global->outgoing_sopb = NULL;
- grpc_chttp2_schedule_closure(transport_global,
- stream_global->send_done_closure, 1);
- } else if (stream_global->outgoing_window > 0) {
+ if (stream_global->outgoing_window > 0 &&
+ stream_global->outgoing_sopb->nops != 0) {
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
}
}
@@ -201,6 +197,11 @@ void grpc_chttp2_cleanup_writing(
while (grpc_chttp2_list_pop_written_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) {
+ if (stream_global->outgoing_sopb->nops == 0) {
+ stream_global->outgoing_sopb = NULL;
+ grpc_chttp2_schedule_closure(transport_global,
+ stream_global->send_done_closure, 1);
+ }
if (stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) {
stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE;
if (!transport_global->is_client) {