diff options
Diffstat (limited to 'src/core/surface')
-rw-r--r-- | src/core/surface/call.c | 33 | ||||
-rw-r--r-- | src/core/surface/channel.c | 33 | ||||
-rw-r--r-- | src/core/surface/channel.h | 3 | ||||
-rw-r--r-- | src/core/surface/channel_create.c | 235 | ||||
-rw-r--r-- | src/core/surface/client.c | 89 | ||||
-rw-r--r-- | src/core/surface/client.h | 41 | ||||
-rw-r--r-- | src/core/surface/init.c | 14 | ||||
-rw-r--r-- | src/core/surface/lame_client.c | 49 | ||||
-rw-r--r-- | src/core/surface/secure_channel_create.c | 254 | ||||
-rw-r--r-- | src/core/surface/server.c | 255 | ||||
-rw-r--r-- | src/core/surface/server.h | 8 | ||||
-rw-r--r-- | src/core/surface/server_chttp2.c | 19 |
12 files changed, 460 insertions, 573 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index f6aeed856b..fc09137b67 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -262,8 +262,8 @@ struct grpc_call { static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline); static void call_on_done_recv(void *call, int success); static void call_on_done_send(void *call, int success); -static int fill_send_ops(grpc_call *call, grpc_transport_op *op); -static void execute_op(grpc_call *call, grpc_transport_op *op); +static int fill_send_ops(grpc_call *call, grpc_transport_stream_op *op); +static void execute_op(grpc_call *call, grpc_transport_stream_op *op); static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata); static void finish_read_ops(grpc_call *call); static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status, @@ -279,8 +279,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, size_t add_initial_metadata_count, gpr_timespec send_deadline) { size_t i; - grpc_transport_op initial_op; - grpc_transport_op *initial_op_ptr = NULL; + grpc_transport_stream_op initial_op; + grpc_transport_stream_op *initial_op_ptr = NULL; grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel); grpc_call *call = gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size); @@ -464,12 +464,11 @@ static int need_more_data(grpc_call *call) { (is_op_live(call, GRPC_IOREQ_RECV_CLOSE) && grpc_bbq_empty(&call->incoming_queue)) || (call->write_state == WRITE_STATE_INITIAL && !call->is_client) || - (call->cancel_with_status != GRPC_STATUS_OK) || - call->destroy_called; + (call->cancel_with_status != GRPC_STATUS_OK) || call->destroy_called; } static void unlock(grpc_call *call) { - grpc_transport_op op; + grpc_transport_stream_op op; completed_request completed_requests[GRPC_IOREQ_OP_COUNT]; int completing_requests = 0; int start_op = 0; @@ -888,7 +887,7 @@ static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer, } } -static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { +static int fill_send_ops(grpc_call *call, grpc_transport_stream_op *op) { grpc_ioreq_data data; gpr_uint32 flags; grpc_metadata_batch mdb; @@ -1144,7 +1143,7 @@ static void finished_loose_op_allocated(void *alloc, int success) { gpr_free(args); } -static void execute_op(grpc_call *call, grpc_transport_op *op) { +static void execute_op(grpc_call *call, grpc_transport_stream_op *op) { grpc_call_element *elem; GPR_ASSERT(op->on_consumed == NULL); @@ -1155,14 +1154,15 @@ static void execute_op(grpc_call *call, grpc_transport_op *op) { } else { finished_loose_op_allocated_args *args = gpr_malloc(sizeof(*args)); args->call = call; - grpc_iomgr_closure_init(&args->closure, finished_loose_op_allocated, args); + grpc_iomgr_closure_init(&args->closure, finished_loose_op_allocated, + args); op->on_consumed = &args->closure; } } elem = CALL_ELEM_FROM_CALL(call, 0); op->context = call->context; - elem->filter->start_transport_op(elem, op); + elem->filter->start_transport_stream_op(elem, op); } grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { @@ -1229,13 +1229,13 @@ static gpr_uint32 decode_compression(grpc_mdelem *md) { } else { gpr_uint32 parsed_clevel_bytes; if (gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value), - GPR_SLICE_LENGTH(md->value->slice), - &parsed_clevel_bytes)) { + GPR_SLICE_LENGTH(md->value->slice), + &parsed_clevel_bytes)) { /* the following cast is safe, as a gpr_uint32 should be able to hold all * possible values of the grpc_compression_level enum */ - clevel = (grpc_compression_level) parsed_clevel_bytes; + clevel = (grpc_compression_level)parsed_clevel_bytes; } else { - clevel = GRPC_COMPRESS_LEVEL_NONE; /* could not parse, no compression */ + clevel = GRPC_COMPRESS_LEVEL_NONE; /* could not parse, no compression */ } grpc_mdelem_set_user_data(md, destroy_compression, (void *)(gpr_intptr)(clevel + COMPRESS_OFFSET)); @@ -1258,7 +1258,8 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { set_status_code(call, STATUS_FROM_WIRE, decode_status(md)); } else if (key == grpc_channel_get_message_string(call->channel)) { set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); - } else if (key == grpc_channel_get_compresssion_level_string(call->channel)) { + } else if (key == + grpc_channel_get_compresssion_level_string(call->channel)) { set_decode_compression_level(call, decode_compression(md)); } else { dest = &call->buffered_metadata[is_trailing]; diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index a3c4dcebc1..f8151c121c 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -39,7 +39,6 @@ #include "src/core/iomgr/iomgr.h" #include "src/core/support/string.h" #include "src/core/surface/call.h" -#include "src/core/surface/client.h" #include "src/core/surface/init.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -94,9 +93,8 @@ grpc_channel *grpc_channel_create_from_filters( grpc_channel *channel = gpr_malloc(size); GPR_ASSERT(grpc_is_initialized() && "call grpc_init()"); channel->is_client = is_client; - /* decremented by grpc_channel_destroy, and grpc_client_channel_closed if - * is_client */ - gpr_ref_init(&channel->refs, 1 + is_client); + /* decremented by grpc_channel_destroy */ + gpr_ref_init(&channel->refs, 1); channel->metadata_context = mdctx; channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status"); channel->grpc_compression_level_string = @@ -111,8 +109,6 @@ grpc_channel *grpc_channel_create_from_filters( } channel->path_string = grpc_mdstr_from_string(mdctx, ":path"); channel->authority_string = grpc_mdstr_from_string(mdctx, ":authority"); - grpc_channel_stack_init(filters, num_filters, args, channel->metadata_context, - CHANNEL_STACK_FROM_CHANNEL(channel)); gpr_mu_init(&channel->registered_call_mu); channel->registered_calls = NULL; @@ -133,6 +129,10 @@ grpc_channel *grpc_channel_create_from_filters( } } + grpc_channel_stack_init(filters, num_filters, channel, args, + channel->metadata_context, + CHANNEL_STACK_FROM_CHANNEL(channel)); + return channel; } @@ -239,28 +239,16 @@ void grpc_channel_internal_unref(grpc_channel *channel) { } void grpc_channel_destroy(grpc_channel *channel) { - grpc_channel_op op; + grpc_transport_op op; grpc_channel_element *elem; - + memset(&op, 0, sizeof(op)); + op.disconnect = 1; elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0); - - op.type = GRPC_CHANNEL_GOAWAY; - op.dir = GRPC_CALL_DOWN; - op.data.goaway.status = GRPC_STATUS_OK; - op.data.goaway.message = gpr_slice_from_copied_string("Client disconnect"); - elem->filter->channel_op(elem, NULL, &op); - - op.type = GRPC_CHANNEL_DISCONNECT; - op.dir = GRPC_CALL_DOWN; - elem->filter->channel_op(elem, NULL, &op); + elem->filter->start_transport_op(elem, &op); GRPC_CHANNEL_INTERNAL_UNREF(channel, "channel"); } -void grpc_client_channel_closed(grpc_channel_element *elem) { - GRPC_CHANNEL_INTERNAL_UNREF(CHANNEL_FROM_TOP_ELEM(elem), "closed"); -} - grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel) { return CHANNEL_STACK_FROM_CHANNEL(channel); } @@ -277,7 +265,6 @@ grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel) { return channel->grpc_compression_level_string; } - grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) { if (i >= 0 && i < NUM_CACHED_STATUS_ELEMS) { return grpc_mdelem_ref(channel->grpc_status_elem[i]); diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index 3c04676b43..71f8a55731 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -35,6 +35,7 @@ #define GRPC_INTERNAL_CORE_SURFACE_CHANNEL_H #include "src/core/channel/channel_stack.h" +#include "src/core/client_config/subchannel_factory.h" grpc_channel *grpc_channel_create_from_filters( const grpc_channel_filter **filters, size_t count, @@ -57,8 +58,6 @@ grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel); grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel); gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel); -void grpc_client_channel_closed(grpc_channel_element *elem); - #ifdef GRPC_CHANNEL_REF_COUNT_DEBUG void grpc_channel_internal_ref(grpc_channel *channel, const char *reason); void grpc_channel_internal_unref(grpc_channel *channel, const char *reason); diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index d069a04a9a..e205f0a9f8 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -31,159 +31,120 @@ * */ -#include "src/core/iomgr/sockaddr.h" - #include <grpc/grpc.h> #include <stdlib.h> #include <string.h> -#include "src/core/channel/census_filter.h" +#include <grpc/support/alloc.h> + #include "src/core/channel/channel_args.h" #include "src/core/channel/client_channel.h" -#include "src/core/channel/client_setup.h" -#include "src/core/channel/connected_channel.h" #include "src/core/channel/http_client_filter.h" -#include "src/core/iomgr/endpoint.h" -#include "src/core/iomgr/resolve_address.h" +#include "src/core/client_config/resolver_registry.h" #include "src/core/iomgr/tcp_client.h" #include "src/core/surface/channel.h" -#include "src/core/surface/client.h" -#include "src/core/support/string.h" #include "src/core/transport/chttp2_transport.h" -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/string_util.h> -#include <grpc/support/sync.h> -#include <grpc/support/useful.h> - -typedef struct setup setup; -/* A single setup request (started via initiate) */ typedef struct { - grpc_client_setup_request *cs_request; - setup *setup; - /* Resolved addresses, or null if resolution not yet completed */ - grpc_resolved_addresses *resolved; - /* which address in resolved should we pick for the next connection attempt */ - size_t resolved_index; -} request; - -/* Global setup logic (may be running many simultaneous setup requests, but - with only one 'active' */ -struct setup { - const char *target; - grpc_transport_setup_callback setup_callback; - void *setup_user_data; -}; - -static int maybe_try_next_resolved(request *r); - -static void done(request *r, int was_successful) { - grpc_client_setup_request_finish(r->cs_request, was_successful); - if (r->resolved) { - grpc_resolved_addresses_destroy(r->resolved); - } - gpr_free(r); + grpc_connector base; + gpr_refcount refs; + + grpc_iomgr_closure *notify; + grpc_connect_in_args args; + grpc_connect_out_args *result; +} connector; + +static void connector_ref(grpc_connector *con) { + connector *c = (connector *)con; + gpr_ref(&c->refs); } -/* connection callback: tcp is either valid, or null on error */ -static void on_connect(void *rp, grpc_endpoint *tcp) { - request *r = rp; - - if (!grpc_client_setup_request_should_continue(r->cs_request, "on_connect")) { - if (tcp) { - grpc_endpoint_shutdown(tcp); - grpc_endpoint_destroy(tcp); - } - done(r, 0); - return; +static void connector_unref(grpc_connector *con) { + connector *c = (connector *)con; + if (gpr_unref(&c->refs)) { + gpr_free(c); } +} - if (!tcp) { - if (!maybe_try_next_resolved(r)) { - done(r, 0); - return; - } else { - return; - } - } else if (grpc_client_setup_cb_begin(r->cs_request, "on_connect")) { - grpc_create_chttp2_transport( - r->setup->setup_callback, r->setup->setup_user_data, - grpc_client_setup_get_channel_args(r->cs_request), tcp, NULL, 0, - grpc_client_setup_get_mdctx(r->cs_request), 1); - grpc_client_setup_cb_end(r->cs_request, "on_connect"); - done(r, 1); - return; +static void connected(void *arg, grpc_endpoint *tcp) { + connector *c = arg; + grpc_iomgr_closure *notify; + if (tcp != NULL) { + c->result->transport = grpc_create_chttp2_transport( + c->args.channel_args, tcp, c->args.metadata_context, 1); + grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0); + GPR_ASSERT(c->result->transport); + c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *)); + c->result->filters[0] = &grpc_http_client_filter; + c->result->num_filters = 1; } else { - done(r, 0); + memset(c->result, 0, sizeof(*c->result)); } + notify = c->notify; + c->notify = NULL; + grpc_iomgr_add_callback(notify); } -/* attempt to connect to the next available resolved address */ -static int maybe_try_next_resolved(request *r) { - grpc_resolved_address *addr; - if (!r->resolved) return 0; - if (r->resolved_index == r->resolved->naddrs) return 0; - addr = &r->resolved->addrs[r->resolved_index++]; - grpc_tcp_client_connect( - on_connect, r, grpc_client_setup_get_interested_parties(r->cs_request), - (struct sockaddr *)&addr->addr, addr->len, - grpc_client_setup_request_deadline(r->cs_request)); - return 1; +static void connector_connect(grpc_connector *con, + const grpc_connect_in_args *args, + grpc_connect_out_args *result, + grpc_iomgr_closure *notify) { + connector *c = (connector *)con; + GPR_ASSERT(c->notify == NULL); + GPR_ASSERT(notify->cb); + c->notify = notify; + c->args = *args; + c->result = result; + grpc_tcp_client_connect(connected, c, args->interested_parties, args->addr, + args->addr_len, args->deadline); } -/* callback for when our target address has been resolved */ -static void on_resolved(void *rp, grpc_resolved_addresses *resolved) { - request *r = rp; - - /* if we're not still the active request, abort */ - if (!grpc_client_setup_request_should_continue(r->cs_request, - "on_resolved")) { - if (resolved) { - grpc_resolved_addresses_destroy(resolved); - } - done(r, 0); - return; - } +static const grpc_connector_vtable connector_vtable = { + connector_ref, connector_unref, connector_connect}; - if (!resolved) { - done(r, 0); - return; - } else { - r->resolved = resolved; - r->resolved_index = 0; - if (!maybe_try_next_resolved(r)) { - done(r, 0); - } - } +typedef struct { + grpc_subchannel_factory base; + gpr_refcount refs; + grpc_mdctx *mdctx; + grpc_channel_args *merge_args; +} subchannel_factory; + +static void subchannel_factory_ref(grpc_subchannel_factory *scf) { + subchannel_factory *f = (subchannel_factory *)scf; + gpr_ref(&f->refs); } -static void initiate_setup(void *sp, grpc_client_setup_request *cs_request) { - request *r = gpr_malloc(sizeof(request)); - r->setup = sp; - r->cs_request = cs_request; - r->resolved = NULL; - r->resolved_index = 0; - /* TODO(klempner): Make grpc_resolve_address respect deadline */ - grpc_resolve_address(r->setup->target, "http", on_resolved, r); +static void subchannel_factory_unref(grpc_subchannel_factory *scf) { + subchannel_factory *f = (subchannel_factory *)scf; + if (gpr_unref(&f->refs)) { + grpc_channel_args_destroy(f->merge_args); + grpc_mdctx_unref(f->mdctx); + gpr_free(f); + } } -static void done_setup(void *sp) { - setup *s = sp; - gpr_free((void *)s->target); - gpr_free(s); +static grpc_subchannel *subchannel_factory_create_subchannel( + grpc_subchannel_factory *scf, grpc_subchannel_args *args) { + subchannel_factory *f = (subchannel_factory *)scf; + connector *c = gpr_malloc(sizeof(*c)); + grpc_channel_args *final_args = + grpc_channel_args_merge(args->args, f->merge_args); + grpc_subchannel *s; + memset(c, 0, sizeof(*c)); + c->base.vtable = &connector_vtable; + gpr_ref_init(&c->refs, 1); + args->mdctx = f->mdctx; + args->args = final_args; + s = grpc_subchannel_create(&c->base, args); + grpc_connector_unref(&c->base); + grpc_channel_args_destroy(final_args); + return s; } -static grpc_transport_setup_result complete_setup(void *channel_stack, - grpc_transport *transport, - grpc_mdctx *mdctx) { - static grpc_channel_filter const *extra_filters[] = { - &grpc_http_client_filter}; - return grpc_client_channel_transport_setup_complete( - channel_stack, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters), - mdctx); -} +static const grpc_subchannel_factory_vtable subchannel_factory_vtable = { + subchannel_factory_ref, subchannel_factory_unref, + subchannel_factory_create_subchannel}; /* Create a client channel: Asynchronously: - resolve target @@ -191,28 +152,36 @@ static grpc_transport_setup_result complete_setup(void *channel_stack, - perform handshakes */ grpc_channel *grpc_channel_create(const char *target, const grpc_channel_args *args) { - setup *s = gpr_malloc(sizeof(setup)); - grpc_mdctx *mdctx = grpc_mdctx_create(); grpc_channel *channel = NULL; #define MAX_FILTERS 3 const grpc_channel_filter *filters[MAX_FILTERS]; + grpc_resolver *resolver; + subchannel_factory *f; + grpc_mdctx *mdctx = grpc_mdctx_create(); int n = 0; - filters[n++] = &grpc_client_surface_filter; /* TODO(census) if (grpc_channel_args_is_census_enabled(args)) { filters[n++] = &grpc_client_census_filter; } */ filters[n++] = &grpc_client_channel_filter; GPR_ASSERT(n <= MAX_FILTERS); - channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1); - s->target = gpr_strdup(target); - s->setup_callback = complete_setup; - s->setup_user_data = grpc_channel_get_channel_stack(channel); + f = gpr_malloc(sizeof(*f)); + f->base.vtable = &subchannel_factory_vtable; + gpr_ref_init(&f->refs, 1); + grpc_mdctx_ref(mdctx); + f->mdctx = mdctx; + f->merge_args = grpc_channel_args_copy(args); + resolver = grpc_resolver_create(target, &f->base); + if (!resolver) { + return NULL; + } - grpc_client_setup_create_and_attach(grpc_channel_get_channel_stack(channel), - args, mdctx, initiate_setup, done_setup, - s); + channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1); + grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel), + resolver); + GRPC_RESOLVER_UNREF(resolver, "create"); + grpc_subchannel_factory_unref(&f->base); return channel; } diff --git a/src/core/surface/client.c b/src/core/surface/client.c deleted file mode 100644 index 8ac4dd1e0e..0000000000 --- a/src/core/surface/client.c +++ /dev/null @@ -1,89 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "src/core/surface/client.h" - -#include "src/core/surface/call.h" -#include "src/core/surface/channel.h" -#include "src/core/support/string.h" -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> - -typedef struct { void *unused; } call_data; - -typedef struct { void *unused; } channel_data; - -static void client_start_transport_op(grpc_call_element *elem, - grpc_transport_op *op) { - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - grpc_call_next_op(elem, op); -} - -static void channel_op(grpc_channel_element *elem, - grpc_channel_element *from_elem, grpc_channel_op *op) { - switch (op->type) { - case GRPC_ACCEPT_CALL: - gpr_log(GPR_ERROR, "Client cannot accept new calls"); - break; - case GRPC_TRANSPORT_CLOSED: - grpc_client_channel_closed(elem); - break; - case GRPC_TRANSPORT_GOAWAY: - gpr_slice_unref(op->data.goaway.message); - break; - default: - GPR_ASSERT(op->dir == GRPC_CALL_DOWN); - grpc_channel_next_op(elem, op); - } -} - -static void init_call_elem(grpc_call_element *elem, - const void *transport_server_data, - grpc_transport_op *initial_op) {} - -static void destroy_call_elem(grpc_call_element *elem) {} - -static void init_channel_elem(grpc_channel_element *elem, - const grpc_channel_args *args, grpc_mdctx *mdctx, - int is_first, int is_last) { - GPR_ASSERT(is_first); - GPR_ASSERT(!is_last); -} - -static void destroy_channel_elem(grpc_channel_element *elem) {} - -const grpc_channel_filter grpc_client_surface_filter = { - client_start_transport_op, channel_op, sizeof(call_data), init_call_elem, - destroy_call_elem, sizeof(channel_data), init_channel_elem, - destroy_channel_elem, "client", -}; diff --git a/src/core/surface/client.h b/src/core/surface/client.h deleted file mode 100644 index 9db2ccf3d2..0000000000 --- a/src/core/surface/client.h +++ /dev/null @@ -1,41 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_INTERNAL_CORE_SURFACE_CLIENT_H -#define GRPC_INTERNAL_CORE_SURFACE_CLIENT_H - -#include "src/core/channel/channel_stack.h" - -extern const grpc_channel_filter grpc_client_surface_filter; - -#endif /* GRPC_INTERNAL_CORE_SURFACE_CLIENT_H */ diff --git a/src/core/surface/init.c b/src/core/surface/init.c index ca61a38a35..3847ded28c 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -31,9 +31,13 @@ * */ +#include <grpc/support/port_platform.h> + #include <grpc/census.h> #include <grpc/grpc.h> #include "src/core/channel/channel_stack.h" +#include "src/core/client_config/resolver_registry.h" +#include "src/core/client_config/resolvers/dns_resolver.h" #include "src/core/debug/trace.h" #include "src/core/iomgr/iomgr.h" #include "src/core/profiling/timers.h" @@ -42,6 +46,10 @@ #include "src/core/surface/surface_trace.h" #include "src/core/transport/chttp2_transport.h" +#ifdef GPR_POSIX_SOCKET +#include "src/core/client_config/resolvers/unix_resolver_posix.h" +#endif + static gpr_once g_basic_init = GPR_ONCE_INIT; static gpr_mu g_init_mu; static int g_initializations; @@ -56,6 +64,11 @@ void grpc_init(void) { gpr_mu_lock(&g_init_mu); if (++g_initializations == 1) { + grpc_resolver_registry_init("dns:///"); + grpc_register_resolver_type("dns", grpc_dns_resolver_factory_create()); +#ifdef GPR_POSIX_SOCKET + grpc_register_resolver_type("unix", grpc_unix_resolver_factory_create()); +#endif grpc_register_tracer("channel", &grpc_trace_channel); grpc_register_tracer("surface", &grpc_surface_trace); grpc_register_tracer("http", &grpc_http_trace); @@ -79,6 +92,7 @@ void grpc_shutdown(void) { census_shutdown(); grpc_timers_global_destroy(); grpc_tracer_shutdown(); + grpc_resolver_registry_shutdown(); } gpr_mu_unlock(&g_init_mu); } diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 85e1ab5554..3dd56fe5a9 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -49,16 +49,16 @@ typedef struct { typedef struct { grpc_mdctx *mdctx; } channel_data; -static void lame_start_transport_op(grpc_call_element *elem, - grpc_transport_op *op) { +static void lame_start_transport_stream_op(grpc_call_element *elem, + grpc_transport_stream_op *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - if (op->send_ops) { + if (op->send_ops != NULL) { grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops); op->on_done_send->cb(op->on_done_send->cb_arg, 0); } - if (op->recv_ops) { + if (op->recv_ops != NULL) { char tmp[GPR_LTOA_MIN_BUFSIZE]; grpc_metadata_batch mdb; gpr_ltoa(GRPC_STATUS_UNKNOWN, tmp); @@ -77,36 +77,35 @@ static void lame_start_transport_op(grpc_call_element *elem, *op->recv_state = GRPC_STREAM_CLOSED; op->on_done_recv->cb(op->on_done_recv->cb_arg, 1); } - if (op->on_consumed) { + if (op->on_consumed != NULL) { op->on_consumed->cb(op->on_consumed->cb_arg, 0); } } -static void channel_op(grpc_channel_element *elem, - grpc_channel_element *from_elem, grpc_channel_op *op) { - switch (op->type) { - case GRPC_CHANNEL_GOAWAY: - gpr_slice_unref(op->data.goaway.message); - break; - case GRPC_CHANNEL_DISCONNECT: - grpc_client_channel_closed(elem); - break; - default: - break; +static void lame_start_transport_op(grpc_channel_element *elem, + grpc_transport_op *op) { + if (op->on_connectivity_state_change) { + GPR_ASSERT(*op->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE); + *op->connectivity_state = GRPC_CHANNEL_FATAL_FAILURE; + op->on_connectivity_state_change->cb( + op->on_connectivity_state_change->cb_arg, 1); + } + if (op->on_consumed != NULL) { + op->on_consumed->cb(op->on_consumed->cb_arg, 1); } } static void init_call_elem(grpc_call_element *elem, const void *transport_server_data, - grpc_transport_op *initial_op) { + grpc_transport_stream_op *initial_op) { if (initial_op) { - grpc_transport_op_finish_with_failure(initial_op); + grpc_transport_stream_op_finish_with_failure(initial_op); } } static void destroy_call_elem(grpc_call_element *elem) {} -static void init_channel_elem(grpc_channel_element *elem, +static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_first, int is_last) { channel_data *chand = elem->channel_data; @@ -118,9 +117,15 @@ static void init_channel_elem(grpc_channel_element *elem, static void destroy_channel_elem(grpc_channel_element *elem) {} static const grpc_channel_filter lame_filter = { - lame_start_transport_op, channel_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "lame-client", + lame_start_transport_stream_op, + lame_start_transport_op, + sizeof(call_data), + init_call_elem, + destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + "lame-client", }; grpc_channel *grpc_lame_client_channel_create(void) { diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index fae3e4e90a..34ee3f8400 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -31,177 +31,147 @@ * */ -#include "src/core/iomgr/sockaddr.h" - #include <grpc/grpc.h> #include <stdlib.h> #include <string.h> -#include "src/core/channel/census_filter.h" +#include <grpc/support/alloc.h> + #include "src/core/channel/channel_args.h" #include "src/core/channel/client_channel.h" -#include "src/core/channel/client_setup.h" -#include "src/core/channel/connected_channel.h" #include "src/core/channel/http_client_filter.h" -#include "src/core/iomgr/resolve_address.h" +#include "src/core/client_config/resolver_registry.h" #include "src/core/iomgr/tcp_client.h" #include "src/core/security/auth_filters.h" #include "src/core/security/credentials.h" #include "src/core/security/secure_transport_setup.h" -#include "src/core/support/string.h" #include "src/core/surface/channel.h" -#include "src/core/surface/client.h" #include "src/core/transport/chttp2_transport.h" -#include <grpc/grpc_security.h> -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/string_util.h> -#include <grpc/support/sync.h> -#include <grpc/support/useful.h> #include "src/core/tsi/transport_security_interface.h" -typedef struct setup setup; - -/* A single setup request (started via initiate) */ typedef struct { - grpc_client_setup_request *cs_request; - setup *setup; - /* Resolved addresses, or null if resolution not yet completed. */ - grpc_resolved_addresses *resolved; - /* which address in resolved should we pick for the next connection attempt */ - size_t resolved_index; -} request; + grpc_connector base; + gpr_refcount refs; -struct setup { grpc_channel_security_connector *security_connector; - const char *target; - grpc_transport_setup_callback setup_callback; - void *setup_user_data; -}; -static int maybe_try_next_resolved(request *r); + grpc_iomgr_closure *notify; + grpc_connect_in_args args; + grpc_connect_out_args *result; +} connector; -static void done(request *r, int was_successful) { - grpc_client_setup_request_finish(r->cs_request, was_successful); - if (r->resolved) { - grpc_resolved_addresses_destroy(r->resolved); +static void connector_ref(grpc_connector *con) { + connector *c = (connector *)con; + gpr_ref(&c->refs); +} + +static void connector_unref(grpc_connector *con) { + connector *c = (connector *)con; + if (gpr_unref(&c->refs)) { + gpr_free(c); } - gpr_free(r); } -static void on_secure_transport_setup_done(void *rp, +static void on_secure_transport_setup_done(void *arg, grpc_security_status status, grpc_endpoint *secure_endpoint) { - request *r = rp; + connector *c = arg; + grpc_iomgr_closure *notify; if (status != GRPC_SECURITY_OK) { gpr_log(GPR_ERROR, "Secure transport setup failed with error %d.", status); - done(r, 0); - } else if (grpc_client_setup_cb_begin(r->cs_request, - "on_secure_transport_setup_done")) { - grpc_create_chttp2_transport( - r->setup->setup_callback, r->setup->setup_user_data, - grpc_client_setup_get_channel_args(r->cs_request), secure_endpoint, - NULL, 0, grpc_client_setup_get_mdctx(r->cs_request), 1); - grpc_client_setup_cb_end(r->cs_request, "on_secure_transport_setup_done"); - done(r, 1); + memset(c->result, 0, sizeof(*c->result)); } else { - done(r, 0); + c->result->transport = grpc_create_chttp2_transport( + c->args.channel_args, secure_endpoint, c->args.metadata_context, 1); + grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0); + c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *) * 2); + c->result->filters[0] = &grpc_client_auth_filter; + c->result->filters[1] = &grpc_http_client_filter; + c->result->num_filters = 2; } + notify = c->notify; + c->notify = NULL; + grpc_iomgr_add_callback(notify); } -/* connection callback: tcp is either valid, or null on error */ -static void on_connect(void *rp, grpc_endpoint *tcp) { - request *r = rp; - - if (!grpc_client_setup_request_should_continue(r->cs_request, - "on_connect.secure")) { - if (tcp) { - grpc_endpoint_shutdown(tcp); - grpc_endpoint_destroy(tcp); - } - done(r, 0); - return; - } - - if (!tcp) { - if (!maybe_try_next_resolved(r)) { - done(r, 0); - return; - } else { - return; - } +static void connected(void *arg, grpc_endpoint *tcp) { + connector *c = arg; + grpc_iomgr_closure *notify; + if (tcp != NULL) { + grpc_setup_secure_transport(&c->security_connector->base, tcp, + on_secure_transport_setup_done, c); } else { - grpc_setup_secure_transport(&r->setup->security_connector->base, tcp, - on_secure_transport_setup_done, r); + memset(c->result, 0, sizeof(*c->result)); + notify = c->notify; + c->notify = NULL; + grpc_iomgr_add_callback(notify); } } -/* attempt to connect to the next available resolved address */ -static int maybe_try_next_resolved(request *r) { - grpc_resolved_address *addr; - if (!r->resolved) return 0; - if (r->resolved_index == r->resolved->naddrs) return 0; - addr = &r->resolved->addrs[r->resolved_index++]; - grpc_tcp_client_connect( - on_connect, r, grpc_client_setup_get_interested_parties(r->cs_request), - (struct sockaddr *)&addr->addr, addr->len, - grpc_client_setup_request_deadline(r->cs_request)); - return 1; +static void connector_connect(grpc_connector *con, + const grpc_connect_in_args *args, + grpc_connect_out_args *result, + grpc_iomgr_closure *notify) { + connector *c = (connector *)con; + GPR_ASSERT(c->notify == NULL); + GPR_ASSERT(notify->cb); + c->notify = notify; + c->args = *args; + c->result = result; + grpc_tcp_client_connect(connected, c, args->interested_parties, args->addr, + args->addr_len, args->deadline); } -/* callback for when our target address has been resolved */ -static void on_resolved(void *rp, grpc_resolved_addresses *resolved) { - request *r = rp; +static const grpc_connector_vtable connector_vtable = { + connector_ref, connector_unref, connector_connect}; - /* if we're not still the active request, abort */ - if (!grpc_client_setup_request_should_continue(r->cs_request, - "on_resolved.secure")) { - if (resolved) { - grpc_resolved_addresses_destroy(resolved); - } - done(r, 0); - return; - } +typedef struct { + grpc_subchannel_factory base; + gpr_refcount refs; + grpc_mdctx *mdctx; + grpc_channel_args *merge_args; + grpc_channel_security_connector *security_connector; +} subchannel_factory; - if (!resolved) { - done(r, 0); - return; - } else { - r->resolved = resolved; - r->resolved_index = 0; - if (!maybe_try_next_resolved(r)) { - done(r, 0); - } - } +static void subchannel_factory_ref(grpc_subchannel_factory *scf) { + subchannel_factory *f = (subchannel_factory *)scf; + gpr_ref(&f->refs); } -static void initiate_setup(void *sp, grpc_client_setup_request *cs_request) { - request *r = gpr_malloc(sizeof(request)); - r->setup = sp; - r->cs_request = cs_request; - r->resolved = NULL; - r->resolved_index = 0; - /* TODO(klempner): Make grpc_resolve_address respect deadline */ - grpc_resolve_address(r->setup->target, "https", on_resolved, r); +static void subchannel_factory_unref(grpc_subchannel_factory *scf) { + subchannel_factory *f = (subchannel_factory *)scf; + if (gpr_unref(&f->refs)) { + GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base, + "subchannel_factory"); + grpc_channel_args_destroy(f->merge_args); + grpc_mdctx_unref(f->mdctx); + gpr_free(f); + } } -static void done_setup(void *sp) { - setup *s = sp; - gpr_free((void *)s->target); - grpc_security_connector_unref(&s->security_connector->base); - gpr_free(s); +static grpc_subchannel *subchannel_factory_create_subchannel( + grpc_subchannel_factory *scf, grpc_subchannel_args *args) { + subchannel_factory *f = (subchannel_factory *)scf; + connector *c = gpr_malloc(sizeof(*c)); + grpc_channel_args *final_args = + grpc_channel_args_merge(args->args, f->merge_args); + grpc_subchannel *s; + memset(c, 0, sizeof(*c)); + c->base.vtable = &connector_vtable; + c->security_connector = f->security_connector; + gpr_ref_init(&c->refs, 1); + args->mdctx = f->mdctx; + args->args = final_args; + s = grpc_subchannel_create(&c->base, args); + grpc_connector_unref(&c->base); + grpc_channel_args_destroy(final_args); + return s; } -static grpc_transport_setup_result complete_setup(void *channel_stack, - grpc_transport *transport, - grpc_mdctx *mdctx) { - static grpc_channel_filter const *extra_filters[] = { - &grpc_client_auth_filter, &grpc_http_client_filter}; - return grpc_client_channel_transport_setup_complete( - channel_stack, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters), - mdctx); -} +static const grpc_subchannel_factory_vtable subchannel_factory_vtable = { + subchannel_factory_ref, subchannel_factory_unref, + subchannel_factory_create_subchannel}; /* Create a secure client channel: Asynchronously: - resolve target @@ -210,13 +180,14 @@ static grpc_transport_setup_result complete_setup(void *channel_stack, grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, const char *target, const grpc_channel_args *args) { - setup *s; grpc_channel *channel; grpc_arg connector_arg; grpc_channel_args *args_copy; grpc_channel_args *new_args_from_connector; grpc_channel_security_connector *connector; grpc_mdctx *mdctx; + grpc_resolver *resolver; + subchannel_factory *f; #define MAX_FILTERS 3 const grpc_channel_filter *filters[MAX_FILTERS]; int n = 0; @@ -233,30 +204,41 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, } mdctx = grpc_mdctx_create(); - s = gpr_malloc(sizeof(setup)); connector_arg = grpc_security_connector_to_arg(&connector->base); args_copy = grpc_channel_args_copy_and_add( new_args_from_connector != NULL ? new_args_from_connector : args, - &connector_arg); - filters[n++] = &grpc_client_surface_filter; + &connector_arg, 1); /* TODO(census) if (grpc_channel_args_is_census_enabled(args)) { filters[n++] = &grpc_client_census_filter; } */ filters[n++] = &grpc_client_channel_filter; GPR_ASSERT(n <= MAX_FILTERS); + + f = gpr_malloc(sizeof(*f)); + f->base.vtable = &subchannel_factory_vtable; + gpr_ref_init(&f->refs, 1); + grpc_mdctx_ref(mdctx); + f->mdctx = mdctx; + GRPC_SECURITY_CONNECTOR_REF(&connector->base, "subchannel_factory"); + f->security_connector = connector; + f->merge_args = grpc_channel_args_copy(args_copy); + resolver = grpc_resolver_create(target, &f->base); + if (!resolver) { + return NULL; + } + channel = grpc_channel_create_from_filters(filters, n, args_copy, mdctx, 1); + grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel), + resolver); + GRPC_RESOLVER_UNREF(resolver, "create"); + grpc_subchannel_factory_unref(&f->base); + GRPC_SECURITY_CONNECTOR_UNREF(&connector->base, "channel_create"); + grpc_channel_args_destroy(args_copy); if (new_args_from_connector != NULL) { grpc_channel_args_destroy(new_args_from_connector); } - s->target = gpr_strdup(target); - s->setup_callback = complete_setup; - s->setup_user_data = grpc_channel_get_channel_stack(channel); - s->security_connector = connector; - grpc_client_setup_create_and_attach(grpc_channel_get_channel_stack(channel), - args, mdctx, initiate_setup, done_setup, - s); return channel; } diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 13ec5bee94..f29d47c17c 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -115,6 +115,7 @@ typedef struct channel_registered_method { struct channel_data { grpc_server *server; size_t num_calls; + grpc_connectivity_state connectivity_state; grpc_channel *channel; grpc_mdstr *path_key; grpc_mdstr *authority_key; @@ -125,6 +126,7 @@ struct channel_data { gpr_uint32 registered_method_slots; gpr_uint32 registered_method_max_probes; grpc_iomgr_closure finish_destroy_channel_closure; + grpc_iomgr_closure channel_connectivity_changed; }; typedef struct shutdown_tag { @@ -149,7 +151,7 @@ struct grpc_server { before mu_call. This is currently used in shutdown processing (grpc_server_shutdown_and_notify and maybe_finish_shutdown) */ gpr_mu mu_global; /* mutex for server and channel state */ - gpr_mu mu_call; /* mutex for call-specific state */ + gpr_mu mu_call; /* mutex for call-specific state */ registered_method *registered_methods; requested_call_array requested_calls; @@ -200,18 +202,101 @@ struct call_data { call_link links[CALL_LIST_COUNT]; }; +typedef struct { + grpc_channel **channels; + grpc_channel **disconnects; + size_t num_channels; + size_t num_disconnects; +} channel_broadcaster; + #define SERVER_FROM_CALL_ELEM(elem) \ (((channel_data *)(elem)->channel_data)->server) static void begin_call(grpc_server *server, call_data *calld, requested_call *rc); static void fail_call(grpc_server *server, requested_call *rc); -static void shutdown_channel(channel_data *chand, int send_goaway, - int send_disconnect); /* Before calling maybe_finish_shutdown, we must hold mu_global and not hold mu_call */ static void maybe_finish_shutdown(grpc_server *server); +/* channel broadcaster */ + +/* assumes server locked */ +static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) { + channel_data *c; + size_t count = 0; + size_t dc_count = 0; + for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) { + count++; + if (c->num_calls == 0) { + dc_count++; + } + } + cb->num_channels = count; + cb->num_disconnects = dc_count; + cb->channels = gpr_malloc(sizeof(*cb->channels) * cb->num_channels); + cb->disconnects = gpr_malloc(sizeof(*cb->channels) * cb->num_disconnects); + count = 0; + dc_count = 0; + for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) { + cb->channels[count++] = c->channel; + GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast"); + if (c->num_calls == 0) { + cb->disconnects[dc_count++] = c->channel; + GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast-disconnect"); + } + } +} + +struct shutdown_cleanup_args { + grpc_iomgr_closure closure; + gpr_slice slice; +}; + +static void shutdown_cleanup(void *arg, int iomgr_status_ignored) { + struct shutdown_cleanup_args *a = arg; + gpr_slice_unref(a->slice); + gpr_free(a); +} + +static void send_shutdown(grpc_channel *channel, int send_goaway, + int send_disconnect) { + grpc_transport_op op; + struct shutdown_cleanup_args *sc; + grpc_channel_element *elem; + + memset(&op, 0, sizeof(op)); + op.send_goaway = send_goaway; + sc = gpr_malloc(sizeof(*sc)); + sc->slice = gpr_slice_from_copied_string("Server shutdown"); + op.goaway_message = &sc->slice; + op.goaway_status = GRPC_STATUS_OK; + op.disconnect = send_disconnect; + grpc_iomgr_closure_init(&sc->closure, shutdown_cleanup, sc); + op.on_consumed = &sc->closure; + + elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); + elem->filter->start_transport_op(elem, &op); +} + +static void channel_broadcaster_shutdown(channel_broadcaster *cb, + int send_goaway, int send_disconnect) { + size_t i; + + for (i = 0; i < cb->num_channels; i++) { + send_shutdown(cb->channels[i], 1, 0); + GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast"); + } + for (i = 0; i < cb->num_disconnects; i++) { + send_shutdown(cb->disconnects[i], 0, 1); + GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast-disconnect"); + } + gpr_free(cb->channels); + gpr_free(cb->disconnects); +} + +/* call list */ + static int call_list_join(call_data **root, call_data *call, call_list list) { GPR_ASSERT(!call->root[list]); call->root[list] = root; @@ -456,12 +541,14 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { return md; } -static void decrement_call_count(channel_data *chand) { +static int decrement_call_count(channel_data *chand) { + int disconnect = 0; chand->num_calls--; if (0 == chand->num_calls && chand->server->shutdown) { - shutdown_channel(chand, 0, 1); + disconnect = 1; } maybe_finish_shutdown(chand->server); + return disconnect; } static void server_on_recv(void *ptr, int success) { @@ -469,6 +556,7 @@ static void server_on_recv(void *ptr, int success) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; int remove_res; + int disconnect = 0; if (success && !calld->got_initial_metadata) { size_t i; @@ -517,16 +605,24 @@ static void server_on_recv(void *ptr, int success) { gpr_mu_unlock(&chand->server->mu_call); gpr_mu_lock(&chand->server->mu_global); if (remove_res) { - decrement_call_count(chand); + disconnect = decrement_call_count(chand); + if (disconnect) { + GRPC_CHANNEL_INTERNAL_REF(chand->channel, "send-disconnect"); + } } gpr_mu_unlock(&chand->server->mu_global); + if (disconnect) { + send_shutdown(chand->channel, 0, 1); + GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "send-disconnect"); + } break; } calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success); } -static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { +static void server_mutate_op(grpc_call_element *elem, + grpc_transport_stream_op *op) { call_data *calld = elem->call_data; if (op->recv_ops) { @@ -538,92 +634,43 @@ static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { } } -static void server_start_transport_op(grpc_call_element *elem, - grpc_transport_op *op) { +static void server_start_transport_stream_op(grpc_call_element *elem, + grpc_transport_stream_op *op) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); server_mutate_op(elem, op); grpc_call_next_op(elem, op); } -static void channel_op(grpc_channel_element *elem, - grpc_channel_element *from_elem, grpc_channel_op *op) { - channel_data *chand = elem->channel_data; - grpc_server *server = chand->server; - - switch (op->type) { - case GRPC_ACCEPT_CALL: - /* create a call */ - grpc_call_create(chand->channel, NULL, - op->data.accept_call.transport_server_data, NULL, 0, - gpr_inf_future); - break; - case GRPC_TRANSPORT_CLOSED: - /* if the transport is closed for a server channel, we destroy the - channel */ - gpr_mu_lock(&server->mu_global); - server_ref(server); - destroy_channel(chand); - gpr_mu_unlock(&server->mu_global); - server_unref(server); - break; - case GRPC_TRANSPORT_GOAWAY: - gpr_slice_unref(op->data.goaway.message); - break; - default: - GPR_ASSERT(op->dir == GRPC_CALL_DOWN); - grpc_channel_next_op(elem, op); - break; - } +static void accept_stream(void *cd, grpc_transport *transport, + const void *transport_server_data) { + channel_data *chand = cd; + /* create a call */ + grpc_call_create(chand->channel, NULL, transport_server_data, NULL, 0, + gpr_inf_future); } -typedef struct { - channel_data *chand; - int send_goaway; - int send_disconnect; - grpc_iomgr_closure finish_shutdown_channel_closure; -} shutdown_channel_args; - -static void finish_shutdown_channel(void *p, int success) { - shutdown_channel_args *sca = p; - grpc_channel_op op; - - if (sca->send_goaway) { - op.type = GRPC_CHANNEL_GOAWAY; - op.dir = GRPC_CALL_DOWN; - op.data.goaway.status = GRPC_STATUS_OK; - op.data.goaway.message = gpr_slice_from_copied_string("Server shutdown"); - channel_op(grpc_channel_stack_element( - grpc_channel_get_channel_stack(sca->chand->channel), 0), - NULL, &op); - } - if (sca->send_disconnect) { - op.type = GRPC_CHANNEL_DISCONNECT; - op.dir = GRPC_CALL_DOWN; - channel_op(grpc_channel_stack_element( - grpc_channel_get_channel_stack(sca->chand->channel), 0), - NULL, &op); +static void channel_connectivity_changed(void *cd, int iomgr_status_ignored) { + channel_data *chand = cd; + grpc_server *server = chand->server; + if (chand->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) { + grpc_transport_op op; + memset(&op, 0, sizeof(op)); + op.on_connectivity_state_change = &chand->channel_connectivity_changed, + op.connectivity_state = &chand->connectivity_state; + grpc_channel_next_op(grpc_channel_stack_element( + grpc_channel_get_channel_stack(chand->channel), 0), + &op); + } else { + gpr_mu_lock(&server->mu_global); + destroy_channel(chand); + gpr_mu_unlock(&server->mu_global); + GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "connectivity"); } - GRPC_CHANNEL_INTERNAL_UNREF(sca->chand->channel, "shutdown"); - - gpr_free(sca); -} - -static void shutdown_channel(channel_data *chand, int send_goaway, - int send_disconnect) { - shutdown_channel_args *sca; - GRPC_CHANNEL_INTERNAL_REF(chand->channel, "shutdown"); - sca = gpr_malloc(sizeof(shutdown_channel_args)); - sca->chand = chand; - sca->send_goaway = send_goaway; - sca->send_disconnect = send_disconnect; - sca->finish_shutdown_channel_closure.cb = finish_shutdown_channel; - sca->finish_shutdown_channel_closure.cb_arg = sca; - grpc_iomgr_add_callback(&sca->finish_shutdown_channel_closure); } static void init_call_elem(grpc_call_element *elem, const void *server_transport_data, - grpc_transport_op *initial_op) { + grpc_transport_stream_op *initial_op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; memset(calld, 0, sizeof(call_data)); @@ -672,7 +719,7 @@ static void destroy_call_elem(grpc_call_element *elem) { server_unref(chand->server); } -static void init_channel_elem(grpc_channel_element *elem, +static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *metadata_context, int is_first, int is_last) { @@ -686,6 +733,9 @@ static void init_channel_elem(grpc_channel_element *elem, chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority"); chand->next = chand->prev = chand; chand->registered_methods = NULL; + chand->connectivity_state = GRPC_CHANNEL_IDLE; + grpc_iomgr_closure_init(&chand->channel_connectivity_changed, + channel_connectivity_changed, chand); } static void destroy_channel_elem(grpc_channel_element *elem) { @@ -716,8 +766,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } static const grpc_channel_filter server_surface_filter = { - server_start_transport_op, - channel_op, + server_start_transport_stream_op, + grpc_channel_next_op, sizeof(call_data), init_call_elem, destroy_call_elem, @@ -831,10 +881,10 @@ void grpc_server_start(grpc_server *server) { } } -grpc_transport_setup_result grpc_server_setup_transport( - grpc_server *s, grpc_transport *transport, - grpc_channel_filter const **extra_filters, size_t num_extra_filters, - grpc_mdctx *mdctx, const grpc_channel_args *args) { +void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, + grpc_channel_filter const **extra_filters, + size_t num_extra_filters, grpc_mdctx *mdctx, + const grpc_channel_args *args) { size_t num_filters = s->channel_filter_count + num_extra_filters + 1; grpc_channel_filter const **filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_filters); @@ -851,7 +901,7 @@ grpc_transport_setup_result grpc_server_setup_transport( gpr_uint32 slots; gpr_uint32 probes; gpr_uint32 max_probes = 0; - grpc_transport_setup_result result; + grpc_transport_op op; for (i = 0; i < s->channel_filter_count; i++) { filters[i] = s->channel_filters[i]; @@ -862,7 +912,9 @@ grpc_transport_setup_result grpc_server_setup_transport( filters[i] = &grpc_connected_channel_filter; for (i = 0; i < s->cq_count; i++) { - grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cqs[i])); + memset(&op, 0, sizeof(op)); + op.bind_pollset = grpc_cq_pollset(s->cqs[i]); + grpc_transport_perform_op(transport, &op); } channel = @@ -903,8 +955,8 @@ grpc_transport_setup_result grpc_server_setup_transport( chand->registered_method_max_probes = max_probes; } - result = grpc_connected_channel_bind_transport( - grpc_channel_get_channel_stack(channel), transport); + grpc_connected_channel_bind_transport(grpc_channel_get_channel_stack(channel), + transport); gpr_mu_lock(&s->mu_global); chand->next = &s->root_channel_data; @@ -914,17 +966,23 @@ grpc_transport_setup_result grpc_server_setup_transport( gpr_free(filters); - return result; + GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity"); + memset(&op, 0, sizeof(op)); + op.set_accept_stream = accept_stream; + op.set_accept_stream_user_data = chand; + op.on_connectivity_state_change = &chand->channel_connectivity_changed; + op.connectivity_state = &chand->connectivity_state; + grpc_transport_perform_op(transport, &op); } void grpc_server_shutdown_and_notify(grpc_server *server, grpc_completion_queue *cq, void *tag) { listener *l; requested_call_array requested_calls; - channel_data *c; size_t i; registered_method *rm; shutdown_tag *sdt; + channel_broadcaster broadcaster; /* lock, and gather up some stuff to do */ gpr_mu_lock(&server->mu_global); @@ -940,10 +998,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server, return; } - for (c = server->root_channel_data.next; c != &server->root_channel_data; - c = c->next) { - shutdown_channel(c, 1, c->num_calls == 0); - } + channel_broadcaster_init(server, &broadcaster); /* collect all unregistered then registered calls */ gpr_mu_lock(&server->mu_call); @@ -981,6 +1036,8 @@ void grpc_server_shutdown_and_notify(grpc_server *server, for (l = server->listeners; l; l = l->next) { l->destroy(server, l->arg); } + + channel_broadcaster_shutdown(&broadcaster, 1, 0); } void grpc_server_listener_destroy_done(void *s) { @@ -1181,6 +1238,8 @@ static void begin_call(grpc_server *server, call_data *calld, calld->cq_new = rc->cq_for_notification; switch (rc->type) { case BATCH_CALL: + GPR_ASSERT(calld->host != NULL); + GPR_ASSERT(calld->path != NULL); cpstr(&rc->data.batch.details->host, &rc->data.batch.details->host_capacity, calld->host); cpstr(&rc->data.batch.details->method, diff --git a/src/core/surface/server.h b/src/core/surface/server.h index 91a1a2a7f6..2899c6dea3 100644 --- a/src/core/surface/server.h +++ b/src/core/surface/server.h @@ -55,10 +55,10 @@ void grpc_server_listener_destroy_done(void *server); /* Setup a transport - creates a channel stack, binds the transport to the server */ -grpc_transport_setup_result grpc_server_setup_transport( - grpc_server *server, grpc_transport *transport, - grpc_channel_filter const **extra_filters, size_t num_extra_filters, - grpc_mdctx *mdctx, const grpc_channel_args *args); +void grpc_server_setup_transport(grpc_server *server, grpc_transport *transport, + grpc_channel_filter const **extra_filters, + size_t num_extra_filters, grpc_mdctx *mdctx, + const grpc_channel_args *args); const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server); diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c index 7e49a531df..78c53466b3 100644 --- a/src/core/surface/server_chttp2.c +++ b/src/core/surface/server_chttp2.c @@ -42,14 +42,13 @@ #include <grpc/support/log.h> #include <grpc/support/useful.h> -static grpc_transport_setup_result setup_transport(void *server, - grpc_transport *transport, - grpc_mdctx *mdctx) { +static void setup_transport(void *server, grpc_transport *transport, + grpc_mdctx *mdctx) { static grpc_channel_filter const *extra_filters[] = { &grpc_http_server_filter}; - return grpc_server_setup_transport(server, transport, extra_filters, - GPR_ARRAY_SIZE(extra_filters), mdctx, - grpc_server_get_channel_args(server)); + grpc_server_setup_transport(server, transport, extra_filters, + GPR_ARRAY_SIZE(extra_filters), mdctx, + grpc_server_get_channel_args(server)); } static void new_transport(void *server, grpc_endpoint *tcp) { @@ -60,9 +59,11 @@ static void new_transport(void *server, grpc_endpoint *tcp) { * (as in server_secure_chttp2.c) needs to add synchronization to avoid this * case. */ - grpc_create_chttp2_transport(setup_transport, server, - grpc_server_get_channel_args(server), tcp, NULL, - 0, grpc_mdctx_create(), 0); + grpc_mdctx *mdctx = grpc_mdctx_create(); + grpc_transport *transport = grpc_create_chttp2_transport( + grpc_server_get_channel_args(server), tcp, mdctx, 0); + setup_transport(server, transport, mdctx); + grpc_chttp2_transport_start_reading(transport, NULL, 0); } /* Server callback: start listening on our ports */ |