aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--include/grpc/grpc.h14
-rw-r--r--src/core/channel/channel_stack.c8
-rw-r--r--src/core/channel/channel_stack.h46
-rw-r--r--src/core/channel/client_channel.c147
-rw-r--r--src/core/client_config/subchannel.h16
-rw-r--r--src/core/transport/chttp2_transport.c35
-rw-r--r--src/core/transport/transport.c9
-rw-r--r--src/core/transport/transport.h58
-rw-r--r--src/core/transport/transport_impl.h19
9 files changed, 154 insertions, 198 deletions
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index 8b4676562b..bb6753a9bb 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -118,6 +118,20 @@ typedef struct {
#define GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER \
"grpc.http2.initial_sequence_number"
+/** Connectivity state of a channel. */
+typedef enum {
+ /** channel is connecting */
+ GRPC_CHANNEL_CONNECTING,
+ /** channel is ready for work */
+ GRPC_CHANNEL_READY,
+ /** channel has seen a failure but expects to recover */
+ GRPC_CHANNEL_TRANSIENT_FAILURE,
+ /** channel is idle */
+ GRPC_CHANNEL_IDLE,
+ /** channel has seen a failure that it cannot recover from */
+ GRPC_CHANNEL_FATAL_FAILURE
+} grpc_connectivity_state;
+
/* Result of a grpc call. If the caller satisfies the prerequisites of a
particular operation, the grpc_call_error returned will be GRPC_CALL_OK.
Receiving any other value listed here is an indication of a bug in the
diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c
index b575367b52..ff1077ce4c 100644
--- a/src/core/channel/channel_stack.c
+++ b/src/core/channel/channel_stack.c
@@ -186,12 +186,12 @@ void grpc_call_stack_destroy(grpc_call_stack *stack) {
void grpc_call_next_op(grpc_call_element *elem, grpc_transport_stream_op *op) {
grpc_call_element *next_elem = elem + 1;
- next_elem->filter->start_transport_op(next_elem, op);
+ next_elem->filter->start_transport_stream_op(next_elem, op);
}
-void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op) {
- grpc_channel_element *next_elem = elem + op->dir;
- next_elem->filter->channel_op(next_elem, elem, op);
+void grpc_channel_next_op(grpc_channel_element *elem, grpc_transport_op *op) {
+ grpc_channel_element *next_elem = elem + 1;
+ next_elem->filter->start_transport_op(next_elem, op);
}
grpc_channel_stack *grpc_channel_stack_from_top_element(
diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h
index 873a8ac4d3..5ac2372f3a 100644
--- a/src/core/channel/channel_stack.h
+++ b/src/core/channel/channel_stack.h
@@ -51,45 +51,6 @@
typedef struct grpc_channel_element grpc_channel_element;
typedef struct grpc_call_element grpc_call_element;
-/* The direction of the call.
- The values of the enums (1, -1) matter here - they are used to increment
- or decrement a pointer to find the next element to call */
-typedef enum { GRPC_CALL_DOWN = 1, GRPC_CALL_UP = -1 } grpc_call_dir;
-
-typedef enum {
- /* send a goaway message to remote channels indicating that we are going
- to disconnect in the future */
- GRPC_CHANNEL_GOAWAY,
- /* disconnect any underlying transports */
- GRPC_CHANNEL_DISCONNECT,
- /* transport received a new call */
- GRPC_ACCEPT_CALL,
- /* an underlying transport was closed */
- GRPC_TRANSPORT_CLOSED,
- /* an underlying transport is about to be closed */
- GRPC_TRANSPORT_GOAWAY
-} grpc_channel_op_type;
-
-/* A single filterable operation to be performed on a channel */
-typedef struct {
- /* The type of operation we're performing */
- grpc_channel_op_type type;
- /* The directionality of this call - is it bubbling up the stack, or down? */
- grpc_call_dir dir;
-
- /* Argument data, matching up with grpc_channel_op_type names */
- union {
- struct {
- grpc_transport *transport;
- const void *transport_server_data;
- } accept_call;
- struct {
- grpc_status_code status;
- gpr_slice message;
- } goaway;
- } data;
-} grpc_channel_op;
-
/* Channel filters specify:
1. the amount of memory needed in the channel & call (via the sizeof_XXX
members)
@@ -103,13 +64,12 @@ typedef struct {
typedef struct {
/* Called to eg. send/receive data on a call.
See grpc_call_next_op on how to call the next element in the stack */
- void (*start_transport_op)(grpc_call_element *elem,
+ void (*start_transport_stream_op)(grpc_call_element *elem,
grpc_transport_stream_op *op);
/* Called to handle channel level operations - e.g. new calls, or transport
closure.
See grpc_channel_next_op on how to call the next element in the stack */
- void (*channel_op)(grpc_channel_element *elem,
- grpc_channel_element *from_elem, grpc_channel_op *op);
+ void (*start_transport_op)(grpc_channel_element *elem, grpc_transport_op *op);
/* sizeof(per call data) */
size_t sizeof_call_data;
@@ -211,7 +171,7 @@ void grpc_call_stack_destroy(grpc_call_stack *stack);
void grpc_call_next_op(grpc_call_element *elem, grpc_transport_stream_op *op);
/* Call the next operation (depending on call directionality) in a channel
stack */
-void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op);
+void grpc_channel_next_op(grpc_channel_element *elem, grpc_transport_op *op);
/* Given the top element of a channel stack, get the channel stack itself */
grpc_channel_stack *grpc_channel_stack_from_top_element(
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 4ae951074d..9630f6898d 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -54,26 +54,27 @@ typedef struct {
grpc_mdctx *mdctx;
/** resolver for this channel */
grpc_resolver *resolver;
- /** channel arguments for this channel
- TODO(ctiller): still needed? */
- grpc_channel_args *args;
- /** mutex protecting waiting list */
- gpr_mu mu_waiting;
/** mutex protecting client configuration, resolution state */
gpr_mu mu_config;
-
/** currently active load balancer - guarded by mu_config */
grpc_lb_policy *lb_policy;
-
/** incoming configuration - set by resolver.next
guarded by mu_config */
grpc_client_config *incoming_configuration;
+ /** a list of closures that are all waiting for config to come in */
+ grpc_iomgr_closure *waiting_for_config_closures;
+ /** resolver callback */
+ grpc_iomgr_closure on_config_changed;
+ /** connectivity state being tracked */
+ grpc_iomgr_closure *on_connectivity_state_change;
+ grpc_connectivity_state *connectivity_state;
} channel_data;
typedef enum {
CALL_CREATED,
- CALL_WAITING,
+ CALL_WAITING_FOR_CONFIG,
+ CALL_WAITING_FOR_PICK,
CALL_ACTIVE,
CALL_CANCELLED
} call_state;
@@ -193,13 +194,12 @@ static void pick_target(grpc_lb_policy *lb_policy, call_data *calld) {
abort();
}
-static void cc_start_transport_op(grpc_call_element *elem,
+static void cc_start_transport_stream_op(grpc_call_element *elem,
grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_subchannel_call *subchannel_call;
grpc_lb_policy *lb_policy;
- grpc_transport_stream_op waiting_op;
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
@@ -207,10 +207,8 @@ static void cc_start_transport_op(grpc_call_element *elem,
switch (calld->state) {
case CALL_ACTIVE:
subchannel_call = calld->s.active.subchannel_call;
- grpc_subchannel_call_ref(subchannel_call);
gpr_mu_unlock(&calld->mu_state);
grpc_subchannel_call_process_op(subchannel_call, op);
- grpc_subchannel_call_unref(subchannel_call);
break;
case CALL_CANCELLED:
gpr_mu_unlock(&calld->mu_state);
@@ -222,7 +220,6 @@ static void cc_start_transport_op(grpc_call_element *elem,
gpr_mu_unlock(&calld->mu_state);
handle_op_after_cancellation(elem, op);
} else {
- calld->state = CALL_WAITING;
calld->s.waiting_op = *op;
gpr_mu_lock(&chand->mu_config);
@@ -230,45 +227,44 @@ static void cc_start_transport_op(grpc_call_element *elem,
if (lb_policy) {
grpc_lb_policy_ref(lb_policy);
gpr_mu_unlock(&chand->mu_config);
+ calld->state = CALL_WAITING_FOR_PICK;
gpr_mu_unlock(&calld->mu_state);
pick_target(lb_policy, calld);
grpc_lb_policy_unref(lb_policy);
} else {
+ calld->state = CALL_WAITING_FOR_CONFIG;
add_to_lb_policy_wait_queue_locked_state_config(chand, calld);
gpr_mu_unlock(&chand->mu_config);
gpr_mu_unlock(&calld->mu_state);
}
}
break;
- case CALL_WAITING:
+ case CALL_WAITING_FOR_CONFIG:
+ case CALL_WAITING_FOR_PICK:
if (op->cancel_with_status != GRPC_STATUS_OK) {
- waiting_op = calld->s.waiting_op;
calld->state = CALL_CANCELLED;
gpr_mu_unlock(&calld->mu_state);
- handle_op_after_cancellation(elem, &waiting_op);
handle_op_after_cancellation(elem, op);
} else {
GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) !=
(op->send_ops == NULL));
GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) !=
(op->recv_ops == NULL));
- if (op->send_ops) {
+ if (op->send_ops != NULL) {
calld->s.waiting_op.send_ops = op->send_ops;
calld->s.waiting_op.is_last_send = op->is_last_send;
calld->s.waiting_op.on_done_send = op->on_done_send;
- calld->s.waiting_op.send_user_data = op->send_user_data;
}
- if (op->recv_ops) {
+ if (op->recv_ops != NULL) {
calld->s.waiting_op.recv_ops = op->recv_ops;
calld->s.waiting_op.recv_state = op->recv_state;
calld->s.waiting_op.on_done_recv = op->on_done_recv;
- calld->s.waiting_op.recv_user_data = op->recv_user_data;
}
gpr_mu_unlock(&calld->mu_state);
- if (op->on_consumed) {
- op->on_consumed(op->on_consumed_user_data, 0);
+ if (op->on_consumed != NULL) {
+ op->on_consumed->cb(op->on_consumed->cb_arg, 0);
}
}
break;
@@ -372,6 +368,55 @@ static void cc_start_transport_op(grpc_call_element *elem,
#endif
}
+static void update_state_locked(channel_data *chand) {
+
+}
+
+static void cc_on_config_changed(void *arg, int iomgr_success) {
+ channel_data *chand = arg;
+ grpc_lb_policy *lb_policy = NULL;
+ grpc_lb_policy *old_lb_policy;
+ grpc_resolver *old_resolver;
+ grpc_iomgr_closure *wakeup_closures = NULL;
+
+ if (chand->incoming_configuration) {
+ lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration);
+ grpc_lb_policy_ref(lb_policy);
+ }
+
+ grpc_client_config_unref(chand->incoming_configuration);
+ chand->incoming_configuration = NULL;
+
+ gpr_mu_lock(&chand->mu_config);
+ old_lb_policy = chand->lb_policy;
+ chand->lb_policy = lb_policy;
+ if (lb_policy != NULL) {
+ wakeup_closures = chand->waiting_for_config_closures;
+ chand->waiting_for_config_closures = NULL;
+ }
+ gpr_mu_unlock(&chand->mu_config);
+
+ while (wakeup_closures) {
+ grpc_iomgr_closure *next = wakeup_closures->next;
+ grpc_iomgr_add_callback(wakeup_closures);
+ wakeup_closures = next;
+ }
+
+ grpc_lb_policy_unref(old_lb_policy);
+
+ if (iomgr_success) {
+ grpc_resolver_next(chand->resolver, &chand->incoming_configuration, &chand->on_config_changed);
+ } else {
+ gpr_mu_lock(&chand->mu_config);
+ old_resolver = chand->resolver;
+ chand->resolver = NULL;
+ update_state_locked(chand);
+ gpr_mu_unlock(&chand->mu_config);
+ grpc_resolver_unref(old_resolver);
+ }
+}
+
+#if 0
static void channel_op(grpc_channel_element *elem,
grpc_channel_element *from_elem, grpc_channel_op *op) {
channel_data *chand = elem->channel_data;
@@ -451,6 +496,9 @@ static void channel_op(grpc_channel_element *elem,
break;
}
}
+#endif
+
+static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op *op) {}
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
@@ -471,26 +519,28 @@ static void init_call_elem(grpc_call_element *elem,
/* Destructor for call_data */
static void destroy_call_elem(grpc_call_element *elem) {
call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
+ grpc_subchannel_call *subchannel_call;
/* if the call got activated, we need to destroy the child stack also, and
remove it from the in-flight requests tracked by the child_entry we
picked */
- gpr_mu_lock(&chand->mu);
+ gpr_mu_lock(&calld->mu_state);
switch (calld->state) {
case CALL_ACTIVE:
- gpr_mu_unlock(&chand->mu);
- grpc_child_call_destroy(calld->s.active.child_call);
+ subchannel_call = calld->s.active.subchannel_call;
+ gpr_mu_unlock(&calld->mu_state);
+ grpc_subchannel_call_unref(subchannel_call);
break;
- case CALL_WAITING:
- remove_waiting_child(chand, calld);
- gpr_mu_unlock(&chand->mu);
+ case CALL_CREATED:
+ case CALL_CANCELLED:
+ gpr_mu_unlock(&calld->mu_state);
break;
- default:
- gpr_mu_unlock(&chand->mu);
+ case CALL_WAITING_FOR_PICK:
+ case CALL_WAITING_FOR_CONFIG:
+ gpr_log(GPR_ERROR, "should never reach here");
+ abort();
break;
}
- GPR_ASSERT(calld->state != CALL_WAITING);
}
/* Constructor for channel_data */
@@ -504,41 +554,32 @@ static void init_channel_elem(grpc_channel_element *elem,
GPR_ASSERT(is_last);
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
- gpr_mu_init(&chand->mu);
- chand->active_child = NULL;
- chand->waiting_children = NULL;
- chand->waiting_child_count = 0;
- chand->waiting_child_capacity = 0;
- chand->transport_setup = NULL;
- chand->transport_setup_initiated = 0;
- chand->args = grpc_channel_args_copy(args);
+ gpr_mu_init(&chand->mu_config);
+ chand->resolver = NULL;
chand->mdctx = metadata_context;
+ grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed, chand);
}
/* Destructor for channel_data */
static void destroy_channel_elem(grpc_channel_element *elem) {
channel_data *chand = elem->channel_data;
- grpc_transport_setup_cancel(chand->transport_setup);
-
- if (chand->active_child) {
- grpc_child_channel_destroy(chand->active_child, 1);
- chand->active_child = NULL;
+ if (chand->resolver) {
+ grpc_resolver_unref(chand->resolver);
}
-
- grpc_channel_args_destroy(chand->args);
-
- gpr_mu_destroy(&chand->mu);
- GPR_ASSERT(chand->waiting_child_count == 0);
- gpr_free(chand->waiting_children);
+ if (chand->lb_policy) {
+ grpc_lb_policy_unref(chand->lb_policy);
+ }
+ gpr_mu_destroy(&chand->mu_config);
}
const grpc_channel_filter grpc_client_channel_filter = {
- cc_start_transport_op, channel_op, sizeof(call_data),
+ cc_start_transport_stream_op, cc_start_transport_op, sizeof(call_data),
init_call_elem, destroy_call_elem, sizeof(channel_data),
init_channel_elem, destroy_channel_elem, "client-channel",
};
+#if 0
grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
grpc_channel_stack *channel_stack, grpc_transport *transport,
grpc_channel_filter const **channel_filters, size_t num_channel_filters,
@@ -620,6 +661,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
return result;
}
+#endif
void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
grpc_resolver *resolver) {
@@ -628,5 +670,6 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
channel_data *chand = elem->channel_data;
GPR_ASSERT(!chand->resolver);
chand->resolver = resolver;
+ grpc_resolver_ref(resolver);
grpc_resolver_next(resolver, &chand->incoming_configuration, &chand->on_config_changed);
}
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index 9128aaeda7..0c6c9b3e64 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -44,22 +44,6 @@
typedef struct grpc_subchannel grpc_subchannel;
typedef struct grpc_subchannel_call grpc_subchannel_call;
-/** Connectivity state of a channel.
- TODO(ctiller): move to grpc.h when we implement the public
- version of the connectivity apis */
-typedef enum {
- /** channel is connecting */
- GRPC_CHANNEL_CONNECTING,
- /** channel is ready for work */
- GRPC_CHANNEL_READY,
- /** channel has seen a failure but expects to recover */
- GRPC_CHANNEL_TRANSIENT_FAILURE,
- /** channel is idle */
- GRPC_CHANNEL_IDLE,
- /** channel has seen a failure that it cannot recover from */
- GRPC_CHANNEL_FATAL_FAILURE
-} grpc_connectivity_state;
-
void grpc_subchannel_ref(grpc_subchannel *channel);
void grpc_subchannel_unref(grpc_subchannel *channel);
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 9943dbdbac..685098bcba 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -114,26 +114,9 @@ static void cancel_from_api(grpc_chttp2_transport_global *transport_global,
static void add_to_pollset_locked(grpc_chttp2_transport *t,
grpc_pollset *pollset);
-<<<<<<< HEAD
-static void schedule_cb(transport *t, op_closure closure, int success);
-static void maybe_finish_read(transport *t, stream *s);
-static void maybe_join_window_updates(transport *t, stream *s);
-static void finish_reads(transport *t);
-static void add_to_pollset_locked(transport *t, grpc_pollset *pollset);
-static void perform_op_locked(transport *t, stream *s,
- grpc_transport_stream_op *op);
-static void add_metadata_batch(transport *t, stream *s);
-
-static void flowctl_trace(transport *t, const char *flow, gpr_int32 window,
- gpr_uint32 id, gpr_int32 delta) {
- gpr_log(GPR_DEBUG, "HTTP:FLOW:%p:%d:%s: %d + %d = %d", t, id, flow, window,
- delta, window + delta);
-}
-=======
/** Start new streams that have been created if we can */
static void maybe_start_some_streams(
grpc_chttp2_transport_global *transport_global);
->>>>>>> 48f0a13f3872876787f4d7588b396db914319b1b
/*
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
@@ -385,16 +368,9 @@ static void goaway(grpc_transport *gt, grpc_status_code status,
}
static int init_stream(grpc_transport *gt, grpc_stream *gs,
-<<<<<<< HEAD
- const void *server_data,
- grpc_transport_stream_op *initial_op) {
- transport *t = (transport *)gt;
- stream *s = (stream *)gs;
-=======
const void *server_data, grpc_transport_stream_op *initial_op) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
->>>>>>> 48f0a13f3872876787f4d7588b396db914319b1b
memset(s, 0, sizeof(*s));
@@ -635,14 +611,9 @@ static void maybe_start_some_streams(
}
}
-<<<<<<< HEAD
-static void perform_op_locked(transport *t, stream *s,
- grpc_transport_stream_op *op) {
-=======
static void perform_op_locked(grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
grpc_transport_stream_op *op) {
->>>>>>> 48f0a13f3872876787f4d7588b396db914319b1b
if (op->cancel_with_status != GRPC_STATUS_OK) {
cancel_from_api(transport_global, stream_global, op->cancel_with_status);
}
@@ -699,15 +670,9 @@ static void perform_op_locked(grpc_chttp2_transport_global *transport_global,
}
static void perform_op(grpc_transport *gt, grpc_stream *gs,
-<<<<<<< HEAD
- grpc_transport_stream_op *op) {
- transport *t = (transport *)gt;
- stream *s = (stream *)gs;
-=======
grpc_transport_stream_op *op) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
->>>>>>> 48f0a13f3872876787f4d7588b396db914319b1b
lock(t);
perform_op_locked(&t->global, &s->global, op);
diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c
index a73c32da1a..40faa27211 100644
--- a/src/core/transport/transport.c
+++ b/src/core/transport/transport.c
@@ -58,9 +58,14 @@ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream,
initial_op);
}
-void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream,
+void grpc_transport_perform_stream_op(grpc_transport *transport, grpc_stream *stream,
grpc_transport_stream_op *op) {
- transport->vtable->perform_op(transport, stream, op);
+ transport->vtable->perform_stream_op(transport, stream, op);
+}
+
+void grpc_transport_perform_op(grpc_transport *transport,
+ grpc_transport_op *op) {
+ transport->vtable->perform_op(transport, op);
}
void grpc_transport_add_to_pollset(grpc_transport *transport,
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index fbfed46626..7f6a37d048 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -43,7 +43,6 @@
/* forward declarations */
typedef struct grpc_transport grpc_transport;
-typedef struct grpc_transport_callbacks grpc_transport_callbacks;
/* grpc_stream doesn't actually exist. It's used as a typesafe
opaque pointer for whatever data the transport wants to track
@@ -62,7 +61,8 @@ typedef enum grpc_stream_state {
GRPC_STREAM_CLOSED
} grpc_stream_state;
-/* Transport op: a set of operations to perform on a transport */
+/* Transport stream op: a set of operations to perform on a transport
+ against a single stream */
typedef struct grpc_transport_stream_op {
grpc_iomgr_closure *on_consumed;
@@ -83,30 +83,27 @@ typedef struct grpc_transport_stream_op {
grpc_call_context_element *context;
} grpc_transport_stream_op;
-/* Callbacks made from the transport to the upper layers of grpc. */
-struct grpc_transport_callbacks {
- /* Initialize a new stream on behalf of the transport.
- Must result in a call to
- grpc_transport_init_stream(transport, ..., request) in the same call
- stack.
- Must not result in any other calls to the transport.
-
- Arguments:
- user_data - the transport user data set at transport creation time
- transport - the grpc_transport instance making this call
- request - request parameters for this stream (owned by the caller)
- server_data - opaque transport dependent argument that should be passed
- to grpc_transport_init_stream
- */
- void (*accept_stream)(void *user_data, grpc_transport *transport,
- const void *server_data);
-
- void (*goaway)(void *user_data, grpc_transport *transport,
- grpc_status_code status, gpr_slice debug);
-
- /* The transport has been closed */
- void (*closed)(void *user_data, grpc_transport *transport);
-};
+/** Transport op: a set of operations to perform on a transport as a whole */
+typedef struct grpc_transport_op {
+ /** connectivity monitoring */
+ grpc_iomgr_closure *on_connectivity_state_change;
+ grpc_connectivity_state *connectivity_state;
+ /** should the transport be disconnected */
+ int disconnect;
+ /** should we send a goaway? */
+ int send_goaway;
+ /** what should the goaway contain? */
+ grpc_status_code goaway_status;
+ gpr_slice *goaway_message;
+ /** set the callback for accepting new streams;
+ this is a permanent callback, unlike the other one-shot closures */
+ void (*set_accept_stream)(void *user_data, grpc_transport *transport, const void *server_data);
+ void *set_accept_stream_user_data;
+ /** add this transport to a pollset */
+ grpc_pollset *bind_pollset;
+ /** send a ping, call this back if not NULL */
+ grpc_iomgr_closure *send_ping;
+} grpc_transport_op;
/* Returns the amount of memory required to store a grpc_stream for this
transport */
@@ -144,10 +141,6 @@ void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op,
grpc_status_code status,
grpc_mdstr *message);
-/* TODO(ctiller): remove this */
-void grpc_transport_add_to_pollset(grpc_transport *transport,
- grpc_pollset *pollset);
-
char *grpc_transport_stream_op_string(grpc_transport_stream_op *op);
/* Send a batch of operations on a transport
@@ -159,9 +152,11 @@ char *grpc_transport_stream_op_string(grpc_transport_stream_op *op);
stream - the stream on which to send the operations. This must be
non-NULL and previously initialized by the same transport.
op - a grpc_transport_stream_op specifying the op to perform */
-void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream,
+void grpc_transport_perform_stream_op(grpc_transport *transport, grpc_stream *stream,
grpc_transport_stream_op *op);
+void grpc_transport_perform_op(grpc_transport *transport, grpc_transport_op *op);
+
/* Send a ping on a transport
Calls cb with user data when a response is received. */
@@ -180,7 +175,6 @@ void grpc_transport_destroy(grpc_transport *transport);
/* Return type for grpc_transport_setup_callback */
typedef struct grpc_transport_setup_result {
void *user_data;
- const grpc_transport_callbacks *callbacks;
} grpc_transport_setup_result;
/* Given a transport, return callbacks for that transport. Used to finalize
diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h
index 442b64ca59..b65b1d5607 100644
--- a/src/core/transport/transport_impl.h
+++ b/src/core/transport/transport_impl.h
@@ -46,26 +46,17 @@ typedef struct grpc_transport_vtable {
const void *server_data,
grpc_transport_stream_op *initial_op);
- /* implementation of grpc_transport_send_batch */
- void (*perform_op)(grpc_transport *self, grpc_stream *stream,
+ /* implementation of grpc_transport_perform_stream_op */
+ void (*perform_stream_op)(grpc_transport *self, grpc_stream *stream,
grpc_transport_stream_op *op);
- /* implementation of grpc_transport_add_to_pollset */
- void (*add_to_pollset)(grpc_transport *self, grpc_pollset *pollset);
+ /* implementation of grpc_transport_perform_op */
+ void (*perform_op)(grpc_transport *self, grpc_stream *stream,
+ grpc_transport_stream_op *op);
/* implementation of grpc_transport_destroy_stream */
void (*destroy_stream)(grpc_transport *self, grpc_stream *stream);
- /* implementation of grpc_transport_goaway */
- void (*goaway)(grpc_transport *self, grpc_status_code status,
- gpr_slice debug_data);
-
- /* implementation of grpc_transport_close */
- void (*close)(grpc_transport *self);
-
- /* implementation of grpc_transport_ping */
- void (*ping)(grpc_transport *self, grpc_iomgr_closure *cb);
-
/* implementation of grpc_transport_destroy */
void (*destroy)(grpc_transport *self);
} grpc_transport_vtable;