diff options
author | Nicolas Noble <nnoble@google.com> | 2014-11-26 16:33:03 -0800 |
---|---|---|
committer | Nicolas Noble <nnoble@google.com> | 2014-11-26 16:33:03 -0800 |
commit | b7ebd3b8c6fe39f99c40b10c1b563e4adb607b6c (patch) | |
tree | c1decf819492d455ec81cd471942c5516138f825 /src/core/endpoint/secure_endpoint.c | |
parent | 0e905e63db21bcdd85d3d1af051fcdc5bb5caa38 (diff) |
Initial import.
Diffstat (limited to 'src/core/endpoint/secure_endpoint.c')
-rw-r--r-- | src/core/endpoint/secure_endpoint.c | 335 |
1 files changed, 335 insertions, 0 deletions
diff --git a/src/core/endpoint/secure_endpoint.c b/src/core/endpoint/secure_endpoint.c new file mode 100644 index 0000000000..4fab0faa03 --- /dev/null +++ b/src/core/endpoint/secure_endpoint.c @@ -0,0 +1,335 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/endpoint/secure_endpoint.h" +#include "src/core/tsi/transport_security_interface.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/slice.h> +#include <grpc/support/slice_buffer.h> +#include <grpc/support/string.h> +#include <grpc/support/sync.h> + +#define STAGING_BUFFER_SIZE 8192 + +typedef struct { + grpc_endpoint base; + grpc_endpoint *wrapped_ep; + struct tsi_frame_protector *protector; + gpr_mu protector_mu; + /* saved upper level callbacks and user_data. */ + grpc_endpoint_read_cb read_cb; + void *read_user_data; + /* saved handshaker leftover data to unprotect. */ + gpr_slice_buffer leftover_bytes; + /* buffers for read and write */ + gpr_slice read_staging_buffer; + gpr_slice_buffer input_buffer; + + gpr_slice write_staging_buffer; + gpr_slice_buffer output_buffer; + + gpr_refcount ref; +} secure_endpoint; + +static void secure_endpoint_ref(secure_endpoint *ep) { gpr_ref(&ep->ref); } + +static void destroy(secure_endpoint *secure_ep) { + secure_endpoint *ep = (secure_endpoint *)secure_ep; + grpc_endpoint_destroy(ep->wrapped_ep); + tsi_frame_protector_destroy(ep->protector); + gpr_slice_buffer_destroy(&ep->leftover_bytes); + gpr_slice_unref(ep->read_staging_buffer); + gpr_slice_buffer_destroy(&ep->input_buffer); + gpr_slice_unref(ep->write_staging_buffer); + gpr_slice_buffer_destroy(&ep->output_buffer); + gpr_mu_destroy(&ep->protector_mu); + gpr_free(ep); +} + +static void secure_endpoint_unref(secure_endpoint *ep) { + if (gpr_unref(&ep->ref)) { + destroy(ep); + } +} + +static void flush_read_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur, + gpr_uint8 **end) { + gpr_slice_buffer_add(&ep->input_buffer, ep->read_staging_buffer); + ep->read_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE); + *cur = GPR_SLICE_START_PTR(ep->read_staging_buffer); + *end = GPR_SLICE_END_PTR(ep->read_staging_buffer); +} + +static void call_read_cb(secure_endpoint *ep, gpr_slice *slices, size_t nslices, + grpc_endpoint_cb_status error) { +#ifdef GRPC_TRACE_SECURE_TRANSPORT + size_t i; + for (i = 0; i < nslices; i++) { + char *data = + gpr_hexdump((char*)GPR_SLICE_START_PTR(slices[i]), + GPR_SLICE_LENGTH(slices[i]), GPR_HEXDUMP_PLAINTEXT); + gpr_log(GPR_DEBUG, "READ %p: %s", ep, data); + gpr_free(data); + } +#endif + ep->read_cb(ep->read_user_data, slices, nslices, error); + secure_endpoint_unref(ep); +} + +static void on_read(void *user_data, gpr_slice *slices, size_t nslices, + grpc_endpoint_cb_status error) { + int i = 0; + gpr_uint8 keep_looping = 0; + int input_buffer_count = 0; + tsi_result result = TSI_OK; + secure_endpoint *ep = (secure_endpoint *)user_data; + gpr_uint8 *cur = GPR_SLICE_START_PTR(ep->read_staging_buffer); + gpr_uint8 *end = GPR_SLICE_END_PTR(ep->read_staging_buffer); + + /* TODO(yangg) check error, maybe bail out early */ + for (i = 0; i < nslices; i++) { + gpr_slice encrypted = slices[i]; + gpr_uint8 *message_bytes = GPR_SLICE_START_PTR(encrypted); + size_t message_size = GPR_SLICE_LENGTH(encrypted); + + while (message_size > 0 || keep_looping) { + gpr_uint32 unprotected_buffer_size_written = end - cur; + gpr_uint32 processed_message_size = message_size; + gpr_mu_lock(&ep->protector_mu); + result = tsi_frame_protector_unprotect(ep->protector, message_bytes, + &processed_message_size, cur, + &unprotected_buffer_size_written); + gpr_mu_unlock(&ep->protector_mu); + if (result != TSI_OK) { + gpr_log(GPR_ERROR, "Decryption error: %s", + tsi_result_to_string(result)); + break; + } + message_bytes += processed_message_size; + message_size -= processed_message_size; + cur += unprotected_buffer_size_written; + + if (cur == end) { + flush_read_staging_buffer(ep, &cur, &end); + /* Force to enter the loop again to extract buffered bytes in protector. + The bytes could be buffered because of running out of staging_buffer. + If this happens at the end of all slices, doing another unprotect + avoids leaving data in the protector. */ + keep_looping = 1; + } else if (unprotected_buffer_size_written > 0) { + keep_looping = 1; + } else { + keep_looping = 0; + } + } + if (result != TSI_OK) break; + } + + if (cur != GPR_SLICE_START_PTR(ep->read_staging_buffer)) { + gpr_slice_buffer_add( + &ep->input_buffer, + gpr_slice_split_head( + &ep->read_staging_buffer, + cur - GPR_SLICE_START_PTR(ep->read_staging_buffer))); + } + + /* TODO(yangg) experiment with moving this block after read_cb to see if it + helps latency */ + for (i = 0; i < nslices; i++) { + gpr_slice_unref(slices[i]); + } + + if (result != TSI_OK) { + gpr_slice_buffer_reset_and_unref(&ep->input_buffer); + call_read_cb(ep, NULL, 0, GRPC_ENDPOINT_CB_ERROR); + return; + } + /* The upper level will unref the slices. */ + input_buffer_count = ep->input_buffer.count; + ep->input_buffer.count = 0; + call_read_cb(ep, ep->input_buffer.slices, input_buffer_count, error); +} + +static void notify_on_read(grpc_endpoint *secure_ep, grpc_endpoint_read_cb cb, + void *user_data, gpr_timespec deadline) { + secure_endpoint *ep = (secure_endpoint *)secure_ep; + ep->read_cb = cb; + ep->read_user_data = user_data; + + secure_endpoint_ref(ep); + + if (ep->leftover_bytes.count) { + size_t leftover_nslices = ep->leftover_bytes.count; + ep->leftover_bytes.count = 0; + on_read(ep, ep->leftover_bytes.slices, leftover_nslices, + GRPC_ENDPOINT_CB_OK); + return; + } + + grpc_endpoint_notify_on_read(ep->wrapped_ep, on_read, ep, deadline); +} + +static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur, + gpr_uint8 **end) { + gpr_slice_buffer_add(&ep->output_buffer, ep->write_staging_buffer); + ep->write_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE); + *cur = GPR_SLICE_START_PTR(ep->write_staging_buffer); + *end = GPR_SLICE_END_PTR(ep->write_staging_buffer); +} + +static grpc_endpoint_write_status write(grpc_endpoint *secure_ep, + gpr_slice *slices, size_t nslices, + grpc_endpoint_write_cb cb, + void *user_data, + gpr_timespec deadline) { + int i = 0; + int output_buffer_count = 0; + tsi_result result = TSI_OK; + secure_endpoint *ep = (secure_endpoint *)secure_ep; + gpr_uint8 *cur = GPR_SLICE_START_PTR(ep->write_staging_buffer); + gpr_uint8 *end = GPR_SLICE_END_PTR(ep->write_staging_buffer); + GPR_ASSERT(ep->output_buffer.count == 0); + +#ifdef GRPC_TRACE_SECURE_TRANSPORT + for (i = 0; i < nslices; i++) { + char *data = + gpr_hexdump((char*)GPR_SLICE_START_PTR(slices[i]), + GPR_SLICE_LENGTH(slices[i]), GPR_HEXDUMP_PLAINTEXT); + gpr_log(GPR_DEBUG, "WRITE %p: %s", ep, data); + gpr_free(data); + } +#endif + + for (i = 0; i < nslices; i++) { + gpr_slice plain = slices[i]; + gpr_uint8 *message_bytes = GPR_SLICE_START_PTR(plain); + size_t message_size = GPR_SLICE_LENGTH(plain); + while (message_size > 0) { + gpr_uint32 protected_buffer_size_to_send = end - cur; + gpr_uint32 processed_message_size = message_size; + gpr_mu_lock(&ep->protector_mu); + result = tsi_frame_protector_protect(ep->protector, message_bytes, + &processed_message_size, cur, + &protected_buffer_size_to_send); + gpr_mu_unlock(&ep->protector_mu); + if (result != TSI_OK) { + gpr_log(GPR_ERROR, "Encryption error: %s", + tsi_result_to_string(result)); + break; + } + message_bytes += processed_message_size; + message_size -= processed_message_size; + cur += protected_buffer_size_to_send; + + if (cur == end) { + flush_write_staging_buffer(ep, &cur, &end); + } + } + if (result != TSI_OK) break; + } + if (result == TSI_OK) { + gpr_uint32 still_pending_size; + do { + gpr_uint32 protected_buffer_size_to_send = end - cur; + gpr_mu_lock(&ep->protector_mu); + result = tsi_frame_protector_protect_flush(ep->protector, cur, + &protected_buffer_size_to_send, + &still_pending_size); + gpr_mu_unlock(&ep->protector_mu); + if (result != TSI_OK) break; + cur += protected_buffer_size_to_send; + if (cur == end) { + flush_write_staging_buffer(ep, &cur, &end); + } + } while (still_pending_size > 0); + if (cur != GPR_SLICE_START_PTR(ep->write_staging_buffer)) { + gpr_slice_buffer_add( + &ep->output_buffer, + gpr_slice_split_head( + &ep->write_staging_buffer, + cur - GPR_SLICE_START_PTR(ep->write_staging_buffer))); + } + } + + for (i = 0; i < nslices; i++) { + gpr_slice_unref(slices[i]); + } + + if (result != TSI_OK) { + /* TODO(yangg) do different things according to the error type? */ + gpr_slice_buffer_reset_and_unref(&ep->output_buffer); + return GRPC_ENDPOINT_WRITE_ERROR; + } + + /* clear output_buffer and let the lower level handle its slices. */ + output_buffer_count = ep->output_buffer.count; + ep->output_buffer.count = 0; + return grpc_endpoint_write(ep->wrapped_ep, ep->output_buffer.slices, + output_buffer_count, cb, user_data, deadline); +} + +static void shutdown(grpc_endpoint *secure_ep) { + secure_endpoint *ep = (secure_endpoint *)secure_ep; + grpc_endpoint_shutdown(ep->wrapped_ep); +} + +static void unref(grpc_endpoint *secure_ep) { + secure_endpoint *ep = (secure_endpoint *)secure_ep; + secure_endpoint_unref(ep); +} + +static const grpc_endpoint_vtable vtable = {notify_on_read, write, shutdown, + unref}; + +grpc_endpoint *grpc_secure_endpoint_create( + struct tsi_frame_protector *protector, grpc_endpoint *transport, + gpr_slice *leftover_slices, size_t leftover_nslices) { + size_t i; + secure_endpoint *ep = (secure_endpoint *)gpr_malloc(sizeof(secure_endpoint)); + ep->base.vtable = &vtable; + ep->wrapped_ep = transport; + ep->protector = protector; + gpr_slice_buffer_init(&ep->leftover_bytes); + for (i = 0; i < leftover_nslices; i++) { + gpr_slice_buffer_add(&ep->leftover_bytes, + gpr_slice_ref(leftover_slices[i])); + } + ep->write_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE); + ep->read_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE); + gpr_slice_buffer_init(&ep->input_buffer); + gpr_slice_buffer_init(&ep->output_buffer); + gpr_mu_init(&ep->protector_mu); + gpr_ref_init(&ep->ref, 1); + return &ep->base; +} |