aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/census/grpc_context.c3
-rw-r--r--src/core/channel/http_client_filter.c44
-rw-r--r--src/core/channel/http_server_filter.c10
-rw-r--r--src/core/client_config/resolvers/dns_resolver.c16
-rw-r--r--src/core/client_config/subchannel_factory_decorators/add_channel_arg.c (renamed from src/core/census/grpc_context.h)32
-rw-r--r--src/core/client_config/subchannel_factory_decorators/add_channel_arg.h45
-rw-r--r--src/core/client_config/subchannel_factory_decorators/merge_channel_args.c84
-rw-r--r--src/core/client_config/subchannel_factory_decorators/merge_channel_args.h45
-rw-r--r--src/core/iomgr/fd_posix.c6
-rw-r--r--src/core/iomgr/fd_posix.h1
-rw-r--r--src/core/iomgr/tcp_server_posix.c2
-rw-r--r--src/core/support/stack_lockfree.c12
-rw-r--r--src/core/surface/call.c17
-rw-r--r--src/core/surface/channel.c18
-rw-r--r--src/core/surface/channel_connectivity.c8
-rw-r--r--src/core/surface/server.c11
-rw-r--r--src/core/transport/chttp2/alpn.c3
17 files changed, 298 insertions, 59 deletions
diff --git a/src/core/census/grpc_context.c b/src/core/census/grpc_context.c
index d4243cb246..11f1eb3d5d 100644
--- a/src/core/census/grpc_context.c
+++ b/src/core/census/grpc_context.c
@@ -32,7 +32,8 @@
*/
#include <grpc/census.h>
-#include "src/core/census/grpc_context.h"
+#include <grpc/grpc.h>
+#include "src/core/surface/call.h"
static void grpc_census_context_destroy(void *context) {
census_context_destroy((census_context *)context);
diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c
index 91125cb149..48c623d359 100644
--- a/src/core/channel/http_client_filter.c
+++ b/src/core/channel/http_client_filter.c
@@ -40,10 +40,12 @@
typedef struct call_data {
grpc_linked_mdelem method;
grpc_linked_mdelem scheme;
+ grpc_linked_mdelem authority;
grpc_linked_mdelem te_trailers;
grpc_linked_mdelem content_type;
grpc_linked_mdelem user_agent;
int sent_initial_metadata;
+ int sent_authority;
int got_initial_metadata;
grpc_stream_op_buffer *recv_ops;
@@ -62,6 +64,7 @@ typedef struct channel_data {
grpc_mdelem *scheme;
grpc_mdelem *content_type;
grpc_mdelem *status;
+ grpc_mdelem *default_authority;
/** complete user agent mdelem */
grpc_mdelem *user_agent;
} channel_data;
@@ -100,6 +103,7 @@ static void hc_on_recv(void *user_data, int success) {
static grpc_mdelem *client_strip_filter(void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
+ call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
/* eat the things we'd like to set ourselves */
if (md->key == channeld->method->key) return NULL;
@@ -107,6 +111,10 @@ static grpc_mdelem *client_strip_filter(void *user_data, grpc_mdelem *md) {
if (md->key == channeld->te_trailers->key) return NULL;
if (md->key == channeld->content_type->key) return NULL;
if (md->key == channeld->user_agent->key) return NULL;
+ if (channeld->default_authority &&
+ channeld->default_authority->key == md->key) {
+ calld->sent_authority = 1;
+ }
return md;
}
@@ -130,6 +138,11 @@ static void hc_mutate_op(grpc_call_element *elem,
GRPC_MDELEM_REF(channeld->method));
grpc_metadata_batch_add_head(&op->data.metadata, &calld->scheme,
GRPC_MDELEM_REF(channeld->scheme));
+ if (channeld->default_authority && !calld->sent_authority) {
+ grpc_metadata_batch_add_head(
+ &op->data.metadata, &calld->authority,
+ GRPC_MDELEM_REF(channeld->default_authority));
+ }
grpc_metadata_batch_add_tail(&op->data.metadata, &calld->te_trailers,
GRPC_MDELEM_REF(channeld->te_trailers));
grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type,
@@ -162,6 +175,7 @@ static void init_call_elem(grpc_call_element *elem,
call_data *calld = elem->call_data;
calld->sent_initial_metadata = 0;
calld->got_initial_metadata = 0;
+ calld->sent_authority = 0;
calld->on_done_recv = NULL;
grpc_iomgr_closure_init(&calld->hc_on_recv, hc_on_recv, elem);
if (initial_op) hc_mutate_op(elem, initial_op);
@@ -241,8 +255,10 @@ static grpc_mdstr *user_agent_from_args(grpc_mdctx *mdctx,
/* Constructor for channel_data */
static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
- const grpc_channel_args *args, grpc_mdctx *mdctx,
- int is_first, int is_last) {
+ const grpc_channel_args *channel_args,
+ grpc_mdctx *mdctx, int is_first, int is_last) {
+ size_t i;
+
/* grab pointers to our data from the channel element */
channel_data *channeld = elem->channel_data;
@@ -251,17 +267,32 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
path */
GPR_ASSERT(!is_last);
+ channeld->default_authority = NULL;
+ if (channel_args) {
+ for (i = 0; i < channel_args->num_args; i++) {
+ if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_DEFAULT_AUTHORITY)) {
+ if (channel_args->args[i].type != GRPC_ARG_STRING) {
+ gpr_log(GPR_ERROR, "%s: must be an string",
+ GRPC_ARG_DEFAULT_AUTHORITY);
+ } else {
+ channeld->default_authority = grpc_mdelem_from_strings(
+ mdctx, ":authority", channel_args->args[i].value.string);
+ }
+ }
+ }
+ }
+
/* initialize members */
channeld->te_trailers = grpc_mdelem_from_strings(mdctx, "te", "trailers");
channeld->method = grpc_mdelem_from_strings(mdctx, ":method", "POST");
- channeld->scheme =
- grpc_mdelem_from_strings(mdctx, ":scheme", scheme_from_args(args));
+ channeld->scheme = grpc_mdelem_from_strings(mdctx, ":scheme",
+ scheme_from_args(channel_args));
channeld->content_type =
grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc");
channeld->status = grpc_mdelem_from_strings(mdctx, ":status", "200");
channeld->user_agent = grpc_mdelem_from_metadata_strings(
mdctx, grpc_mdstr_from_string(mdctx, "user-agent", 0),
- user_agent_from_args(mdctx, args));
+ user_agent_from_args(mdctx, channel_args));
}
/* Destructor for channel data */
@@ -275,6 +306,9 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
GRPC_MDELEM_UNREF(channeld->content_type);
GRPC_MDELEM_UNREF(channeld->status);
GRPC_MDELEM_UNREF(channeld->user_agent);
+ if (channeld->default_authority) {
+ GRPC_MDELEM_UNREF(channeld->default_authority);
+ }
}
const grpc_channel_filter grpc_http_client_filter = {
diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c
index 9d89eb9bf2..0955ae319a 100644
--- a/src/core/channel/http_server_filter.c
+++ b/src/core/channel/http_server_filter.c
@@ -44,6 +44,7 @@ typedef struct call_data {
gpr_uint8 sent_status;
gpr_uint8 seen_scheme;
gpr_uint8 seen_te_trailers;
+ gpr_uint8 seen_authority;
grpc_linked_mdelem status;
grpc_stream_op_buffer *recv_ops;
@@ -125,6 +126,9 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
}
calld->seen_path = 1;
return md;
+ } else if (md->key == channeld->authority_key) {
+ calld->seen_authority = 1;
+ return md;
} else if (md->key == channeld->host_key) {
/* translate host to :authority since :authority may be
omitted */
@@ -132,6 +136,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
channeld->mdctx, GRPC_MDSTR_REF(channeld->authority_key),
GRPC_MDSTR_REF(md->value));
GRPC_MDELEM_UNREF(md);
+ calld->seen_authority = 1;
return authority;
} else {
return md;
@@ -154,12 +159,15 @@ static void hs_on_recv(void *user_data, int success) {
(:method, :scheme, content-type, with :path and :authority covered
at the channel level right now) */
if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers &&
- calld->seen_path) {
+ calld->seen_path && calld->seen_authority) {
/* do nothing */
} else {
if (!calld->seen_path) {
gpr_log(GPR_ERROR, "Missing :path header");
}
+ if (!calld->seen_authority) {
+ gpr_log(GPR_ERROR, "Missing :authority header");
+ }
if (!calld->seen_post) {
gpr_log(GPR_ERROR, "Missing :method header");
}
diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c
index ac401bc4d3..827b1a2be5 100644
--- a/src/core/client_config/resolvers/dns_resolver.c
+++ b/src/core/client_config/resolvers/dns_resolver.c
@@ -36,9 +36,11 @@
#include <string.h>
#include <grpc/support/alloc.h>
+#include <grpc/support/host_port.h>
#include <grpc/support/string_util.h>
#include "src/core/client_config/lb_policies/pick_first.h"
+#include "src/core/client_config/subchannel_factory_decorators/add_channel_arg.h"
#include "src/core/iomgr/resolve_address.h"
#include "src/core/support/string.h"
@@ -201,6 +203,9 @@ static grpc_resolver *dns_create(
grpc_subchannel_factory *subchannel_factory) {
dns_resolver *r;
const char *path = uri->path;
+ grpc_arg default_host_arg;
+ char *host;
+ char *port;
if (0 != strcmp(uri->authority, "")) {
gpr_log(GPR_ERROR, "authority based uri's not supported");
@@ -209,6 +214,16 @@ static grpc_resolver *dns_create(
if (path[0] == '/') ++path;
+ gpr_split_host_port(path, &host, &port);
+
+ default_host_arg.type = GRPC_ARG_STRING;
+ default_host_arg.key = GRPC_ARG_DEFAULT_AUTHORITY;
+ default_host_arg.value.string = host;
+ subchannel_factory = grpc_subchannel_factory_add_channel_arg(subchannel_factory, &default_host_arg);
+
+ gpr_free(host);
+ gpr_free(port);
+
r = gpr_malloc(sizeof(dns_resolver));
memset(r, 0, sizeof(*r));
gpr_ref_init(&r->refs, 1);
@@ -218,7 +233,6 @@ static grpc_resolver *dns_create(
r->default_port = gpr_strdup(default_port);
r->subchannel_factory = subchannel_factory;
r->lb_policy_factory = lb_policy_factory;
- grpc_subchannel_factory_ref(subchannel_factory);
return &r->base;
}
diff --git a/src/core/census/grpc_context.h b/src/core/client_config/subchannel_factory_decorators/add_channel_arg.c
index 4637e7218e..7dc6d99ebe 100644
--- a/src/core/census/grpc_context.h
+++ b/src/core/client_config/subchannel_factory_decorators/add_channel_arg.c
@@ -31,27 +31,13 @@
*
*/
-/* GRPC <--> CENSUS context interface */
-
-#ifndef CENSUS_GRPC_CONTEXT_H
-#define CENSUS_GRPC_CONTEXT_H
-
-#include <grpc/census.h>
-#include "src/core/surface/call.h"
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-/* Set census context for the call; Must be called before first call to
- grpc_call_start_batch(). */
-void grpc_census_call_set_context(grpc_call *call, census_context *context);
-
-/* Retrieve the calls current census context. */
-census_context *grpc_census_call_get_context(grpc_call *call);
-
-#ifdef __cplusplus
+#include "src/core/client_config/subchannel_factory_decorators/add_channel_arg.h"
+#include "src/core/client_config/subchannel_factory_decorators/merge_channel_args.h"
+
+grpc_subchannel_factory *grpc_subchannel_factory_add_channel_arg(
+ grpc_subchannel_factory *input, const grpc_arg *arg) {
+ grpc_channel_args args;
+ args.num_args = 1;
+ args.args = (grpc_arg *)arg;
+ return grpc_subchannel_factory_merge_channel_args(input, &args);
}
-#endif
-
-#endif /* CENSUS_GRPC_CONTEXT_H */
diff --git a/src/core/client_config/subchannel_factory_decorators/add_channel_arg.h b/src/core/client_config/subchannel_factory_decorators/add_channel_arg.h
new file mode 100644
index 0000000000..1937623374
--- /dev/null
+++ b/src/core/client_config/subchannel_factory_decorators/add_channel_arg.h
@@ -0,0 +1,45 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_FACTORY_DECORATORS_ADD_CHANNEL_ARG_H
+#define GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_FACTORY_DECORATORS_ADD_CHANNEL_ARG_H
+
+#include "src/core/client_config/subchannel_factory.h"
+
+/** Takes a subchannel factory, returns a new one that mutates incoming
+ channel_args by adding a new argument; ownership of input, arg is retained
+ by the caller. */
+grpc_subchannel_factory *grpc_subchannel_factory_add_channel_arg(
+ grpc_subchannel_factory *input, const grpc_arg *arg);
+
+#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_FACTORY_DECORATORS_ADD_CHANNEL_ARG_H */
diff --git a/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c b/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c
new file mode 100644
index 0000000000..7e028857ac
--- /dev/null
+++ b/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c
@@ -0,0 +1,84 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/client_config/subchannel_factory_decorators/merge_channel_args.h"
+#include <grpc/support/alloc.h>
+#include "src/core/channel/channel_args.h"
+
+typedef struct {
+ grpc_subchannel_factory base;
+ gpr_refcount refs;
+ grpc_subchannel_factory *wrapped;
+ grpc_channel_args *merge_args;
+} merge_args_factory;
+
+static void merge_args_factory_ref(grpc_subchannel_factory *scf) {
+ merge_args_factory *f = (merge_args_factory *)scf;
+ gpr_ref(&f->refs);
+}
+
+static void merge_args_factory_unref(grpc_subchannel_factory *scf) {
+ merge_args_factory *f = (merge_args_factory *)scf;
+ if (gpr_unref(&f->refs)) {
+ grpc_subchannel_factory_unref(f->wrapped);
+ grpc_channel_args_destroy(f->merge_args);
+ gpr_free(f);
+ }
+}
+
+static grpc_subchannel *merge_args_factory_create_subchannel(
+ grpc_subchannel_factory *scf, grpc_subchannel_args *args) {
+ merge_args_factory *f = (merge_args_factory *)scf;
+ grpc_channel_args *final_args =
+ grpc_channel_args_merge(args->args, f->merge_args);
+ grpc_subchannel *s;
+ args->args = final_args;
+ s = grpc_subchannel_factory_create_subchannel(f->wrapped, args);
+ grpc_channel_args_destroy(final_args);
+ return s;
+}
+
+static const grpc_subchannel_factory_vtable merge_args_factory_vtable = {
+ merge_args_factory_ref, merge_args_factory_unref,
+ merge_args_factory_create_subchannel};
+
+grpc_subchannel_factory *grpc_subchannel_factory_merge_channel_args(
+ grpc_subchannel_factory *input, const grpc_channel_args *args) {
+ merge_args_factory *f = gpr_malloc(sizeof(*f));
+ f->base.vtable = &merge_args_factory_vtable;
+ gpr_ref_init(&f->refs, 1);
+ grpc_subchannel_factory_ref(input);
+ f->wrapped = input;
+ f->merge_args = grpc_channel_args_copy(args);
+ return &f->base;
+}
diff --git a/src/core/client_config/subchannel_factory_decorators/merge_channel_args.h b/src/core/client_config/subchannel_factory_decorators/merge_channel_args.h
new file mode 100644
index 0000000000..73a03b752f
--- /dev/null
+++ b/src/core/client_config/subchannel_factory_decorators/merge_channel_args.h
@@ -0,0 +1,45 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_FACTORY_DECORATORS_MERGE_CHANNEL_ARGS_H
+#define GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_FACTORY_DECORATORS_MERGE_CHANNEL_ARGS_H
+
+#include "src/core/client_config/subchannel_factory.h"
+
+/** Takes a subchannel factory, returns a new one that mutates incoming
+ channel_args by adding a new argument; ownership of input, args is retained
+ by the caller. */
+grpc_subchannel_factory *grpc_subchannel_factory_merge_channel_args(
+ grpc_subchannel_factory *input, const grpc_channel_args *args);
+
+#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_FACTORY_DECORATORS_MERGE_CHANNEL_ARGS_H */
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index 6ad377ce1c..a2df838d4a 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -102,6 +102,7 @@ static grpc_fd *alloc_fd(int fd) {
r->freelist_next = NULL;
r->read_watcher = r->write_watcher = NULL;
r->on_done_closure = NULL;
+ r->closed = 0;
return r;
}
@@ -209,6 +210,8 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done,
REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
gpr_mu_lock(&fd->watcher_mu);
if (!has_watchers(fd)) {
+ GPR_ASSERT(!fd->closed);
+ fd->closed = 1;
close(fd->fd);
if (fd->on_done_closure) {
grpc_iomgr_add_callback(fd->on_done_closure);
@@ -426,7 +429,8 @@ void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) {
if (kick) {
maybe_wake_one_watcher_locked(fd);
}
- if (grpc_fd_is_orphaned(fd) && !has_watchers(fd)) {
+ if (grpc_fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
+ fd->closed = 1;
close(fd->fd);
if (fd->on_done_closure != NULL) {
grpc_iomgr_add_callback(fd->on_done_closure);
diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h
index 94d0019fa4..4e8e267ffd 100644
--- a/src/core/iomgr/fd_posix.h
+++ b/src/core/iomgr/fd_posix.h
@@ -60,6 +60,7 @@ struct grpc_fd {
gpr_mu set_state_mu;
gpr_atm shutdown;
+ int closed;
/* The watcher list.
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c
index 8538600112..6399aaadb9 100644
--- a/src/core/iomgr/tcp_server_posix.c
+++ b/src/core/iomgr/tcp_server_posix.c
@@ -142,6 +142,7 @@ grpc_tcp_server *grpc_tcp_server_create(void) {
static void finish_shutdown(grpc_tcp_server *s) {
s->shutdown_complete(s->shutdown_complete_arg);
+ s->shutdown_complete = NULL;
gpr_mu_destroy(&s->mu);
@@ -157,6 +158,7 @@ static void destroyed_port(void *server, int success) {
gpr_mu_unlock(&s->mu);
finish_shutdown(s);
} else {
+ GPR_ASSERT(s->destroyed_ports < s->nports);
gpr_mu_unlock(&s->mu);
}
}
diff --git a/src/core/support/stack_lockfree.c b/src/core/support/stack_lockfree.c
index f24e272207..bc741f8c70 100644
--- a/src/core/support/stack_lockfree.c
+++ b/src/core/support/stack_lockfree.c
@@ -95,6 +95,8 @@ gpr_stack_lockfree *gpr_stack_lockfree_create(int entries) {
memset(&stack->pushed, 0, sizeof(stack->pushed));
#endif
+ GPR_ASSERT(sizeof(stack->entries->atm) == sizeof(stack->entries->contents));
+
/* Point the head at reserved dummy entry */
stack->head.contents.index = INVALID_ENTRY_INDEX;
return stack;
@@ -108,11 +110,15 @@ void gpr_stack_lockfree_destroy(gpr_stack_lockfree *stack) {
int gpr_stack_lockfree_push(gpr_stack_lockfree *stack, int entry) {
lockfree_node head;
lockfree_node newhead;
+ lockfree_node curent;
+ lockfree_node newent;
/* First fill in the entry's index and aba ctr for new head */
newhead.contents.index = (gpr_uint16)entry;
/* Also post-increment the aba_ctr */
- newhead.contents.aba_ctr = stack->entries[entry].contents.aba_ctr++;
+ curent.atm = gpr_atm_no_barrier_load(&stack->entries[entry].atm);
+ newhead.contents.aba_ctr = ++curent.contents.aba_ctr;
+ gpr_atm_no_barrier_store(&stack->entries[entry].atm, curent.atm);
#ifndef NDEBUG
/* Check for double push */
@@ -131,7 +137,9 @@ int gpr_stack_lockfree_push(gpr_stack_lockfree *stack, int entry) {
/* Atomically get the existing head value for use */
head.atm = gpr_atm_no_barrier_load(&(stack->head.atm));
/* Point to it */
- stack->entries[entry].contents.index = head.contents.index;
+ newent.atm = gpr_atm_no_barrier_load(&stack->entries[entry].atm);
+ newent.contents.index = head.contents.index;
+ gpr_atm_no_barrier_store(&stack->entries[entry].atm, newent.atm);
} while (!gpr_atm_rel_cas(&(stack->head.atm), head.atm, newhead.atm));
/* Use rel_cas above to make sure that entry index is set properly */
return head.contents.index == INVALID_ENTRY_INDEX;
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 12378f0066..327a096ffb 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -40,7 +40,6 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
-#include "src/core/census/grpc_context.h"
#include "src/core/channel/channel_stack.h"
#include "src/core/iomgr/alarm.h"
#include "src/core/profiling/timers.h"
@@ -348,7 +347,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
}
grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr,
CALL_STACK_FROM_CALL(call));
- if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) != 0) {
+ if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) !=
+ 0) {
set_deadline_alarm(call, send_deadline);
}
return call;
@@ -1283,8 +1283,9 @@ static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) {
}
GRPC_CALL_INTERNAL_REF(call, "alarm");
call->have_alarm = 1;
- grpc_alarm_init(&call->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), call_alarm, call,
- gpr_now(GPR_CLOCK_MONOTONIC));
+ grpc_alarm_init(&call->alarm,
+ gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
+ call_alarm, call, gpr_now(GPR_CLOCK_MONOTONIC));
}
/* we offset status by a small amount when storing it into transport metadata
@@ -1319,15 +1320,17 @@ static gpr_uint32 decode_compression(grpc_mdelem *md) {
grpc_compression_algorithm algorithm;
void *user_data = grpc_mdelem_get_user_data(md, destroy_compression);
if (user_data) {
- algorithm = ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET;
+ algorithm =
+ ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET;
} else {
const char *md_c_str = grpc_mdstr_as_c_string(md->value);
if (!grpc_compression_algorithm_parse(md_c_str, &algorithm)) {
gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str);
assert(0);
}
- grpc_mdelem_set_user_data(md, destroy_compression,
- (void *)(gpr_intptr)(algorithm + COMPRESS_OFFSET));
+ grpc_mdelem_set_user_data(
+ md, destroy_compression,
+ (void *)(gpr_intptr)(algorithm + COMPRESS_OFFSET));
}
return algorithm;
}
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 583d350128..81f673f856 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -149,14 +149,17 @@ static grpc_call *grpc_channel_create_call_internal(
grpc_channel *channel, grpc_completion_queue *cq, grpc_mdelem *path_mdelem,
grpc_mdelem *authority_mdelem, gpr_timespec deadline) {
grpc_mdelem *send_metadata[2];
+ int num_metadata = 0;
GPR_ASSERT(channel->is_client);
- send_metadata[0] = path_mdelem;
- send_metadata[1] = authority_mdelem;
+ send_metadata[num_metadata++] = path_mdelem;
+ if (authority_mdelem != NULL) {
+ send_metadata[num_metadata++] = authority_mdelem;
+ }
return grpc_call_create(channel, cq, NULL, send_metadata,
- GPR_ARRAY_SIZE(send_metadata), deadline);
+ num_metadata, deadline);
}
grpc_call *grpc_channel_create_call(grpc_channel *channel,
@@ -168,9 +171,10 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel,
grpc_mdelem_from_metadata_strings(
channel->metadata_context, GRPC_MDSTR_REF(channel->path_string),
grpc_mdstr_from_string(channel->metadata_context, method, 0)),
+ host ?
grpc_mdelem_from_metadata_strings(
channel->metadata_context, GRPC_MDSTR_REF(channel->authority_string),
- grpc_mdstr_from_string(channel->metadata_context, host, 0)),
+ grpc_mdstr_from_string(channel->metadata_context, host, 0)) : NULL,
deadline);
}
@@ -180,9 +184,9 @@ void *grpc_channel_register_call(grpc_channel *channel, const char *method,
rc->path = grpc_mdelem_from_metadata_strings(
channel->metadata_context, GRPC_MDSTR_REF(channel->path_string),
grpc_mdstr_from_string(channel->metadata_context, method, 0));
- rc->authority = grpc_mdelem_from_metadata_strings(
+ rc->authority = host ? grpc_mdelem_from_metadata_strings(
channel->metadata_context, GRPC_MDSTR_REF(channel->authority_string),
- grpc_mdstr_from_string(channel->metadata_context, host, 0));
+ grpc_mdstr_from_string(channel->metadata_context, host, 0)) : NULL;
gpr_mu_lock(&channel->registered_call_mu);
rc->next = channel->registered_calls;
channel->registered_calls = rc;
@@ -196,7 +200,7 @@ grpc_call *grpc_channel_create_registered_call(
registered_call *rc = registered_call_handle;
return grpc_channel_create_call_internal(
channel, completion_queue, GRPC_MDELEM_REF(rc->path),
- GRPC_MDELEM_REF(rc->authority), deadline);
+ rc->authority ? GRPC_MDELEM_REF(rc->authority) : NULL, deadline);
}
#ifdef GRPC_CHANNEL_REF_COUNT_DEBUG
diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c
index b6ea86d730..1223706457 100644
--- a/src/core/surface/channel_connectivity.c
+++ b/src/core/surface/channel_connectivity.c
@@ -70,7 +70,6 @@ typedef struct {
grpc_iomgr_closure on_complete;
grpc_alarm alarm;
grpc_connectivity_state state;
- grpc_connectivity_state *optional_new_state;
grpc_completion_queue *cq;
grpc_cq_completion completion_storage;
grpc_channel *channel;
@@ -124,9 +123,6 @@ static void partly_done(state_watcher *w, int due_to_completion) {
switch (w->phase) {
case WAITING:
w->phase = CALLING_BACK;
- if (w->optional_new_state) {
- *w->optional_new_state = w->state;
- }
grpc_cq_end_op(w->cq, w->tag, w->success, finished_completion, w,
&w->completion_storage);
break;
@@ -154,8 +150,7 @@ static void timeout_complete(void *pw, int success) { partly_done(pw, 0); }
void grpc_channel_watch_connectivity_state(
grpc_channel *channel, grpc_connectivity_state last_observed_state,
- grpc_connectivity_state *optional_new_state, gpr_timespec deadline,
- grpc_completion_queue *cq, void *tag) {
+ gpr_timespec deadline, grpc_completion_queue *cq, void *tag) {
grpc_channel_element *client_channel_elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
state_watcher *w = gpr_malloc(sizeof(*w));
@@ -167,7 +162,6 @@ void grpc_channel_watch_connectivity_state(
w->phase = WAITING;
w->state = last_observed_state;
w->success = 0;
- w->optional_new_state = optional_new_state;
w->cq = cq;
w->tag = tag;
w->channel = channel;
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index f19bcbd090..c370a9b8ab 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -554,8 +554,10 @@ static void server_on_recv(void *ptr, int success) {
gpr_time_cmp(op_deadline, gpr_inf_future(op_deadline.clock_type))) {
calld->deadline = op->data.metadata.deadline;
}
- calld->got_initial_metadata = 1;
- start_new_rpc(elem);
+ if (calld->host && calld->path) {
+ calld->got_initial_metadata = 1;
+ start_new_rpc(elem);
+ }
break;
}
}
@@ -1271,6 +1273,8 @@ static void done_request_event(void *req, grpc_cq_completion *c) {
} else {
gpr_free(req);
}
+
+ server_unref(server);
}
static void fail_call(grpc_server *server, requested_call *rc) {
@@ -1283,6 +1287,7 @@ static void fail_call(grpc_server *server, requested_call *rc) {
rc->data.registered.initial_metadata->count = 0;
break;
}
+ server_ref(server);
grpc_cq_end_op(rc->cq_for_notification, rc->tag, 0, done_request_event, rc,
&rc->completion);
}
@@ -1293,6 +1298,8 @@ static void publish_registered_or_batch(grpc_call *call, int success,
grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
requested_call *rc = prc;
call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ server_ref(chand->server);
grpc_cq_end_op(calld->cq_new, rc->tag, success, done_request_event, rc,
&rc->completion);
GRPC_CALL_INTERNAL_UNREF(call, "server", 0);
diff --git a/src/core/transport/chttp2/alpn.c b/src/core/transport/chttp2/alpn.c
index 3ccd5796ba..69da4e6718 100644
--- a/src/core/transport/chttp2/alpn.c
+++ b/src/core/transport/chttp2/alpn.c
@@ -36,8 +36,7 @@
#include <grpc/support/useful.h>
/* in order of preference */
-static const char *const supported_versions[] = {"h2", "h2-17", "h2-16",
- "h2-15", "h2-14"};
+static const char *const supported_versions[] = {"h2"};
int grpc_chttp2_is_alpn_version_supported(const char *version, size_t size) {
size_t i;