aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/transport
diff options
context:
space:
mode:
authorGravatar Julien Boeuf <jboeuf@google.com>2015-07-08 01:04:46 +0200
committerGravatar Julien Boeuf <jboeuf@google.com>2015-07-08 01:04:46 +0200
commit607359467fe318c22a7e8bdc59528060e51251a0 (patch)
tree6b296dbce36726ae6ce82648abb998500813d5fa /src/core/transport
parent272814feeac2cbb4562fabf5e5e8a8a64a1e3b81 (diff)
parent772187cdf0ff9dfafd2e693474c51eeddfe4c800 (diff)
Merge branch 'master' of https://github.com/grpc/grpc into slice_to_cstring
Diffstat (limited to 'src/core/transport')
-rw-r--r--src/core/transport/chttp2/incoming_metadata.c1
-rw-r--r--src/core/transport/chttp2/internal.h49
-rw-r--r--src/core/transport/chttp2/parsing.c3
-rw-r--r--src/core/transport/chttp2/stream_lists.c22
-rw-r--r--src/core/transport/chttp2/writing.c13
-rw-r--r--src/core/transport/chttp2_transport.c345
-rw-r--r--src/core/transport/chttp2_transport.h12
-rw-r--r--src/core/transport/connectivity_state.c112
-rw-r--r--src/core/transport/connectivity_state.h74
-rw-r--r--src/core/transport/stream_op.h2
-rw-r--r--src/core/transport/transport.c55
-rw-r--r--src/core/transport/transport.h128
-rw-r--r--src/core/transport/transport_impl.h23
-rw-r--r--src/core/transport/transport_op_string.c13
14 files changed, 468 insertions, 384 deletions
diff --git a/src/core/transport/chttp2/incoming_metadata.c b/src/core/transport/chttp2/incoming_metadata.c
index a4b7174329..68e0912b9c 100644
--- a/src/core/transport/chttp2/incoming_metadata.c
+++ b/src/core/transport/chttp2/incoming_metadata.c
@@ -124,6 +124,7 @@ void grpc_incoming_metadata_buffer_move_to_referencing_sopb(
sopb->ops[i].data.metadata.list.tail =
(void *)(delta + (gpr_intptr)sopb->ops[i].data.metadata.list.tail);
}
+ src->count = 0;
}
void grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op(
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index 02c94744ee..7f98a5bd71 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -34,7 +34,6 @@
#ifndef GRPC_INTERNAL_CORE_CHTTP2_INTERNAL_H
#define GRPC_INTERNAL_CORE_CHTTP2_INTERNAL_H
-#include "src/core/transport/transport_impl.h"
#include "src/core/iomgr/endpoint.h"
#include "src/core/transport/chttp2/frame.h"
#include "src/core/transport/chttp2/frame_data.h"
@@ -47,6 +46,8 @@
#include "src/core/transport/chttp2/incoming_metadata.h"
#include "src/core/transport/chttp2/stream_encoder.h"
#include "src/core/transport/chttp2/stream_map.h"
+#include "src/core/transport/connectivity_state.h"
+#include "src/core/transport/transport_impl.h"
typedef struct grpc_chttp2_transport grpc_chttp2_transport;
typedef struct grpc_chttp2_stream grpc_chttp2_stream;
@@ -62,6 +63,7 @@ typedef enum {
GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE,
GRPC_CHTTP2_LIST_PARSING_SEEN,
GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING,
+ GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_WRITING,
GRPC_CHTTP2_LIST_INCOMING_WINDOW_UPDATED,
/** streams that are waiting to start because there are too many concurrent
streams on the connection */
@@ -134,12 +136,6 @@ typedef struct {
grpc_chttp2_stream *prev;
} grpc_chttp2_stream_link;
-typedef enum {
- GRPC_CHTTP2_ERROR_STATE_NONE,
- GRPC_CHTTP2_ERROR_STATE_SEEN,
- GRPC_CHTTP2_ERROR_STATE_NOTIFIED
-} grpc_chttp2_error_state;
-
/* We keep several sets of connection wide parameters */
typedef enum {
/* The settings our peer has asked for (and we have acked) */
@@ -165,7 +161,8 @@ typedef struct {
/** data to write next write */
gpr_slice_buffer qbuf;
/** queued callbacks */
- grpc_iomgr_closure *pending_closures;
+ grpc_iomgr_closure *pending_closures_head;
+ grpc_iomgr_closure *pending_closures_tail;
/** window available for us to send to peer */
gpr_uint32 outgoing_window;
@@ -174,6 +171,9 @@ typedef struct {
/** how much window would we like to have for incoming_window */
gpr_uint32 connection_window_target;
+ /** have we seen a goaway */
+ gpr_uint8 seen_goaway;
+
/** is this transport a client? */
gpr_uint8 is_client;
/** are the local settings dirty and need to be sent? */
@@ -185,10 +185,6 @@ typedef struct {
/** settings values */
gpr_uint32 settings[GRPC_NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS];
- /** has there been a connection level error, and have we notified
- anyone about it? */
- grpc_chttp2_error_state error_state;
-
/** what is the next stream id to be allocated by this peer?
copied to next_stream_id in parsing when parsing commences */
gpr_uint32 next_stream_id;
@@ -204,13 +200,6 @@ typedef struct {
/** concurrent stream count: updated when not parsing,
so this is a strict over-estimation on the client */
gpr_uint32 concurrent_stream_count;
-
- /** is there a goaway available? (boolean) */
- grpc_chttp2_error_state goaway_state;
- /** what is the debug text of the goaway? */
- gpr_slice goaway_text;
- /** what is the status code of the goaway? */
- grpc_status_code goaway_error;
} grpc_chttp2_transport_global;
typedef struct {
@@ -343,14 +332,13 @@ struct grpc_chttp2_transport {
grpc_chttp2_stream **accepting_stream;
struct {
- /** is a thread currently performing channel callbacks */
- gpr_uint8 executing;
- /** transport channel-level callback */
- const grpc_transport_callbacks *cb;
- /** user data for cb calls */
- void *cb_user_data;
- /** closure for notifying transport closure */
- grpc_iomgr_closure notify_closed;
+ /* accept stream callback */
+ void (*accept_stream)(void *user_data, grpc_transport *transport,
+ const void *server_data);
+ void *accept_stream_user_data;
+
+ /** connectivity tracking */
+ grpc_connectivity_state_tracker state_tracker;
} channel_callback;
};
@@ -539,6 +527,13 @@ int grpc_chttp2_list_pop_closed_waiting_for_parsing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global);
+void grpc_chttp2_list_add_cancelled_waiting_for_writing(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global);
+int grpc_chttp2_list_pop_cancelled_waiting_for_writing(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global **stream_global);
+
void grpc_chttp2_list_add_read_write_state_changed(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c
index 8f682e9017..4664a0895c 100644
--- a/src/core/transport/chttp2/parsing.c
+++ b/src/core/transport/chttp2/parsing.c
@@ -109,9 +109,6 @@ void grpc_chttp2_publish_reads(
transport_parsing->incoming_stream_id;
}
- /* TODO(ctiller): re-implement */
- GPR_ASSERT(transport_parsing->initial_window_update == 0);
-
/* copy parsing qbuf to global qbuf */
gpr_slice_buffer_move_into(&transport_parsing->qbuf, &transport_global->qbuf);
diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c
index c6ba12fca8..85691b32d2 100644
--- a/src/core/transport/chttp2/stream_lists.c
+++ b/src/core/transport/chttp2/stream_lists.c
@@ -222,7 +222,9 @@ int grpc_chttp2_list_pop_writable_window_update_stream(
void grpc_chttp2_list_remove_writable_window_update_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
- stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE);
+ stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global),
+ STREAM_FROM_GLOBAL(stream_global),
+ GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE);
}
void grpc_chttp2_list_add_parsing_seen_stream(
@@ -282,6 +284,24 @@ int grpc_chttp2_list_pop_closed_waiting_for_parsing(
return r;
}
+void grpc_chttp2_list_add_cancelled_waiting_for_writing(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global) {
+ stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
+ STREAM_FROM_GLOBAL(stream_global),
+ GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_WRITING);
+}
+
+int grpc_chttp2_list_pop_cancelled_waiting_for_writing(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global **stream_global) {
+ grpc_chttp2_stream *stream;
+ int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
+ GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_WRITING);
+ *stream_global = &stream->global;
+ return r;
+}
+
void grpc_chttp2_list_add_incoming_window_updated(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index fdcc300099..a78654334e 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -97,12 +97,8 @@ int grpc_chttp2_unlocking_check_writes(
grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
}
- /* we should either exhaust window or have no ops left, but not both */
- if (stream_global->outgoing_sopb->nops == 0) {
- stream_global->outgoing_sopb = NULL;
- grpc_chttp2_schedule_closure(transport_global,
- stream_global->send_done_closure, 1);
- } else if (stream_global->outgoing_window > 0) {
+ if (stream_global->outgoing_window > 0 &&
+ stream_global->outgoing_sopb->nops != 0) {
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
}
}
@@ -201,6 +197,11 @@ void grpc_chttp2_cleanup_writing(
while (grpc_chttp2_list_pop_written_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) {
+ if (stream_global->outgoing_sopb->nops == 0) {
+ stream_global->outgoing_sopb = NULL;
+ grpc_chttp2_schedule_closure(transport_global,
+ stream_global->send_done_closure, 1);
+ }
if (stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) {
stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE;
if (!transport_global->is_client) {
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 80eeae31a6..70df146239 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -37,18 +37,19 @@
#include <stdio.h>
#include <string.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/slice_buffer.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
+
#include "src/core/profiling/timers.h"
#include "src/core/support/string.h"
#include "src/core/transport/chttp2/http2_errors.h"
+#include "src/core/transport/chttp2/internal.h"
#include "src/core/transport/chttp2/status_conversion.h"
#include "src/core/transport/chttp2/timeout_encoding.h"
-#include "src/core/transport/chttp2/internal.h"
#include "src/core/transport/transport_impl.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/slice_buffer.h>
-#include <grpc/support/string_util.h>
-#include <grpc/support/useful.h>
#define DEFAULT_WINDOW 65535
#define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
@@ -79,13 +80,11 @@ static const grpc_transport_vtable vtable;
static void lock(grpc_chttp2_transport *t);
static void unlock(grpc_chttp2_transport *t);
-static void unlock_check_channel_callbacks(grpc_chttp2_transport *t);
static void unlock_check_read_write_state(grpc_chttp2_transport *t);
/* forward declarations of various callbacks that we'll build closures around */
static void writing_action(void *t, int iomgr_success_ignored);
static void reading_action(void *t, int iomgr_success_ignored);
-static void notify_closed(void *t, int iomgr_success_ignored);
/** Set a transport level setting, and push it to our peer */
static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
@@ -99,9 +98,9 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
static void drop_connection(grpc_chttp2_transport *t);
/** Perform a transport_op */
-static void perform_op_locked(grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global *stream_global,
- grpc_transport_op *op);
+static void perform_stream_op_locked(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op);
/** Cancel a stream: coming from the transport API */
static void cancel_from_api(grpc_chttp2_transport_global *transport_global,
@@ -116,6 +115,10 @@ static void add_to_pollset_locked(grpc_chttp2_transport *t,
static void maybe_start_some_streams(
grpc_chttp2_transport_global *transport_global);
+static void connectivity_state_set(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_connectivity_state state);
+
/*
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/
@@ -148,6 +151,7 @@ static void destruct_transport(grpc_chttp2_transport *t) {
grpc_chttp2_stream_map_destroy(&t->parsing_stream_map);
grpc_chttp2_stream_map_destroy(&t->new_stream_map);
+ grpc_connectivity_state_destroy(&t->channel_callback.state_tracker);
gpr_mu_unlock(&t->mu);
gpr_mu_destroy(&t->mu);
@@ -196,13 +200,11 @@ static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); }
#endif
static void init_transport(grpc_chttp2_transport *t,
- grpc_transport_setup_callback setup, void *arg,
const grpc_channel_args *channel_args,
- grpc_endpoint *ep, gpr_slice *slices, size_t nslices,
- grpc_mdctx *mdctx, int is_client) {
+ grpc_endpoint *ep, grpc_mdctx *mdctx,
+ int is_client) {
size_t i;
int j;
- grpc_transport_setup_result sr;
GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
@@ -217,7 +219,6 @@ static void init_transport(grpc_chttp2_transport *t,
grpc_mdctx_ref(mdctx);
t->metadata_context = mdctx;
t->endpoint_reading = 1;
- t->global.error_state = GRPC_CHTTP2_ERROR_STATE_NONE;
t->global.next_stream_id = is_client ? 1 : 2;
t->global.is_client = is_client;
t->global.outgoing_window = DEFAULT_WINDOW;
@@ -231,6 +232,8 @@ static void init_transport(grpc_chttp2_transport *t,
t->parsing.deframe_state =
is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
t->writing.is_client = is_client;
+ grpc_connectivity_state_init(&t->channel_callback.state_tracker,
+ GRPC_CHANNEL_READY);
gpr_slice_buffer_init(&t->global.qbuf);
@@ -243,7 +246,6 @@ static void init_transport(grpc_chttp2_transport *t,
grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser);
grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser, t->metadata_context);
- grpc_iomgr_closure_init(&t->channel_callback.notify_closed, notify_closed, t);
if (is_client) {
gpr_slice_buffer_add(
&t->global.qbuf,
@@ -309,24 +311,6 @@ static void init_transport(grpc_chttp2_transport *t,
}
}
}
-
- gpr_mu_lock(&t->mu);
- t->channel_callback.executing = 1;
- REF_TRANSPORT(t, "init"); /* matches unref at end of this function */
- gpr_mu_unlock(&t->mu);
-
- sr = setup(arg, &t->base, t->metadata_context);
-
- lock(t);
- t->channel_callback.cb = sr.callbacks;
- t->channel_callback.cb_user_data = sr.user_data;
- t->channel_callback.executing = 0;
- unlock(t);
-
- REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */
- recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK);
-
- UNREF_TRANSPORT(t, "init");
}
static void destroy_transport(grpc_transport *gt) {
@@ -343,31 +327,16 @@ static void destroy_transport(grpc_transport *gt) {
static void close_transport_locked(grpc_chttp2_transport *t) {
if (!t->closed) {
t->closed = 1;
+ connectivity_state_set(&t->global, GRPC_CHANNEL_FATAL_FAILURE);
if (t->ep) {
grpc_endpoint_shutdown(t->ep);
}
}
}
-static void close_transport(grpc_transport *gt) {
- grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
- gpr_mu_lock(&t->mu);
- close_transport_locked(t);
- gpr_mu_unlock(&t->mu);
-}
-
-static void goaway(grpc_transport *gt, grpc_status_code status,
- gpr_slice debug_data) {
- grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
- lock(t);
- grpc_chttp2_goaway_append(t->global.last_incoming_stream_id,
- grpc_chttp2_grpc_status_to_http2_error(status),
- debug_data, &t->global.qbuf);
- unlock(t);
-}
-
static int init_stream(grpc_transport *gt, grpc_stream *gs,
- const void *server_data, grpc_transport_op *initial_op) {
+ const void *server_data,
+ grpc_transport_stream_op *initial_op) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
@@ -397,7 +366,7 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
s->global.in_stream_map = 1;
}
- if (initial_op) perform_op_locked(&t->global, &s->global, initial_op);
+ if (initial_op) perform_stream_op_locked(&t->global, &s->global, initial_op);
unlock(t);
return 0;
@@ -455,8 +424,8 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing);
GPR_ASSERT(t->accepting_stream == NULL);
t->accepting_stream = &accepting;
- t->channel_callback.cb->accept_stream(t->channel_callback.cb_user_data,
- &t->base, (void *)(gpr_uintptr)id);
+ t->channel_callback.accept_stream(t->channel_callback.accept_stream_user_data,
+ &t->base, (void *)(gpr_uintptr)id);
t->accepting_stream = NULL;
return &accepting->parsing;
}
@@ -477,17 +446,16 @@ static void unlock(grpc_chttp2_transport *t) {
grpc_iomgr_closure *run_closures;
unlock_check_read_write_state(t);
- if (!t->writing_active && t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE &&
+ if (!t->writing_active && !t->closed &&
grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) {
t->writing_active = 1;
REF_TRANSPORT(t, "writing");
grpc_chttp2_schedule_closure(&t->global, &t->writing_action, 1);
}
- /* unlock_check_parser(t); */
- unlock_check_channel_callbacks(t);
- run_closures = t->global.pending_closures;
- t->global.pending_closures = NULL;
+ run_closures = t->global.pending_closures_head;
+ t->global.pending_closures_head = NULL;
+ t->global.pending_closures_tail = NULL;
gpr_mu_unlock(&t->mu);
@@ -556,13 +524,9 @@ void grpc_chttp2_add_incoming_goaway(
char *msg = gpr_dump_slice(goaway_text, GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg);
gpr_free(msg);
- if (transport_global->goaway_state == GRPC_CHTTP2_ERROR_STATE_NONE) {
- transport_global->goaway_state = GRPC_CHTTP2_ERROR_STATE_SEEN;
- transport_global->goaway_text = goaway_text;
- transport_global->goaway_error = goaway_error;
- } else {
- gpr_slice_unref(goaway_text);
- }
+ gpr_slice_unref(goaway_text);
+ transport_global->seen_goaway = 1;
+ connectivity_state_set(transport_global, GRPC_CHANNEL_FATAL_FAILURE);
}
static void maybe_start_some_streams(
@@ -587,9 +551,7 @@ static void maybe_start_some_streams(
transport_global->next_stream_id += 2;
if (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID) {
- grpc_chttp2_add_incoming_goaway(
- transport_global, GRPC_CHTTP2_NO_ERROR,
- gpr_slice_from_copied_string("Exceeded sequence number limit"));
+ connectivity_state_set(transport_global, GRPC_CHANNEL_TRANSIENT_FAILURE);
}
stream_global->outgoing_window =
@@ -615,9 +577,9 @@ static void maybe_start_some_streams(
}
}
-static void perform_op_locked(grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global *stream_global,
- grpc_transport_op *op) {
+static void perform_stream_op_locked(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op) {
if (op->cancel_with_status != GRPC_STATUS_OK) {
cancel_from_api(transport_global, stream_global, op->cancel_with_status);
}
@@ -674,21 +636,19 @@ static void perform_op_locked(grpc_chttp2_transport_global *transport_global,
}
}
-static void perform_op(grpc_transport *gt, grpc_stream *gs,
- grpc_transport_op *op) {
+static void perform_stream_op(grpc_transport *gt, grpc_stream *gs,
+ grpc_transport_stream_op *op) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
lock(t);
- perform_op_locked(&t->global, &s->global, op);
+ perform_stream_op_locked(&t->global, &s->global, op);
unlock(t);
}
-static void send_ping(grpc_transport *gt, grpc_iomgr_closure *on_recv) {
- grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
+static void send_ping_locked(grpc_chttp2_transport *t,
+ grpc_iomgr_closure *on_recv) {
grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p));
-
- lock(t);
p->next = &t->global.pings;
p->prev = p->next->prev;
p->prev->next = p->next->prev = p;
@@ -702,6 +662,48 @@ static void send_ping(grpc_transport *gt, grpc_iomgr_closure *on_recv) {
p->id[7] = t->global.ping_counter & 0xff;
p->on_recv = on_recv;
gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id));
+}
+
+static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) {
+ grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
+
+ lock(t);
+
+ if (op->on_consumed) {
+ grpc_chttp2_schedule_closure(&t->global, op->on_consumed, 1);
+ }
+
+ if (op->on_connectivity_state_change) {
+ grpc_connectivity_state_notify_on_state_change(
+ &t->channel_callback.state_tracker, op->connectivity_state,
+ op->on_connectivity_state_change);
+ }
+
+ if (op->send_goaway) {
+ grpc_chttp2_goaway_append(
+ t->global.last_incoming_stream_id,
+ grpc_chttp2_grpc_status_to_http2_error(op->goaway_status),
+ gpr_slice_ref(*op->goaway_message), &t->global.qbuf);
+ }
+
+ if (op->set_accept_stream != NULL) {
+ t->channel_callback.accept_stream = op->set_accept_stream;
+ t->channel_callback.accept_stream_user_data =
+ op->set_accept_stream_user_data;
+ }
+
+ if (op->bind_pollset) {
+ add_to_pollset_locked(t, op->bind_pollset);
+ }
+
+ if (op->send_ping) {
+ send_ping_locked(t, op->send_ping);
+ }
+
+ if (op->disconnect) {
+ close_transport_locked(t);
+ }
+
unlock(t);
}
@@ -731,9 +733,8 @@ static void remove_stream(grpc_chttp2_transport *t, gpr_uint32 id) {
grpc_chttp2_parsing_become_skip_parser(&t->parsing);
}
- new_stream_count =
- grpc_chttp2_stream_map_size(&t->parsing_stream_map) +
- grpc_chttp2_stream_map_size(&t->new_stream_map);
+ new_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map) +
+ grpc_chttp2_stream_map_size(&t->new_stream_map);
if (new_stream_count != t->global.concurrent_stream_count) {
t->global.concurrent_stream_count = new_stream_count;
maybe_start_some_streams(&t->global);
@@ -760,20 +761,35 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
}
}
+ if (!t->writing_active) {
+ while (grpc_chttp2_list_pop_cancelled_waiting_for_writing(transport_global,
+ &stream_global)) {
+ grpc_chttp2_list_add_read_write_state_changed(transport_global,
+ stream_global);
+ }
+ }
+
while (grpc_chttp2_list_pop_read_write_state_changed(transport_global,
&stream_global)) {
if (stream_global->cancelled) {
- stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE;
- stream_global->read_closed = 1;
- if (!stream_global->published_cancelled) {
- char buffer[GPR_LTOA_MIN_BUFSIZE];
- gpr_ltoa(stream_global->cancelled_status, buffer);
- grpc_chttp2_incoming_metadata_buffer_add(&stream_global->incoming_metadata,
- grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer));
- grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(
- &stream_global->incoming_metadata,
- &stream_global->incoming_sopb);
- stream_global->published_cancelled = 1;
+ if (t->writing_active &&
+ stream_global->write_state != GRPC_WRITE_STATE_SENT_CLOSE) {
+ grpc_chttp2_list_add_cancelled_waiting_for_writing(transport_global,
+ stream_global);
+ } else {
+ stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE;
+ stream_global->read_closed = 1;
+ if (!stream_global->published_cancelled) {
+ char buffer[GPR_LTOA_MIN_BUFSIZE];
+ gpr_ltoa(stream_global->cancelled_status, buffer);
+ grpc_chttp2_incoming_metadata_buffer_add(
+ &stream_global->incoming_metadata,
+ grpc_mdelem_from_strings(t->metadata_context, "grpc-status",
+ buffer));
+ grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(
+ &stream_global->incoming_metadata, &stream_global->incoming_sopb);
+ stream_global->published_cancelled = 1;
+ }
}
}
if (stream_global->write_state == GRPC_WRITE_STATE_SENT_CLOSE &&
@@ -821,10 +837,10 @@ static void cancel_from_api(grpc_chttp2_transport_global *transport_global,
stream_global->cancelled = 1;
stream_global->cancelled_status = status;
if (stream_global->id != 0) {
- gpr_slice_buffer_add(&transport_global->qbuf,
- grpc_chttp2_rst_stream_create(
- stream_global->id,
- grpc_chttp2_grpc_status_to_http2_error(status)));
+ gpr_slice_buffer_add(
+ &transport_global->qbuf,
+ grpc_chttp2_rst_stream_create(
+ stream_global->id, grpc_chttp2_grpc_status_to_http2_error(status)));
}
grpc_chttp2_list_add_read_write_state_changed(transport_global,
stream_global);
@@ -841,9 +857,6 @@ static void end_all_the_calls(grpc_chttp2_transport *t) {
}
static void drop_connection(grpc_chttp2_transport *t) {
- if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE) {
- t->global.error_state = GRPC_CHTTP2_ERROR_STATE_SEEN;
- }
close_transport_locked(t);
end_all_the_calls(t);
}
@@ -893,7 +906,7 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
lock(t);
i = 0;
GPR_ASSERT(!t->parsing_active);
- if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_NONE) {
+ if (!t->closed) {
t->parsing_active = 1;
/* merge stream lists */
grpc_chttp2_stream_map_move_into(&t->new_stream_map,
@@ -911,10 +924,12 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
/* merge stream lists */
grpc_chttp2_stream_map_move_into(&t->new_stream_map,
&t->parsing_stream_map);
- t->global.concurrent_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map);
+ t->global.concurrent_stream_count =
+ grpc_chttp2_stream_map_size(&t->parsing_stream_map);
if (t->parsing.initial_window_update != 0) {
grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
update_global_window, t);
+ t->parsing.initial_window_update = 0;
}
/* handle higher level things */
grpc_chttp2_publish_reads(&t->global, &t->parsing);
@@ -944,76 +959,33 @@ static void reading_action(void *pt, int iomgr_success_ignored) {
* CALLBACK LOOP
*/
-typedef struct {
- grpc_chttp2_transport *t;
- gpr_uint32 error;
- gpr_slice text;
- grpc_iomgr_closure closure;
-} notify_goaways_args;
-
-static void notify_goaways(void *p, int iomgr_success_ignored) {
- notify_goaways_args *a = p;
- grpc_chttp2_transport *t = a->t;
-
- t->channel_callback.cb->goaway(t->channel_callback.cb_user_data, &t->base,
- a->error, a->text);
-
- gpr_free(a);
-
- lock(t);
- t->channel_callback.executing = 0;
- unlock(t);
-
- UNREF_TRANSPORT(t, "notify_goaways");
+static void schedule_closure_for_connectivity(void *a,
+ grpc_iomgr_closure *closure) {
+ grpc_chttp2_schedule_closure(a, closure, 1);
}
-static void notify_closed(void *gt, int iomgr_success_ignored) {
- grpc_chttp2_transport *t = gt;
- t->channel_callback.cb->closed(t->channel_callback.cb_user_data, &t->base);
-
- lock(t);
- t->channel_callback.executing = 0;
- unlock(t);
-
- UNREF_TRANSPORT(t, "notify_closed");
-}
-
-static void unlock_check_channel_callbacks(grpc_chttp2_transport *t) {
- if (t->channel_callback.executing) {
- return;
- }
- if (t->global.goaway_state != GRPC_CHTTP2_ERROR_STATE_NONE) {
- if (t->global.goaway_state == GRPC_CHTTP2_ERROR_STATE_SEEN &&
- t->global.error_state != GRPC_CHTTP2_ERROR_STATE_NOTIFIED) {
- notify_goaways_args *a = gpr_malloc(sizeof(*a));
- a->t = t;
- a->error = t->global.goaway_error;
- a->text = t->global.goaway_text;
- t->global.goaway_state = GRPC_CHTTP2_ERROR_STATE_NOTIFIED;
- t->channel_callback.executing = 1;
- grpc_iomgr_closure_init(&a->closure, notify_goaways, a);
- REF_TRANSPORT(t, "notify_goaways");
- grpc_chttp2_schedule_closure(&t->global, &a->closure, 1);
- return;
- } else if (t->global.goaway_state != GRPC_CHTTP2_ERROR_STATE_NOTIFIED) {
- return;
- }
- }
- if (t->global.error_state == GRPC_CHTTP2_ERROR_STATE_SEEN) {
- t->global.error_state = GRPC_CHTTP2_ERROR_STATE_NOTIFIED;
- t->channel_callback.executing = 1;
- REF_TRANSPORT(t, "notify_closed");
- grpc_chttp2_schedule_closure(&t->global, &t->channel_callback.notify_closed,
- 1);
- }
+static void connectivity_state_set(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_connectivity_state state) {
+ GRPC_CHTTP2_IF_TRACING(
+ gpr_log(GPR_DEBUG, "set connectivity_state=%d", state));
+ grpc_connectivity_state_set_with_scheduler(
+ &TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker,
+ state, schedule_closure_for_connectivity, transport_global);
}
void grpc_chttp2_schedule_closure(
grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure,
int success) {
closure->success = success;
- closure->next = transport_global->pending_closures;
- transport_global->pending_closures = closure;
+ if (transport_global->pending_closures_tail == NULL) {
+ transport_global->pending_closures_head =
+ transport_global->pending_closures_tail = closure;
+ } else {
+ transport_global->pending_closures_tail->next = closure;
+ transport_global->pending_closures_tail = closure;
+ }
+ closure->next = NULL;
}
/*
@@ -1027,13 +999,6 @@ static void add_to_pollset_locked(grpc_chttp2_transport *t,
}
}
-static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
- grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
- lock(t);
- add_to_pollset_locked(t, pollset);
- unlock(t);
-}
-
/*
* TRACING
*/
@@ -1069,23 +1034,21 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason,
* INTEGRATION GLUE
*/
-static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
- init_stream,
- perform_op,
- add_to_pollset,
- destroy_stream,
- goaway,
- close_transport,
- send_ping,
- destroy_transport};
-
-void grpc_create_chttp2_transport(grpc_transport_setup_callback setup,
- void *arg,
- const grpc_channel_args *channel_args,
- grpc_endpoint *ep, gpr_slice *slices,
- size_t nslices, grpc_mdctx *mdctx,
- int is_client) {
+static const grpc_transport_vtable vtable = {
+ sizeof(grpc_chttp2_stream), init_stream, perform_stream_op,
+ perform_transport_op, destroy_stream, destroy_transport};
+
+grpc_transport *grpc_create_chttp2_transport(
+ const grpc_channel_args *channel_args, grpc_endpoint *ep, grpc_mdctx *mdctx,
+ int is_client) {
grpc_chttp2_transport *t = gpr_malloc(sizeof(grpc_chttp2_transport));
- init_transport(t, setup, arg, channel_args, ep, slices, nslices, mdctx,
- is_client);
+ init_transport(t, channel_args, ep, mdctx, is_client);
+ return &t->base;
+}
+
+void grpc_chttp2_transport_start_reading(grpc_transport *transport,
+ gpr_slice *slices, size_t nslices) {
+ grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport;
+ REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */
+ recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK);
}
diff --git a/src/core/transport/chttp2_transport.h b/src/core/transport/chttp2_transport.h
index 18e19f03af..fa0d6e4151 100644
--- a/src/core/transport/chttp2_transport.h
+++ b/src/core/transport/chttp2_transport.h
@@ -40,11 +40,11 @@
extern int grpc_http_trace;
extern int grpc_flowctl_trace;
-void grpc_create_chttp2_transport(grpc_transport_setup_callback setup,
- void *arg,
- const grpc_channel_args *channel_args,
- grpc_endpoint *ep, gpr_slice *slices,
- size_t nslices, grpc_mdctx *metadata_context,
- int is_client);
+grpc_transport *grpc_create_chttp2_transport(
+ const grpc_channel_args *channel_args, grpc_endpoint *ep,
+ grpc_mdctx *metadata_context, int is_client);
+
+void grpc_chttp2_transport_start_reading(grpc_transport *transport,
+ gpr_slice *slices, size_t nslices);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_TRANSPORT_H */
diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c
new file mode 100644
index 0000000000..1091ceae44
--- /dev/null
+++ b/src/core/transport/connectivity_state.c
@@ -0,0 +1,112 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/transport/connectivity_state.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
+ grpc_connectivity_state init_state) {
+ tracker->current_state = init_state;
+ tracker->watchers = NULL;
+}
+
+void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) {
+ grpc_connectivity_state_watcher *w;
+ while ((w = tracker->watchers)) {
+ tracker->watchers = w->next;
+
+ if (GRPC_CHANNEL_FATAL_FAILURE != *w->current) {
+ *w->current = GRPC_CHANNEL_FATAL_FAILURE;
+ grpc_iomgr_add_callback(w->notify);
+ } else {
+ grpc_iomgr_add_delayed_callback(w->notify, 0);
+ }
+ gpr_free(w);
+ }
+}
+
+grpc_connectivity_state grpc_connectivity_state_check(
+ grpc_connectivity_state_tracker *tracker) {
+ return tracker->current_state;
+}
+
+int grpc_connectivity_state_notify_on_state_change(
+ grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current,
+ grpc_iomgr_closure *notify) {
+ if (tracker->current_state != *current) {
+ *current = tracker->current_state;
+ grpc_iomgr_add_callback(notify);
+ } else {
+ grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w));
+ w->current = current;
+ w->notify = notify;
+ w->next = tracker->watchers;
+ tracker->watchers = w;
+ }
+ return tracker->current_state == GRPC_CHANNEL_IDLE;
+}
+
+void grpc_connectivity_state_set_with_scheduler(
+ grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state,
+ void (*scheduler)(void *arg, grpc_iomgr_closure *closure), void *arg) {
+ grpc_connectivity_state_watcher *new = NULL;
+ grpc_connectivity_state_watcher *w;
+ if (tracker->current_state == state) {
+ return;
+ }
+ tracker->current_state = state;
+ while ((w = tracker->watchers)) {
+ tracker->watchers = w->next;
+
+ if (state != *w->current) {
+ *w->current = state;
+ scheduler(arg, w->notify);
+ gpr_free(w);
+ } else {
+ w->next = new;
+ new = w;
+ }
+ }
+ tracker->watchers = new;
+}
+
+static void default_scheduler(void *ignored, grpc_iomgr_closure *closure) {
+ grpc_iomgr_add_callback(closure);
+}
+
+void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
+ grpc_connectivity_state state) {
+ grpc_connectivity_state_set_with_scheduler(tracker, state, default_scheduler,
+ NULL);
+}
diff --git a/src/core/transport/connectivity_state.h b/src/core/transport/connectivity_state.h
new file mode 100644
index 0000000000..bbdcbcb069
--- /dev/null
+++ b/src/core/transport/connectivity_state.h
@@ -0,0 +1,74 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CORE_TRANSPORT_CONNECTIVITY_STATE_H
+#define GRPC_INTERNAL_CORE_TRANSPORT_CONNECTIVITY_STATE_H
+
+#include <grpc/grpc.h>
+#include "src/core/iomgr/iomgr.h"
+
+typedef struct grpc_connectivity_state_watcher {
+ /** we keep watchers in a linked list */
+ struct grpc_connectivity_state_watcher *next;
+ /** closure to notify on change */
+ grpc_iomgr_closure *notify;
+ /** the current state as believed by the watcher */
+ grpc_connectivity_state *current;
+} grpc_connectivity_state_watcher;
+
+typedef struct {
+ /** current connectivity state */
+ grpc_connectivity_state current_state;
+ /** all our watchers */
+ grpc_connectivity_state_watcher *watchers;
+} grpc_connectivity_state_tracker;
+
+void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
+ grpc_connectivity_state init_state);
+void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker);
+
+void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
+ grpc_connectivity_state state);
+void grpc_connectivity_state_set_with_scheduler(
+ grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state,
+ void (*scheduler)(void *arg, grpc_iomgr_closure *closure), void *arg);
+
+grpc_connectivity_state grpc_connectivity_state_check(
+ grpc_connectivity_state_tracker *tracker);
+
+/** Return 1 if the channel should start connecting, 0 otherwise */
+int grpc_connectivity_state_notify_on_state_change(
+ grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current,
+ grpc_iomgr_closure *notify);
+
+#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CONNECTIVITY_STATE_H */
diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h
index 842fc932b9..964d39d14f 100644
--- a/src/core/transport/stream_op.h
+++ b/src/core/transport/stream_op.h
@@ -41,7 +41,7 @@
#include "src/core/transport/metadata.h"
/* this many stream ops are inlined into a sopb before allocating */
-#define GRPC_SOPB_INLINE_ELEMENTS 16
+#define GRPC_SOPB_INLINE_ELEMENTS 4
/* Operations that can be performed on a stream.
Used by grpc_stream_op. */
diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c
index 39d7b701f2..fe565944ed 100644
--- a/src/core/transport/transport.c
+++ b/src/core/transport/transport.c
@@ -38,34 +38,26 @@ size_t grpc_transport_stream_size(grpc_transport *transport) {
return transport->vtable->sizeof_stream;
}
-void grpc_transport_goaway(grpc_transport *transport, grpc_status_code status,
- gpr_slice message) {
- transport->vtable->goaway(transport, status, message);
-}
-
-void grpc_transport_close(grpc_transport *transport) {
- transport->vtable->close(transport);
-}
-
void grpc_transport_destroy(grpc_transport *transport) {
transport->vtable->destroy(transport);
}
int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream,
const void *server_data,
- grpc_transport_op *initial_op) {
+ grpc_transport_stream_op *initial_op) {
return transport->vtable->init_stream(transport, stream, server_data,
initial_op);
}
-void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream,
- grpc_transport_op *op) {
- transport->vtable->perform_op(transport, stream, op);
+void grpc_transport_perform_stream_op(grpc_transport *transport,
+ grpc_stream *stream,
+ grpc_transport_stream_op *op) {
+ transport->vtable->perform_stream_op(transport, stream, op);
}
-void grpc_transport_add_to_pollset(grpc_transport *transport,
- grpc_pollset *pollset) {
- transport->vtable->add_to_pollset(transport, pollset);
+void grpc_transport_perform_op(grpc_transport *transport,
+ grpc_transport_op *op) {
+ transport->vtable->perform_op(transport, op);
}
void grpc_transport_destroy_stream(grpc_transport *transport,
@@ -73,29 +65,8 @@ void grpc_transport_destroy_stream(grpc_transport *transport,
transport->vtable->destroy_stream(transport, stream);
}
-void grpc_transport_ping(grpc_transport *transport, grpc_iomgr_closure *cb) {
- transport->vtable->ping(transport, cb);
-}
-
-void grpc_transport_setup_cancel(grpc_transport_setup *setup) {
- setup->vtable->cancel(setup);
-}
-
-void grpc_transport_setup_initiate(grpc_transport_setup *setup) {
- setup->vtable->initiate(setup);
-}
-
-void grpc_transport_setup_add_interested_party(grpc_transport_setup *setup,
- grpc_pollset *pollset) {
- setup->vtable->add_interested_party(setup, pollset);
-}
-
-void grpc_transport_setup_del_interested_party(grpc_transport_setup *setup,
- grpc_pollset *pollset) {
- setup->vtable->del_interested_party(setup, pollset);
-}
-
-void grpc_transport_op_finish_with_failure(grpc_transport_op *op) {
+void grpc_transport_stream_op_finish_with_failure(
+ grpc_transport_stream_op *op) {
if (op->send_ops) {
op->on_done_send->cb(op->on_done_send->cb_arg, 0);
}
@@ -107,9 +78,9 @@ void grpc_transport_op_finish_with_failure(grpc_transport_op *op) {
}
}
-void grpc_transport_op_add_cancellation(grpc_transport_op *op,
- grpc_status_code status,
- grpc_mdstr *message) {
+void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op,
+ grpc_status_code status,
+ grpc_mdstr *message) {
if (op->cancel_with_status == GRPC_STATUS_OK) {
op->cancel_with_status = status;
}
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index a2c41c47af..579bcc943f 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -43,7 +43,6 @@
/* forward declarations */
typedef struct grpc_transport grpc_transport;
-typedef struct grpc_transport_callbacks grpc_transport_callbacks;
/* grpc_stream doesn't actually exist. It's used as a typesafe
opaque pointer for whatever data the transport wants to track
@@ -62,8 +61,9 @@ typedef enum grpc_stream_state {
GRPC_STREAM_CLOSED
} grpc_stream_state;
-/* Transport op: a set of operations to perform on a transport */
-typedef struct grpc_transport_op {
+/* Transport stream op: a set of operations to perform on a transport
+ against a single stream */
+typedef struct grpc_transport_stream_op {
grpc_iomgr_closure *on_consumed;
grpc_stream_op_buffer *send_ops;
@@ -80,32 +80,32 @@ typedef struct grpc_transport_op {
/* Indexes correspond to grpc_context_index enum values */
grpc_call_context_element *context;
-} grpc_transport_op;
+} grpc_transport_stream_op;
-/* Callbacks made from the transport to the upper layers of grpc. */
-struct grpc_transport_callbacks {
- /* Initialize a new stream on behalf of the transport.
- Must result in a call to
- grpc_transport_init_stream(transport, ..., request) in the same call
- stack.
- Must not result in any other calls to the transport.
-
- Arguments:
- user_data - the transport user data set at transport creation time
- transport - the grpc_transport instance making this call
- request - request parameters for this stream (owned by the caller)
- server_data - opaque transport dependent argument that should be passed
- to grpc_transport_init_stream
- */
- void (*accept_stream)(void *user_data, grpc_transport *transport,
- const void *server_data);
-
- void (*goaway)(void *user_data, grpc_transport *transport,
- grpc_status_code status, gpr_slice debug);
-
- /* The transport has been closed */
- void (*closed)(void *user_data, grpc_transport *transport);
-};
+/** Transport op: a set of operations to perform on a transport as a whole */
+typedef struct grpc_transport_op {
+ /** called when processing of this op is done */
+ grpc_iomgr_closure *on_consumed;
+ /** connectivity monitoring */
+ grpc_iomgr_closure *on_connectivity_state_change;
+ grpc_connectivity_state *connectivity_state;
+ /** should the transport be disconnected */
+ int disconnect;
+ /** should we send a goaway? */
+ int send_goaway;
+ /** what should the goaway contain? */
+ grpc_status_code goaway_status;
+ gpr_slice *goaway_message;
+ /** set the callback for accepting new streams;
+ this is a permanent callback, unlike the other one-shot closures */
+ void (*set_accept_stream)(void *user_data, grpc_transport *transport,
+ const void *server_data);
+ void *set_accept_stream_user_data;
+ /** add this transport to a pollset */
+ grpc_pollset *bind_pollset;
+ /** send a ping, call this back if not NULL */
+ grpc_iomgr_closure *send_ping;
+} grpc_transport_op;
/* Returns the amount of memory required to store a grpc_stream for this
transport */
@@ -122,7 +122,7 @@ size_t grpc_transport_stream_size(grpc_transport *transport);
supplied from the accept_stream callback function */
int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream,
const void *server_data,
- grpc_transport_op *initial_op);
+ grpc_transport_stream_op *initial_op);
/* Destroy transport data for a stream.
@@ -137,17 +137,13 @@ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream,
void grpc_transport_destroy_stream(grpc_transport *transport,
grpc_stream *stream);
-void grpc_transport_op_finish_with_failure(grpc_transport_op *op);
-
-void grpc_transport_op_add_cancellation(grpc_transport_op *op,
- grpc_status_code status,
- grpc_mdstr *message);
+void grpc_transport_stream_op_finish_with_failure(grpc_transport_stream_op *op);
-/* TODO(ctiller): remove this */
-void grpc_transport_add_to_pollset(grpc_transport *transport,
- grpc_pollset *pollset);
+void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op,
+ grpc_status_code status,
+ grpc_mdstr *message);
-char *grpc_transport_op_string(grpc_transport_op *op);
+char *grpc_transport_stream_op_string(grpc_transport_stream_op *op);
/* Send a batch of operations on a transport
@@ -157,8 +153,12 @@ char *grpc_transport_op_string(grpc_transport_op *op);
transport - the transport on which to initiate the stream
stream - the stream on which to send the operations. This must be
non-NULL and previously initialized by the same transport.
- op - a grpc_transport_op specifying the op to perform */
-void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream,
+ op - a grpc_transport_stream_op specifying the op to perform */
+void grpc_transport_perform_stream_op(grpc_transport *transport,
+ grpc_stream *stream,
+ grpc_transport_stream_op *op);
+
+void grpc_transport_perform_op(grpc_transport *transport,
grpc_transport_op *op);
/* Send a ping on a transport
@@ -176,52 +176,4 @@ void grpc_transport_close(grpc_transport *transport);
/* Destroy the transport */
void grpc_transport_destroy(grpc_transport *transport);
-/* Return type for grpc_transport_setup_callback */
-typedef struct grpc_transport_setup_result {
- void *user_data;
- const grpc_transport_callbacks *callbacks;
-} grpc_transport_setup_result;
-
-/* Given a transport, return callbacks for that transport. Used to finalize
- setup as a transport is being created */
-typedef grpc_transport_setup_result (*grpc_transport_setup_callback)(
- void *setup_arg, grpc_transport *transport, grpc_mdctx *mdctx);
-
-typedef struct grpc_transport_setup grpc_transport_setup;
-typedef struct grpc_transport_setup_vtable grpc_transport_setup_vtable;
-
-struct grpc_transport_setup_vtable {
- void (*initiate)(grpc_transport_setup *setup);
- void (*add_interested_party)(grpc_transport_setup *setup,
- grpc_pollset *pollset);
- void (*del_interested_party)(grpc_transport_setup *setup,
- grpc_pollset *pollset);
- void (*cancel)(grpc_transport_setup *setup);
-};
-
-/* Transport setup is an asynchronous utility interface for client channels to
- establish connections. It's transport agnostic. */
-struct grpc_transport_setup {
- const grpc_transport_setup_vtable *vtable;
-};
-
-/* Initiate transport setup: e.g. for TCP+DNS trigger a resolve of the name
- given at transport construction time, create the tcp connection, perform
- handshakes, and call some grpc_transport_setup_result function provided at
- setup construction time.
- This *may* be implemented as a no-op if the setup process monitors something
- continuously. */
-void grpc_transport_setup_initiate(grpc_transport_setup *setup);
-
-void grpc_transport_setup_add_interested_party(grpc_transport_setup *setup,
- grpc_pollset *pollset);
-void grpc_transport_setup_del_interested_party(grpc_transport_setup *setup,
- grpc_pollset *pollset);
-
-/* Cancel transport setup. After this returns, no new transports should be
- created, and all pending transport setup callbacks should be completed.
- After this call completes, setup should be considered invalid (this can be
- used as a destruction call by setup). */
-void grpc_transport_setup_cancel(grpc_transport_setup *setup);
-
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_H */
diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h
index c51951b7a7..515721dfb6 100644
--- a/src/core/transport/transport_impl.h
+++ b/src/core/transport/transport_impl.h
@@ -43,28 +43,19 @@ typedef struct grpc_transport_vtable {
/* implementation of grpc_transport_init_stream */
int (*init_stream)(grpc_transport *self, grpc_stream *stream,
- const void *server_data, grpc_transport_op *initial_op);
+ const void *server_data,
+ grpc_transport_stream_op *initial_op);
- /* implementation of grpc_transport_send_batch */
- void (*perform_op)(grpc_transport *self, grpc_stream *stream,
- grpc_transport_op *op);
+ /* implementation of grpc_transport_perform_stream_op */
+ void (*perform_stream_op)(grpc_transport *self, grpc_stream *stream,
+ grpc_transport_stream_op *op);
- /* implementation of grpc_transport_add_to_pollset */
- void (*add_to_pollset)(grpc_transport *self, grpc_pollset *pollset);
+ /* implementation of grpc_transport_perform_op */
+ void (*perform_op)(grpc_transport *self, grpc_transport_op *op);
/* implementation of grpc_transport_destroy_stream */
void (*destroy_stream)(grpc_transport *self, grpc_stream *stream);
- /* implementation of grpc_transport_goaway */
- void (*goaway)(grpc_transport *self, grpc_status_code status,
- gpr_slice debug_data);
-
- /* implementation of grpc_transport_close */
- void (*close)(grpc_transport *self);
-
- /* implementation of grpc_transport_ping */
- void (*ping)(grpc_transport *self, grpc_iomgr_closure *cb);
-
/* implementation of grpc_transport_destroy */
void (*destroy)(grpc_transport *self);
} grpc_transport_vtable;
diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c
index ac04540a2e..0da396a320 100644
--- a/src/core/transport/transport_op_string.c
+++ b/src/core/transport/transport_op_string.c
@@ -105,7 +105,7 @@ char *grpc_sopb_string(grpc_stream_op_buffer *sopb) {
return out;
}
-char *grpc_transport_op_string(grpc_transport_op *op) {
+char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) {
char *tmp;
char *out;
int first = 1;
@@ -144,6 +144,13 @@ char *grpc_transport_op_string(grpc_transport_op *op) {
gpr_strvec_add(&b, tmp);
}
+ if (op->on_consumed != NULL) {
+ if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
+ first = 0;
+ gpr_asprintf(&tmp, "ON_CONSUMED:%p", op->on_consumed);
+ gpr_strvec_add(&b, tmp);
+ }
+
out = gpr_strvec_flatten(&b, NULL);
gpr_strvec_destroy(&b);
@@ -151,8 +158,8 @@ char *grpc_transport_op_string(grpc_transport_op *op) {
}
void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
- grpc_call_element *elem, grpc_transport_op *op) {
- char *str = grpc_transport_op_string(op);
+ grpc_call_element *elem, grpc_transport_stream_op *op) {
+ char *str = grpc_transport_stream_op_string(op);
gpr_log(file, line, severity, "OP[%s:%p]: %s", elem->filter->name, elem, str);
gpr_free(str);
}