aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-04-22 14:00:47 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-04-22 14:00:47 -0700
commitbe18b8db301b800edef382389749632e4099afaf (patch)
treeec9210bc75dacabc146298f37eb89e3070170a2f
parent629b0ed8041f96968e8e61c2b4992d08a38cf28a (diff)
Beginning transport work
-rw-r--r--src/core/surface/channel_create.c4
-rw-r--r--src/core/surface/client.c29
-rw-r--r--src/core/surface/lame_client.c19
-rw-r--r--src/core/surface/server.c124
-rw-r--r--src/core/surface/server_chttp2.c4
-rw-r--r--src/core/transport/chttp2/stream_encoder.c10
-rw-r--r--src/core/transport/chttp2_transport.c88
-rw-r--r--src/core/transport/transport.h3
-rw-r--r--src/core/transport/transport_impl.h12
9 files changed, 138 insertions, 155 deletions
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index 3104b1d00d..5f3e9bec0c 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -44,7 +44,6 @@
#include "src/core/channel/client_setup.h"
#include "src/core/channel/connected_channel.h"
#include "src/core/channel/http_client_filter.h"
-#include "src/core/channel/http_filter.h"
#include "src/core/iomgr/endpoint.h"
#include "src/core/iomgr/resolve_address.h"
#include "src/core/iomgr/tcp_client.h"
@@ -176,8 +175,7 @@ static void done_setup(void *sp) {
static grpc_transport_setup_result complete_setup(void *channel_stack,
grpc_transport *transport,
grpc_mdctx *mdctx) {
- static grpc_channel_filter const *extra_filters[] = {&grpc_http_client_filter,
- &grpc_http_filter};
+ static grpc_channel_filter const *extra_filters[] = {&grpc_http_client_filter};
return grpc_client_channel_transport_setup_complete(
channel_stack, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters),
mdctx);
diff --git a/src/core/surface/client.c b/src/core/surface/client.c
index 2f898ff7d7..4669c59ee4 100644
--- a/src/core/surface/client.c
+++ b/src/core/surface/client.c
@@ -43,32 +43,9 @@ typedef struct { void *unused; } call_data;
typedef struct { void *unused; } channel_data;
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void client_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
- switch (op->type) {
- case GRPC_RECV_METADATA:
- grpc_call_recv_metadata(elem, &op->data.metadata);
- break;
- case GRPC_RECV_MESSAGE:
- grpc_call_recv_message(elem, op->data.message);
- op->done_cb(op->user_data, GRPC_OP_OK);
- break;
- case GRPC_RECV_HALF_CLOSE:
- grpc_call_read_closed(elem);
- break;
- case GRPC_RECV_FINISH:
- grpc_call_stream_closed(elem);
- break;
- case GRPC_RECV_SYNTHETIC_STATUS:
- grpc_call_recv_synthetic_status(elem, op->data.synthetic_status.status,
- op->data.synthetic_status.message);
- break;
- default:
- GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
- grpc_call_next_op(elem, op);
- }
+ grpc_call_next_op(elem, op);
}
static void channel_op(grpc_channel_element *elem,
@@ -104,6 +81,6 @@ static void init_channel_elem(grpc_channel_element *elem,
static void destroy_channel_elem(grpc_channel_element *elem) {}
const grpc_channel_filter grpc_client_surface_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
+ client_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
sizeof(channel_data), init_channel_elem, destroy_channel_elem, "client",
};
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index 78170806f1..4e9eb808d7 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -46,22 +46,9 @@ typedef struct { void *unused; } call_data;
typedef struct { void *unused; } channel_data;
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void lame_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
- switch (op->type) {
- case GRPC_SEND_METADATA:
- grpc_metadata_batch_destroy(&op->data.metadata);
- grpc_call_recv_synthetic_status(elem, GRPC_STATUS_UNKNOWN,
- "Rpc sent on a lame channel.");
- grpc_call_stream_closed(elem);
- break;
- default:
- break;
- }
-
- op->done_cb(op->user_data, GRPC_OP_ERROR);
+ grpc_transport_op_finish_with_failure(op);
}
static void channel_op(grpc_channel_element *elem,
@@ -93,7 +80,7 @@ static void init_channel_elem(grpc_channel_element *elem,
static void destroy_channel_elem(grpc_channel_element *elem) {}
static const grpc_channel_filter lame_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
+ lame_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
sizeof(channel_data), init_channel_elem, destroy_channel_elem,
"lame-client",
};
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index e771929870..82d1323a4b 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -173,13 +173,19 @@ struct call_data {
grpc_call *call;
call_state state;
- gpr_timespec deadline;
grpc_mdstr *path;
grpc_mdstr *host;
+ gpr_timespec deadline;
+ int got_initial_metadata;
legacy_data *legacy;
grpc_completion_queue *cq_new;
+ grpc_stream_op_buffer *recv_ops;
+ grpc_stream_state *recv_state;
+ void (*on_done_recv)(void *user_data, int success);
+ void *recv_user_data;
+
call_data **root[CALL_LIST_COUNT];
call_link links[CALL_LIST_COUNT];
};
@@ -371,46 +377,6 @@ static void kill_zombie(void *elem, int success) {
grpc_call_destroy(grpc_call_from_top_element(elem));
}
-static void stream_closed(grpc_call_element *elem) {
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- gpr_mu_lock(&chand->server->mu);
- switch (calld->state) {
- case ACTIVATED:
- break;
- case PENDING:
- call_list_remove(calld, PENDING_START);
- /* fallthrough intended */
- case NOT_STARTED:
- calld->state = ZOMBIED;
- grpc_iomgr_add_callback(kill_zombie, elem);
- break;
- case ZOMBIED:
- break;
- }
- gpr_mu_unlock(&chand->server->mu);
- grpc_call_stream_closed(elem);
-}
-
-static void read_closed(grpc_call_element *elem) {
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- gpr_mu_lock(&chand->server->mu);
- switch (calld->state) {
- case ACTIVATED:
- case PENDING:
- grpc_call_read_closed(elem);
- break;
- case NOT_STARTED:
- calld->state = ZOMBIED;
- grpc_iomgr_add_callback(kill_zombie, elem);
- break;
- case ZOMBIED:
- break;
- }
- gpr_mu_unlock(&chand->server->mu);
-}
-
static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
channel_data *chand = elem->channel_data;
@@ -425,33 +391,69 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
return md;
}
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn,
- grpc_call_op *op) {
+static void server_on_recv(void *ptr, int success) {
+ grpc_call_element *elem = ptr;
call_data *calld = elem->call_data;
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- switch (op->type) {
- case GRPC_RECV_METADATA:
+ channel_data *chand = elem->channel_data;
+
+ if (success && !calld->got_initial_metadata) {
+ size_t i;
+ size_t nops = calld->recv_ops->nops;
+ grpc_stream_op *ops = calld->recv_ops->ops;
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
- if (grpc_call_recv_metadata(elem, &op->data.metadata)) {
+ if (0 != gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future)) {
calld->deadline = op->data.metadata.deadline;
- start_new_rpc(elem);
}
+ calld->got_initial_metadata = 1;
+ start_new_rpc(elem);
break;
- case GRPC_RECV_MESSAGE:
- grpc_call_recv_message(elem, op->data.message);
- op->done_cb(op->user_data, GRPC_OP_OK);
- break;
- case GRPC_RECV_HALF_CLOSE:
- read_closed(elem);
- break;
- case GRPC_RECV_FINISH:
- stream_closed(elem);
+ }
+ }
+
+ switch (*calld->recv_state) {
+ case GRPC_STREAM_OPEN: break;
+ case GRPC_STREAM_SEND_CLOSED: break;
+ case GRPC_STREAM_RECV_CLOSED:
+ gpr_mu_lock(&chand->server->mu);
+ if (calld->state == NOT_STARTED) {
+ calld->state = ZOMBIED;
+ grpc_iomgr_add_callback(kill_zombie, elem);
+ }
+ gpr_mu_unlock(&chand->server->mu);
break;
- default:
- GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
- grpc_call_next_op(elem, op);
+ case GRPC_STREAM_CLOSED:
+ gpr_mu_lock(&chand->server->mu);
+ if (calld->state == NOT_STARTED) {
+ calld->state = ZOMBIED;
+ grpc_iomgr_add_callback(kill_zombie, elem);
+ } else if (calld->state == PENDING) {
+ call_list_remove(calld, PENDING_START);
+ }
+ gpr_mu_unlock(&chand->server->mu);
break;
}
+
+ calld->on_done_recv(calld->recv_user_data, success);
+}
+
+static void server_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
+ call_data *calld = elem->call_data;
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+
+ if (op->recv_ops) {
+ /* substitute our callback for the higher callback */
+ calld->recv_ops = op->recv_ops;
+ calld->recv_state = op->recv_state;
+ calld->on_done_recv = op->on_done_recv;
+ calld->recv_user_data = op->recv_user_data;
+ op->on_done_recv = server_on_recv;
+ op->recv_user_data = elem;
+ }
+
+ grpc_call_next_op(elem, op);
}
static void channel_op(grpc_channel_element *elem,
@@ -592,7 +594,7 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
static const grpc_channel_filter server_surface_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
+ server_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
sizeof(channel_data), init_channel_elem, destroy_channel_elem, "server",
};
diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c
index f3b9219f8b..ebde5095a9 100644
--- a/src/core/surface/server_chttp2.c
+++ b/src/core/surface/server_chttp2.c
@@ -33,7 +33,6 @@
#include <grpc/grpc.h>
-#include "src/core/channel/http_filter.h"
#include "src/core/channel/http_server_filter.h"
#include "src/core/iomgr/resolve_address.h"
#include "src/core/iomgr/tcp_server.h"
@@ -46,8 +45,7 @@
static grpc_transport_setup_result setup_transport(void *server,
grpc_transport *transport,
grpc_mdctx *mdctx) {
- static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter,
- &grpc_http_filter};
+ static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter};
return grpc_server_setup_transport(server, transport, extra_filters,
GPR_ARRAY_SIZE(extra_filters), mdctx);
}
diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c
index 5ca31d6bc7..203de99314 100644
--- a/src/core/transport/chttp2/stream_encoder.c
+++ b/src/core/transport/chttp2/stream_encoder.c
@@ -481,12 +481,6 @@ gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count,
break;
case GRPC_OP_METADATA:
grpc_metadata_batch_assert_ok(&op->data.metadata);
- case GRPC_OP_FLOW_CTL_CB:
- /* these just get copied as they don't impact the number of flow
- controlled bytes */
- grpc_sopb_append(outops, op, 1);
- curop++;
- break;
case GRPC_OP_BEGIN_MESSAGE:
/* begin op: for now we just convert the op to a slice and fall
through - this lets us reuse the slice framing code below */
@@ -567,10 +561,6 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof,
GPR_ERROR,
"These stream ops should be filtered out by grpc_chttp2_preencode");
abort();
- case GRPC_OP_FLOW_CTL_CB:
- op->data.flow_ctl_cb.cb(op->data.flow_ctl_cb.arg, GRPC_OP_OK);
- curop++;
- break;
case GRPC_OP_METADATA:
/* Encode a metadata batch; store the returned values, representing
a metadata element that needs to be unreffed back into the metadata
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 86bd5d2098..9aee7ca4f1 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -91,10 +91,6 @@ typedef enum {
/* streams that are waiting to start because there are too many concurrent
streams on the connection */
WAITING_FOR_CONCURRENCY,
- /* streams that want to callback the application */
- PENDING_CALLBACKS,
- /* streams that *ARE* calling back to the application */
- EXECUTING_CALLBACKS,
STREAM_LIST_COUNT /* must be last */
} stream_list_id;
@@ -182,6 +178,18 @@ typedef struct {
gpr_slice debug;
} pending_goaway;
+typedef struct {
+ void (*cb)(void *user_data, int success);
+ void *user_data;
+ int status;
+} op_closure;
+
+typedef struct {
+ op_closure *callbacks;
+ size_t count;
+ size_t capacity;
+} op_closure_array;
+
struct transport {
grpc_transport base; /* must be first */
const grpc_transport_callbacks *cb;
@@ -202,6 +210,10 @@ struct transport {
gpr_uint8 closed;
error_state error_state;
+ /* queued callbacks */
+ op_closure_array pending_callbacks;
+ op_closure_array executing_callbacks;
+
/* stream indexing */
gpr_uint32 next_stream_id;
gpr_uint32 last_incoming_stream_id;
@@ -289,6 +301,9 @@ struct stream {
gpr_uint8 allow_window_updates;
gpr_uint8 published_close;
+ op_closure send_done_closure;
+ op_closure recv_done_closure;
+
stream_link links[STREAM_LIST_COUNT];
gpr_uint8 included[STREAM_LIST_COUNT];
@@ -416,6 +431,8 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
GPR_ASSERT(strlen(CLIENT_CONNECT_STRING) == CLIENT_CONNECT_STRLEN);
+ memset(t, 0, sizeof(*t));
+
t->base.vtable = &vtable;
t->ep = ep;
/* one ref is for destroy, the other for when ep becomes NULL */
@@ -427,27 +444,16 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
t->str_grpc_timeout =
grpc_mdstr_from_string(t->metadata_context, "grpc-timeout");
t->reading = 1;
- t->writing = 0;
t->error_state = ERROR_STATE_NONE;
t->next_stream_id = is_client ? 1 : 2;
- t->last_incoming_stream_id = 0;
- t->destroying = 0;
- t->closed = 0;
t->is_client = is_client;
t->outgoing_window = DEFAULT_WINDOW;
t->incoming_window = DEFAULT_WINDOW;
t->connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET;
t->deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0;
- t->expect_continuation_stream_id = 0;
- t->pings = NULL;
- t->ping_count = 0;
- t->ping_capacity = 0;
t->ping_counter = gpr_now().tv_nsec;
grpc_chttp2_hpack_compressor_init(&t->hpack_compressor, mdctx);
grpc_chttp2_goaway_parser_init(&t->goaway_parser);
- t->pending_goaways = NULL;
- t->num_pending_goaways = 0;
- t->cap_pending_goaways = 0;
gpr_slice_buffer_init(&t->outbuf);
gpr_slice_buffer_init(&t->qbuf);
grpc_sopb_init(&t->nuke_later_sopb);
@@ -462,7 +468,6 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
needed.
TODO(ctiller): tune this */
grpc_chttp2_stream_map_init(&t->stream_map, 8);
- memset(&t->lists, 0, sizeof(t->lists));
/* copy in initial settings to all setting sets */
for (i = 0; i < NUM_SETTING_SETS; i++) {
@@ -708,8 +713,6 @@ static void stream_list_add_tail(transport *t, stream *s, stream_list_id id) {
}
static void stream_list_join(transport *t, stream *s, stream_list_id id) {
- if (id == PENDING_CALLBACKS)
- GPR_ASSERT(t->cb != NULL || t->error_state == ERROR_STATE_NONE);
if (s->included[id]) {
return;
}
@@ -988,6 +991,32 @@ static void maybe_start_some_streams(transport *t) {
}
}
+static void perform_op(grpc_transport *gt, grpc_stream *gs, grpc_transport_op *op) {
+ transport *t = (transport *)gt;
+ stream *s = (stream *)gs;
+
+ lock(t);
+
+ if (op->send_ops) {
+ abort();
+ }
+
+ if (op->recv_ops) {
+ abort();
+ }
+
+ if (op->bind_pollset) {
+ abort();
+ }
+
+ if (op->cancel_with_status) {
+ abort();
+ }
+
+ unlock(t);
+}
+
+#if 0
static void send_batch(grpc_transport *gt, grpc_stream *gs, grpc_stream_op *ops,
size_t ops_count, int is_last) {
transport *t = (transport *)gt;
@@ -1016,6 +1045,7 @@ static void send_batch(grpc_transport *gt, grpc_stream *gs, grpc_stream_op *ops,
unlock(t);
}
+#endif
static void abort_stream(grpc_transport *gt, grpc_stream *gs,
grpc_status_code status) {
@@ -1831,6 +1861,12 @@ static grpc_stream_state compute_state(gpr_uint8 write_closed,
}
static int prepare_callbacks(transport *t) {
+ op_closure_array temp = t->pending_callbacks;
+ t->pending_callbacks = t->executing_callbacks;
+ t->executing_callbacks = temp;
+ return t->executing_callbacks.count > 0;
+
+#if 0
stream *s;
int n = 0;
while ((s = stream_list_remove_head(t, PENDING_CALLBACKS))) {
@@ -1855,16 +1891,16 @@ static int prepare_callbacks(transport *t) {
}
}
return n;
+#endif
}
static void run_callbacks(transport *t, const grpc_transport_callbacks *cb) {
- stream *s;
- while ((s = stream_list_remove_head(t, EXECUTING_CALLBACKS))) {
- size_t nops = s->callback_sopb.nops;
- s->callback_sopb.nops = 0;
- cb->recv_batch(t->cb_user_data, &t->base, (grpc_stream *)s,
- s->callback_sopb.ops, nops, s->callback_state);
+ size_t i;
+ for (i = 0; i < t->executing_callbacks.count; i++) {
+ op_closure c = t->executing_callbacks.callbacks[i];
+ c.cb(c.user_data, c.status);
}
+ t->executing_callbacks.count = 0;
}
static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb) {
@@ -1885,8 +1921,8 @@ static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
*/
static const grpc_transport_vtable vtable = {
- sizeof(stream), init_stream, send_batch, set_allow_window_updates,
- add_to_pollset, destroy_stream, abort_stream, goaway, close_transport,
+ sizeof(stream), init_stream, perform_op,
+ add_to_pollset, destroy_stream, goaway, close_transport,
send_ping, destroy_transport};
void grpc_create_chttp2_transport(grpc_transport_setup_callback setup,
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index 264245d351..d0007680e3 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -132,6 +132,9 @@ typedef struct grpc_transport_op {
void grpc_transport_op_finish_with_failure(grpc_transport_op *op);
+/* TODO(ctiller): remove this */
+void grpc_transport_add_to_pollset(grpc_transport *transport, grpc_pollset *pollset);
+
char *grpc_transport_op_string(grpc_transport_op *op);
/* Send a batch of operations on a transport
diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h
index ac275c7560..ef79ac0741 100644
--- a/src/core/transport/transport_impl.h
+++ b/src/core/transport/transport_impl.h
@@ -46,12 +46,8 @@ typedef struct grpc_transport_vtable {
const void *server_data);
/* implementation of grpc_transport_send_batch */
- void (*send_batch)(grpc_transport *self, grpc_stream *stream,
- grpc_stream_op *ops, size_t ops_count, int is_last);
-
- /* implementation of grpc_transport_set_allow_window_updates */
- void (*set_allow_window_updates)(grpc_transport *self, grpc_stream *stream,
- int allow);
+ void (*perform_op)(grpc_transport *self, grpc_stream *stream,
+ grpc_transport_op *op);
/* implementation of grpc_transport_add_to_pollset */
void (*add_to_pollset)(grpc_transport *self, grpc_pollset *pollset);
@@ -59,10 +55,6 @@ typedef struct grpc_transport_vtable {
/* implementation of grpc_transport_destroy_stream */
void (*destroy_stream)(grpc_transport *self, grpc_stream *stream);
- /* implementation of grpc_transport_abort_stream */
- void (*abort_stream)(grpc_transport *self, grpc_stream *stream,
- grpc_status_code status);
-
/* implementation of grpc_transport_goaway */
void (*goaway)(grpc_transport *self, grpc_status_code status,
gpr_slice debug_data);