diff options
Diffstat (limited to 'src/core/lib')
18 files changed, 759 insertions, 341 deletions
diff --git a/src/core/lib/compression/stream_compression.c b/src/core/lib/compression/stream_compression.c index ba9302794f..411489f029 100644 --- a/src/core/lib/compression/stream_compression.c +++ b/src/core/lib/compression/stream_compression.c @@ -16,177 +16,62 @@ * */ -#include <grpc/support/alloc.h> #include <grpc/support/log.h> #include "src/core/lib/compression/stream_compression.h" -#include "src/core/lib/iomgr/exec_ctx.h" -#include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/compression/stream_compression_gzip.h" -#define OUTPUT_BLOCK_SIZE (1024) - -static bool gzip_flate(grpc_stream_compression_context *ctx, - grpc_slice_buffer *in, grpc_slice_buffer *out, - size_t *output_size, size_t max_output_size, int flush, - bool *end_of_context) { - GPR_ASSERT(flush == 0 || flush == Z_SYNC_FLUSH || flush == Z_FINISH); - /* Full flush is not allowed when inflating. */ - GPR_ASSERT(!(ctx->flate == inflate && (flush == Z_FINISH))); - - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - int r; - bool eoc = false; - size_t original_max_output_size = max_output_size; - while (max_output_size > 0 && (in->length > 0 || flush) && !eoc) { - size_t slice_size = max_output_size < OUTPUT_BLOCK_SIZE ? max_output_size - : OUTPUT_BLOCK_SIZE; - grpc_slice slice_out = GRPC_SLICE_MALLOC(slice_size); - ctx->zs.avail_out = (uInt)slice_size; - ctx->zs.next_out = GRPC_SLICE_START_PTR(slice_out); - while (ctx->zs.avail_out > 0 && in->length > 0 && !eoc) { - grpc_slice slice = grpc_slice_buffer_take_first(in); - ctx->zs.avail_in = (uInt)GRPC_SLICE_LENGTH(slice); - ctx->zs.next_in = GRPC_SLICE_START_PTR(slice); - r = ctx->flate(&ctx->zs, Z_NO_FLUSH); - if (r < 0 && r != Z_BUF_ERROR) { - gpr_log(GPR_ERROR, "zlib error (%d)", r); - grpc_slice_unref_internal(&exec_ctx, slice_out); - grpc_exec_ctx_finish(&exec_ctx); - return false; - } else if (r == Z_STREAM_END && ctx->flate == inflate) { - eoc = true; - } - if (ctx->zs.avail_in > 0) { - grpc_slice_buffer_undo_take_first( - in, - grpc_slice_sub(slice, GRPC_SLICE_LENGTH(slice) - ctx->zs.avail_in, - GRPC_SLICE_LENGTH(slice))); - } - grpc_slice_unref_internal(&exec_ctx, slice); - } - if (flush != 0 && ctx->zs.avail_out > 0 && !eoc) { - GPR_ASSERT(in->length == 0); - r = ctx->flate(&ctx->zs, flush); - if (flush == Z_SYNC_FLUSH) { - switch (r) { - case Z_OK: - /* Maybe flush is not complete; just made some partial progress. */ - if (ctx->zs.avail_out > 0) { - flush = 0; - } - break; - case Z_BUF_ERROR: - case Z_STREAM_END: - flush = 0; - break; - default: - gpr_log(GPR_ERROR, "zlib error (%d)", r); - grpc_slice_unref_internal(&exec_ctx, slice_out); - grpc_exec_ctx_finish(&exec_ctx); - return false; - } - } else if (flush == Z_FINISH) { - switch (r) { - case Z_OK: - case Z_BUF_ERROR: - /* Wait for the next loop to assign additional output space. */ - GPR_ASSERT(ctx->zs.avail_out == 0); - break; - case Z_STREAM_END: - flush = 0; - break; - default: - gpr_log(GPR_ERROR, "zlib error (%d)", r); - grpc_slice_unref_internal(&exec_ctx, slice_out); - grpc_exec_ctx_finish(&exec_ctx); - return false; - } - } - } - - if (ctx->zs.avail_out == 0) { - grpc_slice_buffer_add(out, slice_out); - } else if (ctx->zs.avail_out < slice_size) { - slice_out.data.refcounted.length -= ctx->zs.avail_out; - grpc_slice_buffer_add(out, slice_out); - } else { - grpc_slice_unref_internal(&exec_ctx, slice_out); - } - max_output_size -= (slice_size - ctx->zs.avail_out); - } - grpc_exec_ctx_finish(&exec_ctx); - if (end_of_context) { - *end_of_context = eoc; - } - if (output_size) { - *output_size = original_max_output_size - max_output_size; - } - return true; -} +extern const grpc_stream_compression_vtable + grpc_stream_compression_identity_vtable; bool grpc_stream_compress(grpc_stream_compression_context *ctx, grpc_slice_buffer *in, grpc_slice_buffer *out, size_t *output_size, size_t max_output_size, grpc_stream_compression_flush flush) { - GPR_ASSERT(ctx->flate == deflate); - int gzip_flush; - switch (flush) { - case GRPC_STREAM_COMPRESSION_FLUSH_NONE: - gzip_flush = 0; - break; - case GRPC_STREAM_COMPRESSION_FLUSH_SYNC: - gzip_flush = Z_SYNC_FLUSH; - break; - case GRPC_STREAM_COMPRESSION_FLUSH_FINISH: - gzip_flush = Z_FINISH; - break; - default: - gzip_flush = 0; - } - return gzip_flate(ctx, in, out, output_size, max_output_size, gzip_flush, - NULL); + return ctx->vtable->compress(ctx, in, out, output_size, max_output_size, + flush); } bool grpc_stream_decompress(grpc_stream_compression_context *ctx, grpc_slice_buffer *in, grpc_slice_buffer *out, size_t *output_size, size_t max_output_size, bool *end_of_context) { - GPR_ASSERT(ctx->flate == inflate); - return gzip_flate(ctx, in, out, output_size, max_output_size, Z_SYNC_FLUSH, - end_of_context); + return ctx->vtable->decompress(ctx, in, out, output_size, max_output_size, + end_of_context); } grpc_stream_compression_context *grpc_stream_compression_context_create( grpc_stream_compression_method method) { - grpc_stream_compression_context *ctx = - (grpc_stream_compression_context *)gpr_zalloc( - sizeof(grpc_stream_compression_context)); - int r; - if (ctx == NULL) { - return NULL; - } - if (method == GRPC_STREAM_COMPRESSION_DECOMPRESS) { - r = inflateInit2(&ctx->zs, 0x1F); - ctx->flate = inflate; - } else { - r = deflateInit2(&ctx->zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 0x1F, 8, - Z_DEFAULT_STRATEGY); - ctx->flate = deflate; - } - if (r != Z_OK) { - gpr_free(ctx); - return NULL; + switch (method) { + case GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS: + case GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS: + return grpc_stream_compression_identity_vtable.context_create(method); + case GRPC_STREAM_COMPRESSION_GZIP_COMPRESS: + case GRPC_STREAM_COMPRESSION_GZIP_DECOMPRESS: + return grpc_stream_compression_gzip_vtable.context_create(method); + default: + gpr_log(GPR_ERROR, "Unknown stream compression method: %d", method); + return NULL; } - - return ctx; } void grpc_stream_compression_context_destroy( grpc_stream_compression_context *ctx) { - if (ctx->flate == inflate) { - inflateEnd(&ctx->zs); + ctx->vtable->context_destroy(ctx); +} + +int grpc_stream_compression_method_parse( + grpc_slice value, bool is_compress, + grpc_stream_compression_method *method) { + if (grpc_slice_eq(value, GRPC_MDSTR_IDENTITY)) { + *method = is_compress ? GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS + : GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS; + return 1; + } else if (grpc_slice_eq(value, GRPC_MDSTR_GZIP)) { + *method = is_compress ? GRPC_STREAM_COMPRESSION_GZIP_COMPRESS + : GRPC_STREAM_COMPRESSION_GZIP_DECOMPRESS; + return 1; } else { - deflateEnd(&ctx->zs); + return 0; } - gpr_free(ctx); } diff --git a/src/core/lib/compression/stream_compression.h b/src/core/lib/compression/stream_compression.h index 0daaa9e655..6d073280fa 100644 --- a/src/core/lib/compression/stream_compression.h +++ b/src/core/lib/compression/stream_compression.h @@ -24,15 +24,20 @@ #include <grpc/slice_buffer.h> #include <zlib.h> +#include "src/core/lib/transport/static_metadata.h" + +typedef struct grpc_stream_compression_vtable grpc_stream_compression_vtable; + /* Stream compression/decompression context */ typedef struct grpc_stream_compression_context { - z_stream zs; - int (*flate)(z_stream *zs, int flush); + const grpc_stream_compression_vtable *vtable; } grpc_stream_compression_context; typedef enum grpc_stream_compression_method { - GRPC_STREAM_COMPRESSION_COMPRESS = 0, - GRPC_STREAM_COMPRESSION_DECOMPRESS, + GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS = 0, + GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS, + GRPC_STREAM_COMPRESSION_GZIP_COMPRESS, + GRPC_STREAM_COMPRESSION_GZIP_DECOMPRESS, GRPC_STREAM_COMPRESSION_METHOD_COUNT } grpc_stream_compression_method; @@ -43,6 +48,19 @@ typedef enum grpc_stream_compression_flush { GRPC_STREAM_COMPRESSION_FLUSH_COUNT } grpc_stream_compression_flush; +struct grpc_stream_compression_vtable { + bool (*compress)(grpc_stream_compression_context *ctx, grpc_slice_buffer *in, + grpc_slice_buffer *out, size_t *output_size, + size_t max_output_size, grpc_stream_compression_flush flush); + bool (*decompress)(grpc_stream_compression_context *ctx, + grpc_slice_buffer *in, grpc_slice_buffer *out, + size_t *output_size, size_t max_output_size, + bool *end_of_context); + grpc_stream_compression_context *(*context_create)( + grpc_stream_compression_method method); + void (*context_destroy)(grpc_stream_compression_context *ctx); +}; + /** * Compress bytes provided in \a in with a given context, with an optional flush * at the end of compression. Emits at most \a max_output_size compressed bytes @@ -87,4 +105,10 @@ grpc_stream_compression_context *grpc_stream_compression_context_create( void grpc_stream_compression_context_destroy( grpc_stream_compression_context *ctx); -#endif /* GRPC_CORE_LIB_COMPRESSION_STREAM_COMPRESSION_H */ +/** + * Parse stream compression method based on algorithm name + */ +int grpc_stream_compression_method_parse( + grpc_slice value, bool is_compress, grpc_stream_compression_method *method); + +#endif diff --git a/src/core/lib/compression/stream_compression_gzip.c b/src/core/lib/compression/stream_compression_gzip.c new file mode 100644 index 0000000000..abcbdb3a91 --- /dev/null +++ b/src/core/lib/compression/stream_compression_gzip.c @@ -0,0 +1,228 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +#include "src/core/lib/compression/stream_compression_gzip.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/slice/slice_internal.h" + +#define OUTPUT_BLOCK_SIZE (1024) + +typedef struct grpc_stream_compression_context_gzip { + grpc_stream_compression_context base; + + z_stream zs; + int (*flate)(z_stream *zs, int flush); +} grpc_stream_compression_context_gzip; + +static bool gzip_flate(grpc_stream_compression_context_gzip *ctx, + grpc_slice_buffer *in, grpc_slice_buffer *out, + size_t *output_size, size_t max_output_size, int flush, + bool *end_of_context) { + GPR_ASSERT(flush == 0 || flush == Z_SYNC_FLUSH || flush == Z_FINISH); + /* Full flush is not allowed when inflating. */ + GPR_ASSERT(!(ctx->flate == inflate && (flush == Z_FINISH))); + + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + int r; + bool eoc = false; + size_t original_max_output_size = max_output_size; + while (max_output_size > 0 && (in->length > 0 || flush) && !eoc) { + size_t slice_size = max_output_size < OUTPUT_BLOCK_SIZE ? max_output_size + : OUTPUT_BLOCK_SIZE; + grpc_slice slice_out = GRPC_SLICE_MALLOC(slice_size); + ctx->zs.avail_out = (uInt)slice_size; + ctx->zs.next_out = GRPC_SLICE_START_PTR(slice_out); + while (ctx->zs.avail_out > 0 && in->length > 0 && !eoc) { + grpc_slice slice = grpc_slice_buffer_take_first(in); + ctx->zs.avail_in = (uInt)GRPC_SLICE_LENGTH(slice); + ctx->zs.next_in = GRPC_SLICE_START_PTR(slice); + r = ctx->flate(&ctx->zs, Z_NO_FLUSH); + if (r < 0 && r != Z_BUF_ERROR) { + gpr_log(GPR_ERROR, "zlib error (%d)", r); + grpc_slice_unref_internal(&exec_ctx, slice_out); + grpc_exec_ctx_finish(&exec_ctx); + return false; + } else if (r == Z_STREAM_END && ctx->flate == inflate) { + eoc = true; + } + if (ctx->zs.avail_in > 0) { + grpc_slice_buffer_undo_take_first( + in, + grpc_slice_sub(slice, GRPC_SLICE_LENGTH(slice) - ctx->zs.avail_in, + GRPC_SLICE_LENGTH(slice))); + } + grpc_slice_unref_internal(&exec_ctx, slice); + } + if (flush != 0 && ctx->zs.avail_out > 0 && !eoc) { + GPR_ASSERT(in->length == 0); + r = ctx->flate(&ctx->zs, flush); + if (flush == Z_SYNC_FLUSH) { + switch (r) { + case Z_OK: + /* Maybe flush is not complete; just made some partial progress. */ + if (ctx->zs.avail_out > 0) { + flush = 0; + } + break; + case Z_BUF_ERROR: + case Z_STREAM_END: + flush = 0; + break; + default: + gpr_log(GPR_ERROR, "zlib error (%d)", r); + grpc_slice_unref_internal(&exec_ctx, slice_out); + grpc_exec_ctx_finish(&exec_ctx); + return false; + } + } else if (flush == Z_FINISH) { + switch (r) { + case Z_OK: + case Z_BUF_ERROR: + /* Wait for the next loop to assign additional output space. */ + GPR_ASSERT(ctx->zs.avail_out == 0); + break; + case Z_STREAM_END: + flush = 0; + break; + default: + gpr_log(GPR_ERROR, "zlib error (%d)", r); + grpc_slice_unref_internal(&exec_ctx, slice_out); + grpc_exec_ctx_finish(&exec_ctx); + return false; + } + } + } + + if (ctx->zs.avail_out == 0) { + grpc_slice_buffer_add(out, slice_out); + } else if (ctx->zs.avail_out < slice_size) { + slice_out.data.refcounted.length -= ctx->zs.avail_out; + grpc_slice_buffer_add(out, slice_out); + } else { + grpc_slice_unref_internal(&exec_ctx, slice_out); + } + max_output_size -= (slice_size - ctx->zs.avail_out); + } + grpc_exec_ctx_finish(&exec_ctx); + if (end_of_context) { + *end_of_context = eoc; + } + if (output_size) { + *output_size = original_max_output_size - max_output_size; + } + return true; +} + +static bool grpc_stream_compress_gzip(grpc_stream_compression_context *ctx, + grpc_slice_buffer *in, + grpc_slice_buffer *out, + size_t *output_size, + size_t max_output_size, + grpc_stream_compression_flush flush) { + if (ctx == NULL) { + return false; + } + grpc_stream_compression_context_gzip *gzip_ctx = + (grpc_stream_compression_context_gzip *)ctx; + GPR_ASSERT(gzip_ctx->flate == deflate); + int gzip_flush; + switch (flush) { + case GRPC_STREAM_COMPRESSION_FLUSH_NONE: + gzip_flush = 0; + break; + case GRPC_STREAM_COMPRESSION_FLUSH_SYNC: + gzip_flush = Z_SYNC_FLUSH; + break; + case GRPC_STREAM_COMPRESSION_FLUSH_FINISH: + gzip_flush = Z_FINISH; + break; + default: + gzip_flush = 0; + } + return gzip_flate(gzip_ctx, in, out, output_size, max_output_size, gzip_flush, + NULL); +} + +static bool grpc_stream_decompress_gzip(grpc_stream_compression_context *ctx, + grpc_slice_buffer *in, + grpc_slice_buffer *out, + size_t *output_size, + size_t max_output_size, + bool *end_of_context) { + if (ctx == NULL) { + return false; + } + grpc_stream_compression_context_gzip *gzip_ctx = + (grpc_stream_compression_context_gzip *)ctx; + GPR_ASSERT(gzip_ctx->flate == inflate); + return gzip_flate(gzip_ctx, in, out, output_size, max_output_size, + Z_SYNC_FLUSH, end_of_context); +} + +static grpc_stream_compression_context * +grpc_stream_compression_context_create_gzip( + grpc_stream_compression_method method) { + GPR_ASSERT(method == GRPC_STREAM_COMPRESSION_GZIP_COMPRESS || + method == GRPC_STREAM_COMPRESSION_GZIP_DECOMPRESS); + grpc_stream_compression_context_gzip *gzip_ctx = + (grpc_stream_compression_context_gzip *)gpr_zalloc( + sizeof(grpc_stream_compression_context_gzip)); + int r; + if (gzip_ctx == NULL) { + return NULL; + } + if (method == GRPC_STREAM_COMPRESSION_GZIP_DECOMPRESS) { + r = inflateInit2(&gzip_ctx->zs, 0x1F); + gzip_ctx->flate = inflate; + } else { + r = deflateInit2(&gzip_ctx->zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 0x1F, 8, + Z_DEFAULT_STRATEGY); + gzip_ctx->flate = deflate; + } + if (r != Z_OK) { + gpr_free(gzip_ctx); + return NULL; + } + + gzip_ctx->base.vtable = &grpc_stream_compression_gzip_vtable; + return (grpc_stream_compression_context *)gzip_ctx; +} + +static void grpc_stream_compression_context_destroy_gzip( + grpc_stream_compression_context *ctx) { + if (ctx == NULL) { + return; + } + grpc_stream_compression_context_gzip *gzip_ctx = + (grpc_stream_compression_context_gzip *)ctx; + if (gzip_ctx->flate == inflate) { + inflateEnd(&gzip_ctx->zs); + } else { + deflateEnd(&gzip_ctx->zs); + } + gpr_free(ctx); +} + +const grpc_stream_compression_vtable grpc_stream_compression_gzip_vtable = { + .compress = grpc_stream_compress_gzip, + .decompress = grpc_stream_decompress_gzip, + .context_create = grpc_stream_compression_context_create_gzip, + .context_destroy = grpc_stream_compression_context_destroy_gzip}; diff --git a/src/core/lib/compression/stream_compression_gzip.h b/src/core/lib/compression/stream_compression_gzip.h new file mode 100644 index 0000000000..7cf49a0de9 --- /dev/null +++ b/src/core/lib/compression/stream_compression_gzip.h @@ -0,0 +1,26 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_COMPRESSION_STREAM_COMPRESSION_GZIP_H +#define GRPC_CORE_LIB_COMPRESSION_STREAM_COMPRESSION_GZIP_H + +#include "src/core/lib/compression/stream_compression.h" + +extern const grpc_stream_compression_vtable grpc_stream_compression_gzip_vtable; + +#endif diff --git a/src/core/lib/compression/stream_compression_identity.c b/src/core/lib/compression/stream_compression_identity.c new file mode 100644 index 0000000000..3dfcf53b85 --- /dev/null +++ b/src/core/lib/compression/stream_compression_identity.c @@ -0,0 +1,94 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +#include "src/core/lib/compression/stream_compression_identity.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/slice/slice_internal.h" + +#define OUTPUT_BLOCK_SIZE (1024) + +/* Singleton context used for all identity streams. */ +static grpc_stream_compression_context identity_ctx = { + .vtable = &grpc_stream_compression_identity_vtable}; + +static void grpc_stream_compression_pass_through(grpc_slice_buffer *in, + grpc_slice_buffer *out, + size_t *output_size, + size_t max_output_size) { + if (max_output_size >= in->length) { + if (output_size) { + *output_size = in->length; + } + grpc_slice_buffer_move_into(in, out); + } else { + if (output_size) { + *output_size = max_output_size; + } + grpc_slice_buffer_move_first(in, max_output_size, out); + } +} + +static bool grpc_stream_compress_identity(grpc_stream_compression_context *ctx, + grpc_slice_buffer *in, + grpc_slice_buffer *out, + size_t *output_size, + size_t max_output_size, + grpc_stream_compression_flush flush) { + if (ctx == NULL) { + return false; + } + grpc_stream_compression_pass_through(in, out, output_size, max_output_size); + return true; +} + +static bool grpc_stream_decompress_identity( + grpc_stream_compression_context *ctx, grpc_slice_buffer *in, + grpc_slice_buffer *out, size_t *output_size, size_t max_output_size, + bool *end_of_context) { + if (ctx == NULL) { + return false; + } + grpc_stream_compression_pass_through(in, out, output_size, max_output_size); + if (end_of_context) { + *end_of_context = false; + } + return true; +} + +static grpc_stream_compression_context * +grpc_stream_compression_context_create_identity( + grpc_stream_compression_method method) { + GPR_ASSERT(method == GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS || + method == GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS); + /* No context needed in this case. Use fake context instead. */ + return (grpc_stream_compression_context *)&identity_ctx; +} + +static void grpc_stream_compression_context_destroy_identity( + grpc_stream_compression_context *ctx) { + return; +} + +const grpc_stream_compression_vtable grpc_stream_compression_identity_vtable = { + .compress = grpc_stream_compress_identity, + .decompress = grpc_stream_decompress_identity, + .context_create = grpc_stream_compression_context_create_identity, + .context_destroy = grpc_stream_compression_context_destroy_identity}; diff --git a/src/core/lib/compression/stream_compression_identity.h b/src/core/lib/compression/stream_compression_identity.h new file mode 100644 index 0000000000..41926e949e --- /dev/null +++ b/src/core/lib/compression/stream_compression_identity.h @@ -0,0 +1,27 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_COMPRESSION_STREAM_COMPRESSION_IDENTITY_H +#define GRPC_CORE_LIB_COMPRESSION_STREAM_COMPRESSION_IDENTITY_H + +#include "src/core/lib/compression/stream_compression.h" + +extern const grpc_stream_compression_vtable + grpc_stream_compression_identity_vtable; + +#endif diff --git a/src/core/lib/debug/stats_data.c b/src/core/lib/debug/stats_data.c index 498fa18545..5bd7884e28 100644 --- a/src/core/lib/debug/stats_data.c +++ b/src/core/lib/debug/stats_data.c @@ -110,8 +110,6 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = { "executor_wakeup_initiated", "executor_queue_drained", "executor_push_retries", - "executor_threads_created", - "executor_threads_used", "server_requested_calls", "server_slowpath_requests_queued", }; @@ -221,8 +219,6 @@ const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = { "Number of times an executor queue was drained", "Number of times we raced and were forced to retry pushing a closure to " "the executor", - "Size of the backing thread pool for overflow gRPC Core work", - "How many executor threads actually got used", "How many calls were requested (not necessarily received) by the server", "How many times was the server slow path taken (indicates too few " "outstanding requests)", @@ -240,7 +236,6 @@ const char *grpc_stats_histogram_name[GRPC_STATS_HISTOGRAM_COUNT] = { "http2_send_message_per_write", "http2_send_trailing_metadata_per_write", "http2_send_flowctl_per_write", - "executor_closures_per_wakeup", "server_cqs_checked", }; const char *grpc_stats_histogram_doc[GRPC_STATS_HISTOGRAM_COUNT] = { @@ -256,7 +251,6 @@ const char *grpc_stats_histogram_doc[GRPC_STATS_HISTOGRAM_COUNT] = { "Number of streams whose payload was written per TCP write", "Number of streams terminated per TCP write", "Number of flow control updates written per TCP write", - "Number of closures executed each time an executor wakes up", "How many completion queues were checked looking for a CQ that had " "requested the incoming call", }; @@ -328,7 +322,6 @@ const uint8_t grpc_stats_table_7[102] = { const int grpc_stats_table_8[9] = {0, 1, 2, 4, 7, 13, 23, 39, 64}; const uint8_t grpc_stats_table_9[9] = {0, 0, 1, 2, 2, 3, 4, 4, 5}; void grpc_stats_inc_call_initial_size(grpc_exec_ctx *exec_ctx, int value) { - /* Automatically generated by tools/codegen/core/gen_stats_data.py */ value = GPR_CLAMP(value, 0, 262144); if (value < 6) { GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_CALL_INITIAL_SIZE, @@ -354,7 +347,6 @@ void grpc_stats_inc_call_initial_size(grpc_exec_ctx *exec_ctx, int value) { (exec_ctx), value, grpc_stats_table_0, 64)); } void grpc_stats_inc_poll_events_returned(grpc_exec_ctx *exec_ctx, int value) { - /* Automatically generated by tools/codegen/core/gen_stats_data.py */ value = GPR_CLAMP(value, 0, 1024); if (value < 29) { GRPC_STATS_INC_HISTOGRAM((exec_ctx), @@ -381,7 +373,6 @@ void grpc_stats_inc_poll_events_returned(grpc_exec_ctx *exec_ctx, int value) { (exec_ctx), value, grpc_stats_table_2, 128)); } void grpc_stats_inc_tcp_write_size(grpc_exec_ctx *exec_ctx, int value) { - /* Automatically generated by tools/codegen/core/gen_stats_data.py */ value = GPR_CLAMP(value, 0, 16777216); if (value < 5) { GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_WRITE_SIZE, @@ -407,7 +398,6 @@ void grpc_stats_inc_tcp_write_size(grpc_exec_ctx *exec_ctx, int value) { (exec_ctx), value, grpc_stats_table_4, 64)); } void grpc_stats_inc_tcp_write_iov_size(grpc_exec_ctx *exec_ctx, int value) { - /* Automatically generated by tools/codegen/core/gen_stats_data.py */ value = GPR_CLAMP(value, 0, 1024); if (value < 13) { GRPC_STATS_INC_HISTOGRAM((exec_ctx), @@ -433,7 +423,6 @@ void grpc_stats_inc_tcp_write_iov_size(grpc_exec_ctx *exec_ctx, int value) { (exec_ctx), value, grpc_stats_table_6, 64)); } void grpc_stats_inc_tcp_read_size(grpc_exec_ctx *exec_ctx, int value) { - /* Automatically generated by tools/codegen/core/gen_stats_data.py */ value = GPR_CLAMP(value, 0, 16777216); if (value < 5) { GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_SIZE, @@ -459,7 +448,6 @@ void grpc_stats_inc_tcp_read_size(grpc_exec_ctx *exec_ctx, int value) { (exec_ctx), value, grpc_stats_table_4, 64)); } void grpc_stats_inc_tcp_read_offer(grpc_exec_ctx *exec_ctx, int value) { - /* Automatically generated by tools/codegen/core/gen_stats_data.py */ value = GPR_CLAMP(value, 0, 16777216); if (value < 5) { GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_OFFER, @@ -486,7 +474,6 @@ void grpc_stats_inc_tcp_read_offer(grpc_exec_ctx *exec_ctx, int value) { } void grpc_stats_inc_tcp_read_offer_iov_size(grpc_exec_ctx *exec_ctx, int value) { - /* Automatically generated by tools/codegen/core/gen_stats_data.py */ value = GPR_CLAMP(value, 0, 1024); if (value < 13) { GRPC_STATS_INC_HISTOGRAM( @@ -514,7 +501,6 @@ void grpc_stats_inc_tcp_read_offer_iov_size(grpc_exec_ctx *exec_ctx, } void grpc_stats_inc_http2_send_message_size(grpc_exec_ctx *exec_ctx, int value) { - /* Automatically generated by tools/codegen/core/gen_stats_data.py */ value = GPR_CLAMP(value, 0, 16777216); if (value < 5) { GRPC_STATS_INC_HISTOGRAM( @@ -542,7 +528,6 @@ void grpc_stats_inc_http2_send_message_size(grpc_exec_ctx *exec_ctx, } void grpc_stats_inc_http2_send_initial_metadata_per_write( grpc_exec_ctx *exec_ctx, int value) { - /* Automatically generated by tools/codegen/core/gen_stats_data.py */ value = GPR_CLAMP(value, 0, 1024); if (value < 13) { GRPC_STATS_INC_HISTOGRAM( @@ -572,7 +557,6 @@ void grpc_stats_inc_http2_send_initial_metadata_per_write( } void grpc_stats_inc_http2_send_message_per_write(grpc_exec_ctx *exec_ctx, int value) { - /* Automatically generated by tools/codegen/core/gen_stats_data.py */ value = GPR_CLAMP(value, 0, 1024); if (value < 13) { GRPC_STATS_INC_HISTOGRAM( @@ -600,7 +584,6 @@ void grpc_stats_inc_http2_send_message_per_write(grpc_exec_ctx *exec_ctx, } void grpc_stats_inc_http2_send_trailing_metadata_per_write( grpc_exec_ctx *exec_ctx, int value) { - /* Automatically generated by tools/codegen/core/gen_stats_data.py */ value = GPR_CLAMP(value, 0, 1024); if (value < 13) { GRPC_STATS_INC_HISTOGRAM( @@ -630,7 +613,6 @@ void grpc_stats_inc_http2_send_trailing_metadata_per_write( } void grpc_stats_inc_http2_send_flowctl_per_write(grpc_exec_ctx *exec_ctx, int value) { - /* Automatically generated by tools/codegen/core/gen_stats_data.py */ value = GPR_CLAMP(value, 0, 1024); if (value < 13) { GRPC_STATS_INC_HISTOGRAM( @@ -656,36 +638,7 @@ void grpc_stats_inc_http2_send_flowctl_per_write(grpc_exec_ctx *exec_ctx, grpc_stats_histo_find_bucket_slow( (exec_ctx), value, grpc_stats_table_6, 64)); } -void grpc_stats_inc_executor_closures_per_wakeup(grpc_exec_ctx *exec_ctx, - int value) { - /* Automatically generated by tools/codegen/core/gen_stats_data.py */ - value = GPR_CLAMP(value, 0, 1024); - if (value < 13) { - GRPC_STATS_INC_HISTOGRAM( - (exec_ctx), GRPC_STATS_HISTOGRAM_EXECUTOR_CLOSURES_PER_WAKEUP, value); - return; - } - union { - double dbl; - uint64_t uint; - } _val, _bkt; - _val.dbl = value; - if (_val.uint < 4637863191261478912ull) { - int bucket = - grpc_stats_table_7[((_val.uint - 4623507967449235456ull) >> 48)] + 13; - _bkt.dbl = grpc_stats_table_6[bucket]; - bucket -= (_val.uint < _bkt.uint); - GRPC_STATS_INC_HISTOGRAM( - (exec_ctx), GRPC_STATS_HISTOGRAM_EXECUTOR_CLOSURES_PER_WAKEUP, bucket); - return; - } - GRPC_STATS_INC_HISTOGRAM((exec_ctx), - GRPC_STATS_HISTOGRAM_EXECUTOR_CLOSURES_PER_WAKEUP, - grpc_stats_histo_find_bucket_slow( - (exec_ctx), value, grpc_stats_table_6, 64)); -} void grpc_stats_inc_server_cqs_checked(grpc_exec_ctx *exec_ctx, int value) { - /* Automatically generated by tools/codegen/core/gen_stats_data.py */ value = GPR_CLAMP(value, 0, 64); if (value < 3) { GRPC_STATS_INC_HISTOGRAM((exec_ctx), @@ -710,17 +663,17 @@ void grpc_stats_inc_server_cqs_checked(grpc_exec_ctx *exec_ctx, int value) { grpc_stats_histo_find_bucket_slow( (exec_ctx), value, grpc_stats_table_8, 8)); } -const int grpc_stats_histo_buckets[14] = {64, 128, 64, 64, 64, 64, 64, - 64, 64, 64, 64, 64, 64, 8}; -const int grpc_stats_histo_start[14] = {0, 64, 192, 256, 320, 384, 448, - 512, 576, 640, 704, 768, 832, 896}; -const int *const grpc_stats_histo_bucket_boundaries[14] = { +const int grpc_stats_histo_buckets[13] = {64, 128, 64, 64, 64, 64, 64, + 64, 64, 64, 64, 64, 8}; +const int grpc_stats_histo_start[13] = {0, 64, 192, 256, 320, 384, 448, + 512, 576, 640, 704, 768, 832}; +const int *const grpc_stats_histo_bucket_boundaries[13] = { grpc_stats_table_0, grpc_stats_table_2, grpc_stats_table_4, grpc_stats_table_6, grpc_stats_table_4, grpc_stats_table_4, grpc_stats_table_6, grpc_stats_table_4, grpc_stats_table_6, grpc_stats_table_6, grpc_stats_table_6, grpc_stats_table_6, - grpc_stats_table_6, grpc_stats_table_8}; -void (*const grpc_stats_inc_histogram[14])(grpc_exec_ctx *exec_ctx, int x) = { + grpc_stats_table_8}; +void (*const grpc_stats_inc_histogram[13])(grpc_exec_ctx *exec_ctx, int x) = { grpc_stats_inc_call_initial_size, grpc_stats_inc_poll_events_returned, grpc_stats_inc_tcp_write_size, @@ -733,5 +686,4 @@ void (*const grpc_stats_inc_histogram[14])(grpc_exec_ctx *exec_ctx, int x) = { grpc_stats_inc_http2_send_message_per_write, grpc_stats_inc_http2_send_trailing_metadata_per_write, grpc_stats_inc_http2_send_flowctl_per_write, - grpc_stats_inc_executor_closures_per_wakeup, grpc_stats_inc_server_cqs_checked}; diff --git a/src/core/lib/debug/stats_data.h b/src/core/lib/debug/stats_data.h index 9c5fb0178c..507e5f760e 100644 --- a/src/core/lib/debug/stats_data.h +++ b/src/core/lib/debug/stats_data.h @@ -112,8 +112,6 @@ typedef enum { GRPC_STATS_COUNTER_EXECUTOR_WAKEUP_INITIATED, GRPC_STATS_COUNTER_EXECUTOR_QUEUE_DRAINED, GRPC_STATS_COUNTER_EXECUTOR_PUSH_RETRIES, - GRPC_STATS_COUNTER_EXECUTOR_THREADS_CREATED, - GRPC_STATS_COUNTER_EXECUTOR_THREADS_USED, GRPC_STATS_COUNTER_SERVER_REQUESTED_CALLS, GRPC_STATS_COUNTER_SERVER_SLOWPATH_REQUESTS_QUEUED, GRPC_STATS_COUNTER_COUNT @@ -133,7 +131,6 @@ typedef enum { GRPC_STATS_HISTOGRAM_HTTP2_SEND_MESSAGE_PER_WRITE, GRPC_STATS_HISTOGRAM_HTTP2_SEND_TRAILING_METADATA_PER_WRITE, GRPC_STATS_HISTOGRAM_HTTP2_SEND_FLOWCTL_PER_WRITE, - GRPC_STATS_HISTOGRAM_EXECUTOR_CLOSURES_PER_WAKEUP, GRPC_STATS_HISTOGRAM_SERVER_CQS_CHECKED, GRPC_STATS_HISTOGRAM_COUNT } grpc_stats_histograms; @@ -164,11 +161,9 @@ typedef enum { GRPC_STATS_HISTOGRAM_HTTP2_SEND_TRAILING_METADATA_PER_WRITE_BUCKETS = 64, GRPC_STATS_HISTOGRAM_HTTP2_SEND_FLOWCTL_PER_WRITE_FIRST_SLOT = 768, GRPC_STATS_HISTOGRAM_HTTP2_SEND_FLOWCTL_PER_WRITE_BUCKETS = 64, - GRPC_STATS_HISTOGRAM_EXECUTOR_CLOSURES_PER_WAKEUP_FIRST_SLOT = 832, - GRPC_STATS_HISTOGRAM_EXECUTOR_CLOSURES_PER_WAKEUP_BUCKETS = 64, - GRPC_STATS_HISTOGRAM_SERVER_CQS_CHECKED_FIRST_SLOT = 896, + GRPC_STATS_HISTOGRAM_SERVER_CQS_CHECKED_FIRST_SLOT = 832, GRPC_STATS_HISTOGRAM_SERVER_CQS_CHECKED_BUCKETS = 8, - GRPC_STATS_HISTOGRAM_BUCKETS = 904 + GRPC_STATS_HISTOGRAM_BUCKETS = 840 } grpc_stats_histogram_constants; #define GRPC_STATS_INC_CLIENT_CALLS_CREATED(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_CLIENT_CALLS_CREATED) @@ -421,11 +416,6 @@ typedef enum { GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_EXECUTOR_QUEUE_DRAINED) #define GRPC_STATS_INC_EXECUTOR_PUSH_RETRIES(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_EXECUTOR_PUSH_RETRIES) -#define GRPC_STATS_INC_EXECUTOR_THREADS_CREATED(exec_ctx) \ - GRPC_STATS_INC_COUNTER((exec_ctx), \ - GRPC_STATS_COUNTER_EXECUTOR_THREADS_CREATED) -#define GRPC_STATS_INC_EXECUTOR_THREADS_USED(exec_ctx) \ - GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_EXECUTOR_THREADS_USED) #define GRPC_STATS_INC_SERVER_REQUESTED_CALLS(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_SERVER_REQUESTED_CALLS) #define GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED(exec_ctx) \ @@ -472,17 +462,13 @@ void grpc_stats_inc_http2_send_trailing_metadata_per_write( grpc_stats_inc_http2_send_flowctl_per_write((exec_ctx), (int)(value)) void grpc_stats_inc_http2_send_flowctl_per_write(grpc_exec_ctx *exec_ctx, int x); -#define GRPC_STATS_INC_EXECUTOR_CLOSURES_PER_WAKEUP(exec_ctx, value) \ - grpc_stats_inc_executor_closures_per_wakeup((exec_ctx), (int)(value)) -void grpc_stats_inc_executor_closures_per_wakeup(grpc_exec_ctx *exec_ctx, - int x); #define GRPC_STATS_INC_SERVER_CQS_CHECKED(exec_ctx, value) \ grpc_stats_inc_server_cqs_checked((exec_ctx), (int)(value)) void grpc_stats_inc_server_cqs_checked(grpc_exec_ctx *exec_ctx, int x); -extern const int grpc_stats_histo_buckets[14]; -extern const int grpc_stats_histo_start[14]; -extern const int *const grpc_stats_histo_bucket_boundaries[14]; -extern void (*const grpc_stats_inc_histogram[14])(grpc_exec_ctx *exec_ctx, +extern const int grpc_stats_histo_buckets[13]; +extern const int grpc_stats_histo_start[13]; +extern const int *const grpc_stats_histo_bucket_boundaries[13]; +extern void (*const grpc_stats_inc_histogram[13])(grpc_exec_ctx *exec_ctx, int x); #endif /* GRPC_CORE_LIB_DEBUG_STATS_DATA_H */ diff --git a/src/core/lib/debug/stats_data.yaml b/src/core/lib/debug/stats_data.yaml index 119d321746..5c0ab2262e 100644 --- a/src/core/lib/debug/stats_data.yaml +++ b/src/core/lib/debug/stats_data.yaml @@ -261,14 +261,6 @@ - counter: executor_push_retries doc: Number of times we raced and were forced to retry pushing a closure to the executor -- counter: executor_threads_created - doc: Size of the backing thread pool for overflow gRPC Core work -- counter: executor_threads_used - doc: How many executor threads actually got used -- histogram: executor_closures_per_wakeup - max: 1024 - buckets: 64 - doc: Number of closures executed each time an executor wakes up # server - counter: server_requested_calls doc: How many calls were requested (not necessarily received) by the server diff --git a/src/core/lib/debug/stats_data_bq_schema.sql b/src/core/lib/debug/stats_data_bq_schema.sql index 3bcf83f9d8..54869977b0 100644 --- a/src/core/lib/debug/stats_data_bq_schema.sql +++ b/src/core/lib/debug/stats_data_bq_schema.sql @@ -85,7 +85,5 @@ executor_scheduled_to_self_per_iteration:FLOAT, executor_wakeup_initiated_per_iteration:FLOAT, executor_queue_drained_per_iteration:FLOAT, executor_push_retries_per_iteration:FLOAT, -executor_threads_created_per_iteration:FLOAT, -executor_threads_used_per_iteration:FLOAT, server_requested_calls_per_iteration:FLOAT, server_slowpath_requests_queued_per_iteration:FLOAT diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c index cb4f506aed..58d9daf2dd 100644 --- a/src/core/lib/iomgr/executor.c +++ b/src/core/lib/iomgr/executor.c @@ -32,14 +32,16 @@ #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/support/spinlock.h" +#define MAX_DEPTH 2 + typedef struct { gpr_mu mu; gpr_cv cv; grpc_closure_list elems; + size_t depth; bool shutdown; bool queued_long_job; gpr_thd_id id; - grpc_closure_list local_elems; } thread_state; static thread_state *g_thread_state; @@ -54,35 +56,32 @@ static grpc_tracer_flag executor_trace = static void executor_thread(void *arg); -static void run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list *list) { - int n = 0; // number of closures executed +static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) { + size_t n = 0; - while (!grpc_closure_list_empty(*list)) { - grpc_closure *c = list->head; - grpc_closure_list_init(list); - while (c != NULL) { - grpc_closure *next = c->next_data.next; - grpc_error *error = c->error_data.error; - if (GRPC_TRACER_ON(executor_trace)) { + grpc_closure *c = list.head; + while (c != NULL) { + grpc_closure *next = c->next_data.next; + grpc_error *error = c->error_data.error; + if (GRPC_TRACER_ON(executor_trace)) { #ifndef NDEBUG - gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c, - c->file_created, c->line_created); + gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c, + c->file_created, c->line_created); #else - gpr_log(GPR_DEBUG, "EXECUTOR: run %p", c); + gpr_log(GPR_DEBUG, "EXECUTOR: run %p", c); #endif - } + } #ifndef NDEBUG - c->scheduled = false; + c->scheduled = false; #endif - n++; - c->cb(exec_ctx, c->cb_arg, error); - GRPC_ERROR_UNREF(error); - c = next; - grpc_exec_ctx_flush(exec_ctx); - } + c->cb(exec_ctx, c->cb_arg, error); + GRPC_ERROR_UNREF(error); + c = next; + n++; + grpc_exec_ctx_flush(exec_ctx); } - GRPC_STATS_INC_EXECUTOR_CLOSURES_PER_WAKEUP(exec_ctx, n); + return n; } bool grpc_executor_is_threaded() { @@ -127,7 +126,7 @@ void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool threading) { for (size_t i = 0; i < g_max_threads; i++) { gpr_mu_destroy(&g_thread_state[i].mu); gpr_cv_destroy(&g_thread_state[i].cv); - run_closures(exec_ctx, &g_thread_state[i].elems); + run_closures(exec_ctx, g_thread_state[i].elems); } gpr_free(g_thread_state); gpr_tls_destroy(&g_this_thread_state); @@ -151,14 +150,14 @@ static void executor_thread(void *arg) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL); - GRPC_STATS_INC_EXECUTOR_THREADS_CREATED(&exec_ctx); - - bool used = false; + size_t subtract_depth = 0; for (;;) { if (GRPC_TRACER_ON(executor_trace)) { - gpr_log(GPR_DEBUG, "EXECUTOR[%d]: step", (int)(ts - g_thread_state)); + gpr_log(GPR_DEBUG, "EXECUTOR[%d]: step (sub_depth=%" PRIdPTR ")", + (int)(ts - g_thread_state), subtract_depth); } gpr_mu_lock(&ts->mu); + ts->depth -= subtract_depth; while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) { ts->queued_long_job = false; gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME)); @@ -171,13 +170,8 @@ static void executor_thread(void *arg) { gpr_mu_unlock(&ts->mu); break; } - if (!used) { - GRPC_STATS_INC_EXECUTOR_THREADS_USED(&exec_ctx); - used = true; - } GRPC_STATS_INC_EXECUTOR_QUEUE_DRAINED(&exec_ctx); - GPR_ASSERT(grpc_closure_list_empty(ts->local_elems)); - ts->local_elems = ts->elems; + grpc_closure_list exec = ts->elems; ts->elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT; gpr_mu_unlock(&ts->mu); if (GRPC_TRACER_ON(executor_trace)) { @@ -185,7 +179,7 @@ static void executor_thread(void *arg) { } grpc_exec_ctx_invalidate_now(&exec_ctx); - run_closures(&exec_ctx, &ts->local_elems); + subtract_depth = run_closures(&exec_ctx, exec); } grpc_exec_ctx_finish(&exec_ctx); } @@ -218,10 +212,6 @@ static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)]; } else { GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF(exec_ctx); - if (is_short) { - grpc_closure_list_append(&ts->local_elems, closure, error); - return; - } } thread_state *orig_ts = ts; @@ -261,7 +251,8 @@ static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, gpr_cv_signal(&ts->cv); } grpc_closure_list_append(&ts->elems, closure, error); - try_new_thread = ts->elems.head != closure && + ts->depth++; + try_new_thread = ts->depth > MAX_DEPTH && cur_thread_count < g_max_threads && !ts->shutdown; if (!is_short) ts->queued_long_job = true; gpr_mu_unlock(&ts->mu); diff --git a/src/core/lib/iomgr/socket_utils_windows.c b/src/core/lib/iomgr/socket_utils_windows.c index 2732c159aa..6e85e4b61f 100644 --- a/src/core/lib/iomgr/socket_utils_windows.c +++ b/src/core/lib/iomgr/socket_utils_windows.c @@ -26,12 +26,8 @@ #include <grpc/support/log.h> const char *grpc_inet_ntop(int af, const void *src, char *dst, size_t size) { -#ifdef GPR_WIN_INET_NTOP - return inet_ntop(af, src, dst, size); -#else /* Windows InetNtopA wants a mutable ip pointer */ return InetNtopA(af, (void *)src, dst, size); -#endif /* GPR_WIN_INET_NTOP */ } #endif /* GRPC_WINDOWS_SOCKETUTILS */ diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index 78765818ca..14330bd1dd 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -79,6 +79,125 @@ static timer_shard g_shards[NUM_SHARDS]; * Access to this is protected by g_shared_mutables.mu */ static timer_shard *g_shard_queue[NUM_SHARDS]; +#ifndef NDEBUG + +/* == Hash table for duplicate timer detection == */ + +#define NUM_HASH_BUCKETS 1009 /* Prime number close to 1000 */ + +static gpr_mu g_hash_mu[NUM_HASH_BUCKETS]; /* One mutex per bucket */ +static grpc_timer *g_timer_ht[NUM_HASH_BUCKETS] = {NULL}; + +static void init_timer_ht() { + for (int i = 0; i < NUM_HASH_BUCKETS; i++) { + gpr_mu_init(&g_hash_mu[i]); + } +} + +static bool is_in_ht(grpc_timer *t) { + size_t i = GPR_HASH_POINTER(t, NUM_HASH_BUCKETS); + + gpr_mu_lock(&g_hash_mu[i]); + grpc_timer *p = g_timer_ht[i]; + while (p != NULL && p != t) { + p = p->hash_table_next; + } + gpr_mu_unlock(&g_hash_mu[i]); + + return (p == t); +} + +static void add_to_ht(grpc_timer *t) { + GPR_ASSERT(!t->hash_table_next); + size_t i = GPR_HASH_POINTER(t, NUM_HASH_BUCKETS); + + gpr_mu_lock(&g_hash_mu[i]); + grpc_timer *p = g_timer_ht[i]; + while (p != NULL && p != t) { + p = p->hash_table_next; + } + + if (p == t) { + grpc_closure *c = t->closure; + gpr_log(GPR_ERROR, + "** Duplicate timer (%p) being added. Closure: (%p), created at: " + "(%s:%d), scheduled at: (%s:%d) **", + t, c, c->file_created, c->line_created, c->file_initiated, + c->line_initiated); + abort(); + } + + /* Timer not present in the bucket. Insert at head of the list */ + t->hash_table_next = g_timer_ht[i]; + g_timer_ht[i] = t; + gpr_mu_unlock(&g_hash_mu[i]); +} + +static void remove_from_ht(grpc_timer *t) { + size_t i = GPR_HASH_POINTER(t, NUM_HASH_BUCKETS); + bool removed = false; + + gpr_mu_lock(&g_hash_mu[i]); + if (g_timer_ht[i] == t) { + g_timer_ht[i] = g_timer_ht[i]->hash_table_next; + removed = true; + } else if (g_timer_ht[i] != NULL) { + grpc_timer *p = g_timer_ht[i]; + while (p->hash_table_next != NULL && p->hash_table_next != t) { + p = p->hash_table_next; + } + + if (p->hash_table_next == t) { + p->hash_table_next = t->hash_table_next; + removed = true; + } + } + gpr_mu_unlock(&g_hash_mu[i]); + + if (!removed) { + grpc_closure *c = t->closure; + gpr_log(GPR_ERROR, + "** Removing timer (%p) that is not added to hash table. Closure " + "(%p), created at: (%s:%d), scheduled at: (%s:%d) **", + t, c, c->file_created, c->line_created, c->file_initiated, + c->line_initiated); + abort(); + } + + t->hash_table_next = NULL; +} + +/* If a timer is added to a timer shard (either heap or a list), it cannot + * be pending. A timer is added to hash table only-if it is added to the + * timer shard. + * Therefore, if timer->pending is false, it cannot be in hash table */ +static void validate_non_pending_timer(grpc_timer *t) { + if (!t->pending && is_in_ht(t)) { + grpc_closure *c = t->closure; + gpr_log(GPR_ERROR, + "** gpr_timer_cancel() called on a non-pending timer (%p) which " + "is in the hash table. Closure: (%p), created at: (%s:%d), " + "scheduled at: (%s:%d) **", + t, c, c->file_created, c->line_created, c->file_initiated, + c->line_initiated); + abort(); + } +} + +#define INIT_TIMER_HASH_TABLE() init_timer_ht() +#define ADD_TO_HASH_TABLE(t) add_to_ht((t)) +#define REMOVE_FROM_HASH_TABLE(t) remove_from_ht((t)) +#define VALIDATE_NON_PENDING_TIMER(t) validate_non_pending_timer((t)) + +#else + +#define INIT_TIMER_HASH_TABLE() +#define ADD_TO_HASH_TABLE(t) +#define REMOVE_FROM_HASH_TABLE(t) +#define VALIDATE_NON_PENDING_TIMER(t) + +#endif + /* Thread local variable that stores the deadline of the next timer the thread * has last-seen. This is an optimization to prevent the thread from checking * shared_mutables.min_timer (which requires acquiring shared_mutables.mu lock, @@ -139,6 +258,8 @@ void grpc_timer_list_init(grpc_exec_ctx *exec_ctx) { shard->min_deadline = compute_min_deadline(shard); g_shard_queue[i] = shard; } + + INIT_TIMER_HASH_TABLE(); } void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) { @@ -202,6 +323,10 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, timer->closure = closure; timer->deadline = deadline; +#ifndef NDEBUG + timer->hash_table_next = NULL; +#endif + if (GRPC_TRACER_ON(grpc_timer_trace)) { gpr_log(GPR_DEBUG, "TIMER %p: SET %" PRIdPTR " now %" PRIdPTR " call %p[%p]", timer, @@ -229,6 +354,9 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, grpc_time_averaged_stats_add_sample(&shard->stats, (double)(deadline - now) / 1000.0); + + ADD_TO_HASH_TABLE(timer); + if (deadline < shard->queue_deadline_cap) { is_first_timer = grpc_timer_heap_add(&shard->heap, timer); } else { @@ -290,7 +418,10 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { gpr_log(GPR_DEBUG, "TIMER %p: CANCEL pending=%s", timer, timer->pending ? "true" : "false"); } + if (timer->pending) { + REMOVE_FROM_HASH_TABLE(timer); + GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED); timer->pending = false; if (timer->heap_index == INVALID_HEAP_INDEX) { @@ -298,6 +429,8 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { } else { grpc_timer_heap_remove(&shard->heap, timer); } + } else { + VALIDATE_NON_PENDING_TIMER(timer); } gpr_mu_unlock(&shard->mu); } @@ -382,6 +515,7 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, timer_shard *shard, grpc_timer *timer; gpr_mu_lock(&shard->mu); while ((timer = pop_one(shard, now))) { + REMOVE_FROM_HASH_TABLE(timer); GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_REF(error)); n++; } diff --git a/src/core/lib/iomgr/timer_generic.h b/src/core/lib/iomgr/timer_generic.h index 72a4ac1f10..f0597f6ea0 100644 --- a/src/core/lib/iomgr/timer_generic.h +++ b/src/core/lib/iomgr/timer_generic.h @@ -29,6 +29,9 @@ struct grpc_timer { struct grpc_timer *next; struct grpc_timer *prev; grpc_closure *closure; +#ifndef NDEBUG + struct grpc_timer *hash_table_next; +#endif }; #endif /* GRPC_CORE_LIB_IOMGR_TIMER_GENERIC_H */ diff --git a/src/core/lib/security/credentials/composite/composite_credentials.c b/src/core/lib/security/credentials/composite/composite_credentials.c index 09fd60a12c..b67ff48d0f 100644 --- a/src/core/lib/security/credentials/composite/composite_credentials.c +++ b/src/core/lib/security/credentials/composite/composite_credentials.c @@ -87,6 +87,7 @@ static bool composite_call_get_request_metadata( ctx->on_request_metadata = on_request_metadata; GRPC_CLOSURE_INIT(&ctx->internal_on_request_metadata, composite_call_metadata_cb, ctx, grpc_schedule_on_exec_ctx); + bool synchronous = true; while (ctx->creds_index < ctx->composite_creds->inner.num_creds) { grpc_call_credentials *inner_creds = ctx->composite_creds->inner.creds_array[ctx->creds_index++]; @@ -95,19 +96,12 @@ static bool composite_call_get_request_metadata( ctx->md_array, &ctx->internal_on_request_metadata, error)) { if (*error != GRPC_ERROR_NONE) break; } else { + synchronous = false; // Async return. break; } } - // If we got through all creds synchronously or we got a synchronous - // error on one of them, return synchronously. - if (ctx->creds_index == ctx->composite_creds->inner.num_creds || - *error != GRPC_ERROR_NONE) { - gpr_free(ctx); - return true; - } - // At least one inner cred is returning asynchronously, so we'll - // return asynchronously as well. - return false; + if (synchronous) gpr_free(ctx); + return synchronous; } static void composite_call_cancel_get_request_metadata( diff --git a/src/core/lib/security/credentials/plugin/plugin_credentials.c b/src/core/lib/security/credentials/plugin/plugin_credentials.c index 73e0c23e0f..ee20241e3f 100644 --- a/src/core/lib/security/credentials/plugin/plugin_credentials.c +++ b/src/core/lib/security/credentials/plugin/plugin_credentials.c @@ -31,6 +31,9 @@ #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/validate_metadata.h" +grpc_tracer_flag grpc_plugin_credentials_trace = + GRPC_TRACER_INITIALIZER(false, "plugin_credentials"); + static void plugin_destruct(grpc_exec_ctx *exec_ctx, grpc_call_credentials *creds) { grpc_plugin_credentials *c = (grpc_plugin_credentials *)creds; @@ -53,6 +56,62 @@ static void pending_request_remove_locked( } } +// Checks if the request has been cancelled. +// If not, removes it from the pending list, so that it cannot be +// cancelled out from under us. +// When this returns, r->cancelled indicates whether the request was +// cancelled before completion. +static void pending_request_complete( + grpc_exec_ctx *exec_ctx, grpc_plugin_credentials_pending_request *r) { + gpr_mu_lock(&r->creds->mu); + if (!r->cancelled) pending_request_remove_locked(r->creds, r); + gpr_mu_unlock(&r->creds->mu); + // Ref to credentials not needed anymore. + grpc_call_credentials_unref(exec_ctx, &r->creds->base); +} + +static grpc_error *process_plugin_result( + grpc_exec_ctx *exec_ctx, grpc_plugin_credentials_pending_request *r, + const grpc_metadata *md, size_t num_md, grpc_status_code status, + const char *error_details) { + grpc_error *error = GRPC_ERROR_NONE; + if (status != GRPC_STATUS_OK) { + char *msg; + gpr_asprintf(&msg, "Getting metadata from plugin failed with error: %s", + error_details); + error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); + gpr_free(msg); + } else { + bool seen_illegal_header = false; + for (size_t i = 0; i < num_md; ++i) { + if (!GRPC_LOG_IF_ERROR("validate_metadata_from_plugin", + grpc_validate_header_key_is_legal(md[i].key))) { + seen_illegal_header = true; + break; + } else if (!grpc_is_binary_header(md[i].key) && + !GRPC_LOG_IF_ERROR( + "validate_metadata_from_plugin", + grpc_validate_header_nonbin_value_is_legal(md[i].value))) { + gpr_log(GPR_ERROR, "Plugin added invalid metadata value."); + seen_illegal_header = true; + break; + } + } + if (seen_illegal_header) { + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Illegal metadata"); + } else { + for (size_t i = 0; i < num_md; ++i) { + grpc_mdelem mdelem = grpc_mdelem_from_slices( + exec_ctx, grpc_slice_ref_internal(md[i].key), + grpc_slice_ref_internal(md[i].value)); + grpc_credentials_mdelem_array_add(r->md_array, mdelem); + GRPC_MDELEM_UNREF(exec_ctx, mdelem); + } + } + } + return error; +} + static void plugin_md_request_metadata_ready(void *request, const grpc_metadata *md, size_t num_md, @@ -64,54 +123,24 @@ static void plugin_md_request_metadata_ready(void *request, NULL, NULL); grpc_plugin_credentials_pending_request *r = (grpc_plugin_credentials_pending_request *)request; - // Check if the request has been cancelled. - // If not, remove it from the pending list, so that it cannot be - // cancelled out from under us. - gpr_mu_lock(&r->creds->mu); - if (!r->cancelled) pending_request_remove_locked(r->creds, r); - gpr_mu_unlock(&r->creds->mu); - grpc_call_credentials_unref(&exec_ctx, &r->creds->base); + if (GRPC_TRACER_ON(grpc_plugin_credentials_trace)) { + gpr_log(GPR_INFO, + "plugin_credentials[%p]: request %p: plugin returned " + "asynchronously", + r->creds, r); + } + // Remove request from pending list if not previously cancelled. + pending_request_complete(&exec_ctx, r); // If it has not been cancelled, process it. if (!r->cancelled) { - if (status != GRPC_STATUS_OK) { - char *msg; - gpr_asprintf(&msg, "Getting metadata from plugin failed with error: %s", - error_details); - GRPC_CLOSURE_SCHED(&exec_ctx, r->on_request_metadata, - GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg)); - gpr_free(msg); - } else { - bool seen_illegal_header = false; - for (size_t i = 0; i < num_md; ++i) { - if (!GRPC_LOG_IF_ERROR("validate_metadata_from_plugin", - grpc_validate_header_key_is_legal(md[i].key))) { - seen_illegal_header = true; - break; - } else if (!grpc_is_binary_header(md[i].key) && - !GRPC_LOG_IF_ERROR( - "validate_metadata_from_plugin", - grpc_validate_header_nonbin_value_is_legal( - md[i].value))) { - gpr_log(GPR_ERROR, "Plugin added invalid metadata value."); - seen_illegal_header = true; - break; - } - } - if (seen_illegal_header) { - GRPC_CLOSURE_SCHED( - &exec_ctx, r->on_request_metadata, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Illegal metadata")); - } else { - for (size_t i = 0; i < num_md; ++i) { - grpc_mdelem mdelem = grpc_mdelem_from_slices( - &exec_ctx, grpc_slice_ref_internal(md[i].key), - grpc_slice_ref_internal(md[i].value)); - grpc_credentials_mdelem_array_add(r->md_array, mdelem); - GRPC_MDELEM_UNREF(&exec_ctx, mdelem); - } - GRPC_CLOSURE_SCHED(&exec_ctx, r->on_request_metadata, GRPC_ERROR_NONE); - } - } + grpc_error *error = + process_plugin_result(&exec_ctx, r, md, num_md, status, error_details); + GRPC_CLOSURE_SCHED(&exec_ctx, r->on_request_metadata, error); + } else if (GRPC_TRACER_ON(grpc_plugin_credentials_trace)) { + gpr_log(GPR_INFO, + "plugin_credentials[%p]: request %p: plugin was previously " + "cancelled", + r->creds, r); } gpr_free(r); grpc_exec_ctx_finish(&exec_ctx); @@ -125,6 +154,7 @@ static bool plugin_get_request_metadata(grpc_exec_ctx *exec_ctx, grpc_closure *on_request_metadata, grpc_error **error) { grpc_plugin_credentials *c = (grpc_plugin_credentials *)creds; + bool retval = true; // Synchronous return. if (c->plugin.get_metadata != NULL) { // Create pending_request object. grpc_plugin_credentials_pending_request *pending_request = @@ -142,12 +172,60 @@ static bool plugin_get_request_metadata(grpc_exec_ctx *exec_ctx, c->pending_requests = pending_request; gpr_mu_unlock(&c->mu); // Invoke the plugin. The callback holds a ref to us. + if (GRPC_TRACER_ON(grpc_plugin_credentials_trace)) { + gpr_log(GPR_INFO, "plugin_credentials[%p]: request %p: invoking plugin", + c, pending_request); + } grpc_call_credentials_ref(creds); - c->plugin.get_metadata(c->plugin.state, context, - plugin_md_request_metadata_ready, pending_request); - return false; + grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX]; + size_t num_creds_md = 0; + grpc_status_code status = GRPC_STATUS_OK; + const char *error_details = NULL; + if (!c->plugin.get_metadata(c->plugin.state, context, + plugin_md_request_metadata_ready, + pending_request, creds_md, &num_creds_md, + &status, &error_details)) { + if (GRPC_TRACER_ON(grpc_plugin_credentials_trace)) { + gpr_log(GPR_INFO, + "plugin_credentials[%p]: request %p: plugin will return " + "asynchronously", + c, pending_request); + } + return false; // Asynchronous return. + } + // Returned synchronously. + // Remove request from pending list if not previously cancelled. + pending_request_complete(exec_ctx, pending_request); + // If the request was cancelled, the error will have been returned + // asynchronously by plugin_cancel_get_request_metadata(), so return + // false. Otherwise, process the result. + if (pending_request->cancelled) { + if (GRPC_TRACER_ON(grpc_plugin_credentials_trace)) { + gpr_log(GPR_INFO, + "plugin_credentials[%p]: request %p was cancelled, error " + "will be returned asynchronously", + c, pending_request); + } + retval = false; + } else { + if (GRPC_TRACER_ON(grpc_plugin_credentials_trace)) { + gpr_log(GPR_INFO, + "plugin_credentials[%p]: request %p: plugin returned " + "synchronously", + c, pending_request); + } + *error = process_plugin_result(exec_ctx, pending_request, creds_md, + num_creds_md, status, error_details); + } + // Clean up. + for (size_t i = 0; i < num_creds_md; ++i) { + grpc_slice_unref_internal(exec_ctx, creds_md[i].key); + grpc_slice_unref_internal(exec_ctx, creds_md[i].value); + } + gpr_free((void *)error_details); + gpr_free(pending_request); } - return true; + return retval; } static void plugin_cancel_get_request_metadata( @@ -159,6 +237,10 @@ static void plugin_cancel_get_request_metadata( c->pending_requests; pending_request != NULL; pending_request = pending_request->next) { if (pending_request->md_array == md_array) { + if (GRPC_TRACER_ON(grpc_plugin_credentials_trace)) { + gpr_log(GPR_INFO, "plugin_credentials[%p]: cancelling request %p", c, + pending_request); + } pending_request->cancelled = true; GRPC_CLOSURE_SCHED(exec_ctx, pending_request->on_request_metadata, GRPC_ERROR_REF(error)); diff --git a/src/core/lib/security/credentials/plugin/plugin_credentials.h b/src/core/lib/security/credentials/plugin/plugin_credentials.h index 57266d589a..f56df9eac5 100644 --- a/src/core/lib/security/credentials/plugin/plugin_credentials.h +++ b/src/core/lib/security/credentials/plugin/plugin_credentials.h @@ -21,6 +21,8 @@ #include "src/core/lib/security/credentials/credentials.h" +extern grpc_tracer_flag grpc_plugin_credentials_trace; + struct grpc_plugin_credentials; typedef struct grpc_plugin_credentials_pending_request { diff --git a/src/core/lib/surface/init_secure.c b/src/core/lib/surface/init_secure.c index 2366c24910..8fbde3d1b4 100644 --- a/src/core/lib/surface/init_secure.c +++ b/src/core/lib/surface/init_secure.c @@ -25,6 +25,7 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/security/credentials/credentials.h" +#include "src/core/lib/security/credentials/plugin/plugin_credentials.h" #include "src/core/lib/security/transport/auth_filters.h" #include "src/core/lib/security/transport/secure_endpoint.h" #include "src/core/lib/security/transport/security_connector.h" @@ -84,4 +85,7 @@ void grpc_register_security_filters(void) { maybe_prepend_server_auth_filter, NULL); } -void grpc_security_init() { grpc_security_register_handshaker_factories(); } +void grpc_security_init() { + grpc_security_register_handshaker_factories(); + grpc_register_tracer(&grpc_plugin_credentials_trace); +} |