diff options
Diffstat (limited to 'src/core/lib')
-rw-r--r-- | src/core/lib/debug/stats_data.c | 81 | ||||
-rw-r--r-- | src/core/lib/debug/stats_data.h | 14 | ||||
-rw-r--r-- | src/core/lib/debug/stats_data.yaml | 47 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_posix.c | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer.h | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_generic.c | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_uv.c | 2 | ||||
-rw-r--r-- | src/core/lib/security/transport/secure_endpoint.c | 200 | ||||
-rw-r--r-- | src/core/lib/security/transport/secure_endpoint.h | 11 | ||||
-rw-r--r-- | src/core/lib/security/transport/security_handshaker.c | 35 | ||||
-rw-r--r-- | src/core/lib/support/string.c | 3 | ||||
-rw-r--r-- | src/core/lib/surface/alarm.c | 31 | ||||
-rw-r--r-- | src/core/lib/surface/version.c | 2 |
13 files changed, 296 insertions, 138 deletions
diff --git a/src/core/lib/debug/stats_data.c b/src/core/lib/debug/stats_data.c index 81ef0d28ee..969cac9d89 100644 --- a/src/core/lib/debug/stats_data.c +++ b/src/core/lib/debug/stats_data.c @@ -55,10 +55,59 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = { "executor_wakeup_initiated", "executor_queue_drained", }; +const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = { + "Number of client side calls created by this process", + "Number of server side calls created by this process", + "Number of polling syscalls (epoll_wait, poll, etc) made by this process", + "Number of sleeping syscalls made by this process", + "How many polling wakeups were performed by the process", + "How many times was a polling wakeup requested without an active poller", + "How many times was the same polling worker awoken repeatedly before " + "waking up", + "How many times was an eventfd used as the wakeup vector for a polling " + "wakeup", + "How many times was a condition variable used as the wakeup vector for a " + "polling wakeup", + "How many times could a polling wakeup be satisfied by keeping the waking " + "thread awake?", + "Number of times histogram increments went through the slow (binary " + "search) path", + "Number of write syscalls (or equivalent - eg sendmsg) made by this " + "process", + "Number of read syscalls (or equivalent - eg recvmsg) made by this process", + "Number of batches received by HTTP2 transport", + "Number of cancelations received by HTTP2 transport", + "Number of batches containing send initial metadata", + "Number of batches containing send message", + "Number of batches containing send trailing metadata", + "Number of batches containing receive initial metadata", + "Number of batches containing receive message", + "Number of batches containing receive trailing metadata", + "Number of HTTP2 pings sent by process", "Number of HTTP2 writes initiated", + "Number of combiner lock entries by process (first items queued to a " + "combiner)", + "Number of items scheduled against combiner locks", + "Number of final items scheduled against combiner locks", + "Number of combiner locks offloaded to different threads", + "Number of closures scheduled against the executor (gRPC thread pool)", + "Number of closures scheduled by the executor to the executor", + "Number of thread wakeups initiated within the executor", + "Number of times an executor queue was drained", +}; const char *grpc_stats_histogram_name[GRPC_STATS_HISTOGRAM_COUNT] = { - "call_initial_size", "poll_events_returned", "tcp_write_size", - "tcp_write_iov_size", "tcp_read_size", "tcp_read_offer", - "tcp_read_iov_size", "http2_send_message_size", + "call_initial_size", "poll_events_returned", "tcp_write_size", + "tcp_write_iov_size", "tcp_read_size", "tcp_read_offer", + "tcp_read_offer_iov_size", "http2_send_message_size", +}; +const char *grpc_stats_histogram_doc[GRPC_STATS_HISTOGRAM_COUNT] = { + "Initial size of the grpc_call arena created at call start", + "How many events are called for each syscall_poll", + "Number of bytes offered to each syscall_write", + "Number of byte segments offered to each syscall_write", + "Number of bytes received by each syscall_read", + "Number of bytes offered to each syscall_read", + "Number of byte segments offered to each syscall_read", + "Size of messages received by HTTP2 transport", }; const int grpc_stats_table_0[65] = { 0, 1, 2, 3, 4, 5, 7, 9, 11, 14, @@ -276,11 +325,12 @@ void grpc_stats_inc_tcp_read_offer(grpc_exec_ctx *exec_ctx, int value) { grpc_stats_histo_find_bucket_slow( (exec_ctx), value, grpc_stats_table_4, 64)); } -void grpc_stats_inc_tcp_read_iov_size(grpc_exec_ctx *exec_ctx, int value) { +void grpc_stats_inc_tcp_read_offer_iov_size(grpc_exec_ctx *exec_ctx, + int value) { value = GPR_CLAMP(value, 0, 1024); if (value < 13) { - GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_IOV_SIZE, - value); + GRPC_STATS_INC_HISTOGRAM( + (exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE, value); return; } union { @@ -293,11 +343,12 @@ void grpc_stats_inc_tcp_read_iov_size(grpc_exec_ctx *exec_ctx, int value) { grpc_stats_table_7[((_val.uint - 4623507967449235456ull) >> 48)] + 13; _bkt.dbl = grpc_stats_table_6[bucket]; bucket -= (_val.uint < _bkt.uint); - GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_IOV_SIZE, - bucket); + GRPC_STATS_INC_HISTOGRAM( + (exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE, bucket); return; } - GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_IOV_SIZE, + GRPC_STATS_INC_HISTOGRAM((exec_ctx), + GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE, grpc_stats_histo_find_bucket_slow( (exec_ctx), value, grpc_stats_table_6, 64)); } @@ -335,7 +386,11 @@ const int *const grpc_stats_histo_bucket_boundaries[8] = { grpc_stats_table_6, grpc_stats_table_4, grpc_stats_table_4, grpc_stats_table_6, grpc_stats_table_4}; void (*const grpc_stats_inc_histogram[8])(grpc_exec_ctx *exec_ctx, int x) = { - grpc_stats_inc_call_initial_size, grpc_stats_inc_poll_events_returned, - grpc_stats_inc_tcp_write_size, grpc_stats_inc_tcp_write_iov_size, - grpc_stats_inc_tcp_read_size, grpc_stats_inc_tcp_read_offer, - grpc_stats_inc_tcp_read_iov_size, grpc_stats_inc_http2_send_message_size}; + grpc_stats_inc_call_initial_size, + grpc_stats_inc_poll_events_returned, + grpc_stats_inc_tcp_write_size, + grpc_stats_inc_tcp_write_iov_size, + grpc_stats_inc_tcp_read_size, + grpc_stats_inc_tcp_read_offer, + grpc_stats_inc_tcp_read_offer_iov_size, + grpc_stats_inc_http2_send_message_size}; diff --git a/src/core/lib/debug/stats_data.h b/src/core/lib/debug/stats_data.h index e6a2256172..27f0aa8ba2 100644 --- a/src/core/lib/debug/stats_data.h +++ b/src/core/lib/debug/stats_data.h @@ -59,6 +59,7 @@ typedef enum { GRPC_STATS_COUNTER_COUNT } grpc_stats_counters; extern const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT]; +extern const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT]; typedef enum { GRPC_STATS_HISTOGRAM_CALL_INITIAL_SIZE, GRPC_STATS_HISTOGRAM_POLL_EVENTS_RETURNED, @@ -66,11 +67,12 @@ typedef enum { GRPC_STATS_HISTOGRAM_TCP_WRITE_IOV_SIZE, GRPC_STATS_HISTOGRAM_TCP_READ_SIZE, GRPC_STATS_HISTOGRAM_TCP_READ_OFFER, - GRPC_STATS_HISTOGRAM_TCP_READ_IOV_SIZE, + GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE, GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_SIZE, GRPC_STATS_HISTOGRAM_COUNT } grpc_stats_histograms; extern const char *grpc_stats_histogram_name[GRPC_STATS_HISTOGRAM_COUNT]; +extern const char *grpc_stats_histogram_doc[GRPC_STATS_HISTOGRAM_COUNT]; typedef enum { GRPC_STATS_HISTOGRAM_CALL_INITIAL_SIZE_FIRST_SLOT = 0, GRPC_STATS_HISTOGRAM_CALL_INITIAL_SIZE_BUCKETS = 64, @@ -84,8 +86,8 @@ typedef enum { GRPC_STATS_HISTOGRAM_TCP_READ_SIZE_BUCKETS = 64, GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_FIRST_SLOT = 384, GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_BUCKETS = 64, - GRPC_STATS_HISTOGRAM_TCP_READ_IOV_SIZE_FIRST_SLOT = 448, - GRPC_STATS_HISTOGRAM_TCP_READ_IOV_SIZE_BUCKETS = 64, + GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE_FIRST_SLOT = 448, + GRPC_STATS_HISTOGRAM_TCP_READ_OFFER_IOV_SIZE_BUCKETS = 64, GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_SIZE_FIRST_SLOT = 512, GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_SIZE_BUCKETS = 64, GRPC_STATS_HISTOGRAM_BUCKETS = 576 @@ -182,9 +184,9 @@ void grpc_stats_inc_tcp_read_size(grpc_exec_ctx *exec_ctx, int x); #define GRPC_STATS_INC_TCP_READ_OFFER(exec_ctx, value) \ grpc_stats_inc_tcp_read_offer((exec_ctx), (int)(value)) void grpc_stats_inc_tcp_read_offer(grpc_exec_ctx *exec_ctx, int x); -#define GRPC_STATS_INC_TCP_READ_IOV_SIZE(exec_ctx, value) \ - grpc_stats_inc_tcp_read_iov_size((exec_ctx), (int)(value)) -void grpc_stats_inc_tcp_read_iov_size(grpc_exec_ctx *exec_ctx, int x); +#define GRPC_STATS_INC_TCP_READ_OFFER_IOV_SIZE(exec_ctx, value) \ + grpc_stats_inc_tcp_read_offer_iov_size((exec_ctx), (int)(value)) +void grpc_stats_inc_tcp_read_offer_iov_size(grpc_exec_ctx *exec_ctx, int x); #define GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE(exec_ctx, value) \ grpc_stats_inc_http2_send_message_size((exec_ctx), (int)(value)) void grpc_stats_inc_http2_send_message_size(grpc_exec_ctx *exec_ctx, int x); diff --git a/src/core/lib/debug/stats_data.yaml b/src/core/lib/debug/stats_data.yaml index 4e0bc5de58..ddf1294a78 100644 --- a/src/core/lib/debug/stats_data.yaml +++ b/src/core/lib/debug/stats_data.yaml @@ -17,63 +17,108 @@ # overall - counter: client_calls_created + doc: Number of client side calls created by this process - counter: server_calls_created + doc: Number of server side calls created by this process - histogram: call_initial_size max: 262144 buckets: 64 + doc: Initial size of the grpc_call arena created at call start # polling - counter: syscall_poll + doc: Number of polling syscalls (epoll_wait, poll, etc) made by this process - counter: syscall_wait + doc: Number of sleeping syscalls made by this process - histogram: poll_events_returned max: 1024 buckets: 128 + doc: How many events are called for each syscall_poll - counter: pollset_kick + doc: How many polling wakeups were performed by the process - counter: pollset_kicked_without_poller + doc: How many times was a polling wakeup requested without an active poller - counter: pollset_kicked_again + doc: How many times was the same polling worker awoken repeatedly before + waking up - counter: pollset_kick_wakeup_fd + doc: How many times was an eventfd used as the wakeup vector for a polling + wakeup - counter: pollset_kick_wakeup_cv + doc: How many times was a condition variable used as the wakeup vector for a + polling wakeup - counter: pollset_kick_own_thread + doc: How many times could a polling wakeup be satisfied by keeping the waking + thread awake? # stats system - counter: histogram_slow_lookups + doc: Number of times histogram increments went through the slow + (binary search) path # tcp - counter: syscall_write + doc: Number of write syscalls (or equivalent - eg sendmsg) made by this process - counter: syscall_read + doc: Number of read syscalls (or equivalent - eg recvmsg) made by this process - histogram: tcp_write_size max: 16777216 # 16 meg max write tracked buckets: 64 + doc: Number of bytes offered to each syscall_write - histogram: tcp_write_iov_size max: 1024 buckets: 64 + doc: Number of byte segments offered to each syscall_write - histogram: tcp_read_size max: 16777216 buckets: 64 + doc: Number of bytes received by each syscall_read - histogram: tcp_read_offer max: 16777216 buckets: 64 -- histogram: tcp_read_iov_size + doc: Number of bytes offered to each syscall_read +- histogram: tcp_read_offer_iov_size max: 1024 buckets: 64 + doc: Number of byte segments offered to each syscall_read # chttp2 - counter: http2_op_batches + doc: Number of batches received by HTTP2 transport - counter: http2_op_cancel + doc: Number of cancelations received by HTTP2 transport - counter: http2_op_send_initial_metadata + doc: Number of batches containing send initial metadata - counter: http2_op_send_message + doc: Number of batches containing send message - counter: http2_op_send_trailing_metadata + doc: Number of batches containing send trailing metadata - counter: http2_op_recv_initial_metadata + doc: Number of batches containing receive initial metadata - counter: http2_op_recv_message + doc: Number of batches containing receive message - counter: http2_op_recv_trailing_metadata + doc: Number of batches containing receive trailing metadata - histogram: http2_send_message_size max: 16777216 buckets: 64 + doc: Size of messages received by HTTP2 transport - counter: http2_pings_sent + doc: Number of HTTP2 pings sent by process - counter: http2_writes_begun + doc: Number of HTTP2 writes initiated # combiner locks - counter: combiner_locks_initiated + doc: Number of combiner lock entries by process + (first items queued to a combiner) - counter: combiner_locks_scheduled_items + doc: Number of items scheduled against combiner locks - counter: combiner_locks_scheduled_final_items + doc: Number of final items scheduled against combiner locks - counter: combiner_locks_offloaded + doc: Number of combiner locks offloaded to different threads # executor - counter: executor_scheduled_items + doc: Number of closures scheduled against the executor (gRPC thread pool) - counter: executor_scheduled_to_self + doc: Number of closures scheduled by the executor to the executor - counter: executor_wakeup_initiated + doc: Number of thread wakeups initiated within the executor - counter: executor_queue_drained + doc: Number of times an executor queue was drained diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 98a2afd78b..3372e14eef 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -256,7 +256,7 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { msg.msg_flags = 0; GRPC_STATS_INC_TCP_READ_OFFER(exec_ctx, tcp->incoming_buffer->length); - GRPC_STATS_INC_TCP_READ_IOV_SIZE(exec_ctx, tcp->incoming_buffer->count); + GRPC_STATS_INC_TCP_READ_OFFER_IOV_SIZE(exec_ctx, tcp->incoming_buffer->count); GPR_TIMER_BEGIN("recvmsg", 0); do { diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h index b92b8fb8b8..ac392f87fe 100644 --- a/src/core/lib/iomgr/timer.h +++ b/src/core/lib/iomgr/timer.h @@ -44,6 +44,10 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, gpr_timespec deadline, grpc_closure *closure, gpr_timespec now); +/* Initialize *timer without setting it. This can later be passed through + the regular init or cancel */ +void grpc_timer_init_unset(grpc_timer *timer); + /* Note that there is no timer destroy function. This is because the timer is a one-time occurrence with a guarantee that the callback will be called exactly once, either at expiration or cancellation. Thus, all diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index 12efce241f..c08bb525b7 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -234,6 +234,8 @@ static void note_deadline_change(timer_shard *shard) { } } +void grpc_timer_init_unset(grpc_timer *timer) { timer->pending = false; } + void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, gpr_timespec deadline, grpc_closure *closure, gpr_timespec now) { diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c index 70f49bcbe8..adced41f53 100644 --- a/src/core/lib/iomgr/timer_uv.c +++ b/src/core/lib/iomgr/timer_uv.c @@ -77,6 +77,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, uv_unref((uv_handle_t *)uv_timer); } +void grpc_timer_init_unset(grpc_timer *timer) { timer->pending = 0; } + void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { GRPC_UV_ASSERT_SAME_THREAD(); if (timer->pending) { diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c index 5e41b94ff8..ae5633b82c 100644 --- a/src/core/lib/security/transport/secure_endpoint.c +++ b/src/core/lib/security/transport/secure_endpoint.c @@ -34,7 +34,7 @@ #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" -#include "src/core/tsi/transport_security_interface.h" +#include "src/core/tsi/transport_security_grpc.h" #define STAGING_BUFFER_SIZE 8192 @@ -42,6 +42,7 @@ typedef struct { grpc_endpoint base; grpc_endpoint *wrapped_ep; struct tsi_frame_protector *protector; + struct tsi_zero_copy_grpc_protector *zero_copy_protector; gpr_mu protector_mu; /* saved upper level callbacks and user_data. */ grpc_closure *read_cb; @@ -67,6 +68,7 @@ static void destroy(grpc_exec_ctx *exec_ctx, secure_endpoint *secure_ep) { secure_endpoint *ep = secure_ep; grpc_endpoint_destroy(exec_ctx, ep->wrapped_ep); tsi_frame_protector_destroy(ep->protector); + tsi_zero_copy_grpc_protector_destroy(exec_ctx, ep->zero_copy_protector); grpc_slice_buffer_destroy_internal(exec_ctx, &ep->leftover_bytes); grpc_slice_unref_internal(exec_ctx, ep->read_staging_buffer); grpc_slice_unref_internal(exec_ctx, ep->write_staging_buffer); @@ -159,51 +161,58 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, return; } - /* TODO(yangg) check error, maybe bail out early */ - for (i = 0; i < ep->source_buffer.count; i++) { - grpc_slice encrypted = ep->source_buffer.slices[i]; - uint8_t *message_bytes = GRPC_SLICE_START_PTR(encrypted); - size_t message_size = GRPC_SLICE_LENGTH(encrypted); - - while (message_size > 0 || keep_looping) { - size_t unprotected_buffer_size_written = (size_t)(end - cur); - size_t processed_message_size = message_size; - gpr_mu_lock(&ep->protector_mu); - result = tsi_frame_protector_unprotect(ep->protector, message_bytes, - &processed_message_size, cur, - &unprotected_buffer_size_written); - gpr_mu_unlock(&ep->protector_mu); - if (result != TSI_OK) { - gpr_log(GPR_ERROR, "Decryption error: %s", - tsi_result_to_string(result)); - break; - } - message_bytes += processed_message_size; - message_size -= processed_message_size; - cur += unprotected_buffer_size_written; - - if (cur == end) { - flush_read_staging_buffer(ep, &cur, &end); - /* Force to enter the loop again to extract buffered bytes in protector. - The bytes could be buffered because of running out of staging_buffer. - If this happens at the end of all slices, doing another unprotect - avoids leaving data in the protector. */ - keep_looping = 1; - } else if (unprotected_buffer_size_written > 0) { - keep_looping = 1; - } else { - keep_looping = 0; + if (ep->zero_copy_protector != NULL) { + // Use zero-copy grpc protector to unprotect. + result = tsi_zero_copy_grpc_protector_unprotect( + exec_ctx, ep->zero_copy_protector, &ep->source_buffer, ep->read_buffer); + } else { + // Use frame protector to unprotect. + /* TODO(yangg) check error, maybe bail out early */ + for (i = 0; i < ep->source_buffer.count; i++) { + grpc_slice encrypted = ep->source_buffer.slices[i]; + uint8_t *message_bytes = GRPC_SLICE_START_PTR(encrypted); + size_t message_size = GRPC_SLICE_LENGTH(encrypted); + + while (message_size > 0 || keep_looping) { + size_t unprotected_buffer_size_written = (size_t)(end - cur); + size_t processed_message_size = message_size; + gpr_mu_lock(&ep->protector_mu); + result = tsi_frame_protector_unprotect( + ep->protector, message_bytes, &processed_message_size, cur, + &unprotected_buffer_size_written); + gpr_mu_unlock(&ep->protector_mu); + if (result != TSI_OK) { + gpr_log(GPR_ERROR, "Decryption error: %s", + tsi_result_to_string(result)); + break; + } + message_bytes += processed_message_size; + message_size -= processed_message_size; + cur += unprotected_buffer_size_written; + + if (cur == end) { + flush_read_staging_buffer(ep, &cur, &end); + /* Force to enter the loop again to extract buffered bytes in + protector. The bytes could be buffered because of running out of + staging_buffer. If this happens at the end of all slices, doing + another unprotect avoids leaving data in the protector. */ + keep_looping = 1; + } else if (unprotected_buffer_size_written > 0) { + keep_looping = 1; + } else { + keep_looping = 0; + } } + if (result != TSI_OK) break; } - if (result != TSI_OK) break; - } - if (cur != GRPC_SLICE_START_PTR(ep->read_staging_buffer)) { - grpc_slice_buffer_add( - ep->read_buffer, - grpc_slice_split_head( - &ep->read_staging_buffer, - (size_t)(cur - GRPC_SLICE_START_PTR(ep->read_staging_buffer)))); + if (cur != GRPC_SLICE_START_PTR(ep->read_staging_buffer)) { + grpc_slice_buffer_add( + ep->read_buffer, + grpc_slice_split_head( + &ep->read_staging_buffer, + (size_t)(cur - GRPC_SLICE_START_PTR(ep->read_staging_buffer)))); + } } /* TODO(yangg) experiment with moving this block after read_cb to see if it @@ -270,54 +279,62 @@ static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep, } } - for (i = 0; i < slices->count; i++) { - grpc_slice plain = slices->slices[i]; - uint8_t *message_bytes = GRPC_SLICE_START_PTR(plain); - size_t message_size = GRPC_SLICE_LENGTH(plain); - while (message_size > 0) { - size_t protected_buffer_size_to_send = (size_t)(end - cur); - size_t processed_message_size = message_size; - gpr_mu_lock(&ep->protector_mu); - result = tsi_frame_protector_protect(ep->protector, message_bytes, - &processed_message_size, cur, - &protected_buffer_size_to_send); - gpr_mu_unlock(&ep->protector_mu); - if (result != TSI_OK) { - gpr_log(GPR_ERROR, "Encryption error: %s", - tsi_result_to_string(result)); - break; - } - message_bytes += processed_message_size; - message_size -= processed_message_size; - cur += protected_buffer_size_to_send; - - if (cur == end) { - flush_write_staging_buffer(ep, &cur, &end); + if (ep->zero_copy_protector != NULL) { + // Use zero-copy grpc protector to protect. + result = tsi_zero_copy_grpc_protector_protect( + exec_ctx, ep->zero_copy_protector, slices, &ep->output_buffer); + } else { + // Use frame protector to protect. + for (i = 0; i < slices->count; i++) { + grpc_slice plain = slices->slices[i]; + uint8_t *message_bytes = GRPC_SLICE_START_PTR(plain); + size_t message_size = GRPC_SLICE_LENGTH(plain); + while (message_size > 0) { + size_t protected_buffer_size_to_send = (size_t)(end - cur); + size_t processed_message_size = message_size; + gpr_mu_lock(&ep->protector_mu); + result = tsi_frame_protector_protect(ep->protector, message_bytes, + &processed_message_size, cur, + &protected_buffer_size_to_send); + gpr_mu_unlock(&ep->protector_mu); + if (result != TSI_OK) { + gpr_log(GPR_ERROR, "Encryption error: %s", + tsi_result_to_string(result)); + break; + } + message_bytes += processed_message_size; + message_size -= processed_message_size; + cur += protected_buffer_size_to_send; + + if (cur == end) { + flush_write_staging_buffer(ep, &cur, &end); + } } - } - if (result != TSI_OK) break; - } - if (result == TSI_OK) { - size_t still_pending_size; - do { - size_t protected_buffer_size_to_send = (size_t)(end - cur); - gpr_mu_lock(&ep->protector_mu); - result = tsi_frame_protector_protect_flush(ep->protector, cur, - &protected_buffer_size_to_send, - &still_pending_size); - gpr_mu_unlock(&ep->protector_mu); if (result != TSI_OK) break; - cur += protected_buffer_size_to_send; - if (cur == end) { - flush_write_staging_buffer(ep, &cur, &end); + } + if (result == TSI_OK) { + size_t still_pending_size; + do { + size_t protected_buffer_size_to_send = (size_t)(end - cur); + gpr_mu_lock(&ep->protector_mu); + result = tsi_frame_protector_protect_flush( + ep->protector, cur, &protected_buffer_size_to_send, + &still_pending_size); + gpr_mu_unlock(&ep->protector_mu); + if (result != TSI_OK) break; + cur += protected_buffer_size_to_send; + if (cur == end) { + flush_write_staging_buffer(ep, &cur, &end); + } + } while (still_pending_size > 0); + if (cur != GRPC_SLICE_START_PTR(ep->write_staging_buffer)) { + grpc_slice_buffer_add( + &ep->output_buffer, + grpc_slice_split_head( + &ep->write_staging_buffer, + (size_t)(cur - + GRPC_SLICE_START_PTR(ep->write_staging_buffer)))); } - } while (still_pending_size > 0); - if (cur != GRPC_SLICE_START_PTR(ep->write_staging_buffer)) { - grpc_slice_buffer_add( - &ep->output_buffer, - grpc_slice_split_head( - &ep->write_staging_buffer, - (size_t)(cur - GRPC_SLICE_START_PTR(ep->write_staging_buffer)))); } } @@ -389,13 +406,16 @@ static const grpc_endpoint_vtable vtable = {endpoint_read, endpoint_get_fd}; grpc_endpoint *grpc_secure_endpoint_create( - struct tsi_frame_protector *protector, grpc_endpoint *transport, - grpc_slice *leftover_slices, size_t leftover_nslices) { + struct tsi_frame_protector *protector, + struct tsi_zero_copy_grpc_protector *zero_copy_protector, + grpc_endpoint *transport, grpc_slice *leftover_slices, + size_t leftover_nslices) { size_t i; secure_endpoint *ep = (secure_endpoint *)gpr_malloc(sizeof(secure_endpoint)); ep->base.vtable = &vtable; ep->wrapped_ep = transport; ep->protector = protector; + ep->zero_copy_protector = zero_copy_protector; grpc_slice_buffer_init(&ep->leftover_bytes); for (i = 0; i < leftover_nslices; i++) { grpc_slice_buffer_add(&ep->leftover_bytes, diff --git a/src/core/lib/security/transport/secure_endpoint.h b/src/core/lib/security/transport/secure_endpoint.h index 1c5555f3df..3323a6ff42 100644 --- a/src/core/lib/security/transport/secure_endpoint.h +++ b/src/core/lib/security/transport/secure_endpoint.h @@ -23,12 +23,17 @@ #include "src/core/lib/iomgr/endpoint.h" struct tsi_frame_protector; +struct tsi_zero_copy_grpc_protector; extern grpc_tracer_flag grpc_trace_secure_endpoint; -/* Takes ownership of protector and to_wrap, and refs leftover_slices. */ +/* Takes ownership of protector, zero_copy_protector, and to_wrap, and refs + * leftover_slices. If zero_copy_protector is not NULL, protector will never be + * used. */ grpc_endpoint *grpc_secure_endpoint_create( - struct tsi_frame_protector *protector, grpc_endpoint *to_wrap, - grpc_slice *leftover_slices, size_t leftover_nslices); + struct tsi_frame_protector *protector, + struct tsi_zero_copy_grpc_protector *zero_copy_protector, + grpc_endpoint *to_wrap, grpc_slice *leftover_slices, + size_t leftover_nslices); #endif /* GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURE_ENDPOINT_H */ diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c index fc9c9f980f..ea9608f444 100644 --- a/src/core/lib/security/transport/security_handshaker.c +++ b/src/core/lib/security/transport/security_handshaker.c @@ -32,6 +32,7 @@ #include "src/core/lib/security/transport/secure_endpoint.h" #include "src/core/lib/security/transport/tsi_error.h" #include "src/core/lib/slice/slice_internal.h" +#include "src/core/tsi/transport_security_grpc.h" #define GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE 256 @@ -135,17 +136,31 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg, security_handshake_failed_locked(exec_ctx, h, GRPC_ERROR_REF(error)); goto done; } - // Create frame protector. - tsi_frame_protector *protector; - tsi_result result = tsi_handshaker_result_create_frame_protector( - h->handshaker_result, NULL, &protector); - if (result != TSI_OK) { + // Create zero-copy frame protector, if implemented. + tsi_zero_copy_grpc_protector *zero_copy_protector = NULL; + tsi_result result = tsi_handshaker_result_create_zero_copy_grpc_protector( + h->handshaker_result, NULL, &zero_copy_protector); + if (result != TSI_OK && result != TSI_UNIMPLEMENTED) { error = grpc_set_tsi_error_result( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Frame protector creation failed"), + GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Zero-copy frame protector creation failed"), result); security_handshake_failed_locked(exec_ctx, h, error); goto done; } + // Create frame protector if zero-copy frame protector is NULL. + tsi_frame_protector *protector = NULL; + if (zero_copy_protector == NULL) { + result = tsi_handshaker_result_create_frame_protector(h->handshaker_result, + NULL, &protector); + if (result != TSI_OK) { + error = grpc_set_tsi_error_result(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Frame protector creation failed"), + result); + security_handshake_failed_locked(exec_ctx, h, error); + goto done; + } + } // Get unused bytes. const unsigned char *unused_bytes = NULL; size_t unused_bytes_size = 0; @@ -155,12 +170,12 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg, if (unused_bytes_size > 0) { grpc_slice slice = grpc_slice_from_copied_buffer((char *)unused_bytes, unused_bytes_size); - h->args->endpoint = - grpc_secure_endpoint_create(protector, h->args->endpoint, &slice, 1); + h->args->endpoint = grpc_secure_endpoint_create( + protector, zero_copy_protector, h->args->endpoint, &slice, 1); grpc_slice_unref_internal(exec_ctx, slice); } else { - h->args->endpoint = - grpc_secure_endpoint_create(protector, h->args->endpoint, NULL, 0); + h->args->endpoint = grpc_secure_endpoint_create( + protector, zero_copy_protector, h->args->endpoint, NULL, 0); } tsi_handshaker_result_destroy(h->handshaker_result); h->handshaker_result = NULL; diff --git a/src/core/lib/support/string.c b/src/core/lib/support/string.c index ec93303024..523e43445b 100644 --- a/src/core/lib/support/string.c +++ b/src/core/lib/support/string.c @@ -300,11 +300,12 @@ void *gpr_memrchr(const void *s, int c, size_t n) { } bool gpr_is_true(const char *s) { + size_t i; if (s == NULL) { return false; } static const char *truthy[] = {"yes", "true", "1"}; - for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) { + for (i = 0; i < GPR_ARRAY_SIZE(truthy); i++) { if (0 == gpr_stricmp(s, truthy[i])) { return true; } diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c index 7d60b1de17..5dbfaa2d43 100644 --- a/src/core/lib/surface/alarm.c +++ b/src/core/lib/surface/alarm.c @@ -44,7 +44,9 @@ static void alarm_ref(grpc_alarm *alarm) { gpr_ref(&alarm->refs); } static void alarm_unref(grpc_alarm *alarm) { if (gpr_unref(&alarm->refs)) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GRPC_CQ_INTERNAL_UNREF(&exec_ctx, alarm->cq, "alarm"); + if (alarm->cq != NULL) { + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, alarm->cq, "alarm"); + } grpc_exec_ctx_finish(&exec_ctx); gpr_free(alarm); } @@ -93,12 +95,8 @@ static void alarm_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { (void *)alarm, &alarm->completion); } -grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline, - void *tag) { +grpc_alarm *grpc_alarm_create(void *reserved) { grpc_alarm *alarm = gpr_malloc(sizeof(grpc_alarm)); - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - - gpr_ref_init(&alarm->refs, 1); #ifndef NDEBUG if (GRPC_TRACER_ON(grpc_trace_alarm_refcount)) { @@ -106,27 +104,36 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline, } #endif + gpr_ref_init(&alarm->refs, 1); + grpc_timer_init_unset(&alarm->alarm); + alarm->cq = NULL; + GRPC_CLOSURE_INIT(&alarm->on_alarm, alarm_cb, alarm, + grpc_schedule_on_exec_ctx); + return alarm; +} + +void grpc_alarm_set(grpc_alarm *alarm, grpc_completion_queue *cq, + gpr_timespec deadline, void *tag, void *reserved) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_CQ_INTERNAL_REF(cq, "alarm"); alarm->cq = cq; alarm->tag = tag; GPR_ASSERT(grpc_cq_begin_op(cq, tag)); - GRPC_CLOSURE_INIT(&alarm->on_alarm, alarm_cb, alarm, - grpc_schedule_on_exec_ctx); grpc_timer_init(&exec_ctx, &alarm->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), &alarm->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC)); grpc_exec_ctx_finish(&exec_ctx); - return alarm; } -void grpc_alarm_cancel(grpc_alarm *alarm) { +void grpc_alarm_cancel(grpc_alarm *alarm, void *reserved) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_timer_cancel(&exec_ctx, &alarm->alarm); grpc_exec_ctx_finish(&exec_ctx); } -void grpc_alarm_destroy(grpc_alarm *alarm) { - grpc_alarm_cancel(alarm); +void grpc_alarm_destroy(grpc_alarm *alarm, void *reserved) { + grpc_alarm_cancel(alarm, reserved); GRPC_ALARM_UNREF(alarm, "alarm_destroy"); } diff --git a/src/core/lib/surface/version.c b/src/core/lib/surface/version.c index 96c16105e7..fd6ea4daa9 100644 --- a/src/core/lib/surface/version.c +++ b/src/core/lib/surface/version.c @@ -21,6 +21,6 @@ #include <grpc/grpc.h> -const char *grpc_version_string(void) { return "4.0.0-dev"; } +const char *grpc_version_string(void) { return "5.0.0-dev"; } const char *grpc_g_stands_for(void) { return "gambit"; } |