diff options
Diffstat (limited to 'src/core/endpoint')
-rw-r--r-- | src/core/endpoint/endpoint.c | 49 | ||||
-rw-r--r-- | src/core/endpoint/endpoint.h | 99 | ||||
-rw-r--r-- | src/core/endpoint/resolve_address.c | 195 | ||||
-rw-r--r-- | src/core/endpoint/resolve_address.h | 67 | ||||
-rw-r--r-- | src/core/endpoint/secure_endpoint.c | 335 | ||||
-rw-r--r-- | src/core/endpoint/secure_endpoint.h | 47 | ||||
-rw-r--r-- | src/core/endpoint/socket_utils.c | 105 | ||||
-rw-r--r-- | src/core/endpoint/socket_utils.h | 58 | ||||
-rw-r--r-- | src/core/endpoint/socket_utils_linux.c | 52 | ||||
-rw-r--r-- | src/core/endpoint/socket_utils_posix.c | 61 | ||||
-rw-r--r-- | src/core/endpoint/tcp.c | 570 | ||||
-rw-r--r-- | src/core/endpoint/tcp.h | 55 | ||||
-rw-r--r-- | src/core/endpoint/tcp_client.c | 170 | ||||
-rw-r--r-- | src/core/endpoint/tcp_client.h | 50 | ||||
-rw-r--r-- | src/core/endpoint/tcp_server.c | 282 | ||||
-rw-r--r-- | src/core/endpoint/tcp_server.h | 64 |
16 files changed, 2259 insertions, 0 deletions
diff --git a/src/core/endpoint/endpoint.c b/src/core/endpoint/endpoint.c new file mode 100644 index 0000000000..07353751b0 --- /dev/null +++ b/src/core/endpoint/endpoint.c @@ -0,0 +1,49 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/endpoint/endpoint.h" + +void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, + void *user_data, gpr_timespec deadline) { + ep->vtable->notify_on_read(ep, cb, user_data, deadline); +} + +grpc_endpoint_write_status grpc_endpoint_write( + grpc_endpoint *ep, gpr_slice *slices, size_t nslices, + grpc_endpoint_write_cb cb, void *user_data, gpr_timespec deadline) { + return ep->vtable->write(ep, slices, nslices, cb, user_data, deadline); +} + +void grpc_endpoint_shutdown(grpc_endpoint *ep) { ep->vtable->shutdown(ep); } + +void grpc_endpoint_destroy(grpc_endpoint *ep) { ep->vtable->destroy(ep); } diff --git a/src/core/endpoint/endpoint.h b/src/core/endpoint/endpoint.h new file mode 100644 index 0000000000..14d9a56d55 --- /dev/null +++ b/src/core/endpoint/endpoint.h @@ -0,0 +1,99 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_ENDPOINT_ENDPOINT_H__ +#define __GRPC_INTERNAL_ENDPOINT_ENDPOINT_H__ + +#include <grpc/support/slice.h> +#include <grpc/support/time.h> + +/* An endpoint caps a streaming channel between two communicating processes. + Examples may be: a tcp socket, <stdin+stdout>, or some shared memory. */ + +typedef struct grpc_endpoint grpc_endpoint; +typedef struct grpc_endpoint_vtable grpc_endpoint_vtable; + +typedef enum grpc_endpoint_cb_status { + GRPC_ENDPOINT_CB_OK = 0, /* Call completed successfully */ + GRPC_ENDPOINT_CB_EOF, /* Call completed successfully, end of file reached */ + GRPC_ENDPOINT_CB_SHUTDOWN, /* Call interrupted by shutdown */ + GRPC_ENDPOINT_CB_ERROR, /* Call interrupted by socket error */ + GRPC_ENDPOINT_CB_TIMED_OUT /* Call timed out */ +} grpc_endpoint_cb_status; + +typedef enum grpc_endpoint_write_status { + GRPC_ENDPOINT_WRITE_DONE, /* completed immediately, cb won't be called */ + GRPC_ENDPOINT_WRITE_PENDING, /* cb will be called when completed */ + GRPC_ENDPOINT_WRITE_ERROR /* write errored out, cb won't be called */ +} grpc_endpoint_write_status; + +typedef void (*grpc_endpoint_read_cb)(void *user_data, gpr_slice *slices, + size_t nslices, + grpc_endpoint_cb_status error); +typedef void (*grpc_endpoint_write_cb)(void *user_data, + grpc_endpoint_cb_status error); + +struct grpc_endpoint_vtable { + void (*notify_on_read)(grpc_endpoint *ep, grpc_endpoint_read_cb cb, + void *user_data, gpr_timespec deadline); + grpc_endpoint_write_status (*write)(grpc_endpoint *ep, gpr_slice *slices, + size_t nslices, grpc_endpoint_write_cb cb, + void *user_data, gpr_timespec deadline); + void (*shutdown)(grpc_endpoint *ep); + void (*destroy)(grpc_endpoint *ep); +}; + +/* When data is available on the connection, calls the callback with slices. */ +void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, + void *user_data, gpr_timespec deadline); + +/* Write slices out to the socket. + + If the connection is ready for more data after the end of the call, it + returns GRPC_ENDPOINT_WRITE_DONE. + Otherwise it returns GRPC_ENDPOINT_WRITE_PENDING and calls cb when the + connection is ready for more data. */ +grpc_endpoint_write_status grpc_endpoint_write( + grpc_endpoint *ep, gpr_slice *slices, size_t nslices, + grpc_endpoint_write_cb cb, void *user_data, gpr_timespec deadline); + +/* Causes any pending read/write callbacks to run immediately with + GRPC_ENDPOINT_CB_SHUTDOWN status */ +void grpc_endpoint_shutdown(grpc_endpoint *ep); +void grpc_endpoint_destroy(grpc_endpoint *ep); + +struct grpc_endpoint { + const grpc_endpoint_vtable *vtable; +}; + +#endif /* __GRPC_INTERNAL_ENDPOINT_ENDPOINT_H__ */ diff --git a/src/core/endpoint/resolve_address.c b/src/core/endpoint/resolve_address.c new file mode 100644 index 0000000000..aa21954c6d --- /dev/null +++ b/src/core/endpoint/resolve_address.c @@ -0,0 +1,195 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#define _POSIX_SOURCE + +#include "src/core/endpoint/resolve_address.h" + +#include <sys/types.h> +#include <sys/socket.h> +#include <netdb.h> +#include <unistd.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/string.h> +#include <grpc/support/log.h> +#include <grpc/support/thd.h> + +typedef struct { + char *name; + char *default_port; + grpc_resolve_cb cb; + void *arg; +} request; + +static void split_host_port(const char *name, char **host, char **port) { + const char *host_start; + size_t host_len; + const char *port_start; + + *host = NULL; + *port = NULL; + + if (name[0] == '[') { + /* Parse a bracketed host, typically an IPv6 literal. */ + const char *rbracket = strchr(name, ']'); + if (rbracket == NULL) { + /* Unmatched [ */ + return; + } + if (rbracket[1] == '\0') { + /* ]<end> */ + port_start = NULL; + } else if (rbracket[1] == ':') { + /* ]:<port?> */ + port_start = rbracket + 2; + } else { + /* ]<invalid> */ + return; + } + host_start = name + 1; + host_len = rbracket - host_start; + if (memchr(host_start, ':', host_len) == NULL) { + /* Require all bracketed hosts to contain a colon, because a hostname or + IPv4 address should never use brackets. */ + return; + } + } else { + const char *colon = strchr(name, ':'); + if (colon != NULL && strchr(colon + 1, ':') == NULL) { + /* Exactly 1 colon. Split into host:port. */ + host_start = name; + host_len = colon - name; + port_start = colon + 1; + } else { + /* 0 or 2+ colons. Bare hostname or IPv6 litearal. */ + host_start = name; + host_len = strlen(name); + port_start = NULL; + } + } + + /* Allocate return values. */ + *host = gpr_malloc(host_len + 1); + memcpy(*host, host_start, host_len); + (*host)[host_len] = '\0'; + + if (port_start != NULL) { + *port = gpr_strdup(port_start); + } +} + +grpc_resolved_addresses *grpc_blocking_resolve_address( + const char *name, const char *default_port) { + struct addrinfo hints; + struct addrinfo *result = NULL, *resp; + char *host; + char *port; + int s; + size_t i; + grpc_resolved_addresses *addrs = NULL; + + /* parse name, splitting it into host and port parts */ + split_host_port(name, &host, &port); + if (host == NULL) { + gpr_log(GPR_ERROR, "unparseable host:port: '%s'", name); + goto done; + } + if (port == NULL) { + if (default_port == NULL) { + gpr_log(GPR_ERROR, "no port in name '%s'", name); + goto done; + } + port = gpr_strdup(default_port); + } + + /* Call getaddrinfo */ + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; /* ipv4 or ipv6 */ + hints.ai_socktype = SOCK_STREAM; /* stream socket */ + hints.ai_flags = AI_PASSIVE; /* for wildcard IP address */ + + s = getaddrinfo(host, port, &hints, &result); + if (s != 0) { + gpr_log(GPR_ERROR, "getaddrinfo: %s", gai_strerror(s)); + goto done; + } + + /* Success path: set addrs non-NULL, fill it in */ + addrs = gpr_malloc(sizeof(grpc_resolved_addresses)); + addrs->naddrs = 0; + for (resp = result; resp != NULL; resp = resp->ai_next) { + addrs->naddrs++; + } + addrs->addrs = gpr_malloc(sizeof(grpc_resolved_address) * addrs->naddrs); + i = 0; + for (resp = result; resp != NULL; resp = resp->ai_next) { + memcpy(&addrs->addrs[i].addr, resp->ai_addr, resp->ai_addrlen); + addrs->addrs[i].len = resp->ai_addrlen; + i++; + } + +done: + gpr_free(host); + gpr_free(port); + if (result) { + freeaddrinfo(result); + } + return addrs; +} + +/* Thread function to asynch-ify grpc_blocking_resolve_address */ +static void do_request(void *rp) { + request *r = rp; + r->cb(r->arg, grpc_blocking_resolve_address(r->name, r->default_port)); + gpr_free(r->name); + gpr_free(r->default_port); + gpr_free(r); +} + +void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) { + gpr_free(addrs->addrs); + gpr_free(addrs); +} + +void grpc_resolve_address(const char *name, const char *default_port, + grpc_resolve_cb cb, void *arg) { + request *r = gpr_malloc(sizeof(request)); + gpr_thd_id id; + r->name = gpr_strdup(name); + r->default_port = gpr_strdup(default_port); + r->cb = cb; + r->arg = arg; + gpr_thd_new(&id, do_request, r, NULL); +} diff --git a/src/core/endpoint/resolve_address.h b/src/core/endpoint/resolve_address.h new file mode 100644 index 0000000000..cc32c47cef --- /dev/null +++ b/src/core/endpoint/resolve_address.h @@ -0,0 +1,67 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_ENDPOINT_RESOLVE_ADDRESS_H__ +#define __GRPC_INTERNAL_ENDPOINT_RESOLVE_ADDRESS_H__ + +#include <sys/socket.h> + +typedef struct { + struct sockaddr_storage addr; + int len; +} grpc_resolved_address; + +typedef struct { + size_t naddrs; + grpc_resolved_address *addrs; +} grpc_resolved_addresses; + +/* Async result callback: + On success: addresses is the result, and the callee must call + grpc_resolved_addresses_destroy when it's done with them + On failure: addresses is NULL */ +typedef void (*grpc_resolve_cb)(void *arg, grpc_resolved_addresses *addresses); +/* Asynchronously resolve addr. Use default_port if a port isn't designated + in addr, otherwise use the port in addr. */ +/* TODO(ctiller): add a timeout here */ +void grpc_resolve_address(const char *addr, const char *default_port, + grpc_resolve_cb cb, void *arg); +/* Destroy resolved addresses */ +void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addresses); + +/* Resolve addr in a blocking fashion. Returns NULL on failure. On success, + result must be freed with grpc_resolved_addresses_destroy. */ +grpc_resolved_addresses *grpc_blocking_resolve_address( + const char *addr, const char *default_port); + +#endif /* __GRPC_INTERNAL_ENDPOINT_RESOLVE_ADDRESS_H__ */ diff --git a/src/core/endpoint/secure_endpoint.c b/src/core/endpoint/secure_endpoint.c new file mode 100644 index 0000000000..4fab0faa03 --- /dev/null +++ b/src/core/endpoint/secure_endpoint.c @@ -0,0 +1,335 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/endpoint/secure_endpoint.h" +#include "src/core/tsi/transport_security_interface.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/slice.h> +#include <grpc/support/slice_buffer.h> +#include <grpc/support/string.h> +#include <grpc/support/sync.h> + +#define STAGING_BUFFER_SIZE 8192 + +typedef struct { + grpc_endpoint base; + grpc_endpoint *wrapped_ep; + struct tsi_frame_protector *protector; + gpr_mu protector_mu; + /* saved upper level callbacks and user_data. */ + grpc_endpoint_read_cb read_cb; + void *read_user_data; + /* saved handshaker leftover data to unprotect. */ + gpr_slice_buffer leftover_bytes; + /* buffers for read and write */ + gpr_slice read_staging_buffer; + gpr_slice_buffer input_buffer; + + gpr_slice write_staging_buffer; + gpr_slice_buffer output_buffer; + + gpr_refcount ref; +} secure_endpoint; + +static void secure_endpoint_ref(secure_endpoint *ep) { gpr_ref(&ep->ref); } + +static void destroy(secure_endpoint *secure_ep) { + secure_endpoint *ep = (secure_endpoint *)secure_ep; + grpc_endpoint_destroy(ep->wrapped_ep); + tsi_frame_protector_destroy(ep->protector); + gpr_slice_buffer_destroy(&ep->leftover_bytes); + gpr_slice_unref(ep->read_staging_buffer); + gpr_slice_buffer_destroy(&ep->input_buffer); + gpr_slice_unref(ep->write_staging_buffer); + gpr_slice_buffer_destroy(&ep->output_buffer); + gpr_mu_destroy(&ep->protector_mu); + gpr_free(ep); +} + +static void secure_endpoint_unref(secure_endpoint *ep) { + if (gpr_unref(&ep->ref)) { + destroy(ep); + } +} + +static void flush_read_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur, + gpr_uint8 **end) { + gpr_slice_buffer_add(&ep->input_buffer, ep->read_staging_buffer); + ep->read_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE); + *cur = GPR_SLICE_START_PTR(ep->read_staging_buffer); + *end = GPR_SLICE_END_PTR(ep->read_staging_buffer); +} + +static void call_read_cb(secure_endpoint *ep, gpr_slice *slices, size_t nslices, + grpc_endpoint_cb_status error) { +#ifdef GRPC_TRACE_SECURE_TRANSPORT + size_t i; + for (i = 0; i < nslices; i++) { + char *data = + gpr_hexdump((char*)GPR_SLICE_START_PTR(slices[i]), + GPR_SLICE_LENGTH(slices[i]), GPR_HEXDUMP_PLAINTEXT); + gpr_log(GPR_DEBUG, "READ %p: %s", ep, data); + gpr_free(data); + } +#endif + ep->read_cb(ep->read_user_data, slices, nslices, error); + secure_endpoint_unref(ep); +} + +static void on_read(void *user_data, gpr_slice *slices, size_t nslices, + grpc_endpoint_cb_status error) { + int i = 0; + gpr_uint8 keep_looping = 0; + int input_buffer_count = 0; + tsi_result result = TSI_OK; + secure_endpoint *ep = (secure_endpoint *)user_data; + gpr_uint8 *cur = GPR_SLICE_START_PTR(ep->read_staging_buffer); + gpr_uint8 *end = GPR_SLICE_END_PTR(ep->read_staging_buffer); + + /* TODO(yangg) check error, maybe bail out early */ + for (i = 0; i < nslices; i++) { + gpr_slice encrypted = slices[i]; + gpr_uint8 *message_bytes = GPR_SLICE_START_PTR(encrypted); + size_t message_size = GPR_SLICE_LENGTH(encrypted); + + while (message_size > 0 || keep_looping) { + gpr_uint32 unprotected_buffer_size_written = end - cur; + gpr_uint32 processed_message_size = message_size; + gpr_mu_lock(&ep->protector_mu); + result = tsi_frame_protector_unprotect(ep->protector, message_bytes, + &processed_message_size, cur, + &unprotected_buffer_size_written); + gpr_mu_unlock(&ep->protector_mu); + if (result != TSI_OK) { + gpr_log(GPR_ERROR, "Decryption error: %s", + tsi_result_to_string(result)); + break; + } + message_bytes += processed_message_size; + message_size -= processed_message_size; + cur += unprotected_buffer_size_written; + + if (cur == end) { + flush_read_staging_buffer(ep, &cur, &end); + /* Force to enter the loop again to extract buffered bytes in protector. + The bytes could be buffered because of running out of staging_buffer. + If this happens at the end of all slices, doing another unprotect + avoids leaving data in the protector. */ + keep_looping = 1; + } else if (unprotected_buffer_size_written > 0) { + keep_looping = 1; + } else { + keep_looping = 0; + } + } + if (result != TSI_OK) break; + } + + if (cur != GPR_SLICE_START_PTR(ep->read_staging_buffer)) { + gpr_slice_buffer_add( + &ep->input_buffer, + gpr_slice_split_head( + &ep->read_staging_buffer, + cur - GPR_SLICE_START_PTR(ep->read_staging_buffer))); + } + + /* TODO(yangg) experiment with moving this block after read_cb to see if it + helps latency */ + for (i = 0; i < nslices; i++) { + gpr_slice_unref(slices[i]); + } + + if (result != TSI_OK) { + gpr_slice_buffer_reset_and_unref(&ep->input_buffer); + call_read_cb(ep, NULL, 0, GRPC_ENDPOINT_CB_ERROR); + return; + } + /* The upper level will unref the slices. */ + input_buffer_count = ep->input_buffer.count; + ep->input_buffer.count = 0; + call_read_cb(ep, ep->input_buffer.slices, input_buffer_count, error); +} + +static void notify_on_read(grpc_endpoint *secure_ep, grpc_endpoint_read_cb cb, + void *user_data, gpr_timespec deadline) { + secure_endpoint *ep = (secure_endpoint *)secure_ep; + ep->read_cb = cb; + ep->read_user_data = user_data; + + secure_endpoint_ref(ep); + + if (ep->leftover_bytes.count) { + size_t leftover_nslices = ep->leftover_bytes.count; + ep->leftover_bytes.count = 0; + on_read(ep, ep->leftover_bytes.slices, leftover_nslices, + GRPC_ENDPOINT_CB_OK); + return; + } + + grpc_endpoint_notify_on_read(ep->wrapped_ep, on_read, ep, deadline); +} + +static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur, + gpr_uint8 **end) { + gpr_slice_buffer_add(&ep->output_buffer, ep->write_staging_buffer); + ep->write_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE); + *cur = GPR_SLICE_START_PTR(ep->write_staging_buffer); + *end = GPR_SLICE_END_PTR(ep->write_staging_buffer); +} + +static grpc_endpoint_write_status write(grpc_endpoint *secure_ep, + gpr_slice *slices, size_t nslices, + grpc_endpoint_write_cb cb, + void *user_data, + gpr_timespec deadline) { + int i = 0; + int output_buffer_count = 0; + tsi_result result = TSI_OK; + secure_endpoint *ep = (secure_endpoint *)secure_ep; + gpr_uint8 *cur = GPR_SLICE_START_PTR(ep->write_staging_buffer); + gpr_uint8 *end = GPR_SLICE_END_PTR(ep->write_staging_buffer); + GPR_ASSERT(ep->output_buffer.count == 0); + +#ifdef GRPC_TRACE_SECURE_TRANSPORT + for (i = 0; i < nslices; i++) { + char *data = + gpr_hexdump((char*)GPR_SLICE_START_PTR(slices[i]), + GPR_SLICE_LENGTH(slices[i]), GPR_HEXDUMP_PLAINTEXT); + gpr_log(GPR_DEBUG, "WRITE %p: %s", ep, data); + gpr_free(data); + } +#endif + + for (i = 0; i < nslices; i++) { + gpr_slice plain = slices[i]; + gpr_uint8 *message_bytes = GPR_SLICE_START_PTR(plain); + size_t message_size = GPR_SLICE_LENGTH(plain); + while (message_size > 0) { + gpr_uint32 protected_buffer_size_to_send = end - cur; + gpr_uint32 processed_message_size = message_size; + gpr_mu_lock(&ep->protector_mu); + result = tsi_frame_protector_protect(ep->protector, message_bytes, + &processed_message_size, cur, + &protected_buffer_size_to_send); + gpr_mu_unlock(&ep->protector_mu); + if (result != TSI_OK) { + gpr_log(GPR_ERROR, "Encryption error: %s", + tsi_result_to_string(result)); + break; + } + message_bytes += processed_message_size; + message_size -= processed_message_size; + cur += protected_buffer_size_to_send; + + if (cur == end) { + flush_write_staging_buffer(ep, &cur, &end); + } + } + if (result != TSI_OK) break; + } + if (result == TSI_OK) { + gpr_uint32 still_pending_size; + do { + gpr_uint32 protected_buffer_size_to_send = end - cur; + gpr_mu_lock(&ep->protector_mu); + result = tsi_frame_protector_protect_flush(ep->protector, cur, + &protected_buffer_size_to_send, + &still_pending_size); + gpr_mu_unlock(&ep->protector_mu); + if (result != TSI_OK) break; + cur += protected_buffer_size_to_send; + if (cur == end) { + flush_write_staging_buffer(ep, &cur, &end); + } + } while (still_pending_size > 0); + if (cur != GPR_SLICE_START_PTR(ep->write_staging_buffer)) { + gpr_slice_buffer_add( + &ep->output_buffer, + gpr_slice_split_head( + &ep->write_staging_buffer, + cur - GPR_SLICE_START_PTR(ep->write_staging_buffer))); + } + } + + for (i = 0; i < nslices; i++) { + gpr_slice_unref(slices[i]); + } + + if (result != TSI_OK) { + /* TODO(yangg) do different things according to the error type? */ + gpr_slice_buffer_reset_and_unref(&ep->output_buffer); + return GRPC_ENDPOINT_WRITE_ERROR; + } + + /* clear output_buffer and let the lower level handle its slices. */ + output_buffer_count = ep->output_buffer.count; + ep->output_buffer.count = 0; + return grpc_endpoint_write(ep->wrapped_ep, ep->output_buffer.slices, + output_buffer_count, cb, user_data, deadline); +} + +static void shutdown(grpc_endpoint *secure_ep) { + secure_endpoint *ep = (secure_endpoint *)secure_ep; + grpc_endpoint_shutdown(ep->wrapped_ep); +} + +static void unref(grpc_endpoint *secure_ep) { + secure_endpoint *ep = (secure_endpoint *)secure_ep; + secure_endpoint_unref(ep); +} + +static const grpc_endpoint_vtable vtable = {notify_on_read, write, shutdown, + unref}; + +grpc_endpoint *grpc_secure_endpoint_create( + struct tsi_frame_protector *protector, grpc_endpoint *transport, + gpr_slice *leftover_slices, size_t leftover_nslices) { + size_t i; + secure_endpoint *ep = (secure_endpoint *)gpr_malloc(sizeof(secure_endpoint)); + ep->base.vtable = &vtable; + ep->wrapped_ep = transport; + ep->protector = protector; + gpr_slice_buffer_init(&ep->leftover_bytes); + for (i = 0; i < leftover_nslices; i++) { + gpr_slice_buffer_add(&ep->leftover_bytes, + gpr_slice_ref(leftover_slices[i])); + } + ep->write_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE); + ep->read_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE); + gpr_slice_buffer_init(&ep->input_buffer); + gpr_slice_buffer_init(&ep->output_buffer); + gpr_mu_init(&ep->protector_mu); + gpr_ref_init(&ep->ref, 1); + return &ep->base; +} diff --git a/src/core/endpoint/secure_endpoint.h b/src/core/endpoint/secure_endpoint.h new file mode 100644 index 0000000000..971170afed --- /dev/null +++ b/src/core/endpoint/secure_endpoint.h @@ -0,0 +1,47 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_ENDPOINT_SECURE_ENDPOINT_H__ +#define __GRPC_INTERNAL_ENDPOINT_SECURE_ENDPOINT_H__ + +#include <grpc/support/slice.h> +#include "src/core/endpoint/endpoint.h" + +struct tsi_frame_protector; + +/* Takes ownership of protector and to_wrap, and refs leftover_slices. */ +grpc_endpoint *grpc_secure_endpoint_create( + struct tsi_frame_protector *protector, grpc_endpoint *to_wrap, + gpr_slice *leftover_slices, size_t leftover_nslices); + +#endif /* __GRPC_INTERNAL_ENDPOINT_SECURE_ENDPOINT_H__ */ diff --git a/src/core/endpoint/socket_utils.c b/src/core/endpoint/socket_utils.c new file mode 100644 index 0000000000..9c2540bcbf --- /dev/null +++ b/src/core/endpoint/socket_utils.c @@ -0,0 +1,105 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/endpoint/socket_utils.h" + +#include <limits.h> +#include <fcntl.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <stdio.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <unistd.h> +#include <string.h> +#include <errno.h> + +/* set a socket to non blocking mode */ +int grpc_set_socket_nonblocking(int fd, int non_blocking) { + int oldflags = fcntl(fd, F_GETFL, 0); + if (oldflags < 0) { + return 0; + } + + if (non_blocking) { + oldflags |= O_NONBLOCK; + } else { + oldflags &= ~O_NONBLOCK; + } + + if (fcntl(fd, F_SETFL, oldflags) != 0) { + return 0; + } + + return 1; +} + +/* set a socket to close on exec */ +int grpc_set_socket_cloexec(int fd, int close_on_exec) { + int oldflags = fcntl(fd, F_GETFD, 0); + if (oldflags < 0) { + return 0; + } + + if (close_on_exec) { + oldflags |= FD_CLOEXEC; + } else { + oldflags &= ~FD_CLOEXEC; + } + + if (fcntl(fd, F_SETFD, oldflags) != 0) { + return 0; + } + + return 1; +} + +/* set a socket to reuse old addresses */ +int grpc_set_socket_reuse_addr(int fd, int reuse) { + int val = (reuse != 0); + int newval; + socklen_t intlen = sizeof(newval); + return 0 == setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) && + 0 == getsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &newval, &intlen) && + newval == val; +} + +/* disable nagle */ +int grpc_set_socket_low_latency(int fd, int low_latency) { + int val = (low_latency != 0); + int newval; + socklen_t intlen = sizeof(newval); + return 0 == setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) && + 0 == getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &newval, &intlen) && + newval == val; +} diff --git a/src/core/endpoint/socket_utils.h b/src/core/endpoint/socket_utils.h new file mode 100644 index 0000000000..545d678eab --- /dev/null +++ b/src/core/endpoint/socket_utils.h @@ -0,0 +1,58 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_ENDPOINT_SOCKET_UTILS_H__ +#define __GRPC_INTERNAL_ENDPOINT_SOCKET_UTILS_H__ + +#include <unistd.h> +#include <sys/socket.h> + +struct sockaddr; + +/* a wrapper for accept or accept4 */ +int grpc_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, + int nonblock, int cloexec); + +/* set a socket to non blocking mode */ +int grpc_set_socket_nonblocking(int fd, int non_blocking); + +/* set a socket to close on exec */ +int grpc_set_socket_cloexec(int fd, int close_on_exec); + +/* set a socket to reuse old addresses */ +int grpc_set_socket_reuse_addr(int fd, int reuse); + +/* disable nagle */ +int grpc_set_socket_low_latency(int fd, int low_latency); + +#endif /* __GRPC_INTERNAL_ENDPOINT_SOCKET_UTILS_H__ */ diff --git a/src/core/endpoint/socket_utils_linux.c b/src/core/endpoint/socket_utils_linux.c new file mode 100644 index 0000000000..479675ec7d --- /dev/null +++ b/src/core/endpoint/socket_utils_linux.c @@ -0,0 +1,52 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#define _GNU_SOURCE +#include <grpc/support/port_platform.h> + +#ifdef GPR_LINUX + +#include "src/core/endpoint/socket_utils.h" + +#include <sys/types.h> +#include <sys/socket.h> + +int grpc_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, + int nonblock, int cloexec) { + int flags = 0; + flags |= nonblock ? SOCK_NONBLOCK : 0; + flags |= cloexec ? SOCK_CLOEXEC : 0; + return accept4(sockfd, addr, addrlen, flags); +} + +#endif diff --git a/src/core/endpoint/socket_utils_posix.c b/src/core/endpoint/socket_utils_posix.c new file mode 100644 index 0000000000..262d606af9 --- /dev/null +++ b/src/core/endpoint/socket_utils_posix.c @@ -0,0 +1,61 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/support/port_platform.h> + +#ifdef GPR_POSIX_SOCKETUTILS + +#define _BSD_SOURCE +#include "src/core/endpoint/socket_utils.h" + +#include <fcntl.h> +#include <sys/socket.h> +#include <unistd.h> + +#include <grpc/support/log.h> + +int grpc_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, + int nonblock, int cloexec) { + int fd, flags; + + fd = accept(sockfd, addr, addrlen); + if (fd >= 0) { + flags = fcntl(fd, F_GETFL, 0); + flags |= nonblock ? O_NONBLOCK : 0; + flags |= cloexec ? FD_CLOEXEC : 0; + GPR_ASSERT(fcntl(fd, F_SETFL, flags) == 0); + } + return fd; +} + +#endif /* GPR_POSIX_SOCKETUTILS */ diff --git a/src/core/endpoint/tcp.c b/src/core/endpoint/tcp.c new file mode 100644 index 0000000000..39367e80f6 --- /dev/null +++ b/src/core/endpoint/tcp.c @@ -0,0 +1,570 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/endpoint/tcp.h" + +#include <errno.h> +#include <stdlib.h> +#include <string.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <unistd.h> + +#include "src/core/eventmanager/em.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/slice.h> +#include <grpc/support/string.h> +#include <grpc/support/sync.h> +#include <grpc/support/time.h> + +/* Holds a slice array and associated state. */ +typedef struct grpc_tcp_slice_state { + gpr_slice *slices; /* Array of slices */ + size_t nslices; /* Size of slices array. */ + ssize_t first_slice; /* First valid slice in array */ + ssize_t last_slice; /* Last valid slice in array */ + gpr_slice working_slice; /* pointer to original final slice */ + int working_slice_valid; /* True if there is a working slice */ + int memory_owned; /* True if slices array is owned */ +} grpc_tcp_slice_state; + +static void slice_state_init(grpc_tcp_slice_state *state, gpr_slice *slices, + size_t nslices, size_t valid_slices) { + state->slices = slices; + state->nslices = nslices; + if (valid_slices == 0) { + state->first_slice = -1; + } else { + state->first_slice = 0; + } + state->last_slice = valid_slices - 1; + state->working_slice_valid = 0; + state->memory_owned = 0; +} + +/* Returns true if there is still available data */ +static int slice_state_has_available(grpc_tcp_slice_state *state) { + return state->first_slice != -1 && state->last_slice >= state->first_slice; +} + +static ssize_t slice_state_slices_allocated(grpc_tcp_slice_state *state) { + if (state->first_slice == -1) { + return 0; + } else { + return state->last_slice - state->first_slice + 1; + } +} + +static void slice_state_realloc(grpc_tcp_slice_state *state, size_t new_size) { + /* TODO(klempner): use realloc instead when first_slice is 0 */ + /* TODO(klempner): Avoid a realloc in cases where it is unnecessary */ + gpr_slice *slices = state->slices; + size_t original_size = slice_state_slices_allocated(state); + size_t i; + gpr_slice *new_slices = gpr_malloc(sizeof(gpr_slice) * new_size); + + for (i = 0; i < original_size; ++i) { + new_slices[i] = slices[i + state->first_slice]; + } + + state->slices = new_slices; + state->last_slice = original_size - 1; + if (original_size > 0) { + state->first_slice = 0; + } else { + state->first_slice = -1; + } + state->nslices = new_size; + + if (state->memory_owned) { + gpr_free(slices); + } + state->memory_owned = 1; +} + +static void slice_state_remove_prefix(grpc_tcp_slice_state *state, + size_t prefix_bytes) { + gpr_slice *current_slice = &state->slices[state->first_slice]; + size_t current_slice_size; + + while (slice_state_has_available(state)) { + current_slice_size = GPR_SLICE_LENGTH(*current_slice); + if (current_slice_size > prefix_bytes) { + /* TODO(klempner): Get rid of the extra refcount created here by adding a + native "trim the first N bytes" operation to splice */ + /* TODO(klempner): This really shouldn't be modifying the current slice + unless we own the slices array. */ + *current_slice = gpr_slice_split_tail(current_slice, prefix_bytes); + gpr_slice_unref(*current_slice); + return; + } else { + gpr_slice_unref(*current_slice); + ++state->first_slice; + ++current_slice; + prefix_bytes -= current_slice_size; + } + } +} + +static void slice_state_destroy(grpc_tcp_slice_state *state) { + while (slice_state_has_available(state)) { + gpr_slice_unref(state->slices[state->first_slice]); + ++state->first_slice; + } + + if (state->memory_owned) { + gpr_free(state->slices); + state->memory_owned = 0; + } +} + +void slice_state_transfer_ownership(grpc_tcp_slice_state *state, + gpr_slice **slices, size_t *nslices) { + *slices = state->slices + state->first_slice; + *nslices = state->last_slice - state->first_slice + 1; + + state->first_slice = -1; + state->last_slice = -1; +} + +/* Fills iov with the first min(iov_size, available) slices, returns number + filled */ +static size_t slice_state_to_iovec(grpc_tcp_slice_state *state, + struct iovec *iov, size_t iov_size) { + size_t nslices = state->last_slice - state->first_slice + 1; + gpr_slice *slices = state->slices + state->first_slice; + size_t i; + if (nslices < iov_size) { + iov_size = nslices; + } + + for (i = 0; i < iov_size; ++i) { + iov[i].iov_base = GPR_SLICE_START_PTR(slices[i]); + iov[i].iov_len = GPR_SLICE_LENGTH(slices[i]); + } + return iov_size; +} + +/* Makes n blocks available at the end of state, writes them into iov, and + returns the number of bytes allocated */ +static size_t slice_state_append_blocks_into_iovec(grpc_tcp_slice_state *state, + struct iovec *iov, size_t n, + size_t slice_size) { + size_t target_size; + size_t i; + size_t allocated_bytes; + ssize_t allocated_slices = slice_state_slices_allocated(state); + + if (n - state->working_slice_valid >= state->nslices - state->last_slice) { + /* Need to grow the slice array */ + target_size = state->nslices; + do { + target_size = target_size * 2; + } while (target_size < allocated_slices + n - state->working_slice_valid); + /* TODO(klempner): If this ever needs to support both prefix removal and + append, we should be smarter about the growth logic here */ + slice_state_realloc(state, target_size); + } + + i = 0; + allocated_bytes = 0; + + if (state->working_slice_valid) { + iov[0].iov_base = GPR_SLICE_END_PTR(state->slices[state->last_slice]); + iov[0].iov_len = GPR_SLICE_LENGTH(state->working_slice) - + GPR_SLICE_LENGTH(state->slices[state->last_slice]); + allocated_bytes += iov[0].iov_len; + ++i; + state->slices[state->last_slice] = state->working_slice; + state->working_slice_valid = 0; + } + + for (; i < n; ++i) { + ++state->last_slice; + state->slices[state->last_slice] = gpr_slice_malloc(slice_size); + iov[i].iov_base = GPR_SLICE_START_PTR(state->slices[state->last_slice]); + iov[i].iov_len = slice_size; + allocated_bytes += slice_size; + } + if (state->first_slice == -1) { + state->first_slice = 0; + } + return allocated_bytes; +} + +/* Remove the last n bytes from state */ +/* TODO(klempner): Consider having this defer actual deletion until later */ +static void slice_state_remove_last(grpc_tcp_slice_state *state, size_t bytes) { + while (bytes > 0 && slice_state_has_available(state)) { + if (GPR_SLICE_LENGTH(state->slices[state->last_slice]) > bytes) { + state->working_slice = state->slices[state->last_slice]; + state->working_slice_valid = 1; + /* TODO(klempner): Combine these into a single operation that doesn't need + to refcount */ + gpr_slice_unref(gpr_slice_split_tail( + &state->slices[state->last_slice], + GPR_SLICE_LENGTH(state->slices[state->last_slice]) - bytes)); + bytes = 0; + } else { + bytes -= GPR_SLICE_LENGTH(state->slices[state->last_slice]); + gpr_slice_unref(state->slices[state->last_slice]); + --state->last_slice; + if (state->last_slice == -1) { + state->first_slice = -1; + } + } + } +} + +typedef struct { + grpc_endpoint base; + grpc_em *em; + grpc_em_fd em_fd; + int fd; + size_t slice_size; + gpr_refcount refcount; + + grpc_endpoint_read_cb read_cb; + void *read_user_data; + gpr_timespec read_deadline; + grpc_endpoint_write_cb write_cb; + void *write_user_data; + gpr_timespec write_deadline; + + grpc_tcp_slice_state write_state; +} grpc_tcp; + +static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, + grpc_em_cb_status status); +static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, + grpc_em_cb_status status); + +#define DEFAULT_SLICE_SIZE 8192 +grpc_endpoint *grpc_tcp_create(int fd, grpc_em *em) { + return grpc_tcp_create_dbg(fd, em, DEFAULT_SLICE_SIZE); +} + +static void grpc_tcp_shutdown(grpc_endpoint *ep) { + grpc_tcp *tcp = (grpc_tcp *)ep; + grpc_em_fd_shutdown(&tcp->em_fd); +} + +static void grpc_tcp_unref(grpc_tcp *tcp) { + int refcount_zero = gpr_unref(&tcp->refcount); + if (refcount_zero) { + grpc_em_fd_destroy(&tcp->em_fd); + close(tcp->fd); + gpr_free(tcp); + } +} + +static void grpc_tcp_destroy(grpc_endpoint *ep) { + grpc_tcp *tcp = (grpc_tcp *)ep; + grpc_tcp_unref(tcp); +} + +static void call_read_cb(grpc_tcp *tcp, gpr_slice *slices, size_t nslices, + grpc_endpoint_cb_status status) { + grpc_endpoint_read_cb cb = tcp->read_cb; + +#ifdef GRPC_TRACE_TCP + size_t i; + gpr_log(GPR_DEBUG, "read: status=%d", status); + for (i = 0; i < nslices; i++) { + char *dump = + gpr_hexdump((char *)GPR_SLICE_START_PTR(slices[i]), + GPR_SLICE_LENGTH(slices[i]), GPR_HEXDUMP_PLAINTEXT); + gpr_log(GPR_DEBUG, "READ: %s", dump); + gpr_free(dump); + } +#endif + + tcp->read_cb = NULL; + cb(tcp->read_user_data, slices, nslices, status); +} + +#define INLINE_SLICE_BUFFER_SIZE 8 +#define MAX_READ_IOVEC 4 +static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, + grpc_em_cb_status status) { + grpc_tcp *tcp = (grpc_tcp *)arg; + int iov_size = 1; + gpr_slice static_read_slices[INLINE_SLICE_BUFFER_SIZE]; + struct msghdr msg; + struct iovec iov[MAX_READ_IOVEC]; + ssize_t read_bytes; + ssize_t allocated_bytes; + struct grpc_tcp_slice_state read_state; + gpr_slice *final_slices; + size_t final_nslices; + + slice_state_init(&read_state, static_read_slices, INLINE_SLICE_BUFFER_SIZE, + 0); + + if (status == GRPC_CALLBACK_CANCELLED) { + call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN); + grpc_tcp_unref(tcp); + return; + } + + if (status == GRPC_CALLBACK_TIMED_OUT) { + call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_TIMED_OUT); + grpc_tcp_unref(tcp); + return; + } + + /* TODO(klempner): Limit the amount we read at once. */ + for (;;) { + allocated_bytes = slice_state_append_blocks_into_iovec( + &read_state, iov, iov_size, tcp->slice_size); + + msg.msg_name = NULL; + msg.msg_namelen = 0; + msg.msg_iov = iov; + msg.msg_iovlen = iov_size; + msg.msg_control = NULL; + msg.msg_controllen = 0; + msg.msg_flags = 0; + + do { + read_bytes = recvmsg(tcp->fd, &msg, 0); + } while (read_bytes < 0 && errno == EINTR); + + if (read_bytes < allocated_bytes) { + /* TODO(klempner): Consider a second read first, in hopes of getting a + * quick EAGAIN and saving a bunch of allocations. */ + slice_state_remove_last(&read_state, read_bytes < 0 + ? allocated_bytes + : allocated_bytes - read_bytes); + } + + if (read_bytes < 0) { + /* NB: After calling the user_cb a parallel call of the read handler may + * be running. */ + if (errno == EAGAIN) { + if (slice_state_has_available(&read_state)) { + /* TODO(klempner): We should probably do the call into the application + without all this junk on the stack */ + /* FIXME(klempner): Refcount properly */ + slice_state_transfer_ownership(&read_state, &final_slices, + &final_nslices); + call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK); + slice_state_destroy(&read_state); + grpc_tcp_unref(tcp); + } else { + /* Spurious read event, consume it here */ + slice_state_destroy(&read_state); + grpc_em_fd_notify_on_read(&tcp->em_fd, grpc_tcp_handle_read, tcp, + tcp->read_deadline); + } + } else { + /* TODO(klempner): Log interesting errors */ + call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_ERROR); + slice_state_destroy(&read_state); + grpc_tcp_unref(tcp); + } + return; + } else if (read_bytes == 0) { + /* 0 read size ==> end of stream */ + if (slice_state_has_available(&read_state)) { + /* there were bytes already read: pass them up to the application */ + slice_state_transfer_ownership(&read_state, &final_slices, + &final_nslices); + call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_EOF); + } else { + call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_EOF); + } + slice_state_destroy(&read_state); + grpc_tcp_unref(tcp); + return; + } else if (iov_size < MAX_READ_IOVEC) { + ++iov_size; + } + } +} + +static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, + void *user_data, gpr_timespec deadline) { + grpc_tcp *tcp = (grpc_tcp *)ep; + GPR_ASSERT(tcp->read_cb == NULL); + tcp->read_cb = cb; + tcp->read_user_data = user_data; + tcp->read_deadline = deadline; + gpr_ref(&tcp->refcount); + grpc_em_fd_notify_on_read(&tcp->em_fd, grpc_tcp_handle_read, tcp, deadline); +} + +#define MAX_WRITE_IOVEC 16 +static grpc_endpoint_write_status grpc_tcp_flush(grpc_tcp *tcp) { + struct msghdr msg; + struct iovec iov[MAX_WRITE_IOVEC]; + int iov_size; + ssize_t sent_length; + grpc_tcp_slice_state *state = &tcp->write_state; + + for (;;) { + iov_size = slice_state_to_iovec(state, iov, MAX_WRITE_IOVEC); + + msg.msg_name = NULL; + msg.msg_namelen = 0; + msg.msg_iov = iov; + msg.msg_iovlen = iov_size; + msg.msg_control = NULL; + msg.msg_controllen = 0; + msg.msg_flags = 0; + + do { + /* TODO(klempner): Cork if this is a partial write */ + sent_length = sendmsg(tcp->fd, &msg, 0); + } while (sent_length < 0 && errno == EINTR); + + if (sent_length < 0) { + if (errno == EAGAIN) { + return GRPC_ENDPOINT_WRITE_PENDING; + } else { + /* TODO(klempner): Log some of these */ + slice_state_destroy(state); + return GRPC_ENDPOINT_WRITE_ERROR; + } + } + + /* TODO(klempner): Probably better to batch this after we finish flushing */ + slice_state_remove_prefix(state, sent_length); + + if (!slice_state_has_available(state)) { + return GRPC_ENDPOINT_WRITE_DONE; + } + }; +} + +static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, + grpc_em_cb_status status) { + grpc_tcp *tcp = (grpc_tcp *)arg; + grpc_endpoint_write_status write_status; + grpc_endpoint_cb_status cb_status; + grpc_endpoint_write_cb cb; + + cb_status = GRPC_ENDPOINT_CB_OK; + + if (status == GRPC_CALLBACK_CANCELLED) { + cb_status = GRPC_ENDPOINT_CB_SHUTDOWN; + } else if (status == GRPC_CALLBACK_TIMED_OUT) { + cb_status = GRPC_ENDPOINT_CB_TIMED_OUT; + } + + if (cb_status != GRPC_ENDPOINT_CB_OK) { + slice_state_destroy(&tcp->write_state); + cb = tcp->write_cb; + tcp->write_cb = NULL; + cb(tcp->write_user_data, cb_status); + grpc_tcp_unref(tcp); + return; + } + + write_status = grpc_tcp_flush(tcp); + if (write_status == GRPC_ENDPOINT_WRITE_PENDING) { + grpc_em_fd_notify_on_write(&tcp->em_fd, grpc_tcp_handle_write, tcp, + tcp->write_deadline); + } else { + slice_state_destroy(&tcp->write_state); + if (write_status == GRPC_ENDPOINT_WRITE_DONE) { + cb_status = GRPC_ENDPOINT_CB_OK; + } else { + cb_status = GRPC_ENDPOINT_CB_ERROR; + } + cb = tcp->write_cb; + tcp->write_cb = NULL; + cb(tcp->write_user_data, cb_status); + grpc_tcp_unref(tcp); + } +} + +static grpc_endpoint_write_status grpc_tcp_write( + grpc_endpoint *ep, gpr_slice *slices, size_t nslices, + grpc_endpoint_write_cb cb, void *user_data, gpr_timespec deadline) { + grpc_tcp *tcp = (grpc_tcp *)ep; + grpc_endpoint_write_status status; + +#ifdef GRPC_TRACE_TCP + size_t i; + + for (i = 0; i < nslices; i++) { + char *data = + gpr_hexdump((char *)GPR_SLICE_START_PTR(slices[i]), + GPR_SLICE_LENGTH(slices[i]), GPR_HEXDUMP_PLAINTEXT); + gpr_log(GPR_DEBUG, "WRITE %p: %s", tcp, data); + gpr_free(data); + } +#endif + + GPR_ASSERT(tcp->write_cb == NULL); + slice_state_init(&tcp->write_state, slices, nslices, nslices); + + status = grpc_tcp_flush(tcp); + if (status == GRPC_ENDPOINT_WRITE_PENDING) { + /* TODO(klempner): Consider inlining rather than malloc for small nslices */ + slice_state_realloc(&tcp->write_state, nslices); + gpr_ref(&tcp->refcount); + tcp->write_cb = cb; + tcp->write_user_data = user_data; + tcp->write_deadline = deadline; + grpc_em_fd_notify_on_write(&tcp->em_fd, grpc_tcp_handle_write, tcp, + tcp->write_deadline); + } + + return status; +} + +static const grpc_endpoint_vtable vtable = {grpc_tcp_notify_on_read, + grpc_tcp_write, grpc_tcp_shutdown, + grpc_tcp_destroy}; + +grpc_endpoint *grpc_tcp_create_dbg(int fd, grpc_em *em, size_t slice_size) { + grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); + tcp->base.vtable = &vtable; + tcp->fd = fd; + tcp->em = em; + tcp->read_cb = NULL; + tcp->write_cb = NULL; + tcp->read_user_data = NULL; + tcp->write_user_data = NULL; + tcp->slice_size = slice_size; + tcp->read_deadline = gpr_inf_future; + tcp->write_deadline = gpr_inf_future; + slice_state_init(&tcp->write_state, NULL, 0, 0); + /* paired with unref in grpc_tcp_destroy */ + gpr_ref_init(&tcp->refcount, 1); + grpc_em_fd_init(&tcp->em_fd, tcp->em, fd); + return &tcp->base; +} diff --git a/src/core/endpoint/tcp.h b/src/core/endpoint/tcp.h new file mode 100644 index 0000000000..6507b2f6ef --- /dev/null +++ b/src/core/endpoint/tcp.h @@ -0,0 +1,55 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_ENDPOINT_TCP_H__ +#define __GRPC_INTERNAL_ENDPOINT_TCP_H__ +/* + Low level TCP "bottom half" implementation, for use by transports built on + top of a TCP connection. + + Note that this file does not (yet) include APIs for creating the socket in + the first place. + + All calls passing slice transfer ownership of a slice refcount unless + otherwise specified. +*/ + +#include "src/core/endpoint/endpoint.h" +#include "src/core/eventmanager/em.h" + +/* Create a tcp from an already connected file descriptor. */ +grpc_endpoint *grpc_tcp_create(int fd, grpc_em *em); +/* Special version for debugging slice changes */ +grpc_endpoint *grpc_tcp_create_dbg(int fd, grpc_em *em, size_t slice_size); + +#endif /* __GRPC_INTERNAL_ENDPOINT_TCP_H__ */ diff --git a/src/core/endpoint/tcp_client.c b/src/core/endpoint/tcp_client.c new file mode 100644 index 0000000000..01a0c3f23d --- /dev/null +++ b/src/core/endpoint/tcp_client.c @@ -0,0 +1,170 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/endpoint/tcp_client.h" + +#include <errno.h> +#include <string.h> +#include <unistd.h> + +#include "src/core/endpoint/socket_utils.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/time.h> + +typedef struct { + void (*cb)(void *arg, grpc_endpoint *tcp); + void *cb_arg; + grpc_em_fd fd; + gpr_timespec deadline; +} async_connect; + +static int create_fd(int address_family) { + int fd = socket(address_family, SOCK_STREAM, 0); + if (fd < 0) { + gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno)); + goto error; + } + + if (!grpc_set_socket_nonblocking(fd, 1) || !grpc_set_socket_cloexec(fd, 1) || + !grpc_set_socket_low_latency(fd, 1)) { + gpr_log(GPR_ERROR, "Unable to configure socket %d: %s", fd, + strerror(errno)); + goto error; + } + + return fd; + +error: + if (fd >= 0) { + close(fd); + } + return -1; +} + +static void on_writable(void *acp, grpc_em_cb_status status) { + async_connect *ac = acp; + int so_error = 0; + socklen_t so_error_size; + int err; + int fd = grpc_em_fd_get(&ac->fd); + grpc_em *em = grpc_em_fd_get_em(&ac->fd); + + if (status == GRPC_CALLBACK_SUCCESS) { + do { + so_error_size = sizeof(so_error); + err = getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &so_error_size); + } while (err < 0 && errno == EINTR); + if (err < 0) { + gpr_log(GPR_ERROR, "getsockopt(ERROR): %s", strerror(errno)); + goto error; + } else if (so_error != 0) { + if (so_error == ENOBUFS) { + /* We will get one of these errors if we have run out of + memory in the kernel for the data structures allocated + when you connect a socket. If this happens it is very + likely that if we wait a little bit then try again the + connection will work (since other programs or this + program will close their network connections and free up + memory). This does _not_ indicate that there is anything + wrong with the server we are connecting to, this is a + local problem. + + If you are looking at this code, then chances are that + your program or another program on the same computer + opened too many network connections. The "easy" fix: + don't do that! */ + gpr_log(GPR_ERROR, "kernel out of buffers"); + grpc_em_fd_notify_on_write(&ac->fd, on_writable, ac, ac->deadline); + return; + } else { + goto error; + } + } else { + goto great_success; + } + } else { + gpr_log(GPR_ERROR, "on_writable failed during connect: status=%d", status); + goto error; + } + + abort(); + +error: + ac->cb(ac->cb_arg, NULL); + grpc_em_fd_destroy(&ac->fd); + gpr_free(ac); + close(fd); + return; + +great_success: + grpc_em_fd_destroy(&ac->fd); + ac->cb(ac->cb_arg, grpc_tcp_create(fd, em)); + gpr_free(ac); +} + +void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), + void *arg, grpc_em *em, struct sockaddr *addr, + int len, gpr_timespec deadline) { + int fd = create_fd(addr->sa_family); + int err; + async_connect *ac; + + if (fd < 0) { + cb(arg, NULL); + return; + } + + do { + err = connect(fd, addr, len); + } while (err < 0 && errno == EINTR); + + if (err >= 0) { + cb(arg, grpc_tcp_create(fd, em)); + return; + } + + if (errno != EWOULDBLOCK && errno != EINPROGRESS) { + gpr_log(GPR_ERROR, "connect error: %s", strerror(errno)); + close(fd); + cb(arg, NULL); + return; + } + + ac = gpr_malloc(sizeof(async_connect)); + ac->cb = cb; + ac->cb_arg = arg; + ac->deadline = deadline; + grpc_em_fd_init(&ac->fd, em, fd); + grpc_em_fd_notify_on_write(&ac->fd, on_writable, ac, deadline); +} diff --git a/src/core/endpoint/tcp_client.h b/src/core/endpoint/tcp_client.h new file mode 100644 index 0000000000..2a8b8ee217 --- /dev/null +++ b/src/core/endpoint/tcp_client.h @@ -0,0 +1,50 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_ENDPOINT_TCP_CLIENT_H__ +#define __GRPC_INTERNAL_ENDPOINT_TCP_CLIENT_H__ + +#include "src/core/endpoint/tcp.h" +#include <grpc/support/time.h> + +#include <sys/types.h> +#include <sys/socket.h> + +/* Asynchronously connect to an address (specified as (addr, len)), and call + cb with arg and the completed connection when done (or call cb with arg and + NULL on failure) */ +void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp), + void *arg, grpc_em *em, struct sockaddr *addr, + int len, gpr_timespec deadline); + +#endif /* __GRPC_INTERNAL_ENDPOINT_TCP_CLIENT_H__ */ diff --git a/src/core/endpoint/tcp_server.c b/src/core/endpoint/tcp_server.c new file mode 100644 index 0000000000..2f386ce045 --- /dev/null +++ b/src/core/endpoint/tcp_server.c @@ -0,0 +1,282 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#define _GNU_SOURCE +#include "src/core/endpoint/tcp_server.h" + +#include <limits.h> +#include <fcntl.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <stdio.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <unistd.h> +#include <string.h> +#include <errno.h> + +#include "src/core/endpoint/socket_utils.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/time.h> + +#define INIT_PORT_CAP 2 +#define MIN_SAFE_ACCEPT_QUEUE_SIZE 100 + +static gpr_once s_init_max_accept_queue_size; +static int s_max_accept_queue_size; + +/* one listening port */ +typedef struct { + int fd; + grpc_em_fd *emfd; + grpc_tcp_server *server; +} server_port; + +/* the overall server */ +struct grpc_tcp_server { + grpc_em *em; + grpc_tcp_server_cb cb; + void *cb_arg; + + gpr_mu mu; + gpr_cv cv; + + /* active port count: how many ports are actually still listening */ + int active_ports; + + /* all listening ports */ + server_port *ports; + size_t nports; + size_t port_capacity; +}; + +grpc_tcp_server *grpc_tcp_server_create(grpc_em *em) { + grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); + gpr_mu_init(&s->mu); + gpr_cv_init(&s->cv); + s->active_ports = 0; + s->em = em; + s->cb = NULL; + s->cb_arg = NULL; + s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); + s->nports = 0; + s->port_capacity = INIT_PORT_CAP; + return s; +} + +void grpc_tcp_server_destroy(grpc_tcp_server *s) { + size_t i; + gpr_mu_lock(&s->mu); + /* shutdown all fd's */ + for (i = 0; i < s->nports; i++) { + grpc_em_fd_shutdown(s->ports[i].emfd); + } + /* wait while that happens */ + while (s->active_ports) { + gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future); + } + gpr_mu_unlock(&s->mu); + + /* delete ALL the things */ + for (i = 0; i < s->nports; i++) { + server_port *sp = &s->ports[i]; + grpc_em_fd_destroy(sp->emfd); + gpr_free(sp->emfd); + close(sp->fd); + } + gpr_free(s->ports); + gpr_free(s); +} + +/* get max listen queue size on linux */ +static void init_max_accept_queue_size() { + int n = SOMAXCONN; + char buf[64]; + FILE *fp = fopen("/proc/sys/net/core/somaxconn", "r"); + if (fp == NULL) { + /* 2.4 kernel. */ + s_max_accept_queue_size = SOMAXCONN; + return; + } + if (fgets(buf, sizeof buf, fp)) { + char *end; + long i = strtol(buf, &end, 10); + if (i > 0 && i <= INT_MAX && end && *end == 0) { + n = i; + } + } + fclose(fp); + s_max_accept_queue_size = n; + + if (s_max_accept_queue_size < MIN_SAFE_ACCEPT_QUEUE_SIZE) { + gpr_log(GPR_INFO, + "Suspiciously small accept queue (%d) will probably lead to " + "connection drops", + s_max_accept_queue_size); + } +} + +static int get_max_accept_queue_size() { + gpr_once_init(&s_init_max_accept_queue_size, init_max_accept_queue_size); + return s_max_accept_queue_size; +} + +/* create a socket to listen with */ +static int create_listening_socket(struct sockaddr *port, int len) { + int fd = socket(port->sa_family, SOCK_STREAM, 0); + if (fd < 0) { + gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno)); + goto error; + } + + if (!grpc_set_socket_nonblocking(fd, 1) || !grpc_set_socket_cloexec(fd, 1) || + !grpc_set_socket_low_latency(fd, 1) || + !grpc_set_socket_reuse_addr(fd, 1)) { + gpr_log(GPR_ERROR, "Unable to configure socket %d: %s", fd, + strerror(errno)); + goto error; + } + + if (bind(fd, port, len) < 0) { + gpr_log(GPR_ERROR, "bind: %s", strerror(errno)); + goto error; + } + + if (listen(fd, get_max_accept_queue_size()) < 0) { + gpr_log(GPR_ERROR, "listen: %s", strerror(errno)); + goto error; + } + + return fd; + +error: + if (fd >= 0) { + close(fd); + } + return -1; +} + +/* event manager callback when reads are ready */ +static void on_read(void *arg, grpc_em_cb_status status) { + server_port *sp = arg; + + if (status != GRPC_CALLBACK_SUCCESS) { + goto error; + } + + /* loop until accept4 returns EAGAIN, and then re-arm notification */ + for (;;) { + struct sockaddr_storage addr; + socklen_t addrlen = sizeof(addr); + int fd = grpc_accept4(sp->fd, (struct sockaddr *)&addr, &addrlen, 1, 1); + if (fd < 0) { + switch (errno) { + case EINTR: + continue; + case EAGAIN: + if (GRPC_EM_OK != grpc_em_fd_notify_on_read(sp->emfd, on_read, sp, + gpr_inf_future)) { + gpr_log(GPR_ERROR, "Failed to register read request with em"); + goto error; + } + return; + default: + gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno)); + goto error; + } + } + + sp->server->cb(sp->server->cb_arg, grpc_tcp_create(fd, sp->server->em)); + } + + abort(); + +error: + gpr_mu_lock(&sp->server->mu); + if (0 == --sp->server->active_ports) { + gpr_cv_broadcast(&sp->server->cv); + } + gpr_mu_unlock(&sp->server->mu); +} + +int grpc_tcp_server_add_port(grpc_tcp_server *s, struct sockaddr *port, + int len) { + server_port *sp; + /* create a socket */ + int fd = create_listening_socket(port, len); + if (fd < 0) { + return -1; + } + + gpr_mu_lock(&s->mu); + GPR_ASSERT(!s->cb && "must add ports before starting server"); + /* append it to the list under a lock */ + if (s->nports == s->port_capacity) { + s->port_capacity *= 2; + s->ports = gpr_realloc(s->ports, sizeof(server_port *) * s->port_capacity); + } + sp = &s->ports[s->nports++]; + sp->emfd = gpr_malloc(sizeof(grpc_em_fd)); + sp->fd = fd; + sp->server = s; + /* initialize the em desc */ + if (GRPC_EM_OK != grpc_em_fd_init(sp->emfd, s->em, fd)) { + grpc_em_fd_destroy(sp->emfd); + gpr_free(sp->emfd); + s->nports--; + gpr_mu_unlock(&s->mu); + return -1; + } + gpr_mu_unlock(&s->mu); + + return fd; +} + +void grpc_tcp_server_start(grpc_tcp_server *s, grpc_tcp_server_cb cb, + void *cb_arg) { + size_t i; + GPR_ASSERT(cb); + gpr_mu_lock(&s->mu); + GPR_ASSERT(!s->cb); + GPR_ASSERT(s->active_ports == 0); + s->cb = cb; + s->cb_arg = cb_arg; + for (i = 0; i < s->nports; i++) { + grpc_em_fd_notify_on_read(s->ports[i].emfd, on_read, &s->ports[i], + gpr_inf_future); + s->active_ports++; + } + gpr_mu_unlock(&s->mu); +} diff --git a/src/core/endpoint/tcp_server.h b/src/core/endpoint/tcp_server.h new file mode 100644 index 0000000000..99cb83e181 --- /dev/null +++ b/src/core/endpoint/tcp_server.h @@ -0,0 +1,64 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_ENDPOINT_TCP_SERVER_H__ +#define __GRPC_INTERNAL_ENDPOINT_TCP_SERVER_H__ + +#include <sys/types.h> +#include <sys/socket.h> + +#include "src/core/endpoint/tcp.h" +#include "src/core/eventmanager/em.h" + +/* Forward decl of grpc_tcp_server */ +typedef struct grpc_tcp_server grpc_tcp_server; + +/* New server callback: tcp is the newly connected tcp connection */ +typedef void (*grpc_tcp_server_cb)(void *arg, grpc_endpoint *ep); + +/* Create a server, initially not bound to any ports */ +grpc_tcp_server *grpc_tcp_server_create(grpc_em *em); + +/* Start listening to bound ports */ +void grpc_tcp_server_start(grpc_tcp_server *server, grpc_tcp_server_cb cb, + void *cb_arg); + +/* Add a port to the server, returns a file descriptor on success, or <0 on + failure; the file descriptor remains owned by the server and will be cleaned + up when grpc_tcp_server_destroy is called */ +int grpc_tcp_server_add_port(grpc_tcp_server *server, struct sockaddr *port, + int len); + +void grpc_tcp_server_destroy(grpc_tcp_server *server); + +#endif /* __GRPC_INTERNAL_ENDPOINT_TCP_SERVER_H__ */ |