aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/channel/connected_channel.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/channel/connected_channel.cc')
-rw-r--r--src/core/lib/channel/connected_channel.cc130
1 files changed, 65 insertions, 65 deletions
diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc
index 4f37908958..49b9f140c0 100644
--- a/src/core/lib/channel/connected_channel.cc
+++ b/src/core/lib/channel/connected_channel.cc
@@ -33,41 +33,41 @@
#define MAX_BUFFER_LENGTH 8192
typedef struct connected_channel_channel_data {
- grpc_transport *transport;
+ grpc_transport* transport;
} channel_data;
typedef struct {
grpc_closure closure;
- grpc_closure *original_closure;
- grpc_call_combiner *call_combiner;
- const char *reason;
+ grpc_closure* original_closure;
+ grpc_call_combiner* call_combiner;
+ const char* reason;
} callback_state;
typedef struct connected_channel_call_data {
- grpc_call_combiner *call_combiner;
+ grpc_call_combiner* call_combiner;
// Closures used for returning results on the call combiner.
callback_state on_complete[6]; // Max number of pending batches.
callback_state recv_initial_metadata_ready;
callback_state recv_message_ready;
} call_data;
-static void run_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- callback_state *state = (callback_state *)arg;
+static void run_in_call_combiner(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ callback_state* state = (callback_state*)arg;
GRPC_CALL_COMBINER_START(exec_ctx, state->call_combiner,
state->original_closure, GRPC_ERROR_REF(error),
state->reason);
}
-static void run_cancel_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
+static void run_cancel_in_call_combiner(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
run_in_call_combiner(exec_ctx, arg, error);
gpr_free(arg);
}
-static void intercept_callback(call_data *calld, callback_state *state,
- bool free_when_done, const char *reason,
- grpc_closure **original_closure) {
+static void intercept_callback(call_data* calld, callback_state* state,
+ bool free_when_done, const char* reason,
+ grpc_closure** original_closure) {
state->original_closure = *original_closure;
state->call_combiner = calld->call_combiner;
state->reason = reason;
@@ -77,8 +77,8 @@ static void intercept_callback(call_data *calld, callback_state *state,
state, grpc_schedule_on_exec_ctx);
}
-static callback_state *get_state_for_batch(
- call_data *calld, grpc_transport_stream_op_batch *batch) {
+static callback_state* get_state_for_batch(
+ call_data* calld, grpc_transport_stream_op_batch* batch) {
if (batch->send_initial_metadata) return &calld->on_complete[0];
if (batch->send_message) return &calld->on_complete[1];
if (batch->send_trailing_metadata) return &calld->on_complete[2];
@@ -91,25 +91,25 @@ static callback_state *get_state_for_batch(
/* We perform a small hack to locate transport data alongside the connected
channel data in call allocations, to allow everything to be pulled in minimal
cache line requests */
-#define TRANSPORT_STREAM_FROM_CALL_DATA(calld) ((grpc_stream *)((calld) + 1))
+#define TRANSPORT_STREAM_FROM_CALL_DATA(calld) ((grpc_stream*)((calld) + 1))
#define CALL_DATA_FROM_TRANSPORT_STREAM(transport_stream) \
- (((call_data *)(transport_stream)) - 1)
+ (((call_data*)(transport_stream)) - 1)
/* Intercept a call operation and either push it directly up or translate it
into transport stream operations */
static void con_start_transport_stream_op_batch(
- grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_transport_stream_op_batch *batch) {
- call_data *calld = (call_data *)elem->call_data;
- channel_data *chand = (channel_data *)elem->channel_data;
+ grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ grpc_transport_stream_op_batch* batch) {
+ call_data* calld = (call_data*)elem->call_data;
+ channel_data* chand = (channel_data*)elem->channel_data;
if (batch->recv_initial_metadata) {
- callback_state *state = &calld->recv_initial_metadata_ready;
+ callback_state* state = &calld->recv_initial_metadata_ready;
intercept_callback(
calld, state, false, "recv_initial_metadata_ready",
&batch->payload->recv_initial_metadata.recv_initial_metadata_ready);
}
if (batch->recv_message) {
- callback_state *state = &calld->recv_message_ready;
+ callback_state* state = &calld->recv_message_ready;
intercept_callback(calld, state, false, "recv_message_ready",
&batch->payload->recv_message.recv_message_ready);
}
@@ -119,11 +119,11 @@ static void con_start_transport_stream_op_batch(
// calld->on_complete like we can for the other ops. However,
// cancellation isn't in the fast path, so we just allocate a new
// closure for each one.
- callback_state *state = (callback_state *)gpr_malloc(sizeof(*state));
+ callback_state* state = (callback_state*)gpr_malloc(sizeof(*state));
intercept_callback(calld, state, true, "on_complete (cancel_stream)",
&batch->on_complete);
} else {
- callback_state *state = get_state_for_batch(calld, batch);
+ callback_state* state = get_state_for_batch(calld, batch);
intercept_callback(calld, state, false, "on_complete", &batch->on_complete);
}
grpc_transport_perform_stream_op(exec_ctx, chand->transport,
@@ -133,19 +133,19 @@ static void con_start_transport_stream_op_batch(
"passed batch to transport");
}
-static void con_start_transport_op(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem,
- grpc_transport_op *op) {
- channel_data *chand = (channel_data *)elem->channel_data;
+static void con_start_transport_op(grpc_exec_ctx* exec_ctx,
+ grpc_channel_element* elem,
+ grpc_transport_op* op) {
+ channel_data* chand = (channel_data*)elem->channel_data;
grpc_transport_perform_op(exec_ctx, chand->transport, op);
}
/* Constructor for call_data */
-static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- const grpc_call_element_args *args) {
- call_data *calld = (call_data *)elem->call_data;
- channel_data *chand = (channel_data *)elem->channel_data;
+static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem,
+ const grpc_call_element_args* args) {
+ call_data* calld = (call_data*)elem->call_data;
+ channel_data* chand = (channel_data*)elem->channel_data;
calld->call_combiner = args->call_combiner;
int r = grpc_transport_init_stream(
exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld),
@@ -155,49 +155,49 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
"transport stream initialization failed");
}
-static void set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_polling_entity *pollent) {
- call_data *calld = (call_data *)elem->call_data;
- channel_data *chand = (channel_data *)elem->channel_data;
+static void set_pollset_or_pollset_set(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem,
+ grpc_polling_entity* pollent) {
+ call_data* calld = (call_data*)elem->call_data;
+ channel_data* chand = (channel_data*)elem->channel_data;
grpc_transport_set_pops(exec_ctx, chand->transport,
TRANSPORT_STREAM_FROM_CALL_DATA(calld), pollent);
}
/* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const grpc_call_final_info *final_info,
- grpc_closure *then_schedule_closure) {
- call_data *calld = (call_data *)elem->call_data;
- channel_data *chand = (channel_data *)elem->channel_data;
+static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ const grpc_call_final_info* final_info,
+ grpc_closure* then_schedule_closure) {
+ call_data* calld = (call_data*)elem->call_data;
+ channel_data* chand = (channel_data*)elem->channel_data;
grpc_transport_destroy_stream(exec_ctx, chand->transport,
TRANSPORT_STREAM_FROM_CALL_DATA(calld),
then_schedule_closure);
}
/* Constructor for channel_data */
-static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem,
- grpc_channel_element_args *args) {
- channel_data *cd = (channel_data *)elem->channel_data;
+static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx,
+ grpc_channel_element* elem,
+ grpc_channel_element_args* args) {
+ channel_data* cd = (channel_data*)elem->channel_data;
GPR_ASSERT(args->is_last);
cd->transport = NULL;
return GRPC_ERROR_NONE;
}
/* Destructor for channel_data */
-static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem) {
- channel_data *cd = (channel_data *)elem->channel_data;
+static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
+ grpc_channel_element* elem) {
+ channel_data* cd = (channel_data*)elem->channel_data;
if (cd->transport) {
grpc_transport_destroy(exec_ctx, cd->transport);
}
}
/* No-op. */
-static void con_get_channel_info(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem,
- const grpc_channel_info *channel_info) {}
+static void con_get_channel_info(grpc_exec_ctx* exec_ctx,
+ grpc_channel_element* elem,
+ const grpc_channel_info* channel_info) {}
const grpc_channel_filter grpc_connected_filter = {
con_start_transport_stream_op_batch,
@@ -213,12 +213,12 @@ const grpc_channel_filter grpc_connected_filter = {
"connected",
};
-static void bind_transport(grpc_channel_stack *channel_stack,
- grpc_channel_element *elem, void *t) {
- channel_data *cd = (channel_data *)elem->channel_data;
+static void bind_transport(grpc_channel_stack* channel_stack,
+ grpc_channel_element* elem, void* t) {
+ channel_data* cd = (channel_data*)elem->channel_data;
GPR_ASSERT(elem->filter == &grpc_connected_filter);
GPR_ASSERT(cd->transport == NULL);
- cd->transport = (grpc_transport *)t;
+ cd->transport = (grpc_transport*)t;
/* HACK(ctiller): increase call stack size for the channel to make space
for channel data. We need a cleaner (but performant) way to do this,
@@ -227,20 +227,20 @@ static void bind_transport(grpc_channel_stack *channel_stack,
the last call element, and the last call element MUST be the connected
channel. */
channel_stack->call_stack_size +=
- grpc_transport_stream_size((grpc_transport *)t);
+ grpc_transport_stream_size((grpc_transport*)t);
}
-bool grpc_add_connected_filter(grpc_exec_ctx *exec_ctx,
- grpc_channel_stack_builder *builder,
- void *arg_must_be_null) {
+bool grpc_add_connected_filter(grpc_exec_ctx* exec_ctx,
+ grpc_channel_stack_builder* builder,
+ void* arg_must_be_null) {
GPR_ASSERT(arg_must_be_null == NULL);
- grpc_transport *t = grpc_channel_stack_builder_get_transport(builder);
+ grpc_transport* t = grpc_channel_stack_builder_get_transport(builder);
GPR_ASSERT(t != NULL);
return grpc_channel_stack_builder_append_filter(
builder, &grpc_connected_filter, bind_transport, t);
}
-grpc_stream *grpc_connected_channel_get_stream(grpc_call_element *elem) {
- call_data *calld = (call_data *)elem->call_data;
+grpc_stream* grpc_connected_channel_get_stream(grpc_call_element* elem) {
+ call_data* calld = (call_data*)elem->call_data;
return TRANSPORT_STREAM_FROM_CALL_DATA(calld);
}