From 50d9db534c202a2a473d7b5d54f105174ec7f727 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 23 Apr 2015 10:52:14 -0700 Subject: Flesh out initial_op --- src/core/channel/client_channel.c | 5 ++- src/core/channel/connected_channel.c | 2 +- src/core/channel/http_client_filter.c | 12 ++++--- src/core/channel/http_server_filter.c | 59 +++++------------------------------ src/core/channel/noop_filter.c | 22 ++++++++----- src/core/surface/call.c | 2 +- src/core/surface/client.c | 2 +- src/core/surface/lame_client.c | 6 +++- src/core/surface/server.c | 11 +++++-- src/core/transport/chttp2_transport.c | 20 +++++++----- src/core/transport/transport.c | 4 +-- src/core/transport/transport_impl.h | 2 +- 12 files changed, 65 insertions(+), 82 deletions(-) diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index e6b0f7bba8..77c4951038 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -325,9 +325,12 @@ static void channel_op(grpc_channel_element *elem, /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, - const void *server_transport_data) { + const void *server_transport_data, grpc_transport_op *initial_op) { call_data *calld = elem->call_data; + /* TODO(ctiller): is there something useful we can do here? */ + GPR_ASSERT(initial_op == NULL); + GPR_ASSERT(elem->filter == &grpc_client_channel_filter); GPR_ASSERT(server_transport_data == NULL); calld->elem = elem; diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 5a5a849907..9b7db6124a 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -102,7 +102,7 @@ static void init_call_elem(grpc_call_element *elem, int r; GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); - r = grpc_transport_1chand->transport, + r = grpc_transport_init_stream(chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), server_transport_data, initial_op); GPR_ASSERT(r == 0); diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index 7287408401..45a7436361 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -87,13 +87,11 @@ static void hc_on_recv(void *user_data, int success) { calld->on_done_recv(calld->recv_user_data, success); } -static void hc_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { +static void hc_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; size_t i; - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - if (op->send_ops && !calld->sent_initial_metadata) { size_t nops = op->send_ops->nops; grpc_stream_op *ops = op->send_ops->ops; @@ -123,7 +121,11 @@ static void hc_start_transport_op(grpc_call_element *elem, grpc_transport_op *op op->on_done_recv = hc_on_recv; op->recv_user_data = elem; } +} +static void hc_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + hc_mutate_op(elem, op); grpc_call_next_op(elem, op); } @@ -146,10 +148,12 @@ static void channel_op(grpc_channel_element *elem, /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, - const void *server_transport_data) { + const void *server_transport_data, + grpc_transport_op *initial_op) { call_data *calld = elem->call_data; calld->sent_initial_metadata = 0; calld->got_initial_metadata = 0; + if (initial_op) hc_mutate_op(elem, initial_op); } /* Destructor for call_data */ diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index 31ed8a3393..e5174acd68 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -38,12 +38,6 @@ #include #include -typedef struct { - grpc_mdelem *path; - grpc_mdelem *content_type; - grpc_byte_buffer *content; -} gettable; - typedef struct call_data { gpr_uint8 got_initial_metadata; gpr_uint8 seen_path; @@ -73,9 +67,6 @@ typedef struct channel_data { grpc_mdstr *host_key; grpc_mdctx *mdctx; - - size_t gettable_count; - gettable *gettables; } channel_data; /* used to silence 'variable not used' warnings */ @@ -187,12 +178,11 @@ static void hs_on_recv(void *user_data, int success) { calld->on_done_recv(calld->recv_user_data, success); } -static void hs_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { +static void hs_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; size_t i; - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); if (op->send_ops && !calld->sent_status) { size_t nops = op->send_ops->nops; @@ -215,7 +205,11 @@ static void hs_start_transport_op(grpc_call_element *elem, grpc_transport_op *op op->on_done_recv = hs_on_recv; op->recv_user_data = elem; } +} +static void hs_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + hs_mutate_op(elem, op); grpc_call_next_op(elem, op); } @@ -238,15 +232,12 @@ static void channel_op(grpc_channel_element *elem, /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, - const void *server_transport_data) { + const void *server_transport_data, grpc_transport_op *initial_op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; - channel_data *channeld = elem->channel_data; - - ignore_unused(channeld); - /* initialize members */ memset(calld, 0, sizeof(*calld)); + if (initial_op) hs_mutate_op(elem, initial_op); } /* Destructor for call_data */ @@ -256,9 +247,6 @@ static void destroy_call_elem(grpc_call_element *elem) {} static void init_channel_elem(grpc_channel_element *elem, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_first, int is_last) { - size_t i; - size_t gettable_capacity = 0; - /* grab pointers to our data from the channel element */ channel_data *channeld = elem->channel_data; @@ -284,46 +272,13 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc"); channeld->mdctx = mdctx; - - /* initialize http download support */ - channeld->gettable_count = 0; - channeld->gettables = NULL; - for (i = 0; i < args->num_args; i++) { - if (0 == strcmp(args->args[i].key, GRPC_ARG_SERVE_OVER_HTTP)) { - gettable *g; - gpr_slice slice; - grpc_http_server_page *p = args->args[i].value.pointer.p; - if (channeld->gettable_count == gettable_capacity) { - gettable_capacity = - GPR_MAX(gettable_capacity * 3 / 2, gettable_capacity + 1); - channeld->gettables = gpr_realloc(channeld->gettables, - gettable_capacity * sizeof(gettable)); - } - g = &channeld->gettables[channeld->gettable_count++]; - g->path = grpc_mdelem_from_strings(mdctx, ":path", p->path); - g->content_type = - grpc_mdelem_from_strings(mdctx, "content-type", p->content_type); - slice = gpr_slice_from_copied_string(p->content); - g->content = grpc_byte_buffer_create(&slice, 1); - gpr_slice_unref(slice); - } - } } /* Destructor for channel data */ static void destroy_channel_elem(grpc_channel_element *elem) { - size_t i; - /* grab pointers to our data from the channel element */ channel_data *channeld = elem->channel_data; - for (i = 0; i < channeld->gettable_count; i++) { - grpc_mdelem_unref(channeld->gettables[i].path); - grpc_mdelem_unref(channeld->gettables[i].content_type); - grpc_byte_buffer_destroy(channeld->gettables[i].content); - } - gpr_free(channeld->gettables); - grpc_mdelem_unref(channeld->te_trailers); grpc_mdelem_unref(channeld->status_ok); grpc_mdelem_unref(channeld->status_not_found); diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c index 403b60901b..62e57ce285 100644 --- a/src/core/channel/noop_filter.c +++ b/src/core/channel/noop_filter.c @@ -45,12 +45,7 @@ typedef struct channel_data { /* used to silence 'variable not used' warnings */ static void ignore_unused(void *ignored) {} -/* Called either: - - in response to an API call (or similar) from above, to send something - - a network event (or similar) from below, to receive something - op contains type and call direction information, in addition to the data - that is being sent or received. */ -static void noop_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { +static void noop_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; @@ -58,6 +53,17 @@ static void noop_start_transport_op(grpc_call_element *elem, grpc_transport_op * ignore_unused(calld); ignore_unused(channeld); + /* do nothing */ +} + +/* Called either: + - in response to an API call (or similar) from above, to send something + - a network event (or similar) from below, to receive something + op contains type and call direction information, in addition to the data + that is being sent or received. */ +static void noop_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { + noop_mutate_op(elem, op); + /* pass control down the stack */ grpc_call_next_op(elem, op); } @@ -81,13 +87,15 @@ static void channel_op(grpc_channel_element *elem, /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, - const void *server_transport_data) { + const void *server_transport_data, grpc_transport_op *initial_op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; /* initialize members */ calld->unused = channeld->unused; + + if (initial_op) noop_mutate_op(elem, initial_op); } /* Destructor for call_data */ diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 18be81308d..c39e6cf3a4 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -288,7 +288,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, /* one ref is dropped in response to destroy, the other in stream_closed */ gpr_ref_init(&call->internal_refcount, 2); - grpc_call_stack_init(channel_stack, server_transport_data, + grpc_call_stack_init(channel_stack, server_transport_data, NULL, CALL_STACK_FROM_CALL(call)); if (gpr_time_cmp(send_deadline, gpr_inf_future) != 0) { set_deadline_alarm(call, send_deadline); diff --git a/src/core/surface/client.c b/src/core/surface/client.c index 4669c59ee4..7eb99895f7 100644 --- a/src/core/surface/client.c +++ b/src/core/surface/client.c @@ -67,7 +67,7 @@ static void channel_op(grpc_channel_element *elem, } static void init_call_elem(grpc_call_element *elem, - const void *transport_server_data) {} + const void *transport_server_data, grpc_transport_op *initial_op) {} static void destroy_call_elem(grpc_call_element *elem) {} diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 4e9eb808d7..f95e2a16c2 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -66,7 +66,11 @@ static void channel_op(grpc_channel_element *elem, } static void init_call_elem(grpc_call_element *elem, - const void *transport_server_data) {} + const void *transport_server_data, grpc_transport_op *initial_op) { + if (initial_op) { + grpc_transport_op_finish_with_failure(initial_op); + } +} static void destroy_call_elem(grpc_call_element *elem) {} diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 82d1323a4b..e9d6f86734 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -439,9 +439,8 @@ static void server_on_recv(void *ptr, int success) { calld->on_done_recv(calld->recv_user_data, success); } -static void server_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { +static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { call_data *calld = elem->call_data; - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); if (op->recv_ops) { /* substitute our callback for the higher callback */ @@ -452,7 +451,11 @@ static void server_start_transport_op(grpc_call_element *elem, grpc_transport_op op->on_done_recv = server_on_recv; op->recv_user_data = elem; } +} +static void server_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + server_mutate_op(elem, op); grpc_call_next_op(elem, op); } @@ -504,7 +507,7 @@ static void shutdown_channel(channel_data *chand) { } static void init_call_elem(grpc_call_element *elem, - const void *server_transport_data) { + const void *server_transport_data, grpc_transport_op *initial_op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; memset(calld, 0, sizeof(call_data)); @@ -516,6 +519,8 @@ static void init_call_elem(grpc_call_element *elem, gpr_mu_unlock(&chand->server->mu); server_ref(chand->server); + + server_mutate_op(elem, initial_op); } static void destroy_call_elem(grpc_call_element *elem) { diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index acdc98b86b..d7156142fb 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -378,7 +378,7 @@ static void maybe_finish_read(transport *t, stream *s); static void maybe_join_window_updates(transport *t, stream *s); static void finish_reads(transport *t); static void add_to_pollset_locked(transport *t, grpc_pollset *pollset); - +static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op); /* * CONSTRUCTION/DESTRUCTION/REFCOUNTING @@ -595,7 +595,7 @@ static void goaway(grpc_transport *gt, grpc_status_code status, } static int init_stream(grpc_transport *gt, grpc_stream *gs, - const void *server_data) { + const void *server_data, grpc_transport_op *initial_op) { transport *t = (transport *)gt; stream *s = (stream *)gs; @@ -622,6 +622,8 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, grpc_sopb_init(&s->callback_sopb); grpc_chttp2_data_parser_init(&s->parser); + if (initial_op) perform_op_locked(t, s, initial_op); + if (!server_data) { unlock(t); } @@ -1003,12 +1005,7 @@ static void maybe_start_some_streams(transport *t) { } } -static void perform_op(grpc_transport *gt, grpc_stream *gs, grpc_transport_op *op) { - transport *t = (transport *)gt; - stream *s = (stream *)gs; - - lock(t); - +static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) { if (op->send_ops) { GPR_ASSERT(s->outgoing_sopb == NULL); s->send_done_closure.cb = op->on_done_send; @@ -1053,7 +1050,14 @@ static void perform_op(grpc_transport *gt, grpc_stream *gs, grpc_transport_op *o cancel_stream(t, s, op->cancel_with_status, grpc_chttp2_grpc_status_to_http2_error(op->cancel_with_status), 1); } +} +static void perform_op(grpc_transport *gt, grpc_stream *gs, grpc_transport_op *op) { + transport *t = (transport *)gt; + stream *s = (stream *)gs; + + lock(t); + perform_op_locked(t, s, op); unlock(t); } diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index 35195348e7..ab2f7c3470 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -52,8 +52,8 @@ void grpc_transport_destroy(grpc_transport *transport) { } int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, - const void *server_data) { - return transport->vtable->init_stream(transport, stream, server_data); + const void *server_data, grpc_transport_op *initial_op) { + return transport->vtable->init_stream(transport, stream, server_data, initial_op); } void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream, diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h index ef79ac0741..75d8d51e20 100644 --- a/src/core/transport/transport_impl.h +++ b/src/core/transport/transport_impl.h @@ -43,7 +43,7 @@ typedef struct grpc_transport_vtable { /* implementation of grpc_transport_init_stream */ int (*init_stream)(grpc_transport *self, grpc_stream *stream, - const void *server_data); + const void *server_data, grpc_transport_op *initial_op); /* implementation of grpc_transport_send_batch */ void (*perform_op)(grpc_transport *self, grpc_stream *stream, -- cgit v1.2.3