aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/call.c33
-rw-r--r--src/core/surface/channel.c33
-rw-r--r--src/core/surface/channel.h3
-rw-r--r--src/core/surface/channel_create.c235
-rw-r--r--src/core/surface/client.c89
-rw-r--r--src/core/surface/client.h41
-rw-r--r--src/core/surface/init.c14
-rw-r--r--src/core/surface/lame_client.c49
-rw-r--r--src/core/surface/secure_channel_create.c254
-rw-r--r--src/core/surface/server.c255
-rw-r--r--src/core/surface/server.h8
-rw-r--r--src/core/surface/server_chttp2.c19
12 files changed, 460 insertions, 573 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index f6aeed856b..fc09137b67 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -262,8 +262,8 @@ struct grpc_call {
static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline);
static void call_on_done_recv(void *call, int success);
static void call_on_done_send(void *call, int success);
-static int fill_send_ops(grpc_call *call, grpc_transport_op *op);
-static void execute_op(grpc_call *call, grpc_transport_op *op);
+static int fill_send_ops(grpc_call *call, grpc_transport_stream_op *op);
+static void execute_op(grpc_call *call, grpc_transport_stream_op *op);
static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata);
static void finish_read_ops(grpc_call *call);
static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
@@ -279,8 +279,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
size_t add_initial_metadata_count,
gpr_timespec send_deadline) {
size_t i;
- grpc_transport_op initial_op;
- grpc_transport_op *initial_op_ptr = NULL;
+ grpc_transport_stream_op initial_op;
+ grpc_transport_stream_op *initial_op_ptr = NULL;
grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
grpc_call *call =
gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
@@ -464,12 +464,11 @@ static int need_more_data(grpc_call *call) {
(is_op_live(call, GRPC_IOREQ_RECV_CLOSE) &&
grpc_bbq_empty(&call->incoming_queue)) ||
(call->write_state == WRITE_STATE_INITIAL && !call->is_client) ||
- (call->cancel_with_status != GRPC_STATUS_OK) ||
- call->destroy_called;
+ (call->cancel_with_status != GRPC_STATUS_OK) || call->destroy_called;
}
static void unlock(grpc_call *call) {
- grpc_transport_op op;
+ grpc_transport_stream_op op;
completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
int completing_requests = 0;
int start_op = 0;
@@ -888,7 +887,7 @@ static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
}
}
-static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
+static int fill_send_ops(grpc_call *call, grpc_transport_stream_op *op) {
grpc_ioreq_data data;
gpr_uint32 flags;
grpc_metadata_batch mdb;
@@ -1144,7 +1143,7 @@ static void finished_loose_op_allocated(void *alloc, int success) {
gpr_free(args);
}
-static void execute_op(grpc_call *call, grpc_transport_op *op) {
+static void execute_op(grpc_call *call, grpc_transport_stream_op *op) {
grpc_call_element *elem;
GPR_ASSERT(op->on_consumed == NULL);
@@ -1155,14 +1154,15 @@ static void execute_op(grpc_call *call, grpc_transport_op *op) {
} else {
finished_loose_op_allocated_args *args = gpr_malloc(sizeof(*args));
args->call = call;
- grpc_iomgr_closure_init(&args->closure, finished_loose_op_allocated, args);
+ grpc_iomgr_closure_init(&args->closure, finished_loose_op_allocated,
+ args);
op->on_consumed = &args->closure;
}
}
elem = CALL_ELEM_FROM_CALL(call, 0);
op->context = call->context;
- elem->filter->start_transport_op(elem, op);
+ elem->filter->start_transport_stream_op(elem, op);
}
grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
@@ -1229,13 +1229,13 @@ static gpr_uint32 decode_compression(grpc_mdelem *md) {
} else {
gpr_uint32 parsed_clevel_bytes;
if (gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
- GPR_SLICE_LENGTH(md->value->slice),
- &parsed_clevel_bytes)) {
+ GPR_SLICE_LENGTH(md->value->slice),
+ &parsed_clevel_bytes)) {
/* the following cast is safe, as a gpr_uint32 should be able to hold all
* possible values of the grpc_compression_level enum */
- clevel = (grpc_compression_level) parsed_clevel_bytes;
+ clevel = (grpc_compression_level)parsed_clevel_bytes;
} else {
- clevel = GRPC_COMPRESS_LEVEL_NONE; /* could not parse, no compression */
+ clevel = GRPC_COMPRESS_LEVEL_NONE; /* could not parse, no compression */
}
grpc_mdelem_set_user_data(md, destroy_compression,
(void *)(gpr_intptr)(clevel + COMPRESS_OFFSET));
@@ -1258,7 +1258,8 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
set_status_code(call, STATUS_FROM_WIRE, decode_status(md));
} else if (key == grpc_channel_get_message_string(call->channel)) {
set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
- } else if (key == grpc_channel_get_compresssion_level_string(call->channel)) {
+ } else if (key ==
+ grpc_channel_get_compresssion_level_string(call->channel)) {
set_decode_compression_level(call, decode_compression(md));
} else {
dest = &call->buffered_metadata[is_trailing];
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index a3c4dcebc1..f8151c121c 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -39,7 +39,6 @@
#include "src/core/iomgr/iomgr.h"
#include "src/core/support/string.h"
#include "src/core/surface/call.h"
-#include "src/core/surface/client.h"
#include "src/core/surface/init.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -94,9 +93,8 @@ grpc_channel *grpc_channel_create_from_filters(
grpc_channel *channel = gpr_malloc(size);
GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
channel->is_client = is_client;
- /* decremented by grpc_channel_destroy, and grpc_client_channel_closed if
- * is_client */
- gpr_ref_init(&channel->refs, 1 + is_client);
+ /* decremented by grpc_channel_destroy */
+ gpr_ref_init(&channel->refs, 1);
channel->metadata_context = mdctx;
channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status");
channel->grpc_compression_level_string =
@@ -111,8 +109,6 @@ grpc_channel *grpc_channel_create_from_filters(
}
channel->path_string = grpc_mdstr_from_string(mdctx, ":path");
channel->authority_string = grpc_mdstr_from_string(mdctx, ":authority");
- grpc_channel_stack_init(filters, num_filters, args, channel->metadata_context,
- CHANNEL_STACK_FROM_CHANNEL(channel));
gpr_mu_init(&channel->registered_call_mu);
channel->registered_calls = NULL;
@@ -133,6 +129,10 @@ grpc_channel *grpc_channel_create_from_filters(
}
}
+ grpc_channel_stack_init(filters, num_filters, channel, args,
+ channel->metadata_context,
+ CHANNEL_STACK_FROM_CHANNEL(channel));
+
return channel;
}
@@ -239,28 +239,16 @@ void grpc_channel_internal_unref(grpc_channel *channel) {
}
void grpc_channel_destroy(grpc_channel *channel) {
- grpc_channel_op op;
+ grpc_transport_op op;
grpc_channel_element *elem;
-
+ memset(&op, 0, sizeof(op));
+ op.disconnect = 1;
elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0);
-
- op.type = GRPC_CHANNEL_GOAWAY;
- op.dir = GRPC_CALL_DOWN;
- op.data.goaway.status = GRPC_STATUS_OK;
- op.data.goaway.message = gpr_slice_from_copied_string("Client disconnect");
- elem->filter->channel_op(elem, NULL, &op);
-
- op.type = GRPC_CHANNEL_DISCONNECT;
- op.dir = GRPC_CALL_DOWN;
- elem->filter->channel_op(elem, NULL, &op);
+ elem->filter->start_transport_op(elem, &op);
GRPC_CHANNEL_INTERNAL_UNREF(channel, "channel");
}
-void grpc_client_channel_closed(grpc_channel_element *elem) {
- GRPC_CHANNEL_INTERNAL_UNREF(CHANNEL_FROM_TOP_ELEM(elem), "closed");
-}
-
grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel) {
return CHANNEL_STACK_FROM_CHANNEL(channel);
}
@@ -277,7 +265,6 @@ grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel) {
return channel->grpc_compression_level_string;
}
-
grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) {
if (i >= 0 && i < NUM_CACHED_STATUS_ELEMS) {
return grpc_mdelem_ref(channel->grpc_status_elem[i]);
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index 3c04676b43..71f8a55731 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -35,6 +35,7 @@
#define GRPC_INTERNAL_CORE_SURFACE_CHANNEL_H
#include "src/core/channel/channel_stack.h"
+#include "src/core/client_config/subchannel_factory.h"
grpc_channel *grpc_channel_create_from_filters(
const grpc_channel_filter **filters, size_t count,
@@ -57,8 +58,6 @@ grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel);
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);
-void grpc_client_channel_closed(grpc_channel_element *elem);
-
#ifdef GRPC_CHANNEL_REF_COUNT_DEBUG
void grpc_channel_internal_ref(grpc_channel *channel, const char *reason);
void grpc_channel_internal_unref(grpc_channel *channel, const char *reason);
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index d069a04a9a..e205f0a9f8 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -31,159 +31,120 @@
*
*/
-#include "src/core/iomgr/sockaddr.h"
-
#include <grpc/grpc.h>
#include <stdlib.h>
#include <string.h>
-#include "src/core/channel/census_filter.h"
+#include <grpc/support/alloc.h>
+
#include "src/core/channel/channel_args.h"
#include "src/core/channel/client_channel.h"
-#include "src/core/channel/client_setup.h"
-#include "src/core/channel/connected_channel.h"
#include "src/core/channel/http_client_filter.h"
-#include "src/core/iomgr/endpoint.h"
-#include "src/core/iomgr/resolve_address.h"
+#include "src/core/client_config/resolver_registry.h"
#include "src/core/iomgr/tcp_client.h"
#include "src/core/surface/channel.h"
-#include "src/core/surface/client.h"
-#include "src/core/support/string.h"
#include "src/core/transport/chttp2_transport.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/string_util.h>
-#include <grpc/support/sync.h>
-#include <grpc/support/useful.h>
-
-typedef struct setup setup;
-/* A single setup request (started via initiate) */
typedef struct {
- grpc_client_setup_request *cs_request;
- setup *setup;
- /* Resolved addresses, or null if resolution not yet completed */
- grpc_resolved_addresses *resolved;
- /* which address in resolved should we pick for the next connection attempt */
- size_t resolved_index;
-} request;
-
-/* Global setup logic (may be running many simultaneous setup requests, but
- with only one 'active' */
-struct setup {
- const char *target;
- grpc_transport_setup_callback setup_callback;
- void *setup_user_data;
-};
-
-static int maybe_try_next_resolved(request *r);
-
-static void done(request *r, int was_successful) {
- grpc_client_setup_request_finish(r->cs_request, was_successful);
- if (r->resolved) {
- grpc_resolved_addresses_destroy(r->resolved);
- }
- gpr_free(r);
+ grpc_connector base;
+ gpr_refcount refs;
+
+ grpc_iomgr_closure *notify;
+ grpc_connect_in_args args;
+ grpc_connect_out_args *result;
+} connector;
+
+static void connector_ref(grpc_connector *con) {
+ connector *c = (connector *)con;
+ gpr_ref(&c->refs);
}
-/* connection callback: tcp is either valid, or null on error */
-static void on_connect(void *rp, grpc_endpoint *tcp) {
- request *r = rp;
-
- if (!grpc_client_setup_request_should_continue(r->cs_request, "on_connect")) {
- if (tcp) {
- grpc_endpoint_shutdown(tcp);
- grpc_endpoint_destroy(tcp);
- }
- done(r, 0);
- return;
+static void connector_unref(grpc_connector *con) {
+ connector *c = (connector *)con;
+ if (gpr_unref(&c->refs)) {
+ gpr_free(c);
}
+}
- if (!tcp) {
- if (!maybe_try_next_resolved(r)) {
- done(r, 0);
- return;
- } else {
- return;
- }
- } else if (grpc_client_setup_cb_begin(r->cs_request, "on_connect")) {
- grpc_create_chttp2_transport(
- r->setup->setup_callback, r->setup->setup_user_data,
- grpc_client_setup_get_channel_args(r->cs_request), tcp, NULL, 0,
- grpc_client_setup_get_mdctx(r->cs_request), 1);
- grpc_client_setup_cb_end(r->cs_request, "on_connect");
- done(r, 1);
- return;
+static void connected(void *arg, grpc_endpoint *tcp) {
+ connector *c = arg;
+ grpc_iomgr_closure *notify;
+ if (tcp != NULL) {
+ c->result->transport = grpc_create_chttp2_transport(
+ c->args.channel_args, tcp, c->args.metadata_context, 1);
+ grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0);
+ GPR_ASSERT(c->result->transport);
+ c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *));
+ c->result->filters[0] = &grpc_http_client_filter;
+ c->result->num_filters = 1;
} else {
- done(r, 0);
+ memset(c->result, 0, sizeof(*c->result));
}
+ notify = c->notify;
+ c->notify = NULL;
+ grpc_iomgr_add_callback(notify);
}
-/* attempt to connect to the next available resolved address */
-static int maybe_try_next_resolved(request *r) {
- grpc_resolved_address *addr;
- if (!r->resolved) return 0;
- if (r->resolved_index == r->resolved->naddrs) return 0;
- addr = &r->resolved->addrs[r->resolved_index++];
- grpc_tcp_client_connect(
- on_connect, r, grpc_client_setup_get_interested_parties(r->cs_request),
- (struct sockaddr *)&addr->addr, addr->len,
- grpc_client_setup_request_deadline(r->cs_request));
- return 1;
+static void connector_connect(grpc_connector *con,
+ const grpc_connect_in_args *args,
+ grpc_connect_out_args *result,
+ grpc_iomgr_closure *notify) {
+ connector *c = (connector *)con;
+ GPR_ASSERT(c->notify == NULL);
+ GPR_ASSERT(notify->cb);
+ c->notify = notify;
+ c->args = *args;
+ c->result = result;
+ grpc_tcp_client_connect(connected, c, args->interested_parties, args->addr,
+ args->addr_len, args->deadline);
}
-/* callback for when our target address has been resolved */
-static void on_resolved(void *rp, grpc_resolved_addresses *resolved) {
- request *r = rp;
-
- /* if we're not still the active request, abort */
- if (!grpc_client_setup_request_should_continue(r->cs_request,
- "on_resolved")) {
- if (resolved) {
- grpc_resolved_addresses_destroy(resolved);
- }
- done(r, 0);
- return;
- }
+static const grpc_connector_vtable connector_vtable = {
+ connector_ref, connector_unref, connector_connect};
- if (!resolved) {
- done(r, 0);
- return;
- } else {
- r->resolved = resolved;
- r->resolved_index = 0;
- if (!maybe_try_next_resolved(r)) {
- done(r, 0);
- }
- }
+typedef struct {
+ grpc_subchannel_factory base;
+ gpr_refcount refs;
+ grpc_mdctx *mdctx;
+ grpc_channel_args *merge_args;
+} subchannel_factory;
+
+static void subchannel_factory_ref(grpc_subchannel_factory *scf) {
+ subchannel_factory *f = (subchannel_factory *)scf;
+ gpr_ref(&f->refs);
}
-static void initiate_setup(void *sp, grpc_client_setup_request *cs_request) {
- request *r = gpr_malloc(sizeof(request));
- r->setup = sp;
- r->cs_request = cs_request;
- r->resolved = NULL;
- r->resolved_index = 0;
- /* TODO(klempner): Make grpc_resolve_address respect deadline */
- grpc_resolve_address(r->setup->target, "http", on_resolved, r);
+static void subchannel_factory_unref(grpc_subchannel_factory *scf) {
+ subchannel_factory *f = (subchannel_factory *)scf;
+ if (gpr_unref(&f->refs)) {
+ grpc_channel_args_destroy(f->merge_args);
+ grpc_mdctx_unref(f->mdctx);
+ gpr_free(f);
+ }
}
-static void done_setup(void *sp) {
- setup *s = sp;
- gpr_free((void *)s->target);
- gpr_free(s);
+static grpc_subchannel *subchannel_factory_create_subchannel(
+ grpc_subchannel_factory *scf, grpc_subchannel_args *args) {
+ subchannel_factory *f = (subchannel_factory *)scf;
+ connector *c = gpr_malloc(sizeof(*c));
+ grpc_channel_args *final_args =
+ grpc_channel_args_merge(args->args, f->merge_args);
+ grpc_subchannel *s;
+ memset(c, 0, sizeof(*c));
+ c->base.vtable = &connector_vtable;
+ gpr_ref_init(&c->refs, 1);
+ args->mdctx = f->mdctx;
+ args->args = final_args;
+ s = grpc_subchannel_create(&c->base, args);
+ grpc_connector_unref(&c->base);
+ grpc_channel_args_destroy(final_args);
+ return s;
}
-static grpc_transport_setup_result complete_setup(void *channel_stack,
- grpc_transport *transport,
- grpc_mdctx *mdctx) {
- static grpc_channel_filter const *extra_filters[] = {
- &grpc_http_client_filter};
- return grpc_client_channel_transport_setup_complete(
- channel_stack, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters),
- mdctx);
-}
+static const grpc_subchannel_factory_vtable subchannel_factory_vtable = {
+ subchannel_factory_ref, subchannel_factory_unref,
+ subchannel_factory_create_subchannel};
/* Create a client channel:
Asynchronously: - resolve target
@@ -191,28 +152,36 @@ static grpc_transport_setup_result complete_setup(void *channel_stack,
- perform handshakes */
grpc_channel *grpc_channel_create(const char *target,
const grpc_channel_args *args) {
- setup *s = gpr_malloc(sizeof(setup));
- grpc_mdctx *mdctx = grpc_mdctx_create();
grpc_channel *channel = NULL;
#define MAX_FILTERS 3
const grpc_channel_filter *filters[MAX_FILTERS];
+ grpc_resolver *resolver;
+ subchannel_factory *f;
+ grpc_mdctx *mdctx = grpc_mdctx_create();
int n = 0;
- filters[n++] = &grpc_client_surface_filter;
/* TODO(census)
if (grpc_channel_args_is_census_enabled(args)) {
filters[n++] = &grpc_client_census_filter;
} */
filters[n++] = &grpc_client_channel_filter;
GPR_ASSERT(n <= MAX_FILTERS);
- channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1);
- s->target = gpr_strdup(target);
- s->setup_callback = complete_setup;
- s->setup_user_data = grpc_channel_get_channel_stack(channel);
+ f = gpr_malloc(sizeof(*f));
+ f->base.vtable = &subchannel_factory_vtable;
+ gpr_ref_init(&f->refs, 1);
+ grpc_mdctx_ref(mdctx);
+ f->mdctx = mdctx;
+ f->merge_args = grpc_channel_args_copy(args);
+ resolver = grpc_resolver_create(target, &f->base);
+ if (!resolver) {
+ return NULL;
+ }
- grpc_client_setup_create_and_attach(grpc_channel_get_channel_stack(channel),
- args, mdctx, initiate_setup, done_setup,
- s);
+ channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1);
+ grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel),
+ resolver);
+ GRPC_RESOLVER_UNREF(resolver, "create");
+ grpc_subchannel_factory_unref(&f->base);
return channel;
}
diff --git a/src/core/surface/client.c b/src/core/surface/client.c
deleted file mode 100644
index 8ac4dd1e0e..0000000000
--- a/src/core/surface/client.c
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/surface/client.h"
-
-#include "src/core/surface/call.h"
-#include "src/core/surface/channel.h"
-#include "src/core/support/string.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-
-typedef struct { void *unused; } call_data;
-
-typedef struct { void *unused; } channel_data;
-
-static void client_start_transport_op(grpc_call_element *elem,
- grpc_transport_op *op) {
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- grpc_call_next_op(elem, op);
-}
-
-static void channel_op(grpc_channel_element *elem,
- grpc_channel_element *from_elem, grpc_channel_op *op) {
- switch (op->type) {
- case GRPC_ACCEPT_CALL:
- gpr_log(GPR_ERROR, "Client cannot accept new calls");
- break;
- case GRPC_TRANSPORT_CLOSED:
- grpc_client_channel_closed(elem);
- break;
- case GRPC_TRANSPORT_GOAWAY:
- gpr_slice_unref(op->data.goaway.message);
- break;
- default:
- GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
- grpc_channel_next_op(elem, op);
- }
-}
-
-static void init_call_elem(grpc_call_element *elem,
- const void *transport_server_data,
- grpc_transport_op *initial_op) {}
-
-static void destroy_call_elem(grpc_call_element *elem) {}
-
-static void init_channel_elem(grpc_channel_element *elem,
- const grpc_channel_args *args, grpc_mdctx *mdctx,
- int is_first, int is_last) {
- GPR_ASSERT(is_first);
- GPR_ASSERT(!is_last);
-}
-
-static void destroy_channel_elem(grpc_channel_element *elem) {}
-
-const grpc_channel_filter grpc_client_surface_filter = {
- client_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
- destroy_call_elem, sizeof(channel_data), init_channel_elem,
- destroy_channel_elem, "client",
-};
diff --git a/src/core/surface/client.h b/src/core/surface/client.h
deleted file mode 100644
index 9db2ccf3d2..0000000000
--- a/src/core/surface/client.h
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_INTERNAL_CORE_SURFACE_CLIENT_H
-#define GRPC_INTERNAL_CORE_SURFACE_CLIENT_H
-
-#include "src/core/channel/channel_stack.h"
-
-extern const grpc_channel_filter grpc_client_surface_filter;
-
-#endif /* GRPC_INTERNAL_CORE_SURFACE_CLIENT_H */
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
index ca61a38a35..3847ded28c 100644
--- a/src/core/surface/init.c
+++ b/src/core/surface/init.c
@@ -31,9 +31,13 @@
*
*/
+#include <grpc/support/port_platform.h>
+
#include <grpc/census.h>
#include <grpc/grpc.h>
#include "src/core/channel/channel_stack.h"
+#include "src/core/client_config/resolver_registry.h"
+#include "src/core/client_config/resolvers/dns_resolver.h"
#include "src/core/debug/trace.h"
#include "src/core/iomgr/iomgr.h"
#include "src/core/profiling/timers.h"
@@ -42,6 +46,10 @@
#include "src/core/surface/surface_trace.h"
#include "src/core/transport/chttp2_transport.h"
+#ifdef GPR_POSIX_SOCKET
+#include "src/core/client_config/resolvers/unix_resolver_posix.h"
+#endif
+
static gpr_once g_basic_init = GPR_ONCE_INIT;
static gpr_mu g_init_mu;
static int g_initializations;
@@ -56,6 +64,11 @@ void grpc_init(void) {
gpr_mu_lock(&g_init_mu);
if (++g_initializations == 1) {
+ grpc_resolver_registry_init("dns:///");
+ grpc_register_resolver_type("dns", grpc_dns_resolver_factory_create());
+#ifdef GPR_POSIX_SOCKET
+ grpc_register_resolver_type("unix", grpc_unix_resolver_factory_create());
+#endif
grpc_register_tracer("channel", &grpc_trace_channel);
grpc_register_tracer("surface", &grpc_surface_trace);
grpc_register_tracer("http", &grpc_http_trace);
@@ -79,6 +92,7 @@ void grpc_shutdown(void) {
census_shutdown();
grpc_timers_global_destroy();
grpc_tracer_shutdown();
+ grpc_resolver_registry_shutdown();
}
gpr_mu_unlock(&g_init_mu);
}
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index 85e1ab5554..3dd56fe5a9 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -49,16 +49,16 @@ typedef struct {
typedef struct { grpc_mdctx *mdctx; } channel_data;
-static void lame_start_transport_op(grpc_call_element *elem,
- grpc_transport_op *op) {
+static void lame_start_transport_stream_op(grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- if (op->send_ops) {
+ if (op->send_ops != NULL) {
grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
op->on_done_send->cb(op->on_done_send->cb_arg, 0);
}
- if (op->recv_ops) {
+ if (op->recv_ops != NULL) {
char tmp[GPR_LTOA_MIN_BUFSIZE];
grpc_metadata_batch mdb;
gpr_ltoa(GRPC_STATUS_UNKNOWN, tmp);
@@ -77,36 +77,35 @@ static void lame_start_transport_op(grpc_call_element *elem,
*op->recv_state = GRPC_STREAM_CLOSED;
op->on_done_recv->cb(op->on_done_recv->cb_arg, 1);
}
- if (op->on_consumed) {
+ if (op->on_consumed != NULL) {
op->on_consumed->cb(op->on_consumed->cb_arg, 0);
}
}
-static void channel_op(grpc_channel_element *elem,
- grpc_channel_element *from_elem, grpc_channel_op *op) {
- switch (op->type) {
- case GRPC_CHANNEL_GOAWAY:
- gpr_slice_unref(op->data.goaway.message);
- break;
- case GRPC_CHANNEL_DISCONNECT:
- grpc_client_channel_closed(elem);
- break;
- default:
- break;
+static void lame_start_transport_op(grpc_channel_element *elem,
+ grpc_transport_op *op) {
+ if (op->on_connectivity_state_change) {
+ GPR_ASSERT(*op->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE);
+ *op->connectivity_state = GRPC_CHANNEL_FATAL_FAILURE;
+ op->on_connectivity_state_change->cb(
+ op->on_connectivity_state_change->cb_arg, 1);
+ }
+ if (op->on_consumed != NULL) {
+ op->on_consumed->cb(op->on_consumed->cb_arg, 1);
}
}
static void init_call_elem(grpc_call_element *elem,
const void *transport_server_data,
- grpc_transport_op *initial_op) {
+ grpc_transport_stream_op *initial_op) {
if (initial_op) {
- grpc_transport_op_finish_with_failure(initial_op);
+ grpc_transport_stream_op_finish_with_failure(initial_op);
}
}
static void destroy_call_elem(grpc_call_element *elem) {}
-static void init_channel_elem(grpc_channel_element *elem,
+static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
channel_data *chand = elem->channel_data;
@@ -118,9 +117,15 @@ static void init_channel_elem(grpc_channel_element *elem,
static void destroy_channel_elem(grpc_channel_element *elem) {}
static const grpc_channel_filter lame_filter = {
- lame_start_transport_op, channel_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "lame-client",
+ lame_start_transport_stream_op,
+ lame_start_transport_op,
+ sizeof(call_data),
+ init_call_elem,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ "lame-client",
};
grpc_channel *grpc_lame_client_channel_create(void) {
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index fae3e4e90a..34ee3f8400 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -31,177 +31,147 @@
*
*/
-#include "src/core/iomgr/sockaddr.h"
-
#include <grpc/grpc.h>
#include <stdlib.h>
#include <string.h>
-#include "src/core/channel/census_filter.h"
+#include <grpc/support/alloc.h>
+
#include "src/core/channel/channel_args.h"
#include "src/core/channel/client_channel.h"
-#include "src/core/channel/client_setup.h"
-#include "src/core/channel/connected_channel.h"
#include "src/core/channel/http_client_filter.h"
-#include "src/core/iomgr/resolve_address.h"
+#include "src/core/client_config/resolver_registry.h"
#include "src/core/iomgr/tcp_client.h"
#include "src/core/security/auth_filters.h"
#include "src/core/security/credentials.h"
#include "src/core/security/secure_transport_setup.h"
-#include "src/core/support/string.h"
#include "src/core/surface/channel.h"
-#include "src/core/surface/client.h"
#include "src/core/transport/chttp2_transport.h"
-#include <grpc/grpc_security.h>
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/string_util.h>
-#include <grpc/support/sync.h>
-#include <grpc/support/useful.h>
#include "src/core/tsi/transport_security_interface.h"
-typedef struct setup setup;
-
-/* A single setup request (started via initiate) */
typedef struct {
- grpc_client_setup_request *cs_request;
- setup *setup;
- /* Resolved addresses, or null if resolution not yet completed. */
- grpc_resolved_addresses *resolved;
- /* which address in resolved should we pick for the next connection attempt */
- size_t resolved_index;
-} request;
+ grpc_connector base;
+ gpr_refcount refs;
-struct setup {
grpc_channel_security_connector *security_connector;
- const char *target;
- grpc_transport_setup_callback setup_callback;
- void *setup_user_data;
-};
-static int maybe_try_next_resolved(request *r);
+ grpc_iomgr_closure *notify;
+ grpc_connect_in_args args;
+ grpc_connect_out_args *result;
+} connector;
-static void done(request *r, int was_successful) {
- grpc_client_setup_request_finish(r->cs_request, was_successful);
- if (r->resolved) {
- grpc_resolved_addresses_destroy(r->resolved);
+static void connector_ref(grpc_connector *con) {
+ connector *c = (connector *)con;
+ gpr_ref(&c->refs);
+}
+
+static void connector_unref(grpc_connector *con) {
+ connector *c = (connector *)con;
+ if (gpr_unref(&c->refs)) {
+ gpr_free(c);
}
- gpr_free(r);
}
-static void on_secure_transport_setup_done(void *rp,
+static void on_secure_transport_setup_done(void *arg,
grpc_security_status status,
grpc_endpoint *secure_endpoint) {
- request *r = rp;
+ connector *c = arg;
+ grpc_iomgr_closure *notify;
if (status != GRPC_SECURITY_OK) {
gpr_log(GPR_ERROR, "Secure transport setup failed with error %d.", status);
- done(r, 0);
- } else if (grpc_client_setup_cb_begin(r->cs_request,
- "on_secure_transport_setup_done")) {
- grpc_create_chttp2_transport(
- r->setup->setup_callback, r->setup->setup_user_data,
- grpc_client_setup_get_channel_args(r->cs_request), secure_endpoint,
- NULL, 0, grpc_client_setup_get_mdctx(r->cs_request), 1);
- grpc_client_setup_cb_end(r->cs_request, "on_secure_transport_setup_done");
- done(r, 1);
+ memset(c->result, 0, sizeof(*c->result));
} else {
- done(r, 0);
+ c->result->transport = grpc_create_chttp2_transport(
+ c->args.channel_args, secure_endpoint, c->args.metadata_context, 1);
+ grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0);
+ c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *) * 2);
+ c->result->filters[0] = &grpc_client_auth_filter;
+ c->result->filters[1] = &grpc_http_client_filter;
+ c->result->num_filters = 2;
}
+ notify = c->notify;
+ c->notify = NULL;
+ grpc_iomgr_add_callback(notify);
}
-/* connection callback: tcp is either valid, or null on error */
-static void on_connect(void *rp, grpc_endpoint *tcp) {
- request *r = rp;
-
- if (!grpc_client_setup_request_should_continue(r->cs_request,
- "on_connect.secure")) {
- if (tcp) {
- grpc_endpoint_shutdown(tcp);
- grpc_endpoint_destroy(tcp);
- }
- done(r, 0);
- return;
- }
-
- if (!tcp) {
- if (!maybe_try_next_resolved(r)) {
- done(r, 0);
- return;
- } else {
- return;
- }
+static void connected(void *arg, grpc_endpoint *tcp) {
+ connector *c = arg;
+ grpc_iomgr_closure *notify;
+ if (tcp != NULL) {
+ grpc_setup_secure_transport(&c->security_connector->base, tcp,
+ on_secure_transport_setup_done, c);
} else {
- grpc_setup_secure_transport(&r->setup->security_connector->base, tcp,
- on_secure_transport_setup_done, r);
+ memset(c->result, 0, sizeof(*c->result));
+ notify = c->notify;
+ c->notify = NULL;
+ grpc_iomgr_add_callback(notify);
}
}
-/* attempt to connect to the next available resolved address */
-static int maybe_try_next_resolved(request *r) {
- grpc_resolved_address *addr;
- if (!r->resolved) return 0;
- if (r->resolved_index == r->resolved->naddrs) return 0;
- addr = &r->resolved->addrs[r->resolved_index++];
- grpc_tcp_client_connect(
- on_connect, r, grpc_client_setup_get_interested_parties(r->cs_request),
- (struct sockaddr *)&addr->addr, addr->len,
- grpc_client_setup_request_deadline(r->cs_request));
- return 1;
+static void connector_connect(grpc_connector *con,
+ const grpc_connect_in_args *args,
+ grpc_connect_out_args *result,
+ grpc_iomgr_closure *notify) {
+ connector *c = (connector *)con;
+ GPR_ASSERT(c->notify == NULL);
+ GPR_ASSERT(notify->cb);
+ c->notify = notify;
+ c->args = *args;
+ c->result = result;
+ grpc_tcp_client_connect(connected, c, args->interested_parties, args->addr,
+ args->addr_len, args->deadline);
}
-/* callback for when our target address has been resolved */
-static void on_resolved(void *rp, grpc_resolved_addresses *resolved) {
- request *r = rp;
+static const grpc_connector_vtable connector_vtable = {
+ connector_ref, connector_unref, connector_connect};
- /* if we're not still the active request, abort */
- if (!grpc_client_setup_request_should_continue(r->cs_request,
- "on_resolved.secure")) {
- if (resolved) {
- grpc_resolved_addresses_destroy(resolved);
- }
- done(r, 0);
- return;
- }
+typedef struct {
+ grpc_subchannel_factory base;
+ gpr_refcount refs;
+ grpc_mdctx *mdctx;
+ grpc_channel_args *merge_args;
+ grpc_channel_security_connector *security_connector;
+} subchannel_factory;
- if (!resolved) {
- done(r, 0);
- return;
- } else {
- r->resolved = resolved;
- r->resolved_index = 0;
- if (!maybe_try_next_resolved(r)) {
- done(r, 0);
- }
- }
+static void subchannel_factory_ref(grpc_subchannel_factory *scf) {
+ subchannel_factory *f = (subchannel_factory *)scf;
+ gpr_ref(&f->refs);
}
-static void initiate_setup(void *sp, grpc_client_setup_request *cs_request) {
- request *r = gpr_malloc(sizeof(request));
- r->setup = sp;
- r->cs_request = cs_request;
- r->resolved = NULL;
- r->resolved_index = 0;
- /* TODO(klempner): Make grpc_resolve_address respect deadline */
- grpc_resolve_address(r->setup->target, "https", on_resolved, r);
+static void subchannel_factory_unref(grpc_subchannel_factory *scf) {
+ subchannel_factory *f = (subchannel_factory *)scf;
+ if (gpr_unref(&f->refs)) {
+ GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base,
+ "subchannel_factory");
+ grpc_channel_args_destroy(f->merge_args);
+ grpc_mdctx_unref(f->mdctx);
+ gpr_free(f);
+ }
}
-static void done_setup(void *sp) {
- setup *s = sp;
- gpr_free((void *)s->target);
- grpc_security_connector_unref(&s->security_connector->base);
- gpr_free(s);
+static grpc_subchannel *subchannel_factory_create_subchannel(
+ grpc_subchannel_factory *scf, grpc_subchannel_args *args) {
+ subchannel_factory *f = (subchannel_factory *)scf;
+ connector *c = gpr_malloc(sizeof(*c));
+ grpc_channel_args *final_args =
+ grpc_channel_args_merge(args->args, f->merge_args);
+ grpc_subchannel *s;
+ memset(c, 0, sizeof(*c));
+ c->base.vtable = &connector_vtable;
+ c->security_connector = f->security_connector;
+ gpr_ref_init(&c->refs, 1);
+ args->mdctx = f->mdctx;
+ args->args = final_args;
+ s = grpc_subchannel_create(&c->base, args);
+ grpc_connector_unref(&c->base);
+ grpc_channel_args_destroy(final_args);
+ return s;
}
-static grpc_transport_setup_result complete_setup(void *channel_stack,
- grpc_transport *transport,
- grpc_mdctx *mdctx) {
- static grpc_channel_filter const *extra_filters[] = {
- &grpc_client_auth_filter, &grpc_http_client_filter};
- return grpc_client_channel_transport_setup_complete(
- channel_stack, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters),
- mdctx);
-}
+static const grpc_subchannel_factory_vtable subchannel_factory_vtable = {
+ subchannel_factory_ref, subchannel_factory_unref,
+ subchannel_factory_create_subchannel};
/* Create a secure client channel:
Asynchronously: - resolve target
@@ -210,13 +180,14 @@ static grpc_transport_setup_result complete_setup(void *channel_stack,
grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
const char *target,
const grpc_channel_args *args) {
- setup *s;
grpc_channel *channel;
grpc_arg connector_arg;
grpc_channel_args *args_copy;
grpc_channel_args *new_args_from_connector;
grpc_channel_security_connector *connector;
grpc_mdctx *mdctx;
+ grpc_resolver *resolver;
+ subchannel_factory *f;
#define MAX_FILTERS 3
const grpc_channel_filter *filters[MAX_FILTERS];
int n = 0;
@@ -233,30 +204,41 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
}
mdctx = grpc_mdctx_create();
- s = gpr_malloc(sizeof(setup));
connector_arg = grpc_security_connector_to_arg(&connector->base);
args_copy = grpc_channel_args_copy_and_add(
new_args_from_connector != NULL ? new_args_from_connector : args,
- &connector_arg);
- filters[n++] = &grpc_client_surface_filter;
+ &connector_arg, 1);
/* TODO(census)
if (grpc_channel_args_is_census_enabled(args)) {
filters[n++] = &grpc_client_census_filter;
} */
filters[n++] = &grpc_client_channel_filter;
GPR_ASSERT(n <= MAX_FILTERS);
+
+ f = gpr_malloc(sizeof(*f));
+ f->base.vtable = &subchannel_factory_vtable;
+ gpr_ref_init(&f->refs, 1);
+ grpc_mdctx_ref(mdctx);
+ f->mdctx = mdctx;
+ GRPC_SECURITY_CONNECTOR_REF(&connector->base, "subchannel_factory");
+ f->security_connector = connector;
+ f->merge_args = grpc_channel_args_copy(args_copy);
+ resolver = grpc_resolver_create(target, &f->base);
+ if (!resolver) {
+ return NULL;
+ }
+
channel = grpc_channel_create_from_filters(filters, n, args_copy, mdctx, 1);
+ grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel),
+ resolver);
+ GRPC_RESOLVER_UNREF(resolver, "create");
+ grpc_subchannel_factory_unref(&f->base);
+ GRPC_SECURITY_CONNECTOR_UNREF(&connector->base, "channel_create");
+
grpc_channel_args_destroy(args_copy);
if (new_args_from_connector != NULL) {
grpc_channel_args_destroy(new_args_from_connector);
}
- s->target = gpr_strdup(target);
- s->setup_callback = complete_setup;
- s->setup_user_data = grpc_channel_get_channel_stack(channel);
- s->security_connector = connector;
- grpc_client_setup_create_and_attach(grpc_channel_get_channel_stack(channel),
- args, mdctx, initiate_setup, done_setup,
- s);
return channel;
}
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 13ec5bee94..f29d47c17c 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -115,6 +115,7 @@ typedef struct channel_registered_method {
struct channel_data {
grpc_server *server;
size_t num_calls;
+ grpc_connectivity_state connectivity_state;
grpc_channel *channel;
grpc_mdstr *path_key;
grpc_mdstr *authority_key;
@@ -125,6 +126,7 @@ struct channel_data {
gpr_uint32 registered_method_slots;
gpr_uint32 registered_method_max_probes;
grpc_iomgr_closure finish_destroy_channel_closure;
+ grpc_iomgr_closure channel_connectivity_changed;
};
typedef struct shutdown_tag {
@@ -149,7 +151,7 @@ struct grpc_server {
before mu_call. This is currently used in shutdown processing
(grpc_server_shutdown_and_notify and maybe_finish_shutdown) */
gpr_mu mu_global; /* mutex for server and channel state */
- gpr_mu mu_call; /* mutex for call-specific state */
+ gpr_mu mu_call; /* mutex for call-specific state */
registered_method *registered_methods;
requested_call_array requested_calls;
@@ -200,18 +202,101 @@ struct call_data {
call_link links[CALL_LIST_COUNT];
};
+typedef struct {
+ grpc_channel **channels;
+ grpc_channel **disconnects;
+ size_t num_channels;
+ size_t num_disconnects;
+} channel_broadcaster;
+
#define SERVER_FROM_CALL_ELEM(elem) \
(((channel_data *)(elem)->channel_data)->server)
static void begin_call(grpc_server *server, call_data *calld,
requested_call *rc);
static void fail_call(grpc_server *server, requested_call *rc);
-static void shutdown_channel(channel_data *chand, int send_goaway,
- int send_disconnect);
/* Before calling maybe_finish_shutdown, we must hold mu_global and not
hold mu_call */
static void maybe_finish_shutdown(grpc_server *server);
+/* channel broadcaster */
+
+/* assumes server locked */
+static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) {
+ channel_data *c;
+ size_t count = 0;
+ size_t dc_count = 0;
+ for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
+ count++;
+ if (c->num_calls == 0) {
+ dc_count++;
+ }
+ }
+ cb->num_channels = count;
+ cb->num_disconnects = dc_count;
+ cb->channels = gpr_malloc(sizeof(*cb->channels) * cb->num_channels);
+ cb->disconnects = gpr_malloc(sizeof(*cb->channels) * cb->num_disconnects);
+ count = 0;
+ dc_count = 0;
+ for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
+ cb->channels[count++] = c->channel;
+ GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast");
+ if (c->num_calls == 0) {
+ cb->disconnects[dc_count++] = c->channel;
+ GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast-disconnect");
+ }
+ }
+}
+
+struct shutdown_cleanup_args {
+ grpc_iomgr_closure closure;
+ gpr_slice slice;
+};
+
+static void shutdown_cleanup(void *arg, int iomgr_status_ignored) {
+ struct shutdown_cleanup_args *a = arg;
+ gpr_slice_unref(a->slice);
+ gpr_free(a);
+}
+
+static void send_shutdown(grpc_channel *channel, int send_goaway,
+ int send_disconnect) {
+ grpc_transport_op op;
+ struct shutdown_cleanup_args *sc;
+ grpc_channel_element *elem;
+
+ memset(&op, 0, sizeof(op));
+ op.send_goaway = send_goaway;
+ sc = gpr_malloc(sizeof(*sc));
+ sc->slice = gpr_slice_from_copied_string("Server shutdown");
+ op.goaway_message = &sc->slice;
+ op.goaway_status = GRPC_STATUS_OK;
+ op.disconnect = send_disconnect;
+ grpc_iomgr_closure_init(&sc->closure, shutdown_cleanup, sc);
+ op.on_consumed = &sc->closure;
+
+ elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
+ elem->filter->start_transport_op(elem, &op);
+}
+
+static void channel_broadcaster_shutdown(channel_broadcaster *cb,
+ int send_goaway, int send_disconnect) {
+ size_t i;
+
+ for (i = 0; i < cb->num_channels; i++) {
+ send_shutdown(cb->channels[i], 1, 0);
+ GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast");
+ }
+ for (i = 0; i < cb->num_disconnects; i++) {
+ send_shutdown(cb->disconnects[i], 0, 1);
+ GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast-disconnect");
+ }
+ gpr_free(cb->channels);
+ gpr_free(cb->disconnects);
+}
+
+/* call list */
+
static int call_list_join(call_data **root, call_data *call, call_list list) {
GPR_ASSERT(!call->root[list]);
call->root[list] = root;
@@ -456,12 +541,14 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
return md;
}
-static void decrement_call_count(channel_data *chand) {
+static int decrement_call_count(channel_data *chand) {
+ int disconnect = 0;
chand->num_calls--;
if (0 == chand->num_calls && chand->server->shutdown) {
- shutdown_channel(chand, 0, 1);
+ disconnect = 1;
}
maybe_finish_shutdown(chand->server);
+ return disconnect;
}
static void server_on_recv(void *ptr, int success) {
@@ -469,6 +556,7 @@ static void server_on_recv(void *ptr, int success) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
int remove_res;
+ int disconnect = 0;
if (success && !calld->got_initial_metadata) {
size_t i;
@@ -517,16 +605,24 @@ static void server_on_recv(void *ptr, int success) {
gpr_mu_unlock(&chand->server->mu_call);
gpr_mu_lock(&chand->server->mu_global);
if (remove_res) {
- decrement_call_count(chand);
+ disconnect = decrement_call_count(chand);
+ if (disconnect) {
+ GRPC_CHANNEL_INTERNAL_REF(chand->channel, "send-disconnect");
+ }
}
gpr_mu_unlock(&chand->server->mu_global);
+ if (disconnect) {
+ send_shutdown(chand->channel, 0, 1);
+ GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "send-disconnect");
+ }
break;
}
calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success);
}
-static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
+static void server_mutate_op(grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
if (op->recv_ops) {
@@ -538,92 +634,43 @@ static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
}
}
-static void server_start_transport_op(grpc_call_element *elem,
- grpc_transport_op *op) {
+static void server_start_transport_stream_op(grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
server_mutate_op(elem, op);
grpc_call_next_op(elem, op);
}
-static void channel_op(grpc_channel_element *elem,
- grpc_channel_element *from_elem, grpc_channel_op *op) {
- channel_data *chand = elem->channel_data;
- grpc_server *server = chand->server;
-
- switch (op->type) {
- case GRPC_ACCEPT_CALL:
- /* create a call */
- grpc_call_create(chand->channel, NULL,
- op->data.accept_call.transport_server_data, NULL, 0,
- gpr_inf_future);
- break;
- case GRPC_TRANSPORT_CLOSED:
- /* if the transport is closed for a server channel, we destroy the
- channel */
- gpr_mu_lock(&server->mu_global);
- server_ref(server);
- destroy_channel(chand);
- gpr_mu_unlock(&server->mu_global);
- server_unref(server);
- break;
- case GRPC_TRANSPORT_GOAWAY:
- gpr_slice_unref(op->data.goaway.message);
- break;
- default:
- GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
- grpc_channel_next_op(elem, op);
- break;
- }
+static void accept_stream(void *cd, grpc_transport *transport,
+ const void *transport_server_data) {
+ channel_data *chand = cd;
+ /* create a call */
+ grpc_call_create(chand->channel, NULL, transport_server_data, NULL, 0,
+ gpr_inf_future);
}
-typedef struct {
- channel_data *chand;
- int send_goaway;
- int send_disconnect;
- grpc_iomgr_closure finish_shutdown_channel_closure;
-} shutdown_channel_args;
-
-static void finish_shutdown_channel(void *p, int success) {
- shutdown_channel_args *sca = p;
- grpc_channel_op op;
-
- if (sca->send_goaway) {
- op.type = GRPC_CHANNEL_GOAWAY;
- op.dir = GRPC_CALL_DOWN;
- op.data.goaway.status = GRPC_STATUS_OK;
- op.data.goaway.message = gpr_slice_from_copied_string("Server shutdown");
- channel_op(grpc_channel_stack_element(
- grpc_channel_get_channel_stack(sca->chand->channel), 0),
- NULL, &op);
- }
- if (sca->send_disconnect) {
- op.type = GRPC_CHANNEL_DISCONNECT;
- op.dir = GRPC_CALL_DOWN;
- channel_op(grpc_channel_stack_element(
- grpc_channel_get_channel_stack(sca->chand->channel), 0),
- NULL, &op);
+static void channel_connectivity_changed(void *cd, int iomgr_status_ignored) {
+ channel_data *chand = cd;
+ grpc_server *server = chand->server;
+ if (chand->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) {
+ grpc_transport_op op;
+ memset(&op, 0, sizeof(op));
+ op.on_connectivity_state_change = &chand->channel_connectivity_changed,
+ op.connectivity_state = &chand->connectivity_state;
+ grpc_channel_next_op(grpc_channel_stack_element(
+ grpc_channel_get_channel_stack(chand->channel), 0),
+ &op);
+ } else {
+ gpr_mu_lock(&server->mu_global);
+ destroy_channel(chand);
+ gpr_mu_unlock(&server->mu_global);
+ GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "connectivity");
}
- GRPC_CHANNEL_INTERNAL_UNREF(sca->chand->channel, "shutdown");
-
- gpr_free(sca);
-}
-
-static void shutdown_channel(channel_data *chand, int send_goaway,
- int send_disconnect) {
- shutdown_channel_args *sca;
- GRPC_CHANNEL_INTERNAL_REF(chand->channel, "shutdown");
- sca = gpr_malloc(sizeof(shutdown_channel_args));
- sca->chand = chand;
- sca->send_goaway = send_goaway;
- sca->send_disconnect = send_disconnect;
- sca->finish_shutdown_channel_closure.cb = finish_shutdown_channel;
- sca->finish_shutdown_channel_closure.cb_arg = sca;
- grpc_iomgr_add_callback(&sca->finish_shutdown_channel_closure);
}
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
- grpc_transport_op *initial_op) {
+ grpc_transport_stream_op *initial_op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
memset(calld, 0, sizeof(call_data));
@@ -672,7 +719,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
server_unref(chand->server);
}
-static void init_channel_elem(grpc_channel_element *elem,
+static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
const grpc_channel_args *args,
grpc_mdctx *metadata_context, int is_first,
int is_last) {
@@ -686,6 +733,9 @@ static void init_channel_elem(grpc_channel_element *elem,
chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority");
chand->next = chand->prev = chand;
chand->registered_methods = NULL;
+ chand->connectivity_state = GRPC_CHANNEL_IDLE;
+ grpc_iomgr_closure_init(&chand->channel_connectivity_changed,
+ channel_connectivity_changed, chand);
}
static void destroy_channel_elem(grpc_channel_element *elem) {
@@ -716,8 +766,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
static const grpc_channel_filter server_surface_filter = {
- server_start_transport_op,
- channel_op,
+ server_start_transport_stream_op,
+ grpc_channel_next_op,
sizeof(call_data),
init_call_elem,
destroy_call_elem,
@@ -831,10 +881,10 @@ void grpc_server_start(grpc_server *server) {
}
}
-grpc_transport_setup_result grpc_server_setup_transport(
- grpc_server *s, grpc_transport *transport,
- grpc_channel_filter const **extra_filters, size_t num_extra_filters,
- grpc_mdctx *mdctx, const grpc_channel_args *args) {
+void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
+ grpc_channel_filter const **extra_filters,
+ size_t num_extra_filters, grpc_mdctx *mdctx,
+ const grpc_channel_args *args) {
size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
grpc_channel_filter const **filters =
gpr_malloc(sizeof(grpc_channel_filter *) * num_filters);
@@ -851,7 +901,7 @@ grpc_transport_setup_result grpc_server_setup_transport(
gpr_uint32 slots;
gpr_uint32 probes;
gpr_uint32 max_probes = 0;
- grpc_transport_setup_result result;
+ grpc_transport_op op;
for (i = 0; i < s->channel_filter_count; i++) {
filters[i] = s->channel_filters[i];
@@ -862,7 +912,9 @@ grpc_transport_setup_result grpc_server_setup_transport(
filters[i] = &grpc_connected_channel_filter;
for (i = 0; i < s->cq_count; i++) {
- grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cqs[i]));
+ memset(&op, 0, sizeof(op));
+ op.bind_pollset = grpc_cq_pollset(s->cqs[i]);
+ grpc_transport_perform_op(transport, &op);
}
channel =
@@ -903,8 +955,8 @@ grpc_transport_setup_result grpc_server_setup_transport(
chand->registered_method_max_probes = max_probes;
}
- result = grpc_connected_channel_bind_transport(
- grpc_channel_get_channel_stack(channel), transport);
+ grpc_connected_channel_bind_transport(grpc_channel_get_channel_stack(channel),
+ transport);
gpr_mu_lock(&s->mu_global);
chand->next = &s->root_channel_data;
@@ -914,17 +966,23 @@ grpc_transport_setup_result grpc_server_setup_transport(
gpr_free(filters);
- return result;
+ GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity");
+ memset(&op, 0, sizeof(op));
+ op.set_accept_stream = accept_stream;
+ op.set_accept_stream_user_data = chand;
+ op.on_connectivity_state_change = &chand->channel_connectivity_changed;
+ op.connectivity_state = &chand->connectivity_state;
+ grpc_transport_perform_op(transport, &op);
}
void grpc_server_shutdown_and_notify(grpc_server *server,
grpc_completion_queue *cq, void *tag) {
listener *l;
requested_call_array requested_calls;
- channel_data *c;
size_t i;
registered_method *rm;
shutdown_tag *sdt;
+ channel_broadcaster broadcaster;
/* lock, and gather up some stuff to do */
gpr_mu_lock(&server->mu_global);
@@ -940,10 +998,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
return;
}
- for (c = server->root_channel_data.next; c != &server->root_channel_data;
- c = c->next) {
- shutdown_channel(c, 1, c->num_calls == 0);
- }
+ channel_broadcaster_init(server, &broadcaster);
/* collect all unregistered then registered calls */
gpr_mu_lock(&server->mu_call);
@@ -981,6 +1036,8 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
for (l = server->listeners; l; l = l->next) {
l->destroy(server, l->arg);
}
+
+ channel_broadcaster_shutdown(&broadcaster, 1, 0);
}
void grpc_server_listener_destroy_done(void *s) {
@@ -1181,6 +1238,8 @@ static void begin_call(grpc_server *server, call_data *calld,
calld->cq_new = rc->cq_for_notification;
switch (rc->type) {
case BATCH_CALL:
+ GPR_ASSERT(calld->host != NULL);
+ GPR_ASSERT(calld->path != NULL);
cpstr(&rc->data.batch.details->host,
&rc->data.batch.details->host_capacity, calld->host);
cpstr(&rc->data.batch.details->method,
diff --git a/src/core/surface/server.h b/src/core/surface/server.h
index 91a1a2a7f6..2899c6dea3 100644
--- a/src/core/surface/server.h
+++ b/src/core/surface/server.h
@@ -55,10 +55,10 @@ void grpc_server_listener_destroy_done(void *server);
/* Setup a transport - creates a channel stack, binds the transport to the
server */
-grpc_transport_setup_result grpc_server_setup_transport(
- grpc_server *server, grpc_transport *transport,
- grpc_channel_filter const **extra_filters, size_t num_extra_filters,
- grpc_mdctx *mdctx, const grpc_channel_args *args);
+void grpc_server_setup_transport(grpc_server *server, grpc_transport *transport,
+ grpc_channel_filter const **extra_filters,
+ size_t num_extra_filters, grpc_mdctx *mdctx,
+ const grpc_channel_args *args);
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server);
diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c
index 7e49a531df..78c53466b3 100644
--- a/src/core/surface/server_chttp2.c
+++ b/src/core/surface/server_chttp2.c
@@ -42,14 +42,13 @@
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
-static grpc_transport_setup_result setup_transport(void *server,
- grpc_transport *transport,
- grpc_mdctx *mdctx) {
+static void setup_transport(void *server, grpc_transport *transport,
+ grpc_mdctx *mdctx) {
static grpc_channel_filter const *extra_filters[] = {
&grpc_http_server_filter};
- return grpc_server_setup_transport(server, transport, extra_filters,
- GPR_ARRAY_SIZE(extra_filters), mdctx,
- grpc_server_get_channel_args(server));
+ grpc_server_setup_transport(server, transport, extra_filters,
+ GPR_ARRAY_SIZE(extra_filters), mdctx,
+ grpc_server_get_channel_args(server));
}
static void new_transport(void *server, grpc_endpoint *tcp) {
@@ -60,9 +59,11 @@ static void new_transport(void *server, grpc_endpoint *tcp) {
* (as in server_secure_chttp2.c) needs to add synchronization to avoid this
* case.
*/
- grpc_create_chttp2_transport(setup_transport, server,
- grpc_server_get_channel_args(server), tcp, NULL,
- 0, grpc_mdctx_create(), 0);
+ grpc_mdctx *mdctx = grpc_mdctx_create();
+ grpc_transport *transport = grpc_create_chttp2_transport(
+ grpc_server_get_channel_args(server), tcp, mdctx, 0);
+ setup_transport(server, transport, mdctx);
+ grpc_chttp2_transport_start_reading(transport, NULL, 0);
}
/* Server callback: start listening on our ports */