diff options
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/lib/channel/compress_filter.c | 12 | ||||
-rw-r--r-- | src/core/lib/channel/compress_filter.h | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/exec_ctx.c | 17 | ||||
-rw-r--r-- | src/core/lib/iomgr/exec_ctx.h | 29 | ||||
-rw-r--r-- | src/core/lib/support/string_util_win32.c | 2 | ||||
-rw-r--r-- | src/core/lib/surface/byte_buffer_reader.c | 19 | ||||
-rw-r--r-- | src/core/lib/surface/call.c | 32 | ||||
-rw-r--r-- | src/core/lib/surface/init.c | 2 |
8 files changed, 93 insertions, 22 deletions
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c index 5510c79b18..0e548c61b8 100644 --- a/src/core/lib/channel/compress_filter.c +++ b/src/core/lib/channel/compress_filter.c @@ -47,7 +47,7 @@ #include "src/core/lib/support/string.h" #include "src/core/lib/transport/static_metadata.h" -int grpc_compress_filter_trace = 0; +int grpc_compression_trace = 0; typedef struct call_data { gpr_slice_buffer slices; /**< Buffers up input slices to be compressed */ @@ -171,7 +171,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, did_compress = grpc_msg_compress(calld->compression_algorithm, &calld->slices, &tmp); if (did_compress) { - if (grpc_compress_filter_trace) { + if (grpc_compression_trace) { char *algo_name; const size_t before_size = calld->slices.length; const size_t after_size = tmp.length; @@ -185,12 +185,14 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, gpr_slice_buffer_swap(&calld->slices, &tmp); calld->send_flags |= GRPC_WRITE_INTERNAL_COMPRESS; } else { - if (grpc_compress_filter_trace) { + if (grpc_compression_trace) { char *algo_name; GPR_ASSERT(grpc_compression_algorithm_name(calld->compression_algorithm, &algo_name)); - gpr_log(GPR_DEBUG, "Algorithm '%s' enabled but decided not to compress.", - algo_name); + gpr_log( + GPR_DEBUG, + "Algorithm '%s' enabled but decided not to compress. Input size: %d", + algo_name, calld->slices.length); } } diff --git a/src/core/lib/channel/compress_filter.h b/src/core/lib/channel/compress_filter.h index cf5879d82e..0ce5d08837 100644 --- a/src/core/lib/channel/compress_filter.h +++ b/src/core/lib/channel/compress_filter.h @@ -38,7 +38,7 @@ #define GRPC_COMPRESS_REQUEST_ALGORITHM_KEY "grpc-internal-encoding-request" -extern int grpc_compress_filter_trace; +extern int grpc_compression_trace; /** Compression filter for outgoing data. * diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c index 2146c7dd1f..e451479073 100644 --- a/src/core/lib/iomgr/exec_ctx.c +++ b/src/core/lib/iomgr/exec_ctx.c @@ -39,6 +39,22 @@ #include "src/core/lib/profiling/timers.h" +bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx *exec_ctx) { + if (!exec_ctx->cached_ready_to_finish) { + exec_ctx->cached_ready_to_finish = exec_ctx->check_ready_to_finish( + exec_ctx, exec_ctx->check_ready_to_finish_arg); + } + return exec_ctx->cached_ready_to_finish; +} + +bool grpc_never_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored) { + return false; +} + +bool grpc_always_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored) { + return true; +} + #ifndef GRPC_EXECUTION_CONTEXT_SANITIZER bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { bool did_something = 0; @@ -61,6 +77,7 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { } void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) { + exec_ctx->cached_ready_to_finish = true; grpc_exec_ctx_flush(exec_ctx); } diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index 976cc40347..9d47a262f8 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -53,6 +53,9 @@ typedef struct grpc_workqueue grpc_workqueue; * - track a list of work that needs to be delayed until the top of the * call stack (this provides a convenient mechanism to run callbacks * without worrying about locking issues) + * - provide a decision maker (via grpc_exec_ctx_ready_to_finish) that provides + * signal as to whether a borrowed thread should continue to do work or + * should actively try to finish up and get this thread back to its owner * * CONVENTIONS: * Instance of this must ALWAYS be constructed on the stack, never @@ -63,18 +66,26 @@ typedef struct grpc_workqueue grpc_workqueue; */ struct grpc_exec_ctx { grpc_closure_list closure_list; + bool cached_ready_to_finish; + void *check_ready_to_finish_arg; + bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg); }; -#define GRPC_EXEC_CTX_INIT \ - { GRPC_CLOSURE_LIST_INIT } +#define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \ + { GRPC_CLOSURE_LIST_INIT, false, finish_check_arg, finish_check } #else struct grpc_exec_ctx { - int unused; + bool cached_ready_to_finish; + void *check_ready_to_finish_arg; + bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg); }; -#define GRPC_EXEC_CTX_INIT \ - { 0 } +#define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \ + { false, finish_check_arg, finish_check } #endif +#define GRPC_EXEC_CTX_INIT \ + GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(grpc_never_ready_to_finish, NULL) + /** Flush any work that has been enqueued onto this grpc_exec_ctx. * Caller must guarantee that no interfering locks are held. * Returns true if work was performed, false otherwise. */ @@ -86,6 +97,14 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx); void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, bool success, grpc_workqueue *offload_target_or_null); +/** Returns true if we'd like to leave this execution context as soon as + possible: useful for deciding whether to do something more or not depending + on outside context */ +bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx *exec_ctx); +/** A finish check that is never ready to finish */ +bool grpc_never_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored); +/** A finish check that is always ready to finish */ +bool grpc_always_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored); /** Add a list of closures to be executed at the next flush/finish point. * Leaves \a list empty. */ void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/support/string_util_win32.c b/src/core/lib/support/string_util_win32.c index f3cb0c050f..0d7bcdb5aa 100644 --- a/src/core/lib/support/string_util_win32.c +++ b/src/core/lib/support/string_util_win32.c @@ -83,7 +83,7 @@ char *gpr_format_message(int messageid) { DWORD status = FormatMessage( FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, - NULL, (DWORD)messageid, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + NULL, (DWORD)messageid, MAKELANGID(LANG_ENGLISH, SUBLANG_DEFAULT), (LPTSTR)(&tmessage), 0, NULL); if (status == 0) return gpr_strdup("Unable to retrieve error string"); message = gpr_tchar_to_char(tmessage); diff --git a/src/core/lib/surface/byte_buffer_reader.c b/src/core/lib/surface/byte_buffer_reader.c index 809fd5f1fa..c97079f638 100644 --- a/src/core/lib/surface/byte_buffer_reader.c +++ b/src/core/lib/surface/byte_buffer_reader.c @@ -62,12 +62,19 @@ void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader, case GRPC_BB_RAW: gpr_slice_buffer_init(&decompressed_slices_buffer); if (is_compressed(reader->buffer_in)) { - grpc_msg_decompress(reader->buffer_in->data.raw.compression, - &reader->buffer_in->data.raw.slice_buffer, - &decompressed_slices_buffer); - reader->buffer_out = - grpc_raw_byte_buffer_create(decompressed_slices_buffer.slices, - decompressed_slices_buffer.count); + if (grpc_msg_decompress(reader->buffer_in->data.raw.compression, + &reader->buffer_in->data.raw.slice_buffer, + &decompressed_slices_buffer) == 0) { + gpr_log(GPR_ERROR, + "Unexpected error decompressing data for algorithm with enum " + "value '%d'. Reading data as if it were uncompressed.", + reader->buffer_in->data.raw.compression); + reader->buffer_out = reader->buffer_in; + } else { /* all fine */ + reader->buffer_out = + grpc_raw_byte_buffer_create(decompressed_slices_buffer.slices, + decompressed_slices_buffer.count); + } gpr_slice_buffer_destroy(&decompressed_slices_buffer); } else { /* not compressed, use the input buffer as output */ reader->buffer_out = reader->buffer_in; diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 9b2b94eedf..c8728fa278 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -261,6 +261,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, call->channel = channel; call->cq = cq; call->parent = parent_call; + /* Always support no compression */ + GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE); call->is_client = server_transport_data == NULL; if (call->is_client) { GPR_ASSERT(add_initial_metadata_count < MAX_SEND_EXTRA_METADATA_COUNT); @@ -408,6 +410,7 @@ static void set_status_code(grpc_call *call, status_source source, static void set_compression_algorithm(grpc_call *call, grpc_compression_algorithm algo) { + GPR_ASSERT(algo < GRPC_COMPRESS_ALGORITHMS_COUNT); call->compression_algorithm = algo; } @@ -828,12 +831,16 @@ static uint32_t decode_status(grpc_mdelem *md) { return status; } -static uint32_t decode_compression(grpc_mdelem *md) { +static grpc_compression_algorithm decode_compression(grpc_mdelem *md) { grpc_compression_algorithm algorithm = grpc_compression_algorithm_from_mdstr(md->value); if (algorithm == GRPC_COMPRESS_ALGORITHMS_COUNT) { const char *md_c_str = grpc_mdstr_as_c_string(md->value); - gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str); + gpr_log(GPR_ERROR, + "Invalid incoming compression algorithm: '%s'. Interpreting " + "incoming data as uncompressed.", + md_c_str); + return GRPC_COMPRESS_NONE; } return algorithm; } @@ -1087,6 +1094,24 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; grpc_metadata_batch_filter(md, recv_initial_filter, call); + /* make sure the received grpc-encoding is amongst the ones listed in + * grpc-accept-encoding */ + + GPR_ASSERT(call->encodings_accepted_by_peer != 0); + if (!GPR_BITGET(call->encodings_accepted_by_peer, + call->compression_algorithm)) { + extern int grpc_compression_trace; + if (grpc_compression_trace) { + char *algo_name; + grpc_compression_algorithm_name(call->compression_algorithm, + &algo_name); + gpr_log(GPR_ERROR, + "Compression algorithm (grpc-encoding = '%s') not present in " + "the bitset of accepted encodings (grpc-accept-encodings: " + "'0x%x')", + algo_name, call->encodings_accepted_by_peer); + } + } if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) != 0 && !call->is_client) { @@ -1474,7 +1499,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, grpc_call_error err; GRPC_API_TRACE( - "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, reserved=%p)", + "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, " + "reserved=%p)", 5, (call, ops, (unsigned long)nops, tag, reserved)); if (reserved != NULL) { diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index 57c6897626..1c8b709015 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -164,7 +164,7 @@ void grpc_init(void) { grpc_register_tracer("channel_stack_builder", &grpc_trace_channel_stack_builder); grpc_register_tracer("http1", &grpc_http1_trace); - grpc_register_tracer("compression", &grpc_compress_filter_trace); + grpc_register_tracer("compression", &grpc_compression_trace); grpc_security_pre_init(); grpc_iomgr_init(); grpc_executor_init(); |