diff options
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/compression/algorithm.c | 4 | ||||
-rw-r--r-- | src/core/iomgr/udp_server.c | 438 | ||||
-rw-r--r-- | src/core/iomgr/udp_server.h | 85 | ||||
-rw-r--r-- | src/core/security/client_auth_filter.c | 15 | ||||
-rw-r--r-- | src/core/support/cancellable.c | 157 | ||||
-rw-r--r-- | src/core/support/sync.c | 15 | ||||
-rw-r--r-- | src/core/surface/call.c | 17 | ||||
-rw-r--r-- | src/core/surface/channel.c | 9 | ||||
-rw-r--r-- | src/core/surface/channel_create.c | 4 | ||||
-rw-r--r-- | src/core/surface/completion_queue.c | 10 | ||||
-rw-r--r-- | src/core/surface/server.c | 4 | ||||
-rw-r--r-- | src/core/surface/server_create.c | 3 | ||||
-rw-r--r-- | src/core/transport/stream_op.c | 1 | ||||
-rw-r--r-- | src/core/transport/stream_op.h | 1 |
14 files changed, 567 insertions, 196 deletions
diff --git a/src/core/compression/algorithm.c b/src/core/compression/algorithm.c index e426241d0a..0fd028741e 100644 --- a/src/core/compression/algorithm.c +++ b/src/core/compression/algorithm.c @@ -37,7 +37,7 @@ int grpc_compression_algorithm_parse(const char* name, grpc_compression_algorithm *algorithm) { - if (strcmp(name, "none") == 0) { + if (strcmp(name, "identity") == 0) { *algorithm = GRPC_COMPRESS_NONE; } else if (strcmp(name, "gzip") == 0) { *algorithm = GRPC_COMPRESS_GZIP; @@ -53,7 +53,7 @@ int grpc_compression_algorithm_name(grpc_compression_algorithm algorithm, char **name) { switch (algorithm) { case GRPC_COMPRESS_NONE: - *name = "none"; + *name = "identity"; break; case GRPC_COMPRESS_DEFLATE: *name = "deflate"; diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c new file mode 100644 index 0000000000..db0aef8120 --- /dev/null +++ b/src/core/iomgr/udp_server.c @@ -0,0 +1,438 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* FIXME: "posix" files shouldn't be depending on _GNU_SOURCE */ +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif + +#include <grpc/support/port_platform.h> + +#ifdef GPR_POSIX_SOCKET + +#include "src/core/iomgr/udp_server.h" + +#include <errno.h> +#include <fcntl.h> +#include <limits.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/stat.h> +#include <sys/types.h> +#include <sys/un.h> +#include <unistd.h> + +#include "src/core/iomgr/fd_posix.h" +#include "src/core/iomgr/pollset_posix.h" +#include "src/core/iomgr/resolve_address.h" +#include "src/core/iomgr/sockaddr_utils.h" +#include "src/core/iomgr/socket_utils_posix.h" +#include "src/core/support/string.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/string_util.h> +#include <grpc/support/time.h> + +#define INIT_PORT_CAP 2 + +/* one listening port */ +typedef struct { + int fd; + grpc_fd *emfd; + grpc_udp_server *server; + union { + gpr_uint8 untyped[GRPC_MAX_SOCKADDR_SIZE]; + struct sockaddr sockaddr; + struct sockaddr_un un; + } addr; + int addr_len; + grpc_iomgr_closure read_closure; + grpc_iomgr_closure destroyed_closure; + grpc_udp_server_read_cb read_cb; +} server_port; + +static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) { + struct stat st; + + if (stat(un->sun_path, &st) == 0 && (st.st_mode & S_IFMT) == S_IFSOCK) { + unlink(un->sun_path); + } +} + +/* the overall server */ +struct grpc_udp_server { + grpc_udp_server_cb cb; + void *cb_arg; + + gpr_mu mu; + gpr_cv cv; + + /* active port count: how many ports are actually still listening */ + size_t active_ports; + /* destroyed port count: how many ports are completely destroyed */ + size_t destroyed_ports; + + /* is this server shutting down? (boolean) */ + int shutdown; + + /* all listening ports */ + server_port *ports; + size_t nports; + size_t port_capacity; + + /* shutdown callback */ + void (*shutdown_complete)(void *); + void *shutdown_complete_arg; + + /* all pollsets interested in new connections */ + grpc_pollset **pollsets; + /* number of pollsets in the pollsets array */ + size_t pollset_count; +}; + +grpc_udp_server *grpc_udp_server_create(void) { + grpc_udp_server *s = gpr_malloc(sizeof(grpc_udp_server)); + gpr_mu_init(&s->mu); + gpr_cv_init(&s->cv); + s->active_ports = 0; + s->destroyed_ports = 0; + s->shutdown = 0; + s->cb = NULL; + s->cb_arg = NULL; + s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); + s->nports = 0; + s->port_capacity = INIT_PORT_CAP; + + return s; +} + +static void finish_shutdown(grpc_udp_server *s) { + s->shutdown_complete(s->shutdown_complete_arg); + + gpr_mu_destroy(&s->mu); + gpr_cv_destroy(&s->cv); + + gpr_free(s->ports); + gpr_free(s); +} + +static void destroyed_port(void *server, int success) { + grpc_udp_server *s = server; + gpr_mu_lock(&s->mu); + s->destroyed_ports++; + if (s->destroyed_ports == s->nports) { + gpr_mu_unlock(&s->mu); + finish_shutdown(s); + } else { + gpr_mu_unlock(&s->mu); + } +} + +static void dont_care_about_shutdown_completion(void *ignored) {} + +/* called when all listening endpoints have been shutdown, so no further + events will be received on them - at this point it's safe to destroy + things */ +static void deactivated_all_ports(grpc_udp_server *s) { + size_t i; + + /* delete ALL the things */ + gpr_mu_lock(&s->mu); + + if (!s->shutdown) { + gpr_mu_unlock(&s->mu); + return; + } + + if (s->nports) { + for (i = 0; i < s->nports; i++) { + server_port *sp = &s->ports[i]; + if (sp->addr.sockaddr.sa_family == AF_UNIX) { + unlink_if_unix_domain_socket(&sp->addr.un); + } + sp->destroyed_closure.cb = destroyed_port; + sp->destroyed_closure.cb_arg = s; + grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, "udp_listener_shutdown"); + } + gpr_mu_unlock(&s->mu); + } else { + gpr_mu_unlock(&s->mu); + finish_shutdown(s); + } +} + +void grpc_udp_server_destroy( + grpc_udp_server *s, void (*shutdown_complete)(void *shutdown_complete_arg), + void *shutdown_complete_arg) { + size_t i; + gpr_mu_lock(&s->mu); + + GPR_ASSERT(!s->shutdown); + s->shutdown = 1; + + s->shutdown_complete = shutdown_complete + ? shutdown_complete + : dont_care_about_shutdown_completion; + s->shutdown_complete_arg = shutdown_complete_arg; + + /* shutdown all fd's */ + if (s->active_ports) { + for (i = 0; i < s->nports; i++) { + grpc_fd_shutdown(s->ports[i].emfd); + } + gpr_mu_unlock(&s->mu); + } else { + gpr_mu_unlock(&s->mu); + deactivated_all_ports(s); + } +} + +/* Prepare a recently-created socket for listening. */ +static int prepare_socket(int fd, const struct sockaddr *addr, int addr_len) { + struct sockaddr_storage sockname_temp; + socklen_t sockname_len; + int get_local_ip; + int rc; + + if (fd < 0) { + goto error; + } + + get_local_ip = 1; + rc = setsockopt(fd, IPPROTO_IP, IP_PKTINFO, + &get_local_ip, sizeof(get_local_ip)); + if (rc == 0 && addr->sa_family == AF_INET6) { + rc = setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, + &get_local_ip, sizeof(get_local_ip)); + } + + if (bind(fd, addr, addr_len) < 0) { + char *addr_str; + grpc_sockaddr_to_string(&addr_str, addr, 0); + gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, strerror(errno)); + gpr_free(addr_str); + goto error; + } + + sockname_len = sizeof(sockname_temp); + if (getsockname(fd, (struct sockaddr *)&sockname_temp, &sockname_len) < 0) { + goto error; + } + + return grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp); + +error: + if (fd >= 0) { + close(fd); + } + return -1; +} + +/* event manager callback when reads are ready */ +static void on_read(void *arg, int success) { + server_port *sp = arg; + + if (success == 0) { + gpr_mu_lock(&sp->server->mu); + if (0 == --sp->server->active_ports) { + gpr_mu_unlock(&sp->server->mu); + deactivated_all_ports(sp->server); + } else { + gpr_mu_unlock(&sp->server->mu); + } + return; + } + + /* Tell the registered callback that data is available to read. */ + GPR_ASSERT(sp->read_cb); + sp->read_cb(sp->fd, sp->server->cb, sp->server->cb_arg); + + /* Re-arm the notification event so we get another chance to read. */ + grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); +} + +static int add_socket_to_server(grpc_udp_server *s, int fd, + const struct sockaddr *addr, int addr_len, + grpc_udp_server_read_cb read_cb) { + server_port *sp; + int port; + char *addr_str; + char *name; + + port = prepare_socket(fd, addr, addr_len); + if (port >= 0) { + grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1); + gpr_asprintf(&name, "udp-server-listener:%s", addr_str); + gpr_mu_lock(&s->mu); + GPR_ASSERT(!s->cb && "must add ports before starting server"); + /* append it to the list under a lock */ + if (s->nports == s->port_capacity) { + s->port_capacity *= 2; + s->ports = gpr_realloc(s->ports, sizeof(server_port) * s->port_capacity); + } + sp = &s->ports[s->nports++]; + sp->server = s; + sp->fd = fd; + sp->emfd = grpc_fd_create(fd, name); + memcpy(sp->addr.untyped, addr, addr_len); + sp->addr_len = addr_len; + sp->read_cb = read_cb; + GPR_ASSERT(sp->emfd); + gpr_mu_unlock(&s->mu); + } + + return port; +} + +int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, + int addr_len, grpc_udp_server_read_cb read_cb) { + int allocated_port1 = -1; + int allocated_port2 = -1; + unsigned i; + int fd; + grpc_dualstack_mode dsmode; + struct sockaddr_in6 addr6_v4mapped; + struct sockaddr_in wild4; + struct sockaddr_in6 wild6; + struct sockaddr_in addr4_copy; + struct sockaddr *allocated_addr = NULL; + struct sockaddr_storage sockname_temp; + socklen_t sockname_len; + int port; + + if (((struct sockaddr *)addr)->sa_family == AF_UNIX) { + unlink_if_unix_domain_socket(addr); + } + + /* Check if this is a wildcard port, and if so, try to keep the port the same + as some previously created listener. */ + if (grpc_sockaddr_get_port(addr) == 0) { + for (i = 0; i < s->nports; i++) { + sockname_len = sizeof(sockname_temp); + if (0 == getsockname(s->ports[i].fd, (struct sockaddr *)&sockname_temp, + &sockname_len)) { + port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp); + if (port > 0) { + allocated_addr = malloc(addr_len); + memcpy(allocated_addr, addr, addr_len); + grpc_sockaddr_set_port(allocated_addr, port); + addr = allocated_addr; + break; + } + } + } + } + + if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { + addr = (const struct sockaddr *)&addr6_v4mapped; + addr_len = sizeof(addr6_v4mapped); + } + + /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */ + if (grpc_sockaddr_is_wildcard(addr, &port)) { + grpc_sockaddr_make_wildcards(port, &wild4, &wild6); + + /* Try listening on IPv6 first. */ + addr = (struct sockaddr *)&wild6; + addr_len = sizeof(wild6); + fd = grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode); + allocated_port1 = add_socket_to_server(s, fd, addr, addr_len, read_cb); + if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { + goto done; + } + + /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */ + if (port == 0 && allocated_port1 > 0) { + grpc_sockaddr_set_port((struct sockaddr *)&wild4, allocated_port1); + } + addr = (struct sockaddr *)&wild4; + addr_len = sizeof(wild4); + } + + fd = grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode); + if (fd < 0) { + gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno)); + } + if (dsmode == GRPC_DSMODE_IPV4 && + grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) { + addr = (struct sockaddr *)&addr4_copy; + addr_len = sizeof(addr4_copy); + } + allocated_port2 = add_socket_to_server(s, fd, addr, addr_len, read_cb); + +done: + gpr_free(allocated_addr); + return allocated_port1 >= 0 ? allocated_port1 : allocated_port2; +} + +int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index) { + return (index < s->nports) ? s->ports[index].fd : -1; +} + +void grpc_udp_server_start(grpc_udp_server *s, grpc_pollset **pollsets, + size_t pollset_count, + grpc_udp_server_cb new_transport_cb, void *cb_arg) { + size_t i, j; + GPR_ASSERT(new_transport_cb); + gpr_mu_lock(&s->mu); + GPR_ASSERT(!s->cb); + GPR_ASSERT(s->active_ports == 0); + s->cb = new_transport_cb; + s->cb_arg = cb_arg; + s->pollsets = pollsets; + for (i = 0; i < s->nports; i++) { + for (j = 0; j < pollset_count; j++) { + grpc_pollset_add_fd(pollsets[j], s->ports[i].emfd); + } + s->ports[i].read_closure.cb = on_read; + s->ports[i].read_closure.cb_arg = &s->ports[i]; + grpc_fd_notify_on_read(s->ports[i].emfd, &s->ports[i].read_closure); + s->active_ports++; + } + gpr_mu_unlock(&s->mu); +} + +/* TODO(rjshade): Add a test for this method. */ +void grpc_udp_server_write(server_port *sp, const char *buffer, size_t buf_len, + const struct sockaddr *peer_address) { + int rc; + rc = sendto(sp->fd, buffer, buf_len, 0, peer_address, sizeof(peer_address)); + if (rc < 0) { + gpr_log(GPR_ERROR, "Unable to send data: %s", strerror(errno)); + } +} + +#endif diff --git a/src/core/iomgr/udp_server.h b/src/core/iomgr/udp_server.h new file mode 100644 index 0000000000..fcc4ba6e97 --- /dev/null +++ b/src/core/iomgr/udp_server.h @@ -0,0 +1,85 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CORE_IOMGR_UDP_SERVER_H +#define GRPC_INTERNAL_CORE_IOMGR_UDP_SERVER_H + +#include "src/core/iomgr/endpoint.h" + +/* Forward decl of grpc_udp_server */ +typedef struct grpc_udp_server grpc_udp_server; + +/* New server callback: ep is the newly connected connection */ +typedef void (*grpc_udp_server_cb)(void *arg, grpc_endpoint *ep); + +/* Called when data is available to read from the socket. */ +typedef void (*grpc_udp_server_read_cb)(int fd, + grpc_udp_server_cb new_transport_cb, + void *cb_arg); + +/* Create a server, initially not bound to any ports */ +grpc_udp_server *grpc_udp_server_create(void); + +/* Start listening to bound ports */ +void grpc_udp_server_start(grpc_udp_server *server, grpc_pollset **pollsets, + size_t pollset_count, grpc_udp_server_cb cb, + void *cb_arg); + +int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index); + +/* Add a port to the server, returning port number on success, or negative + on failure. + + The :: and 0.0.0.0 wildcard addresses are treated identically, accepting + both IPv4 and IPv6 connections, but :: is the preferred style. This usually + creates one socket, but possibly two on systems which support IPv6, + but not dualstack sockets. */ + +/* TODO(ctiller): deprecate this, and make grpc_udp_server_add_ports to handle + all of the multiple socket port matching logic in one place */ +int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, int addr_len, + grpc_udp_server_read_cb read_cb); + +void grpc_udp_server_destroy(grpc_udp_server *server, + void (*shutdown_done)(void *shutdown_done_arg), + void *shutdown_done_arg); + +/* Write the contents of buffer to the underlying UDP socket. */ +/* +void grpc_udp_server_write(grpc_udp_server *s, + const char *buffer, + int buf_len, + const struct sockaddr* to); + */ + +#endif /* GRPC_INTERNAL_CORE_IOMGR_UDP_SERVER_H */ diff --git a/src/core/security/client_auth_filter.c b/src/core/security/client_auth_filter.c index 0e699874bc..410852da52 100644 --- a/src/core/security/client_auth_filter.c +++ b/src/core/security/client_auth_filter.c @@ -75,11 +75,11 @@ typedef struct { grpc_mdstr *status_key; } channel_data; -static void bubble_up_error(grpc_call_element *elem, const char *error_msg) { +static void bubble_up_error(grpc_call_element *elem, grpc_status_code status, + const char *error_msg) { call_data *calld = elem->call_data; gpr_log(GPR_ERROR, "Client side authentication failure: %s", error_msg); - grpc_transport_stream_op_add_cancellation(&calld->op, - GRPC_STATUS_UNAUTHENTICATED); + grpc_transport_stream_op_add_cancellation(&calld->op, status); grpc_call_next_op(elem, &calld->op); } @@ -94,7 +94,8 @@ static void on_credentials_metadata(void *user_data, grpc_metadata_batch *mdb; size_t i; if (status != GRPC_CREDENTIALS_OK) { - bubble_up_error(elem, "Credentials failed to get metadata."); + bubble_up_error(elem, GRPC_STATUS_UNAUTHENTICATED, + "Credentials failed to get metadata."); return; } GPR_ASSERT(num_md <= MAX_CREDENTIALS_METADATA_COUNT); @@ -154,7 +155,7 @@ static void send_security_metadata(grpc_call_element *elem, if (channel_creds_has_md && call_creds_has_md) { calld->creds = grpc_composite_credentials_create(channel_creds, ctx->creds); if (calld->creds == NULL) { - bubble_up_error(elem, + bubble_up_error(elem, GRPC_STATUS_INVALID_ARGUMENT, "Incompatible credentials set on channel and call."); return; } @@ -182,7 +183,7 @@ static void on_host_checked(void *user_data, grpc_security_status status) { char *error_msg; gpr_asprintf(&error_msg, "Invalid host %s set in :authority metadata.", grpc_mdstr_as_c_string(calld->host)); - bubble_up_error(elem, error_msg); + bubble_up_error(elem, GRPC_STATUS_INVALID_ARGUMENT, error_msg); gpr_free(error_msg); } } @@ -252,7 +253,7 @@ static void auth_start_transport_op(grpc_call_element *elem, gpr_asprintf(&error_msg, "Invalid host %s set in :authority metadata.", call_host); - bubble_up_error(elem, error_msg); + bubble_up_error(elem, GRPC_STATUS_INVALID_ARGUMENT, error_msg); gpr_free(error_msg); } return; /* early exit */ diff --git a/src/core/support/cancellable.c b/src/core/support/cancellable.c deleted file mode 100644 index 4756f1e125..0000000000 --- a/src/core/support/cancellable.c +++ /dev/null @@ -1,157 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -/* Implementation for gpr_cancellable */ - -#include <grpc/support/atm.h> -#include <grpc/support/sync.h> -#include <grpc/support/time.h> - -void gpr_cancellable_init(gpr_cancellable *c) { - gpr_mu_init(&c->mu); - c->cancelled = 0; - c->waiters.next = &c->waiters; - c->waiters.prev = &c->waiters; - c->waiters.mu = NULL; - c->waiters.cv = NULL; -} - -void gpr_cancellable_destroy(gpr_cancellable *c) { gpr_mu_destroy(&c->mu); } - -int gpr_cancellable_is_cancelled(gpr_cancellable *c) { - return gpr_atm_acq_load(&c->cancelled) != 0; -} - -/* Threads in gpr_cv_cancellable_wait(cv, mu, ..., c) place themselves on a - linked list c->waiters of gpr_cancellable_list_ before waiting on their - condition variables. They check for cancellation while holding *mu. Thus, - to wake a thread from gpr_cv_cancellable_wait(), it suffices to: - - set c->cancelled - - acquire and release *mu - - gpr_cv_broadcast(cv) - - However, gpr_cancellable_cancel() may not use gpr_mu_lock(mu), since the - caller may already hold *mu---a possible deadlock. (If we knew the caller - did not hold *mu, care would still be needed, because c->mu follows *mu in - the locking order, so *mu could not be acquired while holding c->mu---which - is needed to iterate over c->waiters.) - - Therefore, gpr_cancellable_cancel() uses gpr_mu_trylock() rather than - gpr_mu_lock(), and retries until either gpr_mu_trylock() succeeds or the - thread leaves gpr_cv_cancellable_wait() for other reasons. In the first - case, gpr_cancellable_cancel() removes the entry from the waiters list; in - the second, the waiting thread removes itself from the list. - - A one-entry cache of mutexes and condition variables processed is kept to - avoid doing the same work again and again if many threads are blocked in the - same place. However, it's important to broadcast on a condition variable if - the corresponding mutex has been locked successfully, even if the condition - variable has been signalled before. */ - -void gpr_cancellable_cancel(gpr_cancellable *c) { - if (!gpr_cancellable_is_cancelled(c)) { - int failures; - int backoff = 1; - do { - struct gpr_cancellable_list_ *l; - struct gpr_cancellable_list_ *nl; - gpr_mu *omu = 0; /* one-element cache of a processed gpr_mu */ - gpr_cv *ocv = 0; /* one-element cache of a processd gpr_cv */ - gpr_mu_lock(&c->mu); - gpr_atm_rel_store(&c->cancelled, 1); - failures = 0; - for (l = c->waiters.next; l != &c->waiters; l = nl) { - nl = l->next; - if (omu != l->mu) { - omu = l->mu; - if (gpr_mu_trylock(l->mu)) { - gpr_mu_unlock(l->mu); - l->next->prev = l->prev; /* remove *l from list */ - l->prev->next = l->next; - /* allow unconditional dequeue in gpr_cv_cancellable_wait() */ - l->next = l; - l->prev = l; - ocv = 0; /* force broadcast */ - } else { - failures++; - } - } - if (ocv != l->cv) { - ocv = l->cv; - gpr_cv_broadcast(l->cv); - } - } - gpr_mu_unlock(&c->mu); - if (failures != 0) { - if (backoff < 10) { - volatile int i; - for (i = 0; i != (1 << backoff); i++) { - } - backoff++; - } else { - gpr_event ev; - gpr_event_init(&ev); - gpr_event_wait( - &ev, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(1000, GPR_TIMESPAN))); - } - } - } while (failures != 0); - } -} - -int gpr_cv_cancellable_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline, - gpr_cancellable *c) { - gpr_int32 timeout; - gpr_mu_lock(&c->mu); - timeout = gpr_cancellable_is_cancelled(c); - if (!timeout) { - struct gpr_cancellable_list_ le; - le.mu = mu; - le.cv = cv; - le.next = c->waiters.next; - le.prev = &c->waiters; - le.next->prev = ≤ - le.prev->next = ≤ - gpr_mu_unlock(&c->mu); - timeout = gpr_cv_wait(cv, mu, abs_deadline); - gpr_mu_lock(&c->mu); - le.next->prev = le.prev; - le.prev->next = le.next; - if (!timeout) { - timeout = gpr_cancellable_is_cancelled(c); - } - } - gpr_mu_unlock(&c->mu); - return timeout; -} diff --git a/src/core/support/sync.c b/src/core/support/sync.c index 856b5adb86..d3cf77faea 100644 --- a/src/core/support/sync.c +++ b/src/core/support/sync.c @@ -94,21 +94,6 @@ void *gpr_event_wait(gpr_event *ev, gpr_timespec abs_deadline) { return result; } -void *gpr_event_cancellable_wait(gpr_event *ev, gpr_timespec abs_deadline, - gpr_cancellable *c) { - void *result = (void *)gpr_atm_acq_load(&ev->state); - if (result == NULL) { - struct sync_array_s *s = hash(ev); - gpr_mu_lock(&s->mu); - do { - result = (void *)gpr_atm_acq_load(&ev->state); - } while (result == NULL && - !gpr_cv_cancellable_wait(&s->cv, &s->mu, abs_deadline, c)); - gpr_mu_unlock(&s->mu); - } - return result; -} - void gpr_ref_init(gpr_refcount *r, int n) { gpr_atm_rel_store(&r->count, n); } void gpr_ref(gpr_refcount *r) { gpr_atm_no_barrier_fetch_add(&r->count, 1); } diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 5839d3ac2e..6a1a6cbf30 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -964,7 +964,7 @@ static void call_on_done_recv(void *pc, int success) { next_child_call = child_call->sibling_next; if (child_call->cancellation_is_inherited) { GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel"); - grpc_call_cancel(child_call); + grpc_call_cancel(child_call, NULL); GRPC_CALL_INTERNAL_UNREF(child_call, "propagate_cancel", 0); } child_call = next_child_call; @@ -1265,18 +1265,22 @@ void grpc_call_destroy(grpc_call *c) { c->cancel_alarm |= c->have_alarm; cancel = c->read_state != READ_STATE_STREAM_CLOSED; unlock(c); - if (cancel) grpc_call_cancel(c); + if (cancel) grpc_call_cancel(c, NULL); GRPC_CALL_INTERNAL_UNREF(c, "destroy", 1); } -grpc_call_error grpc_call_cancel(grpc_call *call) { - return grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, "Cancelled"); +grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) { + GPR_ASSERT(!reserved); + return grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, "Cancelled", + NULL); } grpc_call_error grpc_call_cancel_with_status(grpc_call *c, grpc_status_code status, - const char *description) { + const char *description, + void *reserved) { grpc_call_error r; + (void) reserved; lock(c); r = cancel_with_status(c, status, description); unlock(c); @@ -1513,13 +1517,14 @@ static int are_write_flags_valid(gpr_uint32 flags) { } grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, - size_t nops, void *tag) { + size_t nops, void *tag, void *reserved) { grpc_ioreq reqs[GRPC_IOREQ_OP_COUNT]; size_t in; size_t out; const grpc_op *op; grpc_ioreq *req; void (*finish_func)(grpc_call *, int, void *) = finish_batch; + GPR_ASSERT(!reserved); GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag); diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 8692aa3903..308572c634 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -168,7 +168,8 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel, gpr_uint32 propagation_mask, grpc_completion_queue *cq, const char *method, const char *host, - gpr_timespec deadline) { + gpr_timespec deadline, void *reserved) { + GPR_ASSERT(!reserved); return grpc_channel_create_call_internal( channel, parent_call, propagation_mask, cq, grpc_mdelem_from_metadata_strings( @@ -182,8 +183,9 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel, } void *grpc_channel_register_call(grpc_channel *channel, const char *method, - const char *host) { + const char *host, void *reserved) { registered_call *rc = gpr_malloc(sizeof(registered_call)); + GPR_ASSERT(!reserved); rc->path = grpc_mdelem_from_metadata_strings( channel->metadata_context, GRPC_MDSTR_REF(channel->path_string), grpc_mdstr_from_string(channel->metadata_context, method, 0)); @@ -200,8 +202,9 @@ void *grpc_channel_register_call(grpc_channel *channel, const char *method, grpc_call *grpc_channel_create_registered_call( grpc_channel *channel, grpc_call *parent_call, gpr_uint32 propagation_mask, grpc_completion_queue *completion_queue, void *registered_call_handle, - gpr_timespec deadline) { + gpr_timespec deadline, void *reserved) { registered_call *rc = registered_call_handle; + GPR_ASSERT(!reserved); return grpc_channel_create_call_internal( channel, parent_call, propagation_mask, completion_queue, GRPC_MDELEM_REF(rc->path), diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 707d615688..82ddfac757 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -155,7 +155,8 @@ static const grpc_subchannel_factory_vtable subchannel_factory_vtable = { - connect to it (trying alternatives as presented) - perform handshakes */ grpc_channel *grpc_insecure_channel_create(const char *target, - const grpc_channel_args *args) { + const grpc_channel_args *args, + void *reserved) { grpc_channel *channel = NULL; #define MAX_FILTERS 3 const grpc_channel_filter *filters[MAX_FILTERS]; @@ -163,6 +164,7 @@ grpc_channel *grpc_insecure_channel_create(const char *target, subchannel_factory *f; grpc_mdctx *mdctx = grpc_mdctx_create(); int n = 0; + GPR_ASSERT(!reserved); /* TODO(census) if (grpc_channel_args_is_census_enabled(args)) { filters[n++] = &grpc_client_census_filter; diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 36d69cfe5f..378b3f71a1 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -69,8 +69,9 @@ struct grpc_completion_queue { plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS]; }; -grpc_completion_queue *grpc_completion_queue_create(void) { +grpc_completion_queue *grpc_completion_queue_create(void *reserved) { grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue)); + GPR_ASSERT(!reserved); memset(cc, 0, sizeof(*cc)); /* Initial ref is dropped by grpc_completion_queue_shutdown */ gpr_ref_init(&cc->pending_events, 1); @@ -166,9 +167,11 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success, } grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, - gpr_timespec deadline) { + gpr_timespec deadline, + void *reserved) { grpc_event ret; grpc_pollset_worker worker; + GPR_ASSERT(!reserved); deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); @@ -232,11 +235,12 @@ static void del_plucker(grpc_completion_queue *cc, void *tag, } grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, - gpr_timespec deadline) { + gpr_timespec deadline, void *reserved) { grpc_event ret; grpc_cq_completion *c; grpc_cq_completion *prev; grpc_pollset_worker worker; + GPR_ASSERT(!reserved); deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); diff --git a/src/core/surface/server.c b/src/core/surface/server.c index cd1dc589e1..f883275951 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -761,8 +761,10 @@ static const grpc_channel_filter server_surface_filter = { }; void grpc_server_register_completion_queue(grpc_server *server, - grpc_completion_queue *cq) { + grpc_completion_queue *cq, + void *reserved) { size_t i, n; + GPR_ASSERT(!reserved); for (i = 0; i < server->cq_count; i++) { if (server->cqs[i] == cq) return; } diff --git a/src/core/surface/server_create.c b/src/core/surface/server_create.c index 1e26c67693..9237eb5a90 100644 --- a/src/core/surface/server_create.c +++ b/src/core/surface/server_create.c @@ -36,8 +36,9 @@ #include "src/core/surface/server.h" #include "src/core/channel/compress_filter.h" -grpc_server *grpc_server_create(const grpc_channel_args *args) { +grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { const grpc_channel_filter *filters[] = {&grpc_compress_filter}; + (void) reserved; return grpc_server_create_from_filters(filters, GPR_ARRAY_SIZE(filters), args); } diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c index a5dfec9d50..0a9669b0ab 100644 --- a/src/core/transport/stream_op.c +++ b/src/core/transport/stream_op.c @@ -258,6 +258,7 @@ static void link_tail(grpc_mdelem_list *list, grpc_linked_mdelem *storage) { GPR_ASSERT(storage->md); storage->prev = list->tail; storage->next = NULL; + storage->reserved = NULL; if (list->tail != NULL) { list->tail->next = storage; } else { diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h index 227320cf2a..37f18b02d9 100644 --- a/src/core/transport/stream_op.h +++ b/src/core/transport/stream_op.h @@ -77,6 +77,7 @@ typedef struct grpc_linked_mdelem { grpc_mdelem *md; struct grpc_linked_mdelem *next; struct grpc_linked_mdelem *prev; + void *reserved; } grpc_linked_mdelem; typedef struct grpc_mdelem_list { |