aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-04-23 10:52:14 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-04-23 10:52:14 -0700
commit50d9db534c202a2a473d7b5d54f105174ec7f727 (patch)
tree0fbb703e46646b7666600d5de7cf28256ad239ad
parent0b20751b5040ccecb7a31c57f4038cec1559ab7e (diff)
Flesh out initial_op
-rw-r--r--src/core/channel/client_channel.c5
-rw-r--r--src/core/channel/connected_channel.c2
-rw-r--r--src/core/channel/http_client_filter.c12
-rw-r--r--src/core/channel/http_server_filter.c59
-rw-r--r--src/core/channel/noop_filter.c22
-rw-r--r--src/core/surface/call.c2
-rw-r--r--src/core/surface/client.c2
-rw-r--r--src/core/surface/lame_client.c6
-rw-r--r--src/core/surface/server.c11
-rw-r--r--src/core/transport/chttp2_transport.c20
-rw-r--r--src/core/transport/transport.c4
-rw-r--r--src/core/transport/transport_impl.h2
12 files changed, 65 insertions, 82 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index e6b0f7bba8..77c4951038 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -325,9 +325,12 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {
+ const void *server_transport_data, grpc_transport_op *initial_op) {
call_data *calld = elem->call_data;
+ /* TODO(ctiller): is there something useful we can do here? */
+ GPR_ASSERT(initial_op == NULL);
+
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
GPR_ASSERT(server_transport_data == NULL);
calld->elem = elem;
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index 5a5a849907..9b7db6124a 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -102,7 +102,7 @@ static void init_call_elem(grpc_call_element *elem,
int r;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
- r = grpc_transport_1chand->transport,
+ r = grpc_transport_init_stream(chand->transport,
TRANSPORT_STREAM_FROM_CALL_DATA(calld),
server_transport_data, initial_op);
GPR_ASSERT(r == 0);
diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c
index 7287408401..45a7436361 100644
--- a/src/core/channel/http_client_filter.c
+++ b/src/core/channel/http_client_filter.c
@@ -87,13 +87,11 @@ static void hc_on_recv(void *user_data, int success) {
calld->on_done_recv(calld->recv_user_data, success);
}
-static void hc_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
+static void hc_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
size_t i;
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
if (op->send_ops && !calld->sent_initial_metadata) {
size_t nops = op->send_ops->nops;
grpc_stream_op *ops = op->send_ops->ops;
@@ -123,7 +121,11 @@ static void hc_start_transport_op(grpc_call_element *elem, grpc_transport_op *op
op->on_done_recv = hc_on_recv;
op->recv_user_data = elem;
}
+}
+static void hc_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ hc_mutate_op(elem, op);
grpc_call_next_op(elem, op);
}
@@ -146,10 +148,12 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {
+ const void *server_transport_data,
+ grpc_transport_op *initial_op) {
call_data *calld = elem->call_data;
calld->sent_initial_metadata = 0;
calld->got_initial_metadata = 0;
+ if (initial_op) hc_mutate_op(elem, initial_op);
}
/* Destructor for call_data */
diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c
index 31ed8a3393..e5174acd68 100644
--- a/src/core/channel/http_server_filter.c
+++ b/src/core/channel/http_server_filter.c
@@ -38,12 +38,6 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-typedef struct {
- grpc_mdelem *path;
- grpc_mdelem *content_type;
- grpc_byte_buffer *content;
-} gettable;
-
typedef struct call_data {
gpr_uint8 got_initial_metadata;
gpr_uint8 seen_path;
@@ -73,9 +67,6 @@ typedef struct channel_data {
grpc_mdstr *host_key;
grpc_mdctx *mdctx;
-
- size_t gettable_count;
- gettable *gettables;
} channel_data;
/* used to silence 'variable not used' warnings */
@@ -187,12 +178,11 @@ static void hs_on_recv(void *user_data, int success) {
calld->on_done_recv(calld->recv_user_data, success);
}
-static void hs_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
+static void hs_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
size_t i;
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
if (op->send_ops && !calld->sent_status) {
size_t nops = op->send_ops->nops;
@@ -215,7 +205,11 @@ static void hs_start_transport_op(grpc_call_element *elem, grpc_transport_op *op
op->on_done_recv = hs_on_recv;
op->recv_user_data = elem;
}
+}
+static void hs_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ hs_mutate_op(elem, op);
grpc_call_next_op(elem, op);
}
@@ -238,15 +232,12 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {
+ const void *server_transport_data, grpc_transport_op *initial_op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
-
- ignore_unused(channeld);
-
/* initialize members */
memset(calld, 0, sizeof(*calld));
+ if (initial_op) hs_mutate_op(elem, initial_op);
}
/* Destructor for call_data */
@@ -256,9 +247,6 @@ static void destroy_call_elem(grpc_call_element *elem) {}
static void init_channel_elem(grpc_channel_element *elem,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
- size_t i;
- size_t gettable_capacity = 0;
-
/* grab pointers to our data from the channel element */
channel_data *channeld = elem->channel_data;
@@ -284,46 +272,13 @@ static void init_channel_elem(grpc_channel_element *elem,
grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc");
channeld->mdctx = mdctx;
-
- /* initialize http download support */
- channeld->gettable_count = 0;
- channeld->gettables = NULL;
- for (i = 0; i < args->num_args; i++) {
- if (0 == strcmp(args->args[i].key, GRPC_ARG_SERVE_OVER_HTTP)) {
- gettable *g;
- gpr_slice slice;
- grpc_http_server_page *p = args->args[i].value.pointer.p;
- if (channeld->gettable_count == gettable_capacity) {
- gettable_capacity =
- GPR_MAX(gettable_capacity * 3 / 2, gettable_capacity + 1);
- channeld->gettables = gpr_realloc(channeld->gettables,
- gettable_capacity * sizeof(gettable));
- }
- g = &channeld->gettables[channeld->gettable_count++];
- g->path = grpc_mdelem_from_strings(mdctx, ":path", p->path);
- g->content_type =
- grpc_mdelem_from_strings(mdctx, "content-type", p->content_type);
- slice = gpr_slice_from_copied_string(p->content);
- g->content = grpc_byte_buffer_create(&slice, 1);
- gpr_slice_unref(slice);
- }
- }
}
/* Destructor for channel data */
static void destroy_channel_elem(grpc_channel_element *elem) {
- size_t i;
-
/* grab pointers to our data from the channel element */
channel_data *channeld = elem->channel_data;
- for (i = 0; i < channeld->gettable_count; i++) {
- grpc_mdelem_unref(channeld->gettables[i].path);
- grpc_mdelem_unref(channeld->gettables[i].content_type);
- grpc_byte_buffer_destroy(channeld->gettables[i].content);
- }
- gpr_free(channeld->gettables);
-
grpc_mdelem_unref(channeld->te_trailers);
grpc_mdelem_unref(channeld->status_ok);
grpc_mdelem_unref(channeld->status_not_found);
diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c
index 403b60901b..62e57ce285 100644
--- a/src/core/channel/noop_filter.c
+++ b/src/core/channel/noop_filter.c
@@ -45,12 +45,7 @@ typedef struct channel_data {
/* used to silence 'variable not used' warnings */
static void ignore_unused(void *ignored) {}
-/* Called either:
- - in response to an API call (or similar) from above, to send something
- - a network event (or similar) from below, to receive something
- op contains type and call direction information, in addition to the data
- that is being sent or received. */
-static void noop_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
+static void noop_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
@@ -58,6 +53,17 @@ static void noop_start_transport_op(grpc_call_element *elem, grpc_transport_op *
ignore_unused(calld);
ignore_unused(channeld);
+ /* do nothing */
+}
+
+/* Called either:
+ - in response to an API call (or similar) from above, to send something
+ - a network event (or similar) from below, to receive something
+ op contains type and call direction information, in addition to the data
+ that is being sent or received. */
+static void noop_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
+ noop_mutate_op(elem, op);
+
/* pass control down the stack */
grpc_call_next_op(elem, op);
}
@@ -81,13 +87,15 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {
+ const void *server_transport_data, grpc_transport_op *initial_op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
/* initialize members */
calld->unused = channeld->unused;
+
+ if (initial_op) noop_mutate_op(elem, initial_op);
}
/* Destructor for call_data */
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 18be81308d..c39e6cf3a4 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -288,7 +288,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
/* one ref is dropped in response to destroy, the other in
stream_closed */
gpr_ref_init(&call->internal_refcount, 2);
- grpc_call_stack_init(channel_stack, server_transport_data,
+ grpc_call_stack_init(channel_stack, server_transport_data, NULL,
CALL_STACK_FROM_CALL(call));
if (gpr_time_cmp(send_deadline, gpr_inf_future) != 0) {
set_deadline_alarm(call, send_deadline);
diff --git a/src/core/surface/client.c b/src/core/surface/client.c
index 4669c59ee4..7eb99895f7 100644
--- a/src/core/surface/client.c
+++ b/src/core/surface/client.c
@@ -67,7 +67,7 @@ static void channel_op(grpc_channel_element *elem,
}
static void init_call_elem(grpc_call_element *elem,
- const void *transport_server_data) {}
+ const void *transport_server_data, grpc_transport_op *initial_op) {}
static void destroy_call_elem(grpc_call_element *elem) {}
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index 4e9eb808d7..f95e2a16c2 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -66,7 +66,11 @@ static void channel_op(grpc_channel_element *elem,
}
static void init_call_elem(grpc_call_element *elem,
- const void *transport_server_data) {}
+ const void *transport_server_data, grpc_transport_op *initial_op) {
+ if (initial_op) {
+ grpc_transport_op_finish_with_failure(initial_op);
+ }
+}
static void destroy_call_elem(grpc_call_element *elem) {}
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 82d1323a4b..e9d6f86734 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -439,9 +439,8 @@ static void server_on_recv(void *ptr, int success) {
calld->on_done_recv(calld->recv_user_data, success);
}
-static void server_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
+static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
call_data *calld = elem->call_data;
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
if (op->recv_ops) {
/* substitute our callback for the higher callback */
@@ -452,7 +451,11 @@ static void server_start_transport_op(grpc_call_element *elem, grpc_transport_op
op->on_done_recv = server_on_recv;
op->recv_user_data = elem;
}
+}
+static void server_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ server_mutate_op(elem, op);
grpc_call_next_op(elem, op);
}
@@ -504,7 +507,7 @@ static void shutdown_channel(channel_data *chand) {
}
static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {
+ const void *server_transport_data, grpc_transport_op *initial_op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
memset(calld, 0, sizeof(call_data));
@@ -516,6 +519,8 @@ static void init_call_elem(grpc_call_element *elem,
gpr_mu_unlock(&chand->server->mu);
server_ref(chand->server);
+
+ server_mutate_op(elem, initial_op);
}
static void destroy_call_elem(grpc_call_element *elem) {
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index acdc98b86b..d7156142fb 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -378,7 +378,7 @@ static void maybe_finish_read(transport *t, stream *s);
static void maybe_join_window_updates(transport *t, stream *s);
static void finish_reads(transport *t);
static void add_to_pollset_locked(transport *t, grpc_pollset *pollset);
-
+static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op);
/*
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
@@ -595,7 +595,7 @@ static void goaway(grpc_transport *gt, grpc_status_code status,
}
static int init_stream(grpc_transport *gt, grpc_stream *gs,
- const void *server_data) {
+ const void *server_data, grpc_transport_op *initial_op) {
transport *t = (transport *)gt;
stream *s = (stream *)gs;
@@ -622,6 +622,8 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
grpc_sopb_init(&s->callback_sopb);
grpc_chttp2_data_parser_init(&s->parser);
+ if (initial_op) perform_op_locked(t, s, initial_op);
+
if (!server_data) {
unlock(t);
}
@@ -1003,12 +1005,7 @@ static void maybe_start_some_streams(transport *t) {
}
}
-static void perform_op(grpc_transport *gt, grpc_stream *gs, grpc_transport_op *op) {
- transport *t = (transport *)gt;
- stream *s = (stream *)gs;
-
- lock(t);
-
+static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
if (op->send_ops) {
GPR_ASSERT(s->outgoing_sopb == NULL);
s->send_done_closure.cb = op->on_done_send;
@@ -1053,7 +1050,14 @@ static void perform_op(grpc_transport *gt, grpc_stream *gs, grpc_transport_op *o
cancel_stream(t, s, op->cancel_with_status, grpc_chttp2_grpc_status_to_http2_error(op->cancel_with_status),
1);
}
+}
+static void perform_op(grpc_transport *gt, grpc_stream *gs, grpc_transport_op *op) {
+ transport *t = (transport *)gt;
+ stream *s = (stream *)gs;
+
+ lock(t);
+ perform_op_locked(t, s, op);
unlock(t);
}
diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c
index 35195348e7..ab2f7c3470 100644
--- a/src/core/transport/transport.c
+++ b/src/core/transport/transport.c
@@ -52,8 +52,8 @@ void grpc_transport_destroy(grpc_transport *transport) {
}
int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream,
- const void *server_data) {
- return transport->vtable->init_stream(transport, stream, server_data);
+ const void *server_data, grpc_transport_op *initial_op) {
+ return transport->vtable->init_stream(transport, stream, server_data, initial_op);
}
void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream,
diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h
index ef79ac0741..75d8d51e20 100644
--- a/src/core/transport/transport_impl.h
+++ b/src/core/transport/transport_impl.h
@@ -43,7 +43,7 @@ typedef struct grpc_transport_vtable {
/* implementation of grpc_transport_init_stream */
int (*init_stream)(grpc_transport *self, grpc_stream *stream,
- const void *server_data);
+ const void *server_data, grpc_transport_op *initial_op);
/* implementation of grpc_transport_send_batch */
void (*perform_op)(grpc_transport *self, grpc_stream *stream,