diff options
-rw-r--r-- | src/core/channel/census_filter.c | 14 | ||||
-rw-r--r-- | src/core/channel/channel_stack.c | 6 | ||||
-rw-r--r-- | src/core/channel/channel_stack.h | 11 | ||||
-rw-r--r-- | src/core/channel/client_channel.c | 16 | ||||
-rw-r--r-- | src/core/channel/connected_channel.c | 4 | ||||
-rw-r--r-- | src/core/channel/http_client_filter.c | 7 | ||||
-rw-r--r-- | src/core/channel/http_server_filter.c | 7 | ||||
-rw-r--r-- | src/core/channel/noop_filter.c | 7 | ||||
-rw-r--r-- | src/core/client_config/subchannel.h | 7 | ||||
-rw-r--r-- | src/core/security/client_auth_filter.c | 12 | ||||
-rw-r--r-- | src/core/security/server_auth_filter.c | 4 | ||||
-rw-r--r-- | src/core/surface/call.c | 14 | ||||
-rw-r--r-- | src/core/surface/client.c | 4 | ||||
-rw-r--r-- | src/core/surface/lame_client.c | 6 | ||||
-rw-r--r-- | src/core/surface/server.c | 7 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 13 | ||||
-rw-r--r-- | src/core/transport/transport.c | 13 | ||||
-rw-r--r-- | src/core/transport/transport.h | 20 | ||||
-rw-r--r-- | src/core/transport/transport_impl.h | 5 | ||||
-rw-r--r-- | src/core/transport/transport_op_string.c | 6 | ||||
-rw-r--r-- | test/core/channel/channel_stack_test.c | 4 |
21 files changed, 101 insertions, 86 deletions
diff --git a/src/core/channel/census_filter.c b/src/core/channel/census_filter.c index 7e393a01a6..ea0bece587 100644 --- a/src/core/channel/census_filter.c +++ b/src/core/channel/census_filter.c @@ -84,7 +84,8 @@ static void extract_and_annotate_method_tag(grpc_stream_op_buffer* sopb, } } -static void client_mutate_op(grpc_call_element* elem, grpc_transport_op* op) { +static void client_mutate_op(grpc_call_element* elem, + grpc_transport_stream_op* op) { call_data* calld = elem->call_data; channel_data* chand = elem->channel_data; if (op->send_ops) { @@ -93,7 +94,7 @@ static void client_mutate_op(grpc_call_element* elem, grpc_transport_op* op) { } static void client_start_transport_op(grpc_call_element* elem, - grpc_transport_op* op) { + grpc_transport_stream_op* op) { call_data* calld = elem->call_data; GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0)); client_mutate_op(elem, op); @@ -110,7 +111,8 @@ static void server_on_done_recv(void* ptr, int success) { calld->on_done_recv(calld->recv_user_data, success); } -static void server_mutate_op(grpc_call_element* elem, grpc_transport_op* op) { +static void server_mutate_op(grpc_call_element* elem, + grpc_transport_stream_op* op) { call_data* calld = elem->call_data; if (op->recv_ops) { /* substitute our callback for the op callback */ @@ -123,7 +125,7 @@ static void server_mutate_op(grpc_call_element* elem, grpc_transport_op* op) { } static void server_start_transport_op(grpc_call_element* elem, - grpc_transport_op* op) { + grpc_transport_stream_op* op) { call_data* calld = elem->call_data; GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0)); server_mutate_op(elem, op); @@ -145,7 +147,7 @@ static void channel_op(grpc_channel_element* elem, static void client_init_call_elem(grpc_call_element* elem, const void* server_transport_data, - grpc_transport_op* initial_op) { + grpc_transport_stream_op* initial_op) { call_data* d = elem->call_data; GPR_ASSERT(d != NULL); init_rpc_stats(&d->stats); @@ -163,7 +165,7 @@ static void client_destroy_call_elem(grpc_call_element* elem) { static void server_init_call_elem(grpc_call_element* elem, const void* server_transport_data, - grpc_transport_op* initial_op) { + grpc_transport_stream_op* initial_op) { call_data* d = elem->call_data; GPR_ASSERT(d != NULL); init_rpc_stats(&d->stats); diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index 9eec8163f5..b575367b52 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -148,7 +148,7 @@ void grpc_channel_stack_destroy(grpc_channel_stack *stack) { void grpc_call_stack_init(grpc_channel_stack *channel_stack, const void *transport_server_data, - grpc_transport_op *initial_op, + grpc_transport_stream_op *initial_op, grpc_call_stack *call_stack) { grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack); size_t count = channel_stack->count; @@ -184,7 +184,7 @@ void grpc_call_stack_destroy(grpc_call_stack *stack) { } } -void grpc_call_next_op(grpc_call_element *elem, grpc_transport_op *op) { +void grpc_call_next_op(grpc_call_element *elem, grpc_transport_stream_op *op) { grpc_call_element *next_elem = elem + 1; next_elem->filter->start_transport_op(next_elem, op); } @@ -206,7 +206,7 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) { } void grpc_call_element_send_cancel(grpc_call_element *cur_elem) { - grpc_transport_op op; + grpc_transport_stream_op op; memset(&op, 0, sizeof(op)); op.cancel_with_status = GRPC_STATUS_CANCELLED; grpc_call_next_op(cur_elem, &op); diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index de0e4e4518..873a8ac4d3 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -103,7 +103,8 @@ typedef struct { typedef struct { /* Called to eg. send/receive data on a call. See grpc_call_next_op on how to call the next element in the stack */ - void (*start_transport_op)(grpc_call_element *elem, grpc_transport_op *op); + void (*start_transport_op)(grpc_call_element *elem, + grpc_transport_stream_op *op); /* Called to handle channel level operations - e.g. new calls, or transport closure. See grpc_channel_next_op on how to call the next element in the stack */ @@ -122,7 +123,7 @@ typedef struct { argument.*/ void (*init_call_elem)(grpc_call_element *elem, const void *server_transport_data, - grpc_transport_op *initial_op); + grpc_transport_stream_op *initial_op); /* Destroy per call data. The filter does not need to do any chaining */ void (*destroy_call_elem)(grpc_call_element *elem); @@ -201,13 +202,13 @@ void grpc_channel_stack_destroy(grpc_channel_stack *stack); server. */ void grpc_call_stack_init(grpc_channel_stack *channel_stack, const void *transport_server_data, - grpc_transport_op *initial_op, + grpc_transport_stream_op *initial_op, grpc_call_stack *call_stack); /* Destroy a call stack */ void grpc_call_stack_destroy(grpc_call_stack *stack); /* Call the next operation in a call stack */ -void grpc_call_next_op(grpc_call_element *elem, grpc_transport_op *op); +void grpc_call_next_op(grpc_call_element *elem, grpc_transport_stream_op *op); /* Call the next operation (depending on call directionality) in a channel stack */ void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op); @@ -219,7 +220,7 @@ grpc_channel_stack *grpc_channel_stack_from_top_element( grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem); void grpc_call_log_op(char *file, int line, gpr_log_severity severity, - grpc_call_element *elem, grpc_transport_op *op); + grpc_call_element *elem, grpc_transport_stream_op *op); void grpc_call_element_send_cancel(grpc_call_element *cur_elem); diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index ea79d539ec..e171113dbc 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -91,7 +91,7 @@ struct call_data { /* our child call stack */ grpc_subchannel_call *subchannel_call; } active; - grpc_transport_op waiting_op; + grpc_transport_stream_op waiting_op; struct { grpc_linked_mdelem status; grpc_linked_mdelem details; @@ -124,7 +124,7 @@ static int prepare_activate(grpc_call_element *elem, return 1; } -static void complete_activate(grpc_call_element *elem, grpc_transport_op *op) { +static void complete_activate(grpc_call_element *elem, grpc_transport_stream_op *op) { call_data *calld = elem->call_data; grpc_call_element *child_elem = grpc_child_call_get_top_element(calld->s.active.child_call); @@ -154,7 +154,7 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) { #endif static void handle_op_after_cancellation(grpc_call_element *elem, - grpc_transport_op *op) { + grpc_transport_stream_op *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; if (op->send_ops) { @@ -194,12 +194,12 @@ static void pick_target(grpc_lb_policy *lb_policy, call_data *calld) { } static void cc_start_transport_op(grpc_call_element *elem, - grpc_transport_op *op) { + grpc_transport_stream_op *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_subchannel_call *subchannel_call; grpc_lb_policy *lb_policy; - grpc_transport_op waiting_op; + grpc_transport_stream_op waiting_op; GPR_ASSERT(elem->filter == &grpc_client_channel_filter); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); @@ -457,7 +457,7 @@ static void channel_op(grpc_channel_element *elem, /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, const void *server_transport_data, - grpc_transport_op *initial_op) { + grpc_transport_stream_op *initial_op) { call_data *calld = elem->call_data; /* TODO(ctiller): is there something useful we can do here? */ @@ -555,7 +555,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete( call_data **waiting_children; size_t waiting_child_count; size_t i; - grpc_transport_op *call_ops; + grpc_transport_stream_op *call_ops; /* build the child filter stack */ child_filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_child_filters); @@ -597,7 +597,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete( call_ops[i] = waiting_children[i]->s.waiting_op; if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) { waiting_children[i] = NULL; - grpc_transport_op_finish_with_failure(&call_ops[i]); + grpc_transport_stream_op_finish_with_failure(&call_ops[i]); } } diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 14dda88698..6fad077c62 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -62,7 +62,7 @@ typedef struct connected_channel_call_data { void *unused; } call_data; /* Intercept a call operation and either push it directly up or translate it into transport stream operations */ static void con_start_transport_op(grpc_call_element *elem, - grpc_transport_op *op) { + grpc_transport_stream_op *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); @@ -96,7 +96,7 @@ static void channel_op(grpc_channel_element *elem, /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, const void *server_transport_data, - grpc_transport_op *initial_op) { + grpc_transport_stream_op *initial_op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; int r; diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index 9805f325a6..23f5a9bfe9 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -87,7 +87,8 @@ static void hc_on_recv(void *user_data, int success) { calld->on_done_recv(calld->recv_user_data, success); } -static void hc_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { +static void hc_mutate_op(grpc_call_element *elem, + grpc_transport_stream_op *op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; @@ -124,7 +125,7 @@ static void hc_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { } static void hc_start_transport_op(grpc_call_element *elem, - grpc_transport_op *op) { + grpc_transport_stream_op *op) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); hc_mutate_op(elem, op); grpc_call_next_op(elem, op); @@ -150,7 +151,7 @@ static void channel_op(grpc_channel_element *elem, /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, const void *server_transport_data, - grpc_transport_op *initial_op) { + grpc_transport_stream_op *initial_op) { call_data *calld = elem->call_data; calld->sent_initial_metadata = 0; calld->got_initial_metadata = 0; diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index 11a53b1e70..d653938ff2 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -177,7 +177,8 @@ static void hs_on_recv(void *user_data, int success) { calld->on_done_recv(calld->recv_user_data, success); } -static void hs_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { +static void hs_mutate_op(grpc_call_element *elem, + grpc_transport_stream_op *op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; @@ -207,7 +208,7 @@ static void hs_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { } static void hs_start_transport_op(grpc_call_element *elem, - grpc_transport_op *op) { + grpc_transport_stream_op *op) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); hs_mutate_op(elem, op); grpc_call_next_op(elem, op); @@ -233,7 +234,7 @@ static void channel_op(grpc_channel_element *elem, /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, const void *server_transport_data, - grpc_transport_op *initial_op) { + grpc_transport_stream_op *initial_op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; /* initialize members */ diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c index 1d2be716d7..d472b80744 100644 --- a/src/core/channel/noop_filter.c +++ b/src/core/channel/noop_filter.c @@ -45,7 +45,8 @@ typedef struct channel_data { /* used to silence 'variable not used' warnings */ static void ignore_unused(void *ignored) {} -static void noop_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { +static void noop_mutate_op(grpc_call_element *elem, + grpc_transport_stream_op *op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; @@ -62,7 +63,7 @@ static void noop_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { op contains type and call direction information, in addition to the data that is being sent or received. */ static void noop_start_transport_op(grpc_call_element *elem, - grpc_transport_op *op) { + grpc_transport_stream_op *op) { noop_mutate_op(elem, op); /* pass control down the stack */ @@ -89,7 +90,7 @@ static void channel_op(grpc_channel_element *elem, /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, const void *server_transport_data, - grpc_transport_op *initial_op) { + grpc_transport_stream_op *initial_op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index c87d294d8c..9128aaeda7 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -77,9 +77,12 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *channel, grpc_iomgr_closure *notify); /** construct a call */ -grpc_subchannel_call *grpc_subchannel_create_call(grpc_subchannel *subchannel, grpc_call_element *parent, grpc_transport_op *initial_op); +grpc_subchannel_call *grpc_subchannel_create_call( + grpc_subchannel *subchannel, grpc_call_element *parent, + grpc_transport_stream_op *initial_op); /** continue processing a transport op */ -void grpc_subchannel_call_process_op(grpc_subchannel_call *subchannel_call, grpc_transport_op *op); +void grpc_subchannel_call_process_op(grpc_subchannel_call *subchannel_call, + grpc_transport_stream_op *op); #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_H */ diff --git a/src/core/security/client_auth_filter.c b/src/core/security/client_auth_filter.c index e9bd45db68..9ad4723ac5 100644 --- a/src/core/security/client_auth_filter.c +++ b/src/core/security/client_auth_filter.c @@ -58,7 +58,7 @@ typedef struct { so that work can progress when this call wants work to progress */ grpc_pollset *pollset; - grpc_transport_op op; + grpc_transport_stream_op op; size_t op_md_idx; int sent_initial_metadata; grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT]; @@ -77,7 +77,7 @@ typedef struct { static void bubble_up_error(grpc_call_element *elem, const char *error_msg) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - grpc_transport_op_add_cancellation( + grpc_transport_stream_op_add_cancellation( &calld->op, GRPC_STATUS_UNAUTHENTICATED, grpc_mdstr_from_string(chand->md_ctx, error_msg)); grpc_call_next_op(elem, &calld->op); @@ -90,7 +90,7 @@ static void on_credentials_metadata(void *user_data, grpc_call_element *elem = (grpc_call_element *)user_data; call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - grpc_transport_op *op = &calld->op; + grpc_transport_stream_op *op = &calld->op; grpc_metadata_batch *mdb; size_t i; if (status != GRPC_CREDENTIALS_OK) { @@ -131,7 +131,7 @@ static char *build_service_url(const char *url_scheme, call_data *calld) { } static void send_security_metadata(grpc_call_element *elem, - grpc_transport_op *op) { + grpc_transport_stream_op *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_client_security_context *ctx = @@ -193,7 +193,7 @@ static void on_host_checked(void *user_data, grpc_security_status status) { op contains type and call direction information, in addition to the data that is being sent or received. */ static void auth_start_transport_op(grpc_call_element *elem, - grpc_transport_op *op) { + grpc_transport_stream_op *op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; @@ -263,7 +263,7 @@ static void channel_op(grpc_channel_element *elem, /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, const void *server_transport_data, - grpc_transport_op *initial_op) { + grpc_transport_stream_op *initial_op) { call_data *calld = elem->call_data; calld->creds = NULL; calld->host = NULL; diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c index b19160b8ed..2cb4bf612d 100644 --- a/src/core/security/server_auth_filter.c +++ b/src/core/security/server_auth_filter.c @@ -51,7 +51,7 @@ typedef struct channel_data { op contains type and call direction information, in addition to the data that is being sent or received. */ static void auth_start_transport_op(grpc_call_element *elem, - grpc_transport_op *op) { + grpc_transport_stream_op *op) { /* TODO(jboeuf): Get the metadata and get a new context from it. */ /* pass control down the stack */ @@ -68,7 +68,7 @@ static void channel_op(grpc_channel_element *elem, /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, const void *server_transport_data, - grpc_transport_op *initial_op) { + grpc_transport_stream_op *initial_op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; diff --git a/src/core/surface/call.c b/src/core/surface/call.c index dd8eaa943e..d96b2d648f 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -252,8 +252,8 @@ struct grpc_call { static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline); static void call_on_done_recv(void *call, int success); static void call_on_done_send(void *call, int success); -static int fill_send_ops(grpc_call *call, grpc_transport_op *op); -static void execute_op(grpc_call *call, grpc_transport_op *op); +static int fill_send_ops(grpc_call *call, grpc_transport_stream_op *op); +static void execute_op(grpc_call *call, grpc_transport_stream_op *op); static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata); static void finish_read_ops(grpc_call *call); static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status, @@ -268,8 +268,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, size_t add_initial_metadata_count, gpr_timespec send_deadline) { size_t i; - grpc_transport_op initial_op; - grpc_transport_op *initial_op_ptr = NULL; + grpc_transport_stream_op initial_op; + grpc_transport_stream_op *initial_op_ptr = NULL; grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel); grpc_call *call = gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size); @@ -454,7 +454,7 @@ static int need_more_data(grpc_call *call) { } static void unlock(grpc_call *call) { - grpc_transport_op op; + grpc_transport_stream_op op; completed_request completed_requests[GRPC_IOREQ_OP_COUNT]; int completing_requests = 0; int start_op = 0; @@ -868,7 +868,7 @@ static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer, } } -static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { +static int fill_send_ops(grpc_call *call, grpc_transport_stream_op *op) { grpc_ioreq_data data; gpr_uint32 flags; grpc_metadata_batch mdb; @@ -1115,7 +1115,7 @@ static void finished_loose_op(void *call, int success_ignored) { GRPC_CALL_INTERNAL_UNREF(call, "loose-op", 0); } -static void execute_op(grpc_call *call, grpc_transport_op *op) { +static void execute_op(grpc_call *call, grpc_transport_stream_op *op) { grpc_call_element *elem; GPR_ASSERT(op->on_consumed == NULL); diff --git a/src/core/surface/client.c b/src/core/surface/client.c index 8ac4dd1e0e..9c9cba5771 100644 --- a/src/core/surface/client.c +++ b/src/core/surface/client.c @@ -44,7 +44,7 @@ typedef struct { void *unused; } call_data; typedef struct { void *unused; } channel_data; static void client_start_transport_op(grpc_call_element *elem, - grpc_transport_op *op) { + grpc_transport_stream_op *op) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); grpc_call_next_op(elem, op); } @@ -69,7 +69,7 @@ static void channel_op(grpc_channel_element *elem, static void init_call_elem(grpc_call_element *elem, const void *transport_server_data, - grpc_transport_op *initial_op) {} + grpc_transport_stream_op *initial_op) {} static void destroy_call_elem(grpc_call_element *elem) {} diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index b667128aef..d3fe9fffb7 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -50,7 +50,7 @@ typedef struct { typedef struct { grpc_mdctx *mdctx; } channel_data; static void lame_start_transport_op(grpc_call_element *elem, - grpc_transport_op *op) { + grpc_transport_stream_op *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); @@ -98,9 +98,9 @@ static void channel_op(grpc_channel_element *elem, static void init_call_elem(grpc_call_element *elem, const void *transport_server_data, - grpc_transport_op *initial_op) { + grpc_transport_stream_op *initial_op) { if (initial_op) { - grpc_transport_op_finish_with_failure(initial_op); + grpc_transport_stream_op_finish_with_failure(initial_op); } } diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 546b17c1ff..445bd1a103 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -526,7 +526,8 @@ static void server_on_recv(void *ptr, int success) { calld->on_done_recv(calld->recv_user_data, success); } -static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { +static void server_mutate_op(grpc_call_element *elem, + grpc_transport_stream_op *op) { call_data *calld = elem->call_data; if (op->recv_ops) { @@ -541,7 +542,7 @@ static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { } static void server_start_transport_op(grpc_call_element *elem, - grpc_transport_op *op) { + grpc_transport_stream_op *op) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); server_mutate_op(elem, op); grpc_call_next_op(elem, op); @@ -625,7 +626,7 @@ static void shutdown_channel(channel_data *chand, int send_goaway, static void init_call_elem(grpc_call_element *elem, const void *server_transport_data, - grpc_transport_op *initial_op) { + grpc_transport_stream_op *initial_op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; memset(calld, 0, sizeof(call_data)); diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 1cd1dc822d..93d7715919 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -232,7 +232,7 @@ struct transport { gpr_uint8 writing; /** are we calling back (via cb) with a channel-level event */ gpr_uint8 calling_back_channel; - /** are we calling back any grpc_transport_op completion events */ + /** are we calling back any grpc_transport_stream_op completion events */ gpr_uint8 calling_back_ops; gpr_uint8 destroying; gpr_uint8 closed; @@ -399,7 +399,8 @@ static void maybe_finish_read(transport *t, stream *s); static void maybe_join_window_updates(transport *t, stream *s); static void finish_reads(transport *t); static void add_to_pollset_locked(transport *t, grpc_pollset *pollset); -static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op); +static void perform_op_locked(transport *t, stream *s, + grpc_transport_stream_op *op); static void add_metadata_batch(transport *t, stream *s); static void flowctl_trace(transport *t, const char *flow, gpr_int32 window, @@ -644,7 +645,8 @@ static void goaway(grpc_transport *gt, grpc_status_code status, } static int init_stream(grpc_transport *gt, grpc_stream *gs, - const void *server_data, grpc_transport_op *initial_op) { + const void *server_data, + grpc_transport_stream_op *initial_op) { transport *t = (transport *)gt; stream *s = (stream *)gs; @@ -1127,7 +1129,8 @@ static void maybe_start_some_streams(transport *t) { } } -static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) { +static void perform_op_locked(transport *t, stream *s, + grpc_transport_stream_op *op) { if (op->cancel_with_status != GRPC_STATUS_OK) { cancel_stream( t, s, op->cancel_with_status, @@ -1186,7 +1189,7 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) { } static void perform_op(grpc_transport *gt, grpc_stream *gs, - grpc_transport_op *op) { + grpc_transport_stream_op *op) { transport *t = (transport *)gt; stream *s = (stream *)gs; diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index a9948cd4b2..400c2023a9 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -53,13 +53,13 @@ void grpc_transport_destroy(grpc_transport *transport) { int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, const void *server_data, - grpc_transport_op *initial_op) { + grpc_transport_stream_op *initial_op) { return transport->vtable->init_stream(transport, stream, server_data, initial_op); } void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream, - grpc_transport_op *op) { + grpc_transport_stream_op *op) { transport->vtable->perform_op(transport, stream, op); } @@ -96,7 +96,8 @@ void grpc_transport_setup_del_interested_party(grpc_transport_setup *setup, setup->vtable->del_interested_party(setup, pollset); } -void grpc_transport_op_finish_with_failure(grpc_transport_op *op) { +void grpc_transport_stream_op_finish_with_failure( + grpc_transport_stream_op *op) { if (op->send_ops) { op->on_done_send(op->send_user_data, 0); } @@ -108,9 +109,9 @@ void grpc_transport_op_finish_with_failure(grpc_transport_op *op) { } } -void grpc_transport_op_add_cancellation(grpc_transport_op *op, - grpc_status_code status, - grpc_mdstr *message) { +void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op, + grpc_status_code status, + grpc_mdstr *message) { if (op->cancel_with_status == GRPC_STATUS_OK) { op->cancel_with_status = status; op->cancel_message = message; diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 7f60fdc037..72bc492d80 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -63,7 +63,7 @@ typedef enum grpc_stream_state { } grpc_stream_state; /* Transport op: a set of operations to perform on a transport */ -typedef struct grpc_transport_op { +typedef struct grpc_transport_stream_op { void (*on_consumed)(void *user_data, int success); void *on_consumed_user_data; @@ -84,7 +84,7 @@ typedef struct grpc_transport_op { /* Indexes correspond to grpc_context_index enum values */ grpc_call_context_element *context; -} grpc_transport_op; +} grpc_transport_stream_op; /* Callbacks made from the transport to the upper layers of grpc. */ struct grpc_transport_callbacks { @@ -126,7 +126,7 @@ size_t grpc_transport_stream_size(grpc_transport *transport); supplied from the accept_stream callback function */ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, const void *server_data, - grpc_transport_op *initial_op); + grpc_transport_stream_op *initial_op); /* Destroy transport data for a stream. @@ -141,17 +141,17 @@ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, void grpc_transport_destroy_stream(grpc_transport *transport, grpc_stream *stream); -void grpc_transport_op_finish_with_failure(grpc_transport_op *op); +void grpc_transport_stream_op_finish_with_failure(grpc_transport_stream_op *op); -void grpc_transport_op_add_cancellation(grpc_transport_op *op, - grpc_status_code status, - grpc_mdstr *message); +void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op, + grpc_status_code status, + grpc_mdstr *message); /* TODO(ctiller): remove this */ void grpc_transport_add_to_pollset(grpc_transport *transport, grpc_pollset *pollset); -char *grpc_transport_op_string(grpc_transport_op *op); +char *grpc_transport_stream_op_string(grpc_transport_stream_op *op); /* Send a batch of operations on a transport @@ -161,9 +161,9 @@ char *grpc_transport_op_string(grpc_transport_op *op); transport - the transport on which to initiate the stream stream - the stream on which to send the operations. This must be non-NULL and previously initialized by the same transport. - op - a grpc_transport_op specifying the op to perform */ + op - a grpc_transport_stream_op specifying the op to perform */ void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream, - grpc_transport_op *op); + grpc_transport_stream_op *op); /* Send a ping on a transport diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h index 479e15338f..2f940dde1c 100644 --- a/src/core/transport/transport_impl.h +++ b/src/core/transport/transport_impl.h @@ -43,11 +43,12 @@ typedef struct grpc_transport_vtable { /* implementation of grpc_transport_init_stream */ int (*init_stream)(grpc_transport *self, grpc_stream *stream, - const void *server_data, grpc_transport_op *initial_op); + const void *server_data, + grpc_transport_stream_op *initial_op); /* implementation of grpc_transport_send_batch */ void (*perform_op)(grpc_transport *self, grpc_stream *stream, - grpc_transport_op *op); + grpc_transport_stream_op *op); /* implementation of grpc_transport_add_to_pollset */ void (*add_to_pollset)(grpc_transport *self, grpc_pollset *pollset); diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c index 5c4edb006a..43e14a7792 100644 --- a/src/core/transport/transport_op_string.c +++ b/src/core/transport/transport_op_string.c @@ -107,7 +107,7 @@ char *grpc_sopb_string(grpc_stream_op_buffer *sopb) { return out; } -char *grpc_transport_op_string(grpc_transport_op *op) { +char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) { char *tmp; char *out; int first = 1; @@ -158,8 +158,8 @@ char *grpc_transport_op_string(grpc_transport_op *op) { } void grpc_call_log_op(char *file, int line, gpr_log_severity severity, - grpc_call_element *elem, grpc_transport_op *op) { - char *str = grpc_transport_op_string(op); + grpc_call_element *elem, grpc_transport_stream_op *op) { + char *str = grpc_transport_stream_op_string(op); gpr_log(file, line, severity, "OP[%s:%p]: %s", elem->filter->name, elem, str); gpr_free(str); } diff --git a/test/core/channel/channel_stack_test.c b/test/core/channel/channel_stack_test.c index fe92f2f023..78779484b2 100644 --- a/test/core/channel/channel_stack_test.c +++ b/test/core/channel/channel_stack_test.c @@ -54,7 +54,7 @@ static void channel_init_func(grpc_channel_element *elem, static void call_init_func(grpc_call_element *elem, const void *server_transport_data, - grpc_transport_op *initial_op) { + grpc_transport_stream_op *initial_op) { ++*(int *)(elem->channel_data); *(int *)(elem->call_data) = 0; } @@ -65,7 +65,7 @@ static void call_destroy_func(grpc_call_element *elem) { ++*(int *)(elem->channel_data); } -static void call_func(grpc_call_element *elem, grpc_transport_op *op) { +static void call_func(grpc_call_element *elem, grpc_transport_stream_op *op) { ++*(int *)(elem->call_data); } |