From be18b8db301b800edef382389749632e4099afaf Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 22 Apr 2015 14:00:47 -0700 Subject: Beginning transport work --- src/core/transport/chttp2/stream_encoder.c | 10 ---------- 1 file changed, 10 deletions(-) (limited to 'src/core/transport/chttp2') diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c index 5ca31d6bc7..203de99314 100644 --- a/src/core/transport/chttp2/stream_encoder.c +++ b/src/core/transport/chttp2/stream_encoder.c @@ -481,12 +481,6 @@ gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count, break; case GRPC_OP_METADATA: grpc_metadata_batch_assert_ok(&op->data.metadata); - case GRPC_OP_FLOW_CTL_CB: - /* these just get copied as they don't impact the number of flow - controlled bytes */ - grpc_sopb_append(outops, op, 1); - curop++; - break; case GRPC_OP_BEGIN_MESSAGE: /* begin op: for now we just convert the op to a slice and fall through - this lets us reuse the slice framing code below */ @@ -567,10 +561,6 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof, GPR_ERROR, "These stream ops should be filtered out by grpc_chttp2_preencode"); abort(); - case GRPC_OP_FLOW_CTL_CB: - op->data.flow_ctl_cb.cb(op->data.flow_ctl_cb.arg, GRPC_OP_OK); - curop++; - break; case GRPC_OP_METADATA: /* Encode a metadata batch; store the returned values, representing a metadata element that needs to be unreffed back into the metadata -- cgit v1.2.3 From e5cb23ba550878c2e562cd2df9ba0d857da63667 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 22 Apr 2015 16:58:37 -0700 Subject: Fix accidental delete --- src/core/transport/chttp2/stream_encoder.c | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'src/core/transport/chttp2') diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c index 203de99314..9b62aefd0b 100644 --- a/src/core/transport/chttp2/stream_encoder.c +++ b/src/core/transport/chttp2/stream_encoder.c @@ -481,6 +481,11 @@ gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count, break; case GRPC_OP_METADATA: grpc_metadata_batch_assert_ok(&op->data.metadata); + /* these just get copied as they don't impact the number of flow + controlled bytes */ + grpc_sopb_append(outops, op, 1); + curop++; + break; case GRPC_OP_BEGIN_MESSAGE: /* begin op: for now we just convert the op to a slice and fall through - this lets us reuse the slice framing code below */ -- cgit v1.2.3 From 06aeea7e9440cccd019c910cb68e306063d28632 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 23 Apr 2015 10:54:45 -0700 Subject: clang-format --- src/core/channel/census_filter.c | 50 ++++++++++++++----------- src/core/channel/channel_stack.c | 3 +- src/core/channel/child_channel.c | 20 +++++----- src/core/channel/child_channel.h | 5 ++- src/core/channel/client_channel.c | 20 +++++----- src/core/channel/connected_channel.c | 18 ++++----- src/core/channel/http_client_filter.c | 9 +++-- src/core/channel/http_server_filter.c | 12 +++--- src/core/channel/noop_filter.c | 12 +++--- src/core/surface/channel.h | 2 +- src/core/surface/channel_create.c | 3 +- src/core/surface/client.c | 11 ++++-- src/core/surface/lame_client.c | 12 +++--- src/core/surface/server.c | 17 ++++++--- src/core/surface/server_chttp2.c | 3 +- src/core/transport/chttp2/stream_encoder.c | 8 ++-- src/core/transport/chttp2_transport.c | 48 ++++++++++++++---------- src/core/transport/stream_op.h | 29 +++++++------- src/core/transport/transport.c | 6 ++- src/core/transport/transport.h | 11 ++++-- src/core/transport/transport_impl.h | 2 +- test/core/end2end/fixtures/chttp2_socket_pair.c | 9 +++-- 22 files changed, 177 insertions(+), 133 deletions(-) (limited to 'src/core/transport/chttp2') diff --git a/src/core/channel/census_filter.c b/src/core/channel/census_filter.c index 47461f7f2b..7e393a01a6 100644 --- a/src/core/channel/census_filter.c +++ b/src/core/channel/census_filter.c @@ -51,9 +51,9 @@ typedef struct call_data { gpr_timespec start_ts; /* recv callback */ - grpc_stream_op_buffer *recv_ops; - void (*on_done_recv)(void *user_data, int success); - void *recv_user_data; + grpc_stream_op_buffer* recv_ops; + void (*on_done_recv)(void* user_data, int success); + void* recv_user_data; } call_data; typedef struct channel_data { @@ -65,24 +65,26 @@ static void init_rpc_stats(census_rpc_stats* stats) { stats->cnt = 1; } -static void extract_and_annotate_method_tag(grpc_stream_op_buffer* sopb, call_data* calld, +static void extract_and_annotate_method_tag(grpc_stream_op_buffer* sopb, + call_data* calld, channel_data* chand) { grpc_linked_mdelem* m; size_t i; for (i = 0; i < sopb->nops; i++) { - grpc_stream_op * op = &sopb->ops[i]; + grpc_stream_op* op = &sopb->ops[i]; if (op->type != GRPC_OP_METADATA) continue; for (m = op->data.metadata.list.head; m != NULL; m = m->next) { if (m->md->key == chand->path_str) { - gpr_log(GPR_DEBUG, "%s", (const char*)GPR_SLICE_START_PTR(m->md->value->slice)); - census_add_method_tag( - calld->op_id, (const char*)GPR_SLICE_START_PTR(m->md->value->slice)); + gpr_log(GPR_DEBUG, "%s", + (const char*)GPR_SLICE_START_PTR(m->md->value->slice)); + census_add_method_tag(calld->op_id, (const char*)GPR_SLICE_START_PTR( + m->md->value->slice)); } } } } -static void client_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { +static void client_mutate_op(grpc_call_element* elem, grpc_transport_op* op) { call_data* calld = elem->call_data; channel_data* chand = elem->channel_data; if (op->send_ops) { @@ -90,15 +92,16 @@ static void client_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { } } -static void client_start_transport_op(grpc_call_element* elem, grpc_transport_op* op) { +static void client_start_transport_op(grpc_call_element* elem, + grpc_transport_op* op) { call_data* calld = elem->call_data; GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0)); client_mutate_op(elem, op); grpc_call_next_op(elem, op); } -static void server_on_done_recv(void *ptr, int success) { - grpc_call_element *elem = ptr; +static void server_on_done_recv(void* ptr, int success) { + grpc_call_element* elem = ptr; call_data* calld = elem->call_data; channel_data* chand = elem->channel_data; if (success) { @@ -107,7 +110,7 @@ static void server_on_done_recv(void *ptr, int success) { calld->on_done_recv(calld->recv_user_data, success); } -static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { +static void server_mutate_op(grpc_call_element* elem, grpc_transport_op* op) { call_data* calld = elem->call_data; if (op->recv_ops) { /* substitute our callback for the op callback */ @@ -119,7 +122,8 @@ static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { } } -static void server_start_transport_op(grpc_call_element* elem, grpc_transport_op* op) { +static void server_start_transport_op(grpc_call_element* elem, + grpc_transport_op* op) { call_data* calld = elem->call_data; GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0)); server_mutate_op(elem, op); @@ -140,7 +144,8 @@ static void channel_op(grpc_channel_element* elem, } static void client_init_call_elem(grpc_call_element* elem, - const void* server_transport_data, grpc_transport_op *initial_op) { + const void* server_transport_data, + grpc_transport_op* initial_op) { call_data* d = elem->call_data; GPR_ASSERT(d != NULL); init_rpc_stats(&d->stats); @@ -157,7 +162,8 @@ static void client_destroy_call_elem(grpc_call_element* elem) { } static void server_init_call_elem(grpc_call_element* elem, - const void* server_transport_data, grpc_transport_op *initial_op) { + const void* server_transport_data, + grpc_transport_op* initial_op) { call_data* d = elem->call_data; GPR_ASSERT(d != NULL); init_rpc_stats(&d->stats); @@ -194,11 +200,11 @@ static void destroy_channel_elem(grpc_channel_element* elem) { } const grpc_channel_filter grpc_client_census_filter = { - client_start_transport_op, channel_op, sizeof(call_data), client_init_call_elem, - client_destroy_call_elem, sizeof(channel_data), init_channel_elem, - destroy_channel_elem, "census-client"}; + client_start_transport_op, channel_op, sizeof(call_data), + client_init_call_elem, client_destroy_call_elem, sizeof(channel_data), + init_channel_elem, destroy_channel_elem, "census-client"}; const grpc_channel_filter grpc_server_census_filter = { - server_start_transport_op, channel_op, sizeof(call_data), server_init_call_elem, - server_destroy_call_elem, sizeof(channel_data), init_channel_elem, - destroy_channel_elem, "census-server"}; + server_start_transport_op, channel_op, sizeof(call_data), + server_init_call_elem, server_destroy_call_elem, sizeof(channel_data), + init_channel_elem, destroy_channel_elem, "census-server"}; diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index 022100e8bd..311f4f08ce 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -166,7 +166,8 @@ void grpc_call_stack_init(grpc_channel_stack *channel_stack, call_elems[i].filter = channel_elems[i].filter; call_elems[i].channel_data = channel_elems[i].channel_data; call_elems[i].call_data = user_data; - call_elems[i].filter->init_call_elem(&call_elems[i], transport_server_data, initial_op); + call_elems[i].filter->init_call_elem(&call_elems[i], transport_server_data, + initial_op); user_data += ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data); } diff --git a/src/core/channel/child_channel.c b/src/core/channel/child_channel.c index 817a2a8c70..a2f3c54290 100644 --- a/src/core/channel/child_channel.c +++ b/src/core/channel/child_channel.c @@ -60,11 +60,10 @@ typedef struct { gpr_uint8 sent_farewell; } lb_channel_data; -typedef struct { - grpc_child_channel *channel; -} lb_call_data; +typedef struct { grpc_child_channel *channel; } lb_call_data; -static void lb_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { +static void lb_start_transport_op(grpc_call_element *elem, + grpc_transport_op *op) { grpc_call_next_op(elem, op); } @@ -121,7 +120,8 @@ static void lb_channel_op(grpc_channel_element *elem, /* Constructor for call_data */ static void lb_init_call_elem(grpc_call_element *elem, - const void *server_transport_data, grpc_transport_op *initial_op) {} + const void *server_transport_data, + grpc_transport_op *initial_op) {} /* Destructor for call_data */ static void lb_destroy_call_elem(grpc_call_element *elem) {} @@ -154,9 +154,10 @@ static void lb_destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_child_channel_top_filter = { - lb_start_transport_op, lb_channel_op, sizeof(lb_call_data), - lb_init_call_elem, lb_destroy_call_elem, sizeof(lb_channel_data), - lb_init_channel_elem, lb_destroy_channel_elem, "child-channel", }; + lb_start_transport_op, lb_channel_op, sizeof(lb_call_data), + lb_init_call_elem, lb_destroy_call_elem, sizeof(lb_channel_data), + lb_init_channel_elem, lb_destroy_channel_elem, "child-channel", +}; /* grpc_child_channel proper */ @@ -261,7 +262,8 @@ void grpc_child_channel_handle_op(grpc_child_channel *channel, } grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel, - grpc_call_element *parent, grpc_transport_op *initial_op) { + grpc_call_element *parent, + grpc_transport_op *initial_op) { grpc_call_stack *stk = gpr_malloc((channel)->call_stack_size); grpc_call_element *lbelem; lb_call_data *lbcalld; diff --git a/src/core/channel/child_channel.h b/src/core/channel/child_channel.h index 264a8bbb82..556a1c731c 100644 --- a/src/core/channel/child_channel.h +++ b/src/core/channel/child_channel.h @@ -57,8 +57,9 @@ void grpc_child_channel_destroy(grpc_child_channel *channel, int wait_for_callbacks); grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel, - grpc_call_element *parent, grpc_transport_op *initial_op); + grpc_call_element *parent, + grpc_transport_op *initial_op); grpc_call_element *grpc_child_call_get_top_element(grpc_child_call *call); void grpc_child_call_destroy(grpc_child_call *call); -#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHILD_CHANNEL_H */ +#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHILD_CHANNEL_H */ diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 77c4951038..642822a267 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -106,7 +106,8 @@ static int prepare_activate(grpc_call_element *elem, /* create a child call */ /* TODO(ctiller): pass the waiting op down here */ - calld->s.active.child_call = grpc_child_channel_create_call(on_child, elem, NULL); + calld->s.active.child_call = + grpc_child_channel_create_call(on_child, elem, NULL); return 1; } @@ -184,9 +185,7 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) { chand->waiting_child_count = new_count; } -static void send_up_cancelled_ops(grpc_call_element *elem) { - abort(); -} +static void send_up_cancelled_ops(grpc_call_element *elem) { abort(); } static void cancel_rpc(grpc_call_element *elem, grpc_transport_op *op) { call_data *calld = elem->call_data; @@ -219,8 +218,8 @@ static void cancel_rpc(grpc_call_element *elem, grpc_transport_op *op) { abort(); } -static void cc_start_transport_op(grpc_call_element *elem, - grpc_transport_op *op) { +static void cc_start_transport_op(grpc_call_element *elem, + grpc_transport_op *op) { call_data *calld = elem->call_data; GPR_ASSERT(elem->filter == &grpc_client_channel_filter); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); @@ -325,7 +324,8 @@ static void channel_op(grpc_channel_element *elem, /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, - const void *server_transport_data, grpc_transport_op *initial_op) { + const void *server_transport_data, + grpc_transport_op *initial_op) { call_data *calld = elem->call_data; /* TODO(ctiller): is there something useful we can do here? */ @@ -392,9 +392,9 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_client_channel_filter = { - cc_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, - sizeof(channel_data), init_channel_elem, destroy_channel_elem, - "client-channel", + cc_start_transport_op, channel_op, sizeof(call_data), init_call_elem, + destroy_call_elem, sizeof(channel_data), init_channel_elem, + destroy_channel_elem, "client-channel", }; grpc_transport_setup_result grpc_client_channel_transport_setup_complete( diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 9b7db6124a..14dda88698 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -50,9 +50,7 @@ typedef struct connected_channel_channel_data { grpc_transport *transport; } channel_data; -typedef struct connected_channel_call_data { - void *unused; -} call_data; +typedef struct connected_channel_call_data { void *unused; } call_data; /* We perform a small hack to locate transport data alongside the connected channel data in call allocations, to allow everything to be pulled in minimal @@ -63,13 +61,15 @@ typedef struct connected_channel_call_data { /* Intercept a call operation and either push it directly up or translate it into transport stream operations */ -static void con_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { +static void con_start_transport_op(grpc_call_element *elem, + grpc_transport_op *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - grpc_transport_perform_op(chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), op); + grpc_transport_perform_op(chand->transport, + TRANSPORT_STREAM_FROM_CALL_DATA(calld), op); } /* Currently we assume all channel operations should just be pushed up. */ @@ -136,8 +136,9 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_connected_channel_filter = { - con_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, - sizeof(channel_data), init_channel_elem, destroy_channel_elem, "connected", + con_start_transport_op, channel_op, sizeof(call_data), init_call_elem, + destroy_call_elem, sizeof(channel_data), init_channel_elem, + destroy_channel_elem, "connected", }; /* Transport callback to accept a new stream... calls up to handle it */ @@ -189,8 +190,7 @@ static void transport_closed(void *user_data, grpc_transport *transport) { } const grpc_transport_callbacks connected_channel_transport_callbacks = { - accept_stream, - transport_goaway, transport_closed, + accept_stream, transport_goaway, transport_closed, }; grpc_transport_setup_result grpc_connected_channel_bind_transport( diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index 45a7436361..9805f325a6 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -123,7 +123,8 @@ static void hc_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { } } -static void hc_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { +static void hc_start_transport_op(grpc_call_element *elem, + grpc_transport_op *op) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); hc_mutate_op(elem, op); grpc_call_next_op(elem, op); @@ -215,6 +216,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_http_client_filter = { - hc_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, - sizeof(channel_data), init_channel_elem, destroy_channel_elem, - "http-client"}; + hc_start_transport_op, channel_op, sizeof(call_data), init_call_elem, + destroy_call_elem, sizeof(channel_data), init_channel_elem, + destroy_channel_elem, "http-client"}; diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index e5174acd68..1f64df68e3 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -207,7 +207,8 @@ static void hs_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { } } -static void hs_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { +static void hs_start_transport_op(grpc_call_element *elem, + grpc_transport_op *op) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); hs_mutate_op(elem, op); grpc_call_next_op(elem, op); @@ -232,7 +233,8 @@ static void channel_op(grpc_channel_element *elem, /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, - const void *server_transport_data, grpc_transport_op *initial_op) { + const void *server_transport_data, + grpc_transport_op *initial_op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; /* initialize members */ @@ -293,6 +295,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_http_server_filter = { - hs_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, - sizeof(channel_data), init_channel_elem, destroy_channel_elem, - "http-server"}; + hs_start_transport_op, channel_op, sizeof(call_data), init_call_elem, + destroy_call_elem, sizeof(channel_data), init_channel_elem, + destroy_channel_elem, "http-server"}; diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c index 62e57ce285..1d2be716d7 100644 --- a/src/core/channel/noop_filter.c +++ b/src/core/channel/noop_filter.c @@ -61,7 +61,8 @@ static void noop_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { - a network event (or similar) from below, to receive something op contains type and call direction information, in addition to the data that is being sent or received. */ -static void noop_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { +static void noop_start_transport_op(grpc_call_element *elem, + grpc_transport_op *op) { noop_mutate_op(elem, op); /* pass control down the stack */ @@ -87,7 +88,8 @@ static void channel_op(grpc_channel_element *elem, /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, - const void *server_transport_data, grpc_transport_op *initial_op) { + const void *server_transport_data, + grpc_transport_op *initial_op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; @@ -134,6 +136,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_no_op_filter = { - noop_start_transport_op, channel_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "no-op"}; + noop_start_transport_op, channel_op, sizeof(call_data), init_call_elem, + destroy_call_elem, sizeof(channel_data), init_channel_elem, + destroy_channel_elem, "no-op"}; diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index 05d57a905b..388be35711 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -51,4 +51,4 @@ void grpc_client_channel_closed(grpc_channel_element *elem); void grpc_channel_internal_ref(grpc_channel *channel); void grpc_channel_internal_unref(grpc_channel *channel); -#endif /* GRPC_INTERNAL_CORE_SURFACE_CHANNEL_H */ +#endif /* GRPC_INTERNAL_CORE_SURFACE_CHANNEL_H */ diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 5f3e9bec0c..daa8d3a7c6 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -175,7 +175,8 @@ static void done_setup(void *sp) { static grpc_transport_setup_result complete_setup(void *channel_stack, grpc_transport *transport, grpc_mdctx *mdctx) { - static grpc_channel_filter const *extra_filters[] = {&grpc_http_client_filter}; + static grpc_channel_filter const *extra_filters[] = { + &grpc_http_client_filter}; return grpc_client_channel_transport_setup_complete( channel_stack, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters), mdctx); diff --git a/src/core/surface/client.c b/src/core/surface/client.c index 7eb99895f7..8ac4dd1e0e 100644 --- a/src/core/surface/client.c +++ b/src/core/surface/client.c @@ -43,7 +43,8 @@ typedef struct { void *unused; } call_data; typedef struct { void *unused; } channel_data; -static void client_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { +static void client_start_transport_op(grpc_call_element *elem, + grpc_transport_op *op) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); grpc_call_next_op(elem, op); } @@ -67,7 +68,8 @@ static void channel_op(grpc_channel_element *elem, } static void init_call_elem(grpc_call_element *elem, - const void *transport_server_data, grpc_transport_op *initial_op) {} + const void *transport_server_data, + grpc_transport_op *initial_op) {} static void destroy_call_elem(grpc_call_element *elem) {} @@ -81,6 +83,7 @@ static void init_channel_elem(grpc_channel_element *elem, static void destroy_channel_elem(grpc_channel_element *elem) {} const grpc_channel_filter grpc_client_surface_filter = { - client_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, - sizeof(channel_data), init_channel_elem, destroy_channel_elem, "client", + client_start_transport_op, channel_op, sizeof(call_data), init_call_elem, + destroy_call_elem, sizeof(channel_data), init_channel_elem, + destroy_channel_elem, "client", }; diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index f95e2a16c2..c93222344d 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -46,7 +46,8 @@ typedef struct { void *unused; } call_data; typedef struct { void *unused; } channel_data; -static void lame_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { +static void lame_start_transport_op(grpc_call_element *elem, + grpc_transport_op *op) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); grpc_transport_op_finish_with_failure(op); } @@ -66,7 +67,8 @@ static void channel_op(grpc_channel_element *elem, } static void init_call_elem(grpc_call_element *elem, - const void *transport_server_data, grpc_transport_op *initial_op) { + const void *transport_server_data, + grpc_transport_op *initial_op) { if (initial_op) { grpc_transport_op_finish_with_failure(initial_op); } @@ -84,9 +86,9 @@ static void init_channel_elem(grpc_channel_element *elem, static void destroy_channel_elem(grpc_channel_element *elem) {} static const grpc_channel_filter lame_filter = { - lame_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, - sizeof(channel_data), init_channel_elem, destroy_channel_elem, - "lame-client", + lame_start_transport_op, channel_op, sizeof(call_data), init_call_elem, + destroy_call_elem, sizeof(channel_data), init_channel_elem, + destroy_channel_elem, "lame-client", }; grpc_channel *grpc_lame_client_channel_create(void) { diff --git a/src/core/surface/server.c b/src/core/surface/server.c index e9d6f86734..202fafca25 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -414,8 +414,10 @@ static void server_on_recv(void *ptr, int success) { } switch (*calld->recv_state) { - case GRPC_STREAM_OPEN: break; - case GRPC_STREAM_SEND_CLOSED: break; + case GRPC_STREAM_OPEN: + break; + case GRPC_STREAM_SEND_CLOSED: + break; case GRPC_STREAM_RECV_CLOSED: gpr_mu_lock(&chand->server->mu); if (calld->state == NOT_STARTED) { @@ -453,7 +455,8 @@ static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { } } -static void server_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { +static void server_start_transport_op(grpc_call_element *elem, + grpc_transport_op *op) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); server_mutate_op(elem, op); grpc_call_next_op(elem, op); @@ -507,7 +510,8 @@ static void shutdown_channel(channel_data *chand) { } static void init_call_elem(grpc_call_element *elem, - const void *server_transport_data, grpc_transport_op *initial_op) { + const void *server_transport_data, + grpc_transport_op *initial_op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; memset(calld, 0, sizeof(call_data)); @@ -599,8 +603,9 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } static const grpc_channel_filter server_surface_filter = { - server_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, - sizeof(channel_data), init_channel_elem, destroy_channel_elem, "server", + server_start_transport_op, channel_op, sizeof(call_data), init_call_elem, + destroy_call_elem, sizeof(channel_data), init_channel_elem, + destroy_channel_elem, "server", }; static void addcq(grpc_server *server, grpc_completion_queue *cq) { diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c index ebde5095a9..7b5c2f227b 100644 --- a/src/core/surface/server_chttp2.c +++ b/src/core/surface/server_chttp2.c @@ -45,7 +45,8 @@ static grpc_transport_setup_result setup_transport(void *server, grpc_transport *transport, grpc_mdctx *mdctx) { - static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter}; + static grpc_channel_filter const *extra_filters[] = { + &grpc_http_server_filter}; return grpc_server_setup_transport(server, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters), mdctx); } diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c index 9b62aefd0b..cf1e66bf8b 100644 --- a/src/core/transport/chttp2/stream_encoder.c +++ b/src/core/transport/chttp2/stream_encoder.c @@ -481,10 +481,10 @@ gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count, break; case GRPC_OP_METADATA: grpc_metadata_batch_assert_ok(&op->data.metadata); - /* these just get copied as they don't impact the number of flow - controlled bytes */ - grpc_sopb_append(outops, op, 1); - curop++; + /* these just get copied as they don't impact the number of flow + controlled bytes */ + grpc_sopb_append(outops, op, 1); + curop++; break; case GRPC_OP_BEGIN_MESSAGE: /* begin op: for now we just convert the op to a slice and fall diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index d7156142fb..ebc715f3b8 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -525,7 +525,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, gpr_mu_lock(&t->mu); t->calling_back = 1; - ref_transport(t); /* matches unref at end of this function */ + ref_transport(t); /* matches unref at end of this function */ gpr_mu_unlock(&t->mu); sr = setup(arg, &t->base, t->metadata_context); @@ -537,7 +537,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, if (t->destroying) gpr_cv_signal(&t->cv); unlock(t); - ref_transport(t); /* matches unref inside recv_data */ + ref_transport(t); /* matches unref inside recv_data */ recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK); unref_transport(t); @@ -884,7 +884,8 @@ static int prepare_write(transport *t) { t->outgoing_window -= window_delta; s->outgoing_window -= window_delta; - if (s->write_state == WRITE_STATE_QUEUED_CLOSE && s->outgoing_sopb->nops == 0) { + if (s->write_state == WRITE_STATE_QUEUED_CLOSE && + s->outgoing_sopb->nops == 0) { s->send_closed = 1; } if (s->writing_sopb.nops > 0 || s->send_closed) { @@ -927,8 +928,7 @@ static void finalize_outbuf(transport *t) { while ((s = stream_list_remove_head(t, WRITING))) { grpc_chttp2_encode(s->writing_sopb.ops, s->writing_sopb.nops, - s->send_closed, s->id, &t->hpack_compressor, - &t->outbuf); + s->send_closed, s->id, &t->hpack_compressor, &t->outbuf); s->writing_sopb.nops = 0; if (s->send_closed) { stream_list_join(t, s, WRITTEN_CLOSED); @@ -1047,12 +1047,14 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) { } if (op->cancel_with_status != GRPC_STATUS_OK) { - cancel_stream(t, s, op->cancel_with_status, grpc_chttp2_grpc_status_to_http2_error(op->cancel_with_status), - 1); + cancel_stream( + t, s, op->cancel_with_status, + grpc_chttp2_grpc_status_to_http2_error(op->cancel_with_status), 1); } } -static void perform_op(grpc_transport *gt, grpc_stream *gs, grpc_transport_op *op) { +static void perform_op(grpc_transport *gt, grpc_stream *gs, + grpc_transport_op *op) { transport *t = (transport *)gt; stream *s = (stream *)gs; @@ -1129,7 +1131,8 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, } if (s->cancelled) { send_rst = 0; - } else if (!s->read_closed || s->write_state != WRITE_STATE_SENT_CLOSE || had_outgoing) { + } else if (!s->read_closed || s->write_state != WRITE_STATE_SENT_CLOSE || + had_outgoing) { s->cancelled = 1; stream_list_join(t, s, CANCELLED); @@ -1586,11 +1589,11 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { } if (st.initial_window_update) { for (i = 0; i < t->stream_map.count; i++) { - stream *s = (stream*)(t->stream_map.values[i]); + stream *s = (stream *)(t->stream_map.values[i]); int was_window_empty = s->outgoing_window <= 0; s->outgoing_window += st.initial_window_update; - if (was_window_empty && s->outgoing_window > 0 && - s->outgoing_sopb && s->outgoing_sopb->nops > 0) { + if (was_window_empty && s->outgoing_window > 0 && s->outgoing_sopb && + s->outgoing_sopb->nops > 0) { stream_list_join(t, s, WRITABLE); } } @@ -1609,7 +1612,8 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { s->outgoing_window += st.window_update; /* if this window update makes outgoing ops writable again, flag that */ - if (was_window_empty && s->outgoing_sopb && s->outgoing_sopb->nops > 0) { + if (was_window_empty && s->outgoing_sopb && + s->outgoing_sopb->nops > 0) { stream_list_join(t, s, WRITABLE); } } @@ -1877,8 +1881,8 @@ static void finish_reads(transport *t) { while ((s = stream_list_remove_head(t, FINISHED_READ_OP)) != NULL) { int publish = 0; GPR_ASSERT(s->incoming_sopb); - *s->publish_state = compute_state(s->write_state == WRITE_STATE_SENT_CLOSE, - s->read_closed); + *s->publish_state = + compute_state(s->write_state == WRITE_STATE_SENT_CLOSE, s->read_closed); if (*s->publish_state != s->published_state) { s->published_state = *s->publish_state; publish = 1; @@ -1895,8 +1899,12 @@ static void finish_reads(transport *t) { static void schedule_cb(transport *t, op_closure closure, int success) { if (t->pending_callbacks.capacity == t->pending_callbacks.count) { - t->pending_callbacks.capacity = GPR_MAX(t->pending_callbacks.capacity * 2, 8); - t->pending_callbacks.callbacks = gpr_realloc(t->pending_callbacks.callbacks, t->pending_callbacks.capacity * sizeof(*t->pending_callbacks.callbacks)); + t->pending_callbacks.capacity = + GPR_MAX(t->pending_callbacks.capacity * 2, 8); + t->pending_callbacks.callbacks = + gpr_realloc(t->pending_callbacks.callbacks, + t->pending_callbacks.capacity * + sizeof(*t->pending_callbacks.callbacks)); } closure.success = success; t->pending_callbacks.callbacks[t->pending_callbacks.count++] = closure; @@ -1944,9 +1952,9 @@ static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) { */ static const grpc_transport_vtable vtable = { - sizeof(stream), init_stream, perform_op, - add_to_pollset, destroy_stream, goaway, close_transport, - send_ping, destroy_transport}; + sizeof(stream), init_stream, perform_op, + add_to_pollset, destroy_stream, goaway, + close_transport, send_ping, destroy_transport}; void grpc_create_chttp2_transport(grpc_transport_setup_callback setup, void *arg, diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h index f5de64d583..95497a3cc8 100644 --- a/src/core/transport/stream_op.h +++ b/src/core/transport/stream_op.h @@ -86,29 +86,31 @@ typedef struct grpc_metadata_batch { void grpc_metadata_batch_init(grpc_metadata_batch *comd); void grpc_metadata_batch_destroy(grpc_metadata_batch *comd); void grpc_metadata_batch_merge(grpc_metadata_batch *target, - grpc_metadata_batch *add); + grpc_metadata_batch *add); void grpc_metadata_batch_link_head(grpc_metadata_batch *comd, - grpc_linked_mdelem *storage); + grpc_linked_mdelem *storage); void grpc_metadata_batch_link_tail(grpc_metadata_batch *comd, - grpc_linked_mdelem *storage); + grpc_linked_mdelem *storage); void grpc_metadata_batch_add_head(grpc_metadata_batch *comd, - grpc_linked_mdelem *storage, - grpc_mdelem *elem_to_add); + grpc_linked_mdelem *storage, + grpc_mdelem *elem_to_add); void grpc_metadata_batch_add_tail(grpc_metadata_batch *comd, - grpc_linked_mdelem *storage, - grpc_mdelem *elem_to_add); + grpc_linked_mdelem *storage, + grpc_mdelem *elem_to_add); void grpc_metadata_batch_filter(grpc_metadata_batch *comd, - grpc_mdelem *(*filter)(void *user_data, - grpc_mdelem *elem), - void *user_data); + grpc_mdelem *(*filter)(void *user_data, + grpc_mdelem *elem), + void *user_data); #ifndef NDEBUG void grpc_metadata_batch_assert_ok(grpc_metadata_batch *comd); #else -#define grpc_metadata_batch_assert_ok(comd) do {} while (0) +#define grpc_metadata_batch_assert_ok(comd) \ + do { \ + } while (0) #endif /* Represents a single operation performed on a stream/transport */ @@ -151,7 +153,8 @@ void grpc_sopb_add_no_op(grpc_stream_op_buffer *sopb); /* Append a GRPC_OP_BEGIN to a buffer */ void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length, gpr_uint32 flags); -void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_metadata_batch metadata); +void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, + grpc_metadata_batch metadata); /* Append a GRPC_SLICE to a buffer - does not ref/unref the slice */ void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice); /* Append a buffer to a buffer - does not ref/unref any internal objects */ @@ -160,4 +163,4 @@ void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops, char *grpc_sopb_string(grpc_stream_op_buffer *sopb); -#endif /* GRPC_INTERNAL_CORE_TRANSPORT_STREAM_OP_H */ +#endif /* GRPC_INTERNAL_CORE_TRANSPORT_STREAM_OP_H */ diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index ab2f7c3470..724cb0ea68 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -52,8 +52,10 @@ void grpc_transport_destroy(grpc_transport *transport) { } int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, - const void *server_data, grpc_transport_op *initial_op) { - return transport->vtable->init_stream(transport, stream, server_data, initial_op); + const void *server_data, + grpc_transport_op *initial_op) { + return transport->vtable->init_stream(transport, stream, server_data, + initial_op); } void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream, diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index a51e01d3c9..5036dfc2de 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -95,7 +95,8 @@ struct grpc_transport_callbacks { void (*accept_stream)(void *user_data, grpc_transport *transport, const void *server_data); - void (*goaway)(void *user_data, grpc_transport *transport, grpc_status_code status, gpr_slice debug); + void (*goaway)(void *user_data, grpc_transport *transport, + grpc_status_code status, gpr_slice debug); /* The transport has been closed */ void (*closed)(void *user_data, grpc_transport *transport); @@ -115,7 +116,8 @@ size_t grpc_transport_stream_size(grpc_transport *transport); server_data - either NULL for a client initiated stream, or a pointer supplied from the accept_stream callback function */ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, - const void *server_data, grpc_transport_op *initial_op); + const void *server_data, + grpc_transport_op *initial_op); /* Destroy transport data for a stream. @@ -133,7 +135,8 @@ void grpc_transport_destroy_stream(grpc_transport *transport, void grpc_transport_op_finish_with_failure(grpc_transport_op *op); /* TODO(ctiller): remove this */ -void grpc_transport_add_to_pollset(grpc_transport *transport, grpc_pollset *pollset); +void grpc_transport_add_to_pollset(grpc_transport *transport, + grpc_pollset *pollset); char *grpc_transport_op_string(grpc_transport_op *op); @@ -205,4 +208,4 @@ void grpc_transport_setup_initiate(grpc_transport_setup *setup); used as a destruction call by setup). */ void grpc_transport_setup_cancel(grpc_transport_setup *setup); -#endif /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_H */ +#endif /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_H */ diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h index 75d8d51e20..479e15338f 100644 --- a/src/core/transport/transport_impl.h +++ b/src/core/transport/transport_impl.h @@ -76,4 +76,4 @@ struct grpc_transport { const grpc_transport_vtable *vtable; }; -#endif /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_IMPL_H */ +#endif /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_IMPL_H */ diff --git a/test/core/end2end/fixtures/chttp2_socket_pair.c b/test/core/end2end/fixtures/chttp2_socket_pair.c index acc0a69708..d19ceb178b 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair.c @@ -59,7 +59,8 @@ static grpc_transport_setup_result server_setup_transport( void *ts, grpc_transport *transport, grpc_mdctx *mdctx) { grpc_end2end_test_fixture *f = ts; - static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter}; + static grpc_channel_filter const *extra_filters[] = { + &grpc_http_server_filter}; return grpc_server_setup_transport(f->server, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters), mdctx); } @@ -73,9 +74,9 @@ static grpc_transport_setup_result client_setup_transport( void *ts, grpc_transport *transport, grpc_mdctx *mdctx) { sp_client_setup *cs = ts; - const grpc_channel_filter *filters[] = { - &grpc_client_surface_filter, &grpc_http_client_filter, - &grpc_connected_channel_filter}; + const grpc_channel_filter *filters[] = {&grpc_client_surface_filter, + &grpc_http_client_filter, + &grpc_connected_channel_filter}; size_t nfilters = sizeof(filters) / sizeof(*filters); grpc_channel *channel = grpc_channel_create_from_filters( filters, nfilters, cs->client_args, mdctx, 1); -- cgit v1.2.3