Diffstat (limited to 'src/core/channel/client_uchannel.c')
-rw-r--r--  src/core/channel/client_uchannel.c  355
1 file changed, 34 insertions(+), 321 deletions(-)
diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c
index 510677a844..ec6b02381a 100644
--- a/src/core/channel/client_uchannel.c
+++ b/src/core/channel/client_uchannel.c
@@ -39,6 +39,7 @@
#include "src/core/channel/channel_args.h"
#include "src/core/channel/client_channel.h"
#include "src/core/channel/compress_filter.h"
+#include "src/core/channel/subchannel_call_holder.h"
#include "src/core/iomgr/iomgr.h"
#include "src/core/support/string.h"
#include "src/core/surface/channel.h"
@@ -52,8 +53,6 @@
/** Microchannel (uchannel) implementation: a lightweight channel without any
* load-balancing mechanisms meant for communication from within the core. */
-typedef struct call_data call_data;
-
typedef struct client_uchannel_channel_data {
/** metadata context for this channel */
grpc_mdctx *mdctx;
@@ -80,85 +79,7 @@ typedef struct client_uchannel_channel_data {
gpr_mu mu_state;
} channel_data;
-typedef enum {
- CALL_CREATED,
- CALL_WAITING_FOR_SEND,
- CALL_WAITING_FOR_CALL,
- CALL_ACTIVE,
- CALL_CANCELLED
-} call_state;
-
-struct call_data {
- /* owning element */
- grpc_call_element *elem;
-
- gpr_mu mu_state;
-
- call_state state;
- gpr_timespec deadline;
- grpc_closure async_setup_task;
- grpc_transport_stream_op waiting_op;
- /* our child call stack */
- grpc_subchannel_call *subchannel_call;
- grpc_linked_mdelem status;
- grpc_linked_mdelem details;
-};
-
-static grpc_closure *merge_into_waiting_op(grpc_call_element *elem,
- grpc_transport_stream_op *new_op)
- GRPC_MUST_USE_RESULT;
-
-static void handle_op_after_cancellation(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op *op) {
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- if (op->send_ops) {
- grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
- op->on_done_send->cb(exec_ctx, op->on_done_send->cb_arg, 0);
- }
- if (op->recv_ops) {
- char status[GPR_LTOA_MIN_BUFSIZE];
- grpc_metadata_batch mdb;
- gpr_ltoa(GRPC_STATUS_CANCELLED, status);
- calld->status.md =
- grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status);
- calld->details.md =
- grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled");
- calld->status.prev = calld->details.next = NULL;
- calld->status.next = &calld->details;
- calld->details.prev = &calld->status;
- mdb.list.head = &calld->status;
- mdb.list.tail = &calld->details;
- mdb.garbage.head = mdb.garbage.tail = NULL;
- mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
- grpc_sopb_add_metadata(op->recv_ops, mdb);
- *op->recv_state = GRPC_STREAM_CLOSED;
- op->on_done_recv->cb(exec_ctx, op->on_done_recv->cb_arg, 1);
- }
- if (op->on_consumed) {
- op->on_consumed->cb(exec_ctx, op->on_consumed->cb_arg, 0);
- }
-}
-
-typedef struct {
- grpc_closure closure;
- grpc_call_element *elem;
-} waiting_call;
-
-static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op *op,
- int continuation);
-
-static int is_empty(void *p, int len) {
- char *ptr = p;
- int i;
- for (i = 0; i < len; i++) {
- if (ptr[i] != 0) return 0;
- }
- return 1;
-}
+typedef grpc_subchannel_call_holder call_data;
static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
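The hunk above deletes the uchannel's private call bookkeeping (the call_state enum, the hand-rolled call_data struct, and the cancellation/waiting-op helpers around it) and reduces call_data to a plain typedef for the shared grpc_subchannel_call_holder. The sketch below reconstructs the holder interface purely from the call sites visible later in this diff; the authoritative declarations live in src/core/channel/subchannel_call_holder.h and may differ in detail.

    /* Sketch only, reconstructed from call sites in this diff; not the
     * verbatim contents of subchannel_call_holder.h. The pick-subchannel
     * callback type is sketched after the cuc_pick_subchannel hunk below. */
    void grpc_subchannel_call_holder_init(
        grpc_subchannel_call_holder *holder,
        grpc_subchannel_call_holder_pick_subchannel pick_subchannel,
        void *pick_subchannel_arg);           /* called by cuc_init_call_elem */
    void grpc_subchannel_call_holder_destroy(
        grpc_exec_ctx *exec_ctx,
        grpc_subchannel_call_holder *holder); /* called by cuc_destroy_call_elem */
    void grpc_subchannel_call_holder_perform_op(
        grpc_exec_ctx *exec_ctx, grpc_subchannel_call_holder *holder,
        grpc_transport_stream_op *op);  /* called by cuc_start_transport_stream_op */
    char *grpc_subchannel_call_holder_get_peer(
        grpc_exec_ctx *exec_ctx, grpc_subchannel_call_holder *holder,
        grpc_channel *master);          /* called by cuc_get_peer */

The holder also exposes at least a pollset field, since the new cuc_set_pollset further down assigns calld->pollset directly.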
@@ -171,201 +92,17 @@ static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
&chand->connectivity_cb);
}
-static void started_call_locked(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
- call_data *calld = arg;
- grpc_transport_stream_op op;
- int have_waiting;
-
- if (calld->state == CALL_CANCELLED && iomgr_success == 0) {
- have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op));
- gpr_mu_unlock(&calld->mu_state);
- if (have_waiting) {
- handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op);
- }
- } else if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) {
- memset(&op, 0, sizeof(op));
- op.cancel_with_status = GRPC_STATUS_CANCELLED;
- gpr_mu_unlock(&calld->mu_state);
- grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, &op);
- } else if (calld->state == CALL_WAITING_FOR_CALL) {
- have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op));
- if (calld->subchannel_call != NULL) {
- calld->state = CALL_ACTIVE;
- gpr_mu_unlock(&calld->mu_state);
- if (have_waiting) {
- grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call,
- &calld->waiting_op);
- }
- } else {
- calld->state = CALL_CANCELLED;
- gpr_mu_unlock(&calld->mu_state);
- if (have_waiting) {
- handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op);
- }
- }
- } else {
- GPR_ASSERT(calld->state == CALL_CANCELLED);
- gpr_mu_unlock(&calld->mu_state);
- have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op));
- if (have_waiting) {
- handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op);
- }
- }
-}
-
-static void started_call(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
- call_data *calld = arg;
- gpr_mu_lock(&calld->mu_state);
- started_call_locked(exec_ctx, arg, iomgr_success);
-}
-
-static grpc_closure *merge_into_waiting_op(grpc_call_element *elem,
- grpc_transport_stream_op *new_op) {
- call_data *calld = elem->call_data;
- grpc_closure *consumed_op = NULL;
- grpc_transport_stream_op *waiting_op = &calld->waiting_op;
- GPR_ASSERT((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1);
- GPR_ASSERT((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1);
- if (new_op->send_ops != NULL) {
- waiting_op->send_ops = new_op->send_ops;
- waiting_op->is_last_send = new_op->is_last_send;
- waiting_op->on_done_send = new_op->on_done_send;
- }
- if (new_op->recv_ops != NULL) {
- waiting_op->recv_ops = new_op->recv_ops;
- waiting_op->recv_state = new_op->recv_state;
- waiting_op->on_done_recv = new_op->on_done_recv;
- }
- if (new_op->on_consumed != NULL) {
- if (waiting_op->on_consumed != NULL) {
- consumed_op = waiting_op->on_consumed;
- }
- waiting_op->on_consumed = new_op->on_consumed;
- }
- if (new_op->cancel_with_status != GRPC_STATUS_OK) {
- waiting_op->cancel_with_status = new_op->cancel_with_status;
- }
- return consumed_op;
-}
-
static char *cuc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- grpc_subchannel_call *subchannel_call;
- char *result;
-
- gpr_mu_lock(&calld->mu_state);
- if (calld->state == CALL_ACTIVE) {
- subchannel_call = calld->subchannel_call;
- GRPC_SUBCHANNEL_CALL_REF(subchannel_call, "get_peer");
- gpr_mu_unlock(&calld->mu_state);
- result = grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
- GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "get_peer");
- return result;
- } else {
- gpr_mu_unlock(&calld->mu_state);
- return grpc_channel_get_target(chand->master);
- }
-}
-
-static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op *op,
- int continuation) {
- call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- grpc_subchannel_call *subchannel_call;
- grpc_transport_stream_op op2;
- GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter);
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
- gpr_mu_lock(&calld->mu_state);
- /* make sure the wrapped subchannel has been set (see
- * grpc_client_uchannel_set_subchannel) */
- GPR_ASSERT(chand->subchannel != NULL);
-
- switch (calld->state) {
- case CALL_ACTIVE:
- GPR_ASSERT(!continuation);
- subchannel_call = calld->subchannel_call;
- gpr_mu_unlock(&calld->mu_state);
- grpc_subchannel_call_process_op(exec_ctx, subchannel_call, op);
- break;
- case CALL_CANCELLED:
- gpr_mu_unlock(&calld->mu_state);
- handle_op_after_cancellation(exec_ctx, elem, op);
- break;
- case CALL_WAITING_FOR_SEND:
- GPR_ASSERT(!continuation);
- grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1);
- if (!calld->waiting_op.send_ops &&
- calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) {
- gpr_mu_unlock(&calld->mu_state);
- break;
- }
- *op = calld->waiting_op;
- memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
- continuation = 1;
- /* fall through */
- case CALL_WAITING_FOR_CALL:
- if (!continuation) {
- if (op->cancel_with_status != GRPC_STATUS_OK) {
- calld->state = CALL_CANCELLED;
- op2 = calld->waiting_op;
- memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
- if (op->on_consumed) {
- calld->waiting_op.on_consumed = op->on_consumed;
- op->on_consumed = NULL;
- } else if (op2.on_consumed) {
- calld->waiting_op.on_consumed = op2.on_consumed;
- op2.on_consumed = NULL;
- }
- gpr_mu_unlock(&calld->mu_state);
- handle_op_after_cancellation(exec_ctx, elem, op);
- handle_op_after_cancellation(exec_ctx, elem, &op2);
- grpc_subchannel_cancel_waiting_call(exec_ctx, chand->subchannel, 1);
- } else {
- grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1);
- gpr_mu_unlock(&calld->mu_state);
- }
- break;
- }
- /* fall through */
- case CALL_CREATED:
- if (op->cancel_with_status != GRPC_STATUS_OK) {
- calld->state = CALL_CANCELLED;
- gpr_mu_unlock(&calld->mu_state);
- handle_op_after_cancellation(exec_ctx, elem, op);
- } else {
- calld->waiting_op = *op;
- if (op->send_ops == NULL) {
- calld->state = CALL_WAITING_FOR_SEND;
- gpr_mu_unlock(&calld->mu_state);
- } else {
- grpc_subchannel_call_create_status call_creation_status;
- grpc_pollset *pollset = calld->waiting_op.bind_pollset;
- calld->state = CALL_WAITING_FOR_CALL;
- grpc_closure_init(&calld->async_setup_task, started_call, calld);
- call_creation_status = grpc_subchannel_create_call(
- exec_ctx, chand->subchannel, pollset, &calld->subchannel_call,
- &calld->async_setup_task);
- if (call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY) {
- started_call_locked(exec_ctx, calld, 1);
- } else {
- gpr_mu_unlock(&calld->mu_state);
- }
- }
- }
- break;
- }
+ return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data,
+ chand->master);
}
static void cuc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
- perform_transport_stream_op(exec_ctx, elem, op, 0);
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ grpc_subchannel_call_holder_perform_op(exec_ctx, elem->call_data, op);
}
static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx,
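With the per-call state machine gone, cuc_get_peer and cuc_start_transport_stream_op in the hunk above shrink to one-line delegations to the holder. For orientation, here is a hypothetical reconstruction of the fallback behaviour the removed cuc_get_peer implemented and the holder is now expected to supply; the real grpc_subchannel_call_holder_get_peer will also deal with locking and subchannel-call ref-counting, which are elided here.

    /* Hypothetical sketch; the field name on the holder is an assumption. */
    char *grpc_subchannel_call_holder_get_peer(grpc_exec_ctx *exec_ctx,
                                               grpc_subchannel_call_holder *holder,
                                               grpc_channel *master) {
      grpc_subchannel_call *call = holder->subchannel_call; /* assumed field */
      if (call != NULL) {
        /* an active subchannel call knows its transport-level peer */
        return grpc_subchannel_call_get_peer(exec_ctx, call);
      }
      /* no call picked yet (or the call was cancelled): fall back to the
       * channel target, exactly as the deleted cuc_get_peer did */
      return grpc_channel_get_target(master);
    }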
@@ -392,64 +129,40 @@ static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx,
}
}
+static int cuc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_metadata_batch *initial_metadata,
+ grpc_subchannel **subchannel,
+ grpc_closure *on_ready) {
+ channel_data *chand = arg;
+ GPR_ASSERT(initial_metadata != NULL);
+ *subchannel = chand->subchannel;
+ return 1;
+}
+
/* Constructor for call_data */
static void cuc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const void *server_transport_data,
- grpc_transport_stream_op *initial_op) {
- call_data *calld = elem->call_data;
- memset(calld, 0, sizeof(call_data));
-
- /* TODO(ctiller): is there something useful we can do here? */
- GPR_ASSERT(initial_op == NULL);
-
- GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter);
- GPR_ASSERT(server_transport_data == NULL);
- gpr_mu_init(&calld->mu_state);
- calld->elem = elem;
- calld->state = CALL_CREATED;
- calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
+ grpc_call_element_args *args) {
+ grpc_subchannel_call_holder_init(elem->call_data, cuc_pick_subchannel,
+ elem->channel_data);
}
/* Destructor for call_data */
static void cuc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
- call_data *calld = elem->call_data;
- grpc_subchannel_call *subchannel_call;
-
- /* if the call got activated, we need to destroy the child stack also, and
- remove it from the in-flight requests tracked by the child_entry we
- picked */
- gpr_mu_lock(&calld->mu_state);
- switch (calld->state) {
- case CALL_ACTIVE:
- subchannel_call = calld->subchannel_call;
- gpr_mu_unlock(&calld->mu_state);
- GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "client_uchannel");
- break;
- case CALL_CREATED:
- case CALL_CANCELLED:
- gpr_mu_unlock(&calld->mu_state);
- break;
- case CALL_WAITING_FOR_CALL:
- case CALL_WAITING_FOR_SEND:
- GPR_UNREACHABLE_CODE(return );
- }
+ grpc_subchannel_call_holder_destroy(exec_ctx, elem->call_data);
}
/* Constructor for channel_data */
static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
- grpc_channel *master,
- const grpc_channel_args *args,
- grpc_mdctx *metadata_context, int is_first,
- int is_last) {
+ grpc_channel_element_args *args) {
channel_data *chand = elem->channel_data;
memset(chand, 0, sizeof(*chand));
grpc_closure_init(&chand->connectivity_cb, monitor_subchannel, chand);
- GPR_ASSERT(is_last);
+ GPR_ASSERT(args->is_last);
GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter);
- chand->mdctx = metadata_context;
- chand->master = master;
+ chand->mdctx = args->metadata_context;
+ chand->master = args->master;
grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
"client_uchannel");
gpr_mu_init(&chand->mu_state);
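cuc_pick_subchannel above is the pick callback the uchannel hands to grpc_subchannel_call_holder_init. Because a microchannel wraps exactly one subchannel, the pick is trivially synchronous: it writes chand->subchannel and returns 1 without touching on_ready. The callback type itself is not shown in this diff; the typedef below is an assumption derived from cuc_pick_subchannel's signature.

    /* Assumed contract, inferred from cuc_pick_subchannel: a nonzero return
     * means *subchannel was filled in synchronously; a zero return would tell
     * the holder to park the call and wait for on_ready to be scheduled once
     * a subchannel becomes available. */
    typedef int (*grpc_subchannel_call_holder_pick_subchannel)(
        grpc_exec_ctx *exec_ctx, void *arg, grpc_metadata_batch *initial_metadata,
        grpc_subchannel **subchannel, grpc_closure *on_ready);

A load-balancing filter such as client_channel presumably returns 0 from its pick and completes it later via on_ready, which is why the closure parameter exists at all.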
@@ -465,17 +178,17 @@ static void cuc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
gpr_mu_destroy(&chand->mu_state);
}
+static void cuc_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_pollset *pollset) {
+ call_data *calld = elem->call_data;
+ calld->pollset = pollset;
+}
+
const grpc_channel_filter grpc_client_uchannel_filter = {
- cuc_start_transport_stream_op,
- cuc_start_transport_op,
- sizeof(call_data),
- cuc_init_call_elem,
- cuc_destroy_call_elem,
- sizeof(channel_data),
- cuc_init_channel_elem,
- cuc_destroy_channel_elem,
- cuc_get_peer,
- "client-uchannel",
+ cuc_start_transport_stream_op, cuc_start_transport_op, sizeof(call_data),
+ cuc_init_call_elem, cuc_set_pollset, cuc_destroy_call_elem,
+ sizeof(channel_data), cuc_init_channel_elem, cuc_destroy_channel_elem,
+ cuc_get_peer, "client-uchannel",
};
grpc_connectivity_state grpc_client_uchannel_check_connectivity_state(