diff options
author | Craig Tiller <ctiller@google.com> | 2015-04-22 14:00:47 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-04-22 14:00:47 -0700 |
commit | be18b8db301b800edef382389749632e4099afaf (patch) | |
tree | ec9210bc75dacabc146298f37eb89e3070170a2f /src | |
parent | 629b0ed8041f96968e8e61c2b4992d08a38cf28a (diff) |
Beginning transport work
Diffstat (limited to 'src')
-rw-r--r-- | src/core/surface/channel_create.c | 4 | ||||
-rw-r--r-- | src/core/surface/client.c | 29 | ||||
-rw-r--r-- | src/core/surface/lame_client.c | 19 | ||||
-rw-r--r-- | src/core/surface/server.c | 124 | ||||
-rw-r--r-- | src/core/surface/server_chttp2.c | 4 | ||||
-rw-r--r-- | src/core/transport/chttp2/stream_encoder.c | 10 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 88 | ||||
-rw-r--r-- | src/core/transport/transport.h | 3 | ||||
-rw-r--r-- | src/core/transport/transport_impl.h | 12 |
9 files changed, 138 insertions, 155 deletions
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 3104b1d00d..5f3e9bec0c 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -44,7 +44,6 @@ #include "src/core/channel/client_setup.h" #include "src/core/channel/connected_channel.h" #include "src/core/channel/http_client_filter.h" -#include "src/core/channel/http_filter.h" #include "src/core/iomgr/endpoint.h" #include "src/core/iomgr/resolve_address.h" #include "src/core/iomgr/tcp_client.h" @@ -176,8 +175,7 @@ static void done_setup(void *sp) { static grpc_transport_setup_result complete_setup(void *channel_stack, grpc_transport *transport, grpc_mdctx *mdctx) { - static grpc_channel_filter const *extra_filters[] = {&grpc_http_client_filter, - &grpc_http_filter}; + static grpc_channel_filter const *extra_filters[] = {&grpc_http_client_filter}; return grpc_client_channel_transport_setup_complete( channel_stack, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters), mdctx); diff --git a/src/core/surface/client.c b/src/core/surface/client.c index 2f898ff7d7..4669c59ee4 100644 --- a/src/core/surface/client.c +++ b/src/core/surface/client.c @@ -43,32 +43,9 @@ typedef struct { void *unused; } call_data; typedef struct { void *unused; } channel_data; -static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, - grpc_call_op *op) { +static void client_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - - switch (op->type) { - case GRPC_RECV_METADATA: - grpc_call_recv_metadata(elem, &op->data.metadata); - break; - case GRPC_RECV_MESSAGE: - grpc_call_recv_message(elem, op->data.message); - op->done_cb(op->user_data, GRPC_OP_OK); - break; - case GRPC_RECV_HALF_CLOSE: - grpc_call_read_closed(elem); - break; - case GRPC_RECV_FINISH: - grpc_call_stream_closed(elem); - break; - case GRPC_RECV_SYNTHETIC_STATUS: - grpc_call_recv_synthetic_status(elem, op->data.synthetic_status.status, - op->data.synthetic_status.message); - break; - default: - GPR_ASSERT(op->dir == GRPC_CALL_DOWN); - grpc_call_next_op(elem, op); - } + grpc_call_next_op(elem, op); } static void channel_op(grpc_channel_element *elem, @@ -104,6 +81,6 @@ static void init_channel_elem(grpc_channel_element *elem, static void destroy_channel_elem(grpc_channel_element *elem) {} const grpc_channel_filter grpc_client_surface_filter = { - call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, + client_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem, destroy_channel_elem, "client", }; diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 78170806f1..4e9eb808d7 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -46,22 +46,9 @@ typedef struct { void *unused; } call_data; typedef struct { void *unused; } channel_data; -static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, - grpc_call_op *op) { +static void lame_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - - switch (op->type) { - case GRPC_SEND_METADATA: - grpc_metadata_batch_destroy(&op->data.metadata); - grpc_call_recv_synthetic_status(elem, GRPC_STATUS_UNKNOWN, - "Rpc sent on a lame channel."); - grpc_call_stream_closed(elem); - break; - default: - break; - } - - op->done_cb(op->user_data, GRPC_OP_ERROR); + grpc_transport_op_finish_with_failure(op); } static void channel_op(grpc_channel_element *elem, @@ -93,7 +80,7 @@ static void init_channel_elem(grpc_channel_element *elem, static void destroy_channel_elem(grpc_channel_element *elem) {} static const grpc_channel_filter lame_filter = { - call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, + lame_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem, destroy_channel_elem, "lame-client", }; diff --git a/src/core/surface/server.c b/src/core/surface/server.c index e771929870..82d1323a4b 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -173,13 +173,19 @@ struct call_data { grpc_call *call; call_state state; - gpr_timespec deadline; grpc_mdstr *path; grpc_mdstr *host; + gpr_timespec deadline; + int got_initial_metadata; legacy_data *legacy; grpc_completion_queue *cq_new; + grpc_stream_op_buffer *recv_ops; + grpc_stream_state *recv_state; + void (*on_done_recv)(void *user_data, int success); + void *recv_user_data; + call_data **root[CALL_LIST_COUNT]; call_link links[CALL_LIST_COUNT]; }; @@ -371,46 +377,6 @@ static void kill_zombie(void *elem, int success) { grpc_call_destroy(grpc_call_from_top_element(elem)); } -static void stream_closed(grpc_call_element *elem) { - call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; - gpr_mu_lock(&chand->server->mu); - switch (calld->state) { - case ACTIVATED: - break; - case PENDING: - call_list_remove(calld, PENDING_START); - /* fallthrough intended */ - case NOT_STARTED: - calld->state = ZOMBIED; - grpc_iomgr_add_callback(kill_zombie, elem); - break; - case ZOMBIED: - break; - } - gpr_mu_unlock(&chand->server->mu); - grpc_call_stream_closed(elem); -} - -static void read_closed(grpc_call_element *elem) { - call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; - gpr_mu_lock(&chand->server->mu); - switch (calld->state) { - case ACTIVATED: - case PENDING: - grpc_call_read_closed(elem); - break; - case NOT_STARTED: - calld->state = ZOMBIED; - grpc_iomgr_add_callback(kill_zombie, elem); - break; - case ZOMBIED: - break; - } - gpr_mu_unlock(&chand->server->mu); -} - static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { grpc_call_element *elem = user_data; channel_data *chand = elem->channel_data; @@ -425,33 +391,69 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { return md; } -static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn, - grpc_call_op *op) { +static void server_on_recv(void *ptr, int success) { + grpc_call_element *elem = ptr; call_data *calld = elem->call_data; - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - switch (op->type) { - case GRPC_RECV_METADATA: + channel_data *chand = elem->channel_data; + + if (success && !calld->got_initial_metadata) { + size_t i; + size_t nops = calld->recv_ops->nops; + grpc_stream_op *ops = calld->recv_ops->ops; + for (i = 0; i < nops; i++) { + grpc_stream_op *op = &ops[i]; + if (op->type != GRPC_OP_METADATA) continue; grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem); - if (grpc_call_recv_metadata(elem, &op->data.metadata)) { + if (0 != gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future)) { calld->deadline = op->data.metadata.deadline; - start_new_rpc(elem); } + calld->got_initial_metadata = 1; + start_new_rpc(elem); break; - case GRPC_RECV_MESSAGE: - grpc_call_recv_message(elem, op->data.message); - op->done_cb(op->user_data, GRPC_OP_OK); - break; - case GRPC_RECV_HALF_CLOSE: - read_closed(elem); - break; - case GRPC_RECV_FINISH: - stream_closed(elem); + } + } + + switch (*calld->recv_state) { + case GRPC_STREAM_OPEN: break; + case GRPC_STREAM_SEND_CLOSED: break; + case GRPC_STREAM_RECV_CLOSED: + gpr_mu_lock(&chand->server->mu); + if (calld->state == NOT_STARTED) { + calld->state = ZOMBIED; + grpc_iomgr_add_callback(kill_zombie, elem); + } + gpr_mu_unlock(&chand->server->mu); break; - default: - GPR_ASSERT(op->dir == GRPC_CALL_DOWN); - grpc_call_next_op(elem, op); + case GRPC_STREAM_CLOSED: + gpr_mu_lock(&chand->server->mu); + if (calld->state == NOT_STARTED) { + calld->state = ZOMBIED; + grpc_iomgr_add_callback(kill_zombie, elem); + } else if (calld->state == PENDING) { + call_list_remove(calld, PENDING_START); + } + gpr_mu_unlock(&chand->server->mu); break; } + + calld->on_done_recv(calld->recv_user_data, success); +} + +static void server_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { + call_data *calld = elem->call_data; + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + + if (op->recv_ops) { + /* substitute our callback for the higher callback */ + calld->recv_ops = op->recv_ops; + calld->recv_state = op->recv_state; + calld->on_done_recv = op->on_done_recv; + calld->recv_user_data = op->recv_user_data; + op->on_done_recv = server_on_recv; + op->recv_user_data = elem; + } + + grpc_call_next_op(elem, op); } static void channel_op(grpc_channel_element *elem, @@ -592,7 +594,7 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } static const grpc_channel_filter server_surface_filter = { - call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, + server_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem, destroy_channel_elem, "server", }; diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c index f3b9219f8b..ebde5095a9 100644 --- a/src/core/surface/server_chttp2.c +++ b/src/core/surface/server_chttp2.c @@ -33,7 +33,6 @@ #include <grpc/grpc.h> -#include "src/core/channel/http_filter.h" #include "src/core/channel/http_server_filter.h" #include "src/core/iomgr/resolve_address.h" #include "src/core/iomgr/tcp_server.h" @@ -46,8 +45,7 @@ static grpc_transport_setup_result setup_transport(void *server, grpc_transport *transport, grpc_mdctx *mdctx) { - static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter, - &grpc_http_filter}; + static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter}; return grpc_server_setup_transport(server, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters), mdctx); } diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c index 5ca31d6bc7..203de99314 100644 --- a/src/core/transport/chttp2/stream_encoder.c +++ b/src/core/transport/chttp2/stream_encoder.c @@ -481,12 +481,6 @@ gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count, break; case GRPC_OP_METADATA: grpc_metadata_batch_assert_ok(&op->data.metadata); - case GRPC_OP_FLOW_CTL_CB: - /* these just get copied as they don't impact the number of flow - controlled bytes */ - grpc_sopb_append(outops, op, 1); - curop++; - break; case GRPC_OP_BEGIN_MESSAGE: /* begin op: for now we just convert the op to a slice and fall through - this lets us reuse the slice framing code below */ @@ -567,10 +561,6 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof, GPR_ERROR, "These stream ops should be filtered out by grpc_chttp2_preencode"); abort(); - case GRPC_OP_FLOW_CTL_CB: - op->data.flow_ctl_cb.cb(op->data.flow_ctl_cb.arg, GRPC_OP_OK); - curop++; - break; case GRPC_OP_METADATA: /* Encode a metadata batch; store the returned values, representing a metadata element that needs to be unreffed back into the metadata diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 86bd5d2098..9aee7ca4f1 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -91,10 +91,6 @@ typedef enum { /* streams that are waiting to start because there are too many concurrent streams on the connection */ WAITING_FOR_CONCURRENCY, - /* streams that want to callback the application */ - PENDING_CALLBACKS, - /* streams that *ARE* calling back to the application */ - EXECUTING_CALLBACKS, STREAM_LIST_COUNT /* must be last */ } stream_list_id; @@ -182,6 +178,18 @@ typedef struct { gpr_slice debug; } pending_goaway; +typedef struct { + void (*cb)(void *user_data, int success); + void *user_data; + int status; +} op_closure; + +typedef struct { + op_closure *callbacks; + size_t count; + size_t capacity; +} op_closure_array; + struct transport { grpc_transport base; /* must be first */ const grpc_transport_callbacks *cb; @@ -202,6 +210,10 @@ struct transport { gpr_uint8 closed; error_state error_state; + /* queued callbacks */ + op_closure_array pending_callbacks; + op_closure_array executing_callbacks; + /* stream indexing */ gpr_uint32 next_stream_id; gpr_uint32 last_incoming_stream_id; @@ -289,6 +301,9 @@ struct stream { gpr_uint8 allow_window_updates; gpr_uint8 published_close; + op_closure send_done_closure; + op_closure recv_done_closure; + stream_link links[STREAM_LIST_COUNT]; gpr_uint8 included[STREAM_LIST_COUNT]; @@ -416,6 +431,8 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, GPR_ASSERT(strlen(CLIENT_CONNECT_STRING) == CLIENT_CONNECT_STRLEN); + memset(t, 0, sizeof(*t)); + t->base.vtable = &vtable; t->ep = ep; /* one ref is for destroy, the other for when ep becomes NULL */ @@ -427,27 +444,16 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, t->str_grpc_timeout = grpc_mdstr_from_string(t->metadata_context, "grpc-timeout"); t->reading = 1; - t->writing = 0; t->error_state = ERROR_STATE_NONE; t->next_stream_id = is_client ? 1 : 2; - t->last_incoming_stream_id = 0; - t->destroying = 0; - t->closed = 0; t->is_client = is_client; t->outgoing_window = DEFAULT_WINDOW; t->incoming_window = DEFAULT_WINDOW; t->connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET; t->deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0; - t->expect_continuation_stream_id = 0; - t->pings = NULL; - t->ping_count = 0; - t->ping_capacity = 0; t->ping_counter = gpr_now().tv_nsec; grpc_chttp2_hpack_compressor_init(&t->hpack_compressor, mdctx); grpc_chttp2_goaway_parser_init(&t->goaway_parser); - t->pending_goaways = NULL; - t->num_pending_goaways = 0; - t->cap_pending_goaways = 0; gpr_slice_buffer_init(&t->outbuf); gpr_slice_buffer_init(&t->qbuf); grpc_sopb_init(&t->nuke_later_sopb); @@ -462,7 +468,6 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, needed. TODO(ctiller): tune this */ grpc_chttp2_stream_map_init(&t->stream_map, 8); - memset(&t->lists, 0, sizeof(t->lists)); /* copy in initial settings to all setting sets */ for (i = 0; i < NUM_SETTING_SETS; i++) { @@ -708,8 +713,6 @@ static void stream_list_add_tail(transport *t, stream *s, stream_list_id id) { } static void stream_list_join(transport *t, stream *s, stream_list_id id) { - if (id == PENDING_CALLBACKS) - GPR_ASSERT(t->cb != NULL || t->error_state == ERROR_STATE_NONE); if (s->included[id]) { return; } @@ -988,6 +991,32 @@ static void maybe_start_some_streams(transport *t) { } } +static void perform_op(grpc_transport *gt, grpc_stream *gs, grpc_transport_op *op) { + transport *t = (transport *)gt; + stream *s = (stream *)gs; + + lock(t); + + if (op->send_ops) { + abort(); + } + + if (op->recv_ops) { + abort(); + } + + if (op->bind_pollset) { + abort(); + } + + if (op->cancel_with_status) { + abort(); + } + + unlock(t); +} + +#if 0 static void send_batch(grpc_transport *gt, grpc_stream *gs, grpc_stream_op *ops, size_t ops_count, int is_last) { transport *t = (transport *)gt; @@ -1016,6 +1045,7 @@ static void send_batch(grpc_transport *gt, grpc_stream *gs, grpc_stream_op *ops, unlock(t); } +#endif static void abort_stream(grpc_transport *gt, grpc_stream *gs, grpc_status_code status) { @@ -1831,6 +1861,12 @@ static grpc_stream_state compute_state(gpr_uint8 write_closed, } static int prepare_callbacks(transport *t) { + op_closure_array temp = t->pending_callbacks; + t->pending_callbacks = t->executing_callbacks; + t->executing_callbacks = temp; + return t->executing_callbacks.count > 0; + +#if 0 stream *s; int n = 0; while ((s = stream_list_remove_head(t, PENDING_CALLBACKS))) { @@ -1855,16 +1891,16 @@ static int prepare_callbacks(transport *t) { } } return n; +#endif } static void run_callbacks(transport *t, const grpc_transport_callbacks *cb) { - stream *s; - while ((s = stream_list_remove_head(t, EXECUTING_CALLBACKS))) { - size_t nops = s->callback_sopb.nops; - s->callback_sopb.nops = 0; - cb->recv_batch(t->cb_user_data, &t->base, (grpc_stream *)s, - s->callback_sopb.ops, nops, s->callback_state); + size_t i; + for (i = 0; i < t->executing_callbacks.count; i++) { + op_closure c = t->executing_callbacks.callbacks[i]; + c.cb(c.user_data, c.status); } + t->executing_callbacks.count = 0; } static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb) { @@ -1885,8 +1921,8 @@ static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) { */ static const grpc_transport_vtable vtable = { - sizeof(stream), init_stream, send_batch, set_allow_window_updates, - add_to_pollset, destroy_stream, abort_stream, goaway, close_transport, + sizeof(stream), init_stream, perform_op, + add_to_pollset, destroy_stream, goaway, close_transport, send_ping, destroy_transport}; void grpc_create_chttp2_transport(grpc_transport_setup_callback setup, diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 264245d351..d0007680e3 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -132,6 +132,9 @@ typedef struct grpc_transport_op { void grpc_transport_op_finish_with_failure(grpc_transport_op *op); +/* TODO(ctiller): remove this */ +void grpc_transport_add_to_pollset(grpc_transport *transport, grpc_pollset *pollset); + char *grpc_transport_op_string(grpc_transport_op *op); /* Send a batch of operations on a transport diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h index ac275c7560..ef79ac0741 100644 --- a/src/core/transport/transport_impl.h +++ b/src/core/transport/transport_impl.h @@ -46,12 +46,8 @@ typedef struct grpc_transport_vtable { const void *server_data); /* implementation of grpc_transport_send_batch */ - void (*send_batch)(grpc_transport *self, grpc_stream *stream, - grpc_stream_op *ops, size_t ops_count, int is_last); - - /* implementation of grpc_transport_set_allow_window_updates */ - void (*set_allow_window_updates)(grpc_transport *self, grpc_stream *stream, - int allow); + void (*perform_op)(grpc_transport *self, grpc_stream *stream, + grpc_transport_op *op); /* implementation of grpc_transport_add_to_pollset */ void (*add_to_pollset)(grpc_transport *self, grpc_pollset *pollset); @@ -59,10 +55,6 @@ typedef struct grpc_transport_vtable { /* implementation of grpc_transport_destroy_stream */ void (*destroy_stream)(grpc_transport *self, grpc_stream *stream); - /* implementation of grpc_transport_abort_stream */ - void (*abort_stream)(grpc_transport *self, grpc_stream *stream, - grpc_status_code status); - /* implementation of grpc_transport_goaway */ void (*goaway)(grpc_transport *self, grpc_status_code status, gpr_slice debug_data); |