aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/channel')
-rw-r--r--src/core/channel/census_filter.c38
-rw-r--r--src/core/channel/channel_args.c22
-rw-r--r--src/core/channel/channel_args.h13
-rw-r--r--src/core/channel/channel_stack.c22
-rw-r--r--src/core/channel/channel_stack.h60
-rw-r--r--src/core/channel/child_channel.c308
-rw-r--r--src/core/channel/child_channel.h65
-rw-r--r--src/core/channel/client_channel.c801
-rw-r--r--src/core/channel/client_channel.h15
-rw-r--r--src/core/channel/client_setup.c302
-rw-r--r--src/core/channel/client_setup.h77
-rw-r--r--src/core/channel/connected_channel.c107
-rw-r--r--src/core/channel/connected_channel.h6
-rw-r--r--src/core/channel/http_client_filter.c33
-rw-r--r--src/core/channel/http_server_filter.c35
-rw-r--r--src/core/channel/noop_filter.c41
16 files changed, 565 insertions, 1380 deletions
diff --git a/src/core/channel/census_filter.c b/src/core/channel/census_filter.c
index 7e393a01a6..83b7682848 100644
--- a/src/core/channel/census_filter.c
+++ b/src/core/channel/census_filter.c
@@ -84,7 +84,8 @@ static void extract_and_annotate_method_tag(grpc_stream_op_buffer* sopb,
}
}
-static void client_mutate_op(grpc_call_element* elem, grpc_transport_op* op) {
+static void client_mutate_op(grpc_call_element* elem,
+ grpc_transport_stream_op* op) {
call_data* calld = elem->call_data;
channel_data* chand = elem->channel_data;
if (op->send_ops) {
@@ -93,7 +94,7 @@ static void client_mutate_op(grpc_call_element* elem, grpc_transport_op* op) {
}
static void client_start_transport_op(grpc_call_element* elem,
- grpc_transport_op* op) {
+ grpc_transport_stream_op* op) {
call_data* calld = elem->call_data;
GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
client_mutate_op(elem, op);
@@ -110,7 +111,8 @@ static void server_on_done_recv(void* ptr, int success) {
calld->on_done_recv(calld->recv_user_data, success);
}
-static void server_mutate_op(grpc_call_element* elem, grpc_transport_op* op) {
+static void server_mutate_op(grpc_call_element* elem,
+ grpc_transport_stream_op* op) {
call_data* calld = elem->call_data;
if (op->recv_ops) {
/* substitute our callback for the op callback */
@@ -123,7 +125,7 @@ static void server_mutate_op(grpc_call_element* elem, grpc_transport_op* op) {
}
static void server_start_transport_op(grpc_call_element* elem,
- grpc_transport_op* op) {
+ grpc_transport_stream_op* op) {
call_data* calld = elem->call_data;
GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
server_mutate_op(elem, op);
@@ -145,7 +147,7 @@ static void channel_op(grpc_channel_element* elem,
static void client_init_call_elem(grpc_call_element* elem,
const void* server_transport_data,
- grpc_transport_op* initial_op) {
+ grpc_transport_stream_op* initial_op) {
call_data* d = elem->call_data;
GPR_ASSERT(d != NULL);
init_rpc_stats(&d->stats);
@@ -163,7 +165,7 @@ static void client_destroy_call_elem(grpc_call_element* elem) {
static void server_init_call_elem(grpc_call_element* elem,
const void* server_transport_data,
- grpc_transport_op* initial_op) {
+ grpc_transport_stream_op* initial_op) {
call_data* d = elem->call_data;
GPR_ASSERT(d != NULL);
init_rpc_stats(&d->stats);
@@ -200,11 +202,23 @@ static void destroy_channel_elem(grpc_channel_element* elem) {
}
const grpc_channel_filter grpc_client_census_filter = {
- client_start_transport_op, channel_op, sizeof(call_data),
- client_init_call_elem, client_destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "census-client"};
+ client_start_transport_op,
+ channel_op,
+ sizeof(call_data),
+ client_init_call_elem,
+ client_destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ "census-client"};
const grpc_channel_filter grpc_server_census_filter = {
- server_start_transport_op, channel_op, sizeof(call_data),
- server_init_call_elem, server_destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "census-server"};
+ server_start_transport_op,
+ channel_op,
+ sizeof(call_data),
+ server_init_call_elem,
+ server_destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ "census-server"};
diff --git a/src/core/channel/channel_args.c b/src/core/channel/channel_args.c
index 166d559a45..140f8bd656 100644
--- a/src/core/channel/channel_args.c
+++ b/src/core/channel/channel_args.c
@@ -62,7 +62,8 @@ static grpc_arg copy_arg(const grpc_arg *src) {
}
grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src,
- const grpc_arg *to_add) {
+ const grpc_arg *to_add,
+ size_t num_to_add) {
grpc_channel_args *dst = gpr_malloc(sizeof(grpc_channel_args));
size_t i;
size_t src_num_args = (src == NULL) ? 0 : src->num_args;
@@ -71,17 +72,24 @@ grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src,
dst->args = NULL;
return dst;
}
- dst->num_args = src_num_args + ((to_add == NULL) ? 0 : 1);
+ dst->num_args = src_num_args + num_to_add;
dst->args = gpr_malloc(sizeof(grpc_arg) * dst->num_args);
for (i = 0; i < src_num_args; i++) {
dst->args[i] = copy_arg(&src->args[i]);
}
- if (to_add != NULL) dst->args[src_num_args] = copy_arg(to_add);
+ for (i = 0; i < num_to_add; i++) {
+ dst->args[i + src_num_args] = copy_arg(&to_add[i]);
+ }
return dst;
}
grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src) {
- return grpc_channel_args_copy_and_add(src, NULL);
+ return grpc_channel_args_copy_and_add(src, NULL, 0);
+}
+
+grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a,
+ const grpc_channel_args *b) {
+ return grpc_channel_args_copy_and_add(a, b->args, b->num_args);
}
void grpc_channel_args_destroy(grpc_channel_args *a) {
@@ -131,11 +139,11 @@ grpc_compression_level grpc_channel_args_get_compression_level(
return GRPC_COMPRESS_LEVEL_NONE;
}
-void grpc_channel_args_set_compression_level(
- grpc_channel_args **a, grpc_compression_level level) {
+void grpc_channel_args_set_compression_level(grpc_channel_args **a,
+ grpc_compression_level level) {
grpc_arg tmp;
tmp.type = GRPC_ARG_INTEGER;
tmp.key = GRPC_COMPRESSION_LEVEL_ARG;
tmp.value.integer = level;
- *a = grpc_channel_args_copy_and_add(*a, &tmp);
+ *a = grpc_channel_args_copy_and_add(*a, &tmp, 1);
}
diff --git a/src/core/channel/channel_args.h b/src/core/channel/channel_args.h
index bf747b26e6..17849b7e59 100644
--- a/src/core/channel/channel_args.h
+++ b/src/core/channel/channel_args.h
@@ -43,7 +43,12 @@ grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src);
/** Copy some arguments and add the to_add parameter in the end.
If to_add is NULL, it is equivalent to call grpc_channel_args_copy. */
grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src,
- const grpc_arg *to_add);
+ const grpc_arg *to_add,
+ size_t num_to_add);
+
+/** Copy args from a then args from b into a new channel args */
+grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a,
+ const grpc_channel_args *b);
/** Destroy arguments created by grpc_channel_args_copy */
void grpc_channel_args_destroy(grpc_channel_args *a);
@@ -58,7 +63,7 @@ grpc_compression_level grpc_channel_args_get_compression_level(
/** Sets the compression level in \a a to \a level. Setting it to
* GRPC_COMPRESS_LEVEL_NONE disables compression for the channel. */
-void grpc_channel_args_set_compression_level(
- grpc_channel_args **a, grpc_compression_level level);
+void grpc_channel_args_set_compression_level(grpc_channel_args **a,
+ grpc_compression_level level);
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H */
+#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H */
diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c
index 9eec8163f5..e38dcb58b7 100644
--- a/src/core/channel/channel_stack.c
+++ b/src/core/channel/channel_stack.c
@@ -102,7 +102,8 @@ grpc_call_element *grpc_call_stack_element(grpc_call_stack *call_stack,
}
void grpc_channel_stack_init(const grpc_channel_filter **filters,
- size_t filter_count, const grpc_channel_args *args,
+ size_t filter_count, grpc_channel *master,
+ const grpc_channel_args *args,
grpc_mdctx *metadata_context,
grpc_channel_stack *stack) {
size_t call_size =
@@ -122,8 +123,9 @@ void grpc_channel_stack_init(const grpc_channel_filter **filters,
for (i = 0; i < filter_count; i++) {
elems[i].filter = filters[i];
elems[i].channel_data = user_data;
- elems[i].filter->init_channel_elem(&elems[i], args, metadata_context,
- i == 0, i == (filter_count - 1));
+ elems[i].filter->init_channel_elem(&elems[i], master, args,
+ metadata_context, i == 0,
+ i == (filter_count - 1));
user_data += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_channel_data);
call_size += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_call_data);
}
@@ -148,7 +150,7 @@ void grpc_channel_stack_destroy(grpc_channel_stack *stack) {
void grpc_call_stack_init(grpc_channel_stack *channel_stack,
const void *transport_server_data,
- grpc_transport_op *initial_op,
+ grpc_transport_stream_op *initial_op,
grpc_call_stack *call_stack) {
grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
size_t count = channel_stack->count;
@@ -184,14 +186,14 @@ void grpc_call_stack_destroy(grpc_call_stack *stack) {
}
}
-void grpc_call_next_op(grpc_call_element *elem, grpc_transport_op *op) {
+void grpc_call_next_op(grpc_call_element *elem, grpc_transport_stream_op *op) {
grpc_call_element *next_elem = elem + 1;
- next_elem->filter->start_transport_op(next_elem, op);
+ next_elem->filter->start_transport_stream_op(next_elem, op);
}
-void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op) {
- grpc_channel_element *next_elem = elem + op->dir;
- next_elem->filter->channel_op(next_elem, elem, op);
+void grpc_channel_next_op(grpc_channel_element *elem, grpc_transport_op *op) {
+ grpc_channel_element *next_elem = elem + 1;
+ next_elem->filter->start_transport_op(next_elem, op);
}
grpc_channel_stack *grpc_channel_stack_from_top_element(
@@ -206,7 +208,7 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) {
}
void grpc_call_element_send_cancel(grpc_call_element *cur_elem) {
- grpc_transport_op op;
+ grpc_transport_stream_op op;
memset(&op, 0, sizeof(op));
op.cancel_with_status = GRPC_STATUS_CANCELLED;
grpc_call_next_op(cur_elem, &op);
diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h
index de0e4e4518..785be8925b 100644
--- a/src/core/channel/channel_stack.h
+++ b/src/core/channel/channel_stack.h
@@ -51,45 +51,6 @@
typedef struct grpc_channel_element grpc_channel_element;
typedef struct grpc_call_element grpc_call_element;
-/* The direction of the call.
- The values of the enums (1, -1) matter here - they are used to increment
- or decrement a pointer to find the next element to call */
-typedef enum { GRPC_CALL_DOWN = 1, GRPC_CALL_UP = -1 } grpc_call_dir;
-
-typedef enum {
- /* send a goaway message to remote channels indicating that we are going
- to disconnect in the future */
- GRPC_CHANNEL_GOAWAY,
- /* disconnect any underlying transports */
- GRPC_CHANNEL_DISCONNECT,
- /* transport received a new call */
- GRPC_ACCEPT_CALL,
- /* an underlying transport was closed */
- GRPC_TRANSPORT_CLOSED,
- /* an underlying transport is about to be closed */
- GRPC_TRANSPORT_GOAWAY
-} grpc_channel_op_type;
-
-/* A single filterable operation to be performed on a channel */
-typedef struct {
- /* The type of operation we're performing */
- grpc_channel_op_type type;
- /* The directionality of this call - is it bubbling up the stack, or down? */
- grpc_call_dir dir;
-
- /* Argument data, matching up with grpc_channel_op_type names */
- union {
- struct {
- grpc_transport *transport;
- const void *transport_server_data;
- } accept_call;
- struct {
- grpc_status_code status;
- gpr_slice message;
- } goaway;
- } data;
-} grpc_channel_op;
-
/* Channel filters specify:
1. the amount of memory needed in the channel & call (via the sizeof_XXX
members)
@@ -103,12 +64,12 @@ typedef struct {
typedef struct {
/* Called to eg. send/receive data on a call.
See grpc_call_next_op on how to call the next element in the stack */
- void (*start_transport_op)(grpc_call_element *elem, grpc_transport_op *op);
+ void (*start_transport_stream_op)(grpc_call_element *elem,
+ grpc_transport_stream_op *op);
/* Called to handle channel level operations - e.g. new calls, or transport
closure.
See grpc_channel_next_op on how to call the next element in the stack */
- void (*channel_op)(grpc_channel_element *elem,
- grpc_channel_element *from_elem, grpc_channel_op *op);
+ void (*start_transport_op)(grpc_channel_element *elem, grpc_transport_op *op);
/* sizeof(per call data) */
size_t sizeof_call_data;
@@ -122,7 +83,7 @@ typedef struct {
argument.*/
void (*init_call_elem)(grpc_call_element *elem,
const void *server_transport_data,
- grpc_transport_op *initial_op);
+ grpc_transport_stream_op *initial_op);
/* Destroy per call data.
The filter does not need to do any chaining */
void (*destroy_call_elem)(grpc_call_element *elem);
@@ -135,7 +96,7 @@ typedef struct {
is_first, is_last designate this elements position in the stack, and are
useful for asserting correct configuration by upper layer code.
The filter does not need to do any chaining */
- void (*init_channel_elem)(grpc_channel_element *elem,
+ void (*init_channel_elem)(grpc_channel_element *elem, grpc_channel *master,
const grpc_channel_args *args,
grpc_mdctx *metadata_context, int is_first,
int is_last);
@@ -190,7 +151,8 @@ size_t grpc_channel_stack_size(const grpc_channel_filter **filters,
size_t filter_count);
/* Initialize a channel stack given some filters */
void grpc_channel_stack_init(const grpc_channel_filter **filters,
- size_t filter_count, const grpc_channel_args *args,
+ size_t filter_count, grpc_channel *master,
+ const grpc_channel_args *args,
grpc_mdctx *metadata_context,
grpc_channel_stack *stack);
/* Destroy a channel stack */
@@ -201,16 +163,16 @@ void grpc_channel_stack_destroy(grpc_channel_stack *stack);
server. */
void grpc_call_stack_init(grpc_channel_stack *channel_stack,
const void *transport_server_data,
- grpc_transport_op *initial_op,
+ grpc_transport_stream_op *initial_op,
grpc_call_stack *call_stack);
/* Destroy a call stack */
void grpc_call_stack_destroy(grpc_call_stack *stack);
/* Call the next operation in a call stack */
-void grpc_call_next_op(grpc_call_element *elem, grpc_transport_op *op);
+void grpc_call_next_op(grpc_call_element *elem, grpc_transport_stream_op *op);
/* Call the next operation (depending on call directionality) in a channel
stack */
-void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op);
+void grpc_channel_next_op(grpc_channel_element *elem, grpc_transport_op *op);
/* Given the top element of a channel stack, get the channel stack itself */
grpc_channel_stack *grpc_channel_stack_from_top_element(
@@ -219,7 +181,7 @@ grpc_channel_stack *grpc_channel_stack_from_top_element(
grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem);
void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
- grpc_call_element *elem, grpc_transport_op *op);
+ grpc_call_element *elem, grpc_transport_stream_op *op);
void grpc_call_element_send_cancel(grpc_call_element *cur_elem);
diff --git a/src/core/channel/child_channel.c b/src/core/channel/child_channel.c
deleted file mode 100644
index 6690265d75..0000000000
--- a/src/core/channel/child_channel.c
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/channel/child_channel.h"
-#include "src/core/iomgr/iomgr.h"
-#include <grpc/support/alloc.h>
-
-/* Link back filter: passes up calls to the client channel, pushes down calls
- down */
-
-static void maybe_destroy_channel(grpc_child_channel *channel);
-
-typedef struct {
- gpr_mu mu;
- gpr_cv cv;
- grpc_channel_element *back;
- /* # of active calls on the channel */
- gpr_uint32 active_calls;
- /* has grpc_child_channel_destroy been called? */
- gpr_uint8 destroyed;
- /* has the transport reported itself disconnected? */
- gpr_uint8 disconnected;
- /* are we calling 'back' - our parent channel */
- gpr_uint8 calling_back;
- /* have we or our parent sent goaway yet? - dup suppression */
- gpr_uint8 sent_goaway;
- /* are we currently sending farewell (in this file: goaway + disconnect) */
- gpr_uint8 sending_farewell;
- /* have we sent farewell (goaway + disconnect) */
- gpr_uint8 sent_farewell;
-
- grpc_iomgr_closure finally_destroy_channel_closure;
- grpc_iomgr_closure send_farewells_closure;
-} lb_channel_data;
-
-typedef struct { grpc_child_channel *channel; } lb_call_data;
-
-static void lb_start_transport_op(grpc_call_element *elem,
- grpc_transport_op *op) {
- grpc_call_next_op(elem, op);
-}
-
-/* Currently we assume all channel operations should just be pushed up. */
-static void lb_channel_op(grpc_channel_element *elem,
- grpc_channel_element *from_elem,
- grpc_channel_op *op) {
- lb_channel_data *chand = elem->channel_data;
- grpc_channel_element *back;
- int calling_back = 0;
-
- switch (op->dir) {
- case GRPC_CALL_UP:
- gpr_mu_lock(&chand->mu);
- back = chand->back;
- if (back) {
- chand->calling_back++;
- calling_back = 1;
- }
- gpr_mu_unlock(&chand->mu);
- if (back) {
- back->filter->channel_op(chand->back, elem, op);
- } else if (op->type == GRPC_TRANSPORT_GOAWAY) {
- gpr_slice_unref(op->data.goaway.message);
- }
- break;
- case GRPC_CALL_DOWN:
- grpc_channel_next_op(elem, op);
- break;
- }
-
- gpr_mu_lock(&chand->mu);
- switch (op->type) {
- case GRPC_TRANSPORT_CLOSED:
- chand->disconnected = 1;
- maybe_destroy_channel(grpc_channel_stack_from_top_element(elem));
- break;
- case GRPC_CHANNEL_GOAWAY:
- chand->sent_goaway = 1;
- break;
- case GRPC_CHANNEL_DISCONNECT:
- case GRPC_TRANSPORT_GOAWAY:
- case GRPC_ACCEPT_CALL:
- break;
- }
-
- if (calling_back) {
- chand->calling_back--;
- gpr_cv_signal(&chand->cv);
- maybe_destroy_channel(grpc_channel_stack_from_top_element(elem));
- }
- gpr_mu_unlock(&chand->mu);
-}
-
-/* Constructor for call_data */
-static void lb_init_call_elem(grpc_call_element *elem,
- const void *server_transport_data,
- grpc_transport_op *initial_op) {}
-
-/* Destructor for call_data */
-static void lb_destroy_call_elem(grpc_call_element *elem) {}
-
-/* Constructor for channel_data */
-static void lb_init_channel_elem(grpc_channel_element *elem,
- const grpc_channel_args *args,
- grpc_mdctx *metadata_context, int is_first,
- int is_last) {
- lb_channel_data *chand = elem->channel_data;
- GPR_ASSERT(is_first);
- GPR_ASSERT(!is_last);
- gpr_mu_init(&chand->mu);
- gpr_cv_init(&chand->cv);
- chand->back = NULL;
- chand->destroyed = 0;
- chand->disconnected = 0;
- chand->active_calls = 0;
- chand->sent_goaway = 0;
- chand->calling_back = 0;
- chand->sending_farewell = 0;
- chand->sent_farewell = 0;
-}
-
-/* Destructor for channel_data */
-static void lb_destroy_channel_elem(grpc_channel_element *elem) {
- lb_channel_data *chand = elem->channel_data;
- gpr_mu_destroy(&chand->mu);
- gpr_cv_destroy(&chand->cv);
-}
-
-const grpc_channel_filter grpc_child_channel_top_filter = {
- lb_start_transport_op, lb_channel_op,
- sizeof(lb_call_data), lb_init_call_elem, lb_destroy_call_elem,
- sizeof(lb_channel_data), lb_init_channel_elem, lb_destroy_channel_elem,
- "child-channel",
-};
-
-/* grpc_child_channel proper */
-
-#define LINK_BACK_ELEM_FROM_CHANNEL(channel) \
- grpc_channel_stack_element((channel), 0)
-
-#define LINK_BACK_ELEM_FROM_CALL(call) grpc_call_stack_element((call), 0)
-
-static void finally_destroy_channel(void *c, int success) {
- /* ignore success or not... this is a destruction callback and will only
- happen once - the only purpose here is to release resources */
- grpc_child_channel *channel = c;
- lb_channel_data *chand = LINK_BACK_ELEM_FROM_CHANNEL(channel)->channel_data;
- /* wait for the initiator to leave the mutex */
- gpr_mu_lock(&chand->mu);
- gpr_mu_unlock(&chand->mu);
- grpc_channel_stack_destroy(channel);
- gpr_free(channel);
-}
-
-static void send_farewells(void *c, int success) {
- grpc_child_channel *channel = c;
- grpc_channel_element *lbelem = LINK_BACK_ELEM_FROM_CHANNEL(channel);
- lb_channel_data *chand = lbelem->channel_data;
- int send_goaway;
- grpc_channel_op op;
-
- gpr_mu_lock(&chand->mu);
- send_goaway = !chand->sent_goaway;
- chand->sent_goaway = 1;
- gpr_mu_unlock(&chand->mu);
-
- if (send_goaway) {
- op.type = GRPC_CHANNEL_GOAWAY;
- op.dir = GRPC_CALL_DOWN;
- op.data.goaway.status = GRPC_STATUS_OK;
- op.data.goaway.message = gpr_slice_from_copied_string("Client disconnect");
- grpc_channel_next_op(lbelem, &op);
- }
-
- op.type = GRPC_CHANNEL_DISCONNECT;
- op.dir = GRPC_CALL_DOWN;
- grpc_channel_next_op(lbelem, &op);
-
- gpr_mu_lock(&chand->mu);
- chand->sending_farewell = 0;
- chand->sent_farewell = 1;
- maybe_destroy_channel(channel);
- gpr_mu_unlock(&chand->mu);
-}
-
-static void maybe_destroy_channel(grpc_child_channel *channel) {
- lb_channel_data *chand = LINK_BACK_ELEM_FROM_CHANNEL(channel)->channel_data;
- if (chand->destroyed && chand->disconnected && chand->active_calls == 0 &&
- !chand->sending_farewell && !chand->calling_back) {
- chand->finally_destroy_channel_closure.cb = finally_destroy_channel;
- chand->finally_destroy_channel_closure.cb_arg = channel;
- grpc_iomgr_add_callback(&chand->finally_destroy_channel_closure);
- } else if (chand->destroyed && !chand->disconnected &&
- chand->active_calls == 0 && !chand->sending_farewell &&
- !chand->sent_farewell) {
- chand->sending_farewell = 1;
- chand->send_farewells_closure.cb = send_farewells;
- chand->send_farewells_closure.cb_arg = channel;
- grpc_iomgr_add_callback(&chand->send_farewells_closure);
- }
-}
-
-grpc_child_channel *grpc_child_channel_create(
- grpc_channel_element *parent, const grpc_channel_filter **filters,
- size_t filter_count, const grpc_channel_args *args,
- grpc_mdctx *metadata_context) {
- grpc_channel_stack *stk =
- gpr_malloc(grpc_channel_stack_size(filters, filter_count));
- lb_channel_data *lb;
-
- grpc_channel_stack_init(filters, filter_count, args, metadata_context, stk);
-
- lb = LINK_BACK_ELEM_FROM_CHANNEL(stk)->channel_data;
- gpr_mu_lock(&lb->mu);
- lb->back = parent;
- gpr_mu_unlock(&lb->mu);
-
- return stk;
-}
-
-void grpc_child_channel_destroy(grpc_child_channel *channel,
- int wait_for_callbacks) {
- grpc_channel_element *lbelem = LINK_BACK_ELEM_FROM_CHANNEL(channel);
- lb_channel_data *chand = lbelem->channel_data;
-
- gpr_mu_lock(&chand->mu);
- while (wait_for_callbacks && chand->calling_back) {
- gpr_cv_wait(&chand->cv, &chand->mu, gpr_inf_future);
- }
-
- chand->back = NULL;
- chand->destroyed = 1;
- maybe_destroy_channel(channel);
- gpr_mu_unlock(&chand->mu);
-}
-
-void grpc_child_channel_handle_op(grpc_child_channel *channel,
- grpc_channel_op *op) {
- grpc_channel_next_op(LINK_BACK_ELEM_FROM_CHANNEL(channel), op);
-}
-
-grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel,
- grpc_call_element *parent,
- grpc_transport_op *initial_op) {
- grpc_call_stack *stk = gpr_malloc((channel)->call_stack_size);
- grpc_call_element *lbelem;
- lb_call_data *lbcalld;
- lb_channel_data *lbchand;
-
- grpc_call_stack_init(channel, NULL, initial_op, stk);
- lbelem = LINK_BACK_ELEM_FROM_CALL(stk);
- lbchand = lbelem->channel_data;
- lbcalld = lbelem->call_data;
- lbcalld->channel = channel;
-
- gpr_mu_lock(&lbchand->mu);
- lbchand->active_calls++;
- gpr_mu_unlock(&lbchand->mu);
-
- return stk;
-}
-
-void grpc_child_call_destroy(grpc_child_call *call) {
- grpc_call_element *lbelem = LINK_BACK_ELEM_FROM_CALL(call);
- lb_call_data *calld = lbelem->call_data;
- lb_channel_data *chand = lbelem->channel_data;
- grpc_child_channel *channel = calld->channel;
- grpc_call_stack_destroy(call);
- gpr_free(call);
- gpr_mu_lock(&chand->mu);
- chand->active_calls--;
- maybe_destroy_channel(channel);
- gpr_mu_unlock(&chand->mu);
-}
-
-grpc_call_element *grpc_child_call_get_top_element(grpc_child_call *call) {
- return LINK_BACK_ELEM_FROM_CALL(call);
-}
diff --git a/src/core/channel/child_channel.h b/src/core/channel/child_channel.h
deleted file mode 100644
index 556a1c731c..0000000000
--- a/src/core/channel/child_channel.h
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_INTERNAL_CORE_CHANNEL_CHILD_CHANNEL_H
-#define GRPC_INTERNAL_CORE_CHANNEL_CHILD_CHANNEL_H
-
-#include "src/core/channel/channel_stack.h"
-
-/* helper for filters that need to host child channel stacks... handles
- lifetime and upwards propagation cleanly */
-
-extern const grpc_channel_filter grpc_child_channel_top_filter;
-
-typedef grpc_channel_stack grpc_child_channel;
-typedef grpc_call_stack grpc_child_call;
-
-/* filters[0] must be &grpc_child_channel_top_filter */
-grpc_child_channel *grpc_child_channel_create(
- grpc_channel_element *parent, const grpc_channel_filter **filters,
- size_t filter_count, const grpc_channel_args *args,
- grpc_mdctx *metadata_context);
-void grpc_child_channel_handle_op(grpc_child_channel *channel,
- grpc_channel_op *op);
-grpc_channel_element *grpc_child_channel_get_bottom_element(
- grpc_child_channel *channel);
-void grpc_child_channel_destroy(grpc_child_channel *channel,
- int wait_for_callbacks);
-
-grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel,
- grpc_call_element *parent,
- grpc_transport_op *initial_op);
-grpc_call_element *grpc_child_call_get_top_element(grpc_child_call *call);
-void grpc_child_call_destroy(grpc_child_call *call);
-
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHILD_CHANNEL_H */
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 711e105464..871e970eb8 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -34,13 +34,15 @@
#include "src/core/channel/client_channel.h"
#include <stdio.h>
+#include <string.h>
#include "src/core/channel/channel_args.h"
-#include "src/core/channel/child_channel.h"
#include "src/core/channel/connected_channel.h"
+#include "src/core/surface/channel.h"
#include "src/core/iomgr/iomgr.h"
#include "src/core/iomgr/pollset_set.h"
#include "src/core/support/string.h"
+#include "src/core/transport/connectivity_state.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
@@ -51,31 +53,38 @@
typedef struct call_data call_data;
typedef struct {
- /* protects children, child_count, child_capacity, active_child,
- transport_setup_initiated
- does not protect channel stacks held by children
- transport_setup is assumed to be set once during construction */
- gpr_mu mu;
-
- /* the sending child (may be null) */
- grpc_child_channel *active_child;
+ /** metadata context for this channel */
grpc_mdctx *mdctx;
-
- /* calls waiting for a channel to be ready */
- call_data **waiting_children;
- size_t waiting_child_count;
- size_t waiting_child_capacity;
-
- /* transport setup for this channel */
- grpc_transport_setup *transport_setup;
- int transport_setup_initiated;
-
- grpc_channel_args *args;
+ /** resolver for this channel */
+ grpc_resolver *resolver;
+ /** master channel - the grpc_channel instance that ultimately owns
+ this channel_data via its channel stack.
+ We occasionally use this to bump the refcount on the master channel
+ to keep ourselves alive through an asynchronous operation. */
+ grpc_channel *master;
+
+ /** mutex protecting client configuration, including all
+ variables below in this data structure */
+ gpr_mu mu_config;
+ /** currently active load balancer - guarded by mu_config */
+ grpc_lb_policy *lb_policy;
+ /** incoming configuration - set by resolver.next
+ guarded by mu_config */
+ grpc_client_config *incoming_configuration;
+ /** a list of closures that are all waiting for config to come in */
+ grpc_iomgr_closure *waiting_for_config_closures;
+ /** resolver callback */
+ grpc_iomgr_closure on_config_changed;
+ /** connectivity state being tracked */
+ grpc_connectivity_state_tracker state_tracker;
} channel_data;
typedef enum {
CALL_CREATED,
- CALL_WAITING,
+ CALL_WAITING_FOR_SEND,
+ CALL_WAITING_FOR_CONFIG,
+ CALL_WAITING_FOR_PICK,
+ CALL_WAITING_FOR_CALL,
CALL_ACTIVE,
CALL_CANCELLED
} call_state;
@@ -84,75 +93,25 @@ struct call_data {
/* owning element */
grpc_call_element *elem;
+ gpr_mu mu_state;
+
call_state state;
gpr_timespec deadline;
- union {
- struct {
- /* our child call stack */
- grpc_child_call *child_call;
- } active;
- grpc_transport_op waiting_op;
- struct {
- grpc_linked_mdelem status;
- grpc_linked_mdelem details;
- } cancelled;
- } s;
+ grpc_subchannel *picked_channel;
+ grpc_iomgr_closure async_setup_task;
+ grpc_transport_stream_op waiting_op;
+ /* our child call stack */
+ grpc_subchannel_call *subchannel_call;
+ grpc_linked_mdelem status;
+ grpc_linked_mdelem details;
};
-static int prepare_activate(grpc_call_element *elem,
- grpc_child_channel *on_child) {
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- if (calld->state == CALL_CANCELLED) return 0;
-
- /* no more access to calld->s.waiting allowed */
- GPR_ASSERT(calld->state == CALL_WAITING);
-
- if (calld->s.waiting_op.bind_pollset) {
- grpc_transport_setup_del_interested_party(chand->transport_setup,
- calld->s.waiting_op.bind_pollset);
- }
-
- calld->state = CALL_ACTIVE;
-
- /* create a child call */
- /* TODO(ctiller): pass the waiting op down here */
- calld->s.active.child_call =
- grpc_child_channel_create_call(on_child, elem, NULL);
-
- return 1;
-}
-
-static void complete_activate(grpc_call_element *elem, grpc_transport_op *op) {
- call_data *calld = elem->call_data;
- grpc_call_element *child_elem =
- grpc_child_call_get_top_element(calld->s.active.child_call);
-
- GPR_ASSERT(calld->state == CALL_ACTIVE);
-
- /* continue the start call down the stack, this nees to happen after metadata
- are flushed*/
- child_elem->filter->start_transport_op(child_elem, op);
-}
-
-static void remove_waiting_child(channel_data *chand, call_data *calld) {
- size_t new_count;
- size_t i;
- for (i = 0, new_count = 0; i < chand->waiting_child_count; i++) {
- if (chand->waiting_children[i] == calld) {
- grpc_transport_setup_del_interested_party(
- chand->transport_setup, calld->s.waiting_op.bind_pollset);
- continue;
- }
- chand->waiting_children[new_count++] = chand->waiting_children[i];
- }
- GPR_ASSERT(new_count == chand->waiting_child_count - 1 ||
- new_count == chand->waiting_child_count);
- chand->waiting_child_count = new_count;
-}
+static grpc_iomgr_closure *merge_into_waiting_op(
+ grpc_call_element *elem,
+ grpc_transport_stream_op *new_op) GRPC_MUST_USE_RESULT;
static void handle_op_after_cancellation(grpc_call_element *elem,
- grpc_transport_op *op) {
+ grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
if (op->send_ops) {
@@ -163,15 +122,15 @@ static void handle_op_after_cancellation(grpc_call_element *elem,
char status[GPR_LTOA_MIN_BUFSIZE];
grpc_metadata_batch mdb;
gpr_ltoa(GRPC_STATUS_CANCELLED, status);
- calld->s.cancelled.status.md =
+ calld->status.md =
grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status);
- calld->s.cancelled.details.md =
+ calld->details.md =
grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled");
- calld->s.cancelled.status.prev = calld->s.cancelled.details.next = NULL;
- calld->s.cancelled.status.next = &calld->s.cancelled.details;
- calld->s.cancelled.details.prev = &calld->s.cancelled.status;
- mdb.list.head = &calld->s.cancelled.status;
- mdb.list.tail = &calld->s.cancelled.details;
+ calld->status.prev = calld->details.next = NULL;
+ calld->status.next = &calld->details;
+ calld->details.prev = &calld->status;
+ mdb.list.head = &calld->status;
+ mdb.list.tail = &calld->details;
mdb.garbage.head = mdb.garbage.tail = NULL;
mdb.deadline = gpr_inf_future;
grpc_sopb_add_metadata(op->recv_ops, mdb);
@@ -183,192 +142,372 @@ static void handle_op_after_cancellation(grpc_call_element *elem,
}
}
-static void cc_start_transport_op(grpc_call_element *elem,
- grpc_transport_op *op) {
+typedef struct {
+ grpc_iomgr_closure closure;
+ grpc_call_element *elem;
+} waiting_call;
+
+static void perform_transport_stream_op(grpc_call_element *elem,
+ grpc_transport_stream_op *op,
+ int continuation);
+
+static void continue_with_pick(void *arg, int iomgr_success) {
+ waiting_call *wc = arg;
+ call_data *calld = wc->elem->call_data;
+ perform_transport_stream_op(wc->elem, &calld->waiting_op, 1);
+ gpr_free(wc);
+}
+
+static void add_to_lb_policy_wait_queue_locked_state_config(
+ grpc_call_element *elem) {
+ channel_data *chand = elem->channel_data;
+ waiting_call *wc = gpr_malloc(sizeof(*wc));
+ grpc_iomgr_closure_init(&wc->closure, continue_with_pick, wc);
+ wc->elem = elem;
+ wc->closure.next = chand->waiting_for_config_closures;
+ chand->waiting_for_config_closures = &wc->closure;
+}
+
+static int is_empty(void *p, int len) {
+ char *ptr = p;
+ int i;
+ for (i = 0; i < len; i++) {
+ if (ptr[i] != 0) return 0;
+ }
+ return 1;
+}
+
+static void started_call(void *arg, int iomgr_success) {
+ call_data *calld = arg;
+ grpc_transport_stream_op op;
+ int have_waiting;
+
+ gpr_mu_lock(&calld->mu_state);
+ if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) {
+ memset(&op, 0, sizeof(op));
+ op.cancel_with_status = GRPC_STATUS_CANCELLED;
+ gpr_mu_unlock(&calld->mu_state);
+ grpc_subchannel_call_process_op(calld->subchannel_call, &op);
+ } else if (calld->state == CALL_WAITING_FOR_CALL) {
+ have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op));
+ if (calld->subchannel_call != NULL) {
+ calld->state = CALL_ACTIVE;
+ gpr_mu_unlock(&calld->mu_state);
+ if (have_waiting) {
+ grpc_subchannel_call_process_op(calld->subchannel_call,
+ &calld->waiting_op);
+ }
+ } else {
+ calld->state = CALL_CANCELLED;
+ gpr_mu_unlock(&calld->mu_state);
+ if (have_waiting) {
+ handle_op_after_cancellation(calld->elem, &calld->waiting_op);
+ }
+ }
+ } else {
+ GPR_ASSERT(calld->state == CALL_CANCELLED);
+ gpr_mu_unlock(&calld->mu_state);
+ }
+}
+
+static void picked_target(void *arg, int iomgr_success) {
+ call_data *calld = arg;
+ grpc_pollset *pollset;
+
+ if (calld->picked_channel == NULL) {
+ /* treat this like a cancellation */
+ calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE;
+ perform_transport_stream_op(calld->elem, &calld->waiting_op, 1);
+ } else {
+ gpr_mu_lock(&calld->mu_state);
+ if (calld->state == CALL_CANCELLED) {
+ gpr_mu_unlock(&calld->mu_state);
+ handle_op_after_cancellation(calld->elem, &calld->waiting_op);
+ } else {
+ GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK);
+ calld->state = CALL_WAITING_FOR_CALL;
+ pollset = calld->waiting_op.bind_pollset;
+ gpr_mu_unlock(&calld->mu_state);
+ grpc_iomgr_closure_init(&calld->async_setup_task, started_call, calld);
+ grpc_subchannel_create_call(calld->picked_channel, pollset,
+ &calld->subchannel_call,
+ &calld->async_setup_task);
+ }
+ }
+}
+
+static void pick_target(grpc_lb_policy *lb_policy, call_data *calld) {
+ grpc_metadata_batch *initial_metadata;
+ grpc_transport_stream_op *op = &calld->waiting_op;
+
+ GPR_ASSERT(op->bind_pollset);
+ GPR_ASSERT(op->send_ops);
+ GPR_ASSERT(op->send_ops->nops >= 1);
+ GPR_ASSERT(op->send_ops->ops[0].type == GRPC_OP_METADATA);
+ initial_metadata = &op->send_ops->ops[0].data.metadata;
+
+ grpc_iomgr_closure_init(&calld->async_setup_task, picked_target, calld);
+ grpc_lb_policy_pick(lb_policy, op->bind_pollset, initial_metadata,
+ &calld->picked_channel, &calld->async_setup_task);
+}
+
+static grpc_iomgr_closure *merge_into_waiting_op(
+ grpc_call_element *elem, grpc_transport_stream_op *new_op) {
+ call_data *calld = elem->call_data;
+ grpc_iomgr_closure *consumed_op = NULL;
+ grpc_transport_stream_op *waiting_op = &calld->waiting_op;
+ GPR_ASSERT((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1);
+ GPR_ASSERT((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1);
+ if (new_op->send_ops != NULL) {
+ waiting_op->send_ops = new_op->send_ops;
+ waiting_op->is_last_send = new_op->is_last_send;
+ waiting_op->on_done_send = new_op->on_done_send;
+ }
+ if (new_op->recv_ops != NULL) {
+ waiting_op->recv_ops = new_op->recv_ops;
+ waiting_op->recv_state = new_op->recv_state;
+ waiting_op->on_done_recv = new_op->on_done_recv;
+ }
+ if (new_op->on_consumed != NULL) {
+ if (waiting_op->on_consumed != NULL) {
+ consumed_op = waiting_op->on_consumed;
+ }
+ waiting_op->on_consumed = new_op->on_consumed;
+ }
+ if (new_op->cancel_with_status != GRPC_STATUS_OK) {
+ waiting_op->cancel_with_status = new_op->cancel_with_status;
+ }
+ return consumed_op;
+}
+
+static void perform_transport_stream_op(grpc_call_element *elem,
+ grpc_transport_stream_op *op,
+ int continuation) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- grpc_call_element *child_elem;
- grpc_transport_op waiting_op;
+ grpc_subchannel_call *subchannel_call;
+ grpc_lb_policy *lb_policy;
+ grpc_transport_stream_op op2;
+ grpc_iomgr_closure *consumed_op = NULL;
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- gpr_mu_lock(&chand->mu);
+ gpr_mu_lock(&calld->mu_state);
switch (calld->state) {
case CALL_ACTIVE:
- child_elem = grpc_child_call_get_top_element(calld->s.active.child_call);
- gpr_mu_unlock(&chand->mu);
- child_elem->filter->start_transport_op(child_elem, op);
+ GPR_ASSERT(!continuation);
+ subchannel_call = calld->subchannel_call;
+ gpr_mu_unlock(&calld->mu_state);
+ grpc_subchannel_call_process_op(subchannel_call, op);
break;
- case CALL_CREATED:
- if (op->cancel_with_status != GRPC_STATUS_OK) {
- calld->state = CALL_CANCELLED;
- gpr_mu_unlock(&chand->mu);
- handle_op_after_cancellation(elem, op);
- } else {
- calld->state = CALL_WAITING;
- calld->s.waiting_op.bind_pollset = NULL;
- if (chand->active_child) {
- /* channel is connected - use the connected stack */
- if (prepare_activate(elem, chand->active_child)) {
- gpr_mu_unlock(&chand->mu);
- /* activate the request (pass it down) outside the lock */
- complete_activate(elem, op);
- } else {
- gpr_mu_unlock(&chand->mu);
+ case CALL_CANCELLED:
+ gpr_mu_unlock(&calld->mu_state);
+ handle_op_after_cancellation(elem, op);
+ break;
+ case CALL_WAITING_FOR_SEND:
+ GPR_ASSERT(!continuation);
+ consumed_op = merge_into_waiting_op(elem, op);
+ if (!calld->waiting_op.send_ops &&
+ calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) {
+ gpr_mu_unlock(&calld->mu_state);
+ break;
+ }
+ *op = calld->waiting_op;
+ memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
+ continuation = 1;
+ /* fall through */
+ case CALL_WAITING_FOR_CONFIG:
+ case CALL_WAITING_FOR_PICK:
+ case CALL_WAITING_FOR_CALL:
+ if (!continuation) {
+ if (op->cancel_with_status != GRPC_STATUS_OK) {
+ calld->state = CALL_CANCELLED;
+ op2 = calld->waiting_op;
+ memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
+ if (op->on_consumed) {
+ calld->waiting_op.on_consumed = op->on_consumed;
+ op->on_consumed = NULL;
+ } else if (op2.on_consumed) {
+ calld->waiting_op.on_consumed = op2.on_consumed;
+ op2.on_consumed = NULL;
}
+ gpr_mu_unlock(&calld->mu_state);
+ handle_op_after_cancellation(elem, op);
+ handle_op_after_cancellation(elem, &op2);
} else {
- /* check to see if we should initiate a connection (if we're not
- already),
- but don't do so until outside the lock to avoid re-entrancy
- problems if
- the callback is immediate */
- int initiate_transport_setup = 0;
- if (!chand->transport_setup_initiated) {
- chand->transport_setup_initiated = 1;
- initiate_transport_setup = 1;
- }
- /* add this call to the waiting set to be resumed once we have a child
- channel stack, growing the waiting set if needed */
- if (chand->waiting_child_count == chand->waiting_child_capacity) {
- chand->waiting_child_capacity =
- GPR_MAX(chand->waiting_child_capacity * 2, 8);
- chand->waiting_children = gpr_realloc(
- chand->waiting_children,
- chand->waiting_child_capacity * sizeof(call_data *));
- }
- calld->s.waiting_op = *op;
- chand->waiting_children[chand->waiting_child_count++] = calld;
- grpc_transport_setup_add_interested_party(chand->transport_setup,
- op->bind_pollset);
- gpr_mu_unlock(&chand->mu);
-
- /* finally initiate transport setup if needed */
- if (initiate_transport_setup) {
- grpc_transport_setup_initiate(chand->transport_setup);
- }
+ consumed_op = merge_into_waiting_op(elem, op);
+ gpr_mu_unlock(&calld->mu_state);
}
+ break;
}
- break;
- case CALL_WAITING:
+ /* fall through */
+ case CALL_CREATED:
if (op->cancel_with_status != GRPC_STATUS_OK) {
- waiting_op = calld->s.waiting_op;
- remove_waiting_child(chand, calld);
calld->state = CALL_CANCELLED;
- gpr_mu_unlock(&chand->mu);
- handle_op_after_cancellation(elem, &waiting_op);
+ gpr_mu_unlock(&calld->mu_state);
handle_op_after_cancellation(elem, op);
} else {
- GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) !=
- (op->send_ops == NULL));
- GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) !=
- (op->recv_ops == NULL));
- if (op->send_ops) {
- calld->s.waiting_op.send_ops = op->send_ops;
- calld->s.waiting_op.is_last_send = op->is_last_send;
- calld->s.waiting_op.on_done_send = op->on_done_send;
- }
- if (op->recv_ops) {
- calld->s.waiting_op.recv_ops = op->recv_ops;
- calld->s.waiting_op.recv_state = op->recv_state;
- calld->s.waiting_op.on_done_recv = op->on_done_recv;
- }
- gpr_mu_unlock(&chand->mu);
- if (op->on_consumed) {
- op->on_consumed->cb(op->on_consumed->cb_arg, 0);
+ calld->waiting_op = *op;
+
+ if (op->send_ops == NULL) {
+ /* need to have some send ops before we can select the
+ lb target */
+ calld->state = CALL_WAITING_FOR_SEND;
+ gpr_mu_unlock(&calld->mu_state);
+ } else {
+ gpr_mu_lock(&chand->mu_config);
+ lb_policy = chand->lb_policy;
+ if (lb_policy) {
+ GRPC_LB_POLICY_REF(lb_policy, "pick");
+ gpr_mu_unlock(&chand->mu_config);
+ calld->state = CALL_WAITING_FOR_PICK;
+ gpr_mu_unlock(&calld->mu_state);
+
+ pick_target(lb_policy, calld);
+
+ GRPC_LB_POLICY_UNREF(lb_policy, "pick");
+ } else if (chand->resolver != NULL) {
+ calld->state = CALL_WAITING_FOR_CONFIG;
+ add_to_lb_policy_wait_queue_locked_state_config(elem);
+ gpr_mu_unlock(&chand->mu_config);
+ gpr_mu_unlock(&calld->mu_state);
+ } else {
+ calld->state = CALL_CANCELLED;
+ gpr_mu_unlock(&chand->mu_config);
+ gpr_mu_unlock(&calld->mu_state);
+ handle_op_after_cancellation(elem, op);
+ }
}
}
break;
- case CALL_CANCELLED:
- gpr_mu_unlock(&chand->mu);
- handle_op_after_cancellation(elem, op);
- break;
}
+
+ if (consumed_op != NULL) {
+ consumed_op->cb(consumed_op->cb_arg, 1);
+ }
+}
+
+static void cc_start_transport_stream_op(grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
+ perform_transport_stream_op(elem, op, 0);
+}
+
+static void cc_on_config_changed(void *arg, int iomgr_success) {
+ channel_data *chand = arg;
+ grpc_lb_policy *lb_policy = NULL;
+ grpc_lb_policy *old_lb_policy;
+ grpc_resolver *old_resolver;
+ grpc_iomgr_closure *wakeup_closures = NULL;
+
+ if (chand->incoming_configuration != NULL) {
+ lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration);
+ GRPC_LB_POLICY_REF(lb_policy, "channel");
+
+ grpc_client_config_unref(chand->incoming_configuration);
+ }
+
+ chand->incoming_configuration = NULL;
+
+ gpr_mu_lock(&chand->mu_config);
+ old_lb_policy = chand->lb_policy;
+ chand->lb_policy = lb_policy;
+ if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) {
+ wakeup_closures = chand->waiting_for_config_closures;
+ chand->waiting_for_config_closures = NULL;
+ }
+ gpr_mu_unlock(&chand->mu_config);
+
+ if (old_lb_policy) {
+ GRPC_LB_POLICY_UNREF(old_lb_policy, "channel");
+ }
+
+ gpr_mu_lock(&chand->mu_config);
+ if (iomgr_success && chand->resolver) {
+ grpc_resolver *resolver = chand->resolver;
+ GRPC_RESOLVER_REF(resolver, "channel-next");
+ gpr_mu_unlock(&chand->mu_config);
+ GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
+ grpc_resolver_next(chand->resolver, &chand->incoming_configuration,
+ &chand->on_config_changed);
+ GRPC_RESOLVER_UNREF(resolver, "channel-next");
+ } else {
+ old_resolver = chand->resolver;
+ chand->resolver = NULL;
+ grpc_connectivity_state_set(&chand->state_tracker,
+ GRPC_CHANNEL_FATAL_FAILURE);
+ gpr_mu_unlock(&chand->mu_config);
+ if (old_resolver != NULL) {
+ grpc_resolver_shutdown(old_resolver);
+ GRPC_RESOLVER_UNREF(old_resolver, "channel");
+ }
+ }
+
+ while (wakeup_closures) {
+ grpc_iomgr_closure *next = wakeup_closures->next;
+ grpc_iomgr_add_callback(wakeup_closures);
+ wakeup_closures = next;
+ }
+
+ GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver");
}
-static void channel_op(grpc_channel_element *elem,
- grpc_channel_element *from_elem, grpc_channel_op *op) {
+static void cc_start_transport_op(grpc_channel_element *elem,
+ grpc_transport_op *op) {
+ grpc_lb_policy *lb_policy = NULL;
channel_data *chand = elem->channel_data;
- grpc_child_channel *child_channel;
- grpc_channel_op rop;
- GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
+ grpc_resolver *destroy_resolver = NULL;
+ grpc_iomgr_closure *on_consumed = op->on_consumed;
+ op->on_consumed = NULL;
+
+ GPR_ASSERT(op->set_accept_stream == NULL);
+ GPR_ASSERT(op->bind_pollset == NULL);
+
+ gpr_mu_lock(&chand->mu_config);
+ if (op->on_connectivity_state_change != NULL) {
+ grpc_connectivity_state_notify_on_state_change(
+ &chand->state_tracker, op->connectivity_state,
+ op->on_connectivity_state_change);
+ op->on_connectivity_state_change = NULL;
+ op->connectivity_state = NULL;
+ }
- switch (op->type) {
- case GRPC_CHANNEL_GOAWAY:
- /* sending goaway: clear out the active child on the way through */
- gpr_mu_lock(&chand->mu);
- child_channel = chand->active_child;
- chand->active_child = NULL;
- gpr_mu_unlock(&chand->mu);
- if (child_channel) {
- grpc_child_channel_handle_op(child_channel, op);
- grpc_child_channel_destroy(child_channel, 1);
- } else {
- gpr_slice_unref(op->data.goaway.message);
- }
- break;
- case GRPC_CHANNEL_DISCONNECT:
- /* sending disconnect: clear out the active child on the way through */
- gpr_mu_lock(&chand->mu);
- child_channel = chand->active_child;
- chand->active_child = NULL;
- gpr_mu_unlock(&chand->mu);
- if (child_channel) {
- grpc_child_channel_destroy(child_channel, 1);
- }
- /* fake a transport closed to satisfy the refcounting in client */
- rop.type = GRPC_TRANSPORT_CLOSED;
- rop.dir = GRPC_CALL_UP;
- grpc_channel_next_op(elem, &rop);
- break;
- case GRPC_TRANSPORT_GOAWAY:
- /* receiving goaway: if it's from our active child, drop the active child;
- in all cases consume the event here */
- gpr_mu_lock(&chand->mu);
- child_channel = grpc_channel_stack_from_top_element(from_elem);
- if (child_channel == chand->active_child) {
- chand->active_child = NULL;
- } else {
- child_channel = NULL;
- }
- gpr_mu_unlock(&chand->mu);
- if (child_channel) {
- grpc_child_channel_destroy(child_channel, 0);
- }
- gpr_slice_unref(op->data.goaway.message);
- break;
- case GRPC_TRANSPORT_CLOSED:
- /* receiving disconnect: if it's from our active child, drop the active
- child; in all cases consume the event here */
- gpr_mu_lock(&chand->mu);
- child_channel = grpc_channel_stack_from_top_element(from_elem);
- if (child_channel == chand->active_child) {
- chand->active_child = NULL;
- } else {
- child_channel = NULL;
- }
- gpr_mu_unlock(&chand->mu);
- if (child_channel) {
- grpc_child_channel_destroy(child_channel, 0);
- }
- break;
- default:
- switch (op->dir) {
- case GRPC_CALL_UP:
- grpc_channel_next_op(elem, op);
- break;
- case GRPC_CALL_DOWN:
- gpr_log(GPR_ERROR, "unhandled channel op: %d", op->type);
- abort();
- break;
- }
- break;
+ if (op->disconnect && chand->resolver != NULL) {
+ grpc_connectivity_state_set(&chand->state_tracker,
+ GRPC_CHANNEL_FATAL_FAILURE);
+ destroy_resolver = chand->resolver;
+ chand->resolver = NULL;
+ if (chand->lb_policy != NULL) {
+ grpc_lb_policy_shutdown(chand->lb_policy);
+ }
+ }
+
+ if (!is_empty(op, sizeof(*op))) {
+ lb_policy = chand->lb_policy;
+ if (lb_policy) {
+ GRPC_LB_POLICY_REF(lb_policy, "broadcast");
+ }
+ }
+ gpr_mu_unlock(&chand->mu_config);
+
+ if (destroy_resolver) {
+ grpc_resolver_shutdown(destroy_resolver);
+ GRPC_RESOLVER_UNREF(destroy_resolver, "channel");
+ }
+
+ if (lb_policy) {
+ grpc_lb_policy_broadcast(lb_policy, op);
+ GRPC_LB_POLICY_UNREF(lb_policy, "broadcast");
+ }
+
+ if (on_consumed) {
+ grpc_iomgr_add_callback(on_consumed);
}
}
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
- grpc_transport_op *initial_op) {
+ grpc_transport_stream_op *initial_op) {
call_data *calld = elem->call_data;
/* TODO(ctiller): is there something useful we can do here? */
@@ -376,6 +515,7 @@ static void init_call_elem(grpc_call_element *elem,
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
GPR_ASSERT(server_transport_data == NULL);
+ gpr_mu_init(&calld->mu_state);
calld->elem = elem;
calld->state = CALL_CREATED;
calld->deadline = gpr_inf_future;
@@ -384,161 +524,88 @@ static void init_call_elem(grpc_call_element *elem,
/* Destructor for call_data */
static void destroy_call_elem(grpc_call_element *elem) {
call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
+ grpc_subchannel_call *subchannel_call;
/* if the call got activated, we need to destroy the child stack also, and
remove it from the in-flight requests tracked by the child_entry we
picked */
- gpr_mu_lock(&chand->mu);
+ gpr_mu_lock(&calld->mu_state);
switch (calld->state) {
case CALL_ACTIVE:
- gpr_mu_unlock(&chand->mu);
- grpc_child_call_destroy(calld->s.active.child_call);
+ subchannel_call = calld->subchannel_call;
+ gpr_mu_unlock(&calld->mu_state);
+ GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "client_channel");
break;
- case CALL_WAITING:
- remove_waiting_child(chand, calld);
- gpr_mu_unlock(&chand->mu);
+ case CALL_CREATED:
+ case CALL_CANCELLED:
+ gpr_mu_unlock(&calld->mu_state);
break;
- default:
- gpr_mu_unlock(&chand->mu);
+ case CALL_WAITING_FOR_PICK:
+ case CALL_WAITING_FOR_CONFIG:
+ case CALL_WAITING_FOR_CALL:
+ case CALL_WAITING_FOR_SEND:
+ gpr_log(GPR_ERROR, "should never reach here");
+ abort();
break;
}
- GPR_ASSERT(calld->state != CALL_WAITING);
}
/* Constructor for channel_data */
-static void init_channel_elem(grpc_channel_element *elem,
+static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
const grpc_channel_args *args,
grpc_mdctx *metadata_context, int is_first,
int is_last) {
channel_data *chand = elem->channel_data;
- GPR_ASSERT(!is_first);
+ memset(chand, 0, sizeof(*chand));
+
GPR_ASSERT(is_last);
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
- gpr_mu_init(&chand->mu);
- chand->active_child = NULL;
- chand->waiting_children = NULL;
- chand->waiting_child_count = 0;
- chand->waiting_child_capacity = 0;
- chand->transport_setup = NULL;
- chand->transport_setup_initiated = 0;
- chand->args = grpc_channel_args_copy(args);
+ gpr_mu_init(&chand->mu_config);
chand->mdctx = metadata_context;
+ chand->master = master;
+ grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed,
+ chand);
+
+ grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE);
}
/* Destructor for channel_data */
static void destroy_channel_elem(grpc_channel_element *elem) {
channel_data *chand = elem->channel_data;
- grpc_transport_setup_cancel(chand->transport_setup);
-
- if (chand->active_child) {
- grpc_child_channel_destroy(chand->active_child, 1);
- chand->active_child = NULL;
+ if (chand->resolver != NULL) {
+ grpc_resolver_shutdown(chand->resolver);
+ GRPC_RESOLVER_UNREF(chand->resolver, "channel");
}
-
- grpc_channel_args_destroy(chand->args);
-
- gpr_mu_destroy(&chand->mu);
- GPR_ASSERT(chand->waiting_child_count == 0);
- gpr_free(chand->waiting_children);
+ if (chand->lb_policy != NULL) {
+ GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
+ }
+ gpr_mu_destroy(&chand->mu_config);
}
const grpc_channel_filter grpc_client_channel_filter = {
- cc_start_transport_op, channel_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "client-channel",
+ cc_start_transport_stream_op,
+ cc_start_transport_op,
+ sizeof(call_data),
+ init_call_elem,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ "client-channel",
};
-grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
- grpc_channel_stack *channel_stack, grpc_transport *transport,
- grpc_channel_filter const **channel_filters, size_t num_channel_filters,
- grpc_mdctx *mdctx) {
- /* we just got a new transport: lets create a child channel stack for it */
- grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
- channel_data *chand = elem->channel_data;
- size_t num_child_filters = 2 + num_channel_filters;
- grpc_channel_filter const **child_filters;
- grpc_transport_setup_result result;
- grpc_child_channel *old_active = NULL;
- call_data **waiting_children;
- size_t waiting_child_count;
- size_t i;
- grpc_transport_op *call_ops;
-
- /* build the child filter stack */
- child_filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_child_filters);
- /* we always need a link back filter to get back to the connected channel */
- child_filters[0] = &grpc_child_channel_top_filter;
- for (i = 0; i < num_channel_filters; i++) {
- child_filters[i + 1] = channel_filters[i];
- }
- /* and we always need a connected channel to talk to the transport */
- child_filters[num_child_filters - 1] = &grpc_connected_channel_filter;
-
- GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
-
- /* BEGIN LOCKING CHANNEL */
- gpr_mu_lock(&chand->mu);
- chand->transport_setup_initiated = 0;
-
- if (chand->active_child) {
- old_active = chand->active_child;
- }
- chand->active_child = grpc_child_channel_create(
- elem, child_filters, num_child_filters, chand->args, mdctx);
- result =
- grpc_connected_channel_bind_transport(chand->active_child, transport);
-
- /* capture the waiting children - we'll activate them outside the lock
- to avoid re-entrancy problems */
- waiting_children = chand->waiting_children;
- waiting_child_count = chand->waiting_child_count;
- /* bumping up inflight_requests here avoids taking a lock per rpc below */
-
- chand->waiting_children = NULL;
- chand->waiting_child_count = 0;
- chand->waiting_child_capacity = 0;
-
- call_ops = gpr_malloc(sizeof(*call_ops) * waiting_child_count);
-
- for (i = 0; i < waiting_child_count; i++) {
- call_ops[i] = waiting_children[i]->s.waiting_op;
- if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) {
- waiting_children[i] = NULL;
- grpc_transport_op_finish_with_failure(&call_ops[i]);
- }
- }
-
- /* END LOCKING CHANNEL */
- gpr_mu_unlock(&chand->mu);
-
- /* activate any pending operations - this is safe to do as we guarantee one
- and only one write operation per request at the surface api - if we lose
- that guarantee we need to do some curly locking here */
- for (i = 0; i < waiting_child_count; i++) {
- if (waiting_children[i]) {
- complete_activate(waiting_children[i]->elem, &call_ops[i]);
- }
- }
- gpr_free(waiting_children);
- gpr_free(call_ops);
- gpr_free(child_filters);
-
- if (old_active) {
- grpc_child_channel_destroy(old_active, 1);
- }
-
- return result;
-}
-
-void grpc_client_channel_set_transport_setup(grpc_channel_stack *channel_stack,
- grpc_transport_setup *setup) {
+void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
+ grpc_resolver *resolver) {
/* post construction initialization: set the transport setup pointer */
grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
channel_data *chand = elem->channel_data;
- GPR_ASSERT(!chand->transport_setup);
- chand->transport_setup = setup;
+ GPR_ASSERT(!chand->resolver);
+ chand->resolver = resolver;
+ GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
+ GRPC_RESOLVER_REF(resolver, "channel");
+ grpc_resolver_next(resolver, &chand->incoming_configuration,
+ &chand->on_config_changed);
}
diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h
index 7a67a9f21f..fd2be46145 100644
--- a/src/core/channel/client_channel.h
+++ b/src/core/channel/client_channel.h
@@ -35,6 +35,7 @@
#define GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H
#include "src/core/channel/channel_stack.h"
+#include "src/core/client_config/resolver.h"
/* A client channel is a channel that begins disconnected, and can connect
to some endpoint on demand. If that endpoint disconnects, it will be
@@ -48,15 +49,7 @@ extern const grpc_channel_filter grpc_client_channel_filter;
/* post-construction initializer to let the client channel know which
transport setup it should cancel upon destruction, or initiate when it needs
a connection */
-void grpc_client_channel_set_transport_setup(grpc_channel_stack *channel_stack,
- grpc_transport_setup *setup);
+void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
+ grpc_resolver *resolver);
-/* grpc_transport_setup_callback for binding new transports into a client
- channel - user_data should be the channel stack containing the client
- channel */
-grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
- grpc_channel_stack *channel_stack, grpc_transport *transport,
- grpc_channel_filter const **channel_filters, size_t num_channel_filters,
- grpc_mdctx *mdctx);
-
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */
+#endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */
diff --git a/src/core/channel/client_setup.c b/src/core/channel/client_setup.c
deleted file mode 100644
index 5be8fa66e9..0000000000
--- a/src/core/channel/client_setup.c
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/channel/client_setup.h"
-#include "src/core/channel/channel_args.h"
-#include "src/core/channel/channel_stack.h"
-#include "src/core/iomgr/alarm.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/time.h>
-
-struct grpc_client_setup {
- grpc_transport_setup base; /* must be first */
- void (*initiate)(void *user_data, grpc_client_setup_request *request);
- void (*done)(void *user_data);
- void *user_data;
- grpc_channel_args *args;
- grpc_mdctx *mdctx;
- grpc_alarm backoff_alarm;
- gpr_timespec current_backoff_interval;
- int in_alarm;
- int in_cb;
- int cancelled;
-
- gpr_mu mu;
- gpr_cv cv;
- grpc_client_setup_request *active_request;
- int refs;
- /** The set of pollsets that are currently interested in this
- connection being established */
- grpc_pollset_set interested_parties;
-};
-
-struct grpc_client_setup_request {
- /* pointer back to the setup object */
- grpc_client_setup *setup;
- gpr_timespec deadline;
-};
-
-gpr_timespec grpc_client_setup_request_deadline(grpc_client_setup_request *r) {
- return r->deadline;
-}
-
-grpc_pollset_set *grpc_client_setup_get_interested_parties(
- grpc_client_setup_request *r) {
- return &r->setup->interested_parties;
-}
-
-static void destroy_setup(grpc_client_setup *s) {
- gpr_mu_destroy(&s->mu);
- gpr_cv_destroy(&s->cv);
- s->done(s->user_data);
- grpc_channel_args_destroy(s->args);
- grpc_pollset_set_destroy(&s->interested_parties);
- gpr_free(s);
-}
-
-static void destroy_request(grpc_client_setup_request *r) { gpr_free(r); }
-
-/* initiate handshaking */
-static void setup_initiate(grpc_transport_setup *sp) {
- grpc_client_setup *s = (grpc_client_setup *)sp;
- grpc_client_setup_request *r = gpr_malloc(sizeof(grpc_client_setup_request));
- int in_alarm = 0;
-
- r->setup = s;
- r->deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(60));
-
- gpr_mu_lock(&s->mu);
- GPR_ASSERT(s->refs > 0);
- /* there might be more than one request outstanding if the caller calls
- initiate in some kind of rapid-fire way: we try to connect each time,
- and keep track of the latest request (which is the only one that gets
- to finish) */
- if (!s->in_alarm) {
- s->active_request = r;
- s->refs++;
- } else {
- /* TODO(klempner): Maybe do something more clever here */
- in_alarm = 1;
- }
- gpr_mu_unlock(&s->mu);
-
- if (!in_alarm) {
- s->initiate(s->user_data, r);
- } else {
- destroy_request(r);
- }
-}
-
-/** implementation of add_interested_party for setup vtable */
-static void setup_add_interested_party(grpc_transport_setup *sp,
- grpc_pollset *pollset) {
- grpc_client_setup *s = (grpc_client_setup *)sp;
-
- gpr_mu_lock(&s->mu);
- grpc_pollset_set_add_pollset(&s->interested_parties, pollset);
- gpr_mu_unlock(&s->mu);
-}
-
-/** implementation of del_interested_party for setup vtable */
-static void setup_del_interested_party(grpc_transport_setup *sp,
- grpc_pollset *pollset) {
- grpc_client_setup *s = (grpc_client_setup *)sp;
-
- gpr_mu_lock(&s->mu);
- grpc_pollset_set_del_pollset(&s->interested_parties, pollset);
- gpr_mu_unlock(&s->mu);
-}
-
-/* cancel handshaking: cancel all requests, and shutdown (the caller promises
- not to initiate again) */
-static void setup_cancel(grpc_transport_setup *sp) {
- grpc_client_setup *s = (grpc_client_setup *)sp;
- int cancel_alarm = 0;
-
- gpr_mu_lock(&s->mu);
- s->cancelled = 1;
- while (s->in_cb) {
- gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future);
- }
-
- GPR_ASSERT(s->refs > 0);
- /* effectively cancels the current request (if any) */
- s->active_request = NULL;
- if (s->in_alarm) {
- cancel_alarm = 1;
- }
- if (--s->refs == 0) {
- gpr_mu_unlock(&s->mu);
- destroy_setup(s);
- } else {
- gpr_mu_unlock(&s->mu);
- }
- if (cancel_alarm) {
- grpc_alarm_cancel(&s->backoff_alarm);
- }
-}
-
-int grpc_client_setup_cb_begin(grpc_client_setup_request *r,
- const char *reason) {
- gpr_mu_lock(&r->setup->mu);
- if (r->setup->cancelled) {
- gpr_mu_unlock(&r->setup->mu);
- return 0;
- }
- r->setup->in_cb++;
- gpr_mu_unlock(&r->setup->mu);
- return 1;
-}
-
-void grpc_client_setup_cb_end(grpc_client_setup_request *r,
- const char *reason) {
- gpr_mu_lock(&r->setup->mu);
- r->setup->in_cb--;
- if (r->setup->cancelled) gpr_cv_signal(&r->setup->cv);
- gpr_mu_unlock(&r->setup->mu);
-}
-
-/* vtable for transport setup */
-static const grpc_transport_setup_vtable setup_vtable = {
- setup_initiate, setup_add_interested_party, setup_del_interested_party,
- setup_cancel};
-
-void grpc_client_setup_create_and_attach(
- grpc_channel_stack *newly_minted_channel, const grpc_channel_args *args,
- grpc_mdctx *mdctx,
- void (*initiate)(void *user_data, grpc_client_setup_request *request),
- void (*done)(void *user_data), void *user_data) {
- grpc_client_setup *s = gpr_malloc(sizeof(grpc_client_setup));
-
- s->base.vtable = &setup_vtable;
- gpr_mu_init(&s->mu);
- gpr_cv_init(&s->cv);
- s->refs = 1;
- s->mdctx = mdctx;
- s->initiate = initiate;
- s->done = done;
- s->user_data = user_data;
- s->active_request = NULL;
- s->args = grpc_channel_args_copy(args);
- s->current_backoff_interval = gpr_time_from_micros(1000000);
- s->in_alarm = 0;
- s->in_cb = 0;
- s->cancelled = 0;
- grpc_pollset_set_init(&s->interested_parties);
-
- grpc_client_channel_set_transport_setup(newly_minted_channel, &s->base);
-}
-
-int grpc_client_setup_request_should_continue(grpc_client_setup_request *r,
- const char *reason) {
- int result;
- if (gpr_time_cmp(gpr_now(), r->deadline) > 0) {
- result = 0;
- } else {
- gpr_mu_lock(&r->setup->mu);
- result = r->setup->active_request == r;
- gpr_mu_unlock(&r->setup->mu);
- }
- return result;
-}
-
-static void backoff_alarm_done(void *arg /* grpc_client_setup_request */,
- int success) {
- grpc_client_setup_request *r = arg;
- grpc_client_setup *s = r->setup;
- /* Handle status cancelled? */
- gpr_mu_lock(&s->mu);
- s->in_alarm = 0;
- if (s->active_request != NULL || !success) {
- if (0 == --s->refs) {
- gpr_mu_unlock(&s->mu);
- destroy_setup(s);
- destroy_request(r);
- return;
- } else {
- gpr_mu_unlock(&s->mu);
- destroy_request(r);
- return;
- }
- }
- s->active_request = r;
- gpr_mu_unlock(&s->mu);
- s->initiate(s->user_data, r);
-}
-
-void grpc_client_setup_request_finish(grpc_client_setup_request *r,
- int was_successful) {
- int retry = !was_successful;
- grpc_client_setup *s = r->setup;
-
- gpr_mu_lock(&s->mu);
- if (s->active_request == r) {
- s->active_request = NULL;
- } else {
- retry = 0;
- }
-
- if (!retry && 0 == --s->refs) {
- gpr_mu_unlock(&s->mu);
- destroy_setup(s);
- destroy_request(r);
- } else if (retry) {
- /* TODO(klempner): Replace these values with further consideration. 2x is
- probably too aggressive of a backoff. */
- gpr_timespec max_backoff = gpr_time_from_minutes(2);
- gpr_timespec now = gpr_now();
- gpr_timespec deadline = gpr_time_add(s->current_backoff_interval, now);
- GPR_ASSERT(!s->in_alarm);
- s->in_alarm = 1;
- grpc_alarm_init(&s->backoff_alarm, deadline, backoff_alarm_done, r, now);
- s->current_backoff_interval =
- gpr_time_add(s->current_backoff_interval, s->current_backoff_interval);
- if (gpr_time_cmp(s->current_backoff_interval, max_backoff) > 0) {
- s->current_backoff_interval = max_backoff;
- }
- gpr_mu_unlock(&s->mu);
- } else {
- gpr_mu_unlock(&s->mu);
- destroy_request(r);
- }
-}
-
-const grpc_channel_args *grpc_client_setup_get_channel_args(
- grpc_client_setup_request *r) {
- return r->setup->args;
-}
-
-grpc_mdctx *grpc_client_setup_get_mdctx(grpc_client_setup_request *r) {
- return r->setup->mdctx;
-}
diff --git a/src/core/channel/client_setup.h b/src/core/channel/client_setup.h
deleted file mode 100644
index 7d40338840..0000000000
--- a/src/core/channel/client_setup.h
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_INTERNAL_CORE_CHANNEL_CLIENT_SETUP_H
-#define GRPC_INTERNAL_CORE_CHANNEL_CLIENT_SETUP_H
-
-#include "src/core/channel/client_channel.h"
-#include "src/core/transport/metadata.h"
-#include <grpc/support/time.h>
-
-/* Convenience API's to simplify transport setup */
-
-typedef struct grpc_client_setup grpc_client_setup;
-typedef struct grpc_client_setup_request grpc_client_setup_request;
-
-void grpc_client_setup_create_and_attach(
- grpc_channel_stack *newly_minted_channel, const grpc_channel_args *args,
- grpc_mdctx *mdctx,
- void (*initiate)(void *user_data, grpc_client_setup_request *request),
- void (*done)(void *user_data), void *user_data);
-
-/* Check that r is the active request: needs to be performed at each callback.
- If this races, we'll have two connection attempts running at once and the
- old one will get cleaned up in due course, which is fine. */
-int grpc_client_setup_request_should_continue(grpc_client_setup_request *r,
- const char *reason);
-void grpc_client_setup_request_finish(grpc_client_setup_request *r,
- int was_successful);
-const grpc_channel_args *grpc_client_setup_get_channel_args(
- grpc_client_setup_request *r);
-
-/* Call before calling back into the setup listener, and call only if
- this function returns 1. If it returns 1, also promise to call
- grpc_client_setup_cb_end */
-int grpc_client_setup_cb_begin(grpc_client_setup_request *r,
- const char *reason);
-void grpc_client_setup_cb_end(grpc_client_setup_request *r, const char *reason);
-
-/* Get the deadline for a request passed in to initiate. Implementations should
- make a best effort to honor this deadline. */
-gpr_timespec grpc_client_setup_request_deadline(grpc_client_setup_request *r);
-grpc_pollset_set *grpc_client_setup_get_interested_parties(
- grpc_client_setup_request *r);
-
-grpc_mdctx *grpc_client_setup_get_mdctx(grpc_client_setup_request *r);
-
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_SETUP_H */
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index 14dda88698..34d07de519 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -61,42 +61,27 @@ typedef struct connected_channel_call_data { void *unused; } call_data;
/* Intercept a call operation and either push it directly up or translate it
into transport stream operations */
-static void con_start_transport_op(grpc_call_element *elem,
- grpc_transport_op *op) {
+static void con_start_transport_stream_op(grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- grpc_transport_perform_op(chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld), op);
+ grpc_transport_perform_stream_op(chand->transport,
+ TRANSPORT_STREAM_FROM_CALL_DATA(calld), op);
}
-/* Currently we assume all channel operations should just be pushed up. */
-static void channel_op(grpc_channel_element *elem,
- grpc_channel_element *from_elem, grpc_channel_op *op) {
+static void con_start_transport_op(grpc_channel_element *elem,
+ grpc_transport_op *op) {
channel_data *chand = elem->channel_data;
- GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
-
- switch (op->type) {
- case GRPC_CHANNEL_GOAWAY:
- grpc_transport_goaway(chand->transport, op->data.goaway.status,
- op->data.goaway.message);
- break;
- case GRPC_CHANNEL_DISCONNECT:
- grpc_transport_close(chand->transport);
- break;
- default:
- GPR_ASSERT(op->dir == GRPC_CALL_UP);
- grpc_channel_next_op(elem, op);
- break;
- }
+ grpc_transport_perform_op(chand->transport, op);
}
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
- grpc_transport_op *initial_op) {
+ grpc_transport_stream_op *initial_op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
int r;
@@ -118,11 +103,10 @@ static void destroy_call_elem(grpc_call_element *elem) {
}
/* Constructor for channel_data */
-static void init_channel_elem(grpc_channel_element *elem,
+static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
channel_data *cd = (channel_data *)elem->channel_data;
- GPR_ASSERT(!is_first);
GPR_ASSERT(is_last);
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
cd->transport = NULL;
@@ -136,70 +120,23 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_connected_channel_filter = {
- con_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
- destroy_call_elem, sizeof(channel_data), init_channel_elem,
- destroy_channel_elem, "connected",
+ con_start_transport_stream_op,
+ con_start_transport_op,
+ sizeof(call_data),
+ init_call_elem,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ "connected",
};
-/* Transport callback to accept a new stream... calls up to handle it */
-static void accept_stream(void *user_data, grpc_transport *transport,
- const void *transport_server_data) {
- grpc_channel_element *elem = user_data;
- channel_data *chand = elem->channel_data;
- grpc_channel_op op;
-
- GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
- GPR_ASSERT(chand->transport == transport);
-
- op.type = GRPC_ACCEPT_CALL;
- op.dir = GRPC_CALL_UP;
- op.data.accept_call.transport = transport;
- op.data.accept_call.transport_server_data = transport_server_data;
- channel_op(elem, NULL, &op);
-}
-
-static void transport_goaway(void *user_data, grpc_transport *transport,
- grpc_status_code status, gpr_slice debug) {
- /* transport got goaway ==> call up and handle it */
- grpc_channel_element *elem = user_data;
- channel_data *chand = elem->channel_data;
- grpc_channel_op op;
-
- GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
- GPR_ASSERT(chand->transport == transport);
-
- op.type = GRPC_TRANSPORT_GOAWAY;
- op.dir = GRPC_CALL_UP;
- op.data.goaway.status = status;
- op.data.goaway.message = debug;
- channel_op(elem, NULL, &op);
-}
-
-static void transport_closed(void *user_data, grpc_transport *transport) {
- /* transport was closed ==> call up and handle it */
- grpc_channel_element *elem = user_data;
- channel_data *chand = elem->channel_data;
- grpc_channel_op op;
-
- GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
- GPR_ASSERT(chand->transport == transport);
-
- op.type = GRPC_TRANSPORT_CLOSED;
- op.dir = GRPC_CALL_UP;
- channel_op(elem, NULL, &op);
-}
-
-const grpc_transport_callbacks connected_channel_transport_callbacks = {
- accept_stream, transport_goaway, transport_closed,
-};
-
-grpc_transport_setup_result grpc_connected_channel_bind_transport(
- grpc_channel_stack *channel_stack, grpc_transport *transport) {
+void grpc_connected_channel_bind_transport(grpc_channel_stack *channel_stack,
+ grpc_transport *transport) {
/* Assumes that the connected channel filter is always the last filter
in a channel stack */
grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
channel_data *cd = (channel_data *)elem->channel_data;
- grpc_transport_setup_result ret;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
GPR_ASSERT(cd->transport == NULL);
cd->transport = transport;
@@ -211,8 +148,4 @@ grpc_transport_setup_result grpc_connected_channel_bind_transport(
the last call element, and the last call element MUST be the connected
channel. */
channel_stack->call_stack_size += grpc_transport_stream_size(transport);
-
- ret.user_data = elem;
- ret.callbacks = &connected_channel_transport_callbacks;
- return ret;
}
diff --git a/src/core/channel/connected_channel.h b/src/core/channel/connected_channel.h
index 8b35f69b26..b615b0d350 100644
--- a/src/core/channel/connected_channel.h
+++ b/src/core/channel/connected_channel.h
@@ -43,7 +43,7 @@ extern const grpc_channel_filter grpc_connected_channel_filter;
/* Post construction fixup: set the transport in the connected channel.
Must be called before any call stack using this filter is used. */
-grpc_transport_setup_result grpc_connected_channel_bind_transport(
- grpc_channel_stack *channel_stack, grpc_transport *transport);
+void grpc_connected_channel_bind_transport(grpc_channel_stack *channel_stack,
+ grpc_transport *transport);
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_CONNECTED_CHANNEL_H */
+#endif /* GRPC_INTERNAL_CORE_CHANNEL_CONNECTED_CHANNEL_H */
diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c
index 08a2c0df3c..581eb13f58 100644
--- a/src/core/channel/http_client_filter.c
+++ b/src/core/channel/http_client_filter.c
@@ -92,7 +92,8 @@ static void hc_on_recv(void *user_data, int success) {
calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success);
}
-static void hc_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
+static void hc_mutate_op(grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
@@ -127,33 +128,16 @@ static void hc_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
}
static void hc_start_transport_op(grpc_call_element *elem,
- grpc_transport_op *op) {
+ grpc_transport_stream_op *op) {
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
hc_mutate_op(elem, op);
grpc_call_next_op(elem, op);
}
-/* Called on special channel events, such as disconnection or new incoming
- calls on the server */
-static void channel_op(grpc_channel_element *elem,
- grpc_channel_element *from_elem, grpc_channel_op *op) {
- /* grab pointers to our data from the channel element */
- channel_data *channeld = elem->channel_data;
-
- ignore_unused(channeld);
-
- switch (op->type) {
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_channel_next_op(elem, op);
- break;
- }
-}
-
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
- grpc_transport_op *initial_op) {
+ grpc_transport_stream_op *initial_op) {
call_data *calld = elem->call_data;
calld->sent_initial_metadata = 0;
calld->got_initial_metadata = 0;
@@ -186,7 +170,7 @@ static const char *scheme_from_args(const grpc_channel_args *args) {
}
/* Constructor for channel_data */
-static void init_channel_elem(grpc_channel_element *elem,
+static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
/* grab pointers to our data from the channel element */
@@ -195,7 +179,6 @@ static void init_channel_elem(grpc_channel_element *elem,
/* The first and the last filters tend to be implemented differently to
handle the case that there's no 'next' filter to call on the up or down
path */
- GPR_ASSERT(!is_first);
GPR_ASSERT(!is_last);
/* initialize members */
@@ -221,6 +204,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_http_client_filter = {
- hc_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
- destroy_call_elem, sizeof(channel_data), init_channel_elem,
- destroy_channel_elem, "http-client"};
+ hc_start_transport_op, grpc_channel_next_op, sizeof(call_data),
+ init_call_elem, destroy_call_elem, sizeof(channel_data),
+ init_channel_elem, destroy_channel_elem, "http-client"};
diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c
index d3a01fd1a8..db0bf590c6 100644
--- a/src/core/channel/http_server_filter.c
+++ b/src/core/channel/http_server_filter.c
@@ -72,9 +72,6 @@ typedef struct channel_data {
grpc_mdctx *mdctx;
} channel_data;
-/* used to silence 'variable not used' warnings */
-static void ignore_unused(void *ignored) {}
-
static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
channel_data *channeld = elem->channel_data;
@@ -181,7 +178,8 @@ static void hs_on_recv(void *user_data, int success) {
calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success);
}
-static void hs_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
+static void hs_mutate_op(grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
@@ -209,33 +207,16 @@ static void hs_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
}
static void hs_start_transport_op(grpc_call_element *elem,
- grpc_transport_op *op) {
+ grpc_transport_stream_op *op) {
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
hs_mutate_op(elem, op);
grpc_call_next_op(elem, op);
}
-/* Called on special channel events, such as disconnection or new incoming
- calls on the server */
-static void channel_op(grpc_channel_element *elem,
- grpc_channel_element *from_elem, grpc_channel_op *op) {
- /* grab pointers to our data from the channel element */
- channel_data *channeld = elem->channel_data;
-
- ignore_unused(channeld);
-
- switch (op->type) {
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_channel_next_op(elem, op);
- break;
- }
-}
-
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
- grpc_transport_op *initial_op) {
+ grpc_transport_stream_op *initial_op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
/* initialize members */
@@ -248,7 +229,7 @@ static void init_call_elem(grpc_call_element *elem,
static void destroy_call_elem(grpc_call_element *elem) {}
/* Constructor for channel_data */
-static void init_channel_elem(grpc_channel_element *elem,
+static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
/* grab pointers to our data from the channel element */
@@ -297,6 +278,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_http_server_filter = {
- hs_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
- destroy_call_elem, sizeof(channel_data), init_channel_elem,
- destroy_channel_elem, "http-server"};
+ hs_start_transport_op, grpc_channel_next_op, sizeof(call_data),
+ init_call_elem, destroy_call_elem, sizeof(channel_data),
+ init_channel_elem, destroy_channel_elem, "http-server"};
diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c
index 1d2be716d7..5117723617 100644
--- a/src/core/channel/noop_filter.c
+++ b/src/core/channel/noop_filter.c
@@ -45,7 +45,8 @@ typedef struct channel_data {
/* used to silence 'variable not used' warnings */
static void ignore_unused(void *ignored) {}
-static void noop_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
+static void noop_mutate_op(grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
@@ -61,35 +62,18 @@ static void noop_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
- a network event (or similar) from below, to receive something
op contains type and call direction information, in addition to the data
that is being sent or received. */
-static void noop_start_transport_op(grpc_call_element *elem,
- grpc_transport_op *op) {
+static void noop_start_transport_stream_op(grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
noop_mutate_op(elem, op);
/* pass control down the stack */
grpc_call_next_op(elem, op);
}
-/* Called on special channel events, such as disconnection or new incoming
- calls on the server */
-static void channel_op(grpc_channel_element *elem,
- grpc_channel_element *from_elem, grpc_channel_op *op) {
- /* grab pointers to our data from the channel element */
- channel_data *channeld = elem->channel_data;
-
- ignore_unused(channeld);
-
- switch (op->type) {
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_channel_next_op(elem, op);
- break;
- }
-}
-
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
- grpc_transport_op *initial_op) {
+ grpc_transport_stream_op *initial_op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
@@ -111,7 +95,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
}
/* Constructor for channel_data */
-static void init_channel_elem(grpc_channel_element *elem,
+static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
/* grab pointers to our data from the channel element */
@@ -135,7 +119,12 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
ignore_unused(channeld);
}
-const grpc_channel_filter grpc_no_op_filter = {
- noop_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
- destroy_call_elem, sizeof(channel_data), init_channel_elem,
- destroy_channel_elem, "no-op"};
+const grpc_channel_filter grpc_no_op_filter = {noop_start_transport_stream_op,
+ grpc_channel_next_op,
+ sizeof(call_data),
+ init_call_elem,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ "no-op"};