aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/compression/algorithm.c4
-rw-r--r--src/core/iomgr/udp_server.c438
-rw-r--r--src/core/iomgr/udp_server.h85
-rw-r--r--src/core/security/client_auth_filter.c15
-rw-r--r--src/core/support/cancellable.c157
-rw-r--r--src/core/support/sync.c15
-rw-r--r--src/core/surface/call.c17
-rw-r--r--src/core/surface/channel.c9
-rw-r--r--src/core/surface/channel_create.c4
-rw-r--r--src/core/surface/completion_queue.c10
-rw-r--r--src/core/surface/server.c4
-rw-r--r--src/core/surface/server_create.c3
-rw-r--r--src/core/transport/stream_op.c1
-rw-r--r--src/core/transport/stream_op.h1
14 files changed, 567 insertions, 196 deletions
diff --git a/src/core/compression/algorithm.c b/src/core/compression/algorithm.c
index e426241d0a..0fd028741e 100644
--- a/src/core/compression/algorithm.c
+++ b/src/core/compression/algorithm.c
@@ -37,7 +37,7 @@
int grpc_compression_algorithm_parse(const char* name,
grpc_compression_algorithm *algorithm) {
- if (strcmp(name, "none") == 0) {
+ if (strcmp(name, "identity") == 0) {
*algorithm = GRPC_COMPRESS_NONE;
} else if (strcmp(name, "gzip") == 0) {
*algorithm = GRPC_COMPRESS_GZIP;
@@ -53,7 +53,7 @@ int grpc_compression_algorithm_name(grpc_compression_algorithm algorithm,
char **name) {
switch (algorithm) {
case GRPC_COMPRESS_NONE:
- *name = "none";
+ *name = "identity";
break;
case GRPC_COMPRESS_DEFLATE:
*name = "deflate";
diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c
new file mode 100644
index 0000000000..db0aef8120
--- /dev/null
+++ b/src/core/iomgr/udp_server.c
@@ -0,0 +1,438 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/* FIXME: "posix" files shouldn't be depending on _GNU_SOURCE */
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_POSIX_SOCKET
+
+#include "src/core/iomgr/udp_server.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <limits.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <unistd.h>
+
+#include "src/core/iomgr/fd_posix.h"
+#include "src/core/iomgr/pollset_posix.h"
+#include "src/core/iomgr/resolve_address.h"
+#include "src/core/iomgr/sockaddr_utils.h"
+#include "src/core/iomgr/socket_utils_posix.h"
+#include "src/core/support/string.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/time.h>
+
+#define INIT_PORT_CAP 2
+
+/* one listening port */
+typedef struct {
+ int fd;
+ grpc_fd *emfd;
+ grpc_udp_server *server;
+ union {
+ gpr_uint8 untyped[GRPC_MAX_SOCKADDR_SIZE];
+ struct sockaddr sockaddr;
+ struct sockaddr_un un;
+ } addr;
+ int addr_len;
+ grpc_iomgr_closure read_closure;
+ grpc_iomgr_closure destroyed_closure;
+ grpc_udp_server_read_cb read_cb;
+} server_port;
+
+static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) {
+ struct stat st;
+
+ if (stat(un->sun_path, &st) == 0 && (st.st_mode & S_IFMT) == S_IFSOCK) {
+ unlink(un->sun_path);
+ }
+}
+
+/* the overall server */
+struct grpc_udp_server {
+ grpc_udp_server_cb cb;
+ void *cb_arg;
+
+ gpr_mu mu;
+ gpr_cv cv;
+
+ /* active port count: how many ports are actually still listening */
+ size_t active_ports;
+ /* destroyed port count: how many ports are completely destroyed */
+ size_t destroyed_ports;
+
+ /* is this server shutting down? (boolean) */
+ int shutdown;
+
+ /* all listening ports */
+ server_port *ports;
+ size_t nports;
+ size_t port_capacity;
+
+ /* shutdown callback */
+ void (*shutdown_complete)(void *);
+ void *shutdown_complete_arg;
+
+ /* all pollsets interested in new connections */
+ grpc_pollset **pollsets;
+ /* number of pollsets in the pollsets array */
+ size_t pollset_count;
+};
+
+grpc_udp_server *grpc_udp_server_create(void) {
+ grpc_udp_server *s = gpr_malloc(sizeof(grpc_udp_server));
+ gpr_mu_init(&s->mu);
+ gpr_cv_init(&s->cv);
+ s->active_ports = 0;
+ s->destroyed_ports = 0;
+ s->shutdown = 0;
+ s->cb = NULL;
+ s->cb_arg = NULL;
+ s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP);
+ s->nports = 0;
+ s->port_capacity = INIT_PORT_CAP;
+
+ return s;
+}
+
+static void finish_shutdown(grpc_udp_server *s) {
+ s->shutdown_complete(s->shutdown_complete_arg);
+
+ gpr_mu_destroy(&s->mu);
+ gpr_cv_destroy(&s->cv);
+
+ gpr_free(s->ports);
+ gpr_free(s);
+}
+
+static void destroyed_port(void *server, int success) {
+ grpc_udp_server *s = server;
+ gpr_mu_lock(&s->mu);
+ s->destroyed_ports++;
+ if (s->destroyed_ports == s->nports) {
+ gpr_mu_unlock(&s->mu);
+ finish_shutdown(s);
+ } else {
+ gpr_mu_unlock(&s->mu);
+ }
+}
+
+static void dont_care_about_shutdown_completion(void *ignored) {}
+
+/* called when all listening endpoints have been shutdown, so no further
+ events will be received on them - at this point it's safe to destroy
+ things */
+static void deactivated_all_ports(grpc_udp_server *s) {
+ size_t i;
+
+ /* delete ALL the things */
+ gpr_mu_lock(&s->mu);
+
+ if (!s->shutdown) {
+ gpr_mu_unlock(&s->mu);
+ return;
+ }
+
+ if (s->nports) {
+ for (i = 0; i < s->nports; i++) {
+ server_port *sp = &s->ports[i];
+ if (sp->addr.sockaddr.sa_family == AF_UNIX) {
+ unlink_if_unix_domain_socket(&sp->addr.un);
+ }
+ sp->destroyed_closure.cb = destroyed_port;
+ sp->destroyed_closure.cb_arg = s;
+ grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, "udp_listener_shutdown");
+ }
+ gpr_mu_unlock(&s->mu);
+ } else {
+ gpr_mu_unlock(&s->mu);
+ finish_shutdown(s);
+ }
+}
+
+void grpc_udp_server_destroy(
+ grpc_udp_server *s, void (*shutdown_complete)(void *shutdown_complete_arg),
+ void *shutdown_complete_arg) {
+ size_t i;
+ gpr_mu_lock(&s->mu);
+
+ GPR_ASSERT(!s->shutdown);
+ s->shutdown = 1;
+
+ s->shutdown_complete = shutdown_complete
+ ? shutdown_complete
+ : dont_care_about_shutdown_completion;
+ s->shutdown_complete_arg = shutdown_complete_arg;
+
+ /* shutdown all fd's */
+ if (s->active_ports) {
+ for (i = 0; i < s->nports; i++) {
+ grpc_fd_shutdown(s->ports[i].emfd);
+ }
+ gpr_mu_unlock(&s->mu);
+ } else {
+ gpr_mu_unlock(&s->mu);
+ deactivated_all_ports(s);
+ }
+}
+
+/* Prepare a recently-created socket for listening. */
+static int prepare_socket(int fd, const struct sockaddr *addr, int addr_len) {
+ struct sockaddr_storage sockname_temp;
+ socklen_t sockname_len;
+ int get_local_ip;
+ int rc;
+
+ if (fd < 0) {
+ goto error;
+ }
+
+ get_local_ip = 1;
+ rc = setsockopt(fd, IPPROTO_IP, IP_PKTINFO,
+ &get_local_ip, sizeof(get_local_ip));
+ if (rc == 0 && addr->sa_family == AF_INET6) {
+ rc = setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO,
+ &get_local_ip, sizeof(get_local_ip));
+ }
+
+ if (bind(fd, addr, addr_len) < 0) {
+ char *addr_str;
+ grpc_sockaddr_to_string(&addr_str, addr, 0);
+ gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, strerror(errno));
+ gpr_free(addr_str);
+ goto error;
+ }
+
+ sockname_len = sizeof(sockname_temp);
+ if (getsockname(fd, (struct sockaddr *)&sockname_temp, &sockname_len) < 0) {
+ goto error;
+ }
+
+ return grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
+
+error:
+ if (fd >= 0) {
+ close(fd);
+ }
+ return -1;
+}
+
+/* event manager callback when reads are ready */
+static void on_read(void *arg, int success) {
+ server_port *sp = arg;
+
+ if (success == 0) {
+ gpr_mu_lock(&sp->server->mu);
+ if (0 == --sp->server->active_ports) {
+ gpr_mu_unlock(&sp->server->mu);
+ deactivated_all_ports(sp->server);
+ } else {
+ gpr_mu_unlock(&sp->server->mu);
+ }
+ return;
+ }
+
+ /* Tell the registered callback that data is available to read. */
+ GPR_ASSERT(sp->read_cb);
+ sp->read_cb(sp->fd, sp->server->cb, sp->server->cb_arg);
+
+ /* Re-arm the notification event so we get another chance to read. */
+ grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
+}
+
+static int add_socket_to_server(grpc_udp_server *s, int fd,
+ const struct sockaddr *addr, int addr_len,
+ grpc_udp_server_read_cb read_cb) {
+ server_port *sp;
+ int port;
+ char *addr_str;
+ char *name;
+
+ port = prepare_socket(fd, addr, addr_len);
+ if (port >= 0) {
+ grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1);
+ gpr_asprintf(&name, "udp-server-listener:%s", addr_str);
+ gpr_mu_lock(&s->mu);
+ GPR_ASSERT(!s->cb && "must add ports before starting server");
+ /* append it to the list under a lock */
+ if (s->nports == s->port_capacity) {
+ s->port_capacity *= 2;
+ s->ports = gpr_realloc(s->ports, sizeof(server_port) * s->port_capacity);
+ }
+ sp = &s->ports[s->nports++];
+ sp->server = s;
+ sp->fd = fd;
+ sp->emfd = grpc_fd_create(fd, name);
+ memcpy(sp->addr.untyped, addr, addr_len);
+ sp->addr_len = addr_len;
+ sp->read_cb = read_cb;
+ GPR_ASSERT(sp->emfd);
+ gpr_mu_unlock(&s->mu);
+ }
+
+ return port;
+}
+
+int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
+ int addr_len, grpc_udp_server_read_cb read_cb) {
+ int allocated_port1 = -1;
+ int allocated_port2 = -1;
+ unsigned i;
+ int fd;
+ grpc_dualstack_mode dsmode;
+ struct sockaddr_in6 addr6_v4mapped;
+ struct sockaddr_in wild4;
+ struct sockaddr_in6 wild6;
+ struct sockaddr_in addr4_copy;
+ struct sockaddr *allocated_addr = NULL;
+ struct sockaddr_storage sockname_temp;
+ socklen_t sockname_len;
+ int port;
+
+ if (((struct sockaddr *)addr)->sa_family == AF_UNIX) {
+ unlink_if_unix_domain_socket(addr);
+ }
+
+ /* Check if this is a wildcard port, and if so, try to keep the port the same
+ as some previously created listener. */
+ if (grpc_sockaddr_get_port(addr) == 0) {
+ for (i = 0; i < s->nports; i++) {
+ sockname_len = sizeof(sockname_temp);
+ if (0 == getsockname(s->ports[i].fd, (struct sockaddr *)&sockname_temp,
+ &sockname_len)) {
+ port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
+ if (port > 0) {
+ allocated_addr = malloc(addr_len);
+ memcpy(allocated_addr, addr, addr_len);
+ grpc_sockaddr_set_port(allocated_addr, port);
+ addr = allocated_addr;
+ break;
+ }
+ }
+ }
+ }
+
+ if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
+ addr = (const struct sockaddr *)&addr6_v4mapped;
+ addr_len = sizeof(addr6_v4mapped);
+ }
+
+ /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
+ if (grpc_sockaddr_is_wildcard(addr, &port)) {
+ grpc_sockaddr_make_wildcards(port, &wild4, &wild6);
+
+ /* Try listening on IPv6 first. */
+ addr = (struct sockaddr *)&wild6;
+ addr_len = sizeof(wild6);
+ fd = grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode);
+ allocated_port1 = add_socket_to_server(s, fd, addr, addr_len, read_cb);
+ if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
+ goto done;
+ }
+
+ /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */
+ if (port == 0 && allocated_port1 > 0) {
+ grpc_sockaddr_set_port((struct sockaddr *)&wild4, allocated_port1);
+ }
+ addr = (struct sockaddr *)&wild4;
+ addr_len = sizeof(wild4);
+ }
+
+ fd = grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode);
+ if (fd < 0) {
+ gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno));
+ }
+ if (dsmode == GRPC_DSMODE_IPV4 &&
+ grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) {
+ addr = (struct sockaddr *)&addr4_copy;
+ addr_len = sizeof(addr4_copy);
+ }
+ allocated_port2 = add_socket_to_server(s, fd, addr, addr_len, read_cb);
+
+done:
+ gpr_free(allocated_addr);
+ return allocated_port1 >= 0 ? allocated_port1 : allocated_port2;
+}
+
+int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index) {
+ return (index < s->nports) ? s->ports[index].fd : -1;
+}
+
+void grpc_udp_server_start(grpc_udp_server *s, grpc_pollset **pollsets,
+ size_t pollset_count,
+ grpc_udp_server_cb new_transport_cb, void *cb_arg) {
+ size_t i, j;
+ GPR_ASSERT(new_transport_cb);
+ gpr_mu_lock(&s->mu);
+ GPR_ASSERT(!s->cb);
+ GPR_ASSERT(s->active_ports == 0);
+ s->cb = new_transport_cb;
+ s->cb_arg = cb_arg;
+ s->pollsets = pollsets;
+ for (i = 0; i < s->nports; i++) {
+ for (j = 0; j < pollset_count; j++) {
+ grpc_pollset_add_fd(pollsets[j], s->ports[i].emfd);
+ }
+ s->ports[i].read_closure.cb = on_read;
+ s->ports[i].read_closure.cb_arg = &s->ports[i];
+ grpc_fd_notify_on_read(s->ports[i].emfd, &s->ports[i].read_closure);
+ s->active_ports++;
+ }
+ gpr_mu_unlock(&s->mu);
+}
+
+/* TODO(rjshade): Add a test for this method. */
+void grpc_udp_server_write(server_port *sp, const char *buffer, size_t buf_len,
+ const struct sockaddr *peer_address) {
+ int rc;
+ rc = sendto(sp->fd, buffer, buf_len, 0, peer_address, sizeof(peer_address));
+ if (rc < 0) {
+ gpr_log(GPR_ERROR, "Unable to send data: %s", strerror(errno));
+ }
+}
+
+#endif
diff --git a/src/core/iomgr/udp_server.h b/src/core/iomgr/udp_server.h
new file mode 100644
index 0000000000..fcc4ba6e97
--- /dev/null
+++ b/src/core/iomgr/udp_server.h
@@ -0,0 +1,85 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CORE_IOMGR_UDP_SERVER_H
+#define GRPC_INTERNAL_CORE_IOMGR_UDP_SERVER_H
+
+#include "src/core/iomgr/endpoint.h"
+
+/* Forward decl of grpc_udp_server */
+typedef struct grpc_udp_server grpc_udp_server;
+
+/* New server callback: ep is the newly connected connection */
+typedef void (*grpc_udp_server_cb)(void *arg, grpc_endpoint *ep);
+
+/* Called when data is available to read from the socket. */
+typedef void (*grpc_udp_server_read_cb)(int fd,
+ grpc_udp_server_cb new_transport_cb,
+ void *cb_arg);
+
+/* Create a server, initially not bound to any ports */
+grpc_udp_server *grpc_udp_server_create(void);
+
+/* Start listening to bound ports */
+void grpc_udp_server_start(grpc_udp_server *server, grpc_pollset **pollsets,
+ size_t pollset_count, grpc_udp_server_cb cb,
+ void *cb_arg);
+
+int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index);
+
+/* Add a port to the server, returning port number on success, or negative
+ on failure.
+
+ The :: and 0.0.0.0 wildcard addresses are treated identically, accepting
+ both IPv4 and IPv6 connections, but :: is the preferred style. This usually
+ creates one socket, but possibly two on systems which support IPv6,
+ but not dualstack sockets. */
+
+/* TODO(ctiller): deprecate this, and make grpc_udp_server_add_ports to handle
+ all of the multiple socket port matching logic in one place */
+int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, int addr_len,
+ grpc_udp_server_read_cb read_cb);
+
+void grpc_udp_server_destroy(grpc_udp_server *server,
+ void (*shutdown_done)(void *shutdown_done_arg),
+ void *shutdown_done_arg);
+
+/* Write the contents of buffer to the underlying UDP socket. */
+/*
+void grpc_udp_server_write(grpc_udp_server *s,
+ const char *buffer,
+ int buf_len,
+ const struct sockaddr* to);
+ */
+
+#endif /* GRPC_INTERNAL_CORE_IOMGR_UDP_SERVER_H */
diff --git a/src/core/security/client_auth_filter.c b/src/core/security/client_auth_filter.c
index 0e699874bc..410852da52 100644
--- a/src/core/security/client_auth_filter.c
+++ b/src/core/security/client_auth_filter.c
@@ -75,11 +75,11 @@ typedef struct {
grpc_mdstr *status_key;
} channel_data;
-static void bubble_up_error(grpc_call_element *elem, const char *error_msg) {
+static void bubble_up_error(grpc_call_element *elem, grpc_status_code status,
+ const char *error_msg) {
call_data *calld = elem->call_data;
gpr_log(GPR_ERROR, "Client side authentication failure: %s", error_msg);
- grpc_transport_stream_op_add_cancellation(&calld->op,
- GRPC_STATUS_UNAUTHENTICATED);
+ grpc_transport_stream_op_add_cancellation(&calld->op, status);
grpc_call_next_op(elem, &calld->op);
}
@@ -94,7 +94,8 @@ static void on_credentials_metadata(void *user_data,
grpc_metadata_batch *mdb;
size_t i;
if (status != GRPC_CREDENTIALS_OK) {
- bubble_up_error(elem, "Credentials failed to get metadata.");
+ bubble_up_error(elem, GRPC_STATUS_UNAUTHENTICATED,
+ "Credentials failed to get metadata.");
return;
}
GPR_ASSERT(num_md <= MAX_CREDENTIALS_METADATA_COUNT);
@@ -154,7 +155,7 @@ static void send_security_metadata(grpc_call_element *elem,
if (channel_creds_has_md && call_creds_has_md) {
calld->creds = grpc_composite_credentials_create(channel_creds, ctx->creds);
if (calld->creds == NULL) {
- bubble_up_error(elem,
+ bubble_up_error(elem, GRPC_STATUS_INVALID_ARGUMENT,
"Incompatible credentials set on channel and call.");
return;
}
@@ -182,7 +183,7 @@ static void on_host_checked(void *user_data, grpc_security_status status) {
char *error_msg;
gpr_asprintf(&error_msg, "Invalid host %s set in :authority metadata.",
grpc_mdstr_as_c_string(calld->host));
- bubble_up_error(elem, error_msg);
+ bubble_up_error(elem, GRPC_STATUS_INVALID_ARGUMENT, error_msg);
gpr_free(error_msg);
}
}
@@ -252,7 +253,7 @@ static void auth_start_transport_op(grpc_call_element *elem,
gpr_asprintf(&error_msg,
"Invalid host %s set in :authority metadata.",
call_host);
- bubble_up_error(elem, error_msg);
+ bubble_up_error(elem, GRPC_STATUS_INVALID_ARGUMENT, error_msg);
gpr_free(error_msg);
}
return; /* early exit */
diff --git a/src/core/support/cancellable.c b/src/core/support/cancellable.c
deleted file mode 100644
index 4756f1e125..0000000000
--- a/src/core/support/cancellable.c
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-/* Implementation for gpr_cancellable */
-
-#include <grpc/support/atm.h>
-#include <grpc/support/sync.h>
-#include <grpc/support/time.h>
-
-void gpr_cancellable_init(gpr_cancellable *c) {
- gpr_mu_init(&c->mu);
- c->cancelled = 0;
- c->waiters.next = &c->waiters;
- c->waiters.prev = &c->waiters;
- c->waiters.mu = NULL;
- c->waiters.cv = NULL;
-}
-
-void gpr_cancellable_destroy(gpr_cancellable *c) { gpr_mu_destroy(&c->mu); }
-
-int gpr_cancellable_is_cancelled(gpr_cancellable *c) {
- return gpr_atm_acq_load(&c->cancelled) != 0;
-}
-
-/* Threads in gpr_cv_cancellable_wait(cv, mu, ..., c) place themselves on a
- linked list c->waiters of gpr_cancellable_list_ before waiting on their
- condition variables. They check for cancellation while holding *mu. Thus,
- to wake a thread from gpr_cv_cancellable_wait(), it suffices to:
- - set c->cancelled
- - acquire and release *mu
- - gpr_cv_broadcast(cv)
-
- However, gpr_cancellable_cancel() may not use gpr_mu_lock(mu), since the
- caller may already hold *mu---a possible deadlock. (If we knew the caller
- did not hold *mu, care would still be needed, because c->mu follows *mu in
- the locking order, so *mu could not be acquired while holding c->mu---which
- is needed to iterate over c->waiters.)
-
- Therefore, gpr_cancellable_cancel() uses gpr_mu_trylock() rather than
- gpr_mu_lock(), and retries until either gpr_mu_trylock() succeeds or the
- thread leaves gpr_cv_cancellable_wait() for other reasons. In the first
- case, gpr_cancellable_cancel() removes the entry from the waiters list; in
- the second, the waiting thread removes itself from the list.
-
- A one-entry cache of mutexes and condition variables processed is kept to
- avoid doing the same work again and again if many threads are blocked in the
- same place. However, it's important to broadcast on a condition variable if
- the corresponding mutex has been locked successfully, even if the condition
- variable has been signalled before. */
-
-void gpr_cancellable_cancel(gpr_cancellable *c) {
- if (!gpr_cancellable_is_cancelled(c)) {
- int failures;
- int backoff = 1;
- do {
- struct gpr_cancellable_list_ *l;
- struct gpr_cancellable_list_ *nl;
- gpr_mu *omu = 0; /* one-element cache of a processed gpr_mu */
- gpr_cv *ocv = 0; /* one-element cache of a processd gpr_cv */
- gpr_mu_lock(&c->mu);
- gpr_atm_rel_store(&c->cancelled, 1);
- failures = 0;
- for (l = c->waiters.next; l != &c->waiters; l = nl) {
- nl = l->next;
- if (omu != l->mu) {
- omu = l->mu;
- if (gpr_mu_trylock(l->mu)) {
- gpr_mu_unlock(l->mu);
- l->next->prev = l->prev; /* remove *l from list */
- l->prev->next = l->next;
- /* allow unconditional dequeue in gpr_cv_cancellable_wait() */
- l->next = l;
- l->prev = l;
- ocv = 0; /* force broadcast */
- } else {
- failures++;
- }
- }
- if (ocv != l->cv) {
- ocv = l->cv;
- gpr_cv_broadcast(l->cv);
- }
- }
- gpr_mu_unlock(&c->mu);
- if (failures != 0) {
- if (backoff < 10) {
- volatile int i;
- for (i = 0; i != (1 << backoff); i++) {
- }
- backoff++;
- } else {
- gpr_event ev;
- gpr_event_init(&ev);
- gpr_event_wait(
- &ev, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_micros(1000, GPR_TIMESPAN)));
- }
- }
- } while (failures != 0);
- }
-}
-
-int gpr_cv_cancellable_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline,
- gpr_cancellable *c) {
- gpr_int32 timeout;
- gpr_mu_lock(&c->mu);
- timeout = gpr_cancellable_is_cancelled(c);
- if (!timeout) {
- struct gpr_cancellable_list_ le;
- le.mu = mu;
- le.cv = cv;
- le.next = c->waiters.next;
- le.prev = &c->waiters;
- le.next->prev = &le;
- le.prev->next = &le;
- gpr_mu_unlock(&c->mu);
- timeout = gpr_cv_wait(cv, mu, abs_deadline);
- gpr_mu_lock(&c->mu);
- le.next->prev = le.prev;
- le.prev->next = le.next;
- if (!timeout) {
- timeout = gpr_cancellable_is_cancelled(c);
- }
- }
- gpr_mu_unlock(&c->mu);
- return timeout;
-}
diff --git a/src/core/support/sync.c b/src/core/support/sync.c
index 856b5adb86..d3cf77faea 100644
--- a/src/core/support/sync.c
+++ b/src/core/support/sync.c
@@ -94,21 +94,6 @@ void *gpr_event_wait(gpr_event *ev, gpr_timespec abs_deadline) {
return result;
}
-void *gpr_event_cancellable_wait(gpr_event *ev, gpr_timespec abs_deadline,
- gpr_cancellable *c) {
- void *result = (void *)gpr_atm_acq_load(&ev->state);
- if (result == NULL) {
- struct sync_array_s *s = hash(ev);
- gpr_mu_lock(&s->mu);
- do {
- result = (void *)gpr_atm_acq_load(&ev->state);
- } while (result == NULL &&
- !gpr_cv_cancellable_wait(&s->cv, &s->mu, abs_deadline, c));
- gpr_mu_unlock(&s->mu);
- }
- return result;
-}
-
void gpr_ref_init(gpr_refcount *r, int n) { gpr_atm_rel_store(&r->count, n); }
void gpr_ref(gpr_refcount *r) { gpr_atm_no_barrier_fetch_add(&r->count, 1); }
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 5839d3ac2e..6a1a6cbf30 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -964,7 +964,7 @@ static void call_on_done_recv(void *pc, int success) {
next_child_call = child_call->sibling_next;
if (child_call->cancellation_is_inherited) {
GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel");
- grpc_call_cancel(child_call);
+ grpc_call_cancel(child_call, NULL);
GRPC_CALL_INTERNAL_UNREF(child_call, "propagate_cancel", 0);
}
child_call = next_child_call;
@@ -1265,18 +1265,22 @@ void grpc_call_destroy(grpc_call *c) {
c->cancel_alarm |= c->have_alarm;
cancel = c->read_state != READ_STATE_STREAM_CLOSED;
unlock(c);
- if (cancel) grpc_call_cancel(c);
+ if (cancel) grpc_call_cancel(c, NULL);
GRPC_CALL_INTERNAL_UNREF(c, "destroy", 1);
}
-grpc_call_error grpc_call_cancel(grpc_call *call) {
- return grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, "Cancelled");
+grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) {
+ GPR_ASSERT(!reserved);
+ return grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, "Cancelled",
+ NULL);
}
grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
grpc_status_code status,
- const char *description) {
+ const char *description,
+ void *reserved) {
grpc_call_error r;
+ (void) reserved;
lock(c);
r = cancel_with_status(c, status, description);
unlock(c);
@@ -1513,13 +1517,14 @@ static int are_write_flags_valid(gpr_uint32 flags) {
}
grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
- size_t nops, void *tag) {
+ size_t nops, void *tag, void *reserved) {
grpc_ioreq reqs[GRPC_IOREQ_OP_COUNT];
size_t in;
size_t out;
const grpc_op *op;
grpc_ioreq *req;
void (*finish_func)(grpc_call *, int, void *) = finish_batch;
+ GPR_ASSERT(!reserved);
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag);
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 8692aa3903..308572c634 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -168,7 +168,8 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel,
gpr_uint32 propagation_mask,
grpc_completion_queue *cq,
const char *method, const char *host,
- gpr_timespec deadline) {
+ gpr_timespec deadline, void *reserved) {
+ GPR_ASSERT(!reserved);
return grpc_channel_create_call_internal(
channel, parent_call, propagation_mask, cq,
grpc_mdelem_from_metadata_strings(
@@ -182,8 +183,9 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel,
}
void *grpc_channel_register_call(grpc_channel *channel, const char *method,
- const char *host) {
+ const char *host, void *reserved) {
registered_call *rc = gpr_malloc(sizeof(registered_call));
+ GPR_ASSERT(!reserved);
rc->path = grpc_mdelem_from_metadata_strings(
channel->metadata_context, GRPC_MDSTR_REF(channel->path_string),
grpc_mdstr_from_string(channel->metadata_context, method, 0));
@@ -200,8 +202,9 @@ void *grpc_channel_register_call(grpc_channel *channel, const char *method,
grpc_call *grpc_channel_create_registered_call(
grpc_channel *channel, grpc_call *parent_call, gpr_uint32 propagation_mask,
grpc_completion_queue *completion_queue, void *registered_call_handle,
- gpr_timespec deadline) {
+ gpr_timespec deadline, void *reserved) {
registered_call *rc = registered_call_handle;
+ GPR_ASSERT(!reserved);
return grpc_channel_create_call_internal(
channel, parent_call, propagation_mask, completion_queue,
GRPC_MDELEM_REF(rc->path),
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index 707d615688..82ddfac757 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -155,7 +155,8 @@ static const grpc_subchannel_factory_vtable subchannel_factory_vtable = {
- connect to it (trying alternatives as presented)
- perform handshakes */
grpc_channel *grpc_insecure_channel_create(const char *target,
- const grpc_channel_args *args) {
+ const grpc_channel_args *args,
+ void *reserved) {
grpc_channel *channel = NULL;
#define MAX_FILTERS 3
const grpc_channel_filter *filters[MAX_FILTERS];
@@ -163,6 +164,7 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
subchannel_factory *f;
grpc_mdctx *mdctx = grpc_mdctx_create();
int n = 0;
+ GPR_ASSERT(!reserved);
/* TODO(census)
if (grpc_channel_args_is_census_enabled(args)) {
filters[n++] = &grpc_client_census_filter;
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 36d69cfe5f..378b3f71a1 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -69,8 +69,9 @@ struct grpc_completion_queue {
plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
};
-grpc_completion_queue *grpc_completion_queue_create(void) {
+grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue));
+ GPR_ASSERT(!reserved);
memset(cc, 0, sizeof(*cc));
/* Initial ref is dropped by grpc_completion_queue_shutdown */
gpr_ref_init(&cc->pending_events, 1);
@@ -166,9 +167,11 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
}
grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
- gpr_timespec deadline) {
+ gpr_timespec deadline,
+ void *reserved) {
grpc_event ret;
grpc_pollset_worker worker;
+ GPR_ASSERT(!reserved);
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
@@ -232,11 +235,12 @@ static void del_plucker(grpc_completion_queue *cc, void *tag,
}
grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
- gpr_timespec deadline) {
+ gpr_timespec deadline, void *reserved) {
grpc_event ret;
grpc_cq_completion *c;
grpc_cq_completion *prev;
grpc_pollset_worker worker;
+ GPR_ASSERT(!reserved);
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index cd1dc589e1..f883275951 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -761,8 +761,10 @@ static const grpc_channel_filter server_surface_filter = {
};
void grpc_server_register_completion_queue(grpc_server *server,
- grpc_completion_queue *cq) {
+ grpc_completion_queue *cq,
+ void *reserved) {
size_t i, n;
+ GPR_ASSERT(!reserved);
for (i = 0; i < server->cq_count; i++) {
if (server->cqs[i] == cq) return;
}
diff --git a/src/core/surface/server_create.c b/src/core/surface/server_create.c
index 1e26c67693..9237eb5a90 100644
--- a/src/core/surface/server_create.c
+++ b/src/core/surface/server_create.c
@@ -36,8 +36,9 @@
#include "src/core/surface/server.h"
#include "src/core/channel/compress_filter.h"
-grpc_server *grpc_server_create(const grpc_channel_args *args) {
+grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
const grpc_channel_filter *filters[] = {&grpc_compress_filter};
+ (void) reserved;
return grpc_server_create_from_filters(filters, GPR_ARRAY_SIZE(filters),
args);
}
diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c
index a5dfec9d50..0a9669b0ab 100644
--- a/src/core/transport/stream_op.c
+++ b/src/core/transport/stream_op.c
@@ -258,6 +258,7 @@ static void link_tail(grpc_mdelem_list *list, grpc_linked_mdelem *storage) {
GPR_ASSERT(storage->md);
storage->prev = list->tail;
storage->next = NULL;
+ storage->reserved = NULL;
if (list->tail != NULL) {
list->tail->next = storage;
} else {
diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h
index 227320cf2a..37f18b02d9 100644
--- a/src/core/transport/stream_op.h
+++ b/src/core/transport/stream_op.h
@@ -77,6 +77,7 @@ typedef struct grpc_linked_mdelem {
grpc_mdelem *md;
struct grpc_linked_mdelem *next;
struct grpc_linked_mdelem *prev;
+ void *reserved;
} grpc_linked_mdelem;
typedef struct grpc_mdelem_list {