aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-03-31 15:42:16 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-03-31 15:42:16 -0700
commita0f3abd92502e1bed89b28e9fc03e70a12a7cfb5 (patch)
treeac923813eb22f79f9cba25e7aa047ad628c7d818 /src/core/ext
parente198b719895dd3a94dd89476655f4b15b026ffb8 (diff)
Review feedback: bikeshedding round
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/census/grpc_filter.c8
-rw-r--r--src/core/ext/client_channel/client_channel.c56
-rw-r--r--src/core/ext/client_channel/subchannel.c4
-rw-r--r--src/core/ext/client_channel/subchannel.h2
-rw-r--r--src/core/ext/load_reporting/load_reporting_filter.c10
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c11
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c14
7 files changed, 53 insertions, 52 deletions
diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c
index 8c3d450dc1..bcf59a4efe 100644
--- a/src/core/ext/census/grpc_filter.c
+++ b/src/core/ext/census/grpc_filter.c
@@ -74,7 +74,7 @@ static void extract_and_annotate_method_tag(grpc_metadata_batch *md,
}
static void client_mutate_op(grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+ grpc_transport_stream_op_batch *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
if (op->send_initial_metadata) {
@@ -85,7 +85,7 @@ static void client_mutate_op(grpc_call_element *elem,
static void client_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+ grpc_transport_stream_op_batch *op) {
client_mutate_op(elem, op);
grpc_call_next_op(exec_ctx, elem, op);
}
@@ -104,7 +104,7 @@ static void server_on_done_recv(grpc_exec_ctx *exec_ctx, void *ptr,
}
static void server_mutate_op(grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+ grpc_transport_stream_op_batch *op) {
call_data *calld = elem->call_data;
if (op->recv_initial_metadata) {
/* substitute our callback for the op callback */
@@ -119,7 +119,7 @@ static void server_mutate_op(grpc_call_element *elem,
static void server_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+ grpc_transport_stream_op_batch *op) {
/* TODO(ctiller): this code fails. I don't know why. I expect it's
incomplete, and someone should look at it soon.
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c
index 6e3b472527..ae25973d4e 100644
--- a/src/core/ext/client_channel/client_channel.c
+++ b/src/core/ext/client_channel/client_channel.c
@@ -755,7 +755,7 @@ typedef struct client_channel_call_data {
grpc_connected_subchannel *connected_subchannel;
grpc_polling_entity *pollent;
- grpc_transport_stream_op **waiting_ops;
+ grpc_transport_stream_op_batch **waiting_ops;
size_t waiting_ops_count;
size_t waiting_ops_capacity;
@@ -775,7 +775,7 @@ grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
return scc == CANCELLED_CALL ? NULL : scc;
}
-static void add_waiting_locked(call_data *calld, grpc_transport_stream_op *op) {
+static void add_waiting_locked(call_data *calld, grpc_transport_stream_op_batch *op) {
GPR_TIMER_BEGIN("add_waiting_locked", 0);
if (calld->waiting_ops_count == calld->waiting_ops_capacity) {
calld->waiting_ops_capacity = GPR_MAX(3, 2 * calld->waiting_ops_capacity);
@@ -791,7 +791,7 @@ static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld,
grpc_error *error) {
size_t i;
for (i = 0; i < calld->waiting_ops_count; i++) {
- grpc_transport_stream_op_finish_with_failure(
+ grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, calld->waiting_ops[i], GRPC_ERROR_REF(error));
}
calld->waiting_ops_count = 0;
@@ -804,7 +804,7 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
}
grpc_subchannel_call *call = GET_CALL(calld);
- grpc_transport_stream_op **ops = calld->waiting_ops;
+ grpc_transport_stream_op_batch **ops = calld->waiting_ops;
size_t nops = calld->waiting_ops_count;
if (call == CANCELLED_CALL) {
fail_locked(exec_ctx, calld, GRPC_ERROR_CANCELLED);
@@ -1052,8 +1052,8 @@ static bool pick_subchannel_locked(
return false;
}
-static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx,
- grpc_transport_stream_op *op,
+static void start_transport_stream_op_batch_locked_inner(grpc_exec_ctx *exec_ctx,
+ grpc_transport_stream_op_batch *op,
grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
@@ -1062,7 +1062,7 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx,
/* need to recheck that another thread hasn't set the call */
call = GET_CALL(calld);
if (call == CANCELLED_CALL) {
- grpc_transport_stream_op_finish_with_failure(
+ grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
/* early out */
return;
@@ -1077,7 +1077,7 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx,
if (!gpr_atm_rel_cas(&calld->subchannel_call, 0,
(gpr_atm)(uintptr_t)CANCELLED_CALL)) {
/* recurse to retry */
- start_transport_stream_op_locked_inner(exec_ctx, op, elem);
+ start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem);
/* early out */
return;
} else {
@@ -1099,7 +1099,7 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
break;
}
- grpc_transport_stream_op_finish_with_failure(
+ grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, op,
GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
/* early out */
@@ -1143,13 +1143,13 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx,
if (error != GRPC_ERROR_NONE) {
subchannel_call = CANCELLED_CALL;
fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
- grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error);
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
}
gpr_atm_rel_store(&calld->subchannel_call,
(gpr_atm)(uintptr_t)subchannel_call);
retry_waiting_locked(exec_ctx, calld);
/* recurse to retry */
- start_transport_stream_op_locked_inner(exec_ctx, op, elem);
+ start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem);
/* early out */
return;
}
@@ -1177,11 +1177,11 @@ static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
GRPC_ERROR_REF(error));
}
-static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
+static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error_ignored) {
- GPR_TIMER_BEGIN("start_transport_stream_op_locked", 0);
+ GPR_TIMER_BEGIN("start_transport_stream_op_batch_locked", 0);
- grpc_transport_stream_op *op = arg;
+ grpc_transport_stream_op_batch *op = arg;
grpc_call_element *elem = op->handler_private.extra_arg;
call_data *calld = elem->call_data;
@@ -1193,11 +1193,11 @@ static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
op->on_complete = &calld->on_complete;
}
- start_transport_stream_op_locked_inner(exec_ctx, op, elem);
+ start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem);
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
- "start_transport_stream_op");
- GPR_TIMER_END("start_transport_stream_op_locked", 0);
+ "start_transport_stream_op_batch");
+ GPR_TIMER_END("start_transport_stream_op_batch_locked", 0);
}
/* The logic here is fairly complicated, due to (a) the fact that we
@@ -1208,39 +1208,39 @@ static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
We use double-checked locking to initially see if initialization has been
performed. If it has not, we acquire the combiner and perform initialization.
If it has, we proceed on the fast path. */
-static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
+static void cc_start_transport_stream_op_batch(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+ grpc_transport_stream_op_batch *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op);
+ grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem, op);
/* try to (atomically) get the call */
grpc_subchannel_call *call = GET_CALL(calld);
- GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0);
+ GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0);
if (call == CANCELLED_CALL) {
- grpc_transport_stream_op_finish_with_failure(
+ grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
- GPR_TIMER_END("cc_start_transport_stream_op", 0);
+ GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
/* early out */
return;
}
if (call != NULL) {
grpc_subchannel_call_process_op(exec_ctx, call, op);
- GPR_TIMER_END("cc_start_transport_stream_op", 0);
+ GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
/* early out */
return;
}
/* we failed; lock and figure out what to do */
- GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op");
+ GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op_batch");
op->handler_private.extra_arg = elem;
grpc_closure_sched(
exec_ctx,
grpc_closure_init(&op->handler_private.closure,
- start_transport_stream_op_locked, op,
+ start_transport_stream_op_batch_locked, op,
grpc_combiner_scheduler(chand->combiner, false)),
GRPC_ERROR_NONE);
- GPR_TIMER_END("cc_start_transport_stream_op", 0);
+ GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
}
/* Constructor for call_data */
@@ -1299,7 +1299,7 @@ static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
*/
const grpc_channel_filter grpc_client_channel_filter = {
- cc_start_transport_stream_op,
+ cc_start_transport_stream_op_batch,
cc_start_transport_op,
sizeof(call_data),
cc_init_call_elem,
diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c
index 063c0badff..681a342ea3 100644
--- a/src/core/ext/client_channel/subchannel.c
+++ b/src/core/ext/client_channel/subchannel.c
@@ -749,11 +749,11 @@ char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx,
void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *call,
- grpc_transport_stream_op *op) {
+ grpc_transport_stream_op_batch *op) {
GPR_TIMER_BEGIN("grpc_subchannel_call_process_op", 0);
grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
- top_elem->filter->start_transport_stream_op(exec_ctx, top_elem, op);
+ top_elem->filter->start_transport_stream_op_batch(exec_ctx, top_elem, op);
GPR_TIMER_END("grpc_subchannel_call_process_op", 0);
}
diff --git a/src/core/ext/client_channel/subchannel.h b/src/core/ext/client_channel/subchannel.h
index 3e64a2507c..ba96c92df8 100644
--- a/src/core/ext/client_channel/subchannel.h
+++ b/src/core/ext/client_channel/subchannel.h
@@ -157,7 +157,7 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
/** continue processing a transport op */
void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *subchannel_call,
- grpc_transport_stream_op *op);
+ grpc_transport_stream_op_batch *op);
/** continue querying for peer */
char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/load_reporting/load_reporting_filter.c
index cb6bc95dd3..10f14ab6f5 100644
--- a/src/core/ext/load_reporting/load_reporting_filter.c
+++ b/src/core/ext/load_reporting/load_reporting_filter.c
@@ -183,10 +183,10 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
*/
}
-static void lr_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
+static void lr_start_transport_stream_op_batch(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_transport_stream_op *op) {
- GPR_TIMER_BEGIN("lr_start_transport_stream_op", 0);
+ grpc_transport_stream_op_batch *op) {
+ GPR_TIMER_BEGIN("lr_start_transport_stream_op_batch", 0);
call_data *calld = elem->call_data;
if (op->recv_initial_metadata) {
@@ -200,11 +200,11 @@ static void lr_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
}
grpc_call_next_op(exec_ctx, elem, op);
- GPR_TIMER_END("lr_start_transport_stream_op", 0);
+ GPR_TIMER_END("lr_start_transport_stream_op_batch", 0);
}
const grpc_channel_filter grpc_load_reporting_filter = {
- lr_start_transport_stream_op,
+ lr_start_transport_stream_op_batch,
grpc_channel_next_op,
sizeof(call_data),
init_call_elem,
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 172179fe23..fab0ac5c9b 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -1140,13 +1140,13 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
grpc_error *error_ignored) {
GPR_TIMER_BEGIN("perform_stream_op_locked", 0);
- grpc_transport_stream_op *op = stream_op;
+ grpc_transport_stream_op_batch *op = stream_op;
grpc_chttp2_stream *s = op->handler_private.extra_arg;
- grpc_transport_stream_op_payload *op_payload = op->payload;
+ grpc_transport_stream_op_batch_payload *op_payload = op->payload;
grpc_chttp2_transport *t = s->t;
if (grpc_http_trace) {
- char *str = grpc_transport_stream_op_string(op);
+ char *str = grpc_transport_stream_op_batch_string(op);
gpr_log(GPR_DEBUG, "perform_stream_op_locked: %s; on_complete = %p", str,
op->on_complete);
gpr_free(str);
@@ -1374,13 +1374,14 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, grpc_transport_stream_op *op) {
+ grpc_stream *gs,
+ grpc_transport_stream_op_batch *op) {
GPR_TIMER_BEGIN("perform_stream_op", 0);
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
if (grpc_http_trace) {
- char *str = grpc_transport_stream_op_string(op);
+ char *str = grpc_transport_stream_op_batch_string(op);
gpr_log(GPR_DEBUG, "perform_stream_op[s=%p/%d]: %s", s, s->id, str);
gpr_free(str);
}
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index b6f1b729fd..0d1b180dd5 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -172,7 +172,7 @@ struct op_state {
};
struct op_and_state {
- grpc_transport_stream_op op;
+ grpc_transport_stream_op_batch op;
struct op_state state;
bool done;
struct stream_obj *s; /* Pointer back to the stream object */
@@ -187,7 +187,7 @@ struct op_storage {
struct stream_obj {
gpr_arena *arena;
struct op_and_state *oas;
- grpc_transport_stream_op *curr_op;
+ grpc_transport_stream_op_batch *curr_op;
grpc_cronet_transport *curr_ct;
grpc_stream *curr_gs;
bidirectional_stream *cbs;
@@ -298,12 +298,12 @@ static grpc_error *make_error_with_desc(int error_code, const char *desc) {
/*
Add a new stream op to op storage.
*/
-static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) {
+static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op_batch *op) {
struct op_storage *storage = &s->storage;
/* add new op at the beginning of the linked list. The memory is freed
in remove_from_storage */
struct op_and_state *new_op = gpr_malloc(sizeof(struct op_and_state));
- memcpy(&new_op->op, op, sizeof(grpc_transport_stream_op));
+ memcpy(&new_op->op, op, sizeof(grpc_transport_stream_op_batch));
memset(&new_op->state, 0, sizeof(new_op->state));
new_op->s = s;
new_op->done = false;
@@ -768,7 +768,7 @@ static bool header_has_authority(grpc_linked_mdelem *head) {
Op Execution: Decide if one of the actions contained in the stream op can be
executed. This is the heart of the state machine.
*/
-static bool op_can_be_run(grpc_transport_stream_op *curr_op,
+static bool op_can_be_run(grpc_transport_stream_op_batch *curr_op,
struct stream_obj *s, struct op_state *op_state,
enum e_op_id op_id) {
struct op_state *stream_state = &s->state;
@@ -919,7 +919,7 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op,
*/
static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
struct op_and_state *oas) {
- grpc_transport_stream_op *stream_op = &oas->op;
+ grpc_transport_stream_op_batch *stream_op = &oas->op;
struct stream_obj *s = oas->s;
grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct;
struct op_state *stream_state = &s->state;
@@ -1301,7 +1301,7 @@ static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *pollset_set) {}
static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, grpc_transport_stream_op *op) {
+ grpc_stream *gs, grpc_transport_stream_op_batch *op) {
CRONET_LOG(GPR_DEBUG, "perform_stream_op");
if (op->send_initial_metadata &&
header_has_authority(op->payload->send_initial_metadata