aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/channel/connected_channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/channel/connected_channel.c')
-rw-r--r--src/core/lib/channel/connected_channel.c92
1 files changed, 11 insertions, 81 deletions
diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c
index 8285226fc4..af06ca802e 100644
--- a/src/core/lib/channel/connected_channel.c
+++ b/src/core/lib/channel/connected_channel.c
@@ -36,57 +36,7 @@ typedef struct connected_channel_channel_data {
grpc_transport *transport;
} channel_data;
-typedef struct {
- grpc_closure closure;
- grpc_closure *original_closure;
- grpc_call_combiner *call_combiner;
- const char *reason;
-} callback_state;
-
-typedef struct connected_channel_call_data {
- grpc_call_combiner *call_combiner;
- // Closures used for returning results on the call combiner.
- callback_state on_complete[6]; // Max number of pending batches.
- callback_state recv_initial_metadata_ready;
- callback_state recv_message_ready;
-} call_data;
-
-static void run_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- callback_state *state = (callback_state *)arg;
- GRPC_CALL_COMBINER_START(exec_ctx, state->call_combiner,
- state->original_closure, GRPC_ERROR_REF(error),
- state->reason);
-}
-
-static void run_cancel_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- run_in_call_combiner(exec_ctx, arg, error);
- gpr_free(arg);
-}
-
-static void intercept_callback(call_data *calld, callback_state *state,
- bool free_when_done, const char *reason,
- grpc_closure **original_closure) {
- state->original_closure = *original_closure;
- state->call_combiner = calld->call_combiner;
- state->reason = reason;
- *original_closure = GRPC_CLOSURE_INIT(
- &state->closure,
- free_when_done ? run_cancel_in_call_combiner : run_in_call_combiner,
- state, grpc_schedule_on_exec_ctx);
-}
-
-static callback_state *get_state_for_batch(
- call_data *calld, grpc_transport_stream_op_batch *batch) {
- if (batch->send_initial_metadata) return &calld->on_complete[0];
- if (batch->send_message) return &calld->on_complete[1];
- if (batch->send_trailing_metadata) return &calld->on_complete[2];
- if (batch->recv_initial_metadata) return &calld->on_complete[3];
- if (batch->recv_message) return &calld->on_complete[4];
- if (batch->recv_trailing_metadata) return &calld->on_complete[5];
- GPR_UNREACHABLE_CODE(return NULL);
-}
+typedef struct connected_channel_call_data { void *unused; } call_data;
/* We perform a small hack to locate transport data alongside the connected
channel data in call allocations, to allow everything to be pulled in minimal
@@ -99,38 +49,13 @@ static callback_state *get_state_for_batch(
into transport stream operations */
static void con_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_transport_stream_op_batch *batch) {
+ grpc_transport_stream_op_batch *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- if (batch->recv_initial_metadata) {
- callback_state *state = &calld->recv_initial_metadata_ready;
- intercept_callback(
- calld, state, false, "recv_initial_metadata_ready",
- &batch->payload->recv_initial_metadata.recv_initial_metadata_ready);
- }
- if (batch->recv_message) {
- callback_state *state = &calld->recv_message_ready;
- intercept_callback(calld, state, false, "recv_message_ready",
- &batch->payload->recv_message.recv_message_ready);
- }
- if (batch->cancel_stream) {
- // There can be more than one cancellation batch in flight at any
- // given time, so we can't just pick out a fixed index into
- // calld->on_complete like we can for the other ops. However,
- // cancellation isn't in the fast path, so we just allocate a new
- // closure for each one.
- callback_state *state = (callback_state *)gpr_malloc(sizeof(*state));
- intercept_callback(calld, state, true, "on_complete (cancel_stream)",
- &batch->on_complete);
- } else {
- callback_state *state = get_state_for_batch(calld, batch);
- intercept_callback(calld, state, false, "on_complete", &batch->on_complete);
- }
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+
grpc_transport_perform_stream_op(exec_ctx, chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld),
- batch);
- GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner,
- "passed batch to transport");
+ TRANSPORT_STREAM_FROM_CALL_DATA(calld), op);
}
static void con_start_transport_op(grpc_exec_ctx *exec_ctx,
@@ -146,7 +71,6 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
const grpc_call_element_args *args) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- calld->call_combiner = args->call_combiner;
int r = grpc_transport_init_stream(
exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld),
&args->call_stack->refcount, args->server_transport_data, args->arena);
@@ -194,6 +118,11 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
}
}
+static char *con_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
+ channel_data *chand = elem->channel_data;
+ return grpc_transport_get_peer(exec_ctx, chand->transport);
+}
+
/* No-op. */
static void con_get_channel_info(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
@@ -209,6 +138,7 @@ const grpc_channel_filter grpc_connected_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
+ con_get_peer,
con_get_channel_info,
"connected",
};