aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/channel')
-rw-r--r--src/core/channel/channel_stack.c50
-rw-r--r--src/core/channel/channel_stack.h57
-rw-r--r--src/core/channel/client_channel.c494
-rw-r--r--src/core/channel/client_uchannel.c355
-rw-r--r--src/core/channel/compress_filter.c293
-rw-r--r--src/core/channel/connected_channel.c29
-rw-r--r--src/core/channel/connected_channel.h2
-rw-r--r--src/core/channel/http_client_filter.c113
-rw-r--r--src/core/channel/http_server_filter.c148
-rw-r--r--src/core/channel/noop_filter.c22
-rw-r--r--src/core/channel/subchannel_call_holder.c283
-rw-r--r--src/core/channel/subchannel_call_holder.h83
12 files changed, 859 insertions, 1070 deletions
diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c
index abd7f719e7..02e33a09ab 100644
--- a/src/core/channel/channel_stack.c
+++ b/src/core/channel/channel_stack.c
@@ -104,13 +104,14 @@ grpc_call_element *grpc_call_stack_element(grpc_call_stack *call_stack,
void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx,
const grpc_channel_filter **filters,
size_t filter_count, grpc_channel *master,
- const grpc_channel_args *args,
+ const grpc_channel_args *channel_args,
grpc_mdctx *metadata_context,
grpc_channel_stack *stack) {
size_t call_size =
ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) +
ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_call_element));
grpc_channel_element *elems;
+ grpc_channel_element_args args;
char *user_data;
size_t i;
@@ -122,11 +123,14 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx,
/* init per-filter data */
for (i = 0; i < filter_count; i++) {
+ args.master = master;
+ args.channel_args = channel_args;
+ args.metadata_context = metadata_context;
+ args.is_first = i == 0;
+ args.is_last = i == (filter_count - 1);
elems[i].filter = filters[i];
elems[i].channel_data = user_data;
- elems[i].filter->init_channel_elem(exec_ctx, &elems[i], master, args,
- metadata_context, i == 0,
- i == (filter_count - 1));
+ elems[i].filter->init_channel_elem(exec_ctx, &elems[i], &args);
user_data += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_channel_data);
call_size += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_call_data);
}
@@ -151,33 +155,63 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
}
void grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
- grpc_channel_stack *channel_stack,
+ grpc_channel_stack *channel_stack, int initial_refs,
+ grpc_iomgr_cb_func destroy, void *destroy_arg,
+ grpc_call_context_element *context,
const void *transport_server_data,
- grpc_transport_stream_op *initial_op,
grpc_call_stack *call_stack) {
grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
+ grpc_call_element_args args;
size_t count = channel_stack->count;
grpc_call_element *call_elems;
char *user_data;
size_t i;
call_stack->count = count;
+ gpr_ref_init(&call_stack->refcount.refs, initial_refs);
+ grpc_closure_init(&call_stack->refcount.destroy, destroy, destroy_arg);
call_elems = CALL_ELEMS_FROM_STACK(call_stack);
user_data = ((char *)call_elems) +
ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element));
/* init per-filter data */
for (i = 0; i < count; i++) {
+ args.refcount = &call_stack->refcount;
+ args.server_transport_data = transport_server_data;
+ args.context = context;
call_elems[i].filter = channel_elems[i].filter;
call_elems[i].channel_data = channel_elems[i].channel_data;
call_elems[i].call_data = user_data;
- call_elems[i].filter->init_call_elem(exec_ctx, &call_elems[i],
- transport_server_data, initial_op);
+ call_elems[i].filter->init_call_elem(exec_ctx, &call_elems[i], &args);
user_data +=
ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data);
}
}
+void grpc_call_stack_set_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_call_stack *call_stack,
+ grpc_pollset *pollset) {
+ size_t count = call_stack->count;
+ grpc_call_element *call_elems;
+ char *user_data;
+ size_t i;
+
+ call_elems = CALL_ELEMS_FROM_STACK(call_stack);
+ user_data = ((char *)call_elems) +
+ ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element));
+
+ /* init per-filter data */
+ for (i = 0; i < count; i++) {
+ call_elems[i].filter->set_pollset(exec_ctx, &call_elems[i], pollset);
+ user_data +=
+ ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data);
+ }
+}
+
+void grpc_call_stack_ignore_set_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_pollset *pollset) {}
+
void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack) {
grpc_call_element *elems = CALL_ELEMS_FROM_STACK(stack);
size_t count = stack->count;
diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h
index 6732cc3018..1279fec080 100644
--- a/src/core/channel/channel_stack.h
+++ b/src/core/channel/channel_stack.h
@@ -51,6 +51,20 @@
typedef struct grpc_channel_element grpc_channel_element;
typedef struct grpc_call_element grpc_call_element;
+typedef struct {
+ grpc_channel *master;
+ const grpc_channel_args *channel_args;
+ grpc_mdctx *metadata_context;
+ int is_first;
+ int is_last;
+} grpc_channel_element_args;
+
+typedef struct {
+ grpc_stream_refcount *refcount;
+ const void *server_transport_data;
+ grpc_call_context_element *context;
+} grpc_call_element_args;
+
/* Channel filters specify:
1. the amount of memory needed in the channel & call (via the sizeof_XXX
members)
@@ -84,8 +98,9 @@ typedef struct {
transport and is on the server. Most filters want to ignore this
argument. */
void (*init_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const void *server_transport_data,
- grpc_transport_stream_op *initial_op);
+ grpc_call_element_args *args);
+ void (*set_pollset)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_pollset *pollset);
/* Destroy per call data.
The filter does not need to do any chaining */
void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem);
@@ -99,9 +114,7 @@ typedef struct {
useful for asserting correct configuration by upper layer code.
The filter does not need to do any chaining */
void (*init_channel_elem)(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
- grpc_channel *master, const grpc_channel_args *args,
- grpc_mdctx *metadata_context, int is_first,
- int is_last);
+ grpc_channel_element_args *args);
/* Destroy per channel data.
The filter does not need to do any chaining */
void (*destroy_channel_elem)(grpc_exec_ctx *exec_ctx,
@@ -141,7 +154,14 @@ typedef struct {
/* A call stack tracks a set of related filters for one call, and guarantees
they live within a single malloc() allocation */
-typedef struct { size_t count; } grpc_call_stack;
+typedef struct {
+ /* shared refcount for this channel stack.
+ MUST be the first element: the underlying code calls destroy
+ with the address of the refcount, but higher layers prefer to think
+ about the address of the call stack itself. */
+ grpc_stream_refcount refcount;
+ size_t count;
+} grpc_call_stack;
/* Get a channel element given a channel stack and its index */
grpc_channel_element *grpc_channel_stack_element(grpc_channel_stack *stack,
@@ -170,13 +190,34 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
expected to be NULL on a client, or an opaque transport owned pointer on the
server. */
void grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
- grpc_channel_stack *channel_stack,
+ grpc_channel_stack *channel_stack, int initial_refs,
+ grpc_iomgr_cb_func destroy, void *destroy_arg,
+ grpc_call_context_element *context,
const void *transport_server_data,
- grpc_transport_stream_op *initial_op,
grpc_call_stack *call_stack);
+/* Set a pollset for a call stack: must occur before the first op is started */
+void grpc_call_stack_set_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_call_stack *call_stack,
+ grpc_pollset *pollset);
+
+#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+#define grpc_call_stack_ref(call_stack, reason) \
+ grpc_stream_ref(&(call_stack)->refcount, reason)
+#define grpc_call_stack_unref(exec_ctx, call_stack, reason) \
+ grpc_stream_unref(exec_ctx, &(call_stack)->refcount, reason)
+#else
+#define grpc_call_stack_ref(call_stack) grpc_stream_ref(&(call_stack)->refcount)
+#define grpc_call_stack_unref(exec_ctx, call_stack) \
+ grpc_stream_unref(exec_ctx, &(call_stack)->refcount)
+#endif
+
/* Destroy a call stack */
void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack);
+/* Ignore set pollset */
+void grpc_call_stack_ignore_set_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_pollset *pollset);
/* Call the next operation in a call stack */
void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op *op);
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 9f85557ea1..16d91d4277 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -43,6 +43,7 @@
#include "src/core/channel/channel_args.h"
#include "src/core/channel/connected_channel.h"
+#include "src/core/channel/subchannel_call_holder.h"
#include "src/core/iomgr/iomgr.h"
#include "src/core/profiling/timers.h"
#include "src/core/support/string.h"
@@ -51,7 +52,7 @@
/* Client channel implementation */
-typedef struct call_data call_data;
+typedef grpc_subchannel_call_holder call_data;
typedef struct client_channel_channel_data {
/** metadata context for this channel */
@@ -98,360 +99,22 @@ typedef struct {
grpc_lb_policy *lb_policy;
} lb_policy_connectivity_watcher;
-typedef enum {
- CALL_CREATED,
- CALL_WAITING_FOR_SEND,
- CALL_WAITING_FOR_CONFIG,
- CALL_WAITING_FOR_PICK,
- CALL_WAITING_FOR_CALL,
- CALL_ACTIVE,
- CALL_CANCELLED
-} call_state;
-
-struct call_data {
- /* owning element */
- grpc_call_element *elem;
-
- gpr_mu mu_state;
-
- call_state state;
- gpr_timespec deadline;
- grpc_subchannel *picked_channel;
- grpc_closure async_setup_task;
- grpc_transport_stream_op waiting_op;
- /* our child call stack */
- grpc_subchannel_call *subchannel_call;
- grpc_linked_mdelem status;
- grpc_linked_mdelem details;
-};
-
-static grpc_closure *merge_into_waiting_op(grpc_call_element *elem,
- grpc_transport_stream_op *new_op)
- GRPC_MUST_USE_RESULT;
-
-static void handle_op_after_cancellation(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op *op) {
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- if (op->send_ops) {
- grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
- op->on_done_send->cb(exec_ctx, op->on_done_send->cb_arg, 0);
- }
- if (op->recv_ops) {
- char status[GPR_LTOA_MIN_BUFSIZE];
- grpc_metadata_batch mdb;
- gpr_ltoa(GRPC_STATUS_CANCELLED, status);
- calld->status.md =
- grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status);
- calld->details.md =
- grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled");
- calld->status.prev = calld->details.next = NULL;
- calld->status.next = &calld->details;
- calld->details.prev = &calld->status;
- mdb.list.head = &calld->status;
- mdb.list.tail = &calld->details;
- mdb.garbage.head = mdb.garbage.tail = NULL;
- mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
- grpc_sopb_add_metadata(op->recv_ops, mdb);
- *op->recv_state = GRPC_STREAM_CLOSED;
- op->on_done_recv->cb(exec_ctx, op->on_done_recv->cb_arg, 1);
- }
- if (op->on_consumed) {
- op->on_consumed->cb(exec_ctx, op->on_consumed->cb_arg, 0);
- }
-}
-
typedef struct {
grpc_closure closure;
grpc_call_element *elem;
} waiting_call;
-static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op *op,
- int continuation);
-
-static void continue_with_pick(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
- waiting_call *wc = arg;
- call_data *calld = wc->elem->call_data;
- perform_transport_stream_op(exec_ctx, wc->elem, &calld->waiting_op, 1);
- gpr_free(wc);
-}
-
-static void add_to_lb_policy_wait_queue_locked_state_config(
- grpc_call_element *elem) {
- channel_data *chand = elem->channel_data;
- waiting_call *wc = gpr_malloc(sizeof(*wc));
- grpc_closure_init(&wc->closure, continue_with_pick, wc);
- wc->elem = elem;
- grpc_closure_list_add(&chand->waiting_for_config_closures, &wc->closure, 1);
-}
-
-static int is_empty(void *p, int len) {
- char *ptr = p;
- int i;
- for (i = 0; i < len; i++) {
- if (ptr[i] != 0) return 0;
- }
- return 1;
-}
-
-static void started_call_locked(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
- call_data *calld = arg;
- grpc_transport_stream_op op;
- int have_waiting;
-
- if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) {
- memset(&op, 0, sizeof(op));
- op.cancel_with_status = GRPC_STATUS_CANCELLED;
- gpr_mu_unlock(&calld->mu_state);
- grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, &op);
- } else if (calld->state == CALL_WAITING_FOR_CALL) {
- have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op));
- if (calld->subchannel_call != NULL) {
- calld->state = CALL_ACTIVE;
- gpr_mu_unlock(&calld->mu_state);
- if (have_waiting) {
- grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call,
- &calld->waiting_op);
- }
- } else {
- calld->state = CALL_CANCELLED;
- gpr_mu_unlock(&calld->mu_state);
- if (have_waiting) {
- handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op);
- }
- }
- } else {
- GPR_ASSERT(calld->state == CALL_CANCELLED);
- gpr_mu_unlock(&calld->mu_state);
- }
-}
-
-static void started_call(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
- call_data *calld = arg;
- gpr_mu_lock(&calld->mu_state);
- started_call_locked(exec_ctx, arg, iomgr_success);
-}
-
-static void picked_target(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
- call_data *calld = arg;
- grpc_pollset *pollset;
- grpc_subchannel_call_create_status call_creation_status;
-
- GPR_TIMER_BEGIN("picked_target", 0);
-
- if (calld->picked_channel == NULL) {
- /* treat this like a cancellation */
- calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE;
- perform_transport_stream_op(exec_ctx, calld->elem, &calld->waiting_op, 1);
- } else {
- gpr_mu_lock(&calld->mu_state);
- if (calld->state == CALL_CANCELLED) {
- gpr_mu_unlock(&calld->mu_state);
- handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op);
- } else {
- GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK);
- calld->state = CALL_WAITING_FOR_CALL;
- pollset = calld->waiting_op.bind_pollset;
- grpc_closure_init(&calld->async_setup_task, started_call, calld);
- call_creation_status = grpc_subchannel_create_call(
- exec_ctx, calld->picked_channel, pollset, &calld->subchannel_call,
- &calld->async_setup_task);
- if (call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY) {
- started_call_locked(exec_ctx, calld, iomgr_success);
- } else {
- gpr_mu_unlock(&calld->mu_state);
- }
- }
- }
-
- GPR_TIMER_END("picked_target", 0);
-}
-
-static grpc_closure *merge_into_waiting_op(grpc_call_element *elem,
- grpc_transport_stream_op *new_op) {
- call_data *calld = elem->call_data;
- grpc_closure *consumed_op = NULL;
- grpc_transport_stream_op *waiting_op = &calld->waiting_op;
- GPR_ASSERT((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1);
- GPR_ASSERT((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1);
- if (new_op->send_ops != NULL) {
- waiting_op->send_ops = new_op->send_ops;
- waiting_op->is_last_send = new_op->is_last_send;
- waiting_op->on_done_send = new_op->on_done_send;
- }
- if (new_op->recv_ops != NULL) {
- waiting_op->recv_ops = new_op->recv_ops;
- waiting_op->recv_state = new_op->recv_state;
- waiting_op->on_done_recv = new_op->on_done_recv;
- }
- if (new_op->on_consumed != NULL) {
- if (waiting_op->on_consumed != NULL) {
- consumed_op = waiting_op->on_consumed;
- }
- waiting_op->on_consumed = new_op->on_consumed;
- }
- if (new_op->cancel_with_status != GRPC_STATUS_OK) {
- waiting_op->cancel_with_status = new_op->cancel_with_status;
- }
- return consumed_op;
-}
-
static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
- call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- grpc_subchannel_call *subchannel_call;
- char *result;
-
- gpr_mu_lock(&calld->mu_state);
- if (calld->state == CALL_ACTIVE) {
- subchannel_call = calld->subchannel_call;
- GRPC_SUBCHANNEL_CALL_REF(subchannel_call, "get_peer");
- gpr_mu_unlock(&calld->mu_state);
- result = grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
- GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "get_peer");
- return result;
- } else {
- gpr_mu_unlock(&calld->mu_state);
- return grpc_channel_get_target(chand->master);
- }
-}
-
-static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op *op,
- int continuation) {
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- grpc_subchannel_call *subchannel_call;
- grpc_lb_policy *lb_policy;
- grpc_transport_stream_op op2;
- GPR_TIMER_BEGIN("perform_transport_stream_op", 0);
- GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
- gpr_mu_lock(&calld->mu_state);
- switch (calld->state) {
- case CALL_ACTIVE:
- GPR_ASSERT(!continuation);
- subchannel_call = calld->subchannel_call;
- gpr_mu_unlock(&calld->mu_state);
- grpc_subchannel_call_process_op(exec_ctx, subchannel_call, op);
- break;
- case CALL_CANCELLED:
- gpr_mu_unlock(&calld->mu_state);
- handle_op_after_cancellation(exec_ctx, elem, op);
- break;
- case CALL_WAITING_FOR_SEND:
- GPR_ASSERT(!continuation);
- grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1);
- if (!calld->waiting_op.send_ops &&
- calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) {
- gpr_mu_unlock(&calld->mu_state);
- break;
- }
- *op = calld->waiting_op;
- memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
- continuation = 1;
- /* fall through */
- case CALL_WAITING_FOR_CONFIG:
- case CALL_WAITING_FOR_PICK:
- case CALL_WAITING_FOR_CALL:
- if (!continuation) {
- if (op->cancel_with_status != GRPC_STATUS_OK) {
- calld->state = CALL_CANCELLED;
- op2 = calld->waiting_op;
- memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
- if (op->on_consumed) {
- calld->waiting_op.on_consumed = op->on_consumed;
- op->on_consumed = NULL;
- } else if (op2.on_consumed) {
- calld->waiting_op.on_consumed = op2.on_consumed;
- op2.on_consumed = NULL;
- }
- gpr_mu_unlock(&calld->mu_state);
- handle_op_after_cancellation(exec_ctx, elem, op);
- handle_op_after_cancellation(exec_ctx, elem, &op2);
- } else {
- grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1);
- gpr_mu_unlock(&calld->mu_state);
- }
- break;
- }
- /* fall through */
- case CALL_CREATED:
- if (op->cancel_with_status != GRPC_STATUS_OK) {
- calld->state = CALL_CANCELLED;
- gpr_mu_unlock(&calld->mu_state);
- handle_op_after_cancellation(exec_ctx, elem, op);
- } else {
- calld->waiting_op = *op;
-
- if (op->send_ops == NULL) {
- /* need to have some send ops before we can select the
- lb target */
- calld->state = CALL_WAITING_FOR_SEND;
- gpr_mu_unlock(&calld->mu_state);
- } else {
- gpr_mu_lock(&chand->mu_config);
- lb_policy = chand->lb_policy;
- if (lb_policy) {
- grpc_transport_stream_op *waiting_op = &calld->waiting_op;
- grpc_pollset *bind_pollset = waiting_op->bind_pollset;
- grpc_metadata_batch *initial_metadata =
- &waiting_op->send_ops->ops[0].data.metadata;
- GRPC_LB_POLICY_REF(lb_policy, "pick");
- gpr_mu_unlock(&chand->mu_config);
- calld->state = CALL_WAITING_FOR_PICK;
-
- GPR_ASSERT(waiting_op->bind_pollset);
- GPR_ASSERT(waiting_op->send_ops);
- GPR_ASSERT(waiting_op->send_ops->nops >= 1);
- GPR_ASSERT(waiting_op->send_ops->ops[0].type == GRPC_OP_METADATA);
- gpr_mu_unlock(&calld->mu_state);
-
- grpc_closure_init(&calld->async_setup_task, picked_target, calld);
- grpc_lb_policy_pick(exec_ctx, lb_policy, bind_pollset,
- initial_metadata, &calld->picked_channel,
- &calld->async_setup_task);
-
- GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick");
- } else if (chand->resolver != NULL) {
- calld->state = CALL_WAITING_FOR_CONFIG;
- add_to_lb_policy_wait_queue_locked_state_config(elem);
- if (!chand->started_resolving && chand->resolver != NULL) {
- GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
- chand->started_resolving = 1;
- grpc_resolver_next(exec_ctx, chand->resolver,
- &chand->incoming_configuration,
- &chand->on_config_changed);
- }
- gpr_mu_unlock(&chand->mu_config);
- gpr_mu_unlock(&calld->mu_state);
- } else {
- calld->state = CALL_CANCELLED;
- gpr_mu_unlock(&chand->mu_config);
- gpr_mu_unlock(&calld->mu_state);
- handle_op_after_cancellation(exec_ctx, elem, op);
- }
- }
- }
- break;
- }
-
- GPR_TIMER_END("perform_transport_stream_op", 0);
+ return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data,
+ chand->master);
}
static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
- perform_transport_stream_op(exec_ctx, elem, op, 0);
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ grpc_subchannel_call_holder_perform_op(exec_ctx, elem->call_data, op);
}
static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
@@ -593,11 +256,9 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
op->connectivity_state = NULL;
}
- if (!is_empty(op, sizeof(*op))) {
- lb_policy = chand->lb_policy;
- if (lb_policy) {
- GRPC_LB_POLICY_REF(lb_policy, "broadcast");
- }
+ lb_policy = chand->lb_policy;
+ if (lb_policy) {
+ GRPC_LB_POLICY_REF(lb_policy, "broadcast");
}
if (op->disconnect && chand->resolver != NULL) {
@@ -624,67 +285,110 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
}
}
-/* Constructor for call_data */
-static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const void *server_transport_data,
- grpc_transport_stream_op *initial_op) {
+typedef struct {
+ grpc_metadata_batch *initial_metadata;
+ grpc_subchannel **subchannel;
+ grpc_closure *on_ready;
+ grpc_call_element *elem;
+ grpc_closure closure;
+} continue_picking_args;
+
+static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_metadata_batch *initial_metadata,
+ grpc_subchannel **subchannel,
+ grpc_closure *on_ready);
+
+static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+ continue_picking_args *cpa = arg;
+ if (!success) {
+ grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 0);
+ } else if (cpa->subchannel == NULL) {
+ /* cancelled, do nothing */
+ } else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
+ cpa->subchannel, cpa->on_ready)) {
+ grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 1);
+ }
+ gpr_free(cpa);
+}
+
+static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
+ grpc_metadata_batch *initial_metadata,
+ grpc_subchannel **subchannel,
+ grpc_closure *on_ready) {
+ grpc_call_element *elem = elemp;
+ channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
+ continue_picking_args *cpa;
+ grpc_closure *closure;
- /* TODO(ctiller): is there something useful we can do here? */
- GPR_ASSERT(initial_op == NULL);
+ GPR_ASSERT(subchannel);
- GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
- GPR_ASSERT(server_transport_data == NULL);
- gpr_mu_init(&calld->mu_state);
- calld->elem = elem;
- calld->state = CALL_CREATED;
- calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
+ gpr_mu_lock(&chand->mu_config);
+ if (initial_metadata == NULL) {
+ if (chand->lb_policy != NULL) {
+ grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy, subchannel);
+ }
+ for (closure = chand->waiting_for_config_closures.head; closure != NULL;
+ closure = grpc_closure_next(closure)) {
+ cpa = closure->cb_arg;
+ if (cpa->subchannel == subchannel) {
+ cpa->subchannel = NULL;
+ grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 0);
+ }
+ }
+ gpr_mu_unlock(&chand->mu_config);
+ return 1;
+ }
+ if (chand->lb_policy != NULL) {
+ int r = grpc_lb_policy_pick(exec_ctx, chand->lb_policy, calld->pollset,
+ initial_metadata, subchannel, on_ready);
+ gpr_mu_unlock(&chand->mu_config);
+ return r;
+ }
+ if (chand->resolver != NULL && !chand->started_resolving) {
+ chand->started_resolving = 1;
+ GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
+ grpc_resolver_next(exec_ctx, chand->resolver,
+ &chand->incoming_configuration,
+ &chand->on_config_changed);
+ }
+ cpa = gpr_malloc(sizeof(*cpa));
+ cpa->initial_metadata = initial_metadata;
+ cpa->subchannel = subchannel;
+ cpa->on_ready = on_ready;
+ cpa->elem = elem;
+ grpc_closure_init(&cpa->closure, continue_picking, cpa);
+ grpc_closure_list_add(&chand->waiting_for_config_closures, &cpa->closure, 1);
+ gpr_mu_unlock(&chand->mu_config);
+ return 0;
+}
+
+/* Constructor for call_data */
+static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_call_element_args *args) {
+ grpc_subchannel_call_holder_init(elem->call_data, cc_pick_subchannel, elem);
}
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
- call_data *calld = elem->call_data;
- grpc_subchannel_call *subchannel_call;
-
- /* if the call got activated, we need to destroy the child stack also, and
- remove it from the in-flight requests tracked by the child_entry we
- picked */
- gpr_mu_lock(&calld->mu_state);
- switch (calld->state) {
- case CALL_ACTIVE:
- subchannel_call = calld->subchannel_call;
- gpr_mu_unlock(&calld->mu_state);
- GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "client_channel");
- break;
- case CALL_CREATED:
- case CALL_CANCELLED:
- gpr_mu_unlock(&calld->mu_state);
- break;
- case CALL_WAITING_FOR_PICK:
- case CALL_WAITING_FOR_CONFIG:
- case CALL_WAITING_FOR_CALL:
- case CALL_WAITING_FOR_SEND:
- GPR_UNREACHABLE_CODE(return );
- }
+ grpc_subchannel_call_holder_destroy(exec_ctx, elem->call_data);
}
/* Constructor for channel_data */
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem, grpc_channel *master,
- const grpc_channel_args *args,
- grpc_mdctx *metadata_context, int is_first,
- int is_last) {
+ grpc_channel_element *elem,
+ grpc_channel_element_args *args) {
channel_data *chand = elem->channel_data;
memset(chand, 0, sizeof(*chand));
- GPR_ASSERT(is_last);
+ GPR_ASSERT(args->is_last);
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
gpr_mu_init(&chand->mu_config);
- chand->mdctx = metadata_context;
- chand->master = master;
+ chand->mdctx = args->metadata_context;
+ chand->master = args->master;
grpc_pollset_set_init(&chand->pollset_set);
grpc_closure_init(&chand->on_config_changed, cc_on_config_changed, chand);
@@ -709,10 +413,16 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
gpr_mu_destroy(&chand->mu_config);
}
+static void cc_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_pollset *pollset) {
+ call_data *calld = elem->call_data;
+ calld->pollset = pollset;
+}
+
const grpc_channel_filter grpc_client_channel_filter = {
cc_start_transport_stream_op, cc_start_transport_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem,
- destroy_channel_elem, cc_get_peer, "client-channel",
+ init_call_elem, cc_set_pollset, destroy_call_elem, sizeof(channel_data),
+ init_channel_elem, destroy_channel_elem, cc_get_peer, "client-channel",
};
void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c
index 510677a844..ec6b02381a 100644
--- a/src/core/channel/client_uchannel.c
+++ b/src/core/channel/client_uchannel.c
@@ -39,6 +39,7 @@
#include "src/core/channel/channel_args.h"
#include "src/core/channel/client_channel.h"
#include "src/core/channel/compress_filter.h"
+#include "src/core/channel/subchannel_call_holder.h"
#include "src/core/iomgr/iomgr.h"
#include "src/core/support/string.h"
#include "src/core/surface/channel.h"
@@ -52,8 +53,6 @@
/** Microchannel (uchannel) implementation: a lightweight channel without any
* load-balancing mechanisms meant for communication from within the core. */
-typedef struct call_data call_data;
-
typedef struct client_uchannel_channel_data {
/** metadata context for this channel */
grpc_mdctx *mdctx;
@@ -80,85 +79,7 @@ typedef struct client_uchannel_channel_data {
gpr_mu mu_state;
} channel_data;
-typedef enum {
- CALL_CREATED,
- CALL_WAITING_FOR_SEND,
- CALL_WAITING_FOR_CALL,
- CALL_ACTIVE,
- CALL_CANCELLED
-} call_state;
-
-struct call_data {
- /* owning element */
- grpc_call_element *elem;
-
- gpr_mu mu_state;
-
- call_state state;
- gpr_timespec deadline;
- grpc_closure async_setup_task;
- grpc_transport_stream_op waiting_op;
- /* our child call stack */
- grpc_subchannel_call *subchannel_call;
- grpc_linked_mdelem status;
- grpc_linked_mdelem details;
-};
-
-static grpc_closure *merge_into_waiting_op(grpc_call_element *elem,
- grpc_transport_stream_op *new_op)
- GRPC_MUST_USE_RESULT;
-
-static void handle_op_after_cancellation(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op *op) {
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- if (op->send_ops) {
- grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
- op->on_done_send->cb(exec_ctx, op->on_done_send->cb_arg, 0);
- }
- if (op->recv_ops) {
- char status[GPR_LTOA_MIN_BUFSIZE];
- grpc_metadata_batch mdb;
- gpr_ltoa(GRPC_STATUS_CANCELLED, status);
- calld->status.md =
- grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status);
- calld->details.md =
- grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled");
- calld->status.prev = calld->details.next = NULL;
- calld->status.next = &calld->details;
- calld->details.prev = &calld->status;
- mdb.list.head = &calld->status;
- mdb.list.tail = &calld->details;
- mdb.garbage.head = mdb.garbage.tail = NULL;
- mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
- grpc_sopb_add_metadata(op->recv_ops, mdb);
- *op->recv_state = GRPC_STREAM_CLOSED;
- op->on_done_recv->cb(exec_ctx, op->on_done_recv->cb_arg, 1);
- }
- if (op->on_consumed) {
- op->on_consumed->cb(exec_ctx, op->on_consumed->cb_arg, 0);
- }
-}
-
-typedef struct {
- grpc_closure closure;
- grpc_call_element *elem;
-} waiting_call;
-
-static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op *op,
- int continuation);
-
-static int is_empty(void *p, int len) {
- char *ptr = p;
- int i;
- for (i = 0; i < len; i++) {
- if (ptr[i] != 0) return 0;
- }
- return 1;
-}
+typedef grpc_subchannel_call_holder call_data;
static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
@@ -171,201 +92,17 @@ static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
&chand->connectivity_cb);
}
-static void started_call_locked(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
- call_data *calld = arg;
- grpc_transport_stream_op op;
- int have_waiting;
-
- if (calld->state == CALL_CANCELLED && iomgr_success == 0) {
- have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op));
- gpr_mu_unlock(&calld->mu_state);
- if (have_waiting) {
- handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op);
- }
- } else if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) {
- memset(&op, 0, sizeof(op));
- op.cancel_with_status = GRPC_STATUS_CANCELLED;
- gpr_mu_unlock(&calld->mu_state);
- grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, &op);
- } else if (calld->state == CALL_WAITING_FOR_CALL) {
- have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op));
- if (calld->subchannel_call != NULL) {
- calld->state = CALL_ACTIVE;
- gpr_mu_unlock(&calld->mu_state);
- if (have_waiting) {
- grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call,
- &calld->waiting_op);
- }
- } else {
- calld->state = CALL_CANCELLED;
- gpr_mu_unlock(&calld->mu_state);
- if (have_waiting) {
- handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op);
- }
- }
- } else {
- GPR_ASSERT(calld->state == CALL_CANCELLED);
- gpr_mu_unlock(&calld->mu_state);
- have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op));
- if (have_waiting) {
- handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op);
- }
- }
-}
-
-static void started_call(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
- call_data *calld = arg;
- gpr_mu_lock(&calld->mu_state);
- started_call_locked(exec_ctx, arg, iomgr_success);
-}
-
-static grpc_closure *merge_into_waiting_op(grpc_call_element *elem,
- grpc_transport_stream_op *new_op) {
- call_data *calld = elem->call_data;
- grpc_closure *consumed_op = NULL;
- grpc_transport_stream_op *waiting_op = &calld->waiting_op;
- GPR_ASSERT((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1);
- GPR_ASSERT((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1);
- if (new_op->send_ops != NULL) {
- waiting_op->send_ops = new_op->send_ops;
- waiting_op->is_last_send = new_op->is_last_send;
- waiting_op->on_done_send = new_op->on_done_send;
- }
- if (new_op->recv_ops != NULL) {
- waiting_op->recv_ops = new_op->recv_ops;
- waiting_op->recv_state = new_op->recv_state;
- waiting_op->on_done_recv = new_op->on_done_recv;
- }
- if (new_op->on_consumed != NULL) {
- if (waiting_op->on_consumed != NULL) {
- consumed_op = waiting_op->on_consumed;
- }
- waiting_op->on_consumed = new_op->on_consumed;
- }
- if (new_op->cancel_with_status != GRPC_STATUS_OK) {
- waiting_op->cancel_with_status = new_op->cancel_with_status;
- }
- return consumed_op;
-}
-
static char *cuc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- grpc_subchannel_call *subchannel_call;
- char *result;
-
- gpr_mu_lock(&calld->mu_state);
- if (calld->state == CALL_ACTIVE) {
- subchannel_call = calld->subchannel_call;
- GRPC_SUBCHANNEL_CALL_REF(subchannel_call, "get_peer");
- gpr_mu_unlock(&calld->mu_state);
- result = grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
- GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "get_peer");
- return result;
- } else {
- gpr_mu_unlock(&calld->mu_state);
- return grpc_channel_get_target(chand->master);
- }
-}
-
-static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op *op,
- int continuation) {
- call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- grpc_subchannel_call *subchannel_call;
- grpc_transport_stream_op op2;
- GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter);
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
- gpr_mu_lock(&calld->mu_state);
- /* make sure the wrapped subchannel has been set (see
- * grpc_client_uchannel_set_subchannel) */
- GPR_ASSERT(chand->subchannel != NULL);
-
- switch (calld->state) {
- case CALL_ACTIVE:
- GPR_ASSERT(!continuation);
- subchannel_call = calld->subchannel_call;
- gpr_mu_unlock(&calld->mu_state);
- grpc_subchannel_call_process_op(exec_ctx, subchannel_call, op);
- break;
- case CALL_CANCELLED:
- gpr_mu_unlock(&calld->mu_state);
- handle_op_after_cancellation(exec_ctx, elem, op);
- break;
- case CALL_WAITING_FOR_SEND:
- GPR_ASSERT(!continuation);
- grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1);
- if (!calld->waiting_op.send_ops &&
- calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) {
- gpr_mu_unlock(&calld->mu_state);
- break;
- }
- *op = calld->waiting_op;
- memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
- continuation = 1;
- /* fall through */
- case CALL_WAITING_FOR_CALL:
- if (!continuation) {
- if (op->cancel_with_status != GRPC_STATUS_OK) {
- calld->state = CALL_CANCELLED;
- op2 = calld->waiting_op;
- memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
- if (op->on_consumed) {
- calld->waiting_op.on_consumed = op->on_consumed;
- op->on_consumed = NULL;
- } else if (op2.on_consumed) {
- calld->waiting_op.on_consumed = op2.on_consumed;
- op2.on_consumed = NULL;
- }
- gpr_mu_unlock(&calld->mu_state);
- handle_op_after_cancellation(exec_ctx, elem, op);
- handle_op_after_cancellation(exec_ctx, elem, &op2);
- grpc_subchannel_cancel_waiting_call(exec_ctx, chand->subchannel, 1);
- } else {
- grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1);
- gpr_mu_unlock(&calld->mu_state);
- }
- break;
- }
- /* fall through */
- case CALL_CREATED:
- if (op->cancel_with_status != GRPC_STATUS_OK) {
- calld->state = CALL_CANCELLED;
- gpr_mu_unlock(&calld->mu_state);
- handle_op_after_cancellation(exec_ctx, elem, op);
- } else {
- calld->waiting_op = *op;
- if (op->send_ops == NULL) {
- calld->state = CALL_WAITING_FOR_SEND;
- gpr_mu_unlock(&calld->mu_state);
- } else {
- grpc_subchannel_call_create_status call_creation_status;
- grpc_pollset *pollset = calld->waiting_op.bind_pollset;
- calld->state = CALL_WAITING_FOR_CALL;
- grpc_closure_init(&calld->async_setup_task, started_call, calld);
- call_creation_status = grpc_subchannel_create_call(
- exec_ctx, chand->subchannel, pollset, &calld->subchannel_call,
- &calld->async_setup_task);
- if (call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY) {
- started_call_locked(exec_ctx, calld, 1);
- } else {
- gpr_mu_unlock(&calld->mu_state);
- }
- }
- }
- break;
- }
+ return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data,
+ chand->master);
}
static void cuc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
- perform_transport_stream_op(exec_ctx, elem, op, 0);
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ grpc_subchannel_call_holder_perform_op(exec_ctx, elem->call_data, op);
}
static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx,
@@ -392,64 +129,40 @@ static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx,
}
}
+static int cuc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_metadata_batch *initial_metadata,
+ grpc_subchannel **subchannel,
+ grpc_closure *on_ready) {
+ channel_data *chand = arg;
+ GPR_ASSERT(initial_metadata != NULL);
+ *subchannel = chand->subchannel;
+ return 1;
+}
+
/* Constructor for call_data */
static void cuc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const void *server_transport_data,
- grpc_transport_stream_op *initial_op) {
- call_data *calld = elem->call_data;
- memset(calld, 0, sizeof(call_data));
-
- /* TODO(ctiller): is there something useful we can do here? */
- GPR_ASSERT(initial_op == NULL);
-
- GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter);
- GPR_ASSERT(server_transport_data == NULL);
- gpr_mu_init(&calld->mu_state);
- calld->elem = elem;
- calld->state = CALL_CREATED;
- calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
+ grpc_call_element_args *args) {
+ grpc_subchannel_call_holder_init(elem->call_data, cuc_pick_subchannel,
+ elem->channel_data);
}
/* Destructor for call_data */
static void cuc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
- call_data *calld = elem->call_data;
- grpc_subchannel_call *subchannel_call;
-
- /* if the call got activated, we need to destroy the child stack also, and
- remove it from the in-flight requests tracked by the child_entry we
- picked */
- gpr_mu_lock(&calld->mu_state);
- switch (calld->state) {
- case CALL_ACTIVE:
- subchannel_call = calld->subchannel_call;
- gpr_mu_unlock(&calld->mu_state);
- GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "client_uchannel");
- break;
- case CALL_CREATED:
- case CALL_CANCELLED:
- gpr_mu_unlock(&calld->mu_state);
- break;
- case CALL_WAITING_FOR_CALL:
- case CALL_WAITING_FOR_SEND:
- GPR_UNREACHABLE_CODE(return );
- }
+ grpc_subchannel_call_holder_destroy(exec_ctx, elem->call_data);
}
/* Constructor for channel_data */
static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
- grpc_channel *master,
- const grpc_channel_args *args,
- grpc_mdctx *metadata_context, int is_first,
- int is_last) {
+ grpc_channel_element_args *args) {
channel_data *chand = elem->channel_data;
memset(chand, 0, sizeof(*chand));
grpc_closure_init(&chand->connectivity_cb, monitor_subchannel, chand);
- GPR_ASSERT(is_last);
+ GPR_ASSERT(args->is_last);
GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter);
- chand->mdctx = metadata_context;
- chand->master = master;
+ chand->mdctx = args->metadata_context;
+ chand->master = args->master;
grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
"client_uchannel");
gpr_mu_init(&chand->mu_state);
@@ -465,17 +178,17 @@ static void cuc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
gpr_mu_destroy(&chand->mu_state);
}
+static void cuc_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_pollset *pollset) {
+ call_data *calld = elem->call_data;
+ calld->pollset = pollset;
+}
+
const grpc_channel_filter grpc_client_uchannel_filter = {
- cuc_start_transport_stream_op,
- cuc_start_transport_op,
- sizeof(call_data),
- cuc_init_call_elem,
- cuc_destroy_call_elem,
- sizeof(channel_data),
- cuc_init_channel_elem,
- cuc_destroy_channel_elem,
- cuc_get_peer,
- "client-uchannel",
+ cuc_start_transport_stream_op, cuc_start_transport_op, sizeof(call_data),
+ cuc_init_call_elem, cuc_set_pollset, cuc_destroy_call_elem,
+ sizeof(channel_data), cuc_init_channel_elem, cuc_destroy_channel_elem,
+ cuc_get_peer, "client-uchannel",
};
grpc_connectivity_state grpc_client_uchannel_check_connectivity_state(
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c
index 20b5084044..f23d8052f3 100644
--- a/src/core/channel/compress_filter.c
+++ b/src/core/channel/compress_filter.c
@@ -50,13 +50,20 @@ typedef struct call_data {
grpc_linked_mdelem compression_algorithm_storage;
grpc_linked_mdelem accept_encoding_storage;
gpr_uint32 remaining_slice_bytes;
- /**< Input data to be read, as per BEGIN_MESSAGE */
- int written_initial_metadata; /**< Already processed initial md? */
/** Compression algorithm we'll try to use. It may be given by incoming
* metadata, or by the channel's default compression settings. */
grpc_compression_algorithm compression_algorithm;
/** If true, contents of \a compression_algorithm are authoritative */
int has_compression_algorithm;
+
+ grpc_transport_stream_op send_op;
+ gpr_uint32 send_length;
+ gpr_uint32 send_flags;
+ gpr_slice incoming_slice;
+ grpc_slice_buffer_stream replacement_stream;
+ grpc_closure *post_send;
+ grpc_closure send_done;
+ grpc_closure got_slice;
} call_data;
typedef struct channel_data {
@@ -76,24 +83,6 @@ typedef struct channel_data {
grpc_compression_options compression_options;
} channel_data;
-/** Compress \a slices in place using \a algorithm. Returns 1 if compression did
- * actually happen, 0 otherwise (for example if the compressed output size was
- * larger than the raw input).
- *
- * Returns 1 if the data was actually compress and 0 otherwise. */
-static int compress_send_sb(grpc_compression_algorithm algorithm,
- gpr_slice_buffer *slices) {
- int did_compress;
- gpr_slice_buffer tmp;
- gpr_slice_buffer_init(&tmp);
- did_compress = grpc_msg_compress(algorithm, slices, &tmp);
- if (did_compress) {
- gpr_slice_buffer_swap(slices, &tmp);
- }
- gpr_slice_buffer_destroy(&tmp);
- return did_compress;
-}
-
/** For each \a md element from the incoming metadata, filter out the entry for
* "grpc-encoding", using its value to populate the call data's
* compression_algorithm field. */
@@ -127,7 +116,9 @@ static grpc_mdelem *compression_md_filter(void *user_data, grpc_mdelem *md) {
return md;
}
-static int skip_compression(channel_data *channeld, call_data *calld) {
+static int skip_compression(grpc_call_element *elem) {
+ call_data *calld = elem->call_data;
+ channel_data *channeld = elem->channel_data;
if (calld->has_compression_algorithm) {
if (calld->compression_algorithm == GRPC_COMPRESS_NONE) {
return 1;
@@ -138,169 +129,127 @@ static int skip_compression(channel_data *channeld, call_data *calld) {
return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE;
}
-/** Assembles a new grpc_stream_op_buffer with the compressed slices, modifying
- * the associated GRPC_OP_BEGIN_MESSAGE accordingly (new compressed length,
- * flags indicating compression is in effect) and replaces \a send_ops with it.
- * */
-static void finish_compressed_sopb(grpc_stream_op_buffer *send_ops,
- grpc_call_element *elem) {
- size_t i;
+/** Filter initial metadata */
+static void process_send_initial_metadata(
+ grpc_call_element *elem, grpc_metadata_batch *initial_metadata) {
call_data *calld = elem->call_data;
- int new_slices_added = 0; /* GPR_FALSE */
- grpc_metadata_batch metadata;
- grpc_stream_op_buffer new_send_ops;
- grpc_sopb_init(&new_send_ops);
-
- for (i = 0; i < send_ops->nops; i++) {
- grpc_stream_op *sop = &send_ops->ops[i];
- switch (sop->type) {
- case GRPC_OP_BEGIN_MESSAGE:
- GPR_ASSERT(calld->slices.length <= GPR_UINT32_MAX);
- grpc_sopb_add_begin_message(
- &new_send_ops, (gpr_uint32)calld->slices.length,
- sop->data.begin_message.flags | GRPC_WRITE_INTERNAL_COMPRESS);
- break;
- case GRPC_OP_SLICE:
- /* Once we reach the slices section of the original buffer, simply add
- * all the new (compressed) slices. We obviously want to do this only
- * once, hence the "new_slices_added" guard. */
- if (!new_slices_added) {
- size_t j;
- for (j = 0; j < calld->slices.count; ++j) {
- grpc_sopb_add_slice(&new_send_ops,
- gpr_slice_ref(calld->slices.slices[j]));
- }
- new_slices_added = 1; /* GPR_TRUE */
- }
- break;
- case GRPC_OP_METADATA:
- /* move the metadata to the new buffer. */
- grpc_metadata_batch_move(&metadata, &sop->data.metadata);
- grpc_sopb_add_metadata(&new_send_ops, metadata);
- break;
- case GRPC_NO_OP:
- break;
- }
+ channel_data *channeld = elem->channel_data;
+ /* Parse incoming request for compression. If any, it'll be available
+ * at calld->compression_algorithm */
+ grpc_metadata_batch_filter(initial_metadata, compression_md_filter, elem);
+ if (!calld->has_compression_algorithm) {
+ /* If no algorithm was found in the metadata and we aren't
+ * exceptionally skipping compression, fall back to the channel
+ * default */
+ calld->compression_algorithm = channeld->default_compression_algorithm;
+ calld->has_compression_algorithm = 1; /* GPR_TRUE */
}
- grpc_sopb_swap(send_ops, &new_send_ops);
- grpc_sopb_destroy(&new_send_ops);
+ /* hint compression algorithm */
+ grpc_metadata_batch_add_tail(
+ initial_metadata, &calld->compression_algorithm_storage,
+ GRPC_MDELEM_REF(
+ channeld
+ ->mdelem_compression_algorithms[calld->compression_algorithm]));
+
+ /* convey supported compression algorithms */
+ grpc_metadata_batch_add_tail(
+ initial_metadata, &calld->accept_encoding_storage,
+ GRPC_MDELEM_REF(channeld->mdelem_accept_encoding));
}
-/** Filter's "main" function, called for any incoming grpc_transport_stream_op
- * instance that holds a non-zero number of send operations, accesible to this
- * function in \a send_ops. */
-static void process_send_ops(grpc_call_element *elem,
- grpc_stream_op_buffer *send_ops) {
- call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
- size_t i;
- int did_compress = 0;
+static void continue_send_message(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem);
- /* In streaming calls, we need to reset the previously accumulated slices */
+static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, int success) {
+ grpc_call_element *elem = elemp;
+ call_data *calld = elem->call_data;
gpr_slice_buffer_reset_and_unref(&calld->slices);
- for (i = 0; i < send_ops->nops; ++i) {
- grpc_stream_op *sop = &send_ops->ops[i];
- switch (sop->type) {
- case GRPC_OP_BEGIN_MESSAGE:
- /* buffer up slices until we've processed all the expected ones (as
- * given by GRPC_OP_BEGIN_MESSAGE) */
- calld->remaining_slice_bytes = sop->data.begin_message.length;
- if (sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS) {
- calld->has_compression_algorithm = 1; /* GPR_TRUE */
- calld->compression_algorithm = GRPC_COMPRESS_NONE;
- }
- break;
- case GRPC_OP_METADATA:
- if (!calld->written_initial_metadata) {
- /* Parse incoming request for compression. If any, it'll be available
- * at calld->compression_algorithm */
- grpc_metadata_batch_filter(&(sop->data.metadata),
- compression_md_filter, elem);
- if (!calld->has_compression_algorithm) {
- /* If no algorithm was found in the metadata and we aren't
- * exceptionally skipping compression, fall back to the channel
- * default */
- calld->compression_algorithm =
- channeld->default_compression_algorithm;
- calld->has_compression_algorithm = 1; /* GPR_TRUE */
- }
- /* hint compression algorithm */
- grpc_metadata_batch_add_tail(
- &(sop->data.metadata), &calld->compression_algorithm_storage,
- GRPC_MDELEM_REF(channeld->mdelem_compression_algorithms
- [calld->compression_algorithm]));
-
- /* convey supported compression algorithms */
- grpc_metadata_batch_add_tail(
- &(sop->data.metadata), &calld->accept_encoding_storage,
- GRPC_MDELEM_REF(channeld->mdelem_accept_encoding));
-
- calld->written_initial_metadata = 1; /* GPR_TRUE */
- }
- break;
- case GRPC_OP_SLICE:
- if (skip_compression(channeld, calld)) continue;
- GPR_ASSERT(calld->remaining_slice_bytes > 0);
- /* Increase input ref count, gpr_slice_buffer_add takes ownership. */
- gpr_slice_buffer_add(&calld->slices, gpr_slice_ref(sop->data.slice));
- GPR_ASSERT(GPR_SLICE_LENGTH(sop->data.slice) <=
- calld->remaining_slice_bytes);
- calld->remaining_slice_bytes -=
- (gpr_uint32)GPR_SLICE_LENGTH(sop->data.slice);
- if (calld->remaining_slice_bytes == 0) {
- did_compress =
- compress_send_sb(calld->compression_algorithm, &calld->slices);
- }
- break;
- case GRPC_NO_OP:
- break;
- }
- }
+ calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, success);
+}
- /* Modify the send_ops stream_op_buffer depending on whether compression was
- * carried out */
+static void finish_send_message(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
+ call_data *calld = elem->call_data;
+ int did_compress;
+ gpr_slice_buffer tmp;
+ gpr_slice_buffer_init(&tmp);
+ did_compress =
+ grpc_msg_compress(calld->compression_algorithm, &calld->slices, &tmp);
if (did_compress) {
- finish_compressed_sopb(send_ops, elem);
+ gpr_slice_buffer_swap(&calld->slices, &tmp);
+ calld->send_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
+ }
+ gpr_slice_buffer_destroy(&tmp);
+
+ grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
+ calld->send_flags);
+ calld->send_op.send_message = &calld->replacement_stream.base;
+ calld->post_send = calld->send_op.on_complete;
+ calld->send_op.on_complete = &calld->send_done;
+
+ grpc_call_next_op(exec_ctx, elem, &calld->send_op);
+}
+
+static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, int success) {
+ grpc_call_element *elem = elemp;
+ call_data *calld = elem->call_data;
+ gpr_slice_buffer_add(&calld->slices, calld->incoming_slice);
+ if (calld->send_length == calld->slices.length) {
+ finish_send_message(exec_ctx, elem);
+ } else {
+ continue_send_message(exec_ctx, elem);
+ }
+}
+
+static void continue_send_message(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
+ call_data *calld = elem->call_data;
+ while (grpc_byte_stream_next(exec_ctx, calld->send_op.send_message,
+ &calld->incoming_slice, ~(size_t)0,
+ &calld->got_slice)) {
+ gpr_slice_buffer_add(&calld->slices, calld->incoming_slice);
+ if (calld->send_length == calld->slices.length) {
+ finish_send_message(exec_ctx, elem);
+ break;
+ }
}
}
-/* Called either:
- - in response to an API call (or similar) from above, to send something
- - a network event (or similar) from below, to receive something
- op contains type and call direction information, in addition to the data
- that is being sent or received. */
static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
+ call_data *calld = elem->call_data;
+
GPR_TIMER_BEGIN("compress_start_transport_stream_op", 0);
- if (op->send_ops && op->send_ops->nops > 0) {
- process_send_ops(elem, op->send_ops);
+ if (op->send_initial_metadata) {
+ process_send_initial_metadata(elem, op->send_initial_metadata);
+ }
+ if (op->send_message != NULL && !skip_compression(elem) &&
+ 0 == (op->send_message->flags & GRPC_WRITE_NO_COMPRESS)) {
+ calld->send_op = *op;
+ calld->send_length = op->send_message->length;
+ calld->send_flags = op->send_message->flags;
+ continue_send_message(exec_ctx, elem);
+ } else {
+ /* pass control down the stack */
+ grpc_call_next_op(exec_ctx, elem, op);
}
GPR_TIMER_END("compress_start_transport_stream_op", 0);
-
- /* pass control down the stack */
- grpc_call_next_op(exec_ctx, elem, op);
}
/* Constructor for call_data */
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const void *server_transport_data,
- grpc_transport_stream_op *initial_op) {
+ grpc_call_element_args *args) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
/* initialize members */
gpr_slice_buffer_init(&calld->slices);
calld->has_compression_algorithm = 0;
- calld->written_initial_metadata = 0; /* GPR_FALSE */
-
- if (initial_op) {
- if (initial_op->send_ops && initial_op->send_ops->nops > 0) {
- process_send_ops(elem, initial_op->send_ops);
- }
- }
+ grpc_closure_init(&calld->got_slice, got_slice, elem);
+ grpc_closure_init(&calld->send_done, send_done, elem);
}
/* Destructor for call_data */
@@ -313,9 +262,8 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
/* Constructor for channel_data */
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem, grpc_channel *master,
- const grpc_channel_args *args, grpc_mdctx *mdctx,
- int is_first, int is_last) {
+ grpc_channel_element *elem,
+ grpc_channel_element_args *args) {
channel_data *channeld = elem->channel_data;
grpc_compression_algorithm algo_idx;
const char *supported_algorithms_names[GRPC_COMPRESS_ALGORITHMS_COUNT - 1];
@@ -325,24 +273,25 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_compression_options_init(&channeld->compression_options);
channeld->compression_options.enabled_algorithms_bitset =
- (gpr_uint32)grpc_channel_args_compression_algorithm_get_states(args);
+ (gpr_uint32)grpc_channel_args_compression_algorithm_get_states(
+ args->channel_args);
channeld->default_compression_algorithm =
- grpc_channel_args_get_compression_algorithm(args);
+ grpc_channel_args_get_compression_algorithm(args->channel_args);
/* Make sure the default isn't disabled. */
GPR_ASSERT(grpc_compression_options_is_algorithm_enabled(
&channeld->compression_options, channeld->default_compression_algorithm));
channeld->compression_options.default_compression_algorithm =
channeld->default_compression_algorithm;
- channeld->mdstr_request_compression_algorithm_key =
- grpc_mdstr_from_string(mdctx, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY);
+ channeld->mdstr_request_compression_algorithm_key = grpc_mdstr_from_string(
+ args->metadata_context, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY);
channeld->mdstr_outgoing_compression_algorithm_key =
- grpc_mdstr_from_string(mdctx, "grpc-encoding");
+ grpc_mdstr_from_string(args->metadata_context, "grpc-encoding");
channeld->mdstr_compression_capabilities_key =
- grpc_mdstr_from_string(mdctx, "grpc-accept-encoding");
+ grpc_mdstr_from_string(args->metadata_context, "grpc-accept-encoding");
for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
char *algorithm_name;
@@ -354,9 +303,9 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorithm_name) != 0);
channeld->mdelem_compression_algorithms[algo_idx] =
grpc_mdelem_from_metadata_strings(
- mdctx,
+ args->metadata_context,
GRPC_MDSTR_REF(channeld->mdstr_outgoing_compression_algorithm_key),
- grpc_mdstr_from_string(mdctx, algorithm_name));
+ grpc_mdstr_from_string(args->metadata_context, algorithm_name));
if (algo_idx > 0) {
supported_algorithms_names[supported_algorithms_idx++] = algorithm_name;
}
@@ -369,11 +318,12 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
&accept_encoding_str_len);
channeld->mdelem_accept_encoding = grpc_mdelem_from_metadata_strings(
- mdctx, GRPC_MDSTR_REF(channeld->mdstr_compression_capabilities_key),
- grpc_mdstr_from_string(mdctx, accept_encoding_str));
+ args->metadata_context,
+ GRPC_MDSTR_REF(channeld->mdstr_compression_capabilities_key),
+ grpc_mdstr_from_string(args->metadata_context, accept_encoding_str));
gpr_free(accept_encoding_str);
- GPR_ASSERT(!is_last);
+ GPR_ASSERT(!args->is_last);
}
/* Destructor for channel data */
@@ -393,5 +343,6 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
const grpc_channel_filter grpc_compress_filter = {
compress_start_transport_stream_op, grpc_channel_next_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem,
- destroy_channel_elem, grpc_call_next_get_peer, "compress"};
+ init_call_elem, grpc_call_stack_ignore_set_pollset, destroy_call_elem,
+ sizeof(channel_data), init_channel_elem, destroy_channel_elem,
+ grpc_call_next_get_peer, "compress"};
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index 6d4d7be632..0e1efd965a 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -83,8 +83,7 @@ static void con_start_transport_op(grpc_exec_ctx *exec_ctx,
/* Constructor for call_data */
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const void *server_transport_data,
- grpc_transport_stream_op *initial_op) {
+ grpc_call_element_args *args) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
int r;
@@ -92,10 +91,18 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
r = grpc_transport_init_stream(exec_ctx, chand->transport,
TRANSPORT_STREAM_FROM_CALL_DATA(calld),
- server_transport_data, initial_op);
+ args->refcount, args->server_transport_data);
GPR_ASSERT(r == 0);
}
+static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_pollset *pollset) {
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ grpc_transport_set_pollset(exec_ctx, chand->transport,
+ TRANSPORT_STREAM_FROM_CALL_DATA(calld), pollset);
+}
+
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
@@ -108,11 +115,10 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
/* Constructor for channel_data */
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem, grpc_channel *master,
- const grpc_channel_args *args, grpc_mdctx *mdctx,
- int is_first, int is_last) {
+ grpc_channel_element *elem,
+ grpc_channel_element_args *args) {
channel_data *cd = (channel_data *)elem->channel_data;
- GPR_ASSERT(is_last);
+ GPR_ASSERT(args->is_last);
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
cd->transport = NULL;
}
@@ -132,8 +138,8 @@ static char *con_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
const grpc_channel_filter grpc_connected_channel_filter = {
con_start_transport_stream_op, con_start_transport_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem,
- destroy_channel_elem, con_get_peer, "connected",
+ init_call_elem, set_pollset, destroy_call_elem, sizeof(channel_data),
+ init_channel_elem, destroy_channel_elem, con_get_peer, "connected",
};
void grpc_connected_channel_bind_transport(grpc_channel_stack *channel_stack,
@@ -154,3 +160,8 @@ void grpc_connected_channel_bind_transport(grpc_channel_stack *channel_stack,
channel. */
channel_stack->call_stack_size += grpc_transport_stream_size(transport);
}
+
+grpc_stream *grpc_connected_channel_get_stream(grpc_call_element *elem) {
+ call_data *calld = elem->call_data;
+ return TRANSPORT_STREAM_FROM_CALL_DATA(calld);
+}
diff --git a/src/core/channel/connected_channel.h b/src/core/channel/connected_channel.h
index eac6eb7ebe..95c1834bfa 100644
--- a/src/core/channel/connected_channel.h
+++ b/src/core/channel/connected_channel.h
@@ -46,4 +46,6 @@ extern const grpc_channel_filter grpc_connected_channel_filter;
void grpc_connected_channel_bind_transport(grpc_channel_stack* channel_stack,
grpc_transport* transport);
+grpc_stream* grpc_connected_channel_get_stream(grpc_call_element* elem);
+
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CONNECTED_CHANNEL_H */
diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c
index f78a5cc315..3a0f68f30f 100644
--- a/src/core/channel/http_client_filter.c
+++ b/src/core/channel/http_client_filter.c
@@ -45,10 +45,8 @@ typedef struct call_data {
grpc_linked_mdelem te_trailers;
grpc_linked_mdelem content_type;
grpc_linked_mdelem user_agent;
- int sent_initial_metadata;
- int got_initial_metadata;
- grpc_stream_op_buffer *recv_ops;
+ grpc_metadata_batch *recv_initial_metadata;
/** Closure to call when finished with the hc_on_recv hook */
grpc_closure *on_done_recv;
@@ -91,18 +89,11 @@ static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) {
static void hc_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, int success) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
- size_t i;
- size_t nops = calld->recv_ops->nops;
- grpc_stream_op *ops = calld->recv_ops->ops;
- for (i = 0; i < nops; i++) {
- grpc_stream_op *op = &ops[i];
- client_recv_filter_args a;
- if (op->type != GRPC_OP_METADATA) continue;
- calld->got_initial_metadata = 1;
- a.elem = elem;
- a.exec_ctx = exec_ctx;
- grpc_metadata_batch_filter(&op->data.metadata, client_recv_filter, &a);
- }
+ client_recv_filter_args a;
+ a.elem = elem;
+ a.exec_ctx = exec_ctx;
+ grpc_metadata_batch_filter(calld->recv_initial_metadata, client_recv_filter,
+ &a);
calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success);
}
@@ -123,40 +114,29 @@ static void hc_mutate_op(grpc_call_element *elem,
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
- size_t i;
- if (op->send_ops && !calld->sent_initial_metadata) {
- size_t nops = op->send_ops->nops;
- grpc_stream_op *ops = op->send_ops->ops;
- for (i = 0; i < nops; i++) {
- grpc_stream_op *stream_op = &ops[i];
- if (stream_op->type != GRPC_OP_METADATA) continue;
- calld->sent_initial_metadata = 1;
- grpc_metadata_batch_filter(&stream_op->data.metadata, client_strip_filter,
- elem);
- /* Send : prefixed headers, which have to be before any application
- layer headers. */
- grpc_metadata_batch_add_head(&stream_op->data.metadata, &calld->method,
- GRPC_MDELEM_REF(channeld->method));
- grpc_metadata_batch_add_head(&stream_op->data.metadata, &calld->scheme,
- GRPC_MDELEM_REF(channeld->scheme));
- grpc_metadata_batch_add_tail(&stream_op->data.metadata,
- &calld->te_trailers,
- GRPC_MDELEM_REF(channeld->te_trailers));
- grpc_metadata_batch_add_tail(&stream_op->data.metadata,
- &calld->content_type,
- GRPC_MDELEM_REF(channeld->content_type));
- grpc_metadata_batch_add_tail(&stream_op->data.metadata,
- &calld->user_agent,
- GRPC_MDELEM_REF(channeld->user_agent));
- break;
- }
+ if (op->send_initial_metadata != NULL) {
+ grpc_metadata_batch_filter(op->send_initial_metadata, client_strip_filter,
+ elem);
+ /* Send : prefixed headers, which have to be before any application
+ layer headers. */
+ grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->method,
+ GRPC_MDELEM_REF(channeld->method));
+ grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->scheme,
+ GRPC_MDELEM_REF(channeld->scheme));
+ grpc_metadata_batch_add_tail(op->send_initial_metadata, &calld->te_trailers,
+ GRPC_MDELEM_REF(channeld->te_trailers));
+ grpc_metadata_batch_add_tail(op->send_initial_metadata,
+ &calld->content_type,
+ GRPC_MDELEM_REF(channeld->content_type));
+ grpc_metadata_batch_add_tail(op->send_initial_metadata, &calld->user_agent,
+ GRPC_MDELEM_REF(channeld->user_agent));
}
- if (op->recv_ops && !calld->got_initial_metadata) {
+ if (op->recv_initial_metadata != NULL) {
/* substitute our callback for the higher callback */
- calld->recv_ops = op->recv_ops;
- calld->on_done_recv = op->on_done_recv;
- op->on_done_recv = &calld->hc_on_recv;
+ calld->recv_initial_metadata = op->recv_initial_metadata;
+ calld->on_done_recv = op->on_complete;
+ op->on_complete = &calld->hc_on_recv;
}
}
@@ -172,14 +152,10 @@ static void hc_start_transport_op(grpc_exec_ctx *exec_ctx,
/* Constructor for call_data */
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const void *server_transport_data,
- grpc_transport_stream_op *initial_op) {
+ grpc_call_element_args *args) {
call_data *calld = elem->call_data;
- calld->sent_initial_metadata = 0;
- calld->got_initial_metadata = 0;
calld->on_done_recv = NULL;
grpc_closure_init(&calld->hc_on_recv, hc_on_recv, elem);
- if (initial_op) hc_mutate_op(elem, initial_op);
}
/* Destructor for call_data */
@@ -250,28 +226,31 @@ static grpc_mdstr *user_agent_from_args(grpc_mdctx *mdctx,
/* Constructor for channel_data */
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem, grpc_channel *master,
- const grpc_channel_args *channel_args,
- grpc_mdctx *mdctx, int is_first, int is_last) {
+ grpc_channel_element *elem,
+ grpc_channel_element_args *args) {
/* grab pointers to our data from the channel element */
channel_data *channeld = elem->channel_data;
/* The first and the last filters tend to be implemented differently to
handle the case that there's no 'next' filter to call on the up or down
path */
- GPR_ASSERT(!is_last);
+ GPR_ASSERT(!args->is_last);
/* initialize members */
- channeld->te_trailers = grpc_mdelem_from_strings(mdctx, "te", "trailers");
- channeld->method = grpc_mdelem_from_strings(mdctx, ":method", "POST");
- channeld->scheme = grpc_mdelem_from_strings(mdctx, ":scheme",
- scheme_from_args(channel_args));
- channeld->content_type =
- grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc");
- channeld->status = grpc_mdelem_from_strings(mdctx, ":status", "200");
+ channeld->te_trailers =
+ grpc_mdelem_from_strings(args->metadata_context, "te", "trailers");
+ channeld->method =
+ grpc_mdelem_from_strings(args->metadata_context, ":method", "POST");
+ channeld->scheme = grpc_mdelem_from_strings(
+ args->metadata_context, ":scheme", scheme_from_args(args->channel_args));
+ channeld->content_type = grpc_mdelem_from_strings(
+ args->metadata_context, "content-type", "application/grpc");
+ channeld->status =
+ grpc_mdelem_from_strings(args->metadata_context, ":status", "200");
channeld->user_agent = grpc_mdelem_from_metadata_strings(
- mdctx, grpc_mdstr_from_string(mdctx, "user-agent"),
- user_agent_from_args(mdctx, channel_args));
+ args->metadata_context,
+ grpc_mdstr_from_string(args->metadata_context, "user-agent"),
+ user_agent_from_args(args->metadata_context, args->channel_args));
}
/* Destructor for channel data */
@@ -290,6 +269,6 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
const grpc_channel_filter grpc_http_client_filter = {
hc_start_transport_op, grpc_channel_next_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer,
- "http-client"};
+ init_call_elem, grpc_call_stack_ignore_set_pollset, destroy_call_elem,
+ sizeof(channel_data), init_channel_elem, destroy_channel_elem,
+ grpc_call_next_get_peer, "http-client"};
diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c
index 99e5066a4e..2adfe2bb61 100644
--- a/src/core/channel/http_server_filter.c
+++ b/src/core/channel/http_server_filter.c
@@ -39,7 +39,6 @@
#include "src/core/profiling/timers.h"
typedef struct call_data {
- gpr_uint8 got_initial_metadata;
gpr_uint8 seen_path;
gpr_uint8 seen_post;
gpr_uint8 sent_status;
@@ -49,7 +48,7 @@ typedef struct call_data {
grpc_linked_mdelem status;
grpc_linked_mdelem content_type;
- grpc_stream_op_buffer *recv_ops;
+ grpc_metadata_batch *recv_initial_metadata;
/** Closure to call when finished with the hs_on_recv hook */
grpc_closure *on_done_recv;
/** Receive closures are chained: we inject this closure as the on_done_recv
@@ -154,43 +153,35 @@ static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, int success) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
if (success) {
- size_t i;
- size_t nops = calld->recv_ops->nops;
- grpc_stream_op *ops = calld->recv_ops->ops;
- for (i = 0; i < nops; i++) {
- grpc_stream_op *op = &ops[i];
- server_filter_args a;
- if (op->type != GRPC_OP_METADATA) continue;
- calld->got_initial_metadata = 1;
- a.elem = elem;
- a.exec_ctx = exec_ctx;
- grpc_metadata_batch_filter(&op->data.metadata, server_filter, &a);
- /* Have we seen the required http2 transport headers?
- (:method, :scheme, content-type, with :path and :authority covered
- at the channel level right now) */
- if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers &&
- calld->seen_path && calld->seen_authority) {
- /* do nothing */
- } else {
- if (!calld->seen_path) {
- gpr_log(GPR_ERROR, "Missing :path header");
- }
- if (!calld->seen_authority) {
- gpr_log(GPR_ERROR, "Missing :authority header");
- }
- if (!calld->seen_post) {
- gpr_log(GPR_ERROR, "Missing :method header");
- }
- if (!calld->seen_scheme) {
- gpr_log(GPR_ERROR, "Missing :scheme header");
- }
- if (!calld->seen_te_trailers) {
- gpr_log(GPR_ERROR, "Missing te trailers header");
- }
- /* Error this call out */
- success = 0;
- grpc_call_element_send_cancel(exec_ctx, elem);
+ server_filter_args a;
+ a.elem = elem;
+ a.exec_ctx = exec_ctx;
+ grpc_metadata_batch_filter(calld->recv_initial_metadata, server_filter, &a);
+ /* Have we seen the required http2 transport headers?
+ (:method, :scheme, content-type, with :path and :authority covered
+ at the channel level right now) */
+ if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers &&
+ calld->seen_path && calld->seen_authority) {
+ /* do nothing */
+ } else {
+ if (!calld->seen_path) {
+ gpr_log(GPR_ERROR, "Missing :path header");
+ }
+ if (!calld->seen_authority) {
+ gpr_log(GPR_ERROR, "Missing :authority header");
+ }
+ if (!calld->seen_post) {
+ gpr_log(GPR_ERROR, "Missing :method header");
}
+ if (!calld->seen_scheme) {
+ gpr_log(GPR_ERROR, "Missing :scheme header");
+ }
+ if (!calld->seen_te_trailers) {
+ gpr_log(GPR_ERROR, "Missing te trailers header");
+ }
+ /* Error this call out */
+ success = 0;
+ grpc_call_element_send_cancel(exec_ctx, elem);
}
}
calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success);
@@ -201,29 +192,21 @@ static void hs_mutate_op(grpc_call_element *elem,
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
- size_t i;
- if (op->send_ops && !calld->sent_status) {
- size_t nops = op->send_ops->nops;
- grpc_stream_op *ops = op->send_ops->ops;
- for (i = 0; i < nops; i++) {
- grpc_stream_op *stream_op = &ops[i];
- if (stream_op->type != GRPC_OP_METADATA) continue;
- calld->sent_status = 1;
- grpc_metadata_batch_add_head(&stream_op->data.metadata, &calld->status,
- GRPC_MDELEM_REF(channeld->status_ok));
- grpc_metadata_batch_add_tail(&stream_op->data.metadata,
- &calld->content_type,
- GRPC_MDELEM_REF(channeld->content_type));
- break;
- }
+ if (op->send_initial_metadata != NULL && !calld->sent_status) {
+ calld->sent_status = 1;
+ grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->status,
+ GRPC_MDELEM_REF(channeld->status_ok));
+ grpc_metadata_batch_add_tail(op->send_initial_metadata,
+ &calld->content_type,
+ GRPC_MDELEM_REF(channeld->content_type));
}
- if (op->recv_ops && !calld->got_initial_metadata) {
+ if (op->recv_initial_metadata) {
/* substitute our callback for the higher callback */
- calld->recv_ops = op->recv_ops;
- calld->on_done_recv = op->on_done_recv;
- op->on_done_recv = &calld->hs_on_recv;
+ calld->recv_initial_metadata = op->recv_initial_metadata;
+ calld->on_done_recv = op->on_complete;
+ op->on_complete = &calld->hs_on_recv;
}
}
@@ -239,14 +222,12 @@ static void hs_start_transport_op(grpc_exec_ctx *exec_ctx,
/* Constructor for call_data */
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const void *server_transport_data,
- grpc_transport_stream_op *initial_op) {
+ grpc_call_element_args *args) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
/* initialize members */
memset(calld, 0, sizeof(*calld));
grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem);
- if (initial_op) hs_mutate_op(elem, initial_op);
}
/* Destructor for call_data */
@@ -255,34 +236,39 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
/* Constructor for channel_data */
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem, grpc_channel *master,
- const grpc_channel_args *args, grpc_mdctx *mdctx,
- int is_first, int is_last) {
+ grpc_channel_element *elem,
+ grpc_channel_element_args *args) {
/* grab pointers to our data from the channel element */
channel_data *channeld = elem->channel_data;
/* The first and the last filters tend to be implemented differently to
handle the case that there's no 'next' filter to call on the up or down
path */
- GPR_ASSERT(!is_first);
- GPR_ASSERT(!is_last);
+ GPR_ASSERT(!args->is_last);
/* initialize members */
- channeld->te_trailers = grpc_mdelem_from_strings(mdctx, "te", "trailers");
- channeld->status_ok = grpc_mdelem_from_strings(mdctx, ":status", "200");
+ channeld->te_trailers =
+ grpc_mdelem_from_strings(args->metadata_context, "te", "trailers");
+ channeld->status_ok =
+ grpc_mdelem_from_strings(args->metadata_context, ":status", "200");
channeld->status_not_found =
- grpc_mdelem_from_strings(mdctx, ":status", "404");
- channeld->method_post = grpc_mdelem_from_strings(mdctx, ":method", "POST");
- channeld->http_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "http");
- channeld->https_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "https");
- channeld->grpc_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "grpc");
- channeld->path_key = grpc_mdstr_from_string(mdctx, ":path");
- channeld->authority_key = grpc_mdstr_from_string(mdctx, ":authority");
- channeld->host_key = grpc_mdstr_from_string(mdctx, "host");
- channeld->content_type =
- grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc");
+ grpc_mdelem_from_strings(args->metadata_context, ":status", "404");
+ channeld->method_post =
+ grpc_mdelem_from_strings(args->metadata_context, ":method", "POST");
+ channeld->http_scheme =
+ grpc_mdelem_from_strings(args->metadata_context, ":scheme", "http");
+ channeld->https_scheme =
+ grpc_mdelem_from_strings(args->metadata_context, ":scheme", "https");
+ channeld->grpc_scheme =
+ grpc_mdelem_from_strings(args->metadata_context, ":scheme", "grpc");
+ channeld->path_key = grpc_mdstr_from_string(args->metadata_context, ":path");
+ channeld->authority_key =
+ grpc_mdstr_from_string(args->metadata_context, ":authority");
+ channeld->host_key = grpc_mdstr_from_string(args->metadata_context, "host");
+ channeld->content_type = grpc_mdelem_from_strings(
+ args->metadata_context, "content-type", "application/grpc");
- channeld->mdctx = mdctx;
+ channeld->mdctx = args->metadata_context;
}
/* Destructor for channel data */
@@ -306,6 +292,6 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
const grpc_channel_filter grpc_http_server_filter = {
hs_start_transport_op, grpc_channel_next_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer,
- "http-server"};
+ init_call_elem, grpc_call_stack_ignore_set_pollset, destroy_call_elem,
+ sizeof(channel_data), init_channel_elem, destroy_channel_elem,
+ grpc_call_next_get_peer, "http-server"};
diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c
index 48f6b1c650..2fbf1c06bb 100644
--- a/src/core/channel/noop_filter.c
+++ b/src/core/channel/noop_filter.c
@@ -73,16 +73,13 @@ static void noop_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
/* Constructor for call_data */
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const void *server_transport_data,
- grpc_transport_stream_op *initial_op) {
+ grpc_call_element_args *args) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
/* initialize members */
calld->unused = channeld->unused;
-
- if (initial_op) noop_mutate_op(elem, initial_op);
}
/* Destructor for call_data */
@@ -91,17 +88,15 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
/* Constructor for channel_data */
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem, grpc_channel *master,
- const grpc_channel_args *args, grpc_mdctx *mdctx,
- int is_first, int is_last) {
+ grpc_channel_element *elem,
+ grpc_channel_element_args *args) {
/* grab pointers to our data from the channel element */
channel_data *channeld = elem->channel_data;
- /* The first and the last filters tend to be implemented differently to
- handle the case that there's no 'next' filter to call on the up or down
+ /* The last filter tends to be implemented differently to
+ handle the case that there's no 'next' filter to call on the down
path */
- GPR_ASSERT(!is_first);
- GPR_ASSERT(!is_last);
+ GPR_ASSERT(!args->is_last);
/* initialize members */
channeld->unused = 0;
@@ -118,5 +113,6 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
const grpc_channel_filter grpc_no_op_filter = {
noop_start_transport_stream_op, grpc_channel_next_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem,
- destroy_channel_elem, grpc_call_next_get_peer, "no-op"};
+ init_call_elem, grpc_call_stack_ignore_set_pollset, destroy_call_elem,
+ sizeof(channel_data), init_channel_elem, destroy_channel_elem,
+ grpc_call_next_get_peer, "no-op"};
diff --git a/src/core/channel/subchannel_call_holder.c b/src/core/channel/subchannel_call_holder.c
new file mode 100644
index 0000000000..7251714519
--- /dev/null
+++ b/src/core/channel/subchannel_call_holder.c
@@ -0,0 +1,283 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/channel/subchannel_call_holder.h"
+
+#include <grpc/support/alloc.h>
+
+#include "src/core/profiling/timers.h"
+
+#define GET_CALL(holder) \
+ ((grpc_subchannel_call *)(gpr_atm_acq_load(&(holder)->subchannel_call)))
+
+#define CANCELLED_CALL ((grpc_subchannel_call *)1)
+
+static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *holder,
+ int success);
+static void call_ready(grpc_exec_ctx *exec_ctx, void *holder, int success);
+static void retry_ops(grpc_exec_ctx *exec_ctx, void *retry_ops_args,
+ int success);
+
+static void add_waiting_locked(grpc_subchannel_call_holder *holder,
+ grpc_transport_stream_op *op);
+static void fail_locked(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_call_holder *holder);
+static void retry_waiting_locked(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_call_holder *holder);
+
+void grpc_subchannel_call_holder_init(
+ grpc_subchannel_call_holder *holder,
+ grpc_subchannel_call_holder_pick_subchannel pick_subchannel,
+ void *pick_subchannel_arg) {
+ gpr_atm_rel_store(&holder->subchannel_call, 0);
+ holder->pick_subchannel = pick_subchannel;
+ holder->pick_subchannel_arg = pick_subchannel_arg;
+ gpr_mu_init(&holder->mu);
+ holder->subchannel = NULL;
+ holder->waiting_ops = NULL;
+ holder->waiting_ops_count = 0;
+ holder->waiting_ops_capacity = 0;
+ holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
+}
+
+void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_call_holder *holder) {
+ grpc_subchannel_call *call = GET_CALL(holder);
+ if (call != NULL && call != CANCELLED_CALL) {
+ GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "holder");
+ }
+ GPR_ASSERT(holder->creation_phase ==
+ GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING);
+ gpr_mu_destroy(&holder->mu);
+ GPR_ASSERT(holder->waiting_ops_count == 0);
+ gpr_free(holder->waiting_ops);
+}
+
+void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_call_holder *holder,
+ grpc_transport_stream_op *op) {
+ /* try to (atomically) get the call */
+ grpc_subchannel_call *call = GET_CALL(holder);
+ GPR_TIMER_BEGIN("grpc_subchannel_call_holder_perform_op", 0);
+ if (call == CANCELLED_CALL) {
+ grpc_transport_stream_op_finish_with_failure(exec_ctx, op);
+ GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
+ return;
+ }
+ if (call != NULL) {
+ grpc_subchannel_call_process_op(exec_ctx, call, op);
+ GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
+ return;
+ }
+ /* we failed; lock and figure out what to do */
+ gpr_mu_lock(&holder->mu);
+retry:
+ /* need to recheck that another thread hasn't set the call */
+ call = GET_CALL(holder);
+ if (call == CANCELLED_CALL) {
+ gpr_mu_unlock(&holder->mu);
+ grpc_transport_stream_op_finish_with_failure(exec_ctx, op);
+ GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
+ return;
+ }
+ if (call != NULL) {
+ gpr_mu_unlock(&holder->mu);
+ grpc_subchannel_call_process_op(exec_ctx, call, op);
+ GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
+ return;
+ }
+ /* if this is a cancellation, then we can raise our cancelled flag */
+ if (op->cancel_with_status != GRPC_STATUS_OK) {
+ if (!gpr_atm_rel_cas(&holder->subchannel_call, 0, 1)) {
+ goto retry;
+ } else {
+ switch (holder->creation_phase) {
+ case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING:
+ fail_locked(exec_ctx, holder);
+ break;
+ case GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL:
+ grpc_subchannel_cancel_create_call(exec_ctx, holder->subchannel,
+ &holder->subchannel_call);
+ break;
+ case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
+ holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, NULL,
+ &holder->subchannel, NULL);
+ break;
+ }
+ gpr_mu_unlock(&holder->mu);
+ grpc_transport_stream_op_finish_with_failure(exec_ctx, op);
+ GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
+ return;
+ }
+ }
+ /* if we don't have a subchannel, try to get one */
+ if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
+ holder->subchannel == NULL && op->send_initial_metadata != NULL) {
+ holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL;
+ grpc_closure_init(&holder->next_step, subchannel_ready, holder);
+ if (holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg,
+ op->send_initial_metadata, &holder->subchannel,
+ &holder->next_step)) {
+ holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
+ }
+ }
+ /* if we've got a subchannel, then let's ask it to create a call */
+ if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
+ holder->subchannel != NULL) {
+ holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL;
+ grpc_closure_init(&holder->next_step, call_ready, holder);
+ if (grpc_subchannel_create_call(exec_ctx, holder->subchannel,
+ holder->pollset, &holder->subchannel_call,
+ &holder->next_step)) {
+ /* got one immediately - continue the op (and any waiting ops) */
+ holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
+ retry_waiting_locked(exec_ctx, holder);
+ goto retry;
+ }
+ }
+ /* nothing to be done but wait */
+ add_waiting_locked(holder, op);
+ gpr_mu_unlock(&holder->mu);
+ GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
+}
+
+static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+ grpc_subchannel_call_holder *holder = arg;
+ grpc_subchannel_call *call;
+ gpr_mu_lock(&holder->mu);
+ GPR_ASSERT(holder->creation_phase ==
+ GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL);
+ call = GET_CALL(holder);
+ GPR_ASSERT(call == NULL || call == CANCELLED_CALL);
+ if (holder->subchannel == NULL) {
+ holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
+ fail_locked(exec_ctx, holder);
+ } else {
+ grpc_closure_init(&holder->next_step, call_ready, holder);
+ if (grpc_subchannel_create_call(exec_ctx, holder->subchannel,
+ holder->pollset, &holder->subchannel_call,
+ &holder->next_step)) {
+ holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
+ /* got one immediately - continue the op (and any waiting ops) */
+ retry_waiting_locked(exec_ctx, holder);
+ }
+ }
+ gpr_mu_unlock(&holder->mu);
+}
+
+static void call_ready(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+ grpc_subchannel_call_holder *holder = arg;
+ GPR_TIMER_BEGIN("call_ready", 0);
+ gpr_mu_lock(&holder->mu);
+ GPR_ASSERT(holder->creation_phase ==
+ GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL);
+ holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
+ if (GET_CALL(holder) != NULL) {
+ retry_waiting_locked(exec_ctx, holder);
+ } else {
+ fail_locked(exec_ctx, holder);
+ }
+ gpr_mu_unlock(&holder->mu);
+ GPR_TIMER_END("call_ready", 0);
+}
+
+typedef struct {
+ grpc_transport_stream_op *ops;
+ size_t nops;
+ grpc_subchannel_call *call;
+} retry_ops_args;
+
+static void retry_waiting_locked(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_call_holder *holder) {
+ retry_ops_args *a = gpr_malloc(sizeof(*a));
+ a->ops = holder->waiting_ops;
+ a->nops = holder->waiting_ops_count;
+ a->call = GET_CALL(holder);
+ if (a->call == CANCELLED_CALL) {
+ gpr_free(a);
+ fail_locked(exec_ctx, holder);
+ return;
+ }
+ holder->waiting_ops = NULL;
+ holder->waiting_ops_count = 0;
+ holder->waiting_ops_capacity = 0;
+ GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops");
+ grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(retry_ops, a), 1);
+}
+
+static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, int success) {
+ retry_ops_args *a = args;
+ size_t i;
+ for (i = 0; i < a->nops; i++) {
+ grpc_subchannel_call_process_op(exec_ctx, a->call, &a->ops[i]);
+ }
+ GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, a->call, "retry_ops");
+ gpr_free(a->ops);
+ gpr_free(a);
+}
+
+static void add_waiting_locked(grpc_subchannel_call_holder *holder,
+ grpc_transport_stream_op *op) {
+ GPR_TIMER_BEGIN("add_waiting_locked", 0);
+ if (holder->waiting_ops_count == holder->waiting_ops_capacity) {
+ holder->waiting_ops_capacity = GPR_MAX(3, 2 * holder->waiting_ops_capacity);
+ holder->waiting_ops =
+ gpr_realloc(holder->waiting_ops, holder->waiting_ops_capacity *
+ sizeof(*holder->waiting_ops));
+ }
+ holder->waiting_ops[holder->waiting_ops_count++] = *op;
+ GPR_TIMER_END("add_waiting_locked", 0);
+}
+
+static void fail_locked(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_call_holder *holder) {
+ size_t i;
+ for (i = 0; i < holder->waiting_ops_count; i++) {
+ grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].on_complete, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].recv_message_ready,
+ 0);
+ }
+ holder->waiting_ops_count = 0;
+}
+
+char *grpc_subchannel_call_holder_get_peer(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_call_holder *holder,
+ grpc_channel *master) {
+ grpc_subchannel_call *subchannel_call = GET_CALL(holder);
+
+ if (subchannel_call) {
+ return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
+ } else {
+ return grpc_channel_get_target(master);
+ }
+}
diff --git a/src/core/channel/subchannel_call_holder.h b/src/core/channel/subchannel_call_holder.h
new file mode 100644
index 0000000000..3dd43c9c3f
--- /dev/null
+++ b/src/core/channel/subchannel_call_holder.h
@@ -0,0 +1,83 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CORE_CHANNEL_SUBCHANNEL_CALL_HOLDER_H
+#define GRPC_INTERNAL_CORE_CHANNEL_SUBCHANNEL_CALL_HOLDER_H
+
+#include "src/core/client_config/subchannel.h"
+
+typedef int (*grpc_subchannel_call_holder_pick_subchannel)(
+ grpc_exec_ctx *exec_ctx, void *arg, grpc_metadata_batch *initial_metadata,
+ grpc_subchannel **subchannel, grpc_closure *on_ready);
+
+typedef enum {
+ GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING,
+ GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL,
+ GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL
+} grpc_subchannel_call_holder_creation_phase;
+
+typedef struct grpc_subchannel_call_holder {
+ /* either 0 for no call, 1 for cancelled, or a pointer to a
+ grpc_subchannel_call */
+ gpr_atm subchannel_call;
+ grpc_subchannel_call_holder_pick_subchannel pick_subchannel;
+ void *pick_subchannel_arg;
+
+ gpr_mu mu;
+
+ grpc_subchannel_call_holder_creation_phase creation_phase;
+ grpc_subchannel *subchannel;
+ grpc_pollset *pollset;
+
+ grpc_transport_stream_op *waiting_ops;
+ size_t waiting_ops_count;
+ size_t waiting_ops_capacity;
+
+ grpc_closure next_step;
+} grpc_subchannel_call_holder;
+
+void grpc_subchannel_call_holder_init(
+ grpc_subchannel_call_holder *holder,
+ grpc_subchannel_call_holder_pick_subchannel pick_subchannel,
+ void *pick_subchannel_arg);
+void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_call_holder *holder);
+
+void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_call_holder *holder,
+ grpc_transport_stream_op *op);
+char *grpc_subchannel_call_holder_get_peer(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_call_holder *holder,
+ grpc_channel *master);
+
+#endif